In [3]:
import numpy as np
import pandas as pd
import os
import random
import librosa as libr
import librosa.display as disp
from IPython.display import Audio

In [2]:
import torch
from torch import nn
from torch.utils.data import Dataset
from torchvision.transforms import ToTensor
import torchaudio
import torch.nn.functional as F
from torch.utils.data import SubsetRandomSampler,Subset,DataLoader

In [4]:
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

In [5]:
# Dataset root plus one directory per split (relative to the notebook's working directory).
data_path = "./../data/28/"
# Full noisy training directory, disabled here in favour of a "sample" sub-folder:
#train_noisy_data_path = "./../data/28/noisy_trainset_28spk_wav/"
train_noisy_data_path = "./../data/28/sample/11/"
train_clean_data_path = "./../data/28/clean_trainset_28spk_wav/"
# Full noisy test directory, likewise disabled in favour of a "sample" sub-folder:
#test_noisy_data_path = "./../data/28/noisy_testset_wav/"
test_noisy_data_path = "./../data/28/sample/21/"
test_clean_data_path = "./../data/28/clean_testset_wav/"

In [None]:
def load_data(directory_path):
    """Index the regular files located directly inside ``directory_path``.

    Sub-directories are skipped and the listing is not recursive. File names
    are sorted so the row order is the same on every machine and every run
    (``os.listdir`` itself returns entries in arbitrary order).

    Args:
        directory_path: Directory to scan.

    Returns:
        pandas.DataFrame with a single ``'Filepath'`` column holding
        ``os.path.join(directory_path, file_name)`` for each file found.

    Raises:
        OSError: Propagated from ``os.listdir`` when the directory cannot be read.
    """
    file_names = sorted(
        item for item in os.listdir(directory_path)
        if os.path.isfile(os.path.join(directory_path, item))
    )
    file_paths = [os.path.join(directory_path, file_name) for file_name in file_names]
    return pd.DataFrame({'Filepath': file_paths})

In [None]:
# (rows, columns) of the file index for the noisy training directory; rows = number of files found.
load_data(train_noisy_data_path).shape

In [None]:
# (rows, columns) of the file index for the clean training directory.
load_data(train_clean_data_path).shape

In [None]:
# (rows, columns) of the file index for the noisy test directory.
load_data(test_noisy_data_path).shape

In [None]:
# (rows, columns) of the file index for the clean test directory.
load_data(test_clean_data_path).shape

In [None]:
def build_mfcc_spectograms(audio_file_path,n_coeffs = 15,n_fft = 2048,hop_len = 512):
    """Return the duration, in seconds, of the audio file at ``audio_file_path``.

    Despite its name, this helper only ever returned the clip duration: the
    MFCC matrix that used to be computed alongside was discarded, so that
    per-file computation has been removed.

    Args:
        audio_file_path: Path of the audio file to measure.
        n_coeffs: Unused; kept so existing calls keep working.
        n_fft: Unused; kept so existing calls keep working.
        hop_len: Unused; kept so existing calls keep working.

    Returns:
        Duration reported by ``librosa.get_duration`` for the loaded signal.
    """
    # librosa.load is called without `sr`, so its default target rate applies.
    signal, sample_rate = libr.load(audio_file_path) #sr=22050
    return libr.get_duration(y=signal, sr=sample_rate)

In [None]:
# Run the per-file helper over every clean test file. The helper returns a
# duration, so `a` is a Series with one duration value per file.
a = load_data(test_clean_data_path)['Filepath'].apply(build_mfcc_spectograms)
a

In [None]:
# Longest clip among the scanned files; useful for choosing a fixed clip length.
a.max() #Max-duration

In [None]:
# Point every split at its full directory. This overrides the "sample"
# sub-folders assigned to the noisy paths in the earlier path cell.
train_noisy_data_path = "./../data/28/noisy_trainset_28spk_wav/"
train_clean_data_path = "./../data/28/clean_trainset_28spk_wav/"
test_noisy_data_path = "./../data/28/noisy_testset_wav/"
test_clean_data_path = "./../data/28/clean_testset_wav/"

