In [None]:
import numpy as np
import matplotlib.pyplot as plt
import sys
import os
import os.path
import struct
import scipy.signal
import scipy.io.wavfile
from numpy.fft import fft, fftshift
from scipy.ndimage.filters import maximum_filter1d

In [None]:
Ft = 27500000  # frequency the SDR is tuned on
Fc = 27120000  # carrier frequency
Ft -= Fc/2
Fc -= Fc/2
Fsub = 848000  # subcarrier frequency (relative to carrier)
Tsym = 1/106000  # symbol time for 106 kbps
DECODER_PLOTS = False

In [None]:
capture_path = './gqrx_20200722_135952_27500000_2400000_fc.wav'
if capture_path.endswith('.raw'):
    # The capture file is in gnuradio format (or GQRX I/Q record)
    SAMPLE_SIZE = 2
    size = os.stat(capture_path).st_size
    with open(capture_path, 'rb') as f:
        raw = np.frombuffer(f.read(), dtype='float32')
    v = np.array([
        raw[range(0, len(raw), 2)],
        raw[range(1, len(raw), 2)]
    ], dtype='float32').transpose()
    Fs = 2400000  # sampling frequency of the SDR

    # Let's also write a wave file for convenience
    wave = v * (1 / np.max(np.abs(v)))
    scipy.io.wavfile.write(capture_path[:-4] + ".wav", Fs, wave)

elif capture_path.endswith('.wav'):
    # The capture file is in WAV audio file format
    # Stereo channels for I/Q data
    Fs, v = scipy.io.wavfile.read(capture_path)
    assert v.shape[1] == 2

y = v[:,0] + 1j * v[:,1]
del v
#y = y[int(0.5*Fs):int(1*Fs)]
y -= np.mean(y)
y /= np.std(y)
t = np.arange(len(y)) / Fs

plt.figure()
plt.plot(np.arange(0, len(y)) * (1/Fs), np.abs(y))
plt.show()


In [None]:
def plotfft(y, Fs, Ft=0):
    Y = fft(y)
    n = len(y)
    fshift = np.arange(-n/2, n/2)*(Fs/n)
    yshift = fftshift(Y)
    yshift_mag = np.abs(yshift)
    plt.plot(fshift + Ft, yshift_mag)
    plt.yscale('log')
    plt.grid()
    return (yshift, fshift)

In [None]:
x = np.arange(0, len(y)) / Fs

plt.figure(figsize=(16,8))
plt.subplot(2, 1, 1)
plt.title("power plot")
plt.grid()
plt.plot(x, np.abs(y))
plt.axis([x[0], x[-1], None, None])

plt.subplot(2, 1, 2)
plt.specgram(y, NFFT=1024, Fs=Fs, Fc=Ft, noverlap=900)

plt.show()

In [None]:
plt.figure(figsize=(10,6))
Y, fshift = plotfft(y, Fs, Ft)

