# CS 342 Homework 2 - Convolutional Neural Networks

Welcome to your second homework for CS 342! This problem set covers Convolutional Neural Networks. There are three main problems with several sub-questions each. For coding questions, fill in the missing parts (usually denoted `...`). For theoretical questions, write the most succinct possible answer in the provided markdown block. Please answer all questions in-line, being as brief and precise as possible. You will not need any libraries that aren't already imported here. This code can be run on your local machine.

Please follow <a href="https://pytorch.org">these instructions</a> to install pytorch.

Submission: Upload your jupyter notebook on canvas.

Good luck!

Enter your name and EID in the following block:

Hyunsung Oh (ho3626)

If you worked with anyone on this homework, please list them below:

In [2]:
# Run these two blocks to load important libraries and set things up
import torch
from torch import nn
import numpy as np

%matplotlib inline
import matplotlib.pyplot as plt

In [3]:
seed = 42

np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

## Problem 1. Training CNNs on the Fashion MNIST dataset (22 pts)

In this problem, we will build CNNs on a freely available vision dataset, <a href="https://github.com/zalandoresearch/fashion-mnist">Fashion-MNIST</a>. We have explored this dataset with an MLP architecture and will now try CNNs.

You might recollect that Fashion MNIST consists of many 28x28 grayscale images belonging to 10 different classes of clothing. The task is to train a classifier that can predict the clothing class from the image.

Since several of the steps follow from your work on HW1, you can fill in the misisng pieces much like last time.

In [4]:
seed = 42

np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

In [5]:
from torchvision import datasets, transforms

save_dir = 'fashionMNIST_data'

transform = transforms.ToTensor() # Convert the image into a torch tensor.

train_set = datasets.FashionMNIST(save_dir, download=True, train=True, transform=transform)
test_set = datasets.FashionMNIST(save_dir, download=True, train=False, transform=transform)

print(train_set)
print(test_set)

100%|██████████| 26.4M/26.4M [00:01<00:00, 19.1MB/s]
100%|██████████| 29.5k/29.5k [00:00<00:00, 304kB/s]
100%|██████████| 4.42M/4.42M [00:00<00:00, 5.57MB/s]
100%|██████████| 5.15k/5.15k [00:00<00:00, 10.1MB/s]

Dataset FashionMNIST
    Number of datapoints: 60000
    Root location: fashionMNIST_data
    Split: Train
    StandardTransform
Transform: ToTensor()
Dataset FashionMNIST
    Number of datapoints: 10000
    Root location: fashionMNIST_data
    Split: Test
    StandardTransform
Transform: ToTensor()





Each of these sets comprises of the image `data` and the clasification `targets`. The `targets` take a numerical value from 1-10 indicating which clothing class each image belongs to.

Since the original data does not have a validation set, let's (once again) create one by splitting the training set.

In [6]:
from torch.utils.data.sampler import SubsetRandomSampler
ntotal = 60000
ntrain = int(0.9*ntotal)
nval = ntotal - ntrain

val_ix = np.random.choice(range(ntotal), size=nval, replace=False)
train_ix = list(set(range(ntotal)) - set(val_ix))

train_sampler = SubsetRandomSampler(train_ix)
val_sampler = SubsetRandomSampler(val_ix)

### Q 1.1: Initialize the train, val and test dataloaders with the given `batch_size` (1 pt)

In [7]:
batch_size = 64
train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, sampler=train_sampler)
val_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, sampler=val_sampler)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False)

Each loader iterates over the data, yielding `batch_size` images and output targets per iteration.

### Q 1.2: Initialize a CNN with 2-D convolution layer(s), max pooling and finally, fully-connected layer(s). You can add any non-linearity of your choice and use >=1 layer of each type overall. (4 pts)

We have provided the skeleton class definition below. Complete it by specifying the layer objects in your CNN. Keep in mind that the input `x` is of shape (`batch_size`, 1, 28, 28). Unlike the MLP, you do not have to flatten it to feed it into the model. Additionally, the output should be of shape (`batch_size`, 10) as we have 10 classes.