In [None]:
# Central settings for clip sizing, mel-spectrogram extraction and training.
config = {
    'sample_rate':48000,    # passed to AudioDataset and MelSpectrogram; presumably Hz
    'max_duration':10,      # AudioDataset pads/cuts clips to sample_rate*max_duration samples
    'n_fft':1024,           # forwarded to MelSpectrogram(n_fft=...)
    'hop_length':512,       # forwarded to MelSpectrogram(hop_length=...)
    'n_mels':64,            # forwarded to MelSpectrogram(n_mels=...)
    # NOTE(review): the three entries below are not read by the cells visible in
    # this notebook, which assign their own batch_size / learning_rate / epochs
    # variables -- confirm which values are intended.
    'batch_size':128,
    'learning_rate':1e-6,
    'epochs':10
}

In [None]:
class AudioDataset(Dataset):
    """Paired noisy/clean audio clips, each fitted to a fixed number of samples.

    File names are read from ``noisy_path`` only; the clean counterpart of a
    noisy file is expected under the same file name in ``clean_path``. The
    clean directory is never listed, so a missing clean file only surfaces
    when that item is loaded.

    Args:
        noisy_path: Directory containing the noisy recordings.
        clean_path: Directory containing the clean recordings.
        transform: Optional callable applied to each fitted 1-D waveform
            tensor (the same callable is used for noisy and clean clips).
        sample_rate: Rate used to size the clips and to label the time axis
            in ``plot_waveform``. The rate reported by each file is not checked.
        max_duration: Clip length; ``sample_rate * max_duration`` samples are kept.

    If ``sample_rate`` or ``max_duration`` is ``None`` the clips are returned
    at their native length (no padding, no truncation).
    """

    def __init__(self,noisy_path,clean_path, transform=None, sample_rate=None,max_duration=None):
        # Sorted so that an index maps to the same file pair on every run;
        # os.listdir gives no ordering guarantee.
        noisy_files = sorted(
            item for item in os.listdir(noisy_path)
            if os.path.isfile(os.path.join(noisy_path, item))
        )

        self.noisy_data = [os.path.join(noisy_path, file_name) for file_name in noisy_files]
        self.clean_data = [os.path.join(clean_path, file_name) for file_name in noisy_files]
        self.transform = transform
        self.sample_rate = sample_rate
        self.max_duration = max_duration
        if sample_rate is None or max_duration is None:
            self.num_samples = None
        else:
            # int() keeps the value usable as a slice bound for fractional durations.
            self.num_samples = int(sample_rate * max_duration)

    def __len__(self):
        """Number of noisy/clean pairs."""
        return len(self.noisy_data)

    def _load_fixed_length(self, file_path):
        """Load one file, flatten it to 1-D, fit it to ``num_samples``, apply the transform."""
        waveform, _ = torchaudio.load(file_path)
        # Channels are laid end to end, exactly like the former numpy reshape(-1).
        waveform = waveform.reshape(-1)
        if self.num_samples is not None:
            num_missing_samples = self.num_samples - waveform.shape[0]
            if num_missing_samples > 0:
                # Zero-pad on the right up to the target length.
                waveform = F.pad(waveform, (0, num_missing_samples))
            else:
                waveform = waveform[:self.num_samples]
        if self.transform is not None:
            waveform = self.transform(waveform)
        return waveform

    def __getitem__(self, idx):
        """Return ``(noisy, clean)`` for pair ``idx``; both are always torch tensors."""
        noisy_waveform = self._load_fixed_length(self.noisy_data[idx])
        clean_waveform = self._load_fixed_length(self.clean_data[idx])
        return noisy_waveform, clean_waveform


    def plot_waveform(self,noisy_waveform,clean_waveform):
        """Overlay a noisy and a clean signal on one time-domain plot.

        Accepts tensors or arrays; both inputs are flattened before plotting.
        """
        noisy_waveform = torch.as_tensor(noisy_waveform).detach().cpu().numpy().reshape(-1)
        clean_waveform = torch.as_tensor(clean_waveform).detach().cpu().numpy().reshape(-1)
        plt.figure(figsize=(8,5))
        plt.xlabel("Time")
        plt.ylabel("Amplitude")
        plt.title("Noisy vs Filtered Signal - Time Domain")
        disp.waveshow(noisy_waveform,sr=self.sample_rate,label="Noisy")
        disp.waveshow(clean_waveform,sr=self.sample_rate,label="Clean")
        plt.legend()
        plt.show()
        

