##Connecting Google drive

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
import zipfile
audio = zipfile.ZipFile('/content/gdrive/MyDrive/data.zip', 'r')
audio.extractall('/content')

##Meta creation

In [None]:
import pandas as pd
from pathlib import Path
import os
from tqdm.notebook import tqdm

data_folder = Path('/content/val') # specify train or validation folder
df = []

for path, _, files in tqdm(os.walk(data_folder)):
    for file in files:
      
      folder = path.split('/')[-3]
      type_ = path.split('/')[-2]
      id = path.split('/')[-1]
      df.append([folder, type_, id, file])

df = pd.DataFrame(df, columns = ['folder', 'type', 'id', 'path'])
df.to_csv('meta_val.csv')

0it [00:00, ?it/s]

##Meta Demonstration

In [None]:
import pandas as pd

demonstration = pd.read_csv('/content/meta_train.csv')
demonstration.head()

Unnamed: 0.1,Unnamed: 0,folder,type,id,path
0,0,train,noisy,6625,6625_70995_6625-70995-0008.npy
1,1,train,noisy,6625,6625_70995_6625-70995-0004.npy
2,2,train,noisy,6625,6625_39674_6625-39674-0024.npy
3,3,train,noisy,6625,6625_39680_6625-39680-0034.npy
4,4,train,noisy,6625,6625_70995_6625-70995-0006.npy


##Sound Dataset

In [None]:
from torch.utils.data import Dataset
import numpy as np
import os
from tqdm.notebook import tqdm

class SoundDataset(Dataset):

    def __init__(self, meta, source_folder, transforms):

        self.source_folder = source_folder
        self.transforms = transforms
        self.meta = meta
        self.files = self.meta.path.unique()

    def __len__(self):
        return len(self.files)

    def __getitem__(self, i):

        sound = self.files[i]
        sound_df = self.meta.loc[self.meta.path == sound]
        
        noisy_df = sound_df.loc[sound_df.type == 'noisy'].iloc[0]
        clean_df = sound_df.loc[sound_df.type == 'clean'].iloc[0]

        noisy_path = os.path.join(self.source_folder, noisy_df.get('folder'), noisy_df.get('type'), str(noisy_df.get('id')), sound)
        clean_path = os.path.join(self.source_folder, clean_df.get('folder'), clean_df.get('type'), str(clean_df.get('id')), sound)
        
        noisy = np.expand_dims(np.load(noisy_path).astype(float), 2)
        clean = np.expand_dims(np.load(clean_path).astype(float), 2)

        augmented = self.transforms(image=noisy, mask=clean)

        return {'noisy': augmented['image'][0].unsqueeze(0).float(), 'clean': augmented['mask'][:, :, 0].unsqueeze(0).float()}
        

##Separation of meta: Training (80%), validation (20%)

In [None]:
df = pd.read_csv('/content/meta_train.csv', index_col=0) # Specify mata train

df = df.sort_values(by=['id'], ignore_index=True)

train_size = int(0.8 * df.shape[0])
val_size = df.shape[0] - train_size

border_id = df['id'][train_size]

train_df = df[df['id'] < border_id]
val_df = df[df['id'] >= border_id]

In [None]:
!pip install pytorch_lightning
!pip install segmentation-models-pytorch

##Train

In [None]:
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import segmentation_models_pytorch as smp
import albumentations as albu
from albumentations.pytorch import ToTensorV2

class LightningLearner(pl.LightningModule):

    def __init__(self, train_df, val_df, source_folder):
        super().__init__()
        
        self.unet = smp.Unet(encoder_name='resnet18', in_channels=1)
        self.loss = {'MSE': nn.MSELoss()}
        self.transforms = albu.Compose([albu.PadIfNeeded(480, 80),
                          albu.RandomCrop(480, 80),
                          albu.Resize(576, 96),
                          ToTensorV2()])
        self.trainset = SoundDataset(train_df, source_folder, self.transforms)
        self.validset = SoundDataset(val_df, source_folder, self.transforms)

    def forward(self, x):
        return self.unet(x)

    def _calc_loss(self, batch):
        noisy, gt_clean = batch['noisy'], batch['clean']
        pr_clean = self(noisy)
        loss = self.loss['MSE'](pr_clean, gt_clean)
        return loss

    def training_step(self, batch, batch_nb):
        loss = self._calc_loss(batch)
        self.log('train_loss', loss, prog_bar=False)
        return loss

    def validation_step(self, batch, batch_idx):
        loss = self._calc_loss(batch)
        self.log('valid_loss', loss, prog_bar=False)
        return loss

    def validation_epoch_end(self, outputs):
        loss_value = torch.stack([x for x in outputs]).mean()
        self.log('valid_loss', loss_value, prog_bar=True)

    def configure_optimizers(self):
        opt = optim.Adam(self.unet.parameters(), lr=0.0001)
        sch = optim.lr_scheduler.CosineAnnealingLR(opt, T_max=10)
        return [opt], [sch]

    def train_dataloader(self):
        return DataLoader(self.trainset, batch_size=32,
                          num_workers=1, shuffle=True)

    def val_dataloader(self):
        return DataLoader(self.validset, batch_size=32,
                          num_workers=1, shuffle=False)

