In [87]:
# standard libraries
from collections import deque
import math
from math import pi
# external libraries
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots


In [None]:
np.random.seed(1234)


# Generate periodic signals
def get_periodic(A, f1, p, fs, t1, t2):
    Ts = 1 / fs
    time = np.arange(t1, t2, Ts)  # seconds
    periodic = np.array([A * np.cos(2 * pi * f1 * t - p) for t in time])
    return periodic, time


def generate_harmonics(A, F, P, fs, t1, t2):
    Ts = 1 / fs
    harmonics = []
    for i in range(len(A)):
        harmonic, time = get_periodic(A[i], F[i], P[i], fs, t1, t2)
        harmonics.append(harmonic)
    harmonics = np.array(harmonics)
    return harmonics, time


def generate_target(A, F, P, T, fs, t1, t2):
    Ts = 1 / fs
    time = np.arange(t1, t2, Ts)  # seconds
    target = []
    i = 0
    for t in time:
        if i != len(T) - 1 and t >= T[i + 1]:
            i += 1
        target.append(A[i] * np.cos(2 * pi * F[i] * t - P[i]))
    target = np.array(target)
    return target, time


f1 = 50
fs = 4000
t1 = 0
t2 = 80e-3

A = [1, 2]
F = [f1, f1]
P = [0, pi]
T = [0, t2 / 2]

target, time = generate_target(A, F, P, T, fs, t1, t2)

F = np.arange(0, 20) * f1
P = np.random.uniform(-pi, pi, len(F))
A = np.random.uniform(0, 0.1, len(F))
A[1] = 0
periodics, time = generate_harmonics(A, F, P, fs, t1, t2)

# Generate white noise
noise_max = 0.1
noise = np.random.uniform(-noise_max, noise_max, len(time))
print(f"Maximum noise amplitude = {noise_max}")