In [None]:
# Waveform-level dataset: no transform is passed, so items are the
# padded/truncated signals themselves rather than spectrograms.
dataset = AudioDataset(noisy_path=train_noisy_data_path,
                       clean_path=train_clean_data_path,
                       sample_rate=config['sample_rate'],
                       max_duration=config['max_duration'])
dataset

In [None]:
# Number of noisy files found in the training directory (one pair per noisy file).
len(dataset)

In [None]:
# Fetch one (noisy, clean) pair and overlay both signals in the time domain.
idx = 500
x,y = dataset[idx]
dataset.plot_waveform(x,y)

In [None]:
# Both items should have the same length after padding/truncation.
x.shape,y.shape

In [None]:
# First element of the noisy item fetched above.
x[0]

In [None]:
# Scratch check on a single file: `c` becomes the flattened waveform and `x` the
# second value returned by torchaudio.load (the sample rate).
# NOTE(review): this rebinds `x`, which previously held the noisy dataset item.
c,x = torchaudio.load('./../data/28/noisy_trainset_28spk_wav/p250_342.wav')   
c = c.numpy().reshape(-1)
c.shape,x

In [None]:
# Keep at most the first 480000 samples (equal to config sample_rate * max_duration).
# NOTE(review): the guard `> 1` is true for any signal longer than one sample, so
# it does not test whether truncation is actually needed; the slice is harmless
# for shorter signals.
if c.shape[0] > 1:
    c = c[:480000]
c.shape

In [None]:
def pad_or_truncate(noisy_waveform, num_samples, transform=None):
    """Fit a ``(channels, time)`` waveform tensor to exactly ``num_samples`` frames.

    This cell previously held a fragment that referenced ``self`` outside of any
    class and had inconsistent indentation, so it could not run. The same logic
    is kept here as a standalone helper.

    Args:
        noisy_waveform: 2-D tensor whose second dimension is time.
        num_samples: Target length of the time dimension.
        transform: Optional callable applied to the fitted waveform.

    Returns:
        The waveform cut to ``num_samples`` frames, or zero-padded on the right
        up to that length, after ``transform`` if one is given.
    """
    if noisy_waveform.shape[1] > num_samples:
        noisy_waveform = noisy_waveform[:, :num_samples]
    elif noisy_waveform.shape[1] < num_samples:
        num_missing_samples = num_samples - noisy_waveform.shape[1]
        # (0, n) pads only the end of the last (time) dimension.
        noisy_waveform = F.pad(noisy_waveform, (0, num_missing_samples))
    if transform is not None:
        noisy_waveform = transform(noisy_waveform)
    return noisy_waveform

In [None]:
# Transform that turns a waveform into a mel spectrogram; every parameter
# comes from `config` so dataset sizing and feature extraction stay in sync.
mel_spectrogram = torchaudio.transforms.MelSpectrogram(sample_rate=config['sample_rate'],
                                                      n_fft=config['n_fft'], 
                                                      hop_length=config['hop_length'], 
                                                      n_mels=config['n_mels'])

