In [345]:
# !pip install torchaudio librosa
import os
import numpy as np
from matplotlib import pyplot as plt
import IPython.display as ipd
import librosa
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms
from pathlib import Path
import torchaudio
import torchvision
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torch import nn
from torch.nn import init
import warnings
warnings.filterwarnings('ignore')
plt.style.use(['nature', 'science', 'no-latex'])
plt.rcParams['font.family'] = 'Times New Roman'

In [265]:
def load_audio_files(path: str, label:str, resample_rate=16000, offset=300, duration=60, slice_length=600):
    dataset = []
    walker = sorted(str(p) for p in Path(path).glob(f'*.mp3'))

    for i, file_path in enumerate(walker):
        path, filename = os.path.split(file_path)
        speaker = path.split('/')[-1]
        # Load audio as slices
        for i, t in enumerate(range(0, slice_length, duration)):
            waveform, _ = librosa.load(file_path, sr=resample_rate, offset=t, duration=duration)
            dataset.append([waveform, resample_rate, label, filename.replace('.mp3', ''), i])
    return dataset

In [268]:
LyndonBJohnson = load_audio_files(f'../audio/Lyndon B. Johnson', 'Lyndon B. Johnson')
RichardMNixon = load_audio_files(f'../audio/Richard M. Nixon', 'Richard M. Nixon')
BillClinton = load_audio_files(f'../audio/Bill Clinton', 'Bill Clinton')

In [269]:
trainloader_LyndonBJohnson = torch.utils.data.DataLoader(LyndonBJohnson, batch_size=1, shuffle=True, num_workers=2)
trainloader_RichardMNixon = torch.utils.data.DataLoader(RichardMNixon, batch_size=1, shuffle=True, num_workers=2)
trainloader_BillClinton = torch.utils.data.DataLoader(BillClinton, batch_size=1, shuffle=True, num_workers=2)

In [270]:
def create_spectrogram_images(trainloader, label_dir):
    # make directory
    directory = f'../dataset/audio_images/spectrogram/{label_dir}/'
    if(os.path.isdir(directory)):
        print("Data exists for", label_dir)
    else:
        os.makedirs(directory, mode=0o777, exist_ok=True)
        
        for i, data in enumerate(trainloader):
            waveform = data[0].numpy().flatten()
            sample_rate = data[1][0]
            label = data[2]

            # create transformed waveforms
            spectrogram = librosa.feature.mfcc(y=waveform, sr=sample_rate.numpy())
            plt.imsave(f'../dataset/audio_images/spectrogram/{label_dir}/spec_img{i}.png', spectrogram)

In [271]:
create_spectrogram_images(trainloader_LyndonBJohnson, 'Lyndon B. Johnson')
create_spectrogram_images(trainloader_RichardMNixon, 'Richard M. Nixon')
create_spectrogram_images(trainloader_BillClinton, 'Bill Clinton')

In [339]:
# load preprocessed data
presidents_dataset = datasets.ImageFolder(
    root='dataset/audio_images/spectrogram/',
    transform=transforms.Compose([transforms.ToTensor()
                                  ])
)
print(presidents_dataset)

Dataset ImageFolder
    Number of datapoints: 141
    Root location: dataset/audio_images/spectrogram/
    StandardTransform
Transform: Compose(
               ToTensor()
           )


In [340]:
train_size = int(0.8 * len(presidents_dataset))
test_size = len(presidents_dataset) - train_size
presidents_train_dataset, presidents_test_dataset = torch.utils.data.random_split(presidents_dataset, [train_size, test_size])
print("Training size:", len(presidents_train_dataset))
print("Testing size:",len(presidents_test_dataset))

Training size: 112
Testing size: 29


In [341]:
from collections import Counter
# labels in training set
train_classes = [label for _, label in presidents_train_dataset]
Counter(train_classes)

Counter({2: 33, 1: 42, 0: 37})

In [342]:
train_dataloader = torch.utils.data.DataLoader(
    presidents_train_dataset,
    batch_size=10,
    num_workers=2,
    shuffle=True
)

test_dataloader = torch.utils.data.DataLoader(
    presidents_test_dataset,
    batch_size=10,
    num_workers=2,
    shuffle=True
)

