Import bibliotek

In [None]:
# Install the third-party dependencies only when the notebook runs inside
# Google Colab (detected by the presence of the `google.colab` module); in any
# other environment nothing is installed and only a message is printed.
# NOTE: the `!pip ...` lines are IPython shell escapes, so this cell is valid
# only inside a notebook / IPython session, not as a plain Python script.
# NOTE(review): `gdown` and `torchmetrics` are imported by the next cell but are
# not installed here — presumably they are preinstalled or pulled in
# transitively; confirm.
import sys
if 'google.colab' in sys.modules:
    print("Jestem w Google Colab! Wykonuję specyficzny kod...")
    !pip install pandas
    !pip install torch
    !pip install torchaudio
    !pip install lightning
    !pip install kagglehub
    !pip install scikit-learn
    !pip install ipython
    !pip install soundfile
    !pip install wandb
    !pip install onnx onnxscript onnxruntime

    # torchopenl3 is installed with --no-deps below; the two lines before it
    # install packages by hand — presumably its dependencies (TODO confirm).
    !pip install "resampy>=0.4.0"
    !pip install numpy scipy tqdm requests julius
    !pip install torchopenl3 --no-deps
    !pip install torchcodec
else:
    print("Jestem na lokalnym komputerze. Pomijam ten kod.")

In [None]:
import glob
import os
import random
import shutil
import warnings
from pathlib import Path

import gdown
import kagglehub
import numpy as np
import pandas as pd
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio
import torchmetrics
import torchopenl3
import wandb
from IPython.display import Audio
from pytorch_lightning import LightningDataModule
from pytorch_lightning.loggers import WandbLogger
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from tqdm.auto import tqdm

# Log in to Weights & Biases up front so the WandbLogger used in the training
# cell can upload the run (may prompt for an API key — TODO confirm for the
# target environment).
wandb.login()

Download dataset

In [None]:
# Local folder that holds the copied MAD dataset (and the noise clips folder).
target_dir = "dataset"

# Use the file that the feature pre-computation cell reads later as the marker
# of a complete copy. Checking only "folder exists and is non-empty" is not
# enough: `dataset/noises` is created inside target_dir, so the folder becomes
# non-empty even when the dataset was never downloaded or was copied partially.
dataset_marker = os.path.join(target_dir, "MAD_dataset", "training.csv")

if os.path.exists(dataset_marker):
    print(f"Dataset już istnieje w folderze '{target_dir}'. Pomijam pobieranie.")
else:
    print("Dataset nie znaleziony. Rozpoczynam pobieranie...")
    # kagglehub downloads into its own cache and returns that cache path.
    path = kagglehub.dataset_download("junewookim/mad-dataset-military-audio-dataset")
    print("Cache KaggleHub:", path)

    # Copy the cached files into the working folder used by the rest of the notebook.
    os.makedirs(target_dir, exist_ok=True)
    shutil.copytree(path, target_dir, dirs_exist_ok=True)
    print("Pobrano i zapisano do:", target_dir)

# Folder for the noise recordings used for augmentation.
noise_folder = "dataset/noises"
os.makedirs(noise_folder, exist_ok=True)

### Pobieranie szumów z dysku google
Ten fragment pobiera szumy z Dysku Google do Colaba, jeżeli w docelowym folderze nie ma jeszcze żadnych plików z szumami. Aby dodać inne szumy, należy umieścić je w folderze pod tym adresem URL: https://drive.google.com/drive/folders/14Q_0KNDXACkFQ2oTF1T-gnjIaNbNuaKL?usp=sharing

In [None]:
# Shared Google Drive folder with the noise recordings and the local folder
# they are downloaded into.
url = "https://drive.google.com/drive/folders/14Q_0KNDXACkFQ2oTF1T-gnjIaNbNuaKL?usp=sharing"
output_folder = "dataset/noises"

os.makedirs(output_folder, exist_ok=True)

existing_wavs = list(Path(output_folder).glob("*.wav"))

if existing_wavs:
    # At least one .wav is already present, so the download is skipped entirely.
    print(f"Folder {output_folder} zawiera już {len(existing_wavs)} plików. Pomijam pobieranie.")
