In [4]:
# LIBRISPEECH_ROOT = "C:\\Users\\razic\\OneDrive\\Desktop\\Speech processing project\\LIBRI_ROOT"  
# DEMAND_ROOT      = "C:\\Users\\razic\\OneDrive\\Desktop\\Speech processing project\\archive"

In [5]:
# Cell 1: Imports and basic config

import os
import random
from glob import glob
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchaudio

# Prefer the GPU when CUDA is usable; otherwise everything runs on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

Using device: cpu


In [6]:
# Dataset locations. Set the LIBRISPEECH_ROOT / DEMAND_ROOT environment variables
# to run the notebook on another machine; the fallbacks are the original local paths.
LIBRISPEECH_ROOT = os.environ.get(
    "LIBRISPEECH_ROOT",
    "C:\\Users\\razic\\OneDrive\\Desktop\\Speech processing project\\LIBRI_ROOT",
)
DEMAND_ROOT = os.environ.get(
    "DEMAND_ROOT",
    "C:\\Users\\razic\\OneDrive\\Desktop\\Speech processing project\\archive",
)

# Will use 16 kHz audio
TARGET_SAMPLE_RATE = 16000

# Training / validation / test subsets from LibriSpeech
TRAIN_SUBSET = "train-clean-100"
VAL_SUBSET   = "dev-clean"
TEST_SUBSET  = "test-clean"

# SNR levels in dB for mixing
TRAIN_SNR_LEVELS = [0, 5, 10, 15, 20]
TEST_SNR_LEVELS  = [0, 5, 10, 15, 20]

# Fixed length (seconds) of audio segments during training
SEGMENT_DURATION = 3.0  # seconds
# Segment length expressed in samples at TARGET_SAMPLE_RATE.
SEGMENT_SAMPLES = int(TARGET_SAMPLE_RATE * SEGMENT_DURATION)


In [7]:
def load_audio(path, target_sr=TARGET_SAMPLE_RATE):
    """
    Read an audio file as a single-channel waveform at ``target_sr``.

    Multi-channel files are down-mixed by averaging the channels; files whose
    native rate differs from ``target_sr`` are resampled.
    Returns: (waveform [1, T], sample_rate)
    """
    audio, native_sr = torchaudio.load(path)  # [channels, time]

    # Average the channels so the result always has exactly one.
    has_multiple_channels = audio.shape[0] > 1
    if has_multiple_channels:
        audio = audio.mean(dim=0, keepdim=True)

    # Nothing more to do when the file is already at the requested rate.
    if native_sr == target_sr:
        return audio, native_sr

    to_target_rate = torchaudio.transforms.Resample(native_sr, target_sr)
    return to_target_rate(audio), target_sr


def rms_energy(x):
    """
    Root-mean-square level of a torch tensor, taken over all of its elements.

    A small constant (1e-8) is added under the square root so that an
    all-zero input yields a tiny positive value instead of exactly zero.
    """
    mean_square = x.pow(2).mean()
    return (mean_square + 1e-8).sqrt()


def mix_clean_and_noise(clean, noise, snr_db):
    """
    Add noise to a clean signal so that the mixture has the requested SNR.

    clean, noise: torch.Tensor [1, T]
    snr_db: target signal-to-noise ratio in dB

    Noise shorter than ``clean`` is tiled; in every case it is then cut to the
    length of ``clean``.

    Returns:
        noisy: clean + scaled_noise
        scaled_noise: the length-matched noise after gain has been applied
    """
    target_len = clean.shape[1]
    noise_len = noise.shape[1]

    # Length-match: repeat the noise enough times to cover the clean signal, then trim.
    if noise_len < target_len:
        copies = int(np.ceil(target_len / noise_len))
        noise = noise.repeat(1, copies)
    noise = noise[:, :target_len]

    speech_level = rms_energy(clean)
    noise_level = rms_energy(noise)

    # An amplitude ratio of 10^(snr_db / 20) corresponds to snr_db decibels.
    amplitude_ratio = 10 ** (snr_db / 20)
    target_noise_level = speech_level / amplitude_ratio
    gain = target_noise_level / (noise_level + 1e-8)

    scaled_noise = noise * gain
    return clean + scaled_noise, scaled_noise