In [331]:
class Network(nn.Module):
    def __init__(self):
        super().__init__()
        conv_layers = []
        # First Convolution Block with Relu and Batch Norm. Use Kaiming Initialization
        self.conv1 = nn.Conv2d(2, 8, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
        self.relu1 = nn.ReLU()
        self.bn1 = nn.BatchNorm2d(8)
        init.kaiming_normal_(self.conv1.weight, a=0.1)
        self.conv1.bias.data.zero_()
        conv_layers += [self.conv1, self.relu1, self.bn1]
        # Second Convolution Block
        self.conv2 = nn.Conv2d(8, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu2 = nn.ReLU()
        self.bn2 = nn.BatchNorm2d(16)
        init.kaiming_normal_(self.conv2.weight, a=0.1)
        self.conv2.bias.data.zero_()
        conv_layers += [self.conv2, self.relu2, self.bn2]
        # Second Convolution Block
        self.conv3 = nn.Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu3 = nn.ReLU()
        self.bn3 = nn.BatchNorm2d(32)
        init.kaiming_normal_(self.conv3.weight, a=0.1)
        self.conv3.bias.data.zero_()
        conv_layers += [self.conv3, self.relu3, self.bn3]
        # Second Convolution Block
        self.conv4 = nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu4 = nn.ReLU()
        self.bn4 = nn.BatchNorm2d(64)
        init.kaiming_normal_(self.conv4.weight, a=0.1)
        self.conv4.bias.data.zero_()
        conv_layers += [self.conv4, self.relu4, self.bn4]
        # Linear Classifier
        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Linear(in_features=64, out_features=10)

        # Wrap the Convolutional Blocks
        self.conv = nn.Sequential(*conv_layers)
 

    def forward(self, x):
        # Run the convolutional blocks
        x = self.conv(x)
        # Adaptive pool and flatten for input to linear layer
        x = self.ap(x)
        x = x.view(x.shape[0], -1)
        # Linear layer
        x = self.lin(x)
        # Final output
        return x


model = Network()

In [335]:
optimizer = optim.Adam(model.parameters(), lr = 0.01, weight_decay = 0.0001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size = 20, gamma = 0.1)

In [346]:
def training(model, train_dl, num_epochs):
    # Loss Function, Optimizer and Scheduler
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(),lr=0.001)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001,
                                                steps_per_epoch=int(len(train_dl)),
                                                epochs=num_epochs,
                                                anneal_strategy='linear')

    # Repeat for each epoch
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct_prediction = 0
        total_prediction = 0

        # Repeat for each batch in the training set
        for i, data in enumerate(train_dl):
            # Get the input features and target labels, and put them on the GPU
            inputs, labels = data[0].to(device), data[1].to(device)

            # Normalize the inputs
            inputs_m, inputs_s = inputs.mean(), inputs.std()
            inputs = (inputs - inputs_m) / inputs_s

            # Zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            scheduler.step()

            # Keep stats for Loss and Accuracy
            running_loss += loss.item()

            # Get the predicted class with the highest score
            _, prediction = torch.max(outputs,1)
            # Count of predictions that matched the target label
            correct_prediction += (prediction == labels).sum().item()
            total_prediction += prediction.shape[0]

            #if i % 10 == 0:    # print every 10 mini-batches
            #    print('[%d, %5d] loss: %.3f' % (epoch + 1, i + 1, running_loss / 10))

        # Print stats at the end of the epoch
        num_batches = len(train_dl)
        avg_loss = running_loss / num_batches
        acc = correct_prediction/total_prediction
        print(f'Epoch: {epoch}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')

    print('Finished Training')
  
num_epochs=2   # Just for demo, adjust this higher.
training(myModel, train_dl, num_epochs)

IndentationError: expected an indented block (<ipython-input-346-510e90566ebd>, line 12)

In [337]:
def test(model, epoch):
    model.eval()
    correct = 0
    for data, target in test_loader:
        data = data.to(device)
        target = target.to(device)
        output = model(data)
        output = output.permute(1, 0, 2)
        pred = output.max(2)[1] # get the index of the max log-probability
        correct += pred.eq(target).cpu().sum().item()
    print('\nTest set: Accuracy: {}/{} ({:.0f}%)\n'.format(
        correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

In [338]:
log_interval = 20
for epoch in range(1, 41):
    if epoch == 31:
        print("First round of training complete. Setting learn rate to 0.001.")
    scheduler.step()
    train(model, epoch)
    test(model, epoch)

RuntimeError: Expected 3-dimensional input for 3-dimensional weight [128, 1, 80], but got 4-dimensional input of size [10, 3, 512, 30] instead

In [344]:
for batch_idx, (data, target) in enumerate(train_dataloader):
    print(data.shape, target)
    break

torch.Size([10, 3, 20, 1876]) tensor([1, 2, 2, 0, 0, 1, 2, 0, 1, 1])
