In [None]:
import numpy as np
import librosa
import matplotlib.pyplot as plt
import os
import random
from sklearn.metrics import classification_report, confusion_matrix
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
import pywt  # Для работы с дискретным вейвлет-преобразованием
import seaborn as sns

# Определение всех необходимых функций

def convert_to_spectrogram(signal, sr, n_mels=128):
    """
    Преобразует аудиосигнал в мел-спектрограмму и затем в децибелы.
    """
    S = librosa.feature.melspectrogram(y=signal, sr=sr, n_mels=n_mels)
    log_S = librosa.power_to_db(S, ref=np.max)
    return log_S

def pad_spectrogram(spectrogram, target_shape):
    """
    Дополняет или усекает спектрограмму до заданной формы.
    """
    if spectrogram.shape[1] < target_shape[1]:
        pad_width = target_shape[1] - spectrogram.shape[1]
        padded_spectrogram = np.pad(spectrogram, ((0, 0), (0, pad_width)), mode='constant')
    else:
        padded_spectrogram = spectrogram[:, :target_shape[1]]
    return padded_spectrogram

def apply_wavelet_transform(signal, wavelet='db4', level=4):
    """
    Применяет дискретное вейвлет-преобразование к аудиосигналу.
    """
    coeffs = pywt.wavedec(signal, wavelet, level=level)
    return coeffs

def frame_signal(signal, frame_size, hop_length):
    """
    Разделяет сигнал на перекрывающиеся кадры.
    """
    frames = librosa.util.frame(signal, frame_length=frame_size, hop_length=hop_length).T
    return frames

def load_audio_files(directory):
    """
    Загружает все WAV-файлы из указанной директории.
    """
    return [os.path.join(directory, f) for f in os.listdir(directory) if f.lower().endswith(('.wav', '.mp3', '.flac', '.ogg', '.m4a'))]

def load_and_prepare_data(clean_files, noise_files, frame_size, hop_length, target_shape):
    """
    Загружает аудиофайлы, разделяет их на кадры, преобразует в спектрограммы,
    и формирует массивы признаков и меток.
    """
    X = []
    y = []
    
    # Обработка чистых сигналов
    for file in clean_files:
        signal, sr = librosa.load(file, sr=None)
        frames = frame_signal(signal, frame_size, hop_length)
        
        for frame in frames:
            spectrogram = convert_to_spectrogram(frame, sr)
            padded_spectrogram = pad_spectrogram(spectrogram, target_shape)
            X.append(padded_spectrogram)
            y.append(1)  # Метка 1 для чистого кадра
    
    # Обработка шумных сигналов
    for file in noise_files:
        signal, sr = librosa.load(file, sr=None)
        frames = frame_signal(signal, frame_size, hop_length)
        
        for frame in frames:
            spectrogram = convert_to_spectrogram(frame, sr)
            padded_spectrogram = pad_spectrogram(spectrogram, target_shape)
            X.append(padded_spectrogram)
            y.append(0)  # Метка 0 для шумного кадра
    
    return np.array(X), np.array(y)

def balance_classes(X, y):
    """
    Выполняет undersampling класса 'шум' для балансировки набора данных.
    """
    clean_indices = np.where(y == 1)[0]
    noise_indices = np.where(y == 0)[0]
    
    # Выбирает случайным образом равное количество шумных образцов
    n_clean_samples = len(clean_indices)
    undersampled_noise_indices = np.random.choice(noise_indices, n_clean_samples, replace=False)
    
    # Объединяет индексы классов
    balanced_indices = np.concatenate((clean_indices, undersampled_noise_indices))
    
    return X[balanced_indices], y[balanced_indices]

def extract_features_from_model(signal, sr, frame_size, hop_length, target_shape):
    """
    Преобразует сигнал в спектрограмму и подготавливает его для модели.
    """
    # Разделение сигнала на кадры
    frames = frame_signal(signal, frame_size, hop_length)
    spectrograms = []
    
    for frame in frames:
        spectrogram = convert_to_spectrogram(frame, sr)
        padded_spectrogram = pad_spectrogram(spectrogram, target_shape)
        spectrograms.append(padded_spectrogram)
    
    X = np.array(spectrograms).astype('float32') / 80.0
    X = X[..., np.newaxis]
    return X

