In [1]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader


# Path to folder with test data
There must be to directories inside: "clean" and "noisy"

In [2]:
test_path = 'data/val/val/'

### Cuda device checking

In [3]:
device = ''
if torch.cuda.is_available():  
    device = "cuda:0" 
else:  
    device = "cpu"
print(f'Your device is "{device}"')

Your device is "cuda:0"


### Form a list of directories which contain train or val samples

In [4]:
def get_clean_data_paths(path: str):
    """
    input: path to train or val dir with 'clean' and 'noise' dirs
    output: list of paths to clean data
    """
    clean = 'clean/'
    list_clean = []
    with os.scandir(path+clean) as entries:
        for entry in entries:
            if entry.is_dir():
                list_clean.append(entry.path)
    return sorted(list_clean)

def check_data_paths(paths_list, show_cnt = 10):
    """
    outputs number of input list elements and their values
    """
    print(f'input list length = {len(paths_list)}')
    for i, elem in enumerate(paths_list):
        print(elem)
        if i==show_cnt:
            break
    pass

### Load, transpose and check data

In [5]:
def get_data_from_clean_data_paths(clean_data_paths: list):
    """
    Loading numpy arrays situated on input list paths.
    input: paths to clean data
    output: loaded clean and noisy data to lists of np.ndarrays    
    """
    clean_data_list = []
    noisy_data_list = []

    #scans every dir path
    for path in clean_data_paths:
        #scans every file in current dir
        with os.scandir(path) as entries:
            for entry in entries:
                if entry.is_file():
                    clean_path = entry.path
                    noisy_path = entry.path.replace('clean', 'noisy',1)

                    clean_data_list.append(np.load(clean_path).T)
                    noisy_data_list.append(np.load(noisy_path).T)
    return clean_data_list, noisy_data_list

def check_loaded_data(data1_list, data2_list, check_part=0.1, check_shapes=True, print_shapes=False, check_data=False):
    """
    Help checking loaded data: if data from 2 lists has same shape and doesn't contain nans, zeros only and so on
    """
    
    cnt2check = int(max(1, len(data1_list)*check_part))
    samples2check = np.random.choice(range(len(data1_list)), cnt2check, replace=False)
    
    different_shape = False
    for index in samples2check:
        if data1_list[index].shape != data2_list[index].shape:
            different_shape = True
            print(f'Difference in shape is detected!'
                 f'\t{data1_list[index].shape} vs {data2_list[index].shape}')
            break
        if print_shapes:
            print(data1_list[index].shape)
    
    if check_data:
        for index in samples2check:
            print(f'mean1 = {np.mean(data1_list[index]):5.3}, std1 = {np.std(data1_list[index]):5.3}\t'
                  f'mean2 = {np.mean(data2_list[index]):5.3}, std2 = {np.std(data2_list[index]):5.3}')
        
def get_data_statistics(data):
    """
    Prints some data statistics and plots histogram of data lengths
    """
    dataLens = []
    for d in data:
        dataLens.append(d.shape[1])
    dataLens = np.array(dataLens)
    maxLen = np.max(dataLens)
    minLen = np.min(dataLens)
    meanLen = np.mean(dataLens)
    stdLen = np.std(dataLens)
    print(f'Data length params: min={minLen}, max={maxLen}, mean={meanLen}, std={stdLen}')
    plt.figure(figsize=(10,10))
    sns.histplot(data=dataLens)

### We will use PyTorch functionality for batch construction:
  - custom Dataset class to store data with different sizes,
  - customized collate_fn to form batches of different shape elements in DataLoader.

In [6]:
class DetectionDS(Dataset):
    """
    Dataset class.
    Maintains
        data - clean and noisy data
        targets - target values for classification (0 - clean, 1 - noisy)
    """
    def __init__(self, clean, noisy):
        #initialize class object
        
        #concatenate clean and noisy data lists
        self.data = clean + noisy
        #generate corresponding targets
        self.targets = [0 for i in range(len(clean))] + [1 for i in range(len(noisy))]
        
    def __len__(self):
        #standard interface function for Dataset
        if self.data != None:
            return len(self.data)
        
    def __getitem__(self, idx):
        #standard interface function for Dataset
        if torch.is_tensor(idx):
            idx = idx.tolist()
        sample = [self.data[idx], self.targets[idx]] #{'mel_data': self.data[idx], 'target': self.targets[idx]}
        return sample

In [7]:
def collate_function(batch):
    """
    A customization of default PyTorch collate_fn function
    Forms batch from items of different size:
        looks for min item length and then randomly 
        cuts parts of min length from each item
    """
    #extract data from input batch
    data = [item[0] for item in batch]
    targets = [item[1] for item in batch]
    
    min_len = 100000
    for elem in data:
        min_len = min(min_len, elem.shape[1])
    
    #slicing data to size of minimum element
    data = [random_cut(elem, min_len) for elem in data]
    
    return [torch.tensor(data).float(), torch.tensor(targets).float()]
        
    
def random_cut(array, slice_len):
    """
    Help function to randomly cut array through slicing
    """
    start_cut_range = array.shape[1] - slice_len
    random_start = np.random.randint(0, start_cut_range+1)

    return array[:, random_start: random_start+slice_len]    

In [8]:
def get_DataLoader(clean_data_list, noisy_data_list, batch_size):
    """
    Returns dataloaders for further training and evaluating
    Input:
        clean_data_list - list of numpy arrays with clean data
        noisy_data_list - list of numpy arrays with noisy data
        batch_size - defines batch size
    """
    dataset = DetectionDS(clean_data_list, noisy_data_list)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, 
                            collate_fn=collate_function, pin_memory=True)
    return dataloader

