In [1]:
import torch
import torch.nn as nn
import os
import librosa
import numpy as np
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

In [2]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [3]:
import librosa
import os
import soundfile as sf
import numpy

target =  ['ht1', 'muff']
data_dir = '/content/drive/My Drive/NMAL_project/RNN_BLACK_BOX_TEST/Data'
target_dir = '/content/drive/My Drive/NMAL_project/RNN_BLACK_BOX_TEST/Data_proc'
sub_dir = ['test', 'train', 'val']
ext = ['input', 'target']

for s in sub_dir:
    for t in target:
        for e in ext:
            s_dir = os.path.join(data_dir, s, t)
            t_dir = os.path.join(target_dir, s, t)
            if(not os.path.exists(t_dir)):
                os.makedirs(t_dir)
                audio, sr = librosa.load(s_dir+'-'+e+'.wav')
                seg_len = (int)(sr * 0.5)
                for i in range(int(len(audio)/seg_len)):
                    sf.write(os.path.join(t_dir, e)+str(i)+'.wav', numpy.array(audio[i*seg_len:(i+1)*seg_len]),sr)



In [18]:
class Audio(Dataset):
    def __init__(self, data_dir, target):
        if torch.cuda.is_available():
            self.device = 'cuda'
        else:
            self.device = 'cpu'
        self.data_dir = data_dir
        self.ext = ['input', 'target']
        self.target = target
        audio, self.sr = librosa.load(os.path.join(data_dir, 'ht1', 'input1.wav'))
        self.len = 680

    def __len__(self) -> int:
        return self.len

    def __getitem__(self, index) -> list:

        x_path = os.path.join(self.data_dir, self.target, self.ext[0]+str(index)+'.wav')
        y_path = os.path.join(self.data_dir, self.target, self.ext[1]+str(index)+'.wav')
        x, _ = librosa.load(x_path)
        y, _ = librosa.load(y_path)

        return torch.tensor(x), torch.tensor(y)

    def show(self):
        print(f'The audio length is {self.len*0.5} sec')
        print(f'Total {self.len} datapoints')

class RNN(nn.Module):
    def __init__(self, input_size, num_layer, hidden_size, batch_size):
        if torch.cuda.is_available():
            self.device = 'cuda'
        else:
            self.device = 'cpu'

        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layer = num_layer
        self.rnn = nn.LSTM(input_size, hidden_size, num_layer, batch_first=True)
        self.fc = nn.Linear(hidden_size, input_size)

    def forward(self, x):
        batch_size = x.size(0)

        h = torch.zeros(self.num_layer, batch_size, self.hidden_size, device=self.device)
        c = torch.zeros(self.num_layer, batch_size, self.hidden_size, device=self.device)
        logic, (self.h, self.c)= self.rnn(x.unsqueeze(-1), (h, c))
        out = self.fc(logic)
        return out

def train_one_epoch(model, epoch, train_dataloader, loss_fn, optimiser, device):
    model.train(True)
    total_loss = 0.0
    for x, y in tqdm(train_dataloader, desc=f'Epoch {epoch+1}', unit='batch'):
        x = x.to(device)
        y = y.to(device)
        optimiser.zero_grad()
        out = model(x)
        # print(out.size())
        loss = loss_fn(out[:, :, 0], y)
        loss.backward()
        optimiser.step()
        total_loss = total_loss+loss

    print(f'Total loss: {total_loss}')


In [19]:
class ESR(nn.Module):
    def __init__(self, weight=None, pre_filt=[1.0, 0.85]):
        super(ESR, self).__init__()
        self.weight = weight
        self.pre_filt = pre_filt
    def forward(self, input, output):
        if self.pre_filt is not None:
            print(self.pre_filt)
            output_p = torch.zeros(output.shape).to(device)
            input_p = torch.zeros(input.shape).to(device)
            for i in range(output.shape[1]):
                for j, b in enumerate(self.pre_filt):
                    output_p[:, i] = output_p[:, i]+output[:, i-j]*b
                    input_p[:, i] = input_p[:, i]+input[:, i-j]*b

            diff_p = output_p - input_p
            esr_loss = torch.sum(diff_p**2, dim=1)/torch.sum(output_p**2, dim=1)
        else:
            diff = output-input
            esr_loss = torch.sum(diff**2, dim=1)/torch.sum(output**2, dim=1)

        diff = output-input
        dc_loss = torch.sum(diff**2, dim=1)/torch.sum(output**2, dim=1)
        loss = torch.sum(esr_loss + dc_loss)
        return loss

In [20]:
data_dir = '/content/drive/My Drive/NMAL_project/RNN_BLACK_BOX_TEST/Data_proc/train'
torch.cuda.empty_cache()
train_input = Audio(data_dir, 'ht1')
train_input.show()
train_dataloader = DataLoader(train_input, batch_size=40, shuffle=True, num_workers=2)


if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

print(device)

seq_len = int(train_input.sr * 0.5)
model = RNN(input_size=1, hidden_size=12, num_layer=1, batch_size=40).to(device)

num_epoch = 20
learning_rate = 1e-4
loss_fn = nn.CrossEntropyLoss()
loss_fn = ESR()
optimiser = torch.optim.Adam(model.parameters(), lr=learning_rate)


The audio length is 340.0 sec
Total 680 datapoints
cuda


In [None]:
for epoch in range(num_epoch):
    train_one_epoch(model, epoch, train_dataloader, loss_fn, optimiser, device)

Epoch 1:   0%|          | 0/17 [00:00<?, ?batch/s]

[1.0, 0.85]


Epoch 1:   6%|▌         | 1/17 [00:29<07:57, 29.82s/batch]

[1.0, 0.85]


In [None]:
torch.save(model.state_dict(), '/content/drive/My Drive/NMAL_project/RNN_BLACK_BOX_TEST/RNN_model_highpass_prefilt_CrossEntrophy_24.pth')