
# Audio degradation

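This notebook builds (original, degraded) audio pairs from the `VoiceBank-DEMAND` dataset and uses them to train our Siamese CNN discriminator. The dataset is expected to be available locally under `data/VoiceBank_DEMAND`; the download cell below targets the separate ODAQ dataset.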

# IMPORTS

In [1]:
import os
import random
import zipfile

import pandas as pd
import requests
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
import torchaudio.transforms as T
from torch.utils.data import DataLoader, Dataset, random_split
from torchaudio.transforms import AmplitudeToDB, MelSpectrogram
from tqdm import tqdm

# Download dataset

In [None]:
import os
import zipfile
import requests
import pandas as pd
import torchaudio
import torch
from torch.utils.data import Dataset, DataLoader
from torchaudio.transforms import MelSpectrogram, AmplitudeToDB
from tqdm import tqdm

# Step 1: Download and extract the dataset, showing a progress bar
def download_and_unzip(url, extract_to='.'):
    """Download a zip archive from ``url`` and extract it into ``extract_to``.

    The archive is streamed to a temporary file named ``ODAQ_dataset.zip`` in
    the current working directory, extracted, and then removed. The temporary
    file is removed even when the download or the extraction fails.

    Args:
        url: HTTP(S) address of the zip archive.
        extract_to: Directory the archive content is extracted into.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        zipfile.BadZipFile: If the downloaded file is not a valid zip archive.
    """
    local_zip = 'ODAQ_dataset.zip'
    block_size = 1024  # 1 Kibibyte

    try:
        # The context manager releases the connection once streaming is done.
        with requests.get(url, stream=True) as response:
            # Fail early instead of writing an HTML error page to disk and
            # only noticing when ZipFile rejects it.
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))

            with open(local_zip, 'wb') as f, tqdm(
                desc="Scaricamento ODAQ",
                # Unknown size: let tqdm show a counter instead of a 0-length bar.
                total=total_size or None,
                unit='iB',
                unit_scale=True,
                unit_divisor=1024,
            ) as bar:
                for data in response.iter_content(block_size):
                    f.write(data)
                    bar.update(len(data))

        with zipfile.ZipFile(local_zip, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
    finally:
        # Never leave a partial or already-extracted archive behind.
        if os.path.exists(local_zip):
            os.remove(local_zip)

# Step 2: PyTorch dataset
class ODAQDataset(Dataset):
    """Dataset yielding ``(log-mel spectrogram, score)`` items from an annotation CSV.

    The first CSV column is read as the audio file path relative to
    ``audio_dir`` and the second column as the numeric score.

    Args:
        annotations_file: Path of the CSV file with the annotations.
        audio_dir: Directory that the audio paths in the CSV are relative to.
        target_sample_rate: Rate every waveform is resampled to before the
            mel transform.
        n_mels: Number of mel bands of the spectrogram.
    """

    def __init__(self, annotations_file, audio_dir, target_sample_rate=44100, n_mels=64):
        self.annotations = pd.read_csv(annotations_file)
        self.audio_dir = audio_dir
        self.target_sample_rate = target_sample_rate
        self.mel_spectrogram = MelSpectrogram(
            sample_rate=self.target_sample_rate,
            n_mels=n_mels,
            n_fft=1024,
            hop_length=512
        )
        self.amplitude_to_db = AmplitudeToDB()
        # One Resample transform per source rate, created on first use, so the
        # transform is not rebuilt for every item.
        self._resamplers = {}

    def __len__(self):
        """Return the number of annotated items."""
        return len(self.annotations)

    def _to_target_rate(self, waveform, sample_rate):
        """Return ``waveform`` resampled from ``sample_rate`` to the target rate."""
        if sample_rate == self.target_sample_rate:
            return waveform
        resampler = self._resamplers.get(sample_rate)
        if resampler is None:
            resampler = torchaudio.transforms.Resample(
                orig_freq=sample_rate, new_freq=self.target_sample_rate
            )
            self._resamplers[sample_rate] = resampler
        return resampler(waveform)

    def __getitem__(self, idx):
        """Load item ``idx`` and return ``(mel_spec_db, score)``.

        ``score`` is a float32 scalar tensor. Waveforms are not padded or
        cropped here.
        """
        audio_path = os.path.join(self.audio_dir, self.annotations.iloc[idx, 0])
        waveform, sample_rate = torchaudio.load(audio_path)
        waveform = self._to_target_rate(waveform, sample_rate)
        mel_spec = self.mel_spectrogram(waveform)
        mel_spec_db = self.amplitude_to_db(mel_spec)
        score = torch.tensor(self.annotations.iloc[idx, 1], dtype=torch.float32)
        return mel_spec_db, score
# Step 3: Run everything
def prepare_dataset():
    """Build the ``ODAQDataset`` from the already extracted listening-test folder.

    The download step is left disabled, so the files must already exist under
    ``ODAQ_dataset/``.
    """
    # dataset_url = 'https://zenodo.org/records/10405774/files/ODAQ.zip'
    # download_and_unzip(dataset_url, 'ODAQ_dataset')
    listening_test_dir = 'ODAQ_dataset/ODAQ/ODAQ_listening_test'
    results_csv = 'ODAQ_dataset/ODAQ/ODAQ_listening_test/ODAQ_results.csv'
    return ODAQDataset(results_csv, listening_test_dir)

# Usage example: build the dataset and print the shapes of the first batch only.
if __name__ == "__main__":
    dataset = prepare_dataset()
    dataloader = DataLoader(dataset, batch_size=16, shuffle=True)
    first_batch = next(iter(dataloader), None)
    if first_batch is not None:
        mel_specs, scores = first_batch
        print(mel_specs.shape, scores.shape)


Scaricamento ODAQ: 100%|██████████| 0.98G/0.98G [02:30<00:00, 6.98MiB/s]    


FileNotFoundError: [Errno 2] No such file or directory: 'ODAQ_dataset/ODAQ_results.csv'

# Dataset build

Create the (original, noisy) audio pairs.

In [12]:
class AudioPairDataset(Dataset):
    """Dataset of labelled audio pairs built from aligned clean/noisy file lists.

    For every index ``i`` the following pairs are generated once, at
    construction time:

    * ``(clean_i, clean_i)`` with label 1,
    * ``(clean_i, noisy_i)`` with label 1,
    * ``(clean_i, noisy_j)`` with a random ``j != i`` and label 0
      (skipped when there is only one file, since no such ``j`` exists).

    Args:
        clean_files: List of paths to the clean files.
        noisy_files: List of paths to the noisy files, aligned index by index
            with ``clean_files``.
        sample_rate: Rate the waveforms are brought to before any further
            processing; also used to build the mel transform.
        duration: Optional fixed length in seconds. When set, every waveform
            is zero-padded or truncated to ``sample_rate * duration`` samples.
        use_logmel: Return log-mel spectrograms when true, raw waveforms
            otherwise.
        n_mels: Number of mel bands.
        transform_type: Currently unused; kept so existing callers keep working.
        seed: Optional seed for drawing the mismatched pairs. ``None`` uses
            the global ``random`` state.

    Raises:
        AssertionError: If the two lists have different lengths.
    """

    def __init__(self, clean_files, noisy_files, sample_rate=16000, duration=None, use_logmel=True, n_mels=64, transform_type="logmel", seed=None):
        assert len(clean_files) == len(noisy_files), "Liste clean e noisy devono avere la stessa lunghezza"
        self.clean_files = clean_files
        self.noisy_files = noisy_files
        self.sample_rate = sample_rate
        self.duration = duration
        self.use_logmel = use_logmel
        self.n_mels = n_mels

        self.mel_transform = T.MelSpectrogram(sample_rate=sample_rate, n_mels=n_mels, n_fft=1024)
        self.to_db = T.AmplitudeToDB()

        # The pairs are built only once, so the dataset content is stable
        # for the whole lifetime of the object.
        rng = random.Random(seed) if seed is not None else random
        self.pairs = self._build_pairs(clean_files, noisy_files, rng)

    @staticmethod
    def _build_pairs(clean_files, noisy_files, rng):
        """Return the list of ``(path1, path2, label)`` triples."""
        pairs = []
        num_files = len(clean_files)

        for i in range(num_files):
            # Similar pair: (clean, clean)
            pairs.append((clean_files[i], clean_files[i], 1))

            # Similar pair: (clean, noisy)
            pairs.append((clean_files[i], noisy_files[i], 1))

            # Dissimilar pair: (clean_i, noisy_j) with j != i.
            if num_files > 1:
                # Draw from the num_files - 1 other indices in O(1): pick a
                # slot and shift it past i, instead of building a list of
                # all candidates for every file.
                j = rng.randrange(num_files - 1)
                if j >= i:
                    j += 1
                pairs.append((clean_files[i], noisy_files[j], 0))

        return pairs

    def __len__(self):
        """Return the number of generated pairs."""
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return ``(item1, item2, label)`` for pair ``idx``.

        Each item has its leading channel dimension squeezed out; ``label``
        is a float scalar tensor.
        """
        path1, path2, label = self.pairs[idx]

        spec1 = self._load_item(path1)
        spec2 = self._load_item(path2)

        return spec1.squeeze(0), spec2.squeeze(0), torch.tensor(label, dtype=torch.float)

    def _load_item(self, path):
        """Load one file and apply downmix, resampling, length fixing and transform."""
        waveform, file_rate = torchaudio.load(path)

        # Downmix to mono so the channel dimension can always be squeezed.
        if waveform.size(0) > 1:
            waveform = waveform.mean(dim=0, keepdim=True)

        # Bring the audio to the configured rate; otherwise `duration` and the
        # mel filterbank would be applied at the wrong time/frequency scale.
        if file_rate != self.sample_rate:
            waveform = torchaudio.functional.resample(waveform, file_rate, self.sample_rate)

        # Fixed length (e.g. 2 seconds)
        if self.duration:
            max_len = int(self.sample_rate * self.duration)
            waveform = self.pad_or_truncate(waveform, max_len)

        if self.use_logmel:
            return self.to_db(self.mel_transform(waveform))
        return waveform

    def pad_or_truncate(self, waveform, max_len):
        """Return ``waveform`` with exactly ``max_len`` samples on dimension 1.

        Longer inputs are cut at the end, shorter ones are right-padded with
        zeros, and inputs of the right length are returned unchanged.
        """
        length = waveform.size(1)
        if length > max_len:
            return waveform[:, :max_len]
        elif length < max_len:
            pad_size = max_len - length
            return F.pad(waveform, (0, pad_size))
        return waveform
    



In [13]:
# Helper listing the content of a directory as sorted full paths
def get_file_path_list(dir_path):
    """Return the paths of all entries of ``dir_path``, sorted by entry name.

    Every directory entry is included; no filtering by type or extension is
    applied.
    """
    entry_names = os.listdir(dir_path)
    entry_names.sort()
    return [os.path.join(dir_path, entry) for entry in entry_names]

# === Paths ===
base_path = os.path.join("data", "VoiceBank_DEMAND")

train_clean_dir = os.path.join(base_path, "clean_trainset", "clean_trainset_28spk_wav")
train_noisy_dir = os.path.join(base_path, "noisy_trainset", "noisy_trainset_28spk_wav")
test_clean_dir = os.path.join(base_path, "clean_testset", "clean_testset_wav")
test_noisy_dir = os.path.join(base_path, "noisy_testset", "noisy_testset_wav")


# Clean and noisy lists are paired by position after sorting.
# NOTE(review): this assumes both folders contain the same file names - confirm.
train_clean_paths = get_file_path_list(train_clean_dir)
train_noisy_paths = get_file_path_list(train_noisy_dir)

test_clean_paths = get_file_path_list(test_clean_dir)
test_noisy_paths = get_file_path_list(test_noisy_dir)

# === Custom dataset ===
train_full_dataset = AudioPairDataset(train_clean_paths, train_noisy_paths, duration=2.0)

# === Train/Validation split (e.g. 90% train, 10% val) ===
val_ratio = 0.1
val_size = int(len(train_full_dataset) * val_ratio)
train_size = len(train_full_dataset) - val_size

# Fixed seed: the training cell resumes from a saved checkpoint, so an
# unseeded split would move earlier validation samples into the training set
# on the next run and make the validation loss optimistic.
# NOTE(review): the split is done on pairs, so pairs that share the same clean
# file can still end up on both sides - consider splitting by file instead.
split_seed = 42
split_generator = torch.Generator().manual_seed(split_seed)
train_dataset, val_dataset = random_split(
    train_full_dataset, [train_size, val_size], generator=split_generator
)

# === Test dataset ===
test_dataset = AudioPairDataset(test_clean_paths, test_noisy_paths, duration=2.0)

# === Dataloaders ===
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# Report the split sizes
print(f"Train set: {len(train_dataset)}")
print(f"Validation set: {len(val_dataset)}")
print(f"Test set: {len(test_dataset)}")


Train set: 31245
Validation set: 3471
Test set: 2472


# SIAMESE CNN

This section covers the creation and training of our Siamese CNN, which discriminates between the original audio and an edited version of it.

In [None]:
class DiscriminatorNet(nn.Module):
    """Siamese CNN scoring whether two spectrograms form a matching pair.

    Both inputs go through the same convolutional encoder; the two embeddings
    are concatenated and mapped to a single probability by a small MLP.

    Args:
        embedding_dim: Size of the embedding produced for each input.
        n_mels: Number of mel bands (height) of the input spectrograms.
        n_frames: Number of time frames (width) of the input spectrograms.
            It must match the data, because the encoder ends in a fully
            connected layer. The default of 63 is ``1 + 32000 // 512``, i.e.
            2 s of 16 kHz audio, assuming torchaudio's default hop of
            ``n_fft // 2`` with ``n_fft=1024`` and centred frames.
    """

    def __init__(self, embedding_dim=128, n_mels=64, n_frames=63):
        super().__init__()
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 16, 3, stride=1, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(16, 32, 3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )

        # The padded convolutions keep the spatial size and each MaxPool2d(2)
        # halves it rounding down, so both dimensions end up at size // 4.
        flat_features = 32 * (n_mels // 4) * (n_frames // 4)

        self.embedding = nn.Sequential(
            nn.Flatten(),
            nn.Linear(flat_features, embedding_dim),
            nn.ReLU()
        )

        self.classifier = nn.Sequential(
            nn.Linear(embedding_dim * 2, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )

    def forward_once(self, x):
        """Encode a batch shaped ``(batch, n_mels, n_frames)`` into embeddings."""
        x = x.unsqueeze(1)  # add the channel dimension expected by Conv2d
        x = self.cnn(x)
        x = self.embedding(x)
        return x

    def forward(self, x1, x2):
        """Return one sigmoid output per pair, shaped ``(batch,)``."""
        emb1 = self.forward_once(x1)
        emb2 = self.forward_once(x2)
        merged = torch.cat((emb1, emb2), dim=1)
        return self.classifier(merged).squeeze(1)


# Training

In [16]:
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=10):
    """Train ``model`` and checkpoint the weights with the best validation loss.

    Args:
        model: Network called as ``model(first, second)``. It may return a
            single tensor (loss computed as ``criterion(output, label)``) or a
            tuple/list of tensors (loss computed as
            ``criterion(*outputs, label)``).
        train_loader: Iterable of ``(first, second, label)`` training batches.
        val_loader: Iterable of ``(first, second, label)`` validation batches.
        criterion: Loss callable matching the model output as described above.
        optimizer: Optimizer updating the model parameters.
        scheduler: Scheduler stepped once per epoch with the average
            training loss.
        num_epochs: Number of epochs to run.

    Side effects:
        Saves ``model.state_dict()`` to ``checkpoint/siamese_model.pth`` after
        the first epoch and whenever the validation loss improves.
    """
    # Batches are moved to wherever the model lives, so a model already sent
    # to the GPU does not fail on CPU tensors coming from the loaders.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    checkpoint_dir = "checkpoint"
    checkpoint_file = os.path.join(checkpoint_dir, "siamese_model.pth")
    best_loss = float("inf")

    def compute_loss(first, second, label):
        """Run one forward pass and return the loss for the batch."""
        outputs = model(first.to(device), second.to(device))
        label = label.to(device)
        if isinstance(outputs, (tuple, list)):
            return criterion(*outputs, label)
        return criterion(outputs, label)

    for epoch in range(num_epochs):
        # Validation switches the model to eval mode, so training mode has to
        # be restored at the start of every epoch.
        model.train()
        running_loss = 0.0
        loop = tqdm(train_loader, total=len(train_loader))
        for clean, noisy, label in loop:
            loss = compute_loss(clean, noisy, label)

            # Backpropagation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            # Update progress bar
            loop.set_description(f"\033[34mEpoch [{epoch + 1}/{num_epochs}]\033[0m")
            loop.set_postfix(loss=loss.item())

        # max(..., 1) avoids a ZeroDivisionError on an empty loader.
        avg_loss = running_loss / max(len(train_loader), 1)

        scheduler.step(avg_loss)

        # Print loss for this epoch
        tqdm.write(f"Epoch [{epoch + 1}/{num_epochs}], Average Loss: {avg_loss:.4f}")

        #### VALIDATION ####
        model.eval()
        val_loss = 0.0

        with torch.no_grad():
            for clean, noisy, label in val_loader:
                val_loss += compute_loss(clean, noisy, label).item()

        avg_val_loss = val_loss / max(len(val_loader), 1)
        print(f"\033[34mStudent learning Validation Loss: {avg_val_loss:.4f}\033[0m")

        # The first epoch always writes a checkpoint; later epochs only when
        # the validation loss improves.
        if epoch == 0 or avg_val_loss < best_loss:
            best_loss = avg_val_loss
            os.makedirs(checkpoint_dir, exist_ok=True)
            torch.save(model.state_dict(), checkpoint_file)
        


In [None]:
# Hyperparameters
epochs = 10
batch_size = 16  # not read in this cell
learning_rate = 1e-3
margin = 1.0  # not read in this cell
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Model, loss, optimizer
model = DiscriminatorNet().to(device)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)

# Load the saved model if exists
checkpoint_path = os.path.join("checkpoint", "siamese_model.pth")
if os.path.exists(checkpoint_path):
    print("Loading saved model...")
    # map_location lets a checkpoint saved on a GPU be resumed on a CPU-only
    # machine (and vice versa).
    model.load_state_dict(torch.load(checkpoint_path, map_location=device))

# Train the model
train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=epochs)


[34mEpoch [1/10][0m:   2%|▏         | 35/1953 [00:10<09:54,  3.23it/s, loss=0.0748]


KeyboardInterrupt: 