In [None]:
# Spectrogram-level datasets: same sizing as the waveform dataset, but each
# noisy and clean clip is passed through the mel-spectrogram transform.
train_dataset = AudioDataset(noisy_path=train_noisy_data_path,
                       clean_path=train_clean_data_path,
                       sample_rate=config['sample_rate'],
                       max_duration=config['max_duration'],
                       transform=mel_spectrogram)
test_dataset = AudioDataset(noisy_path=test_noisy_data_path,
                       clean_path=test_clean_data_path,
                       sample_rate=config['sample_rate'],
                       max_duration=config['max_duration'],
                       transform=mel_spectrogram)

In [None]:
# Pair counts for the training and test datasets.
len(train_dataset),len(test_dataset)

In [None]:
# Shapes of the noisy and clean spectrograms of one training pair.
# Each `train_dataset[idx]` access loads and transforms both files again.
idx = 1000
train_dataset[idx][0].shape,train_dataset[idx][1].shape

In [None]:
# Shapes of the noisy and clean spectrograms of one test pair.
idx = 100
test_dataset[idx][0].shape,test_dataset[idx][1].shape

In [None]:
def split_dataset(dataset, perc=0.9, seed=None):
    """Randomly split ``dataset`` into two disjoint ``Subset`` objects.

    Args:
        dataset: Any object supporting ``len()`` and integer indexing.
        perc: Fraction of the items placed in the first subset; the split
            position is ``int(len(dataset) * perc)``.
        seed: Optional seed for a private random generator, making the split
            reproducible. With ``None`` (the default) the global ``random``
            state is used, as before.

    Returns:
        ``(train_dataset, val_dataset)``: the first ``perc`` share of the
        shuffled indices and the remainder, each wrapped in a ``Subset``.
    """
    dataset_indices = list(range(len(dataset)))
    if seed is None:
        random.shuffle(dataset_indices)
    else:
        # A dedicated generator leaves the global random state untouched.
        random.Random(seed).shuffle(dataset_indices)
    split_at = int(len(dataset_indices) * perc)
    train_dataset = Subset(dataset, dataset_indices[:split_at])
    val_dataset = Subset(dataset, dataset_indices[split_at:])
    return train_dataset,val_dataset

In [None]:
def print_all_grad_fns(grad_fn):
    """Print every node of the autograd graph reachable from ``grad_fn``.

    The graph is walked depth-first through ``next_functions`` and each node is
    printed exactly once, including nodes shared by several branches. Leaf
    nodes with an empty ``next_functions`` tuple and ``None`` entries (inputs
    that do not require gradients) are handled without error.

    Args:
        grad_fn: Root node, typically ``tensor.grad_fn``. ``None`` prints nothing.
    """
    seen = set()
    stack = [grad_fn]
    while stack:
        node = stack.pop()
        if node is None or node in seen:
            continue
        seen.add(node)
        print(node)
        # Each entry is a (function, input_index) pair; push in reverse so the
        # first input is visited first.
        parents = getattr(node, 'next_functions', ())
        stack.extend(parent for parent, _ in reversed(parents))

In [None]:
# Hold out 10% of the training pairs for validation.
# NOTE(review): `train_dataset` is rebound to the 90% subset, so re-running this
# cell splits the already-reduced subset again; re-run the dataset cell first.
train_dataset, val_dataset = split_dataset(train_dataset,0.9)

In [None]:
# Sizes of the train / validation / test splits.
len(train_dataset),len(val_dataset),len(test_dataset)

In [None]:
# Shapes of the noisy and clean spectrograms of one validation pair.
idx = 100
val_dataset[idx][0].shape,val_dataset[idx][1].shape

In [None]:
# Batch size used by the DataLoaders below.
# NOTE(review): config['batch_size'] is 128 and is not used here -- confirm which is intended.
batch_size = 64

