In [None]:
import os

import IPython.display as ipd
import librosa
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.utils.data
from scipy.io import wavfile
from torch.autograd import Variable

In [None]:
# Code used to run on Google Colab: mount Google Drive and switch to the notebook folder (uncomment to use)

# from google.colab import drive
# drive.mount('/content/drive/')

# os.chdir('/content/drive/My Drive/Colab Notebooks')

In [None]:
# Folder names double as file-name prefixes, e.g. 'tr/trx0000.wav' (see get_files).
training_folder = 'tr'
validation_folder = 'v'
test_folder = 'te'
file_extension = '.wav'
# Paths for the various cached files
train_dirty_file_path = 'numpy_files/file_names/train/train_dirty_files.npy'
train_speech_file_path = 'numpy_files/file_names/train/train_speech_files.npy'
train_noise_file_path = 'numpy_files/file_names/train/train_noise_files.npy'

train_librosa_n_list_path = 'numpy_files/librosa_data/train/train_n_list.npy'
train_librosa_s_list_path = 'numpy_files/librosa_data/train/train_s_list.npy'
train_librosa_x_list_path = 'numpy_files/librosa_data/train/train_x_list.npy'

train_librosa_complex_list_path = 'numpy_files/librosa_data/train/train_complex_list.npy'

test_dirty_file_path = 'numpy_files/file_names/test/test_dirty_files.npy'
test_librosa_x_list_path = 'numpy_files/librosa_data/test/test_x_list.npy'
test_librosa_complex_list_path = 'numpy_files/librosa_data/test/test_complex_list.npy'

ibm_path = 'numpy_files/ibm.npy'
# Number of STFT frequency rows: n_fft=1024 gives 1024 // 2 + 1 = 513.
dimension = 513
# Number of training files to generate/process (despite the name, this sizes the training set).
file_count_to_denoise = 1200
# If True, load the cached data from the '.npy' paths given above instead of
# rebuilding it from the audio files.
load_existing_files = True
# Number of test files to process
test_files_count = 400 
epochs = 200
# Sample rate used when writing the denoised WAV files. Audio is loaded with
# sr=None (native rate), so this is assumed to match the recordings -- TODO confirm.
sample_rate = 16000
# If True, save the generated data to the paths given above, overwriting any existing files.
save_files = False

# Number of STFT frames every training spectrogram is zero-padded to.
padding_length = 200

In [None]:
def get_file_number(number):
    """Return ``number`` zero-padded on the left to four characters, plus the extension.

    ``number`` is a string (callers pass ``str(i)``). Strings that are already
    four or more characters long are left unpadded. The suffix appended is the
    module-level ``file_extension``.
    """
    missing_zeros = max(4 - len(number), 0)
    return "0" * missing_zeros + number + file_extension

In [None]:
def get_files(folder, test=False):
    """Build the dirty, speech and noise file paths for ``folder``.

    Every path has the form ``<folder>/<folder><tag><suffix>`` where the tag is
    ``x`` for dirty files, ``s`` for speech files and ``n`` for noise files,
    and the suffix comes from ``get_file_number``. ``file_count_to_denoise``
    paths are produced per kind, numbered from 0.

    ``test`` is accepted but not used.

    Returns:
        Tuple ``(dirty_files, speech_files, noise_files)`` of NumPy arrays.
    """
    prefix = folder + '/' + folder
    suffixes = [get_file_number(str(index)) for index in range(file_count_to_denoise)]

    def paths_for(tag):
        return np.array([prefix + tag + suffix for suffix in suffixes])

    return paths_for('x'), paths_for('s'), paths_for('n')
 

In [None]:
def get_files_test(folder):
    """Return a NumPy array with the ``test_files_count`` dirty-file paths of ``folder``.

    Paths have the form ``<folder>/<folder>x<suffix>`` with the suffix produced
    by ``get_file_number`` for indices 0 .. test_files_count - 1.
    """
    return np.array([
        folder + '/' + folder + 'x' + get_file_number(str(index))
        for index in range(test_files_count)
    ])

In [None]:
# Obtain the training file-name arrays: either from the cached copies on disk
# or by generating them from the folder naming scheme.
if load_existing_files:
    print('loading existing files')
    train_dirty_files, train_speech_files, train_noise_files = [
        torch.load(cached_path)
        for cached_path in (train_dirty_file_path, train_speech_file_path, train_noise_file_path)
    ]
else:
    train_dirty_files, train_speech_files, train_noise_files = get_files(training_folder)