In [8]:
class fashionMLP(nn.Module):
    def __init__(self):
        super(fashionMLP, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = torch.relu(self.conv1(x))  # (batch_size, 32, 28, 28)
        x = self.pool(x)               # (batch_size, 32, 14, 14)
        x = torch.relu(self.conv2(x))  # (batch_size, 64, 14, 14)
        x = self.pool(x)               # (batch_size, 64, 7, 7)
        x = x.view(x.size(0), -1)      # flatten: (batch_size, 64*7*7)
        x = torch.relu(self.fc1(x))    # (batch_size, 128)
        x = self.fc2(x)                # (batch_size, 10)
        return x


Tip: An easy way to check if you got the dimensions right is to test your model on a single batch. Iteratively add each layer transformation to the input, and ensure the shape is right at each stage.

In [9]:
# for images, labels in train_loader:
#     print(images.shape, labels.shape)
#     break
# model = fashionMLP()
# outputs = model(...)
# print(outputs.shape)

### Q 1.3: Define `criterion` to be the cross entropy loss function and use an optimizer of your choice (2 pts)

In [10]:
model = fashionMLP()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

### Q 1.4: Complete the training, validation and testing loops (5 pts)

In [11]:
from tqdm.notebook import tqdm
import torch.nn.functional as F

def train_network(model, train_loader, val_loader, criterion, optimizer, nepoch=100):
    try:
        for epoch in tqdm(range(nepoch)):
            print('EPOCH %d'%epoch)
            total_loss = 0
            count = 0
            for inputs, labels in train_loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
                count += 1
            print('{:>12s} {:>7.5f}'.format('Train loss:', total_loss/count))
            with torch.no_grad():
                total_loss = 0
                count = 0
                for inputs, labels in val_loader:
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    total_loss += loss.item()
                    count += 1
                print('{:>12s} {:>7.5f}'.format('Val loss:', total_loss/count))
            print()
    except KeyboardInterrupt:
        print('Exiting from training early')
    return

def test_network(model, test_loader, mode):
    true, pred = [], []
    with torch.no_grad():
        for inputs, labels  in test_loader:
            outputs = model(inputs)
            predicted = outputs.argmax(dim=1)
            labels = labels.cpu().numpy()
            predicted = predicted.cpu().numpy()
            true.append(labels)
            pred.append(predicted)
    acc = (np.concatenate(true) == np.concatenate(pred)).mean()
    print('%s accuracy: %0.3f' % (mode, acc))
    true = np.concatenate(true)
    pred = np.concatenate(pred)
    return acc, true, pred

### Q 1.5: Train the network and report the final test accuracy of your model (2 pts)
Use the functions `train_network` and `test_network` defined above.

Model test accuracy should be 86% or greater.

In [12]:
model = fashionMLP()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

train_network(model, train_loader, val_loader, criterion, optimizer, nepoch=20)

test_acc, true, pred = test_network(model, test_loader, mode="Test")
print("Final Test Accuracy: {:.3f}".format(test_acc))

  0%|          | 0/20 [00:00<?, ?it/s]

EPOCH 0
Exiting from training early


KeyboardInterrupt: 

### Q 1.6: Which class did the model get wrong the most? Which class did it get right the most? (2 pts)

Shirt class got wrong the most and Trouser class got right the most.

### Q 1.7: Add any regularization method of your choice (or more than one!), and report the final test accuracy (4 pts)
 You should be able to get accuracy > 90%.

 The model accuracy for this should be greater than the test accuracy calculated in Q1.5.

In [13]:
class regularized_fashionMLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.bn1   = nn.BatchNorm2d(32)
        self.pool  = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.bn2   = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.bn3   = nn.BatchNorm2d(128)
        self.dropout = nn.Dropout(p=0.25)
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1,1))
        self.fc1 = nn.Linear(128, 128)
        self.dropout_fc = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.dropout(x)
        x = self.global_avg_pool(x)
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.dropout_fc(x)
        x = self.fc2(x)
        return x

In [14]:
# Instantiate the model, criterion, and optimizer
regmodel = regularized_fashionMLP()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(regmodel.parameters(), lr=0.001, weight_decay=1e-4)

# Train the model
train_network(regmodel, train_loader, val_loader, criterion, optimizer, nepoch=20)

regmodel.eval() # disables some regularization methods for model testing

# Report test accuracy
test_acc, true, pred = test_network(regmodel, test_loader, mode="Test (Regularized)")
print("Final Test Accuracy (Regularized Model): {:.3f}".format(test_acc))

  0%|          | 0/20 [00:00<?, ?it/s]

EPOCH 0
 Train loss: 0.65876
   Val loss: 0.47469

EPOCH 1
 Train loss: 0.40772
   Val loss: 0.38102

EPOCH 2
 Train loss: 0.35259
   Val loss: 0.33734

EPOCH 3
 Train loss: 0.32480
   Val loss: 0.31287

EPOCH 4
 Train loss: 0.30556
   Val loss: 0.30383

EPOCH 5
 Train loss: 0.29166
   Val loss: 0.31064

EPOCH 6
 Train loss: 0.27820
   Val loss: 0.29095

EPOCH 7
 Train loss: 0.26742
   Val loss: 0.28167

EPOCH 8
 Train loss: 0.25804
   Val loss: 0.27607

EPOCH 9
 Train loss: 0.25050
   Val loss: 0.32486

EPOCH 10
 Train loss: 0.24361
   Val loss: 0.27043

EPOCH 11
 Train loss: 0.23682
   Val loss: 0.26521

EPOCH 12
 Train loss: 0.23277
   Val loss: 0.27370

EPOCH 13
 Train loss: 0.22492
   Val loss: 0.26752

EPOCH 14
 Train loss: 0.22013
   Val loss: 0.28049

EPOCH 15
 Train loss: 0.21612
   Val loss: 0.27534

EPOCH 16
 Train loss: 0.21037
   Val loss: 0.26748

EPOCH 17
 Train loss: 0.20608
   Val loss: 0.26559

EPOCH 18
 Train loss: 0.20438
   Val loss: 0.25294

EPOCH 19
 Train loss: 

### Q 1.8: Visualize learned kernels from first layer of your CNN (2 pts)

You can either plot them separately (i.e. each as one panel in a plot) or stack them together into a single array. In either case, use matplotlib `imshow` or `matshow` to make an image of each kernel.

In [None]:
# first, extract the weights from the learned model
conv1wt = model.conv1.weight.data.clone().cpu().numpy()

# then, visualize them!
import matplotlib.pyplot as plt
import math

n_kernels = conv1wt.shape[0]
n_cols = 8
n_rows = math.ceil(n_kernels / n_cols)

fig, axes = plt.subplots(n_rows, n_cols, figsize=(n_cols*1.5, n_rows*1.5))
axes = axes.flatten()

for i in range(n_kernels):
    kernel = conv1wt[i, 0, :, :]
    axes[i].imshow(kernel, cmap='gray')
    axes[i].set_title(f"Kernel {i}")
    axes[i].axis('off')

for j in range(n_kernels, len(axes)):
    axes[j].axis('off')

plt.tight_layout()
plt.show()

## Problem 2. Training 1-D CNNs for phoneme classification (34 pts)