def compute_snr_db(clean, noisy):
    """
    Signal-to-noise ratio in dB of ``noisy`` measured against ``clean``.

        SNR = 10 * log10( P_signal / P_noise )
        P_signal = mean(clean^2)
        P_noise  = mean((noisy - clean)^2)

    Both powers get a 1e-8 offset so neither can be exactly zero.
    Returns a Python float.
    """
    eps = 1e-8
    residual = noisy - clean
    signal_power = clean.pow(2).mean() + eps
    noise_power = residual.pow(2).mean() + eps
    ratio_db = 10 * torch.log10(signal_power / noise_power)
    return ratio_db.item()


In [8]:
def collect_librispeech_files(root, subset):
    """
    Recursively gather every .flac file under ``root/subset``.

    ``subset`` is a LibriSpeech split folder name, e.g. 'train-clean-100'.
    Prints how many files were found and returns the paths sorted.
    """
    search_pattern = os.path.join(root, subset, "**", "*.flac")
    matches = glob(search_pattern, recursive=True)
    matches.sort()
    print(f"{subset}: found {len(matches)} files")
    return matches


def collect_demand_noise_files(root):
    """
    Gather the 16 kHz DEMAND noise recordings.

    Only .wav files exactly two folder levels below ``root`` are matched, and
    the first level must be a folder whose name ends in ``_16k``.
    Prints how many files were found and returns the paths sorted.
    """
    matches = glob(os.path.join(root, "*_16k", "*", "*.wav"))
    matches.sort()
    print(f"DEMAND: found {len(matches)} noise files")
    return matches


# Clean speech file lists, one per LibriSpeech split (train / validation / test, in that order).
train_clean_files, val_clean_files, test_clean_files = [
    collect_librispeech_files(LIBRISPEECH_ROOT, split_name)
    for split_name in (TRAIN_SUBSET, VAL_SUBSET, TEST_SUBSET)
]

# A single noise pool is shared by all three splits.
noise_files = collect_demand_noise_files(DEMAND_ROOT)

train-clean-100: found 28539 files
dev-clean: found 2703 files
test-clean: found 2620 files
DEMAND: found 272 noise files


In [9]:
class SpeechDenoiseDataset(Dataset):
    """
    On-the-fly (noisy, clean) pairs of fixed-length speech segments.

    Each item loads one clean utterance and one noise recording, crops/pads both
    to ``segment_samples`` and mixes them at an SNR drawn from ``snr_levels``.

    In ``mode="train"`` every access draws fresh random choices from the global
    ``random`` module. In any other mode the choices are drawn from a generator
    seeded with the item index, so item ``i`` is always built from the same noise
    file, crops and SNR. That keeps validation losses and test metrics
    comparable from one run to the next.
    """

    def __init__(self, clean_files, noise_files, snr_levels, segment_samples=SEGMENT_SAMPLES, mode="train"):
        """
        clean_files: list of paths to LibriSpeech flac files
        noise_files: list of paths to DEMAND wav files (must be non-empty)
        snr_levels: list of SNR dB values to sample from (must be non-empty)
        segment_samples: number of samples per segment
        mode: 'train' (random every access) or 'val' / 'test' (deterministic per index)

        Raises:
            ValueError: if noise_files or snr_levels is empty.
        """
        # Fail at construction time rather than with an IndexError from
        # random.choice in the middle of the first batch.
        if not noise_files:
            raise ValueError("noise_files is empty; cannot build noisy mixtures")
        if not snr_levels:
            raise ValueError("snr_levels is empty; cannot choose a mixing SNR")

        self.clean_files = clean_files
        self.noise_files = noise_files
        self.snr_levels = snr_levels
        self.segment_samples = segment_samples
        self.mode = mode

    def __len__(self):
        """One item per clean utterance."""
        return len(self.clean_files)

    def _rng_for(self, idx):
        """Random source for item ``idx``: the global module when training, else an index-seeded generator."""
        if self.mode == "train":
            return random
        return random.Random(int(idx))

    def random_segment(self, audio, rng=None):
        """
        Given audio [1, T], crop a random segment of length segment_samples.
        If the audio is not longer than segment_samples, zero-pad it on the right instead.

        rng: object providing ``randint`` (e.g. a ``random.Random``); defaults to
             the global ``random`` module.
        """
        if rng is None:
            rng = random

        num_samples = audio.shape[1]
        if num_samples <= self.segment_samples:
            pad_amount = self.segment_samples - num_samples
            return torch.nn.functional.pad(audio, (0, pad_amount))

        start = rng.randint(0, num_samples - self.segment_samples)
        return audio[:, start:start + self.segment_samples]

    def __getitem__(self, idx):
        """
        Build one example.

        Returns a dict with:
            "noisy":  mixture, shape [T]
            "clean":  clean segment, shape [T]
            "snr_db": the SNR level (dB) used for the mix
        """
        rng = self._rng_for(idx)
        clean_path = self.clean_files[idx]

        # Pick the noise recording for this item
        noise_path = rng.choice(self.noise_files)

        # Load audio
        clean, _ = load_audio(clean_path)
        noise, _ = load_audio(noise_path)

        # Fixed-length segment for both clean and noise
        clean_seg = self.random_segment(clean, rng)
        noise_seg = self.random_segment(noise, rng)

        # Choose SNR
        snr_db = rng.choice(self.snr_levels)

        # Mix
        noisy_seg, _ = mix_clean_and_noise(clean_seg, noise_seg, snr_db)

        return {
            "noisy": noisy_seg.squeeze(0),  # [T]
            "clean": clean_seg.squeeze(0),  # [T]
            "snr_db": snr_db
        }

