In [105]:
import numpy as np
import matplotlib.pyplot as plt 
from scipy.signal import square
import math
from numpy.fft import fft, ifft, fftfreq, fftshift, ifftshift
import time

import plotly
import plotly.graph_objs as go
import plotly.express as px
from plotly.subplots import make_subplots
import pandas as pd

import pywt 
import pydub

In [106]:
def cosinus(t, frequency, amplitude):
    w = frequency * 2 * np.pi
    return np.cos(w * t) * amplitude

def gauss_noise(signal, noise_level):
    return signal + np.random.standard_normal(signal.shape) * noise_level

In [107]:
def median(input_signal, k):
    padded_input_signal = np.pad(input_signal, (k, k), constant_values=(0, 0))
    result = np.array([])
    for t, _ in enumerate(input_signal):
        result = np.append(result, np.sum(padded_input_signal[t: t + 2 * k + 1]))
    return result / (2 * k + 1)


# Параметры временной области:
sample_rate = 1000.0  
duration = 1.0
num_samples = int(sample_rate * duration)

time_domain = np.linspace(0, duration, num_samples, endpoint=False)
freq_domain = np.arange(0, 100)

signal = (cosinus(time_domain, 20, 2) + 
          cosinus(time_domain, 40, 3) + 
          cosinus(time_domain, 60, 1))
signal = gauss_noise(signal, noise_level=3)
signal_spectrum = np.abs(fft(signal))

K = 10
median_signal = median(signal, k=K)
median_signal_spectrum = np.abs(fft(signal))

In [108]:
fig = make_subplots(rows=2, cols=2,
                    subplot_titles=("Исходный зашумленный сигнал", 
                                    f"Усредненный сигнал при k={42}", 
                                    "Спектр исходного зашумленного сигнала",
                                    f"Спектр усредненного сигнала при k={42}"))

fig.add_trace(
    go.Scatter(x=time_domain, y=signal),
    row=1, col=1
)

fig.add_trace(
    go.Scatter(x=time_domain, y=median_signal),
    row=1, col=2
)

fig.add_trace(
    go.Scatter(x=freq_domain, y=signal_spectrum[0:100].real),
    row=2, col=1
)

fig.add_trace(
    go.Scatter(x=freq_domain, y=median_signal_spectrum[0:100].real),
    row=2, col=2
)
fig.update_layout(
    height=1000
)
fig.show()

In [109]:
def g(w, n):
    t = np.linspace((-n + 1) / 2, (n - 1) / 2, n)
    return np.exp(-4 * np.log(2) * t**2 / w**2)

def gaussian_smoothing_filter(input_signal, k, w):
    padded_input_signal = np.pad(input_signal, (k, k), constant_values=(0, 0))
    result = np.array([])
    for t, _ in enumerate(input_signal):
        result = np.append(result, np.sum(padded_input_signal[t: t + 2 * k + 1] * g(w, 2 * k + 1)))
    return result

gauss_smoothed_signal = gaussian_smoothing_filter(signal, k=42, w=2.355)
gauss_smoothed_signal_spectrum = np.abs(fft(gauss_smoothed_signal))

In [110]:
fig = make_subplots(rows=2, cols=2,
                    subplot_titles=("Исходный зашумленный сигнал", 
                                    f"Сглаженный сигнал", 
                                    "Спектр исходного зашумленного сигнала",
                                    f"Спектр сглаженного сигнала"))

fig.add_trace(
    go.Scatter(x=time_domain, y=signal),
    row=1, col=1
)

fig.add_trace(
    go.Scatter(x=time_domain, y=gauss_smoothed_signal),
    row=1, col=2
)

fig.add_trace(
    go.Scatter(x=freq_domain, y=signal_spectrum[0:100].real),
    row=2, col=1
)

fig.add_trace(
    go.Scatter(x=freq_domain, y=gauss_smoothed_signal_spectrum[0:100].real),
    row=2, col=2
)
fig.update_layout(
    height=1000
)
fig.show()

In [111]:
fig = make_subplots(rows=1, cols=1)