# Keep the first file_count_to_denoise entries of each array as a column
# (one path per row), which is the layout get_processed_lists indexes into.
column_shape = (file_count_to_denoise, 1)
train_dirty_files = train_dirty_files[:file_count_to_denoise].reshape(column_shape)
train_speech_files = train_speech_files[:file_count_to_denoise].reshape(column_shape)
train_noise_files = train_noise_files[:file_count_to_denoise].reshape(column_shape)

if save_files:
    for names, destination in (
        (train_dirty_files, train_dirty_file_path),
        (train_speech_files, train_speech_file_path),
        (train_noise_files, train_noise_file_path),
    ):
        torch.save(names, destination)

In [None]:
# Sanity check: after the reshape above both arrays should be (file_count_to_denoise, 1).
print(train_dirty_files.shape)
print(train_speech_files.shape)

In [None]:
def _load_audio_stft(path):
    """Load ``path`` at its native sample rate and return ``(samples, stft)``.

    The STFT uses ``n_fft=1024`` and ``hop_length=512``.
    """
    samples, _ = librosa.load(path, sr=None)
    return samples, librosa.stft(samples, n_fft=1024, hop_length=512)


def _store_padded_magnitude(target, index, spectrum):
    """Write ``|spectrum|`` into ``target[index]``, leaving unused trailing frames zero."""
    magnitude = np.abs(spectrum)
    rows, frames = magnitude.shape
    if rows != dimension or frames > padding_length:
        raise ValueError(
            "spectrogram of shape {} does not fit into ({}, {})".format(
                magnitude.shape, dimension, padding_length))
    target[index, :, :frames] = magnitude


def get_processed_lists(dirty_files, speech_files, noise_files, test=False):
    """Compute zero-padded magnitude spectrograms for the training files.

    The first ``file_count_to_denoise`` rows of each argument are read; every
    row is expected to hold the file path in column 0. Each file is loaded,
    transformed with an STFT and its magnitude is stored right-padded with
    zeros to ``padding_length`` frames.

    The result arrays are allocated once and filled in place instead of being
    grown with ``np.append`` on every file, which copied all previous data at
    each step. They are float64 with shape
    ``(file_count_to_denoise, dimension, padding_length)``; with zero files the
    leading axis is 0 rather than a single all-zero placeholder sample.

    Args:
        dirty_files: Paths of the noisy recordings.
        speech_files: Paths of the clean-speech recordings.
        noise_files: Paths of the noise recordings.
        test: Accepted but not used.

    Returns:
        Tuple ``(s_list, n_list, x_list, x_complex)``: the speech, noise and
        dirty magnitude arrays, and a list with the unpadded complex STFT of
        every dirty file.

    Raises:
        ValueError: If a spectrogram does not have ``dimension`` rows or has
            more than ``padding_length`` frames.
    """
    shape = (file_count_to_denoise, dimension, padding_length)
    x_list = np.zeros(shape=shape)
    s_list = np.zeros(shape=shape)
    n_list = np.zeros(shape=shape)
    x_complex = []
    for i in range(file_count_to_denoise):
        print("Running for tile", str(i))
        x, X = _load_audio_stft(dirty_files[i][0])
        _, S = _load_audio_stft(speech_files[i][0])
        _, N = _load_audio_stft(noise_files[i][0])

        print(x.shape)
        _store_padded_magnitude(x_list, i, X)
        _store_padded_magnitude(s_list, i, S)
        _store_padded_magnitude(n_list, i, N)

        # The complex spectrum is kept unpadded, exactly as computed.
        x_complex.append(X)
    return s_list, n_list, x_list, x_complex

In [None]:
def get_processed_list_test(dirty_files):
    """Compute the STFT of the first ``test_files_count`` dirty test files.

    Each row of ``dirty_files`` is expected to hold the file path in column 0.
    Files are loaded at their native sample rate and transformed with
    ``n_fft=1024`` and ``hop_length=512``; no padding is applied.

    Returns:
        Tuple ``(x_list, x_complex)`` of lists: the magnitude spectrograms and
        the corresponding complex spectrograms, in file order.
    """
    x_complex = []
    for index in range(test_files_count):
        print("Running for tile", str(index))
        samples, _ = librosa.load(dirty_files[index][0], sr=None)
        x_complex.append(librosa.stft(samples, n_fft=1024, hop_length=512))

    x_list = [np.abs(spectrum) for spectrum in x_complex]
    return x_list, x_complex

In [None]:
# Training spectrograms: reuse the cached copies when requested, otherwise
# compute them from the audio files.
if load_existing_files:
    train_s_list, train_n_list, train_x_list, train_x_complex = [
        torch.load(cached_path)
        for cached_path in (
            train_librosa_s_list_path,
            train_librosa_n_list_path,
            train_librosa_x_list_path,
            train_librosa_complex_list_path,
        )
    ]
