In [None]:
!pip install soundfile

Collecting soundfile
  Downloading https://files.pythonhosted.org/packages/eb/f2/3cbbbf3b96fb9fa91582c438b574cff3f45b29c772f94c400e2c99ef5db9/SoundFile-0.10.3.post1-py2.py3-none-any.whl
Installing collected packages: soundfile
Successfully installed soundfile-0.10.3.post1


In [None]:
import os
import tqdm
import torch
import h5py
import librosa
import random
from adamp import AdamP
from utils import *
import numpy as np
import pandas as pd
import torch.nn as nn
from scipy import stats
import soundfile as sf
from scipy.signal import spectrogram
from IPython.display import Audio
import librosa.display as display
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader


In [None]:
# Run on the GPU when PyTorch can see one, otherwise stay on the CPU.
device = 'cpu' if not torch.cuda.is_available() else 'cuda'

In [None]:
# Path to one FLAC utterance inside the LA training directory; only the path
# is defined here, nothing is loaded in this cell.
sample_file = '/data/Audio/LA/ASVspoof2019_LA_train/flac/LA_T_1000137.flac'

In [None]:
# Build the training label lookup from the CM protocol file.
# Encoding: 1 = bonafide (true) sample, 0 = spoofed sample.
filename = 'ASVspoof2019.LA.cm.train.trn.txt'
dir_path = '/data/Audio/LA/ASVspoof2019_LA_cm_protocols'
train_label_dict = {}
with open(os.path.join(dir_path, filename)) as f:
    for line in f:
        # Whitespace-separated record: the second field is used as the
        # sample id, the last field carries the bonafide/spoof key.
        toks = line.split()
        sample, label = toks[1], int(toks[-1] == 'bonafide')
        train_label_dict[sample] = label
print(f'There are {len(train_label_dict)} entries')

In [None]:
# Evaluation-set labels from the CM protocol file.
# Encoding: 1 = bonafide (true) sample, 0 = spoofed sample.
filename = 'ASVspoof2019.LA.cm.eval.trl.txt'
dir_path = '/data/Audio/LA/ASVspoof2019_LA_cm_protocols'
eval_label_dict = {}
with open(os.path.join(dir_path, filename)) as f:
    for line in f:
        toks = line.split()
        # second field -> sample id, last field -> bonafide/spoof key
        sample = toks[1]
        label = int(toks[-1] == 'bonafide')
        eval_label_dict[sample] = label
print(f'There are {len(eval_label_dict)} entries')

In [None]:
def pre_emp(x):
    '''
    Apply first-order pre-emphasis to the given utterance.

    Computes y[n] = x[n + 1] - 0.97 * x[n]; the result therefore has one
    sample fewer than the input.

    x: list or 1 dimensional numpy.ndarray of samples
    returns: numpy.ndarray of dtype float32 with max(len(x) - 1, 0) elements
    '''
    # Coerce first: a plain list cannot be multiplied by a float (TypeError),
    # although the contract above explicitly allows list input.
    x = np.asarray(x)
    return np.asarray(x[1:] - 0.97 * x[:-1], dtype=np.float32)

