In [None]:
"""
QPSK Transmitter + Receiver Simulation

1. Generate bits
2. QPSK modulation
3. Pulse filtering
4. Channel
5. Matched filter
6. Symbol timing synchronization
7. Carrier frequency and phase recovery
8. Demodulation
9. Analysis

TODO: 
- Add multipath effects to channel & equalizer
- Framing? Detect start of frame


Date created: 6/9/25
Author: Cole Delong
"""

In [1]:
# Reload imports every time this cell is run
%reload_ext autoreload
%autoreload 2

# Imports
import scipy
import numpy as np
import matplotlib.pyplot as plt
from numba import njit

import sys
sys.path.insert(0, '..')
# import importlib
# import utils
# importlib.reload(utils)
from utils import *

# Constants
SPS = int(2)
N_BITS = 10**6
N_SYMBOLS = int(N_BITS/2) + 1       # qpsk: 2 bits/sample, differential coding: +1 symbol
N_RRC_TAPS = SPS*10 + 1
SNR_DB = 20


In [None]:
### Generate bits ###
bits_tx = np.random.randint(2, size=N_BITS)
plot_signal(bits_tx, title='TX Bits', xlim=[0, 30])

In [None]:
### QPSK modulation ###

# Use differential coding to account for phase ambiguity
sym_tx = diff_encode_psk_symbols(modulate_qpsk(bits_tx))

fig, axs = plt.subplots(1, 2, figsize=(10, 4))
plot_signal(sym_tx.real, sym_tx.imag, title='TX Symbols', xlim=[0, 30], ax=axs[0])
plot_constellation(sym_tx, ax=axs[1])
plt.tight_layout()
plt.show()

In [None]:
### Pulse filtering ###

# Upsample by factor of SPS
sym_upsamp = upsample(sym_tx, SPS)

plot_signal(sym_upsamp.real, sym_upsamp.imag, title='Symbols with Interpolation', xlim=[0, 10*SPS])

In [None]:
h_rrc = rrc(Ts=SPS, n_taps=N_RRC_TAPS)

fig, axs = plt.subplots(1, 2, figsize=(10, 4))
plot_signal(h_rrc.real, title='Root-Raised Cosine Filter Impulse Response', ax=axs[0])
plot_spectrum(h_rrc, ax=axs[1])
plt.tight_layout()
plt.show()

In [None]:
# Convolve signal with RRC filter
sig_tx = np.convolve(sym_upsamp, h_rrc)

fig, axs = plt.subplots(1, 2, figsize=(10, 4))
plot_signal(sig_tx.real, sig_tx.imag, title='RRC Filtered Signal', xlim=[0, 150], ax=axs[0])
plot_spectrum(sig_tx, title="Transmitted Spectrum", ax=axs[1])
plt.tight_layout()
plt.show()

In [None]:
### Channel ###

# Apply CFO
# testing/realistic: 1-5%, aggressive: 10%
sig_chan = apply_cfo(sig_tx, 0.01)

# Apply CPO
sig_chan = apply_cpo(sig_chan)

# Apply STO
mu = 0.3
sig_chan = apply_sto(sig_chan, mu)

# AWGN
sig_rx = apply_awgn(sig_chan, SNR_DB)

# Visualize signal after channel
fig, axs = plt.subplots(1, 3, figsize=(15, 4))
plot_signal(sig_rx.real, sig_rx.imag, title='Signal After Channel', xlim=[0, 150], ax=axs[0])
plot_constellation(sig_rx, ax=axs[2])
plot_spectrum(sig_rx, n_samples=2**13, ax=axs[1])
plt.tight_layout()
plt.show()


In [None]:
### Matched filter ###
sig_matched = np.convolve(sig_rx, h_rrc)

fig, axs = plt.subplots(1, 3, figsize=(15, 4))
plot_signal(sig_matched.real, sig_matched.imag, xlim=[50, 200], title='Received Signal After Matched Filter', ax=axs[0])
plot_spectrum(sig_matched, n_samples=2**13, ax=axs[1])
plot_constellation(sig_matched, n_samples=1000, ax=axs[2])
plt.tight_layout()
plt.show()

# Remove extra samples from convolutions
sig_matched = sig_matched[N_RRC_TAPS-1 : (N_RRC_TAPS-1) * -1]


In [None]:
### Symbol Synchronization ###

"""
Gardner: 
- Low complexity
- 2x oversampling fine since sample rate is same for both rx and tx on PlutoSDR
- Better than M&M in lower SNR environments
- Real-time low-latency

cubic interpolation:
- Complexity not bad (better than sinc) when using Farrow structures
- More accurate than lower order quadratic interpolaters
"""