In this problem, we will build CNNs to classify phonemes from speech features. Instead of using the raw audio waveform as the input to a neural network, we can extract relevant features from the power spectrum of the waveform. Since this is not relevant for our exercise, we have precomputed speech features for you on the input waveform. The data comprises different episodes from a podcast _The Moth Radio Hour_ wherein a single speaker narrates a personal story. You can find it in `HW2_phone_class_data`.
<br><br>
Speech Features: These are 240-D features extracted for every 10ms snippet in a story.
<br>
Phoneme labels: We give you the raw labels and the associated start and end time of the phoneme.
<br><br>
Note that the speech features were extracted at a much higher sampling rate than the typical frequency of a phoneme. Consequeently, there are several speech features occuring between the start and end time of a single phoneme. To predict a single phoneme label, we will firstly consider a `window_size` of 10. Next, we will pick the 10 speech features that occurred right before the phoneme ended. For example, consider a hypothetical scenario where a phoneme started at 1:00 min and ended at 3:00 min, and the speech features were collected every 10s. Then, we would extract features at `[3:00 min, 2:50 min, 2:40 min, 2:30 min, 2:20 min, 2:10 min, 2:00 min, 1:50 min, 1:40 min and 1:30 min]`, ie., the last 10 features.
<br><br>
Next, we will do 1-D convolutions on `[batch_size, 240, 10]` dimensional inputs. Note that `240` here represents the input channels and you want to convolve over the context, ie., the `10` features.

Finally, we will use a hidden test set to evaluate your model. In most real-world applications, you don't have access to your test data and its important to not overfit on your validation set. So make sure you do hyperparameter tuning in moderation!

In [15]:
seed = 42

np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

In [18]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [23]:
# Load the stories in the training and validation set.
import pickle
with open('/content/drive/My Drive/HW2_phone_class_data/HW2_phone_class_data/data_split.pickle', 'rb') as f:
    data_split_stories = pickle.load(f)
for key in data_split_stories:
    print(key)
    print(data_split_stories[key])
stories = np.concatenate([data_split_stories[key] for key in data_split_stories])
print(len(stories))

train
['souls' 'wheretheressmoke' 'thatthingonmyarm' 'hangtime' 'adollshouse'
 'odetostepfather' 'sloth' 'myfirstdaywiththeyankees' 'buck' 'avatar'
 'adventuresinsayingyes' 'exorcism' 'naked' 'haveyoumethimyet'
 'stagefright' 'undertheinfluence' 'swimmingwithastronauts' 'itsabox'
 'alternateithicatom' 'fromboyhoodtofatherhood']
val
['theclosetthatateeverything' 'tildeath' 'legacy']
23


In [25]:
# For each story, load the pre-computed speech features and their associated timestamps.
# Also load the phoneme transcriptions and the associated start/end times.
speech_features, speech_timestamps, phonemes, phoneme_start_times, phoneme_end_times = [{} for _ in range(5)]
for story in stories:
    LOAD = np.load('/content/drive/My Drive/HW2_phone_class_data/HW2_phone_class_data/%s.npz'%story)
    speech_features[story] = LOAD['fbank_features']
    speech_timestamps[story] = LOAD['fbank_timestamps']
    phonemes[story] = LOAD['phonemes']
    phoneme_start_times[story] = LOAD['phoneme_start_times']
    phoneme_end_times[story] = LOAD['phoneme_end_times']
    print(story)
    print(speech_features[story].shape, speech_timestamps[story].shape)
    print(phonemes[story].shape, phoneme_start_times[story].shape, phoneme_end_times[story].shape)

souls
(72997, 240) (72997,)
(6819,) (6819,) (6819,)
wheretheressmoke
(60191, 240) (60191,)
(6068,) (6068,) (6068,)
thatthingonmyarm
(88875, 240) (88875,)
(7080,) (7080,) (7080,)
hangtime
(66830, 240) (66830,)
(6423,) (6423,) (6423,)
adollshouse
(50353, 240) (50353,)
(5412,) (5412,) (5412,)
odetostepfather
(82808, 240) (82808,)
(8334,) (8334,) (8334,)
sloth
(89497, 240) (89497,)
(8595,) (8595,) (8595,)
myfirstdaywiththeyankees
(73686, 240) (73686,)
(8866,) (8866,) (8866,)
buck
(68498, 240) (68498,)
(5455,) (5455,) (5455,)
avatar
(75423, 240) (75423,)
(5171,) (5171,) (5171,)
adventuresinsayingyes
(80356, 240) (80356,)
(7850,) (7850,) (7850,)
exorcism
(95501, 240) (95501,)
(9952,) (9952,) (9952,)
naked
(86502, 240) (86502,)
(10281,) (10281,) (10281,)
haveyoumethimyet
(101320, 240) (101320,)
(10176,) (10176,) (10176,)
stagefright
(60698, 240) (60698,)
(6669,) (6669,) (6669,)
undertheinfluence
(62772, 240) (62772,)
(5932,) (5932,) (5932,)
swimmingwithastronauts
(79121, 240) (79121,)
(7318,)

### Q 2.1 Create classifier target dictionary (2 pts)

In [28]:
# Load the phoneme_classes which are the target of our CNN classifier.
phoneme_classes = np.load('/content/drive/My Drive/HW2_phone_class_data/HW2_phone_class_data/phoneme_classes.npz')['arr_0']
print(phoneme_classes)
nclass = len(phoneme_classes)
print('Number of classes:', nclass)
# Create a dictionary mapping from the phoneme classes to an ID.
# For example, if you were builiding a classifier of dog vs. cat,
# this could be {'dog': 0, 'cat': 1} and [0, 1] would be your classifier targets.
phone2int = {phone: i for i, phone in enumerate(phoneme_classes)}

