In [23]:
import numpy as np
import random

from FPU import cmul, cadd
from utils import conj, read_binary, binary_to_fp16, store_binary, generate_fp16, fp16_to_binary, int_to_binary, binary_to_int
from ed import ed
from hilbert import hilbert_manual_conv
from auto_corr import auto_corr

# Basic functions

In [4]:
def delayseq(data, lag: int, mode: str = "zero"):
    """
    Apply delay (positive lag) or advance (negative lag) to a sequence.

    Supports both:
    - Real sequences (list of np.float16)
    - Complex sequences (list of [real, imag] pairs)

    Output length is always equal to the input length.

    Args:
        data: Input sequence (real or complex), given as a list.
        lag:  Number of samples to delay (positive) or advance (negative).
        mode:
            "zero" → fill shifted positions with zeros (default).
            "wrap" → perform circular (cyclic) shift.

    Returns:
        Delayed (or advanced) sequence of the same length. The outer list is
        always new; samples carried over from `data` are the same objects as
        in the input (shallow copy). Every zero-filled complex sample is its
        own fresh [0, 0] pair, so editing one padded sample never affects
        another. An empty input returns [] regardless of `mode`.

    Raises:
        ValueError: If `mode` is neither "zero" nor "wrap" (non-empty input).
    """
    if not data:
        return []

    n = len(data)

    # Circular shift mode
    if mode == "wrap":
        k = lag % n  # Python's % is non-negative here, so negative lags wrap too
        return data[-k:] + data[:-k] if k != 0 else data.copy()

    if mode != "zero":
        raise ValueError('mode must be "zero" or "wrap"')

    # A sample counts as complex when the first element is a 2-item list/tuple
    is_complex = isinstance(data[0], (list, tuple)) and len(data[0]) == 2

    def _zeros(count):
        # Build one independent pair per slot: `[pair] * count` would repeat a
        # single shared list object across every padded position.
        if is_complex:
            return [[np.float16(0.0), np.float16(0.0)] for _ in range(count)]
        return [np.float16(0.0)] * count

    # No lag
    if lag == 0:
        return data.copy()

    # Shifts larger than the sequence simply produce an all-zero output
    shift = min(abs(lag), n)

    # Positive lag → delay to the right (zeros at the front)
    if lag > 0:
        return _zeros(shift) + data[: n - shift]

    # Negative lag → advance to the left (zeros at the end)
    return data[shift:] + _zeros(shift)

In [5]:
# Square, Mean, Conj, Abs helpers for sequences of [real, imag] pairs (no Conv helper is defined in this cell)
def square_complex(data_in):
    """Square every complex sample in a sequence of [real, imag] pairs.

    For a sample a + jb the result is (a*a - b*b) + j(2*a*b).

    Args:
        data_in: Sequence whose items are indexable as item[0] (real part)
            and item[1] (imaginary part).

    Returns:
        A new list of [real, imag] lists, one per input sample. No explicit
        dtype cast is applied; the result type follows the operand types.
    """
    return [
        [pair[0]*pair[0] - pair[1]*pair[1], 2*pair[0]*pair[1]]
        for pair in data_in
    ]

def mean_complex(data_in):
    """Average a sequence of [real, imag] pairs component-wise.

    Each component is summed left to right starting from np.float16(0) and
    then divided by the number of samples.

    Args:
        data_in: Sized sequence whose items are indexable as item[0] (real
            part) and item[1] (imaginary part).

    Returns:
        A two-element list [mean_real, mean_imag].
    """
    count = len(data_in)
    # sum() with an explicit start value adds in the same left-to-right order
    # as a running `total += x` loop seeded with np.float16(0).
    re_total = sum((pair[0] for pair in data_in), np.float16(0))
    im_total = sum((pair[1] for pair in data_in), np.float16(0))
    return [re_total / count, im_total / count]

def conj(data_in):
    """Complex-conjugate every sample in a sequence of [real, imag] pairs.

    NOTE: defining this function rebinds the name `conj` that the import cell
    also pulls in from `utils`; after this cell runs, `conj` refers to this
    version.

    Args:
        data_in: Sequence whose items are indexable as item[0] (real part)
            and item[1] (imaginary part).

    Returns:
        A new list of [real, -imag] lists, one per input sample.
    """
    return [[pair[0], -pair[1]] for pair in data_in]

def abs_complex(data_in):
    """Compute the squared magnitude of every [real, imag] sample.

    Despite the name, no square root is taken: each output holds
    real*real + imag*imag, cast to np.float16, in the real slot and an
    np.float16 zero in the imaginary slot, so the result keeps the pair format.

    Args:
        data_in: Sequence whose items are indexable as item[0] (real part)
            and item[1] (imaginary part).

    Returns:
        A new list of [squared_magnitude, 0] lists, one per input sample.
    """
    return [
        [np.float16(pair[0]*pair[0] + pair[1]*pair[1]), np.float16(0)]
        for pair in data_in
    ]