In [10]:
# Cell 5: DataLoaders

BATCH_SIZE = 16
# DataLoader worker processes. On Windows, workers are started with "spawn" and must
# re-import the dataset class and collate_fn; objects defined inside a notebook cannot
# be re-imported that way, so loading fails or hangs. Load in the main process there.
NUM_WORKERS = 0 if os.name == "nt" else 2  # adjust based on your CPU

# One dataset per split; all three share the same noise pool and segment length.
train_dataset = SpeechDenoiseDataset(
    train_clean_files, noise_files, TRAIN_SNR_LEVELS, SEGMENT_SAMPLES, mode="train"
)

# Validation mixes at the training SNR levels.
val_dataset = SpeechDenoiseDataset(
    val_clean_files, noise_files, TRAIN_SNR_LEVELS, SEGMENT_SAMPLES, mode="val"
)

test_dataset = SpeechDenoiseDataset(
    test_clean_files, noise_files, TEST_SNR_LEVELS, SEGMENT_SAMPLES, mode="test"
)

def collate_fn(batch):
    """
    Turn a list of example dicts into batched tensors.

    Each element of ``batch`` is a dict with "noisy", "clean" and "snr_db".
    Returns (noisy [B, T], clean [B, T], snr_db [B] as float32).
    """
    noisy_items, clean_items, snr_values = [], [], []
    for example in batch:
        noisy_items.append(example["noisy"])
        clean_items.append(example["clean"])
        snr_values.append(example["snr_db"])

    return (
        torch.stack(noisy_items, dim=0),
        torch.stack(clean_items, dim=0),
        torch.tensor(snr_values, dtype=torch.float32),
    )

# Only the training loader shuffles; validation and test keep file order.
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
    collate_fn=collate_fn,
)

val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    collate_fn=collate_fn,
)

# batch_size=1 for easy per-utterance eval
test_loader = DataLoader(
    test_dataset,
    batch_size=1,
    shuffle=False,
    num_workers=NUM_WORKERS,
    collate_fn=collate_fn,
)

# Number of batches in each loader.
(len(train_loader), len(val_loader), len(test_loader))

(1784, 169, 2620)