stc = GardnerSymbolTimingCorrector()
symbols_sampled = stc.correct_batch(sig_matched)


### Carrier Recovery ###
# 2nd Order Costas Loop
K_p = 0.04
K_i = 0.03

control = PIDFeedback(K_p, K_i)
lock_detector = PhaseLockDetector()
error = np.empty_like(symbols_sampled)
sym_rot = costas_loop(symbols_sampled, control, lock_detector, debug=error)
symbols = np.array([sym_rot[i]*(i % SPS == 0) for i in np.arange(sym_rot.size)], dtype=sym_rot.dtype)


# Add in before + after plots for (carrier recovery), (symbol timing recovery) 
fig, axs = plt.subplots(1, 3, figsize=(15, 4))
plot_signal(sym_rot.real, symbols.real, title='After Carrier Recovery', xlim=[50*SPS, 80*SPS], ax=axs[0])
plot_signal(error, title='Phase error over time', xlim=[0, 800], ax=axs[1])
plot_constellation(sym_rot[1000:], n_samples=1000, ax=axs[2])
plt.tight_layout()
plt.show()



In [None]:
### Demodulation ###

# Make optimum decision for AWGN channel
bits_rx = demodulate_qpsk(diff_decode_psk_symbols(optimum_decider_qpsk(sym_rot)))

print(f"BER: {np.mean(bits_tx != bits_rx)}")

In [None]:
print(bits_tx[0:15])
print(bits_rx[0:15])
locs = []
for i in range(len(bits_tx)):
    if bits_tx[i] != bits_rx[i]:
        locs.append(i)

print(np.sum(bits_rx != bits_tx))

In [None]:

# Reload imports every time this cell is run
%reload_ext autoreload
%autoreload 2

# Imports
import scipy
import numpy as np
import matplotlib.pyplot as plt
from numba import njit

import sys
sys.path.insert(0, '..')
# import importlib
# import utils
# importlib.reload(utils)
from utils import *

# Constants
SPS = int(2)
N_BITS = 10**6
N_SYMBOLS = int(N_BITS/2) + 1       # qpsk: 2 bits/sample, differential coding: +1 symbol
N_RRC_TAPS = SPS*10 + 1
SNR_DB = 20




fig, axs = plt.subplots(1, 3, figsize=(15, 4))


SPS = 2
N_RRC_TAPS = SPS*10 + 1
N_SAMPS = 1300

# TX
h_rrc = rrc(Ts=SPS, n_taps=N_RRC_TAPS)
s = np.convolve(upsample(modulate_qpsk(np.random.randint(2, size=N_SAMPS)), SPS), h_rrc)[11:-10]
samples = np.array([s[i]*(i % SPS == 0) for i in np.arange(s.size)], dtype=s.dtype)
plot_signal(s, samples, title="TX", ax=axs[0], xlim=[300, 400])


# Channel
mu = 0.3
s = apply_sto(s, mu)

samples = np.array([s[i]*(i % SPS == 0) for i in np.arange(s.size)], dtype=s.dtype)

plot_signal(s, samples, title="Post-Channel", ax=axs[1], xlim=[300, 400])


# RX
s = np.convolve(s, h_rrc)[10:-10]
stc = GardnerSymbolTimingCorrector()
s = stc.correct_batch(batch=s[0:])
samples = np.array([s[i]*(i % SPS == 0) for i in np.arange(s.size)], dtype=s.dtype)
# plot_signal(s, samples, title="RX", ax=axs[2], xlim=[300, 400])
plot_signal(s, title="RX", ax=axs[2], xlim=[300, 400])
print(stc.mu)
print()

plt.tight_layout()
plt.show()

plot_signal(stc.mu_log, n_samps=len(stc.mu_log))


In [3]:
farrow = CubicFarrowInterpolator()

a = farrow.process_batch_with_tail_padding(np.array([0, 1, 2, 3, 4, 5]), .5)
# a = farrow.process_batch(np.array([0, 1, 2, 3, 4, 5]), 0.5)
# print(a.real)



# x = np.linspace(0, 2 * np.pi, 100)  # Generates 100 points between 0 and 2π
# b1 = np.sin(x*20)

b1 = np.array([1, 0, -1, 0] * 40)
b2 = apply_sto(b1, 0.7)
b3 = apply_sto(b2, 0.3, 1)
# plot_signal(b1, b2, b3, n_samps=10)