['SIL' 'S' 'AY' 'EY' 'T' 'Y' 'ER' 'HH' 'SP' 'BR ' 'Z' 'AA' 'IY' 'OW' 'R'
 'IH' 'BR' 'AO' 'DH' 'M' 'AH' 'B' 'K' 'L' 'W' 'EH' 'N' 'UW' 'OY' 'D' 'CH'
 'F' 'NG' 'AE' 'AW' 'SH' 'P' 'TH' 'LG' 'LS' 'G' 'JH' 'V' 'UH' 'NS' 'CG' ''
 'SPM' 'AHN' 'SP\x7f' 'SPAH' 'ZH' '{NS}' '{IG}' '{CG}' 'ST' 'OA' 'OH' 'U'
 ' IY' ' AY' 'N ' 'A']
Number of classes: 63


### Q 2.2 Create features and labels by aligning timestamps (4 points)

In [29]:
from collections import defaultdict
window_size = 10
features, labels = defaultdict(list), defaultdict(list)

for story in stories:
    print(story)
    # For each phoneme in the story, store a list of speech timestamps `t`
    # such that phoneme_start_time <= t <= phoneme_end_time.
    # This is an example of what `phoneme_time_windows` could look like:
    # [[0,1,2,5..10], [11,12,..27]...., [10000, 10001, ...10012]]
    # In the above example, the first 10 speech features map to the first phoneme,
    # features 11-27 match to second phoneme etc..
    phoneme_time_windows = [[] for _ in range(len(phonemes[story]))]
    for i in range(len(phonemes[story])):
        indices = np.where((speech_timestamps[story] >= phoneme_start_times[story][i]) &
                           (speech_timestamps[story] <= phoneme_end_times[story][i]))[0]
        phoneme_time_windows[i] = indices.tolist()

    for ix in range(len(phoneme_time_windows)):
        phoneme_time_windows[ix] = np.array(phoneme_time_windows[ix])
    # Compute and print the average number of speech features that match to 1 phoneme.
    # For example, for a story there could on average be 13 features per phoneme.
    # Store this value in `ptw_mean_length`. We will compute the difference between your
    # variable and our pre-computed values to grade.
    ptw_mean_length = np.mean([len(win) for win in phoneme_time_windows])
    # Finally, we are going to select the 10 most recent features per phoneme
    # and create our dataset.
    for ix in range(len(phoneme_time_windows)):
        # Discard any data points for which fewer than `window_size` speech features
        # map onto the phoneme.
        if phoneme_time_windows[ix].shape[0] < window_size:
            continue
        ph = phonemes[story][ix].upper().strip("0123456789")
        # Append the phoneme class ID.
        labels[story].append(phone2int[ph])
        # Append the `window_size` most recent features.
        # You will have to make use of `phoneme_time_windows` of course to find the labels.
        selected_indices = phoneme_time_windows[ix][-window_size:]
        features[story].append(speech_features[story][selected_indices, :])
    labels[story]= np.array(labels[story])
    features[story] = np.array(features[story])
    print(labels[story].shape, features[story].shape)

    # Just making sure everything iss the right shape!
    assert not np.any(np.isnan(features[story])), story
    assert labels[story].shape[0] == features[story].shape[0]
    assert np.all(features[story].shape[1:] == (window_size, 240))

souls
(2331,) (2331, 10, 240)
wheretheressmoke
(1781,) (1781, 10, 240)
thatthingonmyarm
(3260,) (3260, 10, 240)
hangtime
(2327,) (2327, 10, 240)
adollshouse
(1136,) (1136, 10, 240)
odetostepfather
(2710,) (2710, 10, 240)
sloth
(2920,) (2920, 10, 240)
myfirstdaywiththeyankees
(2195,) (2195, 10, 240)
buck
(1993,) (1993, 10, 240)
avatar
(2372,) (2372, 10, 240)
adventuresinsayingyes
(2730,) (2730, 10, 240)
exorcism
(3210,) (3210, 10, 240)
naked
(2472,) (2472, 10, 240)
haveyoumethimyet
(2568,) (2568, 10, 240)
stagefright
(1899,) (1899, 10, 240)
undertheinfluence
(1643,) (1643, 10, 240)
swimmingwithastronauts
(2420,) (2420, 10, 240)
itsabox
(2106,) (2106, 10, 240)
alternateithicatom
(2360,) (2360, 10, 240)
fromboyhoodtofatherhood
(2330,) (2330, 10, 240)
theclosetthatateeverything
(2025,) (2025, 10, 240)
tildeath
(1985,) (1985, 10, 240)
legacy
(2248,) (2248, 10, 240)


### Q 2.3 Set up the dataloaders (2 points)

In [30]:
batch_size = 1024

# Stack all the speech features of the training set stories.
# It is recommended to swap the 240-D and 10-D axes to make the convolution easier.
train_feats = np.concatenate([
    np.transpose(features[story], (0, 2, 1))  # (num_samples, 240, window_size)
    for story in data_split_stories['train']
], axis=0)
# Stack all the phoneme labels of the training set stories.
train_labels = np.concatenate([
    labels[story]
    for story in data_split_stories['train']
], axis=0)
print(train_feats.shape, train_labels.shape)
train_dataset = torch.utils.data.TensorDataset(torch.tensor(train_feats), torch.tensor(train_labels))
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Repeat the process for the validation set.
val_feats = np.concatenate([
    np.transpose(features[story], (0, 2, 1))  # swap axes for convolution ease
    for story in data_split_stories['val']
], axis=0)
val_labels = np.concatenate([
    labels[story]
    for story in data_split_stories['val']
], axis=0)
print(val_feats.shape, val_labels.shape)
val_dataset = torch.utils.data.TensorDataset(torch.tensor(val_feats), torch.tensor(val_labels))
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

### NOTE: It is super important that you double check your array shapes.

(46763, 240, 10) (46763,)
(6258, 240, 10) (6258,)


