In [None]:
import time
import pickle
import warnings
import gc
import copy
import numpy as np
import torch
import torch.nn as nn
import torchaudio
from tqdm import tqdm, tqdm_notebook
from torch.utils.data import Dataset, DataLoader
from matplotlib import colors, pyplot as plt
from IPython.display import clear_output
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import librosa
import os

In [None]:
# Define your custom dataset class
def signal2pytorch(x):
    X = np.expand_dims(x, axis=0)  #add channels dimension (here only 1 channel)
    if len(x.shape)==1: #mono:
        X = np.expand_dims(X, axis=0)  #add batch dimension (here only 1 batch)
    X=torch.from_numpy(X)
    X=X.type(torch.Tensor)
    X=X.permute(1,0,2)  #make batch dimension first
    return X

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!cp -r /content/drive/MyDrive/DL_Project/Clean /content/clean ./

!cp -r /content/drive/MyDrive/DL_Project/Noisy  /content/noisy ./

cp: cannot stat '/content/clean': No such file or directory
cp: cannot stat '/content/noisy': No such file or directory


In [None]:
!unzip /content/Clean/clean_trainset_wav.zip
!unzip /content/Noisy/noisy_trainset_wav.zip
!unzip /content/Clean/clean_testset_wav.zip
!unzip /content/Noisy/noisy_testset_wav.zip

Archive:  /content/Clean/clean_testset_wav.zip
   creating: clean_testset_wav/
  inflating: clean_testset_wav/p232_001.wav  
  inflating: clean_testset_wav/p232_002.wav  
  inflating: clean_testset_wav/p232_003.wav  
  inflating: clean_testset_wav/p232_005.wav  
  inflating: clean_testset_wav/p232_006.wav  
  inflating: clean_testset_wav/p232_007.wav  
  inflating: clean_testset_wav/p232_009.wav  
  inflating: clean_testset_wav/p232_010.wav  
  inflating: clean_testset_wav/p232_011.wav  
  inflating: clean_testset_wav/p232_012.wav  
  inflating: clean_testset_wav/p232_013.wav  
  inflating: clean_testset_wav/p232_014.wav  
  inflating: clean_testset_wav/p232_015.wav  
  inflating: clean_testset_wav/p232_016.wav  
  inflating: clean_testset_wav/p232_017.wav  
  inflating: clean_testset_wav/p232_019.wav  
  inflating: clean_testset_wav/p232_020.wav  
  inflating: clean_testset_wav/p232_021.wav  
  inflating: clean_testset_wav/p232_022.wav  
  inflating: clean_testset_wav/p232_023.wav  
 

In [None]:
SAMPLE_RATE = 48000
N_FFT = (SAMPLE_RATE * 64) // 1000
HOP_LENGTH = (SAMPLE_RATE * 16) // 1000
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


In [None]:
class SpeechDataset(Dataset):
    def __init__(self, noisy_files, clean_files):
        super().__init__()
        # list of files
        self.noisy_files = sorted(noisy_files)
        self.clean_files = sorted(clean_files)

        self.len_ = len(self.noisy_files)

        # fixed len
        self.max_len = 165000


    def __len__(self):
        return self.len_

    def load_sample(self, file):
        waveform, _ = torchaudio.load(file)
        return waveform

    def __getitem__(self, index):
        # load to tensors and normalization
        clean_audio = self.load_sample(self.clean_files[index])
        noisy_audio = self.load_sample(self.noisy_files[index])
        clean_audio = self._prepare_sample(clean_audio)
        noisy_audio = self._prepare_sample(noisy_audio)
        clean_audio = signal2pytorch(clean_audio).to(device)
        noisy_audio = signal2pytorch(noisy_audio).to(device)
        return noisy_audio, clean_audio

    def _prepare_sample(self, waveform):
        waveform = waveform.numpy()
        current_len = waveform.shape[1]

        output = np.zeros((1, self.max_len), dtype='float32')
        output[0, -current_len:] = waveform[0, :self.max_len]
        output = torch.from_numpy(output)

        return output




In [None]:
TRAIN_INPUT_DIR = Path('/content/noisy_trainset_wav')
TRAIN_TARGET_DIR = Path('/content/clean_trainset_wav')