else:
    print("Folder pusty. Rozpoczynam pobieranie szumów z Google Drive...")
    try:
        downloaded = gdown.download_folder(url, output=output_folder, quiet=False, use_cookies=False)
    except Exception as e:
        # Best-effort download: report the problem and continue with whatever is on disk.
        print(f"Wystąpił błąd podczas pobierania: {e}")
        print("Upewnij się, że link na Google Drive jest ustawiony jako 'Każdy mający link' (Anyone with the link).")
    else:
        # gdown signals some failures by returning None instead of raising, so
        # success is reported only when it actually returned downloaded files.
        if downloaded:
            print("Pobieranie zakończone sukcesem.")
        else:
            print("Wystąpił błąd podczas pobierania: gdown nie zwrócił żadnych plików.")
            print("Upewnij się, że link na Google Drive jest ustawiony jako 'Każdy mający link' (Anyone with the link).")

# Re-scan the folder so the count reflects what is really available on disk.
noise_files_list = list(Path(output_folder).glob("*.wav"))
print(f"Gotowe. Dostępnych plików szumu do treningu: {len(noise_files_list)}")

### Funkcja dodająca szum
Funkcja pomocnicza do augmentacji danych poprzez dodawanie szumu do nagrań.

In [None]:
def add_noise_from_folder(waveform, noise_files, noise_std=0.01):
    """Mix a randomly chosen noise recording into ``waveform``.

    The noise is tiled when it is shorter than the waveform, a random window is
    cut when it is longer, and it is rescaled so that its standard deviation
    equals ``noise_std`` before being added.

    Args:
        waveform: audio tensor shaped [channels, N] (the original docstring
            documents [1, N]).
        noise_files: non-empty sequence of paths to noise recordings.
        noise_std: standard deviation of the noise after rescaling.

    Returns:
        Tensor with the same shape as ``waveform`` containing signal + noise.

    Raises:
        IndexError: if ``noise_files`` is empty (raised by ``random.choice``).
        ValueError: if the chosen noise file contains no samples.
    """
    noise_path = random.choice(noise_files)
    # NOTE(review): the sample rate of the noise file is ignored, so the noise is
    # mixed in at whatever rate it was recorded — confirm this is acceptable.
    noise_waveform, _ = torchaudio.load(noise_path)

    # A noise file whose channel count differs from the waveform (e.g. stereo
    # noise, mono signal) would broadcast and change the output shape; mix it
    # down to mono so it broadcasts over the waveform's channels instead.
    if noise_waveform.shape[0] != waveform.shape[0]:
        noise_waveform = noise_waveform.mean(dim=0, keepdim=True)

    target_len = waveform.shape[1]
    noise_len = noise_waveform.shape[1]
    if noise_len == 0:
        raise ValueError(f"Noise file contains no samples: {noise_path}")

    # Noise shorter than the sample -> tile it (one extra repeat so that the
    # random crop below still has room to pick an offset).
    if noise_len < target_len:
        repeats = target_len // noise_len + 1
        noise_waveform = noise_waveform.repeat(1, repeats)
    # Noise longer than the sample -> take a random window of the right length.
    if noise_waveform.shape[1] > target_len:
        start = random.randint(0, noise_waveform.shape[1] - target_len)
        noise_waveform = noise_waveform[:, start:start + target_len]

    # Normalise to unit std (epsilon guards against silent noise), then scale.
    noise_waveform = noise_waveform / (noise_waveform.std() + 1e-9) * noise_std
    noise_waveform = noise_waveform.to(dtype=waveform.dtype, device=waveform.device)
    return waveform + noise_waveform

### Klasa ekstraktora cech OpenL3
Klasa wrapper dla modelu OpenL3 służąca do ekstrakcji embeddingów z plików audio.