In [None]:
def preprocess_and_save_audio(dirpath, label_dict, recompute=False, max_files=None):
    '''
    Load the labelled audio flac files under ``<dirpath>/flac``, trim each one
    (decibel level), compute its Mel spectrogram and cache all
    (spectrogram, label) pairs in ``<dirpath>/preproc/preproc.npy``.

    dirpath: dataset directory that contains a ``flac`` sub-directory
    label_dict: maps a file stem (file name up to the first '.') to its label
    recompute: rebuild the cache even when ``preproc.npy`` already exists
    max_files: process at most this many labelled files (None = no limit)

    The cache is an (N, 2) object array: column 0 holds the transposed Mel
    spectrogram, column 1 the label. Reading it back needs
    ``np.load(..., allow_pickle=True)``.

    Side effects: every file in ``flac`` whose stem is missing from
    ``label_dict`` is DELETED from disk. The number of Mel bands comes from
    the notebook-level name ``num_mels``, which must be defined before calling.
    '''
    # `import tqdm` by itself does not load the `tqdm.notebook` submodule.
    import tqdm.notebook

    flac_dir = os.path.join(dirpath, 'flac')
    preproc_dir = os.path.join(dirpath, 'preproc')
    cache_path = os.path.join(preproc_dir, 'preproc.npy')

    if os.path.isfile(cache_path) and not recompute:
        print("Preprocessing already done!")
        return

    precproc_list = []
    num_removed = 0
    num_processed = 0
    # Sorted so that a `max_files` subset is the same on every run
    # (os.listdir returns entries in arbitrary order).
    for filename in tqdm.notebook.tqdm(sorted(os.listdir(flac_dir))):
        # Check the limit before doing any work, so that exactly `max_files`
        # labelled files are processed (the old `>` test let one extra through).
        if max_files is not None and num_processed >= max_files:
            break

        filepath = os.path.join(flac_dir, filename)
        label_name = filename.split('.')[0]

        # Look the label up first: decoding and transforming audio that is
        # about to be discarded is wasted work.
        if label_name not in label_dict:
            # NOTE(review): destructive - unlabelled files are removed from
            # the dataset directory. Kept as-is; confirm this is still wanted.
            num_removed += 1
            print(f'Removed: {num_removed} removed', end='\r')
            os.remove(filepath)
            continue

        audio_array, sample_rate = librosa.load(filepath, sr=16000)
        trim_audio_array, _ = librosa.effects.trim(audio_array)
        # Transposed so that axis 0 is time.
        mel_spec_array = librosa.feature.melspectrogram(y=trim_audio_array, sr=sample_rate,
                                                        n_mels=num_mels).T
        precproc_list.append((mel_spec_array, label_dict[label_name]))
        num_processed += 1

    os.makedirs(preproc_dir, exist_ok=True)

    # Build the object array explicitly. Handing the ragged list of
    # (array, label) tuples straight to np.save relies on implicit ragged-array
    # creation, which raises ValueError on NumPy >= 1.24.
    packed = np.empty((len(precproc_list), 2), dtype=object)
    for row, (mel_spec_array, label) in enumerate(precproc_list):
        packed[row, 0] = mel_spec_array
        packed[row, 1] = label
    np.save(cache_path, packed)

In [None]:
class AudioDS(Dataset):
    '''
    Fixed-length spectrogram windows for bonafide/spoof classification.

    x_arr: indexable collection of 2-D arrays, one per sample, time on axis 0
    y_arr: labels aligned with x_arr (0 = spoof, 1 = bonafide)
    stime: number of time steps every returned window has
    train: True  -> balanced subset (as many spoofs as there are bonafides)
           False -> every sample is kept

    The sample order (and, for train=True, which spoofs are kept) is drawn
    with the `random` module when the dataset is constructed.
    '''
    def __init__(self, x_arr, y_arr, stime, train=True):
        # record indices of the spoofs & bonafide samples
        # Note: in training, spoofs vastly outnumber bonafides
        y_spoof_indices = [i for i, v in enumerate(y_arr) if v == 0]
        y_bonafide_indices = [i for i, v in enumerate(y_arr) if v == 1]
        assert len(y_bonafide_indices) < len(y_spoof_indices), \
            'expected fewer bonafide than spoof samples'

        # shuffle each class so the balanced subset differs between instances
        random.shuffle(y_spoof_indices)
        random.shuffle(y_bonafide_indices)

        # indices that make up this dataset
        if train:
            num_each = min(len(y_bonafide_indices), len(y_spoof_indices))
            ds_indices = y_spoof_indices[:num_each] + y_bonafide_indices[:num_each]
        else:
            ds_indices = y_spoof_indices + y_bonafide_indices

        random.shuffle(ds_indices)

        # collect the samples (fresh lists, so no extra copy is needed)
        self.x = [x_arr[i] for i in ds_indices]
        self.y = [y_arr[i] for i in ds_indices]
        self.stime = stime

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        '''
        Return (x, y): x is a tensor of shape (1, self.stime, features) and
        y is the label as a tensor.
        '''
        X = self.x[idx]
        y = self.y[idx]
        stime = X.shape[0]
        if stime < self.stime:
            # sample is shorter than the window: repeat it along the time
            # axis until it is long enough, then cut to the window length
            x_dup = self.stime // stime + 1
            X = np.tile(X, (x_dup, 1))[:self.stime]
        elif stime > self.stime:
            # sample is longer than the window: take a random crop.
            # randint's `high` is exclusive, so +1 keeps the last valid start
            # (stime - self.stime) reachable.
            start_idx = np.random.randint(low=0, high=stime - self.stime + 1)
            X = X[start_idx:start_idx + self.stime]
        # Add the leading channel axis. The feature width is taken from the
        # sample itself, so no notebook-level `num_mels` is required here.
        X = X[np.newaxis]
        # return the sample
        x = torch.from_numpy(X)
        y = torch.tensor(y)
        return x, y

