# LISTA 9


In [1]:
import numpy as np
from scipy.signal import chirp, sawtooth, square, unit_impulse, wiener, savgol_filter
from skimage.metrics import peak_signal_noise_ratio, mean_squared_error
import torch
from torchmetrics.functional.audio import signal_noise_ratio
import ipywidgets as widgets
from ipywidgets import interact, interact_manual
import matplotlib.pyplot as plt
from matplotlib import gridspec
from PyEMD import EMD
import pandas as pd

### Zadanie 1
 Przygotuj kod w Pythonie, które wyznacza wartości następujących miar jakości sygnałów: SNR, PSNR, MSE

In [2]:
def generate_signals(f=5, fs=1000):
    # Parametry sygnału
    t = np.linspace(0, 1, fs, endpoint=False)  # Oś czasu 1 sekunda

    # a) Sygnał sinusoidalny
    sin_wave = np.sin(2 * np.pi * f * t)

    # b) Sygnał prostokątny
    square_wave = square(2 * np.pi * f * t)  

    # c) Sygnał piłokształtny
    sawtooth_wave = sawtooth(2 * np.pi * f * t)  

    # d) Sygnał świergotliwy (chirp)
    chirp_wave = chirp(t, f0=f, f1=5*f, t1=1, method='linear')

    # e) Superpozycja sinusa i cosinusa
    superposition_wave = np.sin(2 * np.pi * f * t) + 0.5 * np.cos(2 * np.pi * 2 * f * t)

    # f) Impuls jednostkowy
    impulse_wave = unit_impulse(fs, idx=fs//2)

    signals = [sin_wave, square_wave, sawtooth_wave, chirp_wave, superposition_wave, impulse_wave]
    titles = [
        'Sygnał sinusoidalny',
        'Sygnał prostokątny',
        'Sygnał piłokształtny',
        'Sygnał świergotliwy (chirp)',
        'Superpozycja sinusa i cosinusa',
        'Impuls jednostkowy'
    ]

    return t, signals, titles

In [3]:
def mse(signal, noisy_signal):
    return np.mean((signal - noisy_signal)**2)

def snr(signal, noisy_signal):
    # SNR (Signal to Noise Ratio)
    # Obliczamy moc sygnału i moc szumu, a następnie ich stosunek w dB.
    signal_power = np.mean(signal ** 2)
    noise_power = np.mean((signal - noisy_signal) ** 2)
    if noise_power == 0:
        return np.inf
    return 10 * np.log10(signal_power / noise_power)

def psnr(signal, noisy_signal, max_value=None):
    if max_value is None:
        max_value = np.max(signal)
    mse_value = mse(signal, noisy_signal)
    return 20 * np.log10(max_value / np.sqrt(mse_value))

# Przykład użycia:
t, signals, titles = generate_signals()
noisy_signals = [signal + np.random.rayleigh(len(signal)) for signal in signals]

for i, (signal, noisy_signal) in enumerate(zip(signals, noisy_signals)):
    print(f"{titles[i]}:")
    print(f"MSE: {mse(signal, noisy_signal)}")
    print(f"SNR: {snr(signal, noisy_signal)} dB")
    print(f"PSNR: {psnr(signal, noisy_signal)} dB\n")

Sygnał sinusoidalny:
MSE: 669502.3430439356
SNR: -61.26782096906365 dB
PSNR: -58.25752101242384 dB

Sygnał prostokątny:
MSE: 740042.7335971429
SNR: -58.69256798689867 dB
PSNR: -58.69256798689867 dB

Sygnał piłokształtny:
MSE: 2695522.43081724
SNR: -69.07742490704044 dB
PSNR: -64.39372560970527 dB

Sygnał świergotliwy (chirp):
MSE: 562883.3515027409
SNR: -60.51034176287173 dB
PSNR: -57.50418403647914 dB

Superpozycja sinusa i cosinusa:
MSE: 279740.2286269459
SNR: -56.50874908142236 dB
PSNR: -56.967270768156844 dB

Impuls jednostkowy:
MSE: 609437.0944254568
SNR: -87.84928884763579 dB
PSNR: -57.849288847635776 dB



### Zadanie 2
Przygotuj kod w Pythonie, który pozwoli na porównanie wartości miar SNR, PSNR, MSE przygotowanych w zadaniu 1 oraz z gotowych implementacji dostępnych w języku Python.

In [4]:
# Porównanie własnych i gotowych implementacji
for i, (signal, noisy_signal, title) in enumerate(zip(signals, noisy_signals, titles)):
    mse_own = mse(signal, noisy_signal)
    snr_own = snr(signal, noisy_signal)
    psnr_own = psnr(signal, noisy_signal)
    
    # Przykład: dwa tensory 1D (symulacja sygnałów audio)
    reference = torch.tensor(signal, dtype=torch.float32).unsqueeze(0)
    estimate = torch.tensor(noisy_signal, dtype=torch.float32).unsqueeze(0)
    snr_lib = signal_noise_ratio(estimate, reference)
    mse_lib = mean_squared_error(signal, noisy_signal)
    psnr_lib = peak_signal_noise_ratio(signal, noisy_signal, data_range=noisy_signal.max() - noisy_signal.min())
    
    print(f"{title}:")
    print(f"  MSE (własna): {mse_own:.4f}, MSE (skimage): {mse_lib:.4f}")
    print(f"  SNR (własna): {snr_own:.4f} dB, SNR (torchmetrics): {snr_lib.item():.4f} dB")
    print(f"  PSNR (własna): {psnr_own:.4f} dB, PSNR (skimage): {psnr_lib:.4f} dB\n")

Sygnał sinusoidalny:
  MSE (własna): 669502.3430, MSE (skimage): 669502.3430
  SNR (własna): -61.2678 dB, SNR (torchmetrics): -61.2678 dB
  PSNR (własna): -58.2575 dB, PSNR (skimage): -52.2369 dB

Sygnał prostokątny:
  MSE (własna): 740042.7336, MSE (skimage): 740042.7336
  SNR (własna): -58.6926 dB, SNR (torchmetrics): -58.6926 dB
  PSNR (własna): -58.6926 dB, PSNR (skimage): -52.6720 dB

Sygnał piłokształtny:
  MSE (własna): 2695522.4308, MSE (skimage): 2695522.4308
  SNR (własna): -69.0774 dB, SNR (torchmetrics): -69.0774 dB
  PSNR (własna): -64.3937 dB, PSNR (skimage): -58.3294 dB

Sygnał świergotliwy (chirp):
  MSE (własna): 562883.3515, MSE (skimage): 562883.3515
  SNR (własna): -60.5103 dB, SNR (torchmetrics): -60.5103 dB
  PSNR (własna): -57.5042 dB, PSNR (skimage): -51.4836 dB

Superpozycja sinusa i cosinusa:
  MSE (własna): 279740.2286, MSE (skimage): 279740.2286
  SNR (własna): -56.5087 dB, SNR (torchmetrics): -56.5088 dB
  PSNR (własna): -56.9673 dB, PSNR (skimage): -47.424

### Zadanie 3
Przygotuj kod w Pythonie, który wygeneruje sygnał świergotliwy z szumem biały oraz szumem browna z zadanym SNR.

In [5]:
def add_noise_with_snr(signal, noise_type='white', snr_db=10, random_state=None):
    """
    Dodaje szum (biały lub browna) do sygnału z zadanym SNR (w dB).
    """
    rng = np.random.default_rng(random_state)
    signal_power = np.mean(signal ** 2)
    snr_linear = 10 ** (snr_db / 10)
    noise_power = signal_power / snr_linear

    if noise_type == 'white':
        noise = rng.normal(0, np.sqrt(noise_power), size=signal.shape)
    elif noise_type == 'brown':
        # Szum browna: całka z białego szumu (Brownian noise)
        white = rng.normal(0, 1, size=signal.shape)
        brown = np.cumsum(white)
        brown = brown - np.mean(brown)
        brown = brown / np.std(brown)
        noise = brown * np.sqrt(noise_power)
    else:
        raise ValueError("noise_type must be 'white' or 'brown'")

    return signal + noise


signal_options = [
    ('Sinusoidalny', 'sin'),
    ('Prostokątny', 'square'),
    ('Piłokształtny', 'sawtooth'),
    ('Chirp', 'chirp'),
    ('Superpozycja', 'superposition'),
    ('Impuls jednostkowy', 'impulse')
]

signal_names = [name for name, _ in signal_options]

@interact(
    signal_types=widgets.SelectMultiple(
        options=[opt for _, opt in signal_options],
        value=('chirp',),
        description='Rodzaj sygnału',
        style={'description_width': 'initial'}
    ),
    f =widgets.IntSlider(value=5, min=1, max=20, step=1, description='Częstotliwość [Hz]'),
    snr_db=widgets.IntSlider(value=5, min=-10, max=30, step=1, description='SNR [dB]')
    
)


def update_noisy_plot(signal_types, snr_db, f):
    t, signals, titles = generate_signals(f=f)
    signal_map = {opt: idx for idx, (_, opt) in enumerate(signal_options)}

    num_signals = len(signal_types)
    fig = plt.figure(figsize=(15, num_signals * 4.5))
    gs = gridspec.GridSpec(num_signals * 2, 3, figure=fig, height_ratios=[1, 2] * num_signals)

    for i, sig_key in enumerate(signal_types):
        idx = signal_map[sig_key]
        base_signal = signals[idx]
        title = titles[idx]

        white_noise = add_noise_with_snr(base_signal, noise_type='white', snr_db=snr_db)
        brown_noise = add_noise_with_snr(base_signal, noise_type='brown', snr_db=snr_db)

        # Górny rząd (3 wykresy)
        ax1 = fig.add_subplot(gs[i * 2, 0])
        ax1.plot(t, base_signal)
        ax1.set_title(f'{title} - oryginał')

        ax2 = fig.add_subplot(gs[i * 2, 1])
        ax2.plot(t, white_noise)
        ax2.set_title(f'{title} + szum biały (SNR={snr_db} dB)')

        ax3 = fig.add_subplot(gs[i * 2, 2])
        ax3.plot(t, brown_noise)
        ax3.set_title(f'{title} + szum browna (SNR={snr_db} dB)')

        # Dolny rząd (pełna szerokość - 3 kolumny scalone)
        ax4 = fig.add_subplot(gs[i * 2 + 1, :])
        ax4.plot(t, base_signal, label='Oryginalny')
        ax4.plot(t, white_noise, label='+ Biały szum', alpha=0.7)
        ax4.plot(t, brown_noise, label='+ Szum browna', alpha=0.7)
        ax4.set_title(f'{title} - Wszystkie razem')
        ax4.legend()

    plt.tight_layout()
    plt.show()

interactive(children=(SelectMultiple(description='Rodzaj sygnału', index=(3,), options=('sin', 'square', 'sawt…

### Zadanie 4, 5 i 6 zarazem
- Przygotuj kod w Pythonie, który odszumi sygnał z zadania 3 z wykorzystaniem filtru Wienera.
- Przygotuj kod w Pythonie, który odszumi sygnał z zadania 3 z wykorzystaniem filtru Savitzkyego-Golaya.
- Przygotuj kod w Pythonie, który odszumi sygnał z zadania 3 z wykorzystaniem filtru bazującego na algorytmie EMD i częściowej rekonstrukcji.

In [9]:
def denoise_emd(noisy_signal, num_imfs_to_skip=None):
    """
    Odszumia sygnał za pomocą EMD i częściowej rekonstrukcji.
    
    Args:
        noisy_signal (np.array): zaszumiony sygnał 1D
        num_imfs_to_keep (int or None): ile pierwszych IMF zachować. 
                                        Jeśli None, wybieramy automatycznie.
                                        
    Returns:
        denoised_signal (np.array): odszumiony sygnał
    """
    emd = EMD()
    imfs = emd.emd(noisy_signal.to_numpy())
    
    # Jeśli nie podano liczby IMF do pominięcia, wybierz heurystycznie
    if num_imfs_to_skip is None:
        # Przykładowa heurystyka: pomiń pierwsze IMF
        num_imfs_to_skip = min(2, imfs.shape[0])
    
    # Rekonstrukcja sygnału z wybranych IMF
    denoised_signal = np.sum(imfs[num_imfs_to_skip:], axis=0)
    
    return denoised_signal


def denoise_signal(noisy_signal, method='wiener'):
    denoising_methods = {
        'wiener': lambda x: wiener(x, mysize=5, noise=None),
        'savgol': lambda x: savgol_filter(x, window_length=51, polyorder=3),
        'emd': lambda x: denoise_emd(x)
    }
    
    if method not in denoising_methods:
        raise ValueError(f"Nieznana metoda: {method}. Dostępne metody: {list(denoising_methods.keys())}")
    
    return denoising_methods[method](noisy_signal)

@interact(
    signal_type=widgets.Dropdown(
        options=[opt for _, opt in signal_options],
        value='chirp',
        description='Rodzaj sygnału',
        style={'description_width': 'initial'}
    ),
    f = widgets.IntSlider(value=5, min=1, max=20, step=1, description='f [Hz]'),
    noise_type=widgets.RadioButtons(
        options=['white', 'brown'],
        value='white',
        description='Typ szumu',
        style={'description_width': 'initial'}
    ),
    snr_db=widgets.IntSlider(value=5, min=-10, max=30, step=1, description='SNR [dB]'),
    method=widgets.RadioButtons(
        options=['wiener', 'savgol', 'emd'],
        value='wiener',
        description='Metoda denoisingu',
        style={'description_width': 'initial'}
    )

)
def update_denoised_plot(signal_type, f, noise_type, snr_db, method):
    t, signals, titles = generate_signals(f=f)
    signal_map = {opt: idx for idx, (_, opt) in enumerate(signal_options)}

    idx = signal_map[signal_type]
    base_signal = signals[idx]
    title = titles[idx]

    noisy_signal = add_noise_with_snr(base_signal, noise_type=noise_type, snr_db=snr_db)
    denoised_signal = denoise_signal(noisy_signal, method=method)

    plt.figure(figsize=(12, 10))
    plt.subplot(4, 1, 1)
    plt.plot(t, base_signal, alpha=0.7)
    plt.title(f'{title} - Oryginalny sygnał')
    plt.xlabel('Czas [s]')
    plt.ylabel('Amplituda')
    plt.grid()

    plt.subplot(4, 1, 2)
    plt.plot(t, noisy_signal, alpha=0.7, color='green')
    plt.title(f'{title} - Sygnał z szumem (SNR={snr_db} dB)')
    plt.xlabel('Czas [s]')
    plt.ylabel('Amplituda')
    plt.grid()

    plt.subplot(4, 1, 3)
    plt.plot(t, denoised_signal, alpha=0.7, color='darkorange')
    plt.title(f'{title} - Sygnał po denoisingu ({method})')
    plt.xlabel('Czas [s]')
    plt.ylabel('Amplituda')
    plt.grid()

    plt.subplot(4, 1, 4)
    plt.plot(t, base_signal, label='Oryginalny', alpha=0.7)
    plt.plot(t, denoised_signal, label='Po denoisingu', alpha=0.7)
    plt.title(f'{title} - Porównanie sygnałów')
    plt.xlabel('Czas [s]')
    plt.ylabel('Amplituda')
    plt.legend()
    plt.grid()

    plt.tight_layout()
    plt.show()


interactive(children=(Dropdown(description='Rodzaj sygnału', index=3, options=('sin', 'square', 'sawtooth', 'c…

### Zadanie 7
Przygotuj kod w Pythonie, który odszumi sygnał załadowany z pliku csv oraz odszumi ten sygnał wykorzystując filtr Wienera, Savitzkyego-Golaya oraz bazującego na algorytmie EMD i częściowej rekonstrukcji

Dostępne szumy w plikach CSV:
- zaszumiony.csv: sinus + biały szum Gaussowski
- zaszumiony2.csv: sinus + fala prostokątna + szum Gaussa
- zaszumiony3.csv: wykładniczy spadek + szum impulsowy

In [13]:
def denoise_csv(method, plik):
    signal = pd.read_csv(plik)

    t = signal['czas']
    noisy_signal = signal['amplituda']
    denoised_signal = denoise_signal(noisy_signal, method=method)

    plt.figure(figsize=(12, 8))
    plt.subplot(2, 1, 1)
    plt.plot(t, noisy_signal, alpha=0.7, color='green')
    plt.title(f'Sygnał z szumem z pliku ({plik})')
    plt.xlabel('Czas [s]')
    plt.ylabel('Amplituda')
    plt.grid()

    plt.subplot(2, 1, 2)
    plt.plot(t, denoised_signal, alpha=0.7, color='darkorange')
    plt.title(f'Sygnał po denoisingu ({method})')
    plt.xlabel('Czas [s]')
    plt.ylabel('Amplituda')
    plt.grid()

    plt.tight_layout()
    plt.show()

method=widgets.RadioButtons(options=['wiener', 'savgol', 'emd'], value='wiener', description='Metoda denoisingu',
        style={'description_width': 'initial'})
plik=widgets.RadioButtons(options=['zaszumiony.csv', 'zaszumiony2.csv', 'zaszumiony3.csv'], value='zaszumiony.csv', description='Plik z sygnałem',
        style={'description_width': 'initial'})

czy_wlasny = bool(input("Czy chcesz podać własny plik CSV z sygnałem? (y/n): ").strip().lower() == 'y')
if czy_wlasny:
    plik = str(input("Podaj ścieżkę do pliku CSV z sygnałem: "))

display(widgets.interactive(denoise_csv, method=method, plik=plik))


interactive(children=(RadioButtons(description='Metoda denoisingu', options=('wiener', 'savgol', 'emd'), style…