In [None]:
model = LightningLearner(train_df, val_df, '/content/data') # Specify data folder

trainer = pl.Trainer(
    gpus=1,
    logger=False,
    max_epochs=50,
    precision=32,
)

trainer.fit(model)

  f"Setting `Trainer(gpus={gpus!r})` is deprecated in v1.7 and will be removed"
INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
  rank_zero_warn(f"Checkpoint directory {dirpath} exists and is not empty.")
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name | Type | Params
------------------------------
0 | unet | Unet | 14.3 M
------------------------------
14.3 M    Trainable params
0         Non-trainable params
14.3 M    Total params
57.288    Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=50` reached.


##Save weights

In [None]:
import torch

torch.save(model.state_dict(), '/content/gdrive/MyDrive/unet_11.pth')

##Test

In [None]:
!pip install pytorch_lightning
!pip install segmentation-models-pytorch
!pip install torchmetrics

In [5]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [6]:
import zipfile
audio = zipfile.ZipFile('/content/gdrive/MyDrive/data.zip', 'r')
audio.extractall('/content')

In [10]:
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import segmentation_models_pytorch as smp
import albumentations as albu
from albumentations.pytorch import ToTensorV2
import pandas as pd
from pathlib import Path
import os
from tqdm.notebook import tqdm
import numpy as np
from torchmetrics import MeanSquaredError

In [8]:
class SoundDataset(Dataset):

    def __init__(self, meta, source_folder, transforms):

        self.source_folder = source_folder
        self.transforms = transforms
        self.meta = meta
        self.files = self.meta.path.unique()

    def __len__(self):
        return len(self.files)

    def __getitem__(self, i):

        sound = self.files[i]
        sound_df = self.meta.loc[self.meta.path == sound]
        
        noisy_df = sound_df.loc[sound_df.type == 'noisy'].iloc[0]
        clean_df = sound_df.loc[sound_df.type == 'clean'].iloc[0]

        noisy_path = os.path.join(self.source_folder, noisy_df.get('folder'), noisy_df.get('type'), str(noisy_df.get('id')), sound)
        clean_path = os.path.join(self.source_folder, clean_df.get('folder'), clean_df.get('type'), str(clean_df.get('id')), sound)
        
        noisy = np.expand_dims(np.load(noisy_path).astype(float), 2)
        clean = np.expand_dims(np.load(clean_path).astype(float), 2)

        augmented = self.transforms(image=noisy, mask=clean)

        return {'noisy': augmented['image'][0].unsqueeze(0).float(), 'clean': augmented['mask'][:, :, 0].unsqueeze(0).float()}
        

In [12]:
class LightningPredictor(nn.Module):

    def __init__(self, test_df, source_folder):
        super().__init__()
  
        self.unet = smp.Unet(encoder_name='resnet18', in_channels=1)
        self.loss = {'MSE': MeanSquaredError()}
        self.transforms = albu.Compose([albu.PadIfNeeded(480, 80),
                                        albu.RandomCrop(480, 80),
                                        albu.Resize(576, 96),
                                        ToTensorV2()])
        self.testset = SoundDataset(test_df, source_folder, self.transforms)
        self.testloader = DataLoader(self.testset, batch_size=1, shuffle=False)

    def calculate_metrics(self):

        MSE = list()

        with torch.no_grad():
            for batch in tqdm(self.testloader):
                noisy, gt_clean = batch['noisy'], batch['clean']
                pr_clean = self.unet(noisy)
                MSE.append(self.loss['MSE'](pr_clean, gt_clean).detach().numpy())

        return np.mean(MSE)

In [13]:
test_df = pd.read_csv('/content/meta_val.csv') # Specify test meta
model = LightningPredictor(test_df, '/content/data')
model.load_state_dict(torch.load('/content/gdrive/MyDrive/unet_11.pth')) # Specify weights
model.eval()
metrics = model.calculate_metrics()

  0%|          | 0/2000 [00:00<?, ?it/s]

In [14]:
print('MSE =', metrics)

MSE = 0.027861672