TEST_INPUT_DIR = Path('/content/noisy_testset_wav')
TEST_TARGET_DIR = Path('/content/clean_testset_wav')

train_input_files = sorted(list(TRAIN_INPUT_DIR.rglob('*.wav')))
train_target_files = sorted(list(TRAIN_TARGET_DIR.rglob('*.wav')))
print(train_input_files)
print(train_target_files)


test_input_files = sorted(list(TEST_INPUT_DIR.rglob('*.wav')))
test_target_files = sorted(list(TEST_TARGET_DIR.rglob('*.wav')))

print("No. of Training files:",len(train_input_files))
# print("No. of Testing files:",len(test_noisy_files))
train_dataset = SpeechDataset(train_input_files, train_target_files)
train_loader = DataLoader(train_dataset, batch_size=30, shuffle=True)

test_dataset = SpeechDataset(test_input_files, test_target_files)
test_loader = DataLoader(test_dataset, batch_size=30, shuffle=True)

[PosixPath('/content/noisy_testset_wav/p232_001.wav'), PosixPath('/content/noisy_testset_wav/p232_002.wav'), PosixPath('/content/noisy_testset_wav/p232_003.wav'), PosixPath('/content/noisy_testset_wav/p232_005.wav'), PosixPath('/content/noisy_testset_wav/p232_006.wav'), PosixPath('/content/noisy_testset_wav/p232_007.wav'), PosixPath('/content/noisy_testset_wav/p232_009.wav'), PosixPath('/content/noisy_testset_wav/p232_010.wav'), PosixPath('/content/noisy_testset_wav/p232_011.wav'), PosixPath('/content/noisy_testset_wav/p232_012.wav'), PosixPath('/content/noisy_testset_wav/p232_013.wav'), PosixPath('/content/noisy_testset_wav/p232_014.wav'), PosixPath('/content/noisy_testset_wav/p232_015.wav'), PosixPath('/content/noisy_testset_wav/p232_016.wav'), PosixPath('/content/noisy_testset_wav/p232_017.wav'), PosixPath('/content/noisy_testset_wav/p232_019.wav'), PosixPath('/content/noisy_testset_wav/p232_020.wav'), PosixPath('/content/noisy_testset_wav/p232_021.wav'), PosixPath('/content/noisy_t

In [None]:
for batch_idx, (noisy_audio, clean_audio) in enumerate(train_loader):
  print("NS",noisy_audio.shape)
  print("CS",clean_audio.shape)
  temp_audio = noisy_audio[0]
  temp_audio=np.array(temp_audio.cpu())
  xrek_noisy=temp_audio[:,0,:]
  print("xrek_noisy ",xrek_noisy)
  display(ipd.Audio(xrek_noisy, rate=48000));
  xrek_clean=clean_audio[0][:,0,:]
  print("xrek_clean ",xrek_clean)
  display(ipd.Audio(xrek_clean.cpu(), rate=48000));
  break

NS torch.Size([10, 1, 1, 165000])
CS torch.Size([10, 1, 1, 165000])
xrek_noisy  [[ 0.          0.          0.         ... -0.09966832 -0.09972361
  -0.1010503 ]]


xrek_clean  tensor([[0.0000, 0.0000, 0.0000,  ..., 0.0018, 0.0022, 0.0020]],
       device='cuda:0')


In [None]:
# Define the basic building blocks: Encoder Block,Bottleneck Block, Decoder Block
class conv_block(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm1d(out_channels)
        self.relu = nn.ReLU()

    def forward(self, inputs):
        x = self.conv1(inputs)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        return x

class EncoderBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
      super().__init__()
      self.conv = conv_block(in_channels, out_channels)
      self.pool = nn.MaxPool1d(2)

    def forward(self, inputs):
        x = self.conv(inputs)
        p = self.pool(x)
        return x, p

class BottleneckBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
      super().__init__()
      self.conv = conv_block(in_channels, out_channels)

    def forward(self, inputs):
        x = self.conv(inputs)
        return x

class DecoderBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
      super().__init__()
      self.up = nn.ConvTranspose1d(in_channels, out_channels, kernel_size=2, stride=2, padding=0)
      self.conv = conv_block(out_channels+out_channels, out_channels)
    def forward(self, inputs, skip):
        x = self.up(inputs)
        # print("X_shape ",x.shape)
        # print("Skip_shape ",skip.shape)
        min_size = min(x[0].size(1), skip[0].size(1))
        # print("min_size",min_size)
        x_truncated = x[:,:, :min_size]
        s_truncated = skip[:,:, :min_size]
        # print("x_truncated_shape ",x_truncated.shape)
        # print("s_truncated_shape ",s_truncated.shape)
        x = torch.cat([x_truncated, s_truncated], axis=1)
        x = self.conv(x)
        return x

class UNet(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(UNet, self).__init__()
        self.in_channels = 1
        self.out_channels = 1

        # Encoder
        self.e1 = EncoderBlock(in_channels, 16)
        self.e2 = EncoderBlock(16, 32)
        self.e3 = EncoderBlock(32, 64)
        self.e4 = EncoderBlock(64, 128)
        self.e5 = EncoderBlock(128, 256)
        self.e6 = EncoderBlock(256, 512)

        # Bottleneck
        self.b = BottleneckBlock(512, 1024)
=
        # Decoder
        self.d1 = DecoderBlock(1024, 512)
        self.d2 = DecoderBlock(512, 256)
        self.d3 = DecoderBlock(256, 128)
        self.d4 = DecoderBlock(128, 64)
        self.d5 = DecoderBlock(64, 32)
        self.d6 = DecoderBlock(32, 16)

        # Output layer
        self.out_conv = nn.Conv1d(16, out_channels, kernel_size=1, padding=0)
        # self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        # self.fc = nn.Linear(512, out_channels)
        # self.sigmoid = nn.Sigmoid()

    def forward(self, inputs):
        # Encoder pass
        s1, p1 = self.e1(inputs)
        s2, p2 = self.e2(p1)
        s3, p3 = self.e3(p2)
        s4, p4 = self.e4(p3)
        s5, p5 = self.e5(p4)
        s6, p6 = self.e6(p5)

        #Bottelneck pass
        b = self.b(p6)

        # Output pass
        d1 = self.d1(b, s6)
        d2 = self.d2(d1, s5)
        d3 = self.d3(d2, s4)
        d4 = self.d4(b, s3)
        d5 = self.d5(d4, s2)
        d6 = self.d6(d5, s1)

        #Output pass

        outputs = self.out_conv(d6)

        return outputs

In [None]:
# Now you can iterate through the dataloader in your training loop
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Generate Model:")
loss_tracker=[]
unet = UNet(in_channels=1, out_channels=1).to("cuda")  # Adjust the input and output channels
loss_fn = nn.L1Loss()
learning_rate = 1e-4
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)#, betas=(0.9, 0.999))

Generate Model:


In [None]:
unet.load_state_dict(torch.load("/content/drive/MyDrive/DL_Project/your_unet_checkpoint.pth"))
unet.train()

UNet(
  (e1): EncoderBlock(
    (conv): conv_block(
      (conv1): Conv1d(1, 16, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn1): BatchNorm1d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv1d(16, 16, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn2): BatchNorm1d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
    )
    (pool): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (e2): EncoderBlock(
    (conv): conv_block(
      (conv1): Conv1d(16, 32, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn1): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv1d(32, 32, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn2): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
    )
    (pool): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)

In [1]:
# Now you can iterate through the dataloader in your training loop
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# print("Generate Model:")
# loss_tracker=[]
# unet = UNet(in_channels=1, out_channels=1).to("cuda")  # Adjust the input and output channels
# print('Total number of parameters: %i' % (sum(p.numel() for p in unet.parameters() if p.requires_grad)))
# print("Def. loss function:")
# loss_fn = nn.MSELoss()  #MSE

# learning_rate = 1e-5
# optimizer = torch.optim.Adam(unet.parameters(), lr=learning_rate)#, betas=(0.9, 0.999))

epochs=10000
for epoch in range(epochs):
    curr_loss=0
    for batch_idx, (noisy_audio, clean_audio) in enumerate(train_loader):
        # Convert data to PyTorch tensors and move them to the device
        noisy_audio = noisy_audio.to(device)
        clean_audio = clean_audio.to(device)

        # print("Noisy audio shape",noisy_audio.shape)
        # print("Clean audio shape",clean_audio.shape)
        # print("noisy_audio  ",noisy_audio[0])
        temp_audio = noisy_audio[0]
        temp_audio=np.array(temp_audio.cpu())
        xrek=temp_audio[:,0,:]
        # print(xrek)
        # display(ipd.Audio(xrek, rate=SAMPLE_RATE))
        # Your training code here
        # Example:
        optimizer.zero_grad()
        Ypred = unet(noisy_audio[0])
        # print("Ypred audio shape",Ypred.shape)
        clean_audio_idx=clean_audio[0]
        # print("Ypred", Ypred)
        outputlen=len(Ypred[0,0,:])
        # print("outputlen",outputlen)
        clean_audio_trunc=clean_audio_idx[:,:,:outputlen]
        # print("Input Ypred.shape=", Ypred.shape )
        # print("Target clean_audio_trunc.shape=", clean_audio_trunc.shape)
        loss = loss_fn(Ypred, clean_audio_trunc)
        loss.backward()
        optimizer.step()
        print(f"Epoch [{epoch + 1}/{epochs}], Batch [{batch_idx + 1}/{len(train_loader)}], Loss: {loss.item():.10f}")
        curr_loss+=loss.item()
    if epoch % 500 == 0:
      torch.save(model.state_dict(), f"/content/drive/MyDrive/DL_Project/model_new_small_checkpoint_{epoch}.pth")
    loss_tracker.append(curr_loss)

# Don't forget to save your trained model after training0
torch.save(model.state_dict(), "your_unet_small_checkpoint.pth")


In [None]:
# torch.save(unet.state_dict(), "/content/drive/MyDrive/DL_Project/your_unet_checkpoint.pth")
unet.load_state_dict(torch.load("/content/drive/MyDrive/DL_Project/unet_checkpoint_800.pth"))
unet.eval()

UNet(
  (e1): EncoderBlock(
    (conv): conv_block(
      (conv1): Conv1d(1, 16, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn1): BatchNorm1d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv1d(16, 16, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn2): BatchNorm1d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
    )
    (pool): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (e2): EncoderBlock(
    (conv): conv_block(
      (conv1): Conv1d(16, 32, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn1): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv1d(32, 32, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn2): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
    )
    (pool): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)

In [None]:
ww = unet.state_dict()   #read obtained weights
noisy_audio_test, ntsamplerate = librosa.load("/content/noisy_testset_wav/p232_138.wav", mono=False, sr=None)
noisy_audio_norm_test = noisy_audio_test/np.abs(noisy_audio_test.max())
noisy_audio_norm_test_q=signal2pytorch(noisy_audio_norm_test).to(device)
predictions=unet(noisy_audio_norm_test_q).cpu() # Make Predictions based on the obtained weights, on training set
predictions=predictions.detach()
predictions=np.array(predictions)
print(predictions)
xrek=predictions[:,0,:]  #remove unnecessary dimension for playback

[[[-0.0223401  -0.03859597 -0.01959269 ... -0.00377782 -0.00471451
   -0.00588413]]]


In [None]:

ww = unet.state_dict()   #read obtained weights
noisy_audio_test, ntsamplerate = librosa.load("/content/Noisy/noisy_p232_005_audio_chunk_0.wav", mono=False, sr=None)
noisy_audio_norm_test = noisy_audio_test/np.abs(noisy_audio_test.max())
noisy_audio_norm_test_q=signal2pytorch(noisy_audio_norm_test).to(device)
predictions=unet(noisy_audio_norm_test_q).cpu() # Make Predictions based on the obtained weights, on training set
predictions=predictions.detach()
predictions=np.array(predictions)
print(predictions)
xrek=predictions[:,0,:]  #remove unnecessary dimension for playback

[[[-0.00255254 -0.0045112  -0.00596929 ...  0.00280965  0.00172099
    0.00089499]]]


In [None]:
import IPython.display as ipd
display(ipd.Audio(xrek, rate=ntsamplerate));