In [None]:
!pip install librosa

You should consider upgrading via the '/opt/conda/bin/python3.7 -m pip install --upgrade pip' command.[0m


# Load the WAV files and convert them to STFT spectrograms

In [None]:
import librosa

# Clean and noisy ("dirty") training recordings.
# sr=None keeps each file's native sampling rate instead of resampling.
s, sr = librosa.load("../input/denoise-data/train_clean_male.wav", sr=None)
sn, sr = librosa.load("../input/denoise-data/train_dirty_male.wav", sr=None)

# Complex STFTs of both recordings, computed with identical parameters.
S = librosa.stft(s, n_fft=1024, hop_length=512)
X = librosa.stft(sn, n_fft=1024, hop_length=512)

In [None]:
# The clean and noisy spectrograms are later paired frame by frame, and that
# pairing silently truncates to the shorter one. Fail loudly here instead.
assert S.shape == X.shape, "clean/noisy STFT shapes differ: {} vs {}".format(S.shape, X.shape)

print("Input Clear voice data shape : ", S.shape)
print("Input Noise voice data shape : ", X.shape)

Input Clear voice data shape :  (513, 2459)
Input Noise voice data shape :  (513, 2459)


In [None]:
import numpy as np
# Magnitudes of the complex STFTs (phase is discarded for training).
S_abs, X_abs = np.abs(S), np.abs(X)

# Transpose the 2-D spectrograms so that axis 0 indexes time frames and each
# row is the spectrum of one frame.
S_in, X_in = S_abs.T, X_abs.T

In [None]:
#Import Libraries
from __future__ import print_function
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.autograd import Variable

# Model with 2D CNNs

### Model data preparation

In [None]:
from collections import deque

# Rolling context of the 20 most recent noisy frames, pre-filled with zero
# frames so that the very first real frame already has a full-size window.
i_dirty = deque((np.zeros(513, dtype=np.float32) for _ in range(20)), maxlen=20)

data_in_clean = []
data_in_dirty = []

for noisy_frame, clean_frame in zip(X_in, S_in):
    # Appending to a full deque drops its oldest frame.
    i_dirty.append(noisy_frame)

    # Input: snapshot of the current 20-frame window; target: the clean frame
    # aligned with the newest noisy frame.
    data_in_dirty.append(np.array(i_dirty))
    data_in_clean.append(clean_frame)

In [None]:
from torch.utils.data import Dataset , DataLoader

class Wav_DataGenerator(Dataset):
    """Dataset that pairs a noisy input window with its clean target.

    Args:
        noise_wav: indexable collection of noisy inputs; every item must
            support ``item[np.newaxis, :, :]`` (i.e. be a 2-D array).
        clean_wav: indexable collection of clean targets, addressed with the
            same indices as ``noise_wav``.
        seed: forwarded to ``torch.manual_seed``, which seeds PyTorch's
            global random number generator.
    """

    def __init__(self, noise_wav, clean_wav, seed):
        super().__init__()
        self.noise_wav, self.clean_wav = noise_wav, clean_wav
        # torch.manual_seed returns the generator object; that is what is kept.
        self.seed = torch.manual_seed(seed)

    def __len__(self):
        """Number of samples, taken from the noisy collection."""
        return len(self.noise_wav)

    def __getitem__(self, index):
        """Return ``(noisy, clean)`` for ``index``; noisy gains a leading axis."""
        noisy = self.noise_wav[index]
        clean = self.clean_wav[index]
        # Prepend a singleton (channel) axis: (rows, cols) -> (1, rows, cols).
        return noisy[np.newaxis, :, :], clean
    


## Define the data loaders

In [None]:
# Wrap the windowed training pairs in a Dataset (seed 1264) and serve them in
# shuffled mini-batches of 32.
train_data = Wav_DataGenerator(noise_wav=data_in_dirty, clean_wav=data_in_clean, seed=1264)
train_dataloader = DataLoader(dataset=train_data, batch_size=32, shuffle=True)

## Define the 2D CNN model

In [None]:
class Net(nn.Module):
    """2-D CNN mapping a window of spectrogram frames to one 513-bin frame.

    Three convolutions (ReLU after each) are followed by a single linear
    layer with 513 outputs and a configurable output activation.

    ``flatten_size`` (64 * 4 * 128 = 32768) corresponds to an input of shape
    ``(batch, 1, 20, 513)``: conv2d_2 halves it to (10, 257) and conv2d_3
    reduces it to (4, 128) with 64 channels.

    Args:
        activation: output activation, ``'relu'`` (default) or
            ``'logistic_sigmoid'``.

    Raises:
        ValueError: if ``activation`` is not one of the supported names.
    """

    def __init__(self , activation='relu'):
        super(Net, self).__init__()
        # NOTE(review): 'logistic_sigmoid' maps to nn.LogSigmoid (log of the
        # sigmoid, outputs <= 0), not nn.Sigmoid -- confirm this is intended.
        activations = {'relu': nn.ReLU, 'logistic_sigmoid': nn.LogSigmoid}
        # Reject unknown names here; otherwise activation_fn would be left
        # undefined and the failure would only surface inside forward().
        if activation not in activations:
            raise ValueError(
                "Unsupported activation {!r}; expected one of {}".format(
                    activation, sorted(activations)))

        self.wav_size = 513
        self.conv2d_1 = nn.Conv2d(in_channels=1 , out_channels=16 , kernel_size=(3,3) , padding=(1,1) )
        self.conv2d_2 = nn.Conv2d(in_channels=16 , out_channels=32 , kernel_size=(3,3), padding=(1,1) , stride=(2,2) )
        self.conv2d_3 = nn.Conv2d(in_channels=32 , out_channels=64 , kernel_size=(3,3), stride=(2,2))
        self.flatten_size = 64*2*128*2
        self.out_layer = nn.Linear(self.flatten_size , 513)
        self.activation_fn = activations[activation]()

    def forward(self, x):
        """Run the network on ``x`` and return a ``(batch, 513)`` tensor."""
        x = F.relu(self.conv2d_1(x))
        x = F.relu(self.conv2d_2(x))
        x = F.relu(self.conv2d_3(x))
        # Flatten per sample, keeping the batch axis. With view(-1, flatten_size)
        # a wrongly sized input could be silently re-chunked into a different
        # batch size; this way it fails in the linear layer instead.
        x = x.flatten(start_dim=1)
        return self.activation_fn(self.out_layer(x))


#model weight initialization function 
def init_weights_normal(m):
    """Initialise an ``nn.Linear`` with N(0, 0.01) weights and a zero bias.

    Meant to be passed to ``module.apply``. Any module whose type is not
    exactly ``nn.Linear`` is left untouched.
    """
    if type(m) is nn.Linear:
        torch.nn.init.normal_(m.weight , mean=0 , std=0.01)
        # Layers built with bias=False have m.bias set to None: nothing to reset.
        if m.bias is not None:
            torch.nn.init.zeros_(m.bias)

def init_weights_xavier(m):
    """Initialise an ``nn.Linear`` with Xavier-normal weights (gain 0.8) and a zero bias.

    Meant to be passed to ``module.apply``. Any module whose type is not
    exactly ``nn.Linear`` is left untouched.
    """
    if type(m) is nn.Linear:
        torch.nn.init.xavier_normal_(m.weight , gain=0.8)
        # Layers built with bias=False have m.bias set to None: nothing to reset.
        if m.bias is not None:
            torch.nn.init.zeros_(m.bias)
def init_weights_kaiman(m):
    """Initialise an ``nn.Linear`` with Kaiming-normal weights and a zero bias.

    Meant to be passed to ``module.apply``. Any module whose type is not
    exactly ``nn.Linear`` is left untouched.
    """
    if type(m) is nn.Linear:
        torch.nn.init.kaiming_normal_(m.weight)
        # Layers built with bias=False have m.bias set to None: nothing to reset.
        if m.bias is not None:
            torch.nn.init.zeros_(m.bias)

## Initialize the model

In [None]:
# Use the first CUDA GPU when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda:0"
else:
    device = "cpu"

# Build the network and apply Xavier initialisation to its linear layer(s).
Denoise_Model = Net()
Denoise_Model.apply(init_weights_xavier)

# Move the parameters to the chosen device. .to() returns the module, so as
# the last expression of the cell it also displays the model structure.
Denoise_Model.to(device)

Net(
  (conv2d_1): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2d_2): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
  (conv2d_3): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2))
  (out_layer): Linear(in_features=32768, out_features=513, bias=True)
  (activation_fn): ReLU()
)