In [None]:
class OpenL3FeatureExtractor:
    """Audio feature extractor built on a torchopenl3 (PyTorch) embedding model."""

    def __init__(self, input_repr="mel128", content_type="music", embedding_size=512):
        """Load the embedding model and move it to the GPU when one is available.

        Args:
            input_repr: "mel128" or "mel256".
            content_type: "music" or "env"; "environmental" is accepted and
                mapped to "env".
            embedding_size: 512 or 6144.
        """
        if content_type == "environmental":
            content_type = "env"

        print(f"Ładowanie modelu TorchOpenL3: {input_repr}, {content_type}, embedding_size={embedding_size}")

        self.model = torchopenl3.models.load_audio_embedding_model(
            input_repr=input_repr,
            content_type=content_type,
            embedding_size=embedding_size
        )

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)
        self.model.eval()

        self.input_repr = input_repr
        self.content_type = content_type
        self.embedding_size = embedding_size
        # Sample rate declared to torchopenl3 for every waveform; callers are
        # expected to resample their audio to this rate beforehand.
        self.sample_rate = 48000

    def extract_features(self, waveform):
        """Return one time-averaged embedding per input clip.

        Args:
            waveform: PyTorch tensor shaped [batch, channels, samples] or
                [channels, samples].

        Returns:
            CPU tensor with one row per clip: the mean of the frame embeddings
            over the time axis.
        """
        if waveform.dim() == 2:
            waveform = waveform.unsqueeze(0)

        # Average the channel axis here and hand torchopenl3 a [batch, samples]
        # tensor. NOTE(review): torchopenl3 appears to document batched tensors
        # as (batch, samples[, channels]); passing [batch, channels, samples]
        # directly would then be read with the axes swapped. Mixing down first
        # does not depend on which 3-D layout the library expects — confirm
        # against the torchopenl3 docs.
        if waveform.dim() == 3:
            waveform = waveform.mean(dim=1)

        waveform = waveform.to(self.device)

        with torch.no_grad():
            embeddings, _ = torchopenl3.get_audio_embedding(
                waveform,
                sr=self.sample_rate,
                model=self.model,
                hop_size=0.1,
                verbose=False
            )

        # Collapse the frame (time) axis so each clip is a single vector.
        aggregated = embeddings.mean(dim=1)

        return aggregated.cpu()

# Build the single, notebook-wide extractor instance used by the feature
# pre-computation cell below.
print("Inicjalizacja TorchOpenL3 Feature Extractor...")
feature_extractor = OpenL3FeatureExtractor(
    input_repr="mel128",
    content_type="music",
    embedding_size=512
)
print(f"OpenL3 ekstraktor załadowany na urządzeniu: {feature_extractor.device}")

In [None]:
# Root folder of the cached embeddings.
features_cache_path = "dataset/features_cache"

# Always start from an empty cache: an existing cache folder is deleted so that
# all features are regenerated by the call further down.
# NOTE(review): because the cache is wiped on every run of this cell, the
# "skip files that already exist" check inside precompute_features_dataset can
# only help within a single run — confirm that this is intended.
if os.path.exists(features_cache_path):
    print(f"Znaleziono stary folder {features_cache_path} - USUWANIE...")
    shutil.rmtree(features_cache_path) # Removes the folder and everything inside it
    print("Folder usunięty.")
else:
    print("Folder cache nie istnieje (to dobrze, zostanie utworzony).")


