In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import torchaudio
import torchaudio.transforms as T
import textgrids
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, f1_score

SAMPLE_RATE = 16000

In [3]:
class AudioUtils():

    def resample_audio(signal, org_sr):
        resampler = T.Resample(org_sr, SAMPLE_RATE)
        resampled_sig = resampler(signal)
        return resampled_sig
    
    def open(aud_fn):
        sig, sr = torchaudio.load(aud_fn)
        duration = sig.shape[1]/sr
        #print(sig.shape)
        if sr != SAMPLE_RATE:
            sig = AudioUtils.resample_audio(sig, sr)

        # Converting stereo to mono
        if sig.shape[0] == 2:
            sig = sig.mean(dim=0).unsqueeze(0)
        
        if sig.shape[1]%SAMPLE_RATE == 0:
            duration = sig.shape[1]/SAMPLE_RATE
            sig = sig.squeeze().numpy()
        else:
            sig = sig.squeeze().numpy()
            #print(sig_np.shape)
            en = (int)((np.round(duration)+1)*SAMPLE_RATE - sig.shape[0])
            sig = np.pad(sig, (0,en), mode = 'constant')
            #sig = torch.from_numpy(sig_np).unsqueeze(0)
            duration = sig.shape[0] / SAMPLE_RATE
        
        return sig, SAMPLE_RATE, duration
    
    def get_second_wise_mfcc(signal, duration):
        mfcc_list = []
        MFCC = T.MFCC(sample_rate=SAMPLE_RATE, n_mfcc=13)
        for i in range(0,(int)(duration)):
            mfcc_list.append(MFCC(signal[0][i:(i+SAMPLE_RATE)].unsqueeze(0)).squeeze())

        return mfcc_list

In [4]:
class AnnotUtils():

    def get_speech_secs(fname):
        grid = textgrids.TextGrid(fname)
        speech_secs = []
        for i in grid['silences']:
            if i.text == '1':
                #speech_secs.append([(np.round(i.xmin, decimals=2)), (np.round(i.xmax, decimals=2))])
                speech_secs.append([(int)(np.round(i.xmin, decimals=2)), (int)(np.round(i.xmax, decimals=2))])
        return speech_secs
    

    def get_labs_for_secs(speech_secs, duration):
        
        labels = [i*0 for i in range(0, (int)(duration))]

        for i in range(0,len(speech_secs)):

            if speech_secs[i][0] == speech_secs[i][1]:
                labels[speech_secs[i][0]-1] = 1
            else:
                for j in range(speech_secs[i][0], speech_secs[i][1]):
                    
                    labels[j-1] = 1
                    labels[j] = 1

        return labels

In [6]:
df2 = pd.read_csv('secWise_labs.csv')

In [49]:
MFCC = T.MFCC(sample_rate=SAMPLE_RATE, n_mfcc=13)

class VAD_Dataset(Dataset):
    def __init__(self, df) -> None:
        #super().__init__()
        self.df = df
        self.sr = 16000

    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, index):

        aud_part = self.df.loc[index][0]
        labels = self.df.loc[index][1]
        
        if aud_part[-2] == '_':
            n_sec = (int)(aud_part[-1])
            aud_path = aud_part[:-2]+'.wav'
        elif aud_part[-3] == '_':
            n_sec = (int)(aud_part[-2:])
            aud_path = aud_part[:-3]+'.wav'
        #print(n_sec)
        signal00, sam00, dur00 = AudioUtils.open(aud_path)
        sig00 = torch.from_numpy(signal00[n_sec*SAMPLE_RATE:((n_sec+1)*SAMPLE_RATE)]).unsqueeze(0)
        #print(sig00.shape)
        #labels = torch.Tensor(labels).to(dtype=torch.long)
        #mfcc_list = AudioUtils.get_second_wise_mfcc(sig00, dur00)
        mfcc_tens = MFCC(sig00).to(dtype=torch.float)
        #labels = AnnotUtils.get_labs_for_secs(AnnotUtils.get_speech_secs(annot_path), dur)
        
        #mfcc_tens = torch.flatten(mfcc_tens)
        
        return mfcc_tens, labels



In [50]:
dataset = VAD_Dataset(df=df2)

In [51]:
num_items = len(dataset)
num_train = round(num_items * 0.8)
num_val = num_items - num_train
train_ds, val_ds = random_split(dataset, [num_train, num_val])

In [52]:
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
test_loader = DataLoader(val_ds, batch_size=32, shuffle=True)

In [53]:
for inputs, labels in test_loader:
    print(inputs.shape, labels.shape)

torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size([32, 1, 13, 81]) torch.Size([32])
torch.Size