else:
    train_s_list, train_n_list, train_x_list, train_x_complex = get_processed_lists(
        train_dirty_files, train_speech_files, train_noise_files
    )

In [None]:
# Optionally cache the training spectrograms, overwriting earlier copies.
if save_files:
    for payload, destination in (
        (train_s_list, train_librosa_s_list_path),
        (train_n_list, train_librosa_n_list_path),
        (train_x_list, train_librosa_x_list_path),
        (train_x_complex, train_librosa_complex_list_path),
    ):
        torch.save(payload, destination)

In [None]:
# Training target: 1 wherever the speech magnitude exceeds the noise magnitude,
# 0 elsewhere (presumably the "ideal binary mask" the name abbreviates).
ibm = 1 * ( train_s_list > train_n_list)

In [None]:
# Inspect the first entry of the speech magnitude data.
train_s_list[0]

In [None]:
# Shape of the first mask; it is an element-wise comparison, so it matches train_s_list[0].
ibm[0].shape

In [None]:
# The mask is built as 1 * (boolean comparison), so its values can only be 0 or 1.
print(np.max(ibm))
print(np.min(ibm))

In [None]:
# Batch size shared by the loaders, LSTM.forward() and the reshape calls in the
# training and inference cells.
BATCH = 1
# Despite the names, both loaders feed training: train_loader yields the dirty
# magnitude spectrograms (network input) and test_loader yields the matching
# binary masks (training target). Neither loader shuffles, so iterating them in
# lockstep keeps input/target pairs aligned.
train_loader = torch.utils.data.DataLoader(train_x_list, batch_size=BATCH)
test_loader = torch.utils.data.DataLoader(ibm, batch_size=BATCH)
# test_loader = torch.utils.data.DataLoader(train_s_list, batch_size=BATCH)

In [None]:
#Ref : https://nipunbatra.github.io/blog/2018/denoising.html
#Ref: https://pytorch.org/tutorials/beginner/nlp/sequence_models_tutorial.html
class LSTM(nn.Module):
    """Single-layer LSTM followed by a linear layer with a sigmoid activation.

    Args:
        input_size: Number of features per sequence step.
        hidden_size: Size of the LSTM hidden state.
        output_size: Number of values produced per step (defaults to 513, the
            value that used to be hard-coded).
    """

    def __init__(self, input_size, hidden_size, output_size=513):
        super(LSTM, self).__init__()
        self.rnn = nn.LSTM(input_size=input_size, hidden_size=hidden_size)
        self.linear = nn.Linear(hidden_size, output_size)
        self.act = torch.sigmoid

    def forward(self, x):
        """Map ``x`` of shape (seq_len, batch, input_size) to values in (0, 1).

        Returns a tensor of shape (seq_len, batch, output_size). The batch size
        is taken from the input itself rather than from a module-level
        constant, so any batch size works.
        """
        # Passing None lets the LSTM start from a zero hidden/cell state.
        pred, _ = self.rnn(x, None)
        out = self.act(self.linear(pred))
        # Keep the explicit 3-D layout of the original implementation; for a
        # (seq_len, batch, output_size) tensor this view is a no-op.
        return out.view(out.shape[0], -1, out.shape[-1])
# Use the GPU when one is available and fall back to the CPU otherwise, so
# creating the model no longer fails on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LSTM(dimension, 256).to(device)
print(model)

In [None]:
# Train the network to predict the binary mask from the dirty magnitude
# spectrogram, one file per optimisation step.
loss_function = nn.MSELoss()
para = model.parameters()
optimizer = torch.optim.Adam(params=para, lr=0.001)
# Run on whichever device the model lives on instead of assuming CUDA.
model_device = next(model.parameters()).device
model.train()
# Summed per-file loss of every epoch, in epoch order.
loss_list = []
for i in range(epochs):
    total_loss = 0
    print("Epoc", i)
    file_count = 0
    # train_loader yields inputs and test_loader yields the matching targets;
    # zip stops as soon as either loader is exhausted.
    for input_set, target_set in zip(train_loader, test_loader):
        file_count += 1

        # NOTE(review): if the batches are (BATCH, dimension, padding_length),
        # as get_processed_lists builds them, reshape() only reinterprets the
        # buffer; it does not swap the frequency and time axes the way
        # permute(2, 0, 1) would, so the LSTM steps are not spectrogram frames.
        # The inference cell uses the same reshape convention, so any change
        # here has to be mirrored there.
        input_set = input_set.reshape(padding_length, BATCH, dimension)
        target_set = target_set.reshape(padding_length, BATCH, dimension)

        input_set = input_set.to(device=model_device, dtype=torch.float)
        target_set = target_set.to(device=model_device, dtype=torch.float)

        network_output = model(input_set)
        loss = loss_function(network_output, target_set)
        total_loss += loss.item()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print("break")
    loss_list.append(total_loss)
     

