In [None]:
import os
import librosa
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

%matplotlib inline

import seaborn as sns

import tensorflow as tf
from tensorflow.keras.layers import LSTM, GRU, Dense, MaxPooling1D, Dropout, Conv1D, BatchNormalization
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from sklearn.model_selection import train_test_split

# Definicja modelu
model = Sequential()
model.add(Conv1D(2048, kernel_size=5, strides=1, padding='same', activation='relu', input_shape=(52, 1)))
model.add(MaxPooling1D(pool_size=2, strides=2, padding='same'))
model.add(BatchNormalization())

model.add(Conv1D(1024, kernel_size=5, strides=1, padding='same', activation='relu'))
model.add(MaxPooling1D(pool_size=2, strides=2, padding='same'))
model.add(BatchNormalization())

model.add(Conv1D(512, kernel_size=5, strides=1, padding='same', activation='relu'))
model.add(MaxPooling1D(pool_size=2, strides=2, padding='same'))
model.add(BatchNormalization())

model.add(LSTM(256, return_sequences=True))
model.add(LSTM(128))

model.add(Dense(64, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(32, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(1, activation='linear'))

# Funkcje pomocnicze
def add_noise(data, x):
    noise = np.random.randn(len(data))
    data_noise = data + x * noise
    return data_noise

def shift(data, x):
    return np.roll(data, x)

def stretch(data, rate):
    return librosa.effects.time_stretch(data, rate)

def pitch_shift(data, rate):
    return librosa.effects.pitch_shift(data, sr=22050, n_steps=rate)

def pad_or_truncate(data, target_length):
    """Funkcja do paddingu lub przycięcia sygnału audio do określonej długości."""
    current_length = len(data)
    if current_length < target_length:
        # Padding do określonej długości
        padded_data = np.pad(data, (0, target_length - current_length), 'constant')
    else:
        # Przycięcie do określonej długości
        padded_data = data[:target_length]
    return padded_data

def mfcc_feature_extraction_rr(dir_, target_length=30):
    """Funkcja do ekstrakcji cech z sygnałów audio z dodaniem paddingu/przycinania do target_length sekund."""
    X_ = []
    y_ = []
    data = df
    features = 52  # Liczba cech MFCC

    # Przetwarzanie wszystkich plików .wav
    for soundDir in os.listdir(dir_):
        if soundDir.endswith('.wav'):
            label = list(data[data['filename'] == (soundDir[:-4])]['cycles'])[0]
            data_x, sampling_rate = librosa.load(os.path.join(dir_, soundDir), sr=22050)

            # Przeliczamy target_length na próbki
            target_samples = int(target_length * sampling_rate)
            
            # Sprawdź długość dźwięku i pominij pliki dłuższe niż 30 sekund
            if len(data_x) > target_samples:
                print(f"Pomijanie pliku {soundDir} (dłuższy niż {target_length} sekund)")
                continue

            # Pad lub przycięcie sygnału do target_samples
            data_x = pad_or_truncate(data_x, target_samples)

            # Ekstrakcja MFCC
            mfccs = librosa.feature.mfcc(y=data_x, sr=sampling_rate, n_mfcc=features)
            mfccs_mean = np.mean(mfccs.T, axis=0)  # Średnia MFCC dla każdego wymiaru

            # Dodanie cech i etykiet do list
            X_.append(mfccs_mean)
            y_.append(label)
            
            # Generowanie danych augmentowanych
            data_shift = shift(data_x, 1600)
            mfccs_shift = np.mean(librosa.feature.mfcc(y=data_shift, sr=sampling_rate, n_mfcc=features).T, axis=0)
            X_.append(mfccs_shift)
            y_.append(label)

            data_noise = add_noise(data_x, 0.005)
            mfccs_noise = np.mean(librosa.feature.mfcc(y=data_noise, sr=sampling_rate, n_mfcc=features).T, axis=0)
            X_.append(mfccs_noise)
            y_.append(label)

    # Zduplikowanie danych
    X_data = np.tile(np.array(X_), (duplication_factor, 1))
    y_data = np.tile(np.array(y_), duplication_factor)
    
    return X_data, y_data

# Ładowanie danych
root = '../own_data_train'
file_list = os.listdir(root)
data = []

for filename in file_list:
    match = re.search(r'\d+', filename)
    if match:
        integer = int(match.group())
        data.append({'cycles': integer, 'filename': filename[:-4]})

df = pd.DataFrame(data, columns=['cycles', 'filename'])

# Wywołanie funkcji ekstrakcji cech
audio_data = root + "/"
res_data, res_y = mfcc_feature_extraction_rr(audio_data)

# Podział danych na zbiory treningowe i walidacyjne
x_train, x_val, y_train, y_val = train_test_split(res_data, res_y, test_size=0.2, random_state=10)

# Reshape danych wejściowych do kształtu oczekiwanego przez model (batch_size, timesteps, input_dim)
x_train_lstm = np.expand_dims(x_train, axis=2)
x_val_lstm = np.expand_dims(x_val, axis=2)

# Kompilacja modelu
optimiser = tf.keras.optimizers.Adam(learning_rate=0.00001)
model.compile(optimizer=optimiser, loss='mean_squared_error', metrics=['mae'])

# Trening modelu
history = model.fit(x_train_lstm, y_train, batch_size=8, epochs=2500, validation_data=(x_val_lstm, y_val))

# Zapisz model
model.save("model_different_length.h5")
model.save_weights('model_different_length.weights.h5')

# Zapisz historię treningu
import pickle
with open('model_different_length', 'wb') as file_pi:
    pickle.dump(history.history, file_pi)