def precompute_features_dataset(df, root_dir, output_dir, feature_extractor, noise_files,
                                max_len=160000, noise_std=0.01):
    """Pre-compute and cache clean and noisy embeddings for every file in ``df``.

    For each row the audio is loaded, resampled to 48 kHz, padded or trimmed to
    ``max_len`` samples, optionally mixed with noise, and passed through
    ``feature_extractor``. Results are stored as ``<output_dir>/clean/<name>.pt``
    and ``<output_dir>/noisy/<name>.pt``, where ``<name>`` is the relative path
    with "/" replaced by "_" and ".wav" replaced by ".pt".

    Args:
        df: DataFrame with a ``path`` column holding paths relative to ``root_dir``.
        root_dir: folder the paths in ``df`` are relative to.
        output_dir: cache root; the ``clean`` and ``noisy`` subfolders are created.
        feature_extractor: object exposing ``extract_features(waveform)``.
        noise_files: list of noise file paths; when empty, the "noisy" features
            are computed from the unmodified audio.
        max_len: clip length in samples after padding/trimming.
        noise_std: noise level forwarded to ``add_noise_from_folder``.

    Returns:
        None. Files that fail are reported and skipped (best effort).
    """
    target_sr = 48000  # rate the audio is resampled to before extraction

    clean_dir = os.path.join(output_dir, "clean")
    noisy_dir = os.path.join(output_dir, "noisy")
    os.makedirs(clean_dir, exist_ok=True)
    os.makedirs(noisy_dir, exist_ok=True)

    print(f"Generowanie cech do folderu: {output_dir} ...")
    if not noise_files:
        # Make the silent fallback visible: without noise both caches are equal.
        print("Uwaga: brak plików szumu - cechy 'noisy' będą identyczne z 'clean'.")

    failed = 0
    for raw_path in tqdm(df['path'], total=len(df)):
        filename = str(raw_path)
        src_path = os.path.join(root_dir, filename)

        save_name = filename.replace("/", "_").replace(".wav", ".pt")
        path_clean = os.path.join(clean_dir, save_name)
        path_noisy = os.path.join(noisy_dir, save_name)

        # Resume support: skip files whose two outputs already exist.
        if os.path.exists(path_clean) and os.path.exists(path_noisy):
            continue

        try:
            # 1. Load and resample to the rate used for extraction.
            wav, sr = torchaudio.load(src_path)
            if sr != target_sr:
                wav = torchaudio.transforms.Resample(sr, target_sr)(wav)

            # 2. Pad or trim to a fixed length.
            if wav.shape[1] > max_len:
                wav = wav[:, :max_len]
            else:
                wav = F.pad(wav, (0, max_len - wav.shape[1]))

            # 3. Noisy variant (identical to the clean one when no noise is available).
            wav_noisy = wav.clone()
            if noise_files:
                wav_noisy = add_noise_from_folder(wav_noisy, noise_files, noise_std=noise_std)

            # 4. Extraction. extract_features already averages over time and
            #    returns one row per clip, so only the batch axis is squeezed.
            with torch.no_grad():
                f_clean = feature_extractor.extract_features(wav.unsqueeze(0)).squeeze(0).cpu()
                f_noisy = feature_extractor.extract_features(wav_noisy.unsqueeze(0)).squeeze(0).cpu()

            # 5. Save.
            torch.save(f_clean, path_clean)
            torch.save(f_noisy, path_noisy)

        except Exception as e:
            # Best effort: one unreadable file must not abort the whole run.
            failed += 1
            print(f"Error {filename}: {e}")

    if failed:
        print(f"Nie udało się przetworzyć {failed} z {len(df)} plików.")

# Table of training files; the `path` and `label` columns are used below.
df_full = pd.read_csv("dataset/MAD_dataset/training.csv")

# Noise clips: prefer a `noise` folder inside the dataset when it has .wav files,
# otherwise fall back to `dataset/noises`, the folder the Google Drive download
# cell fills. Without this fallback the downloaded noise is never used and the
# "noisy" features end up identical to the clean ones.
noise_folder_path = "dataset/MAD_dataset/noise"
noise_files_list = list(Path(noise_folder_path).glob("*.wav")) if os.path.exists(noise_folder_path) else []
if not noise_files_list:
    noise_folder_path = "dataset/noises"
    noise_files_list = list(Path(noise_folder_path).glob("*.wav"))
print(f"Pliki szumu do augmentacji: {len(noise_files_list)} (folder: {noise_folder_path})")

# The function creates the cache folders itself (os.makedirs inside).
precompute_features_dataset(df_full, "dataset/MAD_dataset", features_cache_path, feature_extractor, noise_files_list)

### Klasa Dataset na cechach z cache
Klasa Dataset zwracająca pary wcześniej wyliczonych wektorów cech OpenL3 (czysta próbka-kotwica oraz zaszumiona próbka do porównania) wraz z informacją, czy oba nagrania należą do tej samej klasy.

