# Setup

In [None]:
# Install the packages this notebook imports: torch, torchvision, torchinfo and torchviz.
! pip install torch torchvision torchinfo torchviz



In [None]:
# Mount Google Drive at /content/drive so the dataset and model folders are reachable.
# force_remount=True is passed to request a fresh mount.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)


Mounted at /content/drive


In [None]:
# from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import torch, torchinfo
# import torchaudio
import torch.nn as nn
import torch.nn.functional as F
# import torch.optim as optim
# import numpy as np
# import matplotlib.pyplot as plt
from torchvision import datasets, models, transforms
# import pandas as pd
# import os
from torchvision.datasets import ImageFolder
import os
import numpy as np

# Root folder passed to the image dataset below, and the folder the trained weights are saved to.
# NOTE(review): both are relative paths, so they presumably rely on the working
# directory being /content (where Drive is mounted) - confirm.
data_path = './drive/MyDrive/cs8803_data/spectrograms/' #looking in subfolder train
model_save_path = './drive/MyDrive/cs8803_data/models/'

# Use the first CUDA device when CUDA is available, otherwise fall back to the CPU.
dev = "cuda:0" if torch.cuda.is_available() else "cpu"
device = torch.device(dev)


# Data Loading

In [None]:
class myImageFolder(ImageFolder):
  """ImageFolder whose class discovery skips hidden (dot-prefixed) directories."""

  @staticmethod
  def find_classes(dir):
    """Return ``(classes, class_to_idx)`` for the sub-directories of ``dir``.

    Only entries that are directories and whose name does not start with '.'
    are kept. ``classes`` is sorted, and ``class_to_idx`` maps each class name
    to its position in that sorted list.
    """
    classes = sorted(
        entry
        for entry in os.listdir(dir)
        if not entry.startswith('.') and os.path.isdir(os.path.join(dir, entry))
    )
    class_to_idx = {name: index for index, name in enumerate(classes)}
    return classes, class_to_idx

In [None]:
# Build the labelled image dataset from data_path (one class per sub-folder).
# Every image is resized to 201 x 1103 before conversion to a tensor; CNNet.fc1's
# input size (841728 = 64 * 48 * 274) is derived from exactly this size, so the
# two must be changed together.
multi_dataset = myImageFolder(
    root=data_path,
    transform=transforms.Compose(
        [
            transforms.Resize((201, 1103)),
            transforms.ToTensor(),
        ]
    ),
)
print(multi_dataset)

Dataset myImageFolder
    Number of datapoints: 313
    Root location: ./drive/MyDrive/cs8803_data/spectrograms/
    StandardTransform
Transform: Compose(
               Resize(size=(201, 1103), interpolation=bilinear, max_size=None, antialias=warn)
               ToTensor()
           )


In [None]:
# Mapping from class (folder) name to the integer label used by the dataset.
class_map=multi_dataset.class_to_idx

print("\nClass category and index of the images: {}\n".format(class_map))


Class category and index of the images: {'blender': 0, 'garbage': 1, 'noise': 2}



In [None]:
# Split the data into train / test / validation subsets: 80% / 10% / remainder.
train_size = int(0.8 * len(multi_dataset))
test_size = int(0.1 * len(multi_dataset))
# Validation takes whatever is left, so the three sizes always sum to len(multi_dataset).
val_size = len(multi_dataset) - train_size - test_size

# Seed the split so the same samples land in the same subset on every run;
# an unseeded split means each re-run is evaluated on different images.
split_generator = torch.Generator().manual_seed(42)
multi_train_dataset, multi_test_dataset, multi_val_dataset = torch.utils.data.random_split(
    multi_dataset,
    [train_size, test_size, val_size],
    generator=split_generator,
)

print("Training size:", len(multi_train_dataset))
print("Testing size:",len(multi_test_dataset))
print("Validation size:",len(multi_val_dataset))

Training size: 250
Testing size: 31
Validation size: 32


In [None]:
from collections import Counter