In [None]:
# validation_dirty_files, validation_speech_files, validation_noise_files = get_files(validation_folder)
# Test file names: use the cached array when requested, otherwise generate the
# paths from the test folder naming scheme.
if load_existing_files:
    test_dirty_files = torch.load(test_dirty_file_path)
else:
    test_dirty_files = get_files_test(test_folder)


In [None]:
# Store one path per row (column 0), the layout get_processed_list_test indexes into.
test_dirty_files = test_dirty_files.reshape(test_dirty_files.shape[0], 1)

In [None]:
# Number of test files available.
len(test_dirty_files)

In [None]:
print(test_dirty_files.shape)
# Optionally cache the (already reshaped) test file names, overwriting the earlier copy.
if save_files:
    torch.save(test_dirty_files, test_dirty_file_path)

In [None]:
# Expected to be (number of test files, 1) after the reshape above.
test_dirty_files.shape

**--------------------------------------------------------------------------------------------------Testing Begins-----------------------------------------------------**

In [None]:
# Test spectrograms: reuse the cached copies when requested, otherwise compute
# them from the audio files.
if load_existing_files:
    test_x_list = torch.load(test_librosa_x_list_path)
    test_x_complex = torch.load(test_librosa_complex_list_path)
else:
    test_x_list, test_x_complex = get_processed_list_test(test_dirty_files)
# Alias used by the inspection and inference cells below.
x_list = test_x_list

In [None]:
# Number of test spectrograms.
len(test_x_list)

In [None]:
# Optionally cache the test spectrograms, overwriting earlier copies.
if save_files:
    for payload, destination in (
        (test_x_list, test_librosa_x_list_path),
        (test_x_complex, test_librosa_complex_list_path),
    ):
        torch.save(payload, destination)

In [None]:
# Shape of the first test magnitude spectrogram (test spectrograms are not padded).
x_list[0].shape

In [None]:
# Scratch check: adding a trailing singleton axis gives (rows, columns, 1); the result is not used.
x_list[0].reshape(x_list[0].shape[0],x_list[0].shape[1], 1).shape

In [None]:
# Denoise every test file: predict a mask from the magnitude spectrogram,
# binarise it, apply it to the complex spectrogram and invert the STFT.
output_istft = []
# Run on whichever device the model lives on instead of assuming CUDA.
model_device = next(model.parameters()).device
with torch.no_grad():
    for i in range(len(x_list)):
        # NOTE(review): x_list[i] is (dimension, frames) when it comes from
        # get_processed_list_test; reshape() only reinterprets the buffer as
        # (frames, BATCH, dimension), it does not transpose. The training cell
        # uses the same convention, so change both together or not at all.
        model_input = torch.tensor(
            x_list[i].reshape(-1, BATCH, dimension),
            dtype=torch.float32,
            device=model_device,
        )
        prediction = model(model_input)
        # Threshold the sigmoid output into a boolean keep/discard mask. A
        # boolean mask keeps the spectrum's own complex dtype, whereas an
        # integer mask would silently upcast it to complex128.
        mask = prediction.cpu().numpy() >= 0.5
        # Inverse of the input reshape: back to the layout of the spectrogram.
        mask = mask.reshape(mask.shape[2], mask.shape[0])
        complex_x = test_x_complex[i]
        result = np.multiply(complex_x, mask)
        result_istft = librosa.istft(result, hop_length=512)
        output_istft.append(result_istft)


In [None]:

# librosa.output.write_wav no longer exists in librosa >= 0.8, so the file is
# written with SciPy instead. The signal is stored as 32-bit float samples.
wavfile.write('test_result.wav', sample_rate, np.asarray(output_istft[3], dtype=np.float32))

In [None]:
# Embed an audio player for the file written in the previous cell.
ipd.Audio('test_result.wav')

In [None]:
# Write every denoised signal to 'test<i>.wav'. librosa.output.write_wav no
# longer exists in librosa >= 0.8, so the files are written with SciPy as
# 32-bit float samples.
for i in range(len(output_istft)):
    wavfile.write('test'+ str(i) + '.wav', sample_rate, np.asarray(output_istft[i], dtype=np.float32))