In [None]:
# Reshuffle the training pairs at the start of every epoch so successive epochs
# do not see identical batches; validation and test order stays fixed so their
# reported losses are comparable from run to run.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Inspect the first test batch only (the loop exits after one iteration).
# NOTE(review): the printed label says [N, C, H, W]; with the mel transform applied
# to 1-D waveforms a batch is presumably [N, n_mels, frames] -- confirm.
for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

In [None]:
# Inspect the first validation batch only (the loop exits after one iteration).
for X, y in val_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

In [None]:
# Pick the compute backend in priority order: CUDA first, then Apple MPS,
# falling back to the CPU when neither accelerator is available.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

In [None]:
class NeuralNetwork(nn.Module):
    """Fully connected network: flatten, two 512-unit ReLU layers, linear output.

    Args:
        input_shape: Number of features per sample after flattening.
        output_shape: Number of values produced per sample.
    """

    def __init__(self, input_shape, output_shape):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(input_shape,512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512,output_shape)
        )

    def forward(self, x):
        """Flatten every dimension after the batch one and run the stack.

        Returns a tensor of shape ``(batch, output_shape)``.
        """
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

In [None]:
# Size the network from the feature-extraction settings instead of a literal:
# a clip of sample_rate * max_duration samples, framed with the configured hop
# (MelSpectrogram's default centered framing), gives samples // hop + 1 frames
# of n_mels values each. With the values in `config` this is 64 * 938 = 60032.
n_frames = int(config['sample_rate'] * config['max_duration']) // config['hop_length'] + 1
n_features = config['n_mels'] * n_frames
model = NeuralNetwork(input_shape=n_features,output_shape=n_features).to(device)
print(model)

In [None]:
# Optimizer step size and epoch count used by the cells below.
# NOTE(review): config holds different values ('learning_rate': 1e-6, 'epochs': 10)
# that are not read here, and `epochs` is reassigned in the training cell -- confirm.
learning_rate = 1e-3
epochs = 5

In [None]:
# Mean squared error between predicted and clean features, optimized with Adam
# over all parameters of `model`.
loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
def train_loop(dataloader, model, loss_fn, optimizer):
    """Run one optimization pass over ``dataloader``.

    Every batch is moved to the device that holds the model's parameters. When
    the target has the same number of elements as the prediction but a
    different shape (the model flattens its input, the target does not), the
    target is reshaped to match so the loss compares element to element.

    Args:
        dataloader: Iterable of ``(X, y)`` batches exposing ``.dataset``.
        model: Module to train; switched to training mode.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: Optimizer bound to the model's parameters.

    Prints the loss and the running sample count every 100 batches.
    """
    size = len(dataloader.dataset)
    first_param = next(model.parameters(), None)
    # Set the model to training mode - important for batch normalization and dropout layers
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        if first_param is not None:
            X, y = X.to(first_param.device), y.to(first_param.device)

        # Compute prediction and loss
        pred = model(X)
        if y.shape != pred.shape and y.numel() == pred.numel():
            y = y.reshape(pred.shape)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
    


In [None]:
def test_loop(dataloader, model, loss_fn):
    # Set the model to evaluation mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    # Evaluating the model with torch.no_grad() ensures that no gradients are computed during test mode
    # also serves to reduce unnecessary gradient computations and memory usage for tensors with requires_grad=True
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")


In [None]:
epochs = 10
# The model is moved to the device once, before training starts. DataLoader
# objects have no .to() method, so they are passed to the loops unchanged;
# tensors have to be moved to the device where the batches are consumed.
model.to(device)
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, model, loss_fn, optimizer)
    test_loop(test_dataloader, model, loss_fn)
print("Done!")

In [None]:
# Scratch cell for moving objects to the selected device.
# NOTE(review): `data` is not assigned anywhere in the visible notebook cells
# (it only exists as a local inside load_data), so the second line is expected
# to raise NameError on a fresh kernel -- confirm or remove.
model.to(device)
data = data.to(device)