# Default Values

In [6]:
# Shared 64-element test vector used by every section below.
# NOTE(review): later cells index each element as a [real, imag] pair, so
# generate_fp16(..., complex=True) presumably returns that format — confirm in utils.
# NOTE(review): if generate_fp16 draws random values, seed it so a Restart & Run All
# reproduces the outputs captured in this notebook — TODO confirm.
sample = generate_fp16(64, complex=True)

# OFDM Estimation

## CP Detect

In [None]:
# Lagging & Auto-correlation
# Runs the project-local auto_corr helper on the shared test vector.
# NOTE(review): the second argument (4) is presumably the lag in samples, i.e. the
# assumed cyclic-prefix distance — confirm against auto_corr.py and name the constant.

cp_output = auto_corr(sample, 4)
print(cp_output)

[np.float16(11.125), np.float16(4.375)]


## IA

In [None]:
# Need to modify the function to match the input format
# FIXME: as written this cell fails with "ValueError: object too deep for desired array".
# `sample` is consumed elsewhere in this notebook as a list of [real, imag] pairs;
# presumably hilbert_manual_conv expects a flat real-valued sequence — confirm against
# hilbert.py before adapting either the input or the function.
# NOTE: abs_complex returns re*re + im*im (squared magnitude, no square root) with a
# zero imaginary slot, so `output` is not a true magnitude.

y = hilbert_manual_conv(sample, filter_len=16)
output = abs_complex(y)

ValueError: object too deep for desired array

## QAM Estimation
- Mostly equalization
- Not implemented in Matlab
- Mapping is not implemented

In [16]:
# ============================================================
# QAM Estimation (Equalization) Example - Full fp16 Precision
# ============================================================
# Theory Overview:
# ------------------------------------------------------------
# In digital communication systems, when a symbol x is transmitted
# through a complex channel H, the received signal is:
#
#       y = H * x + n
#
# where:
#   y : received complex symbol
#   H : complex channel coefficient (amplitude & phase distortion)
#   n : additive complex Gaussian noise (AWGN)
#
# The receiver wants to estimate the transmitted symbol x from y.
# The estimation process removes the channel effect and reduces
# noise impact.
#
# Two major linear estimators:
#   1) Zero Forcing (ZF):
#        x_hat = y / H = (y * conj(H)) / |H|^2
#      - Cancels the channel distortion perfectly.
#      - Sensitive to noise (especially when |H| is small).
#
#   2) Minimum Mean Square Error (MMSE):
#        x_hat = (conj(H) * y) / (|H|^2 + N0)
#      - Adds noise regularization term (N0 = noise variance).
#      - Provides better robustness under low SNR conditions.
#
# Both yield a *soft* complex estimate x_hat, which can later
# be quantized to the nearest constellation point during
# symbol detection or demodulation.
# ------------------------------------------------------------

In [13]:
x_pairs = sample  # alias to emphasize we keep the [real, imag] pair format

# Derive the symbol count from the data instead of hard-coding it, so the
# energy estimate, the noise vectors, and both loops always agree with `sample`.
n_sym = len(x_pairs)
if n_sym == 0:
    raise ValueError("sample must contain at least one symbol")

# -----------------------------
# 1) Signal statistics, channel and noise parameters (all fp16)
# -----------------------------
# Average symbol energy Es in fp16: mean(r^2 + i^2)
Es = np.float16(0.0)
for r, i in x_pairs:
    Es += np.float16(r*r + i*i)
Es = np.float16(Es / np.float16(n_sym))

# Flat complex channel coefficient H = hr + j*hi
hr = np.float16(0.7)
hi = np.float16(0.5)

snr_db = np.float16(18.0)
snr_lin = np.float16(10.0) ** np.float16(snr_db / np.float16(10.0))
noise_var = np.float16(Es / snr_lin)           # N0 per complex symbol
noise_std = np.float16(np.sqrt(noise_var / np.float16(2.0)))  # per real/imag

# -----------------------------
# 2) Channel simulation: y = H*x + n (fp16, pair format)
# -----------------------------
rng = np.random.default_rng(1)  # fixed seed keeps the noise reproducible
# Gaussian noise in fp16
n_real = rng.standard_normal(n_sym).astype(np.float16) * noise_std
n_imag = rng.standard_normal(n_sym).astype(np.float16) * noise_std
n_pairs = [[np.float16(n_real[k]), np.float16(n_imag[k])] for k in range(n_sym)]