# labels in training set
# Look the labels up in the parent dataset's `targets` list through the subset's
# `indices`. Iterating the subset itself would load and resize every image just
# to read its label, which is slow enough that the cell had to be interrupted.
train_classes = [multi_dataset.targets[i] for i in multi_train_dataset.indices]
Counter(train_classes)

KeyboardInterrupt: ignored

In [None]:
# Labels in the test set, read from the parent dataset's `targets` list through
# the subset's `indices` so that no image has to be loaded just to get its label.
test_classes = [multi_dataset.targets[i] for i in multi_test_dataset.indices]
Counter(test_classes)

In [None]:
# One shuffled, single-sample-per-batch loader for each split.
train_dataloader = torch.utils.data.DataLoader(multi_train_dataset, batch_size=1, shuffle=True)

test_dataloader = torch.utils.data.DataLoader(multi_test_dataset, batch_size=1, shuffle=True)

val_dataloader = torch.utils.data.DataLoader(multi_val_dataset, batch_size=1, shuffle=True)

# Sanity check: dataset[0] is an (image, label) pair; print the image tensor's [0][0] slice
# (presumably the first pixel row of the first channel - confirm).
td = train_dataloader.dataset[0][0][0][0]
print(td)

tensor([0.9529, 0.9922, 0.9922,  ..., 0.6980, 0.7490, 0.3412])


# Model

In [None]:
# Optional DirectML backend. torch_directml is not installed by the setup cell,
# so an unconditional import would stop a top-to-bottom run on machines without
# it; fall back to the CUDA/CPU `device` chosen in the setup cell instead.
try:
    import torch_directml
except ImportError:
    tensor_dev = device
    print('torch_directml not available; using {} device'.format(tensor_dev))
else:
    tensor_dev = torch_directml.device(0)
    print('Using {} device'.format(torch_directml.device_name(tensor_dev.index)))

Using AMD Radeon RX 6800M  device


In [None]:
# Use the CUDA/CPU device selected in the setup cell for the model and all batches.
# NOTE(review): this overrides any DirectML device assigned to tensor_dev by the cell above.
tensor_dev = device

In [None]:
class CNNet(nn.Module):
    """Small two-convolution CNN that maps an image batch to per-class logits.

    Args:
        flat_features: number of values per sample after the conv/pool stack is
            flattened. The default 841728 (= 64 * 48 * 274) is what a
            3 x 201 x 1103 input produces with the layers below.
        num_classes: number of output logits.
    """

    def __init__(self, flat_features=841728, num_classes=3):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3)
        self.conv2_drop = nn.Dropout2d()
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(flat_features, 50)
        self.fc2 = nn.Linear(50, num_classes)

    def forward(self, x):
        """Return raw, unnormalised class scores of shape (batch, num_classes)."""
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = self.flatten(x)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        # Return logits untouched: CrossEntropyLoss applies log-softmax itself.
        # The previous relu + sigmoid squeezed every score into [0.5, 1), which
        # pins the loss near ln(3) and starves the network of gradient.
        return self.fc2(x)

# Instantiate the network and move its parameters to the selected device.
model = CNNet().to(tensor_dev)

In [None]:
# Show a per-layer parameter-count summary of the model.
from torchinfo import summary
summary(model)

Layer (type:depth-idx)                   Param #
CNNet                                    --
├─Conv2d: 1-1                            896
├─Conv2d: 1-2                            18,496
├─Dropout2d: 1-3                         --
├─Flatten: 1-4                           --
├─Linear: 1-5                            42,086,450
├─Linear: 1-6                            153
Total params: 42,105,995
Trainable params: 42,105,995
Non-trainable params: 0

In [None]:
# Loss function: passed to train() and also used by test() to score predictions.
cost = torch.nn.CrossEntropyLoss()

# Adam optimizer over all model parameters, using a fixed learning rate.
learning_rate = 1e-6
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Make sure the model is on the device the batches will be moved to.
model.to(tensor_dev)

# Accuracy (in percent) of each evaluation pass; test() appends to this list.
accs = []
# Threshold, in accuracy percentage points, used by the epoch loop's early-stopping check.
stop_model_train_threshold = 5

