In [2]:
import pandas as pd       
import os 
import math 
import numpy as np
import matplotlib.pyplot as plt
import librosa
from pydub import AudioSegment
from pydub.silence import split_on_silence
import torch
import torch.nn as nn
import pickle
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from pathlib import Path

In [62]:
class MFCCDataset(Dataset):
    def __init__(self, root_dir):
        self.root_dir = root_dir
              
        self.mfccs = []
        self.labels = []
        
        for i in range(len(os.listdir(self.root_dir))):

            f = os.listdir(self.root_dir)[i]
            with open(self.root_dir / f, 'rb') as handle:
                entry = pickle.load(handle)
            self.mfccs.append(entry['data'])
            self.labels.extend(entry['labels'])
        
        self.mfccs = torch.from_numpy(np.vstack(self.mfccs).reshape(-1,3,20,22)).float()
        self.labels = torch.tensor(self.labels, dtype=torch.float)
    
    def __len__(self):
        
        return len(self.mfccs)

    def __getitem__(self, idx):
        
        return self.mfccs[idx], self.labels[idx]

In [49]:
original_dir = Path('C:/Users/omar_/Documents/cockatoos/data/accent_samples/recordings/recordings')
train_dir =  Path('C:/Users/omar_/Documents/cockatoos/data/train')
val_dir =  Path('C:/Users/omar_/Documents/cockatoos/data/val')
test_dir =  Path('C:/Users/omar_/Documents/cockatoos/data/test')

files = os.listdir(original_dir)


other_accent_types = ["mandarin", "japanese", "korean", "taiwanese", "cantonese", "thai", "indonesian"]

english_accent_files = []
other_accent_files = []

num_train_files = 150
num_val_files = 23
num_test_files = 25

end_idx_train = num_train_files
end_idx_val = end_idx_train + num_val_files
end_idx_test = end_idx_val + num_test_files

for f in files:
    if "english" in f:
        english_accent_files.append(f)
    
    if any(t in f for t in other_accent_types):
        other_accent_files.append(f)
        
np.random.shuffle(english_accent_files)
np.random.shuffle(other_accent_files)
        
print(f"Number of english accent files: {len(english_accent_files)}")
print(f"Number of other accent files: {len(other_accent_files)}")

train_files = english_accent_files[0:end_idx_train] + other_accent_files[0:end_idx_train]
val_files   = english_accent_files[end_idx_train:end_idx_val] + other_accent_files[end_idx_train:end_idx_val]
test_files  = english_accent_files[end_idx_val:end_idx_test] + other_accent_files[end_idx_val:end_idx_test]

print(f"Number of training files: {len(train_files)}")
print(f"Number of validaiton files: {len(val_files)}")
print(f"Number of test files: {len(test_files)}")

Number of english accent files: 579
Number of other accent files: 198
Number of training files: 300
Number of validaiton files: 46
Number of test files: 50


In [4]:
  def generate_mfcc_data(mfcc):
        mfcc_standardized = np.zeros(mfcc.shape)
        for b in range(mfcc.shape[0]):
            mfcc_slice = mfcc[b,:]
            centered = mfcc_slice - np.mean(mfcc_slice)
            if np.std(centered) != 0:
                centered_scaled = centered / np.std(centered)

            mfcc_standardized[b,:] = centered_scaled

        delta1 = librosa.feature.delta(mfcc_standardized, order=1)
        delta2 = librosa.feature.delta(mfcc_standardized, order=2)
        mfcc_data = np.stack((mfcc_standardized,delta1,delta2))
        
        return mfcc_data