In [None]:
class SNR_loss(nn.Module):
    """Negative signal-to-noise ratio in dB, summed over the batch.

    For each sample (sums taken over dim 1):
    ``-10 * log10((sum(x**2) + eps) / (sum((x - target)**2) + eps))``.

    Args:
        eps: small constant added to both power terms. Without it an all-zero
            prediction (``log10(0)``) or a perfect prediction (division by
            zero) makes the loss and its gradients infinite or NaN.
    """

    def __init__(self , eps=1e-8):
        super(SNR_loss , self).__init__()
        self.eps = eps

    def forward(self , x , target):
        """Return the summed negative SNR of prediction ``x`` against ``target``."""
        # NOTE(review): the signal power is computed from the prediction x,
        # whereas the usual SNR definition uses the reference (target) signal
        # -- confirm this is intended.
        signal_power = torch.sum(torch.square(x), 1)
        noise_power = torch.sum(torch.square(x - target), 1)
        snr_db = 10 * torch.log10((signal_power + self.eps) / (noise_power + self.eps))

        # Negated so that minimising the loss maximises the SNR.
        return torch.sum(-snr_db, 0)
    
# Training objective: the SNR-based loss defined in this cell.
criterion = SNR_loss()

# Adam over all model parameters with a learning rate of 1e-3.
optimizer = optim.Adam(params=Denoise_Model.parameters(), lr=1e-3)