def verify_clean_signal_in_noisy(noisy_signal, clean_signal, sr, model, frame_size, hop_length, target_shape, threshold=0.8):
    """
    Проверяет, присутствует ли чистый сигнал внутри шумного сигнала.
    """
    # Извлечение характеристик чистого сигнала
    X_clean = extract_features_from_model(clean_signal, sr, frame_size, hop_length, target_shape)
    predictions_clean = model.predict(X_clean)
    avg_clean_prob = np.mean(predictions_clean)
    
    # Разделение шумного сигнала на окна
    window_size = len(clean_signal)
    hop_size = int(hop_length)  # Сдвиг окна
    num_windows = (len(noisy_signal) - window_size) // hop_size + 1
    
    for i in range(num_windows):
        start = i * hop_size
        end = start + window_size
        window = noisy_signal[start:end]
        
        if len(window) < window_size:
            continue
        
        # Извлечение характеристик окна
        X_window = extract_features_from_model(window, sr, frame_size, hop_length, target_shape)
        predictions_window = model.predict(X_window)
        avg_window_prob = np.mean(predictions_window)
        
        # Сравнение с порогом
        if avg_window_prob >= threshold:
            print(f"Чистый сигнал найден в позиции {start} до {end} с вероятностью {avg_window_prob:.2f}")
            # Визуализация найденного сегмента
            spectrogram = convert_to_spectrogram(window, sr)
            padded_spectrogram = pad_spectrogram(spectrogram, target_shape)
            window_spectrogram = padded_spectrogram.reshape(target_shape)
            plt.figure(figsize=(6, 4))
            plt.imshow(window_spectrogram, cmap='viridis', aspect='auto')
            plt.title(f'Найденный чистый сигнал: позиция {start}-{end}')
            plt.xlabel('Временные окна')
            plt.ylabel('Мел-частоты')
            plt.colorbar()
            plt.show()
            return True
    return False

# Функция для выбора случайного файла из директории
def get_random_audio_file(directory):
    """
    Возвращает путь к случайному аудиофайлу из указанной директории.
    """
    audio_files = [f for f in os.listdir(directory) if f.lower().endswith(('.wav', '.mp3', '.flac', '.ogg', '.m4a'))]
    if not audio_files:
        raise FileNotFoundError(f"No audio files found in directory: {directory}")
    return os.path.join(directory, random.choice(audio_files))

# Пути к директориям с аудиофайлами
clean_test_dir = 'clean_test'  # Замените на путь к вашей директории с чистыми сигналами
noise_test_dir = 'noise_test'  # Замените на путь к вашей директории с шумными сигналами

# Выбор и загрузка случайных файлов для проверки
path_to_clean_signal = get_random_audio_file(clean_test_dir)
path_to_noisy_signal = get_random_audio_file(noise_test_dir)

print(f"Выбранный чистый сигнал: {path_to_clean_signal}")
print(f"Выбранный шумной сигнал: {path_to_noisy_signal}")

# Загрузка сигналов
clean_signal, sr_clean = librosa.load(path_to_clean_signal, sr=22050)
noisy_signal, sr_noisy = librosa.load(path_to_noisy_signal, sr=22050)

# Проверка частот дискретизации
if sr_clean != sr_noisy:
    raise ValueError("Частоты дискретизации чистого и шумного сигналов не совпадают.")

# Проверка длины сигналов
if len(noisy_signal) < len(clean_signal):
    raise ValueError("Шумной сигнал короче чистого сигнала. Убедитесь, что шумной сигнал длиннее или равен чистому.")

# Загрузка и подготовка обучающих данных
# Пути к директориям с данными
clean_train_dir = 'clean_train'
clean_test_dir = 'clean_test'
noise_train_dir = 'noise_train'
noise_test_dir = 'noise_test'

# Загрузка файлов из соответствующих директорий
clean_files_train = load_audio_files(clean_train_dir)
clean_files_test = load_audio_files(clean_test_dir)
noise_files_train = load_audio_files(noise_train_dir)
noise_files_test = load_audio_files(noise_test_dir)

# Параметры разделения на кадры
frame_duration = 1.0  # Длительность кадра в секундах
hop_duration = 0.5    # Сдвиг между кадрами в секундах

# Целевая форма спектрограммы (количество мел-частот, временные окна)
n_mels = 128
sample_rate = 22050
frame_size = int(frame_duration * sample_rate)
hop_length = int(hop_duration * sample_rate)

# Вычислим количество временных окон на кадр
spectrogram = convert_to_spectrogram(np.zeros(frame_size), sample_rate)
time_steps = spectrogram.shape[1]
target_shape = (n_mels, time_steps)

# Загрузка и подготовка обучающих данных
X_train, y_train = load_and_prepare_data(
    clean_files_train,
    noise_files_train,
    frame_size,
    hop_length,
    target_shape
)

# Загрузка и подготовка тестовых данных
X_test, y_test = load_and_prepare_data(
    clean_files_test,
    noise_files_test,
    frame_size,
    hop_length,
    target_shape
)

# Балансировка данных de treinamento
X_train_balanced, y_train_balanced = balance_classes(X_train, y_train)

# Нормализация спектрограмм
X_train_balanced = X_train_balanced.astype('float32') / 80.0
X_test = X_test.astype('float32') / 80.0

# Добавление новой размерности для канала
X_train_balanced = X_train_balanced[..., np.newaxis]
X_test = X_test[..., np.newaxis]

# Проверка баланса классов
print(f'Количество чистых кадров в сбалансированном тренировочном наборе: {(y_train_balanced == 1).sum()}')
print(f'Количество шумных кадров в сбалансированном тренировочном наборе: {(y_train_balanced == 0).sum()}')

# Визуализация примерного чистого кадра
clean_indices = np.where(y_train_balanced == 1)[0]
random_clean_index = random.choice(clean_indices)
clean_spectrogram = X_train_balanced[random_clean_index].reshape(target_shape)