In [None]:
class SiameseAudioDataset(Dataset):
    """Pairs of cached feature vectors for training a same/different-class comparator.

    Each item pairs the clean features of row ``idx`` (the anchor) with the noisy
    features of a randomly drawn row that has, with probability about one half
    each, the same label (positive pair) or a different label (negative pair).

    Args:
        df: DataFrame with ``path`` and ``label`` columns; labels must be
            convertible with ``int()``.
        features_dir: cache root containing the ``clean`` and ``noisy`` subfolders.
        feature_dim: length of the zero vector substituted for a missing file.
    """

    def __init__(self, df, features_dir, feature_dim=512):
        self.df = df.reset_index(drop=True)
        self.features_dir = features_dir
        self.clean_dir = os.path.join(features_dir, "clean")
        self.noisy_dir = os.path.join(features_dir, "noisy")
        self.feature_dim = feature_dim

        # Cache labels, file names and a label -> row positions index once, so
        # that __getitem__ does not filter the whole DataFrame for every sample.
        self._labels = [int(value) for value in self.df['label']]
        self._feature_names = [
            str(path).replace("/", "_").replace(".wav", ".pt") for path in self.df['path']
        ]
        self._rows_by_label = {}
        for position, label in enumerate(self._labels):
            self._rows_by_label.setdefault(label, []).append(position)

    def __len__(self):
        return len(self.df)

    def _load_feature(self, directory, name):
        """Load one cached vector, or return zeros (with a warning) when it is missing."""
        path = os.path.join(directory, name)
        if os.path.exists(path):
            return torch.load(path)
        # Kept as a best-effort fallback, but no longer silent: a cache with
        # many missing files would otherwise train on all-zero vectors unnoticed.
        warnings.warn(f"Brak pliku z cechami: {path} - używam wektora zer.")
        return torch.zeros(self.feature_dim)

    def _sample_negative(self, label):
        """Return the position of a uniformly drawn row whose label differs from ``label``."""
        if len(self._rows_by_label) < 2:
            raise ValueError("Cannot draw a negative pair: the dataset contains a single class.")
        # Rejection sampling is uniform over the rows with a different label.
        while True:
            candidate = random.randrange(len(self._labels))
            if self._labels[candidate] != label:
                return candidate

    def __getitem__(self, idx):
        """Return ``(dummy, label_a, dummy, label_b, same_label, 48000, feat_a, feat_b)``."""
        label_a = self._labels[idx]

        # A: always the clean sample (anchor).
        feat_a = self._load_feature(self.clean_dir, self._feature_names[idx])

        # B: draw the partner row.
        if random.random() > 0.5:
            # Positive (same class); may be the anchor row itself, in which case
            # the pair is clean-vs-noisy version of the same recording.
            position_b = random.choice(self._rows_by_label[label_a])
            same_label = 1
        else:
            # Negative (different class).
            position_b = self._sample_negative(label_a)
            same_label = 0

        label_b = self._labels[position_b]

        # B: always the noisy version, so the model learns robustness to noise.
        feat_b = self._load_feature(self.noisy_dir, self._feature_names[position_b])

        # Placeholder for the waveform slots (positions 0 and 2), which the
        # collate function stacks; the real data are the features at positions 6 and 7.
        dummy = torch.zeros(1)

        return dummy, label_a, dummy, label_b, same_label, 48000, feat_a, feat_b

In [None]:
def siamese_collate(batch):
    """Collate a list of 8-element dataset samples into one batch tuple.

    Positions 0 and 2 are stacked as tensors, positions 1, 3 and 4 become
    ``torch.long`` tensors, position 5 (the sample rate) is taken from the first
    sample only, and positions 6 and 7 are stacked unless the first sample holds
    ``None`` there, in which case ``None`` is returned for that slot.
    """
    def column(position):
        # All values found at one tuple position, in batch order.
        return [sample[position] for sample in batch]

    def long_tensor(position):
        return torch.tensor(column(position), dtype=torch.long)

    def stacked_or_none(position):
        if batch[0][position] is None:
            return None
        return torch.stack(column(position))

    a = torch.stack(column(0))
    label_a = long_tensor(1)
    b = torch.stack(column(2))
    label_b = long_tensor(3)
    same_label = long_tensor(4)
    sample_rate = batch[0][5]
    features_a = stacked_or_none(6)
    features_b = stacked_or_none(7)

    return a, label_a, b, label_b, same_label, sample_rate, features_a, features_b

### DataModule dla cech z cache
Klasa LightningDataModule dzieląca dane na część treningową i walidacyjną oraz tworząca dla nich DataLoadery zwracające pary wcześniej wyliczonych cech.