## Model Training

In [None]:
# Train the model
epoch = 350
# One entry per epoch: the sum of the batch losses of that epoch.
model_train_loss = []

Denoise_Model.train()
for i_epoch in range(epoch):
    epoch_loss = 0
    for batch_idx, (data, target) in enumerate(train_dataloader):
        # Tensors track gradients themselves; the deprecated Variable wrapper
        # is not needed.
        data, target = data.to(device) , target.to(device)

        # Clear the gradients left over from the previous step.
        optimizer.zero_grad()
        output = Denoise_Model(data)
        # SNR-based loss between the predicted frame and the clean target frame.
        loss = criterion(output, target)
        # Back-propagate and take one optimiser step.
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        epoch_loss += batch_loss

        # Print the loss periodically.
        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                i_epoch, batch_idx * len(data), len(train_dataloader.dataset),
                100. * batch_idx / len(train_dataloader), batch_loss))

    # Keep the per-epoch loss so the training curve can be inspected afterwards.
    model_train_loss.append(epoch_loss)







In [None]:
# Noisy test recording -> complex STFT (n_fft=1024, hop_length=512).
# NOTE: this rebinds the notebook-level names X and sr.
tn, sr = librosa.load("../input/denoise-data/test_x_01.wav", sr=None)
X = librosa.stft(tn, n_fft=1024, hop_length=512)

# Magnitudes, transposed so that each row is one time frame.
T_abs = np.abs(X)
T_in = T_abs.T

# Rolling context of the 20 most recent frames, pre-filled with zero frames so
# the first real frame already has a full-size window.
i_test = deque((np.zeros(513, dtype=np.float32) for _ in range(20)), maxlen=20)

data_in_test = []
for test_frame in T_in:
    # Appending to a full deque drops its oldest frame.
    i_test.append(test_frame)
    data_in_test.append(np.array(i_test))

In [None]:
# Insert the channel axis: (frames, rows, cols) -> (frames, 1, rows, cols).
# reshape keeps this cell idempotent: re-running it yields the same shape
# instead of stacking one more singleton axis each time.
data_in_test = np.asarray(data_in_test)
data_in_test = data_in_test.reshape((-1, 1) + data_in_test.shape[-2:])

In [None]:
T_in_tensor = torch.tensor(data_in_test , dtype=torch.float32)

# Inference only: eval mode plus no_grad, so that no autograd graph is built
# and kept for the whole test set, which is fed as a single batch.
Denoise_Model.eval()
with torch.no_grad():
    T_out_tensor = Denoise_Model(T_in_tensor.to(device))

# Back to NumPy, with the two axes swapped: (frames, bins) -> (bins, frames).
T_out = T_out_tensor.detach().to("cpu").numpy()
T_out = np.swapaxes(T_out , 0 , 1)

In [None]:
# Obtain the phase of the noisy test STFT as unit-magnitude phasors.
# np.angle is used instead of X / |X| because the division produces NaN
# (0 / 0) in every bin whose magnitude is exactly zero.
T_phase = np.exp(1j * np.angle(X))

# Hadamard product: predicted magnitudes combined with the noisy phase.
S_hat = np.multiply(T_phase,T_out)

# Create the output sound file from the reconstructed test STFT.
import soundfile as sf

iStftMat = librosa.istft(S_hat, hop_length=512)

sf.write("testOut_2d.wav", iStftMat , sr)

## Play audio

In [None]:
# Import the submodule explicitly: importing a package does not by itself
# guarantee that its submodules (here IPython.display) have been loaded.
import IPython.display

# Last expression of the cell, so the notebook renders an audio player.
IPython.display.Audio("testOut_2d.wav")