In [50]:
def generate_model_data(path, files, train):
    
    counter = 0
    seg_thresh = 500
    batch_num = 1
    items = []
    labels = []
    
    for f in files:
            
        if "english" in f:
            label = 1
        else:
            label = 0
            
        sound_file = AudioSegment.from_mp3(original_dir / f)
        audio_chunks = split_on_silence(sound_file, 
            # must be silent for at least half a second
            min_silence_len = 80,

            # consider it silent if quieter than -16 dBFS
            silence_thresh=-30
        )

        
        for seg in audio_chunks:
            
            seg_len = len(seg)
    
            if seg_len >= seg_thresh:
                seg_standardized = seg[0:seg_thresh]
            else:
                seg_standardized = seg + AudioSegment.silent(duration=(seg_thresh - seg_len))
                
                
            samples = seg_standardized.get_array_of_samples()
            arr = np.array(samples).astype(np.float32)/32768 # 16 bit 
            arr = librosa.core.resample(arr, seg_standardized.frame_rate, 22050, res_type='kaiser_best') 
                
            mfcc = librosa.feature.mfcc(y=arr, sr=22050)
            data = generate_mfcc_data(mfcc)
            items.append(data)
            labels.append(label)
            
            if train:
                noise = np.random.normal(0,1, mfcc.shape)
                mfcc_noisy = mfcc + noise
                noisy_data = generate_mfcc_data(mfcc_noisy)
                items.append(noisy_data)
                labels.append(label)
        
              
    max_batch_size = len(labels) // 5
    for j in range(0,len(items),max_batch_size):
        curr_data = items[j:j + max_batch_size]
        curr_labels = labels[j:j + max_batch_size]
        batch_mfcc = np.vstack(curr_data).reshape(-1,3,20,22)
        entry = dict()
        entry['data'] = batch_mfcc
        entry['labels'] = curr_labels
        with open(path / f'data_batch_{batch_num}.pickle', 'wb') as handle:
            pickle.dump(entry, handle, protocol=pickle.HIGHEST_PROTOCOL)
        batch_num += 1
            

In [51]:
#Uncomment to create the files for the dataset folders

generate_model_data(train_dir, train_files, True)
print("Training words created")
generate_model_data(val_dir, val_files, False)
print("Validaiton words created")
generate_model_data(test_dir, test_files, False)
print("Testing words created")

Training words created
Validaiton words created
Testing words created


In [52]:
train_data_dir = Path('C:/Users/omar_/Documents/cockatoos/data/train')
val_data_dir = Path('C:/Users/omar_/Documents/cockatoos/data/val')
test_data_dir = Path('C:/Users/omar_/Documents/cockatoos/data/test')

train_data = MFCCDataset(train_data_dir)
val_data = MFCCDataset(val_data_dir)
# test_data = MFCCDataset(test_data_dir)

In [57]:
print(train_data[0][0].shape)

torch.Size([3, 20, 22])


In [54]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [55]:
#Model definition
model = nn.Sequential(
            nn.Conv2d(3,32,3),
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(2),
            nn.Conv2d(32,64,3),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2),
            nn.Dropout(0.5),
            nn.Flatten(1,3),
            nn.Linear(768,128),
            nn.Dropout(0.5),
            nn.Linear(128,1),
            nn.Softmax(dim=0)
        ).to(device)

In [26]:
print(model)

Sequential(
  (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1))
  (1): ReLU()
  (2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (4): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
  (5): ReLU()
  (6): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (8): Dropout(p=0.5, inplace=False)
  (9): Flatten(start_dim=1, end_dim=3)
  (10): Linear(in_features=768, out_features=128, bias=True)
  (11): Dropout(p=0.5, inplace=False)
  (12): Linear(in_features=128, out_features=2, bias=True)
  (13): Softmax(dim=0)
)


In [56]:
with torch.no_grad():
    torch.cuda.empty_cache()

In [None]:
train_loader = DataLoader(train_data,batch_size=32,shuffle=True)
val_loader = DataLoader(train_data,batch_size=32)
optimizer = optim.Adam(model.parameters(), lr=0.001)
epochs = 100

for epoch in range(epochs):
    
    running_loss = 0
    
    for i, (inputs, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        outputs = model(inputs.to(device))
        loss = nn.BCELoss()(outputs,labels.to(device).reshape(-1,1))
        running_loss += loss.item()
        loss.backward()
        optimizer.step()
        
    if epoch % 10 == 0:
        with torch.no_grad():
            val_loss = 0
            for j, (d,l) in enumerate(val_loader):
                o = model(d.to(device))
                loss = nn.BCELoss()(o,l.to(device).reshape(-1,1))
                val_loss += loss.item()
            print(f"Validation loss for epoch {epoch}: {val_loss / len(val_loader)}")
        
        
    print(f"Epoch:{epoch}, avg loss for epoch:{running_loss / len(train_loader)} ")