In [1]:
import pandas as pd
import os
import numpy as np
import soundfile as sf
from IPython.display import clear_output
import pickle
import librosa
from sklearn.preprocessing import StandardScaler
from keras.utils import to_categorical
import tensorflow as tf
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, Callback
from tensorflow.keras.models import Model, load_model, Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, BatchNormalization, GRU, SimpleRNN
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
# Wczytuje wszytkei modele
model_LSTM = load_model("/kaggle/input/modele-rnn-time/model_LSTM.h5")
model_GRU = load_model("/kaggle/input/modele-rnn-time/model_GRU.h5")
model_RNN = load_model("/kaggle/input/modele-rnn-time/model_RNN.h5")

# Zapisuję je w liście, tworzę również dodatkową listę aby pętli obłusgiwać modele po nazwie
list_of_models = [model_LSTM, model_GRU, model_RNN]
name_of_models = ['LSTM',       'GRU',      'RNN']

# Wczytuję dane do ewaluacji modelu oraz skaler który muszę użyć przez tworzeniem embeddingów
with open("/kaggle/input/scaler-rnn/scaler.pkl", "rb") as file:
    scaler = pickle.load(file)
with open("/kaggle/input/data-to-enrollment-and-test/data_to_cross_checking.pkl", "rb") as file:
    data_to_cross_checking = pickle.load(file)

In [None]:
def split_audio_to_slices(path_to_files, seconds):
    
    # Przechodzę do katalogów wewnątrz folderu osoby (ID osoby).
    # Każdy folder wewnętrzny zawiera więcej podfolderów, które mogą zawierać nagrania.
    paths_inside_ID = [f.name for f in os.scandir(path_to_files) if f.is_dir()]

    # Tworzę pełne ścieżki do podfolderów, aby przejść do wszystkich plików nagrań dla danej osoby.
    full_paths_to_files = [path_to_files + '/' + path_inside_ID for path_inside_ID in paths_inside_ID]

    # Zbieram wszystkie ścieżki do plików audio danej osoby.
    # Każdy plik powinien mieć rozszerzenie `.flac`, a wszystkie pliki są przechowywane w zmiennej `all_files_for_ID`.
    
    all_files_for_ID = []
    for full_path_to_files in full_paths_to_files:
        files = [f.name for f in os.scandir(full_path_to_files) if f.is_file() and f.name.endswith('.flac')]
        files = [full_path_to_files + '/' + file for file in files]
        all_files_for_ID = all_files_for_ID + files

    # Łączę wszystkie nagrania danej osoby w jedno bardzo długie nagranie.
    # Używam częstotliwości próbkowania 16kHz.
    sr = 16000
    combined_signals = np.array([])
    for file_for_ID in all_files_for_ID:
        signal, sr = librosa.load(file_for_ID, sr=sr)
        combined_signals = np.concatenate([combined_signals, signal])

    # Długie nagranie dzielę na  fragmenty o podanej długości.
    # Fragmenty, które mają mniej niż zadeklarowane długości nagrania (resztki na końcu nagrania), są pomijane.
    list_for_parts = []
    len_of_combined_signals = len(combined_signals)
    step = seconds * sr  # Ustawienie skoku na 5 sekund
    for i in np.arange(start=0, stop=len_of_combined_signals-step, step=step):
        list_for_parts.append(combined_signals[i:i+step].tolist())

    
    parts = np.array(list_for_parts)
    
    
    # Funkcja zwraca pociętę na kawałki, o długości 1 sekundy nagrania
    return parts
    