### Q 2.4 Set up the training function. Note that this is identical to the FashionMNIST question (3 points)

In [31]:
from tqdm.notebook import tqdm

def train_network(model, train_loader, val_loader, criterion, optimizer, nepoch=100):
    try:
        for epoch in tqdm(range(nepoch)):
            print('EPOCH %d'%epoch)
            total_loss = 0
            count = 0
            for inputs, labels in train_loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
                count += 1
            print('{:>12s} {:>7.5f}'.format('Train loss:', total_loss/count))
            with torch.no_grad():
                total_loss = 0
                count = 0
                for inputs, labels in val_loader:
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    total_loss += loss.item()
                    count += 1
                print('{:>12s} {:>7.5f}'.format('Val loss:', total_loss/count))
            print()
    except KeyboardInterrupt:
        print('Exiting from training early')
    return

### Q 2.5 Define your CNN architecture. (3 points)

- Make sure to use at least two 1-D convolution layers followed by at least one fully connected layers.
- You will again find functions like `reshape`, `view`, `flatten` useful.
- Your convolutions should treat the 240-D as _channels_ and combine information over the context, ie., the 10-D.
- Build your model incrementally by checking the resultant output shape for each new layer you add! Test with an example batch.

Documentation for Conv1D: https://pytorch.org/docs/stable/generated/torch.nn.Conv1d.html

