In [None]:
import ctypes
import numpy as np
from plotly import graph_objs as go
from scipy.io import wavfile
from scipy.signal import freqz
import os

from util.plotting import compute_fft_plot_from_sample_rate
from util.demodulation import demodulate_signal
from util.filtering import low_pass_filter_real_signal


# This path must be relative to where the python code is executed from.
c_data_io = ctypes.CDLL('build/data_io.so')


In [None]:
def cast_list_to_cfloat(input: np.ndarray):
    contiguous_array = np.ascontiguousarray(input, dtype=np.float32)
    return (ctypes.c_float * len(contiguous_array)).from_buffer(contiguous_array)


In [None]:
"""
    int ReadRawFile(const char* const file_path, const int size, const float offset_frequency,
        float sampling_rate, const float low_pass_cutoff_frequency, const int downsample_factor,
        float* const real_output, float* const imaginary_output);

"""


file_path = "data/gqrx_20220109_025620_136741000_2080000_fc.raw"
file_path_c = ctypes.c_char_p(file_path.encode('utf-8'))

offset_frequency = 360788
low_pass_cutoff = 34E3
fs = 2.08E6
length_to_read = int(os.path.getsize(file_path) / 8)  # int(fs*10)
downsample_factor = 20

real_output = np.zeros((int(length_to_read/downsample_factor),), dtype=np.float32)
imaginary_output = np.zeros_like(real_output)

real_output = cast_list_to_cfloat(real_output)
imaginary_output = cast_list_to_cfloat(imaginary_output)



c_data_io.ReadRawFile(
    file_path_c, ctypes.c_int(length_to_read), ctypes.c_float(offset_frequency), ctypes.c_float(fs),
    ctypes.c_float(low_pass_cutoff), ctypes.c_int(downsample_factor), real_output, imaginary_output
)

signal = np.array(real_output) + 1j * np.array(imaginary_output)
filtered_fs = fs / downsample_factor

In [9]:
len(signal) / filtered_fs

468.2073557692308

In [21]:
fig = go.Figure()

f,m = compute_fft_plot_from_sample_rate(signal[int(filtered_fs*0):int(filtered_fs*1)], sampling_rate=filtered_fs)
fig.add_scatter(x=f[::10],y=m[::10])

fig.show()

In [None]:
angle_diff = np.angle(
    np.conjugate(signal[:-1]) * signal[1:]
)

audio = low_pass_filter_real_signal(angle_diff, cutoff_frequency=15E3, sample_rate=filtered_fs, order=20)
audio = audio[::5]
audio_fs = filtered_fs / 5

In [None]:
wavfile.write("data/test_newest.wav", int(audio_fs), audio)

In [None]:
audio, audio_fs = demodulate_signal(output_shifted_signal, fs, base_band_filter_cutoff=34E3,
                              base_band_downsample_rate=10, audio_filter_cutoff=15000,
                              audio_downsample_rate=10,
                              apply_output_filter=True)

wavfile.write("data/newest.wav", int(audio_fs), audio)

In [None]:
"""
local function fir_lowpass(num_taps, cutoff)
    local h = {}

    for n = 0, num_taps-1 do
        if n == (num_taps-1)/2 then
            h[n+1] = cutoff
        else
            h[n+1] = math.sin(math.pi*cutoff*(n - (num_taps-1)/2))/(math.pi*(n - (num_taps-1)/2))
        end
    end

    return h
end

hamming window
0.54 - 0.46*math.cos((2*math.pi*n)/(M-1)).


    local scale = 0
    for n=0, #h-1 do
        scale = scale + h[n+1]*math.cos(math.pi*(n - (#h-1)/2)*scale_freq)
    end
    for n=1, #h do
        h[n] = h[n] / scale
    end


"""

def compute_hamming_coefficient(n, M):
    return 0.54 - 0.46*np.cos((2*np.pi*n)/(M-1))


def compute_fir_coefficients(num_taps, cutoff_frequency, sampling_frequency):
    if (num_taps % 2) != 0:
        raise RuntimeError("Number of coefficients in this FIR must be even")

    nyquist_frequency = sampling_frequency / 2
    normalized_cutoff = cutoff_frequency / nyquist_frequency

    coefficients = []
    for i in range(num_taps):
        coefficient = np.sin(np.pi*normalized_cutoff*(i - (num_taps-1)/2))/(np.pi*(i - (num_taps-1)/2))
        hamming_coefficient = compute_hamming_coefficient(i, num_taps)

        coefficients.append(coefficient * hamming_coefficient)
    
    total_sum = np.sum(coefficients)
    coefficients = [c / total_sum for c in coefficients]
    
    return coefficients

    

In [None]:
num_taps = 10
cutoff_frequency = 500
fs = 5000

coefficients = compute_fir_coefficients(num_taps=num_taps, cutoff_frequency=cutoff_frequency, sampling_frequency=fs)

In [None]:
from scipy.signal import firwin

In [None]:
fig = go.Figure()

num_taps = 20
cutoff_frequency = 500
fs = 5000

coefficients = compute_fir_coefficients(num_taps=num_taps, cutoff_frequency=cutoff_frequency, sampling_frequency=fs)
w,h = freqz(coefficients, fs=fs)
fig.add_scatter(x=w,y=20*np.log10(np.abs(h)), name="mine")


coeff_scipy = firwin(numtaps=num_taps, cutoff=cutoff_frequency, fs=fs)
w,h = freqz(coeff_scipy, fs=fs)
fig.add_scatter(x=w,y=20*np.log10(np.abs(h)), name="scipy")


fig.show()

In [None]:
time = np.linspace(0, 5, 5*fs)
test = np.cos(2*np.pi*100*time) + np.cos(2*np.pi*1000*time)

In [None]:
fig = go.Figure()

f,m = compute_fft_plot_from_sample_rate(test, sampling_rate=fs)
fig.add_scatter(x=f,y=m)

fig.show()

In [None]:
class LPF(object):
    def __init__(self, num_taps, cutoff_frequency, fs):
        self.num_taps = num_taps
        self.coefficients = compute_fir_coefficients(
            num_taps=num_taps, cutoff_frequency=cutoff_frequency, sampling_frequency=fs
        )
        self.input_buffer = [0.0] * num_taps
    
    def update(self, input_value):
        self.input_buffer[0] = input_value

        # print(f"\nUpdated input buffer:{self.input_buffer}\n")

        output = 0.0
        for i, coeff in enumerate(self.coefficients):
            output += coeff * self.input_buffer[i]
        
        # for i in range(self.num_taps-1):
        #     self.input_buffer[i+1] = self.input_buffer[i]

        for i in range(self.num_taps-1, 0, -1):
            # Iterating backwards.
            self.input_buffer[i] = self.input_buffer[i-1]

        # print(f"\nAfter input buffer:{self.input_buffer}\n")

        return output


In [None]:
lpf = LPF(num_taps, cutoff_frequency, fs)

lpf_test = []
for input_value in test:
    lpf_test.append(lpf.update(input_value))

In [None]:
fig = go.Figure()

f,m = compute_fft_plot_from_sample_rate(test, sampling_rate=fs)
fig.add_scatter(x=f,y=m, name="original")

f,m = compute_fft_plot_from_sample_rate(lpf_test, sampling_rate=fs)
fig.add_scatter(x=f,y=m, name="lpf")

fig.show()