# Generate aperiodic component
fade_coef = 100
amp_coef = 1
t_start = time[len(time) // 2]  # middle time
aperiodic = amp_coef * np.exp(-fade_coef * (time - t_start))
aperiodic[time < t_start] = 0  # aperiodic starts form t_start

# Sum up all components into one signal
signal = 0
signal += target
# signal += np.sum(periodics, axis=0)
# signal += aperiodic
# signal += noise

fig = go.Figure()
fig.add_trace(
    go.Scatter(
        x=time, y=signal, name="noisy", mode="lines+markers", line=dict(color="black")
    )
)
fig.add_trace(
    go.Scatter(x=time, y=target, name="pure", mode="lines", line=dict(color="red"))
)
fig.add_trace(go.Scatter(x=time, y=periodics[0], name="DC"))
fig.add_trace(go.Scatter(x=time, y=np.sum(periodics[2:], axis=0), name="harmonics"))
fig.add_trace(go.Scatter(x=time, y=aperiodic, name="aperiodic"))
fig.add_trace(go.Scatter(x=time, y=noise, name="noise"))
fig.update_layout(
    title="Digital Signal",
    xaxis_title="Time, s",
    yaxis_title="Magnitude, linear",
    template="plotly_white"
)
fig.show()

Maximum noise amplitude = 0.1


In [89]:
class FourierFilter:
    def __init__(self, f1, fs, mode="full-period"):
        self.f1 = f1
        self.fs = fs
        self.mode = mode
        if mode == "full-period":
            self._setup_full_period()
        elif mode == "half-period":
            self._setup_half_period()

    def _setup_full_period(self):
        fs = self.fs
        f1 = self.f1
        N = fs//f1
        self.buflen = N
        self.buf = deque([], maxlen=self.buflen)
        f_step = fs//N
        k = f1 // f_step # chooses right freq bin
        pi = math.pi
        self.cos_coefs = [2/N*math.cos(2*pi/N*(-n*k-1)) for n in range(N)]
        self.sin_coefs = [2/N*math.sin(2*pi/N*(-n*k-1)) for n in range(N)]

    def _setup_half_period(self):
        fs = self.fs
        f1 = self.f1
        N = fs//f1
        self.buflen = N//2
        self.buf = deque([], maxlen=self.buflen)
        f_step = fs//N
        k = f1 // f_step # chooses right freq bin
        pi = math.pi
        self.cos_coefs = [-4/N*math.cos(2*pi/N*(-n*k-1)) for n in range(N//2)]
        self.sin_coefs = [-4/N*math.sin(2*pi/N*(-n*k-1)) for n in range(N//2)]


    def fit_sample(self,sample):
        self.buf.append(sample)
        if len(self.buf) >= self.buflen:
            buf = self.buf
            cos_coefs = self.cos_coefs
            sin_coefs = self.sin_coefs
            real = sum(buf[i] * cos_coefs[i] for i in range(self.buflen))
            imag = sum(buf[i] * sin_coefs[i] for i in range(self.buflen))
            # magnitude = math.sqrt(real**2 + imag**2)
            # phase = math.atan2(imag, real)
        else:
            real = 0
            imag = 0
            # magnitude = 0
            # phase = 0
        return real + 1j*imag


In [90]:
test_description = "synthetic test"
test_name = "Test S1"
f1 = 50
fs = 4000
ff_full = FourierFilter(f1=f1, fs=fs, mode="full-period")
ff_half = FourierFilter(f1=f1, fs=fs, mode="half-period")
s_filtered_full = []
s_filtered_half = []
for i in range(len(signal)):
    s_filtered_full.append(ff_full.fit_sample(signal[i]))
    s_filtered_half.append(ff_half.fit_sample(signal[i]))
s_filtered_full = np.array(s_filtered_full)
s_filtered_half = np.array(s_filtered_half)

fig = go.Figure()
fig.update_layout(
    title="Filtration of signal with noise, harmonic noise, and sudden stepped phase shift + amplitude change of target frequency",
    xaxis_title="Time, s",
    template="plotly_white",
)
fig.add_trace(go.Scatter(x=time, y=signal, name="signal", mode="lines+markers", line=dict(color="black")))  # fmt:skip
fig.add_trace(go.Scatter(x=time, y=target, name="ideal filter", line=dict(color="red", width=4)))  # fmt:skip
fig.add_trace(go.Scatter(x=time, y=np.real(s_filtered_full), name="full-period filter", line=dict(color="gold")))  # fmt:skip
fig.add_trace(go.Scatter(x=time, y=np.real(s_filtered_half), name="half-period filter", line=dict(color="silver")))  # fmt:skip
fig.show()

**2x faster reaction to transitions**

In [91]:
fig = go.Figure()
fig.update_layout(
    title="Deviation between filtered and ideally filtered values",
    xaxis_title="Time, s",
    template="plotly_white",
)
fig.add_trace(go.Scatter(x=time, y=np.real(s_filtered_full)-target, name="full-period", line=dict(color="gold")))  # fmt:skip
fig.add_trace(go.Scatter(x=time, y=np.real(s_filtered_half)-target, name="half-period", line=dict(color="silver")))  # fmt:skip
fig.show()

In [96]:

s1,time = get_periodic(1, f1, 0, fs, t1, t2)
s2,time = get_periodic(1, 2*f1, 0, fs, t1, t2)
s3,time = get_periodic(1, 3*f1, 0, fs, t1, t2)
s4,time = get_periodic(1, 4*f1, 0, fs, t1, t2)

signal = s1 + s2 + s3 + s4

test_description = "big odd and even harmonics"
test_name = "Test S2"

f1 = 50
fs = 4000
ff_full = FourierFilter(f1=f1, fs=fs, mode="full-period")
ff_half = FourierFilter(f1=f1, fs=fs, mode="half-period")
s_filtered_full = []
s_filtered_half = []
for i in range(len(signal)):
    s_filtered_full.append(ff_full.fit_sample(signal[i]))
    s_filtered_half.append(ff_half.fit_sample(signal[i]))
s_filtered_full = np.array(s_filtered_full)
s_filtered_half = np.array(s_filtered_half)

fig = go.Figure()
fig.update_layout(
    title=f"{test_name}: {test_description}",
    xaxis_title="Time, s",
    template="plotly_white",
)
fig.add_trace(go.Scatter(x=time, y=signal, name="signal", mode="lines+markers", line=dict(color="black")))  # fmt:skip
fig.add_trace(go.Scatter(x=time, y=s1, name="ideal filter", line=dict(color="red", width=4)))  # fmt:skip
fig.add_trace(go.Scatter(x=time, y=np.real(s_filtered_full), name="full-period filter", line=dict(color="gold")))  # fmt:skip
fig.add_trace(go.Scatter(x=time, y=np.real(s_filtered_half), name="half-period filter", line=dict(color="silver")))  # fmt:skip
fig.show()

fig = go.Figure()
fig.update_layout(
    title="Deviation between filtered and ideally filtered values",
    xaxis_title="Time, s",
    template="plotly_white",
)
fig.add_trace(go.Scatter(x=time, y=np.real(s_filtered_full)-s1, name="full-period", line=dict(color="gold")))  # fmt:skip
fig.add_trace(go.Scatter(x=time, y=np.real(s_filtered_half)-s1, name="half-period", line=dict(color="silver")))  # fmt:skip
fig.show()