In [None]:
class Dessa(torch.nn.Module):
  '''
  Layer container for the "Dessa" network.

  batch_size:   accepted for interface compatibility; not used by the layers
                defined here
  input_dim:    number of input features fed to `linear1`
  output_dim:   accepted for interface compatibility; not used by the layers
                defined here
  kernel_sizes: kernel size of the 1st, 2nd and 3rd pair of Conv1d layers
                (conv1d_19/22, conv1d_20/23, conv1d_21/24)

  NOTE(review): the original code passed no kernel size at all, which
  torch.nn.Conv1d requires. The default (3, 5, 7) is an assumption - confirm
  against the reference model.
  NOTE(review): no forward() is defined in this cell, so an instance cannot
  be called yet; how the layers are wired together is not shown here.
  '''

  def __init__(self, batch_size, input_dim, output_dim, kernel_sizes=(3, 5, 7)):
    super().__init__()
    k_a, k_b, k_c = kernel_sizes

    self.linear1 = torch.nn.Linear(input_dim, out_features=80)
    self.conv1d_19 = torch.nn.Conv1d(80, 16, kernel_size=k_a)
    self.conv1d_20 = torch.nn.Conv1d(80, 16, kernel_size=k_b)
    self.conv1d_21 = torch.nn.Conv1d(80, 16, kernel_size=k_c)
    self.leaky_re_lu_20 = torch.nn.LeakyReLU()
    self.leaky_re_lu_21 = torch.nn.LeakyReLU()
    self.leaky_re_lu_22 = torch.nn.LeakyReLU()
    self.spatial_dropout1d_6 = torch.nn.Dropout2d() #Slight issue with dropout mask used as compared to spatial dropout - worth looking into later with Vijay.
    self.spatial_dropout1d_7 = torch.nn.Dropout2d()
    self.spatial_dropout1d_8 = torch.nn.Dropout2d()
    self.conv1d_22 = torch.nn.Conv1d(16, 32, kernel_size=k_a)
    self.conv1d_23 = torch.nn.Conv1d(16, 32, kernel_size=k_b)
    self.conv1d_24 = torch.nn.Conv1d(16, 32, kernel_size=k_c)
    self.leaky_re_lu_23 = torch.nn.LeakyReLU()
    self.leaky_re_lu_24 = torch.nn.LeakyReLU()
    self.leaky_re_lu_25 = torch.nn.LeakyReLU()
    # Placeholders: currently pass their input through unchanged.
    self.lambda_4 = torch.nn.Identity()
    self.lambda_5 = torch.nn.Identity()
    self.lambda_6 = torch.nn.Identity()

    #Write own concatenate layer
    self.concatenate_1 = self.concatenate_layer()

    self.dense_3 = torch.nn.Linear(96, 10)
    # BatchNorm1d needs the feature count; it follows the 10-unit Linear.
    self.batch_norm_2 = torch.nn.BatchNorm1d(10)
    self.leaky_re_lu_26 = torch.nn.LeakyReLU()
    self.dropout_1 = torch.nn.Dropout()
    self.dense_4 = torch.nn.Linear(10, 10)
    self.batch_norm_3 = torch.nn.BatchNorm1d(10)
    self.leaky_re_lu_27 = torch.nn.LeakyReLU()
    self.dropout_2 = torch.nn.Dropout()
    # NOTE(review): output width is hard-coded to 10 while `output_dim` is
    # unused - confirm which one is intended.
    self.dense_5 = torch.nn.Linear(10, 10)
    self.activation = torch.nn.Sigmoid()

  def concatenate_layer(self):
    '''
    Return a callable that joins a sequence of tensors along dim 1.
    '''
    #HOF
    def concatenate(tensor_array):
      # torch.cat takes the sequence itself (not a list wrapping it). Joining
      # on dim 1 (features/channels) is what makes three 32-wide tensors
      # match the 96 inputs expected by dense_3; dim 0 would stack batches.
      return torch.cat(list(tensor_array), dim=1)
    return concatenate


