Pobranie datasetu do folderu dataset

In [17]:
import os
import shutil
import pandas as pd
import torchaudio
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import pytorch_lightning as pl
from lightning.pytorch import LightningDataModule
import kagglehub
from sklearn.model_selection import train_test_split
import random
from pathlib import Path

target_dir = "dataset"

path = kagglehub.dataset_download("junewookim/mad-dataset-military-audio-dataset")
print("Cache KaggleHub:", path)

os.makedirs(target_dir, exist_ok=True)
shutil.copytree(path, target_dir, dirs_exist_ok=True)

print("Zapisano do:", target_dir)

noise_folder = "dataset/noises"
os.makedirs(noise_folder, exist_ok=True)


Cache KaggleHub: C:\Users\kwasn\.cache\kagglehub\datasets\junewookim\mad-dataset-military-audio-dataset\versions\1
Zapisano do: dataset


In [None]:
def add_noise_from_folder(waveform, noise_files, noise_std=0.01):
    """
    waveform: [1, N]
    noise_files: lista ścieżek do pliku z szumami
    """
    noise_path = random.choice(noise_files)
    noise_waveform, _ = torchaudio.load(noise_path)

    L = waveform.shape[1]

    # jeśli szum krótszy niż próbka -> powielamy
    if noise_waveform.shape[1] < L:
        repeats = int(L / noise_waveform.shape[1]) + 1
        noise_waveform = noise_waveform.repeat(1, repeats)
    # jeśli szum dłuższy -> losowy fragment
    if noise_waveform.shape[1] > L:
        start = random.randint(0, noise_waveform.shape[1] - L)
        noise_waveform = noise_waveform[:, start:start+L]

    noise_waveform = noise_waveform / (noise_waveform.std() + 1e-9) * noise_std
    noisy = waveform + noise_waveform
    return noisy

In [19]:
class SiameseAudioDataset(Dataset):
    def __init__(self, df, root_dir, noise_files=None, max_len=160000):
        self.df = df.reset_index(drop=True)
        self.root_dir = root_dir
        self.noise_files = noise_files
        self.max_len = max_len

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row_a = self.df.iloc[idx]
        path_a = os.path.join(self.root_dir, row_a['path'])
        label_a = int(row_a['label'])

        waveform_a, sample_rate = torchaudio.load(path_a)
        waveform_a = self._pad_or_trim(waveform_a)

        # losowy wybór drugiego nagrania
        idx_b = idx
        while idx_b == idx:
            idx_b = random.randint(0, len(self.df) - 1)
        row_b = self.df.iloc[idx_b]
        path_b = os.path.join(self.root_dir, row_b['path'])
        label_b = int(row_b['label'])

        waveform_b, _ = torchaudio.load(path_b)
        waveform_b = self._pad_or_trim(waveform_b)

        # dodajemy szum, jeśli mamy pliki szumów
        if self.noise_files:
            waveform_b = add_noise_from_folder(waveform_b, self.noise_files, noise_std=0.01)

        same_label = 1 if label_a == label_b else 0

        return waveform_a, label_a, waveform_b, label_b, same_label, sample_rate

    def _pad_or_trim(self, waveform):
        L = waveform.shape[1]
        if L > self.max_len:
            waveform = waveform[:, :self.max_len]
        elif L < self.max_len:
            waveform = torch.nn.functional.pad(waveform, (0, self.max_len - L))
        return waveform


In [20]:
def siamese_collate(batch):
    a = torch.stack([item[0] for item in batch])
    label_a = torch.tensor([item[1] for item in batch], dtype=torch.long)
    b = torch.stack([item[2] for item in batch])
    label_b = torch.tensor([item[3] for item in batch], dtype=torch.long)
    same_label = torch.tensor([item[4] for item in batch], dtype=torch.long)
    sample_rate = batch[0][5]
    return a, label_a, b, label_b, same_label, sample_rate


In [21]:
from lightning.pytorch import LightningDataModule
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split

class SiameseAudioDataModule(LightningDataModule):
    def __init__(self, df, root_dir, noise_folder=None, batch_size=4, num_workers=0, max_len=160000):
        super().__init__()
        self.df = df
        self.root_dir = root_dir
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.max_len = max_len

        self.noise_files = list(Path(noise_folder).glob("*.wav")) if noise_folder else None

    def setup(self, stage=None):
        train_df, val_df = train_test_split(self.df, test_size=0.2, random_state=42, stratify=self.df['label'])
        self.train_dataset = SiameseAudioDataset(train_df, self.root_dir, self.noise_files, max_len=self.max_len)
        self.val_dataset = SiameseAudioDataset(val_df, self.root_dir, self.noise_files, max_len=self.max_len)

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True,
                          num_workers=self.num_workers, collate_fn=siamese_collate)

    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.batch_size, shuffle=False,
                          num_workers=self.num_workers, collate_fn=siamese_collate)


In [22]:
import pandas as pd
from IPython.display import Audio

df = pd.read_csv("dataset/MAD_dataset/training.csv")

dm = SiameseAudioDataModule(df, root_dir="dataset/MAD_dataset", noise_folder=noise_folder,
                            batch_size=8, max_len=160000)
dm.setup()

batch = next(iter(dm.train_dataloader()))
a, label_a, b, label_b, same_label, sample_rate = batch

print("Batch shapes:", a.shape, b.shape)
print("Labely czy takie same:", same_label)

Batch shapes: torch.Size([8, 1, 160000]) torch.Size([8, 2, 160000])
Labely czy takie same: tensor([0, 1, 0, 0, 1, 0, 0, 0])


In [24]:
#Audio(a[0].squeeze().numpy(), rate=sample_rate)  # czyste
Audio(b[0].squeeze().numpy(), rate=sample_rate)  # zaszumione