In [32]:
class CNNModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.nonlin = nn.ReLU()
        self.conv1 = nn.Sequential(
            nn.Conv1d(in_channels=240, out_channels=128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv1d(in_channels=128, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
        self.fc1 = nn.Linear(64 * 5, nclass)

    def forward(self, x):
        x = self.conv1(x)
        x = x.view(x.size(0), -1)
        x = self.fc1(x)
        return x

### Q 2.6 Define your optimizeer and criterion for the phoneme classifier. (2 points)

In [33]:
model = CNNModel()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)

seed = 42
np.random.seed(seed)
torch.manual_seed(seed)

<torch._C.Generator at 0x7b99bc046770>

### Q 2.7 How are weights and biases being initialized for the convolution layers? (2 pts)

PyTorch's convolutional layers use Kaiming uniform initialization for the weights, and the biases are initialized to zeros.

### Q 2.8 Train your network (5 pts)
- Make sure you tune the hyperparameters!

Train and validation losses should be both less than 1.5, and should be generally decreasing over time.

In [34]:
model = CNNModel()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)

train_network(model, train_loader, val_loader, criterion, optimizer, nepoch=30)

  0%|          | 0/30 [00:00<?, ?it/s]

EPOCH 0
 Train loss: 2.36982
   Val loss: 1.78263

EPOCH 1
 Train loss: 1.47403
   Val loss: 1.55743

EPOCH 2
 Train loss: 1.28925
   Val loss: 1.47795

EPOCH 3
 Train loss: 1.19769
   Val loss: 1.43538

EPOCH 4
 Train loss: 1.12737
   Val loss: 1.39290

EPOCH 5
 Train loss: 1.07679
   Val loss: 1.39670

EPOCH 6
 Train loss: 1.03310
   Val loss: 1.34932

EPOCH 7
 Train loss: 1.00187
   Val loss: 1.33673

EPOCH 8
 Train loss: 0.96782
   Val loss: 1.37011

EPOCH 9
 Train loss: 0.93932
   Val loss: 1.35597

EPOCH 10
 Train loss: 0.91052
   Val loss: 1.33204

EPOCH 11
 Train loss: 0.87831
   Val loss: 1.33093

EPOCH 12
 Train loss: 0.85343
   Val loss: 1.36710

EPOCH 13
 Train loss: 0.83593
   Val loss: 1.33905

EPOCH 14
 Train loss: 0.81199
   Val loss: 1.33575

EPOCH 15
 Train loss: 0.78648
   Val loss: 1.37302

EPOCH 16
 Train loss: 0.76028
   Val loss: 1.36490

EPOCH 17
 Train loss: 0.74286
   Val loss: 1.36056

EPOCH 18
 Train loss: 0.72322
   Val loss: 1.33951

EPOCH 19
 Train loss: 

### Q 2.9 Set up the test function. Note that this is identical to the FashionMNIST question (3 pts)

In [35]:
def test_network(model, test_loader, mode):
    true, pred = [], []
    with torch.no_grad():
        for inputs, labels  in test_loader:
            outputs = model(inputs)
            predicted = outputs.argmax(dim=1)
            true.append(labels)
            pred.append(predicted)
    acc = (np.concatenate(true) == np.concatenate(pred)).mean()
    print('%s accuracy: %0.3f' % (mode, acc))
    true = np.concatenate(true)
    pred = np.concatenate(pred)
    return acc, true, pred

### Q 2.10 Write a function that takes in the model predictions and true labels, and returns the accuracy per phoneme class as a dictionary (3 pts)

For example, if the true labels contain 10 instances of the phoneme class "AY" and the model predicts this correctly 5 times, the entry `class_accuracy['AY'] = 0.5`.

In [36]:
def report_test_results(precomputed_acc, model_true, model_pred):
    # Accuracy per phoneme class.
    class_accuracy = np.zeros(nclass)
    for i in range(nclass):
        total_i = np.sum(model_true == i)
        class_accuracy[i] = np.sum((model_true == i) & (model_pred == i)) / total_i if total_i > 0 else 0
    # Write an assertion to check that the sum of correctly predicted
    # labels across all classes is equal to the total accuracy of the model.
    # It is good (and important) to always run such sanity checks!
    # PS. You will probably have to define more variables.
    assert np.allclose(np.sum(model_true == model_pred) / len(model_true), precomputed_acc)
    # Sort the phoneme classes in increasing order of accuracy.
    sorted_phones = np.array(phoneme_classes)[np.argsort(class_accuracy)]
    print('Top-3 predicted phones:', sorted_phones[-3:])
    return class_accuracy

### Q 2.11 Run tests on the validation data (2 pts)

In [37]:
val_precomputed_acc, val_model_true, val_model_pred = test_network(model, val_loader, 'Validation')
val_class_accuracy = report_test_results(
    val_precomputed_acc, val_model_true, val_model_pred)

Validation accuracy: 0.583
Top-3 predicted phones: ['AY' 'K' 'SP']


### Q 2.12 Print the validation class accuracy for the given phoneme classes. What do you observe? What could be the reason for this? (2 pts)

HINT: The answer is UNRELATED to the model.

In [38]:
for phoneme in ['OH', 'U']:
    idx = np.where(phoneme_classes == phoneme)[0][0]
    print(phoneme, val_class_accuracy[idx])

OH 0.0
U 0.0


### Q 2.13 We will now test your model on a hidden test set. So make sure your model definition and functions to compute test accuracy are clearly defined! (2 pts)

_(there is nothing for you to do here, this block exists for grading purposes only)_

In [40]:
data_split_stories['test'] = ['inamoment', 'eyespy', 'life', 'howtodraw']

for story in data_split_stories['test']:
    LOAD = np.load('/content/drive/My Drive/HW2_phone_class_data/HW2_phone_class_data/%s.npz'%story)
    speech_features[story] = LOAD['fbank_features']
    speech_timestamps[story] = LOAD['fbank_timestamps']
    phonemes[story] = LOAD['phonemes']
    phoneme_start_times[story] = LOAD['phoneme_start_times']
    phoneme_end_times[story] = LOAD['phoneme_end_times']
    print(story)
    print(speech_features[story].shape, speech_timestamps[story].shape)
    print(phonemes[story].shape, phoneme_start_times[story].shape, phoneme_end_times[story].shape)

features, labels = defaultdict(list), defaultdict(list)

for story in data_split_stories['test']:
    print(story)
    phoneme_time_windows = [[] for _ in range(len(phonemes[story]))]
    for time_index, time in enumerate(speech_timestamps[story]):
        # Find position which starts at or right before this timestamp.
        ix = np.where(phoneme_start_times[story] <= time)[0][-1]
        if phoneme_end_times[story][ix] > time:
            phoneme_time_windows[ix].append(time_index)
    for ix in range(len(phoneme_time_windows)):
        phoneme_time_windows[ix] = np.array(phoneme_time_windows[ix])
    ptw_lengths = np.array([len(ptw) for ptw in phoneme_time_windows])
    print(ptw_lengths.mean(), ptw_lengths.std())
    for ix in range(len(phoneme_time_windows)):
        if len(phoneme_time_windows[ix]) < window_size:
            continue
        ph = phonemes[story][ix].upper().strip("0123456789")
        labels[story].append(phone2int[ph])
        features[story].append(speech_features[story][phoneme_time_windows[ix]][-window_size:])
    labels[story]= np.array(labels[story])
    features[story] = np.array(features[story])
    print(labels[story].shape, features[story].shape)

    assert not np.any(np.isnan(features[story])), story
    assert labels[story].shape[0] == features[story].shape[0]
    assert np.all(features[story].shape[1:] == (window_size, 240))

# Set up the dataloaders
test_feats = np.vstack([features[story] for story in data_split_stories['test']])
test_feats = np.swapaxes(test_feats, 1, 2)
test_labels = np.hstack([labels[story] for story in data_split_stories['test']])
print(test_feats.shape, test_labels.shape)
test_dataset = torch.utils.data.TensorDataset(torch.tensor(test_feats), torch.tensor(test_labels))
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

precomputed_acc, model_true, model_pred = test_network(model, test_loader, 'test')
report_test_results(precomputed_acc, model_true, model_pred)

inamoment
(43031, 240) (43031,)
(3494,) (3494,) (3494,)
eyespy
(77941, 240) (77941,)
(7920,) (7920,) (7920,)


KeyboardInterrupt: 

### Q 2.14 Can you reuse your model as is with a different `window_size`? Why or why not? (2 pts)

No, the model is built for a fixed window_size, so its convolutional and fully-connected layers expect specific dimensions. Changing the window_size alters these dimensions, requiring architectural adjustments.

### Q 2.15 Can you reuse your model as is with a different dimensionality of speech features? Why or why not? (2 pts)

No, the model’s first convolution layer is fixed to the original feature dimensionality, so changing it will cause a mismatch.

## Problem 3. 1-D CNNs and model interpretation (27 pts)

In this problem you will again train a 1-D CNN on audio data, here to classify spoken numerals (e.g. "one", "two"). Then you will use additional techniques to interpret (since it's audio, one can't really say "visualize", but same idea) what the model is doing. Run these cells first to set things up.

In [41]:
import IPython.display as ipd

import torchaudio
from torchaudio import datasets as audiodatasets

audio_save_dir = './'#SPEECHCOMMANDS_data' # set to wherever you want to keep these files
sc_training = audiodatasets.SPEECHCOMMANDS(audio_save_dir, download=True, subset="training")
sc_validation = audiodatasets.SPEECHCOMMANDS(audio_save_dir, download=True, subset="validation")
sc_testing = audiodatasets.SPEECHCOMMANDS(audio_save_dir, download=True, subset="testing")

100%|██████████| 2.26G/2.26G [00:39<00:00, 61.2MB/s]


KeyboardInterrupt: 

In [42]:
# the audio will be downsampled to 8 kHz to make it easier to work with. just run this
new_sample_rate = 8000
transform = torchaudio.transforms.Resample(orig_freq=16000, new_freq=new_sample_rate)

In [43]:
# the dataset contains a lot of words, but let's focus on the numbers
sel_labels = ["zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"]

training_inds = np.array([ii for ii,datum in enumerate(sc_training) if datum[2] in sel_labels])
validation_inds = np.array([ii for ii,datum in enumerate(sc_validation) if datum[2] in sel_labels])
testing_inds = np.array([ii for ii,datum in enumerate(sc_testing) if datum[2] in sel_labels])

NameError: name 'sc_training' is not defined

In [None]:
# define function that will pad each sample in a batch to the same length
def pad_sequence(batch):
    # Make all tensor in a batch the same length by padding with zeros
    batch = [item.t() for item in batch]
    batch = torch.nn.utils.rnn.pad_sequence(batch, batch_first=True, padding_value=0.)
    return batch.permute(0, 2, 1)

In [None]:
# make data loaders

def collate_fn(batch):
    tensors, targets = [], []
    for waveform, _, label, *_ in batch:
        tensors += [waveform]
        targets += [torch.Tensor([sel_labels.index(label)]).squeeze().long()]

    tensors = pad_sequence(tensors)
    targets = torch.stack(targets)

    return tensors, targets


batch_size = 64

train_loader = torch.utils.data.DataLoader(
    sc_training,
    batch_size=batch_size,
    collate_fn=collate_fn,
    sampler=SubsetRandomSampler(training_inds)
)
val_loader = torch.utils.data.DataLoader(
    sc_validation,
    batch_size=batch_size,
    drop_last=False,
    collate_fn=collate_fn,
    sampler=SubsetRandomSampler(validation_inds)
)
test_loader = torch.utils.data.DataLoader(
    sc_testing,
    batch_size=batch_size,
    drop_last=False,
    collate_fn=collate_fn,
    sampler=SubsetRandomSampler(testing_inds)
)

### Q 3.1: Plot the waveform and play the audio for one training sample (2 pts)
Set the x-axis values so that they show seconds.

In [None]:
# plot the waveform for one example (e.g., sc_training[150])

plt.figure(figsize=(5,2))
plt.plot(time, waveform[0].numpy())
plt.xlabel("Time (seconds)")
plt.axis("off")

# play the audio
ipd.Audio(waveform.numpy(),, rate=16000)

### Q 3.2: Define model (3 pts)
Assume you are given the basic number of channels, `n_channels`. This model should have the following layers:
* 1-D convolutional layer with `n_channels` kernels of length 80, with a stride of 16
* 1-D batch norm layer
* ReLU
* 1-D max pooling layer with kernel size 4, stride 4
* 1-D convolutional layer with `n_channels` kernels of size 3, with a stride of 1
* 1-D batch norm layer
* ReLU
* 1-D max pooling layer with same parameters as above
* 1-D convolutional layer with `2*n_channels` kernels of size 3, with a stride of 1
* 1-D batch norm layer
* ReLU
* 1-D max pooling with same parameters as above
* 1-D convolutional layer with `2*n_channels` kernels of size 3, with a stride of 1
* 1-D batch norm layer
* ReLU
* 1-D max pooling with same parameters as above
* average pooling across all remaining timepoints (see `AdaptiveAvgPool1d`)
* flattening
* Linear layer with `2*n_channels` inputs and 10 outputs

In [None]:
class WordRecognizer(nn.Module):
    def __init__(self, n_input=1, n_output=10, stride=16, n_channel=32):
        super().__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv1d(in_channels=n_input, out_channels=n_channel, kernel_size=80, stride=stride),
            nn.BatchNorm1d(n_channel),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=4, stride=4),
            nn.Conv1d(in_channels=n_channel, out_channels=n_channel, kernel_size=3, stride=1),
            nn.BatchNorm1d(n_channel),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=4, stride=4),
            nn.Conv1d(in_channels=n_channel, out_channels=2*n_channel, kernel_size=3, stride=1),
            nn.BatchNorm1d(2*n_channel),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=4, stride=4),
            nn.Conv1d(in_channels=2*n_channel, out_channels=2*n_channel, kernel_size=3, stride=1),
            nn.BatchNorm1d(2*n_channel),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=4, stride=4)
        )
        self.avg_pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(2*n_channel, n_output)

    def forward(self, x):
        x = self.conv_layers(x)
        x = self.avg_pool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