In [None]:
# Train/validate loop. Expects `num_epochs`, `net`, `optimizer`, `criterion`,
# `scheduler`, `sample_ts`, `xtrain`/`ytrain` and `xeval`/`yeval` to have been
# defined by earlier cells; `net(batch)` must return a (code, output) pair.

# `import tqdm` by itself does not load the `tqdm.notebook` submodule.
import tqdm.notebook

for epoch in range(num_epochs):

    # ---- training stage ----
    # A new dataset per epoch: with the default train=True, AudioDS draws a
    # fresh class-balanced subset every time it is constructed.
    ds = AudioDS(xtrain, ytrain, stime=sample_ts)
    dl = DataLoader(ds, batch_size=8, shuffle=True)

    epoch_loss = []

    net.train()

    with torch.set_grad_enabled(True):
        for m_batch, m_label in tqdm.notebook.tqdm(dl):
            m_batch, m_label = m_batch.to(device), m_label.to(device)

            optimizer.zero_grad()

            code, output = net(m_batch)

            loss = criterion(output, m_label)

            epoch_loss.append(loss.item())

            loss.backward()
            optimizer.step()

        eloss = np.array(epoch_loss)
        print(f'Epoch[{epoch}]: epoch train loss: {np.mean(eloss):.3f}')
        # release the training loader/dataset before validation starts
        del dl
        del ds

    # the scheduler is stepped with the mean *training* loss of this epoch
    scheduler.step(np.mean(eloss))

    # ---- validation stage ----
    net.eval()
    val_loss = []

    with torch.set_grad_enabled(False):
        # train=False keeps every evaluation sample (no class balancing);
        # only the order is randomized
        ds = AudioDS(xeval, yeval, stime=sample_ts, train=False)
        dl = DataLoader(ds, batch_size=8, shuffle=True)

        total_samples = 0
        total_corrects = 0

        for m_batch, m_label in tqdm.notebook.tqdm(dl):
            m_batch, m_label = m_batch.to(device), m_label.to(device)

            _, output = net(m_batch)

            loss = criterion(output, m_label)

            val_loss.append(loss.item())

            # predicted class = index of the largest output along dim 1
            _, preds = torch.max(output, 1)

            # compare on the current device; .item() brings back the count
            total_corrects += torch.sum(preds == m_label).item()
            total_samples += len(m_label)

        acc = total_corrects * 100.0 / total_samples
        print(f'Epoch[{epoch}]: epoch accuracy: {acc:.2f}')

        eloss = np.array(val_loss)
        print(f'Epoch[{epoch}]: epoch validation loss: {np.mean(eloss):.3f}')
        