# <center> Speech denoising </center>

In [1]:
%pip install torchmetrics
%pip install pesq
%pip install torchmetrics[audio]

Defaulting to user installation because normal site-packages is not writeable
Collecting torchmetrics
  Downloading torchmetrics-1.6.1-py3-none-any.whl.metadata (21 kB)
Collecting lightning-utilities>=0.8.0 (from torchmetrics)
  Downloading lightning_utilities-0.11.9-py3-none-any.whl.metadata (5.2 kB)
Downloading torchmetrics-1.6.1-py3-none-any.whl (927 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m927.3/927.3 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading lightning_utilities-0.11.9-py3-none-any.whl (28 kB)
Installing collected packages: lightning-utilities, torchmetrics
Successfully installed lightning-utilities-0.11.9 torchmetrics-1.6.1

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49m/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade 

# Imports

In [None]:
import gc
import os
import re
import traceback
from functools import wraps
from random import randint
from time import perf_counter

import torchaudio
from IPython.core.display import display
from ipywidgets import interact
from matplotlib.lines import Line2D

import IPython
import matplotlib.pyplot as plt
import numpy as np
import psutil
import torch
import torch.nn as nn

  from IPython.core.display import display


In [3]:
def load_folder(folder_path):

  if not folder_path.endswith("/"):
    folder_path += "/" 

  tensor_array, sampling_rate = [], None

  for file_name in os.listdir(folder_path):
    if file_name.endswith(".wav"):
      tensor_data, sr = torchaudio.load(folder_path + file_name)
      tensor_array.append(tensor_data)

      sampling_rate = sampling_rate or sr
      assert sampling_rate == sr, "Sampling rate not uniform on all the data"
  
  print(f"Sampling rate: {sampling_rate}")
  
  return torch.stack(tensor_array)

In [14]:
voice_train = load_folder("train")
noise_train = load_folder("train-bruit")

voice_test = load_folder("test")
noise_test = load_folder("test-bruit")


Sampling rate: 8000
Sampling rate: 8000
Sampling rate: 8000
Sampling rate: 8000


In [15]:
normalize = True

if normalize:
    noise_train = nn.functional.normalize(noise_train, dim=2)
    voice_train = nn.functional.normalize(voice_train, dim=2)
    noise_test = nn.functional.normalize(noise_test, dim=2)
    voice_test = nn.functional.normalize(voice_test, dim=2)

In [16]:
@interact
def plot_voice(i = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) -> None:
  display(IPython.display.Audio((voice_train[i]), rate=8000))

interactive(children=(Dropdown(description='i', options=(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), value=1), Output()), …

In [17]:
@interact
def plot_noise(i = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) -> None:
  display(IPython.display.Audio((noise_train[i]), rate=8000))

interactive(children=(Dropdown(description='i', options=(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), value=1), Output()), …

# Utilities

In [18]:
class ConvBlock(torch.nn.Module):
    def __init__(
        self,
        io_channels: int,
        hidden_channels: int,
        kernel_size: int,
        padding: int,
        dilation: int = 1,
    ):
        super().__init__()

        self.conv_layers = torch.nn.Sequential(
            torch.nn.Conv1d(in_channels=io_channels, out_channels=hidden_channels, kernel_size=1),
            torch.nn.PReLU(),
            torch.nn.GroupNorm(num_groups=1, num_channels=hidden_channels, eps=1e-08),
            torch.nn.Conv1d(
                in_channels=hidden_channels,
                out_channels=hidden_channels,
                kernel_size=kernel_size,
                padding=padding,
                dilation=dilation,
                groups=hidden_channels,
            ),
            torch.nn.PReLU(),
            torch.nn.GroupNorm(num_groups=1, num_channels=hidden_channels, eps=1e-08),
        )

        self.out = torch.nn.Conv1d(in_channels=hidden_channels, out_channels=io_channels, kernel_size=1)
        self.skip_co = torch.nn.Conv1d(in_channels=hidden_channels, out_channels=io_channels, kernel_size=1)

    def forward(self, input):
        feature = self.conv_layers(input)
        output = self.out(feature)
        skip_co = self.skip_co(feature)
        return output, skip_co

In [19]:
class Mask(torch.nn.Module):
    def __init__(
        self,
        input_dim: int,
        kernel_size: int,
        num_feats: int,
        num_hidden: int,
        num_layers: int,
        num_stacks: int,
    ):
        super().__init__()

        self.input_dim = input_dim

        self.input_norm = torch.nn.GroupNorm(num_groups=1, num_channels=input_dim, eps=1e-8)
        self.input_conv = torch.nn.Conv1d(in_channels=input_dim, out_channels=num_feats, kernel_size=1)

        self.conv_layers = torch.nn.ModuleList([])
        for stack in range(num_stacks):
            for layer in range(num_layers):
                self.conv_layers.append(
                    ConvBlock(
                        io_channels=num_feats,
                        hidden_channels=num_hidden,
                        kernel_size=kernel_size,
                        dilation=2**layer,
                        padding=2**layer
                    )
                )
        self.output_prelu = torch.nn.PReLU()
        self.output_conv = torch.nn.Conv1d(
            in_channels=num_feats,
            out_channels=input_dim * 2,
            kernel_size=1,
        )
        self.sigmoid = torch.nn.Sigmoid()
        

    def forward(self, input):
        batch_size = input.shape[0]
        feats = self.input_norm(input)
        feats = self.input_conv(feats)
        output = 0.0
        for layer in self.conv_layers:
            residual, skip = layer(feats)
            feats = feats + residual
            output = output + skip
        output = self.output_prelu(output)
        output = self.output_conv(output)
        output = self.sigmoid(output)
        return output.view(batch_size, 2, self.input_dim, -1)

In [20]:
class TasNet(torch.nn.Module):
    def __init__(
        self,
        enc_kernel_size: int = 16,
        enc_num_feats: int = 512,
        
        msk_kernel_size: int = 3,
        msk_num_feats: int = 128,
        msk_num_hidden_feats: int = 512,
        msk_num_layers: int = 2,
        msk_num_stacks: int = 3,
    ):
        super().__init__()

        self.enc_num_feats = enc_num_feats
        self.enc_kernel_size = enc_kernel_size
        self.enc_stride = enc_kernel_size // 2

        self.encoder = torch.nn.Conv1d(
            in_channels=1,
            out_channels=enc_num_feats,
            kernel_size=enc_kernel_size,
            stride=self.enc_stride,
            padding=self.enc_stride,
            bias=False,
        )
        self.mask = Mask(
            input_dim=enc_num_feats,
            kernel_size=msk_kernel_size,
            num_feats=msk_num_feats,
            num_hidden=msk_num_hidden_feats,
            num_layers=msk_num_layers,
            num_stacks=msk_num_stacks
        )

        self.decoder = torch.nn.ConvTranspose1d(
            in_channels=enc_num_feats,
            out_channels=1,
            kernel_size=enc_kernel_size,
            stride=self.enc_stride,
            padding=self.enc_stride,
            bias=False,
        )
  
    def forward(self, input):
        batch_size, num_padded_frames = input.shape[0], input.shape[2]
        feats = self.encoder(input)
        masked = self.mask(feats) * feats.unsqueeze(1)
        masked = masked.view(batch_size * 2, self.enc_num_feats, -1)
        decoded = self.decoder(masked)
        output = decoded.view(batch_size, 2, num_padded_frames)
        return output[:, 0, :]

In [21]:
def l1_freq(pred: torch.Tensor, target: torch.Tensor) -> float:
    transform = torchaudio.transforms.Spectrogram()
    pred_spectrogram = transform(pred)
    target_spectrogram = transform(target)

    return (torch.sum(torch.abs(target_spectrogram - pred_spectrogram))).item()

In [50]:
def l1_freq(pred: torch.Tensor, target: torch.Tensor) -> float:
    # Configure the Spectrogram transform
    transform = torchaudio.transforms.Spectrogram(
        n_fft=pred.shape[-1],  # Match n_fft to the size of the last dimension
        win_length=pred.shape[-1],  # Use the full window size
        hop_length=pred.shape[-1] // 2,  # Overlap by 50%
        pad=0,  # No extra padding
    )
    
    # Apply the Spectrogram transform to both inputs
    pred_spectrogram = transform(pred)
    target_spectrogram = transform(target)

    # Compute and return the L1 frequency-domain loss
    return torch.sum(torch.abs(target_spectrogram - pred_spectrogram)).item()


memory usage track

In [22]:
def print_mem_info() -> None:
  print(f"Memory allocated: {torch.cuda.memory_allocated(0) / 1024 ** 2 :.4f} MB")
  print(f"Memory reserved:  {torch.cuda.memory_reserved(0) / 1024 ** 2 :.4f} MB")

# Model

In [23]:
# Combine voice and noise signals into training and testing datasets
big_train_signals = torch.cat([voice_train, noise_train], dim=0)
big_train_labels = torch.cat([torch.ones(len(voice_train)), torch.zeros(len(noise_train))])

big_test_signals = torch.cat([voice_test, noise_test], dim=0)
big_test_labels = torch.cat([torch.ones(len(voice_test)), torch.zeros(len(noise_test))])

# Create TensorDataset and DataLoader
train_dataset = torch.utils.data.TensorDataset(
    big_train_signals,
    big_train_labels
)
train_dataloader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=10,
    shuffle=True,
)

test_dataset = torch.utils.data.TensorDataset(
    big_test_signals,
    big_test_labels
)
test_dataloader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=10,
)

# Example of iterating through the DataLoader
for signals, labels in train_dataloader:
    print("Batch signals shape:", signals.shape)
    print("Batch labels shape:", labels.shape)

Batch signals shape: torch.Size([10, 1, 80000])
Batch labels shape: torch.Size([10])
Batch signals shape: torch.Size([10, 1, 80000])
Batch labels shape: torch.Size([10])
Batch signals shape: torch.Size([10, 1, 80000])
Batch labels shape: torch.Size([10])
Batch signals shape: torch.Size([10, 1, 80000])
Batch labels shape: torch.Size([10])
Batch signals shape: torch.Size([10, 1, 80000])
Batch labels shape: torch.Size([10])
Batch signals shape: torch.Size([10, 1, 80000])
Batch labels shape: torch.Size([10])
Batch signals shape: torch.Size([10, 1, 80000])
Batch labels shape: torch.Size([10])
Batch signals shape: torch.Size([10, 1, 80000])
Batch labels shape: torch.Size([10])
Batch signals shape: torch.Size([10, 1, 80000])
Batch labels shape: torch.Size([10])
Batch signals shape: torch.Size([10, 1, 80000])
Batch labels shape: torch.Size([10])
Batch signals shape: torch.Size([10, 1, 80000])
Batch labels shape: torch.Size([10])
Batch signals shape: torch.Size([10, 1, 80000])
Batch labels shap

In [24]:
# Get cpu or gpu device for training.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using {device} device")

Using cpu device


In [51]:
model = TasNet().to(device)
print(model)

loss_fn = l1_freq
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)