fig.add_trace(
    go.Scatter(x=time_domain[:num_samples//2], y=signal[:num_samples//2], name="original"),
    row=1, col=1
)
fig.add_trace(
    go.Scatter(x=time_domain[:num_samples//2], y=median_signal[:num_samples//2], name="median"),
    row=1, col=1
)
fig.add_trace(
    go.Scatter(x=time_domain[:num_samples//2], y=gauss_smoothed_signal[:num_samples//2], name="gaussian"),
    row=1, col=1
)
fig.update_layout(
    height=500,
    xaxis_title="Временные отсчеты",
    yaxis_title="Амплитуда",
)
fig.show()

In [112]:
spikes = np.random.randint(2, size=num_samples)
signal_from_spikes = gaussian_smoothing_filter(spikes, k=2, w=2.355)


In [113]:
fig = make_subplots(rows=1, cols=1)
fig.update_xaxes(range=[0, 0.2], col=1)
fig.add_trace(
    go.Bar(x=time_domain, y=spikes, name="spikes"),
    row=1, col=1
)
fig.add_trace(
    go.Scatter(x=time_domain, y=signal_from_spikes / 3, name="signal_from_spikes"),
    row=1, col=1
)
fig.update_layout(
    height=1000,
    xaxis_title="Временные отсчеты",
    yaxis_title="Амплитуда",
)
fig.show()

In [114]:
def median_filter_to_remove_spike_noises(spikes_signal, threshold, k):
    mask = spikes_signal >= threshold 
    spikes_signal[mask] = 0
    # median_spikes = median(spikes_signal, k)
    # spikes_signal[mask] = median_spikes[mask]
    return median(spikes_signal, k)


spikes = np.random.randint(200, size=num_samples)
median_signal_from_spikes = median_filter_to_remove_spike_noises(spikes.copy(), 160, 42)

spikes_for_graph = np.maximum(spikes - median_signal_from_spikes, 0)

In [115]:
fig = make_subplots(rows=1, cols=1)
# fig.update_xaxes(range=[0, 0.2], col=1)
fig.add_trace(
    go.Bar(x=time_domain, y=spikes_for_graph, name="spikes", base=median_signal_from_spikes),
    row=1, col=1
)
fig.add_trace(
    go.Scatter(x=time_domain, y=median_signal_from_spikes, name="signal_from_spikes"),
    row=1, col=1
)
fig.update_layout(
    height=1000,
    xaxis_title="Временные отсчеты",
    yaxis_title="Амплитуда",
)
fig.show()

In [116]:
def pad_zeros_to_spectrum(spectrum, new_shape):
    shifted_spectrum = fftshift(spectrum)
    current_shape = len(spectrum)
    padded_spectrum = None
    if len(spectrum) % 2 == 0: 
        padded_spectrum = np.pad(shifted_spectrum, ((new_shape - current_shape) // 2, (new_shape - current_shape) - (new_shape - current_shape) // 2), constant_values=(0, 0))
    else:
        padded_spectrum = np.pad(shifted_spectrum, ((new_shape - current_shape) - (new_shape - current_shape) // 2, (new_shape - current_shape) // 2), constant_values=(0, 0))
    return ifftshift(padded_spectrum)    

def spectral_interpolation_method(input_signal, recovery_interval):
    input_signal = input_signal.copy()
    recovery_interval_length = recovery_interval[1] - recovery_interval[0] + 1

    left_part = input_signal[:recovery_interval[0]]
    right_part = input_signal[recovery_interval[1] + 1:]

    size_of_window = min(len(right_part), len(left_part))
    average_spectrum = (fft(right_part[:size_of_window]) + fft(left_part[:size_of_window])) / 2
    recovered_signal = np.real(ifft(average_spectrum))

    input_signal[recovery_interval[0]: recovery_interval[1] + 1] = recovered_signal[:recovery_interval_length]
    input_signal[recovery_interval[0] - 1] = (input_signal[recovery_interval[0] - 1] + recovered_signal[0]) / 2
    input_signal[recovery_interval[1] + 1] = (input_signal[recovery_interval[1] + 1] + recovered_signal[recovery_interval_length - 1]) / 2
    
    return input_signal, average_spectrum

# Параметры временной области:
sample_rate = 1000.0  
duration = 10.0
num_samples = int(sample_rate * duration)

time_domain = np.linspace(0, duration, num_samples, endpoint=False)
freq_domain = fftfreq(num_samples, 1 / sample_rate)

signal = (cosinus(time_domain, 20, 2) + 
          cosinus(time_domain, 40, 3) + 
          cosinus(time_domain, 60, 1))
signal = gauss_noise(signal, noise_level=0.4)
signal_spectrum = fft(signal)

interval = np.array([5 * sample_rate, 7 * sample_rate], dtype='int')
signal_to_recover = signal.copy()
signal_to_recover[interval[0]: interval[1] + 1] = 0

signal_recovered, spectrumm = spectral_interpolation_method(signal_to_recover.copy(), interval)

In [117]:
fig = make_subplots(rows=2, cols=1)
fig.update_xaxes(range=[4.8, 5.2], col=1, row=1)
fig.update_xaxes(range=[0, 100], col=1, row=2)
fig.add_trace(
    go.Scatter(x=time_domain, y=signal_to_recover, name="original"),
    row=1, col=1
)
fig.add_trace(
    go.Scatter(x=time_domain, y=signal_recovered, name="interpolated"),
    row=1, col=1
)
fig.update_layout(
    height=1000,
    xaxis_title="Временные отсчеты",
    yaxis_title="Амплитуда",
)
fig.show()

In [118]:
a = fftfreq(10, 1/1000)
b = fftfreq(11, 1/1000)
print("a", a)
print("b", b)
# print(a[10//2])
# print(b[11//2 + 1])

ia = fftshift(a)
print("ia", ia)
ia_shape = 10
ib = fftshift(b)
print("ib", ib)
ib_shape = 11

new_shape = 16

ia_padded = np.pad(ia, ((new_shape - ia_shape) // 2, (new_shape - ia_shape) - (new_shape - ia_shape) // 2), constant_values=(0, 0))
ib_padded = np.pad(ib, ((new_shape - ib_shape) - (new_shape - ib_shape) // 2, (new_shape - ib_shape) // 2), constant_values=(0, 0))
print()
print(ifftshift(ia_padded))
print(ifftshift(ib_padded))


print("a to 16", pad_zeros_to_spectrum(a, 16))
print("a to 17", pad_zeros_to_spectrum(a, 17))
print("b to 16", pad_zeros_to_spectrum(b, 16))
print("b to 17", pad_zeros_to_spectrum(b, 17))

a [   0.  100.  200.  300.  400. -500. -400. -300. -200. -100.]
b [   0.           90.90909091  181.81818182  272.72727273  363.63636364
  454.54545455 -454.54545455 -363.63636364 -272.72727273 -181.81818182
  -90.90909091]
ia [-500. -400. -300. -200. -100.    0.  100.  200.  300.  400.]
ib [-454.54545455 -363.63636364 -272.72727273 -181.81818182  -90.90909091
    0.           90.90909091  181.81818182  272.72727273  363.63636364
  454.54545455]

[   0.  100.  200.  300.  400.    0.    0.    0.    0.    0.    0. -500.
 -400. -300. -200. -100.]
[   0.           90.90909091  181.81818182  272.72727273  363.63636364
  454.54545455    0.            0.            0.            0.
    0.         -454.54545455 -363.63636364 -272.72727273 -181.81818182
  -90.90909091]
a to 16 [   0.  100.  200.  300.  400.    0.    0.    0.    0.    0.    0. -500.
 -400. -300. -200. -100.]
a to 17 [   0.  100.  200.  300.  400.    0.    0.    0.    0.    0.    0.    0.
 -500. -400. -300. -200. -100.]
b to 16 [

In [131]:
def pad_zeros_to_spectrum(spectrum, new_shape):
    shifted_spectrum = fftshift(spectrum)
    current_shape = len(spectrum)
    padded_spectrum = None
    if len(spectrum) % 2 == 0: 
        padded_spectrum = np.pad(shifted_spectrum, ((new_shape - current_shape) // 2, (new_shape - current_shape) - (new_shape - current_shape) // 2), constant_values=(0, 0))
    else:
        padded_spectrum = np.pad(shifted_spectrum, ((new_shape - current_shape) - (new_shape - current_shape) // 2, (new_shape - current_shape) // 2), constant_values=(0, 0))
    return ifftshift(padded_spectrum) 


def downsample(input_signal, old_sample_rate, new_sample_rate):
    step = int(old_sample_rate / new_sample_rate)
    return input_signal[::step]


def upsample(input_signal, new_sample_rate):
    input_signal_spectrum = fft(input_signal)
    padded_spectrum = pad_zeros_to_spectrum(input_signal_spectrum, new_sample_rate)
    return np.real(ifft(padded_spectrum))


# Параметры временной области:
sample_rate = 1000.0  
duration = 1.0
num_samples = int(sample_rate * duration)

time_domain = np.linspace(0, duration, num_samples, endpoint=False)
freq_domain = fftfreq(num_samples, 1 / sample_rate)

signal = (cosinus(time_domain, 50, 2) + 
          cosinus(time_domain, 100, 3))
signal = gauss_noise(signal, noise_level=0.4)
signal_spectrum = fft(signal)


signal_downsampled = downsample(signal, sample_rate, 500)
resampled_signal = upsample(signal_downsampled, 1000)
time_domain_500 = np.linspace(0, duration, int(500 * duration), endpoint=False)


In [135]:
fig = make_subplots(rows=3, cols=1)
fig.update_xaxes(range=[0, 0.5], col=1)
#fig.update_xaxes(range=[0, 100], col=1, row=2)
fig.add_trace(
    go.Scatter(x=time_domain, y=signal, name="original"),
    row=1, col=1
)
fig.add_trace(
    go.Scatter(x=time_domain_500, y=signal_downsampled.real, mode="markers", name="downsampled"),
    row=1, col=1
)
fig.add_trace(
    go.Scatter(x=time_domain, y=resampled_signal, name="resampled to original"),
    row=2, col=1
)
fig.update_layout(
    height=1000,
    xaxis_title="Временные отсчеты"
)
fig.show()