In [None]:
class EbeddingExtractor:
    
    def __init__(self, model, bottleneck, scaler_before_embedding, scaler_after_embedding=None, lda=None):
        self.model = model  # Przypisanie modelu do obiektu
        self.bottleneck = bottleneck # Nazwa warstwy bottleneck
        self.scaler_before_embedding = scaler_before_embedding  # Skaler do standaryzacji MFCC przed generowaniem embeddingów
        self.scaler_after_embedding = scaler_after_embedding  # Skaler do standaryzacji embeddingów przed zastosowaniem LDA
        self.lda = lda  # Przypisanie modelu LDA do obiektu

    # Funkcja obliczająca współczynniki MFCC dla danego nagrania audio
    def calucate_MFCC(self, audio):
        quantity_of_mel_coef = 40  # Liczba współczynników MFCC
        quantity_of_mel_filters = 60  # Liczba filtrów Mel

        # Obliczanie współczynników MFCC za pomocą librosa
        mfcc = librosa.feature.mfcc(y=audio, 
                                    sr=16000, 
                                    n_mfcc=quantity_of_mel_coef, 
                                    n_mels=quantity_of_mel_filters).T
        
        #Oczliczam pochodne aby dane były zgodne z tymi na których model był trenowany
        delta = librosa.feature.delta(mfcc)
        delta2 = librosa.feature.delta(mfcc, order=2)

        mfcc = np.hstack([mfcc, delta, delta2])

        # Standaryzacja MFCC przed generowaniem embeddingów
        mfcc = self.scaler_before_embedding.transform(mfcc)
        return mfcc

    # Funkcja obliczająca embedding na podstawie wcześniej przetworzonych MFCC
    def calcuate_embedding(self, audio_MFCC):
        
        intermediate_layer_model = Model(inputs=self.model.inputs,
                                         outputs=self.model.get_layer(self.bottleneck).output)
        intermediate_output = intermediate_layer_model.predict(audio_MFCC[np.newaxis, ...])
        
        return intermediate_output

    # Funkcja postprocessingu embeddingu – standaryzacja i LDA, jeżeli nie przekażemy skalera i LDA to nie zostanie przeprowadzony postprocessig
    def transform_audio_postprocessing(self, embedding):
        if self.scaler_after_embedding is not None:
            embedding = self.scaler_after_embedding.transform(embedding)  # Standaryzacja embeddingu
        if self.lda is not None:
            embedding = self.lda.transform(embedding)  # Użycie LDA
        
        return embedding

    # Funkcja łączy wszystkie poprzednie 
    def process_audio_to_embedding(self, audio):
        mfcc = self.calucate_MFCC(audio)
        embedding = self.calcuate_embedding(mfcc)
        embedding = self.transform_audio_postprocessing(embedding)

        return embedding

In [None]:
# Inicjalizacja zmiennej iterator_for_model, która śledzi, który model jest obecnie przetwarzany.
iterator_for_model = 0

# Pętla iterująca przez listę modeli do przetwarzania.
for model in list_of_models:
    # Tworzenie obiektu ekstraktora embeddingów dla bieżącego modelu.
    EmbExtr = EbeddingExtractor(model=model, 
                                bottleneck='bottleneck',  # Ustawienie nazwy warstwy bottleneck do ekstrakcji cech.
                                scaler_before_embedding=scaler)  # Dodanie skalera przed warstwą embeddingów.
    
    # Lista do przechowywania embeddingów dla wszystkich osób dla bieżącego modelu.
    all_person_embedding = []
    
    # Pętla iterująca przez dane do porównania (po 10 osób, jest jeszcze 40 kolejnych ale zostało to rozbite na 5 kont kaggle).
    for i in range(0, 10):
        # Pobranie danych audio dla jednej osoby.
        one_person = data_to_cross_checking[i]
        
        # Podział audio jednej osoby na małe fragmenty (1-sekundowe).
        slices = split_audio_to_slices(one_person, 1)
        
        # Lista do przechowywania embeddingów dla jednej osoby.
        one_person_embedding = []
    
        # Inicjalizacja licznika fragmentów dla bieżącej osoby.
        iterator = 0
        
        # Pętla iterująca przez wszystkie fragmenty audio dla danej osoby.
        for slice in slices:
            # Przetworzenie fragmentu audio na embedding za pomocą ekstraktora.
            embedding = EmbExtr.process_audio_to_embedding(slice)
            
            # Dodanie embeddingu do listy dla jednej osoby.
            one_person_embedding.append(embedding)
            
            # Zwiększenie licznika fragmentów.
            iterator += 1
            
            # Wyświetlenie aktualnego postępu przetwarzania w konsoli.
            print(f"Model {iterator_for_model + 1}/{len(list_of_models)}, "
                  f"Osoba {i + 1}/{len(data_to_cross_checking)}, "
                  f"Część {iterator}/{len(slices)} ({iterator / len(slices):.2%})")
            
            # Wyczyść poprzedni output w konsoli (dla czytelności).
            clear_output(wait=True)
        
        # Dodanie embeddingów jednej osoby do listy wszystkich osób.
        all_person_embedding.append(one_person_embedding)
    
    # Zapisanie embeddingów wszystkich osób do pliku .pkl dla bieżącego modelu.
    with open('all_person_embedding_0_' + name_of_models[iterator_for_model] + '.pkl', 'wb') as file:
        pickle.dump(all_person_embedding, file)

    # Zwiększenie licznika modeli (przechodzimy do następnego modelu w liście).
    iterator_for_model += 1