# Pass through channel and add noise: y = H*x + n (all in fp16 and pair format)
y_pairs = []
for k in range(n_sym):
    xr = np.float16(x_pairs[k][0])
    xi = np.float16(x_pairs[k][1])
    # Complex multiply (hr + j*hi) * (xr + j*xi)
    yr = np.float16(hr * xr - hi * xi)
    yi = np.float16(hr * xi + hi * xr)
    # Add noise
    yr = np.float16(yr + n_pairs[k][0])
    yi = np.float16(yi + n_pairs[k][1])
    y_pairs.append([yr, yi])

# -----------------------------
# 3) Estimation (Equalization) in fp16: choose METHOD = "zf" or "mmse"
#    ZF:   x_hat = y / H = (y * conj(H)) / |H|^2
#    MMSE: x_hat = (conj(H) * y) / (|H|^2 + N0)
# -----------------------------
METHOD = "mmse"  # change to "zf" for Zero-Forcing

# Precompute conj(H) and |H|^2 in fp16
H_conj = [hr, np.float16(-hi)]
H_abs2 = np.float16(hr*hr + hi*hi)
eps = np.float16(1e-4)  # small epsilon for safety in fp16

# The two estimators share the numerator y * conj(H) and differ only in the
# denominator, so pick the denominator once and run a single loop.
method_key = METHOD.lower()
if method_key == "zf":
    denom = np.float16(H_abs2 + eps)
elif method_key == "mmse":
    denom = np.float16(H_abs2 + noise_var)
else:
    # Fail loudly instead of silently leaving x_hat_pairs empty.
    raise ValueError('METHOD must be "zf" or "mmse"')

x_hat_pairs = []
for k in range(n_sym):
    yr, yi = np.float16(y_pairs[k][0]), np.float16(y_pairs[k][1])
    # (yr + j*yi) * (hr - j*hi)
    num_r = np.float16(yr * H_conj[0] - yi * H_conj[1])
    num_i = np.float16(yr * H_conj[1] + yi * H_conj[0])
    xr_hat = np.float16(num_r / denom)
    xi_hat = np.float16(num_i / denom)
    x_hat_pairs.append([xr_hat, xi_hat])


# PSK Classification

In [18]:
# ----------------------------------------
# Step 1. Remove the mean (zero-mean signal)
# ----------------------------------------
# Component-wise average of the [real, imag] samples
sample_mean = mean_complex(sample)
# Centre every sample on that average, casting each component to fp16
sample_zm = [
    [np.float16(pair[0]-sample_mean[0]), np.float16(pair[1]-sample_mean[1])]
    for pair in sample
]

# ----------------------------------------
# Step 2. Scale to unit average power
# ----------------------------------------
# abs_complex yields re^2 + im^2 per sample (squared magnitude, no square root)
abs_sq = abs_complex(sample_zm)
# Mean power sits in the real slot of the averaged pairs
pwr_mean = mean_complex(abs_sq)
# `power` is the square root of the mean power (RMS); 1e-6 guards against a zero input
power = np.float16(np.sqrt(pwr_mean[0] + 1e-6))

# Divide both components of every centred sample by the RMS value
sample_n = [
    [np.float16(pair[0]/power), np.float16(pair[1]/power)]
    for pair in sample_zm
]

# ----------------------------------------
# Step 3. Second- and fourth-order moments
# ----------------------------------------
# m20 = E[x^2]
sample_sq = square_complex(sample_n)
m20 = mean_complex(sample_sq)

# m40 = E[x^4], obtained by squaring the already squared samples
sample_4 = square_complex(sample_sq)
m40 = mean_complex(sample_4)

# ----------------------------------------
# Step 4. Cumulants C20 and C40
# ----------------------------------------
# C20 is the second-order moment itself
C20 = m20

# (m20)^2 as a complex square: (a + jb)^2 = (a*a - b*b) + j(2*a*b)
m20_sq_real = np.float16(m20[0]*m20[0] - m20[1]*m20[1])
m20_sq_imag = np.float16(2*m20[0]*m20[1])

# C40 = m40 - 3*(m20)^2, evaluated separately for the real and imaginary parts
C40 = [
    np.float16(m40[0] - 3*m20_sq_real),
    np.float16(m40[1] - 3*m20_sq_imag),
]
C40_real, C40_imag = C40

# ----------------------------------------
# Step 5. Report the cumulants
# ----------------------------------------
print("C20 =", C20)
print("C40 =", C40)

C20 = [np.float16(0.1454), np.float16(0.127)]
C40 = [np.float16(-0.4133), np.float16(0.1462)]


# Chirp Detection & Estimation