In [11]:
class ConvDenoiser(nn.Module):
    """
    1-D convolutional encoder/decoder that maps a noisy waveform to a denoised one.

    Three stride-2 convolutions shrink the time axis, and three stride-2
    transposed convolutions expand it again. The output is trimmed to the input
    length, so waveforms of any length come back with the same number of samples.
    """

    def __init__(self):
        super().__init__()
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=15, stride=2, padding=7),
            nn.ReLU(),
            nn.Conv1d(16, 32, kernel_size=15, stride=2, padding=7),
            nn.ReLU(),
            nn.Conv1d(32, 64, kernel_size=15, stride=2, padding=7),
            nn.ReLU(),
        )
        # Decoder (transpose convolutions)
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(64, 32, kernel_size=15, stride=2, padding=7, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose1d(32, 16, kernel_size=15, stride=2, padding=7, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose1d(16, 1, kernel_size=15, stride=2, padding=7, output_padding=1),
        )

    def forward(self, x):
        """
        x: [batch, time] -> reshape to [batch, 1, time]
        Returns: [batch, time], same length as the input.
        """
        num_samples = x.shape[-1]

        x = x.unsqueeze(1)
        z = self.encoder(x)
        y = self.decoder(z)
        # y: [batch, 1, time']
        y = y.squeeze(1)

        # Each encoder layer rounds the length up when halving and each decoder layer
        # doubles it, so time' is the input length rounded up to a multiple of 8.
        # Drop the surplus tail so the output lines up with the target sample for sample.
        return y[..., :num_samples]


# Build the baseline network on the selected device and print its layer layout as a quick check.
baseline_model = ConvDenoiser().to(device)
print(baseline_model)


ConvDenoiser(
  (encoder): Sequential(
    (0): Conv1d(1, 16, kernel_size=(15,), stride=(2,), padding=(7,))
    (1): ReLU()
    (2): Conv1d(16, 32, kernel_size=(15,), stride=(2,), padding=(7,))
    (3): ReLU()
    (4): Conv1d(32, 64, kernel_size=(15,), stride=(2,), padding=(7,))
    (5): ReLU()
  )
  (decoder): Sequential(
    (0): ConvTranspose1d(64, 32, kernel_size=(15,), stride=(2,), padding=(7,), output_padding=(1,))
    (1): ReLU()
    (2): ConvTranspose1d(32, 16, kernel_size=(15,), stride=(2,), padding=(7,), output_padding=(1,))
    (3): ReLU()
    (4): ConvTranspose1d(16, 1, kernel_size=(15,), stride=(2,), padding=(7,), output_padding=(1,))
  )
)


In [12]:
def train_one_epoch(model, dataloader, optimizer, criterion):
    """
    Run one optimisation pass over ``dataloader``.

    Batches are moved to the notebook-level ``device``. Returns the loss
    averaged per example: each batch loss is weighted by its batch size and the
    total is divided by the dataset length.
    """
    model.train()
    loss_sum = 0.0

    for noisy_batch, clean_batch, _ in dataloader:
        noisy_batch, clean_batch = noisy_batch.to(device), clean_batch.to(device)

        optimizer.zero_grad()
        batch_loss = criterion(model(noisy_batch), clean_batch)
        batch_loss.backward()
        optimizer.step()

        loss_sum += batch_loss.item() * noisy_batch.shape[0]

    return loss_sum / len(dataloader.dataset)


def evaluate_loss(model, dataloader, criterion):
    """
    Average per-example loss of ``model`` over ``dataloader`` without updating weights.

    The model is switched to eval mode and gradients are disabled. Batches are
    moved to the notebook-level ``device``.
    """
    model.eval()
    loss_sum = 0.0

    with torch.no_grad():
        for noisy_batch, clean_batch, _ in dataloader:
            noisy_batch, clean_batch = noisy_batch.to(device), clean_batch.to(device)
            batch_loss = criterion(model(noisy_batch), clean_batch)
            # Weight by batch size so a smaller final batch is not over-counted.
            loss_sum += batch_loss.item() * noisy_batch.shape[0]

    return loss_sum / len(dataloader.dataset)


def train_one_experiment(
    model,
    train_loader,
    val_loader,
    num_epochs=5,
    lr=1e-3,
    experiment_name="baseline_conv1d"
):
    """
    Train ``model`` with Adam and MSE loss for ``num_epochs`` epochs.

    After each epoch the train and validation losses are printed and recorded.
    Whenever the validation loss improves on the best seen so far, the model
    weights are written to ``<experiment_name>_best.pt`` in the working directory.

    Returns: dict with per-epoch lists under "train_loss" and "val_loss".
    """
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    history = {"train_loss": [], "val_loss": []}
    best_val_loss = float("inf")

    for epoch in range(1, num_epochs + 1):
        train_loss = train_one_epoch(model, train_loader, optimizer, criterion)
        val_loss = evaluate_loss(model, val_loader, criterion)

        for key, value in (("train_loss", train_loss), ("val_loss", val_loss)):
            history[key].append(value)

        progress = (
            f"[{experiment_name}] Epoch {epoch}/{num_epochs} "
            f"- Train Loss: {train_loss:.6f} - Val Loss: {val_loss:.6f}"
        )
        print(progress)

        # Only checkpoint on a strict improvement.
        if not val_loss < best_val_loss:
            continue
        best_val_loss = val_loss
        torch.save(model.state_dict(), f"{experiment_name}_best.pt")
        print(f"  -> New best model saved (val_loss={best_val_loss:.6f})")

    return history


In [13]:
# Smoke Test Cell 1: tiny dataset & dataloader

# Deliberately tiny settings so the whole pipeline can be exercised in seconds.
SMOKE_NUM_FILES = 10            # number of clean files to keep
SMOKE_BATCH_SIZE = 2            # examples per batch
SMOKE_SEGMENT_DURATION = 1.0    # segment length in seconds
SMOKE_SEGMENT_SAMPLES = int(TARGET_SAMPLE_RATE * SMOKE_SEGMENT_DURATION)

# First few training utterances only
smoke_clean_files = train_clean_files[:SMOKE_NUM_FILES]

smoke_dataset = SpeechDenoiseDataset(
    smoke_clean_files, noise_files, TRAIN_SNR_LEVELS, SMOKE_SEGMENT_SAMPLES, mode="train"
)

def smoke_collate_fn(batch):
    """
    Collate function for the smoke-test loader.

    Batches exactly as ``collate_fn`` does, by delegating to it, so the smoke test
    exercises the same collation code as the real loaders.
    Returns (noisy [B, T], clean [B, T], snr_db [B] as float32).
    """
    return collate_fn(batch)

# num_workers=0 keeps loading in the main process, which is easier to debug.
smoke_loader = DataLoader(
    smoke_dataset, batch_size=SMOKE_BATCH_SIZE, shuffle=True,
    num_workers=0, collate_fn=smoke_collate_fn
)

print("Smoke dataset size:", len(smoke_dataset))
print("Smoke loader batches:", len(smoke_loader))


Smoke dataset size: 10
Smoke loader batches: 5


In [14]:
# Smoke Test : single forward pass
# Untrained model in eval mode: this cell only checks that tensor shapes line up.
smoke_model = ConvDenoiser().to(device).eval()

noisy_batch, clean_batch, snr_batch = next(iter(smoke_loader))
print("Noisy batch shape:", noisy_batch.shape)
print("Clean batch shape:", clean_batch.shape)
print("SNR batch:", snr_batch)

noisy_batch, clean_batch = noisy_batch.to(device), clean_batch.to(device)

with torch.no_grad():
    denoised_batch = smoke_model(noisy_batch)

print("Denoised batch shape:", denoised_batch.shape)


Noisy batch shape: torch.Size([2, 16000])
Clean batch shape: torch.Size([2, 16000])
SNR batch: tensor([ 0., 20.])




Denoised batch shape: torch.Size([2, 16000])


In [15]:
# Smoke Test Cell: one small training step
# Fresh model plus optimiser/loss for a couple of real gradient steps.
smoke_model = ConvDenoiser().to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(smoke_model.parameters(), lr=1e-3)

smoke_model.train()

max_batches = 2  # stop after this many batches
batch_count = 0

for noisy, clean, snr_db in smoke_loader:
    noisy, clean = noisy.to(device), clean.to(device)

    optimizer.zero_grad()
    denoised = smoke_model(noisy)
    loss = criterion(denoised, clean)
    loss.backward()
    optimizer.step()

    batch_count += 1
    print(f"Batch {batch_count} - Loss: {loss.item():.6f}")

    # Break before another batch is fetched from the loader.
    if batch_count >= max_batches:
        break

print("Smoke training step completed.")




Batch 1 - Loss: 0.076515
Batch 2 - Loss: 0.059124
Smoke training step completed.


In [None]:
# Run one baseline experiment
NUM_EPOCHS = 1
LEARNING_RATE = 1e-3

# Start from freshly initialised weights for the real run.
baseline_model = ConvDenoiser().to(device)

history_baseline = train_one_experiment(
    model=baseline_model,
    train_loader=train_loader,
    val_loader=val_loader,
    num_epochs=NUM_EPOCHS,
    lr=LEARNING_RATE,
    experiment_name="baseline_conv1d",
)

In [None]:
# Evaluate SNR improvement (ΔSNR) on test set
def evaluate_snr_improvement(model, dataloader):
    """
    Measure how much ``model`` raises the SNR, grouped by the input SNR level.

    For every utterance in every batch (any batch size), the SNR of the noisy
    input and of the denoised output are both measured against the clean
    reference. Their difference (output - input) is stored under that utterance's
    mixing SNR.

    Returns:
        dict mapping SNR level -> {"mean_delta_snr", "std_delta_snr", "n"}.
        Every level in TEST_SNR_LEVELS is present; a level with no utterances
        has n == 0 and None for mean and std.
    """
    model.eval()

    results_per_level = {snr: [] for snr in TEST_SNR_LEVELS}

    with torch.no_grad():
        for noisy, clean, snr_db_in in dataloader:
            noisy = noisy.to(device)
            clean = clean.to(device)

            # Denoise the whole batch at once
            denoised = model(noisy)

            # Score each utterance separately so the result does not depend on batch size.
            for i in range(noisy.size(0)):
                clean_ = clean[i:i + 1, :]
                noisy_ = noisy[i:i + 1, :]
                denoised_ = denoised[i:i + 1, :]

                input_snr = compute_snr_db(clean_, noisy_)
                output_snr = compute_snr_db(clean_, denoised_)
                delta_snr = output_snr - input_snr

                # A float level such as 5.0 hashes and compares equal to the int key 5,
                # so it lands in the pre-created bucket; unseen levels get a new bucket.
                snr_level = float(snr_db_in[i].item())
                results_per_level.setdefault(snr_level, []).append(delta_snr)

    # Aggregate
    avg_results = {}
    for snr_level, values in results_per_level.items():
        if len(values) > 0:
            avg_results[snr_level] = {
                "mean_delta_snr": float(np.mean(values)),
                "std_delta_snr": float(np.std(values)),
                "n": len(values),
            }
        else:
            avg_results[snr_level] = {
                "mean_delta_snr": None,
                "std_delta_snr": None,
                "n": 0,
            }

    return avg_results


# Load best model weights before evaluation
baseline_eval_model = ConvDenoiser().to(device)
baseline_eval_model.load_state_dict(torch.load("baseline_conv1d_best.pt", map_location=device))

snr_results = evaluate_snr_improvement(baseline_eval_model, test_loader)

print("SNR improvement (ΔSNR) per input SNR level (dB):")
for snr_level in sorted(snr_results.keys()):
    stats = snr_results[snr_level]
    if stats["n"] == 0:
        # mean/std are None for a level with no samples; formatting None with
        # ':.3f' raises TypeError, so report the empty level explicitly.
        print(f"SNR_in={snr_level:>2} dB -> no test samples at this level (n=0)")
        continue
    print(f"SNR_in={snr_level:>2} dB -> "
          f"ΔSNR mean={stats['mean_delta_snr']:.3f} dB, "
          f"std={stats['std_delta_snr']:.3f} dB, n={stats['n']}")


In [None]:
# Run model on one random test example and save audio
from IPython.display import Audio

def denoise_one_example(model, dataset, index=None, save_prefix="example"):
    """
    Denoise a single dataset item, save the audio, and play it in the notebook.

    index: item to use; a random one is picked when None.
    save_prefix: the noisy, clean and denoised signals are written to
                 ``<save_prefix>_noisy.wav``, ``<save_prefix>_clean.wav`` and
                 ``<save_prefix>_denoised.wav`` at TARGET_SAMPLE_RATE.
    """
    model.eval()
    if index is None:
        index = random.randint(0, len(dataset) - 1)
    sample = dataset[index]

    noisy = sample["noisy"].unsqueeze(0).to(device)  # [1, T]
    clean = sample["clean"].unsqueeze(0).to(device)

    with torch.no_grad():
        denoised = model(noisy)

    # (file tag, playback heading, samples as a CPU numpy array) for each signal
    tracks = [
        (tag, heading, signal.squeeze(0).cpu().numpy())
        for tag, heading, signal in (
            ("noisy", "Noisy:", noisy),
            ("clean", "Clean:", clean),
            ("denoised", "Denoised:", denoised),
        )
    ]

    print(f"Sample index: {index}, SNR_in={sample['snr_db']} dB")

    # Write one wav per signal; torchaudio.save is given a [1, T] tensor.
    saved_paths = []
    for tag, _heading, samples in tracks:
        out_path = f"{save_prefix}_{tag}.wav"
        torchaudio.save(out_path, torch.tensor(samples).unsqueeze(0), TARGET_SAMPLE_RATE)
        saved_paths.append(out_path)

    print("Saved:", *saved_paths)

    # Inline players, in the same order as the files were written
    for _tag, heading, samples in tracks:
        print(heading)
        display(Audio(samples, rate=TARGET_SAMPLE_RATE))


# Denoise one randomly chosen test item (index=None), write the three wavs, and play them inline.
denoise_one_example(baseline_eval_model, test_dataset, index=None, save_prefix="baseline_example")
