In [3]:
# импорт нужных библиотек и модулей

import numpy as np
import h5py

from sklearn.metrics import mean_squared_error, mean_absolute_error


In [4]:
# список всех модуляций из датасета в правильном порядке

modulation_labels = ['OOK', '4ASK', '8ASK', 'BPSK', 'QPSK', '8PSK', '16PSK', '32PSK', 
                    '16APSK', '32APSK', '64APSK', '128APSK', '16QAM', '32QAM', '64QAM',
                    '128QAM', '256QAM', 'AM-SSB-WC', 'AM-SSB-SC', 'AM-DSB-WC', 'AM-DSB-SC',
                    'FM', 'GMSK', 'OQPSK']

In [5]:
# так как стоит конкретная задача - восстановление сигнала с БПЛА, 
# то выберем подходящие модуляции

selected_modulations = ["BPSK", "QPSK", "8PSK", "16QAM", "64QAM"]

selected_indices = [modulation_labels.index(mod) for mod in selected_modulations]

selected_indices

[3, 4, 5, 12, 14]

In [6]:
# Открываем файл

file_path = '/kaggle/input/radioml2018/GOLD_XYZ_OSC.0001_1024.hdf5'

filtered_signals = []

with h5py.File(file_path, 'r') as f:
    labels = f['Y']  # Метки в формате one-hot
    for i in range(labels.shape[0]):  
        label = np.argmax(labels[i])  # Преобразуем one-hot в индекс модуляции
        if label in selected_indices:
            filtered_signals.append(f['X'][i])  

filtered_signals = np.array(filtered_signals)
filtered_signals = filtered_signals[:10000]

In [7]:
# Функция для вычисления PSNR
def psnr_metric(original, reconstructed):
    mse = np.mean((original - reconstructed) ** 2)
    if mse == 0:  # Если ошибки нет, PSNR бесконечен
        return float('inf')
    max_pixel = np.max(original)
    return 20 * np.log10(max_pixel / np.sqrt(mse))

# Реализация фильтра Калмана для 1D сигнала
def kalman_filter(signal, process_variance=1e-5, measurement_variance=0.1):
    """Применение фильтра Калмана к сигналу."""
    estimated_signal = np.zeros_like(signal)
    estimated_signal[0] = signal[0]  # Начальная оценка
    error_estimate = 1.0  # Начальная ошибка

    for t in range(1, len(signal)):
        # Прогнозируемое состояние
        prediction = estimated_signal[t-1]
        error_estimate += process_variance  # Обновляем ошибку предсказания

        # Калмановский коэффициент (соотношение прогнозной ошибки и измеренной ошибки)
        kalman_gain = error_estimate / (error_estimate + measurement_variance)

        # Обновление оценки
        estimated_signal[t] = prediction + kalman_gain * (signal[t] - prediction)

        # Обновление ошибки
        error_estimate *= (1 - kalman_gain)

    return estimated_signal

# Применение фильтра Калмана к каждому сигналу
reconstructed_signals = []
for signal in filtered_signals:
    reconstructed_signal = kalman_filter(signal)
    reconstructed_signals.append(reconstructed_signal)

reconstructed_signals = np.array(reconstructed_signals)

# Вычисление метрик
mse_values = []
mae_values = []
psnr_values = []

for original, reconstructed in zip(filtered_signals, reconstructed_signals):
    mse_values.append(mean_squared_error(original, reconstructed))
    mae_values.append(mean_absolute_error(original, reconstructed))
    psnr_values.append(psnr_metric(original, reconstructed))

# Средние значения метрик
average_mse = np.mean(mse_values)
average_mae = np.mean(mae_values)
average_psnr = np.mean(psnr_values)

print(f"MSE: {average_mse:.4f}")
print(f"MAE: {average_mae:.4f}")
print(f"PSNR: {average_psnr:.4f}")

MSE: 0.4904
MAE: 0.5582
PSNR: 10.7813