In [None]:
class SiameseAudioDataModule(LightningDataModule):
    """Lightning data module serving train/validation pair datasets.

    ``setup`` performs a stratified 80/20 split of ``df`` on the ``label``
    column with a fixed random state and wraps both parts in
    ``SiameseAudioDataset``; both loaders use ``siamese_collate``.
    """

    def __init__(self, df, features_dir, batch_size=32, num_workers=4):
        super().__init__()
        self.df, self.features_dir = df, features_dir
        self.batch_size, self.num_workers = batch_size, num_workers

    def setup(self, stage=None):
        """Split the table and build the training and validation datasets."""
        parts = train_test_split(
            self.df,
            test_size=0.2,
            random_state=42,
            stratify=self.df['label'],
        )
        self.train_ds, self.val_ds = (
            SiameseAudioDataset(part, self.features_dir) for part in parts
        )

    def _make_loader(self, dataset, shuffle):
        # Shared loader configuration; only the dataset and shuffling differ.
        return DataLoader(
            dataset,
            batch_size=self.batch_size,
            shuffle=shuffle,
            num_workers=self.num_workers,
            collate_fn=siamese_collate,
        )

    def train_dataloader(self):
        """Shuffled loader over the training split."""
        return self._make_loader(self.train_ds, shuffle=True)

    def val_dataloader(self):
        """Unshuffled loader over the validation split."""
        return self._make_loader(self.val_ds, shuffle=False)

In [None]:
# Smoke test: build the data module with a small batch size, fetch a single
# training batch and print the shape of the anchor features (tuple position 6).
dm_test = SiameseAudioDataModule(df_full, features_cache_path, batch_size=8)
dm_test.setup()
batch = next(iter(dm_test.train_dataloader()))
print("Features shape:", batch[6].shape) # Expected: [8, 512]
print("Działa!")

### Model Syjamski (LightningModule)
Definicja modelu sieci neuronowej (klasyfikatora), który przyjmuje wektory cech dwóch nagrań, liczy moduł ich różnicy i na tej podstawie decyduje, czy nagrania należą do tej samej klasy.

In [None]:
class SiameseComparator(pl.LightningModule):
    """Binary classifier deciding whether two feature vectors share a class.

    The model takes the element-wise absolute difference of the two inputs and
    feeds it through a small MLP ending in a sigmoid, so ``forward`` returns a
    probability in [0, 1] with shape [batch, 1].

    Args:
        input_dim: length of each input feature vector.
        hidden_dim: width of the first hidden layer (the second uses half of it).
        learning_rate: Adam learning rate.
    """

    def __init__(self, input_dim=512, hidden_dim=256, learning_rate=1e-3):
        super().__init__()
        # Stores the constructor arguments in self.hparams (and in checkpoints).
        self.save_hyperparameters()

        self.classifier = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.3),

            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.BatchNorm1d(hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(0.3),

            nn.Linear(hidden_dim // 2, 1),
            nn.Sigmoid()
        )

        # The classifier already ends with a sigmoid, hence BCELoss on probabilities.
        self.loss_fn = nn.BCELoss()

        self.accuracy = torchmetrics.Accuracy(task="binary")
        self.f1_score = torchmetrics.F1Score(task="binary")

    def forward(self, feat_a, feat_b):
        """Return the same-class probability for each pair, shape [batch, 1]."""
        diff = torch.abs(feat_a - feat_b)

        return self.classifier(diff)

    def _shared_step(self, batch):
        """Compute loss, hard predictions and targets for one batch."""
        _, _, _, _, same_label, _, features_a, features_b = batch

        # Squeeze only the trailing feature axis. A bare squeeze() would also
        # drop the batch axis for a batch of one, producing a 0-d tensor whose
        # shape no longer matches the [1]-shaped target expected by BCELoss.
        probs = self(features_a, features_b).squeeze(-1)

        loss = self.loss_fn(probs, same_label.float())
        preds = (probs > 0.5).long()
        return loss, preds, same_label

    def training_step(self, batch, batch_idx):
        """Log the batch loss and accuracy and return the loss for optimisation."""
        # NOTE(review): BatchNorm1d in training mode rejects a batch of a single
        # sample, so a final training batch of size 1 would still fail inside
        # forward() — consider drop_last on the training loader.
        loss, preds, same_label = self._shared_step(batch)

        acc = self.accuracy(preds, same_label)
        self.log("train_loss", loss, prog_bar=True)
        self.log("train_acc", acc, prog_bar=True)

        return loss

    def validation_step(self, batch, batch_idx):
        """Log the validation loss, accuracy and F1 for one batch."""
        loss, preds, same_label = self._shared_step(batch)

        acc = self.accuracy(preds, same_label)
        f1 = self.f1_score(preds, same_label)

        self.log("val_loss", loss, prog_bar=True)
        self.log("val_acc", acc, prog_bar=True)
        self.log("val_f1", f1, prog_bar=True)

        return loss

    def configure_optimizers(self):
        """Adam plus ReduceLROnPlateau driven by the logged ``val_loss``."""
        optimizer = optim.Adam(self.parameters(), lr=self.hparams.learning_rate)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)
        return {
            "optimizer": optimizer,
            "lr_scheduler": {
                "scheduler": scheduler,
                "monitor": "val_loss"
            }
        }