# Create the training function

def train(dataloader, model, loss, optimizer):
    """Run one optimisation pass over ``dataloader``.

    Args:
        dataloader: iterable of ``(X, Y)`` batches that also exposes ``.dataset``;
            the dataset length is only used in the progress print-out.
        model: module to optimise; it is switched to training mode.
        loss: callable ``loss(pred, Y)`` returning the scalar tensor to backpropagate.
        optimizer: optimizer whose ``zero_grad()`` and ``step()`` are called once per batch.
    """
    model.train()
    size = len(dataloader.dataset)

    # Move batches to wherever the model's parameters live instead of relying on
    # a notebook-level device variable; a parameter-free model falls back to CPU.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    for batch, (X, Y) in enumerate(dataloader):
        X, Y = X.to(run_device), Y.to(run_device)
        optimizer.zero_grad()
        pred = model(X)
        # Use the loss function that was passed in. Previously the argument was
        # ignored in favour of the global `cost` and then overwritten by the result.
        batch_loss = loss(pred, Y)
        batch_loss.backward()
        optimizer.step()

        # Report progress every 100 batches.
        if batch % 100 == 0:
            loss_value, current = batch_loss.item(), batch * len(X)
            print(f'loss: {loss_value:>7f}  [{current:>5d}/{size:>5d}]')


# Create the validation/test function

def test(dataloader, model):
    """Evaluate ``model`` on ``dataloader`` and record its accuracy.

    Uses the notebook-level ``cost`` (loss function) and ``tensor_dev`` (device),
    appends the accuracy in percent to the notebook-level ``accs`` list, and
    prints the accuracy together with the average loss.

    Args:
        dataloader: iterable of ``(X, Y)`` batches that also exposes ``.dataset``.
        model: module to evaluate; it is switched to evaluation mode.
    """
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X, Y in dataloader:
            X, Y = X.to(tensor_dev), Y.to(tensor_dev)
            pred = model(X)

            test_loss += cost(pred, Y).item()
            correct += (pred.argmax(1)==Y).type(torch.float).sum().item()

    # One loss value is accumulated per batch, so average over batches, not
    # samples (assuming `cost` uses a mean reduction, the CrossEntropyLoss
    # default). The two coincide only when batch_size == 1.
    # max(..., 1) keeps an empty loader from dividing by zero.
    test_loss /= max(num_batches, 1)
    correct /= max(size, 1)

    accs.append(100*correct)

    print(f'\nTest Error:\nacc: {(100*correct):>0.1f}%, avg loss: {test_loss:>8f}\n')

In [None]:
epochs = 25

for t in range(epochs):
    print(f'Epoch {t+1}\n-------------------------------')
    train(train_dataloader, model, cost, optimizer)
    test(test_dataloader, model)

    # Early stop: once past index 10, quit as soon as the latest accuracy
    # did not beat the previous one by more than the threshold.
    if t > 10 and len(accs) > 1:
        if accs[-1] - accs[-2] <= stop_model_train_threshold:
            break

print('Done!')

Epoch 1
-------------------------------
loss: 1.086392  [    0/  250]
loss: 1.098612  [  100/  250]
loss: 1.098612  [  200/  250]

Test Error:
acc: 32.3%, avg loss: 1.095087

Epoch 2
-------------------------------
loss: 1.098612  [    0/  250]
loss: 1.098612  [  100/  250]
loss: 1.119276  [  200/  250]

Test Error:
acc: 32.3%, avg loss: 1.087462

Epoch 3
-------------------------------
loss: 1.100138  [    0/  250]
loss: 0.914695  [  100/  250]
loss: 1.167600  [  200/  250]

Test Error:
acc: 77.4%, avg loss: 1.061428

Epoch 4
-------------------------------
loss: 1.070401  [    0/  250]
loss: 0.998737  [  100/  250]
loss: 0.948608  [  200/  250]

Test Error:
acc: 71.0%, avg loss: 1.044311

Epoch 5
-------------------------------
loss: 1.273614  [    0/  250]
loss: 1.111812  [  100/  250]
loss: 1.042237  [  200/  250]