model = WordRecognizer(n_input=transformed.shape[0], n_output=len(sel_labels), n_channel=32)
print(model)

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

n = count_parameters(model)
print("Number of parameters: %s" % n)

### Q 3.3: Write the training and test functions (5 pts)
Remember you need to transform the data (using the `transform` function) before you put it into the model!

In [None]:
# Run this cell first

def number_of_correct(pred, target):
    # count number of correct predictions
    return pred.squeeze().eq(target).sum().item()

def get_likely_index(tensor):
    # find most likely label index for each element in the batch
    return tensor.argmax(dim=-1)

# let's use StepLR to reduce the learning after 20 epochs by a factor of 10
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)
lossfunction = nn.CrossEntropyLoss()

In [None]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)


# write a function that does one training epoch
def train_one_epoch(model):
    model.train()
    total_loss = 0.0
    total_correct = 0
    total_samples = 0
    for inputs, targets in train_loader:
        inputs = transform(inputs)
        outputs = model(inputs)
        loss = lossfunction(outputs, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        pred = get_likely_index(outputs)
        total_correct += number_of_correct(pred, targets)
        total_samples += targets.size(0)
    scheduler.step()
    print(f"Train Loss: {total_loss/len(train_loader):.3f}, Train Accuracy: {total_correct/total_samples:.3f}")

# write a function that computes & prints test accuracy
# (hint: use the `number_of_correct` and `get_likely_index` functions)
def test(model):
    model.eval()
    total_loss = 0.0
    total_correct = 0
    total_samples = 0
    with torch.no_grad():
        for inputs, targets in test_loader:
            # Transform the data before passing it to the model
            inputs = transform(inputs)
            outputs = model(inputs)
            loss = lossfunction(outputs, targets)
            total_loss += loss.item()
            pred = get_likely_index(outputs)
            total_correct += number_of_correct(pred, targets)
            total_samples += targets.size(0)
    test_acc = total_correct/total_samples
    print("Test Loss: {:.3f}, Test Accuracy: {:.3f}".format(total_loss/len(test_loader), test_acc))
    return test_acc

### Q 3.4: Fit the model! Tweak until you get test accuracy of at least 85% (3 pts)

In [None]:
n_epoch = 40

for epoch in range(n_epoch):
    train(model)
    test(model)
    scheduler.step()

### Q 3.5: Create contingency matrix to show performance (2 pts)
`contingency` should be a 10x10 matrix where each row is a predicted label and each column a true label. The value of each element in this matrix should be the number of times a test example with the true label given by the column was assigned the predicted label given by the row. For example, if the predicted label for an example was "seven" but the true label was "six", then you should add one to `contingency[7,6]`. Do this for all examples in the test set.

This is a nice way to visualize how well the model is working.

In [None]:
all_preds = []
all_targets = []

# get predictions for each test example. you can use same methods as `test` function, above
model.eval()
with torch.no_grad():
    for inputs, targets in test_loader:
        inputs = transform(inputs)
        outputs = model(inputs)
        pred = get_likely_index(outputs)
        all_preds.extend(pred.tolist())
        all_targets.extend(targets.tolist())

# create contingency table
#
contingency = np.zeros((10, 10))
for p, t in zip(all_preds, all_targets):
    contingency[p, t] += 1

plt.matshow(contingency)
plt.colorbar()

### Q 3.6: Create a new model and use it to generate optimized inputs (10 pts)
Finally, let's try to generate new sounds that would maximally activate each of the output units in our `WordRecognizer` model. Input optimization is one type of feature visualization (in this case, "audiolization") that can help understand how a model works.

We will do this by using gradient backpropagation to generate sounds that maximally activate each of the `WordRecognizer` outputs. This requires us to define a new model, `InputOptim`, which has one parameter tensor: `optimized_input`, the input that it is trying to optimize. In its `forward` method, this model should apply the pretrained `WordRecognizer` to its `optimized_input` and return the result. We will then use some optimizer (e.g. Adam) to make the `WordRecognizer` output the desired value. We will repeat this for each of the output classes ("zero", "one", "two", etc.). Then we will check to make sure that this works correctly (after optimization, the `WordRecognizer` should be 100% certain that the optimized input belongs to the desired class) and listen to the resulting sounds.

There are two things that you should consider here:
* How should you initialize `optimized_input`? There are many possibilities, and this choice will greatly affect your result.
* How should you do the optimization? Number of epochs, weight decay, and other optimization choices will also have a big effect on your result.

At the end of the problem you will be asked to write down some observations.

In [None]:
# first, define the new model
# this model should have one parameter tensor: the input vector you are optimization
# its `forward` function should apply the pretrained `WordRecognizer` model to this model's `optimized_input`

class InputOptim(nn.Module):
    def __init__(self, recognizer_model, input_shape=(1,1,8000)):
        super().__init__()
        self.recognizer_model = recognizer_model
        self.optimized_input = nn.Parameter(torch.randn(input_shape)) # initialize parameter tensor, should have requires_grad=True

    def forward(self, x):
        return self.recognizer_model(self.optimized_input)

    def parameters(self):
        return [self.optimized_input]

In [None]:
# next, train 10 `InputOptim` models, one for each output class ("zero", "one", etc.)
targets = torch.arange(10).long()
opt_stims = []

n_epochs = 300

for t in targets:
    input_model = InputOptim(recognizer_model=model, input_shape=(1,1,8000))
    optimizer = torch.optim.Adam(input_model.parameters(), lr=0.1, weight_decay=1e-4) # make sure this only optimizes the `input_model` parameters!
    lossfxn = nn.CrossEntropyLoss()

    print("target: ", t)
    for epoch in range(n_epochs):
        optimizer.zero_grad()
        output = input_model(None)
        target_tensor = torch.tensor([t])
        loss = lossfxn(output, target_tensor)
        loss.backward()
        optimizer.step()
        if epoch % 50 == 0:
            print(f"Epoch {epoch}, Loss: {loss.item():.3f}")

    opt_stims.append(input_model.optimized_input.detach().numpy())

In [None]:
# finally, use this cell to see model predictions for the optimized inputs & listen to the sounds

def predict(tensor):
    # Use the model to predict the label of the waveform
    logits = model(tensor.unsqueeze(0))
    tensor = get_likely_index(logits)
    tensor = sel_labels[tensor.squeeze().item()]
    return logits.squeeze().detach().numpy(), tensor

for ind in range(10):
    utterance = sel_labels[ind]

    probs, pred = predict(torch.Tensor(opt_stims[ind].reshape(1,8000)))
    print(f"Expected: {utterance}. Predicted: {pred}.")

    plt.figure()
    plt.bar(range(10), np.exp(probs) / np.exp(probs).sum())
    plt.xticks(range(10), sel_labels)

    ipd.Audio(opt_stims[ind].squeeze(), rate=8000)

### Q 3.7: What happens if you initialize `InputOptim` using a real sound, such as one of the training examples? What does this tell us about how this model works? (2 pts)

Initializing InputOptim with a real sound leads the optimization to refine the input while retaining recognizable acoustic features. This shows that the model's learned representation effectively captures real audio characteristics and is sensitive to meaningful input patterns.