In [44]:
def acc_fn(lab, pred):
    pred = torch.round(pred)
    correct = (lab == pred).float()
    return correct.mean().item()

In [60]:
sampl = torch.randn(32,1,13,81)

l1 = nn.Conv2d(1,13,kernel_size=(3,3), padding=(1,1))
relu = nn.ReLU()
maxp1 = nn.MaxPool2d(kernel_size=(2,2))

# --------------------------------- #

l2 = nn.Conv2d(13, 26, kernel_size=(3,3), padding=(1,1))
maxp2 = nn.MaxPool2d(kernel_size=(3,3))

linear = nn.Linear(676, 1)
sigmoid = nn.Sigmoid()

x = maxp1(relu(l1(sampl)))
x = maxp2(relu(l2(x)))
print(x.shape)
x = x.view(x.size(0), -1)
#x = sigmoid(linear(x)).squeeze()

# (torch.Size([32, 13, 4, 27]), torch.Size([32, 1404]))

x.shape

torch.Size([32, 26, 2, 13])


torch.Size([32, 676])

In [61]:
class ConvVAD(nn.Module):
    def __init__(self) -> None:
        super(ConvVAD, self).__init__()

        self.l1 = nn.Conv2d(1,13,kernel_size=(3,3), padding=(1,1))
        self.relu = nn.ReLU()
        self.maxp1 = nn.MaxPool2d(kernel_size=(2,2))

        # --------------------------------- #

        self.l2 = nn.Conv2d(13, 26, kernel_size=(3,3), padding=(1,1))
        self.maxp2 = nn.MaxPool2d(kernel_size=(3,3))

        self.linear = nn.Linear(676, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        
        x = self.maxp1(self.relu(self.l1(x)))
        x = self.maxp2(self.relu(self.l2(x)))
        x = x.view(x.size(0), -1)
        x = self.sigmoid(self.linear(x)).squeeze()

        return x

In [62]:
torch.cuda.empty_cache()
device = 'cuda:1' if torch.cuda.is_available() else 'cpu'

model00 = ConvVAD().to(device)

criterion = nn.BCELoss()
optimizer = optim.SGD(model00.parameters(), lr=0.001)

In [63]:
import statistics

torch.manual_seed(13)
torch.cuda.manual_seed(13)

epochs = 20

for epoch in range(epochs):

    acc = []

    for batch in train_loader:

        inputs, labels = batch

        inputs = inputs.to(device)
        labels = labels.to(device, dtype=torch.float)

        model00.train()
        outputs = torch.round(model00(inputs))
        loss = criterion(outputs.float(), labels.float())

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        acc.append(acc_fn(labels.float(), outputs.float()))

    accuracie = statistics.mean(acc)

    print(f'Epoch [{epoch+1}/{epochs}], Accuracy: {accuracie}, Loss: {loss}')


Epoch [1/20], Accuracy: 0.6345207727685267, Loss: 28.571430206298828
Epoch [2/20], Accuracy: 0.6337008020099328, Loss: 42.85714340209961
Epoch [3/20], Accuracy: 0.6332908163265306, Loss: 50.000003814697266
Epoch [4/20], Accuracy: 0.6341107876933351, Loss: 35.71428680419922
Epoch [5/20], Accuracy: 0.6332908163265306, Loss: 50.000003814697266
Epoch [6/20], Accuracy: 0.6349307584519289, Loss: 21.428571701049805
Epoch [7/20], Accuracy: 0.6345207727685267, Loss: 28.571430206298828
Epoch [8/20], Accuracy: 0.6337008020099328, Loss: 42.85714340209961
Epoch [9/20], Accuracy: 0.6332908163265306, Loss: 50.000003814697266
Epoch [10/20], Accuracy: 0.632880831251339, Loss: 57.142860412597656
Epoch [11/20], Accuracy: 0.6349307584519289, Loss: 21.428571701049805
Epoch [12/20], Accuracy: 0.6349307584519289, Loss: 21.428571701049805
Epoch [13/20], Accuracy: 0.6332908163265306, Loss: 50.000003814697266
Epoch [14/20], Accuracy: 0.6337008020099328, Loss: 42.85714340209961
Epoch [15/20], Accuracy: 0.6341107

In [65]:
test_acc = []
for batch in test_loader:

    
    inputs, labels = batch

    inputs = inputs.to(device)
    labels = labels.to(device, dtype=torch.float)

    model00.eval()
    with torch.inference_mode():
        y_preds = torch.round(model00(inputs))

    test_acc.append(acc_fn(labels.float(), y_preds.float()))

print(statistics.mean(test_acc))

0.63375