Test Error:
acc: 77.4%, avg loss: 1.026241

Epoch 6
-------------------------------
loss: 1.023414  [    0/  250]
loss: 1.164861  [  100/  250]
loss: 1.049072  [  200/  

In [None]:
model.to(tensor_dev).eval()
# Index -> class name, in label order.
# NOTE: this rebinds class_map, which an earlier cell set to a name -> index dict.
class_map = ['blender', 'garbage', 'noise']

# Confusion matrix sized from the class list: rows = actual class, columns = predicted class.
cf_matrix = np.zeros((len(class_map), len(class_map)), dtype=int)

correct = 0
total = 0
with torch.no_grad():
    for batch, (X, Y) in enumerate(val_dataloader):
        X, Y = X.to(tensor_dev), Y.to(tensor_dev)
        pred = model(X)
        # Score every sample in the batch (not just the first) and work with
        # plain ints, so the loop stays correct for any batch size.
        for predicted_idx, actual_idx in zip(pred.argmax(1).tolist(), Y.tolist()):
            print("Predicted:\nvalue={}, class_name= {}\n".format(predicted_idx, class_map[predicted_idx]))
            print("Actual:\nvalue={}, class_name= {}\n".format(actual_idx, class_map[actual_idx]))
            if predicted_idx == actual_idx:
                correct += 1
            else:
                print("incorrect!")
            cf_matrix[actual_idx][predicted_idx] += 1
            total += 1
    print('Correct count = {}'.format(correct))
    # Divide by the number of samples seen; the number of batches only matches it for batch_size == 1.
    print('Accuracy = {}'.format(correct / total if total else float('nan')))


Predicted:
value=2, class_name= noise

Actual:
value=2, class_name= noise

Predicted:
value=2, class_name= noise

Actual:
value=2, class_name= noise

Predicted:
value=2, class_name= noise

Actual:
value=2, class_name= noise

Predicted:
value=0, class_name= blender

Actual:
value=0, class_name= blender

Predicted:
value=2, class_name= noise

Actual:
value=2, class_name= noise

Predicted:
value=2, class_name= noise

Actual:
value=2, class_name= noise

Predicted:
value=2, class_name= noise

Actual:
value=2, class_name= noise

Predicted:
value=1, class_name= garbage

Actual:
value=1, class_name= garbage

Predicted:
value=0, class_name= blender

Actual:
value=0, class_name= blender

Predicted:
value=2, class_name= noise

Actual:
value=2, class_name= noise

Predicted:
value=1, class_name= garbage

Actual:
value=1, class_name= garbage

Predicted:
value=2, class_name= noise

Actual:
value=2, class_name= noise

Predicted:
value=1, class_name= garbage

Actual:
value=1, class_name= garbage

Predi

In [None]:
# Confusion matrix from the validation loop: rows are actual classes, columns are predicted classes.
cf_matrix

array([[ 9,  0,  0],
       [ 0, 10,  0],
       [ 0,  0, 13]])

In [None]:
# Create the target folder if needed, then save only the learned parameters.
# os.path.join avoids the doubled slash that "{model_save_path}/model.pth"
# produced, since model_save_path already ends with '/'.
os.makedirs(model_save_path, exist_ok=True)
torch.save(model.state_dict(), os.path.join(model_save_path, "model.pth"))

In [None]:
from torchviz import make_dot

# Forward pass whose output is handed to make_dot in the next cell.
# NOTE(review): X is whatever batch the validation loop left bound last, so this
# cell only works after that loop has run.
y = model(X).to(tensor_dev)

In [None]:
# Build the graph for output y, labelling nodes with the model's named parameters.
dot = make_dot(y, params=dict(model.named_parameters()))
# 'TB' rank direction: lay the graph out top-to-bottom.
dot.graph_attr['rankdir'] = 'TB'
# Write vertical_layout_graph.png; cleanup=True is passed so intermediate files are removed.
dot.render("vertical_layout_graph", format="png", cleanup=True)

'vertical_layout_graph.png'