### Model definition
For noise classification problem we will use 10-convlayer VGG-like CNN with BatchNorm layers.

In [9]:
class BinaryNoiseClassification(nn.Module):
    """
    Model definition.
    Layers summary:
        There are 5 stages:
            1-3 stages consist of 2 Conv1d layers with ReLU -> MaxPool -> BatchNorm.
            4th stage consists of 2 Conv1d layers with ReLU -> BatchNorm -> GlobalMaxPool
            5th stage consists of 2 Fully-connected layers
    """
    def __init__(self):
        super(BinaryNoiseClassification, self).__init__()
        #VGG like architecture
        self.Conv1_1 = nn.Conv1d(in_channels=80, out_channels=64, kernel_size=3, padding=1)
        self.Conv1_2 = nn.Conv1d(in_channels=64, out_channels=64, kernel_size=3, padding=1)
        self.MaxPool1 = nn.MaxPool1d(kernel_size=2, stride=2)
        self.BatchNorm1 = nn.BatchNorm1d(64)
        
        self.Conv2_1 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.Conv2_2 = nn.Conv1d(in_channels=128, out_channels=128, kernel_size=3, padding=1)
        self.MaxPool2 = nn.MaxPool1d(kernel_size=2, stride=2)
        self.BatchNorm2 = nn.BatchNorm1d(128)
        
        self.Conv3_1 = nn.Conv1d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        self.Conv3_2 = nn.Conv1d(in_channels=256, out_channels=256, kernel_size=3, padding=1)
        self.MaxPool3 = nn.MaxPool1d(kernel_size=2, stride=2)
        self.BatchNorm3 = nn.BatchNorm1d(256)
        
        self.Conv4_1 = nn.Conv1d(in_channels=256, out_channels=512, kernel_size=3, padding=1)
        self.Conv4_2 = nn.Conv1d(in_channels=512, out_channels=512, kernel_size=3, padding=1)
        self.BatchNorm4 = nn.BatchNorm1d(512)
        self.GloMaxPool1 = nn.AdaptiveMaxPool1d(output_size=1)
        
        self.Linear1 = nn.Linear(in_features=512, out_features=10)
        self.Linear2 = nn.Linear(in_features=10, out_features=1)
        
    def forward(self, inputs):
        #1st stage
        x = F.relu(self.Conv1_1(inputs))
        x = self.MaxPool1(F.relu(self.Conv1_2(x)))
        x = self.BatchNorm1(x)
        #2nd stage
        x = F.relu(self.Conv2_1(x))
        x = self.MaxPool2(F.relu(self.Conv2_2(x)))
        x = self.BatchNorm2(x)
        #3rd stage
        x = F.relu(self.Conv3_1(x))
        x = self.MaxPool3(F.relu(self.Conv3_2(x)))
        x = self.BatchNorm3(x)
        #4th stage
        x = F.relu(self.Conv4_1(x))
        x = self.GloMaxPool1(F.relu(self.Conv4_2(x)))
        x = self.BatchNorm4(x)
        #full conv stage
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.Linear1(x))
        x = self.Linear2(x)
        
        return x

### Auxiliary functions for evaluating:

In [10]:
def Evaluate(model, dataloader, device):
    """
    Computes output for data in dataloader and returns loss and accuracy.
    Input: 
        model - trained model, 
        dataloader - DataLoader with evaluating data,
        device - chosen device
    """
    model.eval()
    predictions = None
    targets = None
    for batch_number, data in enumerate(dataloader):
        samples, labels = data[0], data[1].unsqueeze_(1)
        samples, labels = samples.to(device) , labels.to(device)

        if predictions is None:
            predictions = model(samples).cpu().detach().numpy()
            targets = labels.cpu().detach().numpy()
        else:        
            predictions = np.concatenate((predictions, model(samples).cpu().detach().numpy()), axis=0)
            targets = np.concatenate((targets, labels.cpu().detach().numpy()), axis=0)
    
    accuracy = binary_acc(predictions, targets)
    
    return accuracy

def binary_acc(y_pred, y_test):
    """
    Calculates objective metric: Accuracy
    Input: 
        y_pred - model predictions, 
        y_test - ground truth
    """
    y_test, y_pred = torch.from_numpy(y_test).float(), torch.from_numpy(y_pred).float()
    y_pred_tag = torch.round(torch.sigmoid(y_pred))

    correct_results_sum = (y_pred_tag == y_test).sum().float().item()
    acc = correct_results_sum/y_test.shape[0]
    acc = np.round(acc*100, 2)
    
    return acc

# Classifier evaluation
Loading best trained model and compute its loss and accuracy

In [11]:
loaded_classification_model = BinaryNoiseClassification()
model_state_dict = torch.load(f'classifier_epoch15.pth')
loaded_classification_model.load_state_dict(model_state_dict)
loaded_classification_model.to(device);

In [12]:
test_clean_paths_list = get_clean_data_paths(test_path)
test_clean_data, test_noisy_data = get_data_from_clean_data_paths(test_clean_paths_list[:])
test_dataloader = get_DataLoader(test_clean_data, test_noisy_data, batch_size=1)

In [13]:
test_accuracy = Evaluate(loaded_classification_model, test_dataloader, device)
print(f'Accuracy = {test_accuracy}%')

Accuracy = 98.32%