### Trening modelu

In [None]:
# 1. Data module built on the cached features
dm = SiameseAudioDataModule(
    df=df_full,  # df_full is defined in an earlier cell
    features_dir="dataset/features_cache", # Folder with the generated features
    batch_size=32,
    num_workers=4
)

# 2. Model initialisation
model = SiameseComparator(input_dim=512, hidden_dim=256, learning_rate=0.001)

# 3. Logger
wandb_logger = WandbLogger(
    project="siamese-audio-classifier",
    name="Kamil_Maj_1",
    log_model="all"
)

# 4. Trainer
trainer = pl.Trainer(
    max_epochs=20,
    accelerator="auto",
    devices=1,
    logger=wandb_logger,
    log_every_n_steps=5
)

print("Rozpoczynam trening z logowaniem do W&B...")
# `dm` is defined above in this cell, so fit() can use it:
trainer.fit(model, datamodule=dm)

print("Trening zakończony!")
wandb.finish()

**Zapisywanie do ONNX**

In [None]:
# Look for checkpoints written during training under the W&B project folder.
search_pattern = "siamese-audio-classifier/**/*.ckpt"
list_of_files = glob.glob(search_pattern, recursive=True)

if not list_of_files:
    print("Nie znaleziono checkpointu .ckpt do eksportu!")
else:
    # Export the most recently created checkpoint.
    latest_checkpoint = max(list_of_files, key=os.path.getctime)
    print(f"Eksportuję model z pliku: {latest_checkpoint}")

    device = torch.device("cpu") # CPU is the safer choice for ONNX export
    # map_location lets a checkpoint saved from a GPU run load on a CPU-only machine.
    model_export = SiameseComparator.load_from_checkpoint(latest_checkpoint, map_location=device)
    model_export.to(device)
    model_export.eval()

    # Size the dummy inputs from the hyperparameters stored in the checkpoint
    # (save_hyperparameters() in the model) instead of assuming 512.
    feature_dim = model_export.hparams.input_dim
    dummy_input_a = torch.randn(1, feature_dim, device=device)
    dummy_input_b = torch.randn(1, feature_dim, device=device)

    onnx_path = "siamese_audio_comparator.onnx"

    try:
        torch.onnx.export(
            model_export,
            (dummy_input_a, dummy_input_b),
            onnx_path,
            export_params=True,
            opset_version=12,
            do_constant_folding=True,
            input_names=['feature_vector_a', 'feature_vector_b'],
            output_names=['similarity_score'],
            # Keep the batch axis dynamic so the exported model accepts any batch size.
            dynamic_axes={
                'feature_vector_a': {0: 'batch_size'},
                'feature_vector_b': {0: 'batch_size'},
                'similarity_score': {0: 'batch_size'}
            }
        )
        print(f"Sukces! Model zapisany jako: {onnx_path}")

    except Exception as e:
        # Best effort: report the export failure without aborting the notebook.
        print(f"Błąd podczas eksportu do ONNX: {e}")