print((Ft-Fc))
c_expt_idx = int(.5 * len(Y) + len(Y) * (Fc-Ft) // Fs)
c_expt_range = range(c_expt_idx-20000, c_expt_idx+20000)
Fbb_idx = c_expt_range.start + np.argmax(np.abs(Y[c_expt_range]))

sc_expt_idx = int(Fbb_idx + len(Y) * Fsub // Fs)
sc_expt_range = range(sc_expt_idx-20000, sc_expt_idx+20000)
Fsc_idx = sc_expt_range.start + np.argmax(np.abs(Y[sc_expt_range]))

Fbb = fshift[Fbb_idx]
Fsc = fshift[Fsc_idx]
p_Fbb = np.abs(Y[Fbb_idx])
p_Fsc = np.abs(Y[Fsc_idx])

plt.scatter(Fbb + Ft, p_Fbb, color='y')
plt.annotate('%.2f MHz' % ((Fbb + Ft) * 1e-6), (Fbb + Ft, p_Fbb))
plt.scatter(Fsc + Ft, p_Fsc, color='y')
plt.annotate('%.2f MHz' % ((Fsc + Ft) * 1e-6), (Fsc + Ft, p_Fsc))
plt.title("FFT of signal")
plt.ylim([1e-3, 1e7])
plt.show()

In [None]:
# This cell applies filters to extract the communication from PICC to PCD
# Shift baseband to the subcarrier
scbb_shifter = np.exp(-2j * np.pi * ((Fbb + Fsub) / Fs) * np.arange(len(y)))
y0 = y * scbb_shifter

def pol(angle=0):
    return np.cos(angle) + 1j * np.sin(angle)

poles = [
    .65,
    .65,
    .65,
    .9 * pol((-2/Tsym)/Fs * 2 * np.pi),
    .9 * pol((+2/Tsym)/Fs * 2 * np.pi),
]

zeroes = [
    pol((-Fsub)/Fs * 2 * np.pi),
    pol((+4/Tsym)/Fs * 2 * np.pi),
    pol((-4/Tsym)/Fs * 2 * np.pi),
]

b, a = scipy.signal.zpk2tf(zeroes, poles, .02)
y2 = scipy.signal.lfilter( b, a, y0 )

plt.figure(figsize=(20, 4))
plt.plot(t, np.abs(y))
plt.plot(t, np.abs(y2))
plt.show()

# Some points of interest in the trace, used to show the plot
P1 = 0.0117021
P2 = P1 + 0.0001655
r0 = np.arange(int((P1 - 0.00005) * Fs), int((P2 + 0.00025) * Fs))
r1 = r0
r2 = r1

plt.figure(figsize=(20, 5))
plt.figure(figsize=(20, 5))
plt.xlabel("Time [s]")
plt.ylabel("Carrier Amplitude")
plt.axis([t[r0[0]], t[r0[-1]], -.5, 3])
plt.plot(t[r0], np.abs(y[r0]))
samples = np.array(Fs * (np.concatenate(
    [P1 + Tsym*np.arange(0,10), P2 + Tsym*np.arange(0,20)],
    )), dtype='int')
plt.scatter(t[samples], np.ones(samples.size)*-.5, s=80000,marker=2, color='red')

bits = ['S',0,1,1,0,0,1,0,'E','E']
for i in range(len(bits)):
    plt.text(P1 + Tsym/2 + i * Tsym, 2.5, bits[i],
             horizontalalignment='center',
             verticalalignment='center', )
    
bits = ['S',0,1,0,0,0,0,0,0,'P',0,0,0,0,0,0,0,0,'P','E']
for i in range(len(bits)):
    plt.text(P2 + Tsym/2 + i * Tsym, 2.5, bits[i],
             horizontalalignment='center',
             verticalalignment='center', )
    
plt.show()

plt.figure(figsize=(20, 5))
plt.plot(t[r0], np.abs(y[r0]))
plt.plot(t[r0], np.abs(y2[r0]))
plt.show()

_ = plotfft(y0[r0], Fs, Ft)
_ = plotfft(y2[r0], Fs, Ft)
_ = plotfft(y[r0], Fs, Ft)

picc_y = y2

In [None]:
def decode_frame(bits):
    L = max(1, int(np.ceil(len(bits) / 9)))

    frame = np.zeros(L, dtype="H")
    Bi = 0
    for bi in range(len(bits)):
        if bi % 9 == 0:
            B = 0
        b = bits[bi]
        B += b * (2 ** (bi % 9))
        if bi % 9 == 8 or bi == len(bits)-1:
            frame[Bi] = B
            Bi += 1
    
    if len(frame) > 1 and frame[-1] == 0:
        frame = frame[:-1]
    return frame

In [None]:
picc_y3 = np.abs(picc_y)
PICC_ENVELOPE_THRESHOLD = .5

picc_envelope = maximum_filter1d(picc_y3, size=int(np.ceil(2*Tsym*Fs)))[0:len(y)]
picc_detection = picc_envelope > PICC_ENVELOPE_THRESHOLD

picc_messages = []
state = 0
for i in range(len(picc_envelope)):
    if state == 0 and picc_envelope[i] > PICC_ENVELOPE_THRESHOLD:
        picc_messages.append(i)
        state = 1
    elif state == 1 and picc_envelope[i] < PICC_ENVELOPE_THRESHOLD:
        picc_messages[-1] = (picc_messages[-1], i)
        state = 0

picc_messages = [(a, b) for (a, b) in picc_messages if (b-a)/Fs>Tsym*7]

print("Detected %d PICC messages" % len(picc_messages))

plt.figure(figsize=(20, 4))
plt.plot(t, np.abs(picc_y))
plt.plot(t, picc_envelope)
plt.show()

plt.figure(figsize=(20, 4))
plt.figure(figsize=(20, 4))
plt.plot(t[r0], picc_envelope[r0])
plt.plot(t[r0], picc_y3[r0])
plt.show()

In [None]:
picc_y4 = picc_y3 / picc_envelope

# Edge detection
symbol_ref = np.concatenate([[1], np.zeros(int(.5*Tsym*Fs)-2), [-1]])
picc_y6 = np.convolve(picc_y4 - .5, symbol_ref)[9:9+len(y)]

def resample_picc(msg_start_idx, msg_end_idx):
    first_edge = None
    for i in range(msg_start_idx, msg_end_idx):
        if picc_y4[i] > .5:
            first_edge = i
            break
    
    period_len = Tsym*Fs
    samples = [first_edge + period_len/4]
    
    period_len_init = period_len
    period_len_max = period_len * 1.2
    period_len_min = period_len * 0.8
    
    while samples[-1] + period_len < msg_end_idx:
        
        # PLL: detection of drift
        next_sample = samples[-1] + period_len
        search_radius = 3
        search_space = np.arange(next_sample-search_radius, next_sample+search_radius+1, dtype='uint')
        drift = search_space[np.argmax(np.abs(picc_y6[search_space]))] - next_sample
        
        # PLL: integrating drift into oscillation frequency
        period_len = period_len + .1 * drift
        period_len = max(period_len_min, min(period_len, period_len_max))
        
        # PLL: simulated VCO emits a new period
        samples.append(samples[-1] + period_len)
        
        #print("%8.5f us    %8.5f Hz    %3.2f%%" % (1e6*drift/Fs, Fs/period_len, 100*(1-period_len/period_len_init)))
    
    samples = np.array(samples, dtype='uint')

    if DECODER_PLOTS:
        r1 = np.arange(msg_start_idx - 100, msg_end_idx + 100)
        plt.figure(figsize=(20, 4))
        plt.plot(t[r1], picc_y3[r1])
        plt.plot(t[r1], picc_envelope[r1])
        plt.plot(t[r1], -1+np.abs(picc_y6[r1]))
        plt.scatter(t[samples], picc_y3[samples], color='red')
        plt.show()

    return picc_y4[samples]


for start, end in picc_messages:
    if DECODER_PLOTS:
        print('%.5f->%.5f' % (start/Fs, end/Fs))
    frame_samples = resample_picc(start, end)
    frame = decode_frame(frame_samples[1:] > .5)
    sys.stdout.write('. %.5f: > ' % (start/Fs))
    for Bi, B in enumerate(frame):
        P = sum([b == '1' for b in bin(B)])
        sys.stdout.write("%02x%s " % (B % 256, " " if P % 2 else "!"))
    sys.stdout.write("\n")
    
    if DECODER_PLOTS:
        print('')
        print('-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-')
        print('')

In [None]:
# This cell applies filters to extract the communication from PCD to PICC
# Shift baseband to the carrier
scbb_shifter = np.exp(-2j * np.pi * (Fbb / Fs) * np.arange(len(y)))
y0 = np.abs(y * scbb_shifter)

def pol(angle=0):
    return np.cos(angle) + 1j * np.sin(angle)

poles = [
    .8
]

zeroes = [
]

b, a = scipy.signal.zpk2tf(zeroes, poles, 1)
y2 = scipy.signal.lfilter( b, a, y0 )

plt.figure(figsize=(20, 4))
plt.plot(t, np.abs(y))
plt.plot(t, np.abs(y2))
plt.show()

plt.figure(figsize=(20, 4))
plt.plot(t[r0], np.abs(y[r0]))
plt.plot(t[r0], np.abs(y2[r0]))
plt.show()

_ = plotfft(y0[r0], Fs, Ft)
_ = plotfft(y2[r0], Fs, Ft)

pcd_y = y2

In [None]:
# envelope ABOVE the signal
i_2sym = int(np.ceil(2*Tsym*Fs))
pcd_envelope_h = maximum_filter1d(np.abs(pcd_y), size=i_2sym)[0:len(y)]
# envelope BELOW the signal
pcd_envelope_l = -maximum_filter1d(-np.abs(pcd_y), size=i_2sym)[0:len(y)]

#pcd_envelope = maximum_filter1d(pcd_envelope_h - np.abs(pcd_y), size=i_2sym)[0:len(y)]
pcd_envelope = pcd_envelope_h - pcd_envelope_l
pcd_detection = (pcd_envelope / pcd_envelope_h > .5) * (pcd_envelope_h > 1)

pcd_y3 = pcd_envelope_h - np.abs(pcd_y)

plt.figure(figsize=(20, 4))
plt.plot(t, np.abs(pcd_y))
plt.plot(t, pcd_envelope_h)
plt.show()

plt.figure(figsize=(20, 4))
plt.plot(t[r2], (pcd_envelope / pcd_envelope_h)[r2])
plt.plot(t[r2], .5 * picc_envelope[r2])
plt.show()

plt.figure(figsize=(20, 4))
plt.figure(figsize=(20, 4))
plt.plot(t[r0], pcd_envelope[r0])
plt.plot(t[r0], pcd_y3[r0])
plt.show()

pcd_messages = []
state = 0
for i in range(len(pcd_envelope)):
    if not state and pcd_detection[i]:
        pcd_messages.append(i)
        state = 1
    elif state and not pcd_detection[i]:
        pcd_messages[-1] = (pcd_messages[-1], i)
        state = 0

pcd_messages = [(a, b) for (a, b) in pcd_messages if (b-a)/Fs>Tsym*7]

print("Detected %d PCD messages" % len(pcd_messages))

In [None]:
pcd_y4 = pcd_y3 / pcd_envelope
symbol_ref = np.concatenate([[1], np.zeros(int(.5*Tsym*Fs)-2), [-1]])
b, a = scipy.signal.zpk2tf([1], [.9], 1)
pcd_y5 = scipy.signal.lfilter( b, a, pcd_y3 )
pcd_y6 = np.convolve(pcd_y5, symbol_ref)[9:9+len(y)]

def resample_pcd(msg_start_idx, msg_end_idx):
    first_edge = msg_start_idx
    for i in range(msg_start_idx, msg_end_idx):
        if pcd_y4[i] > .4:
            first_edge = i
            break
    
    period_len = Tsym*Fs
    samples = [first_edge + period_len/4]
    
    period_len_init = period_len
    period_len_max = period_len * 1.2
    period_len_min = period_len * 0.8
    
    while samples[-1] + period_len < msg_end_idx:
        
        # PLL: detection of drift
        next_sample = samples[-1] + period_len / 2
        if .5 < pcd_y4[int(np.round(next_sample))]:
            search_radius = 3
            search_space = np.arange(next_sample-search_radius, next_sample+search_radius+1, dtype='uint')
            drift = search_space[np.argmax(np.abs(pcd_y3[search_space]))] - next_sample

            # PLL: integrating drift into oscillation frequency
            period_len = period_len + .4 * drift
            period_len = max(period_len_min, min(period_len, period_len_max))

            #print("%8.5f us    %8.5f Hz    %3.2f%%" % (1e6*drift/Fs, Fs/period_len, 100*(1-period_len/period_len_init)))
    
        # PLL: simulated VCO emits a new period
        samples.append(samples[-1] + period_len / 2)

    samples = np.array(samples, dtype='uint')
    samples = samples[1::2]

    if DECODER_PLOTS:
        r1 = np.arange(msg_start_idx - 100, msg_end_idx + 100)
        plt.figure(figsize=(20, 4))
        plt.plot(t[r1], pcd_y3[r1])
        plt.plot(t[r1], pcd_envelope[r1])
        plt.scatter(t[samples], pcd_y3[samples], color='red')
        plt.show()

    return pcd_y4[samples]

for start, end in pcd_messages:
    if DECODER_PLOTS:
        print('%.5f->%.5f' % (start/Fs, end/Fs))
    frame_samples = resample_pcd(start, end)
    frame = decode_frame(frame_samples[1:] > .5)
    sys.stdout.write('. %.5f: < ' % (start/Fs))
    for Bi, B in enumerate(frame):
        P = sum([b == '1' for b in bin(B)])
        sys.stdout.write("%02x%s " % (B % 256, " " if P % 2 else "!"))
    sys.stdout.write("\n")
    
    if DECODER_PLOTS:
        print('')
        print('-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-')
        print('')

In [None]:
def plot_freqz(b, a):
    w, h = scipy.signal.freqz(b, a)

    fig, ax1 = plt.subplots(1, 1)
    plt.title('Digital filter frequency response')


    plt.plot(w, 20 * np.log10(abs(h)), 'b')
    plt.ylabel('Amplitude [dB]', color='b')
    plt.xlabel('Frequency [rad/sample]')
    plt.axis([None, None, -100, None])


    ax2 = ax1.twinx()
    angles = np.unwrap(np.angle(h))
    plt.plot(w, angles, 'g')
    plt.ylabel('Angle (radians)', color='g')
    plt.grid()
    plt.axis('tight')
    plt.show()