TasNet(
  (encoder): Conv1d(1, 512, kernel_size=(16,), stride=(8,), padding=(8,), bias=False)
  (mask): Mask(
    (input_norm): GroupNorm(1, 512, eps=1e-08, affine=True)
    (input_conv): Conv1d(512, 128, kernel_size=(1,), stride=(1,))
    (conv_layers): ModuleList(
      (0): ConvBlock(
        (conv_layers): Sequential(
          (0): Conv1d(128, 512, kernel_size=(1,), stride=(1,))
          (1): PReLU(num_parameters=1)
          (2): GroupNorm(1, 512, eps=1e-08, affine=True)
          (3): Conv1d(512, 512, kernel_size=(3,), stride=(1,), padding=(1,), groups=512)
          (4): PReLU(num_parameters=1)
          (5): GroupNorm(1, 512, eps=1e-08, affine=True)
        )
        (out): Conv1d(512, 128, kernel_size=(1,), stride=(1,))
        (skip_co): Conv1d(512, 128, kernel_size=(1,), stride=(1,))
      )
      (1): ConvBlock(
        (conv_layers): Sequential(
          (0): Conv1d(128, 512, kernel_size=(1,), stride=(1,))
          (1): PReLU(num_parameters=1)
          (2): GroupNorm(

In [52]:
def count_parameters(model: nn.Module) -> int:
  return sum(p.numel() for p in model.parameters() if p.requires_grad)

In [53]:
print(f"Our ConvTasNet has {count_parameters(model)} parameters.")

Our ConvTasNet has 1424013 parameters.


In [54]:
print_mem_info()

Memory allocated: 0.0000 MB
Memory reserved:  0.0000 MB


In [55]:
def train(dataloader, model, loss_fn, optimizer, print_loss=True):
    """
    Train the model using a DataLoader, loss function, and optimizer.

    Args:
        dataloader (DataLoader): DataLoader for training data.
        model (nn.Module): PyTorch model to be trained.
        loss_fn (callable): Loss function to compute error.
        optimizer (Optimizer): Optimizer for updating model weights.
        device (torch.device): Device to run computations on (e.g., 'cpu' or 'cuda').
        print_loss (bool): Whether to print loss during training.

    Returns:
        float: Final loss value after training.
    """
    model.train()  # Set model to training mode
    size = len(dataloader.dataset)
    final_loss = 0.0

    for batch, (X, y) in enumerate(dataloader):
        # Move inputs and labels to the device
        X, y = X.to(device), y.to(device)

        optimizer.zero_grad()  # Clear gradients from previous iteration

        # Forward pass: Compute predictions
        pred = model(X)

        # Compute loss
        loss = loss_fn(pred.squeeze(), y.float())  # Ensure y is float for regression loss if needed
        final_loss = loss.item()  # Save loss for the final return

        # Backward pass: Compute gradients
        loss.backward()
        optimizer.step()  # Update model parameters

        # Print progress every 10 batches
        if batch % 10 == 0:
            current = batch * len(X)
            if print_loss:
                print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

    return final_loss



In [56]:
def test(dataloader, model, loss_fn, pesq_fn):
    num_batches = len(dataloader)
    model.eval()
    test_loss, pesq_err = 0, 0

    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            
            test_loss += loss_fn(pred, y.squeeze()).item()
            pesq_err += pesq_fn(pred, y.squeeze()).item()

    test_loss /= num_batches
    pesq_err /= num_batches
    
    print(f"Test Error: \n Avg loss: {test_loss:>8f} \n Avg pesq: {pesq_err:>8f}")
    
    return test_loss, pesq_err

In [57]:
from torchmetrics.audio import PerceptualEvaluationSpeechQuality

epochs = 20

loss_list_val, loss_list_test, pesq_list_test = [], [], []
test_loss, val_no_impv, halving = 10, 0, False
nb_pesq = PerceptualEvaluationSpeechQuality(8000, 'nb')

for t in range(epochs):
    try:
      print(f"Epoch {t + 1}\n-------------------------------")
      
      loss_test = train(train_dataloader, model, loss_fn, optimizer)
      prev_test_loss = test_loss
      test_loss, pesq_err = test(test_dataloader, model, loss_fn, nb_pesq)

      loss_list_val.append(test_loss)
      loss_list_test.append(loss_test)
      pesq_list_test.append(pesq_err)
      
      if test_loss >= prev_test_loss:
          val_no_impv += 1
          if val_no_impv >= 3:
              halving = True
          if val_no_impv >= 10:
              print("No improvement for 10 epochs, early stopping.")
              break
      else:
          val_no_impv = 0
      if halving:
          optim_state = optimizer.state_dict()
          optim_state['param_groups'][0]['lr'] = optim_state['param_groups'][0]['lr'] / 2.0
          optimizer.load_state_dict(optim_state)
          print(f"Learning rate adjusted to: {optim_state['param_groups'][0]['lr']:.6f}")
          halving = False

    except KeyboardInterrupt:
      print("\nExecution stopped.")
      break
    
print("Done!")

Epoch 1
-------------------------------


RuntimeError: Argument #4: Padding size should be less than the corresponding input dimension, but got: padding (40000, 40000) at dimension 2 of input [1, 1, 10]

In [None]:
plot_performance(loss_list_val, loss_list_test, pesq_list_test)

In [None]:
with torch.no_grad():
  for X, y in test_dataloader:
      n = noise_test.shape[0]
      X, y = (
          get_data(X, n, "test").to(device),
          voice_test[y].to(device)
      )
      pred = model(X)
      break

compare_batch(X, y, pred)