plt.figure(figsize=(6, 4))
plt.imshow(clean_spectrogram, cmap='viridis', aspect='auto')
plt.title('Пример чистого кадра - Обучающая выборка')
plt.xlabel('Временные окна')
plt.ylabel('Мел-частоты')
plt.colorbar()
plt.show()

# Определение архитектуры модели CNN
model = Sequential()

# Первый сверточный слой
model.add(Conv2D(32, (3, 3), activation='relu', input_shape=(target_shape[0], target_shape[1], 1)))
model.add(MaxPooling2D(pool_size=(2, 2)))

# Второй сверточный слой
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))

# Третий сверточный слой
model.add(Conv2D(128, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))

# Выравнивание
model.add(Flatten())

# Полносвязный слой
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))

# Выходной слой для бинарной классификации
model.add(Dense(1, activation='sigmoid'))

# Компиляция модели
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Вывод структуры модели
model.summary()

# Обучение модели с использованием сбалансированных данных
history = model.fit(
    X_train_balanced,
    y_train_balanced,
    epochs=20,
    batch_size=32,
    validation_split=0.2
)

# Оценка модели на тестовых данных
loss, accuracy = model.evaluate(X_test, y_test)
print(f'Точность модели на тестовых данных: {accuracy:.2f}')

# Получение прогнозов на тестовой выборке
predictions = model.predict(X_test)
predicted_classes = (predictions > 0.5).astype(int).flatten()

# Вывод отчёта по классификации
print(classification_report(y_test, predicted_classes, target_names=['Шум', 'Чистый']))

# Визуализация случайного предсказания
sample_index = random.randint(0, len(X_test) - 1)
sample_prediction = predicted_classes[sample_index]
sample_label = y_test[sample_index]
selected_class = "Чистый" if sample_prediction == 1 else "Шум"

sample_spectrogram = X_test[sample_index].reshape(target_shape)

plt.figure(figsize=(6, 4))
plt.imshow(sample_spectrogram, cmap='viridis', aspect='auto')
plt.title(f'Предсказание: {selected_class} - Истинная метка: {"Чистый" if sample_label == 1 else "Шум"}')
plt.xlabel('Временные окна')
plt.ylabel('Мел-частоты')
plt.colorbar()
plt.show()

# Визуализация меток и прогнозов для первых 100 кадров тестовой выборки
num_samples = 100
plt.figure(figsize=(15, 5))
plt.plot(y_test[:num_samples], label='Истинные метки', marker='o', color='blue')
plt.plot(predicted_classes[:num_samples], label='Предсказанные метки', marker='x', color='red')

# Добавление аннотаций для ошибок
errors = np.where(y_test[:num_samples] != predicted_classes[:num_samples])[0]
for error in errors:
    plt.annotate('Ошибка', (error, predicted_classes[error]), textcoords="offset points", xytext=(0,10), ha='center', color='red')

plt.title('Истинные и Предсказанные Метки для Первых 100 Кадров Тестовой Выборки')
plt.xlabel('Кадр')
plt.ylabel('Класс')
plt.legend()
plt.axhline(y=0.5, color='grey', linestyle='--')  # Линия порога
plt.show()

# Функция для построения кривых обучения
def plot_learning_curve(history):
    """
    Plota a curva de aprendizado para perda e precisão.
    """
    # Резюме потерь
    plt.figure(figsize=(12, 6))
    
    # Потери на обучении и валидации
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Потеря на обучении')
    plt.plot(history.history['val_loss'], label='Потеря на валидации')
    plt.title('Потери во время обучения')
    plt.xlabel('Эпохи')
    plt.ylabel('Потеря')
    plt.legend()
    
    # Точность на обучении и валидации
    plt.subplot(1, 2, 2)
    plt.plot(history.history['accuracy'], label='Точность на обучении')
    plt.plot(history.history['val_accuracy'], label='Точность на валидации')
    plt.title('Точность во время обучения')
    plt.xlabel('Эпохи')
    plt.ylabel('Точность')
    plt.legend()
    
    plt.tight_layout()
    plt.show()

# Вызов функции для построения кривых обучения
plot_learning_curve(history)

# Построение матрицы путаницы
cm = confusion_matrix(y_test, predicted_classes)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Шум', 'Чистый'], yticklabels=['Шум', 'Чистый'])
plt.xlabel('Предсказания')
plt.ylabel('Истинные метки')
plt.title('Матрица путаницы')
plt.show()

# Проверка наличия чистого сигнала в шумном
found = verify_clean_signal_in_noisy(noisy_signal, clean_signal, sr_clean, model, frame_size, hop_length, target_shape, threshold=0.8)
if found:
    print("Чистый сигнал был найден в шумном сигнале.")
else:
    print("Чистый сигнал НЕ был найден в шумном сигнале.")

# Визуализация шумного сигнала
plt.figure(figsize=(10, 4))  # Настройка размера фигуры при необходимости
plt.plot(noisy_signal, color='blue')  # Использовать flatten, если это многомерный массив
plt.title("Шумной сигнал")
plt.xlabel("Время (сэмплы)")
plt.ylabel("Амплитуда")
plt.grid(True)  # Добавляет сетку для лучшей визуализации
plt.show()