def func(mu):

    N_SAMPS=700
    s = np.convolve(upsample(modulate_qpsk(np.random.randint(2, size=N_SAMPS)), SPS), h_rrc)
    # s = upsample(modulate_qpsk(np.random.randint(2, size=N_SAMPS)), SPS)
    # s = np.array([1, 0, -1, 0] * (N_SAMPS//4))
    s = apply_sto(s, mu)
    s = np.convolve(s, h_rrc)[20:-20]

    # s = apply_sto(s, 0.3)
    # plot_signal(s[0:100])

    mu = np.linspace(0, 1, 100)
    # mean_e = np.zeros(len(s)+1)
    all_e = np.empty((0, len(mu)))  # empty array with 0 rows, len(mu) columns


    farrow.reset()
    for i, samp in enumerate(s):
        farrow.load(samp)
        # all_e = np.append(all_e, [gardner_ted(m, farrow) for m in mu], axis=0)
        # all_e = np.vstack([all_e, [gardner_ted(m, farrow) for m in mu]])
        row = [GardnerSymbolTimingCorrector.ted(m, farrow) for m in mu]
        all_e = np.vstack([all_e, row])  # append row to all_e
        # print(all_e)

    mean_e0 = np.array([])
    mean_e1 = np.array([])
    mean_e_tot = np.array([])
    for i in range(len(mu)):
        mean_e0 = np.append(mean_e0, np.mean(all_e[:, i][0::2]))
        mean_e1 = np.append(mean_e1, np.mean(all_e[:, i][1::2]))
        mean_e_tot = np.append(mean_e_tot, np.mean(all_e[:, i]))

    fig = plot_signal(mean_e0, mean_e1, mean_e_tot, title='Mean Alternating Error vs Fractional Timing Offset', label=['Alternating Error #1', 'Alternating Error #2', 'Full Error'], x=mu)
    # fig.savefig('gardner_every_other_sample.png', dpi=90, bbox_inches="tight")

    return mean_e_tot

func(0.2)
# fig = plot_signal(func(0.2), func(0.4), func(0.6), func(0.8), title='Mean Gardner Error vs Fractional Timing Offset', xlabel='Fractional Timing Offset', ylabel='Mean Gardner Error',
            # label=['mu=0.2', 'mu=0.4', 'mu=0.6', 'mu=0.8'])
# fig.savefig('gardner_every_sample.png', dpi=90, bbox_inches="tight")
# plot_signal(func(0.6), title='Mean Gardner Error vs Fractional Timing Offset', xlabel='Fractional Timing Offset', ylabel='Mean Gardner Error')

# s = np.convolve(apply_sto(s, 0.7), h_rrc)[n:(n+15)]
# plot_signal(s)
# farrow.load(s)
# print(f"buffer {list(farrow.buffer)}")

# mu = np.linspace(0, 1, 100)
# e = np.array([gardner_ted(m, farrow) for m in mu])
# plot_signal(e, x=mu)



# help(np.arange)

NameError: name 'h_rrc' is not defined

In [12]:

apply_frame_timing_offset(np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 1, 2, 3], [4, 5, 6, 7]]), 5)

# print(zadoff_chu(5, 1))
# j = complex(0, 1)
# # complex(0, 1) * np.arange(5) 
# np.exp(-j*2*np.pi/5)

preamble = zadoff_chu(5)
payload = np.random.randint(2, size=20)
print(payload)
print(to_frames(preamble, payload, 5))

[1 1 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0]
[array([ 1.        +0.00000000e+00j,  0.30901699-9.51056516e-01j,
       -0.80901699+5.87785252e-01j,  0.30901699-9.51056516e-01j,
        1.        +4.89858720e-16j,  1.        +0.00000000e+00j,
        1.        +0.00000000e+00j,  0.        +0.00000000e+00j,
        0.        +0.00000000e+00j,  0.        +0.00000000e+00j]), array([ 1.        +0.00000000e+00j,  0.30901699-9.51056516e-01j,
       -0.80901699+5.87785252e-01j,  0.30901699-9.51056516e-01j,
        1.        +4.89858720e-16j,  0.        +0.00000000e+00j,
        0.        +0.00000000e+00j,  0.        +0.00000000e+00j,
        0.        +0.00000000e+00j,  0.        +0.00000000e+00j]), array([ 1.        +0.00000000e+00j,  0.30901699-9.51056516e-01j,
       -0.80901699+5.87785252e-01j,  0.30901699-9.51056516e-01j,
        1.        +4.89858720e-16j,  1.        +0.00000000e+00j,
        0.        +0.00000000e+00j,  0.        +0.00000000e+00j,
        0.        +0.00000000e+00j,  0.    