In [1]:
import numpy as np
from matplotlib import pyplot as plt
from matplotlib.lines import Line2D
import os
import torch
import torchvision
import json

from utils import visualizeWeights

from sklearn.model_selection import train_test_split

In [2]:
class mlp_1(torch.nn.Module):
    """Perceptron with one hidden layer: Linear(input_size, 32) -> ReLU -> Linear(32, num_classes)."""

    def __init__(self, input_size, num_classes):
        """Build the layers.

        Args:
            input_size: number of features per sample after flattening.
            num_classes: number of output scores produced per sample.
        """
        super().__init__()
        nn = torch.nn
        self.input_size = input_size
        self.fc1 = nn.Linear(in_features=input_size, out_features=32)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(in_features=32, out_features=num_classes)

    def forward(self, x):
        """Flatten ``x`` to rows of ``input_size`` features and return the fc2 outputs."""
        flat = x.view(-1, self.input_size)
        hidden = self.relu(self.fc1(flat))
        return self.fc2(hidden)
    
class mlp_2(torch.nn.Module):
    """Perceptron: Linear(input_size, 32) -> ReLU -> Linear(32, 64, no bias) -> Linear(64, num_classes)."""

    def __init__(self, input_size, num_classes):
        """Build the layers.

        Args:
            input_size: number of features per sample after flattening.
            num_classes: number of output scores produced per sample.
        """
        super().__init__()
        nn = torch.nn
        self.input_size = input_size
        self.fc1 = nn.Linear(in_features=input_size, out_features=32)
        self.relu = nn.ReLU()
        # The second layer is bias-free and feeds fc3 directly (no activation in between).
        self.fc2 = nn.Linear(in_features=32, out_features=64, bias=False)
        self.fc3 = nn.Linear(in_features=64, out_features=num_classes)

    def forward(self, x):
        """Flatten ``x`` to rows of ``input_size`` features and return the fc3 outputs."""
        flat = x.view(-1, self.input_size)
        hidden = self.relu(self.fc1(flat))
        return self.fc3(self.fc2(hidden))

class cnn_3(torch.nn.Module):
    """Three-convolution network followed by a single linear prediction layer.

    Layer order: conv 3x3 (1->16), ReLU, conv 5x5 (16->8), ReLU, 2x2 max-pool,
    conv 7x7 (8->16), 2x2 max-pool, Linear(16*3*3, num_classes).
    The 16*3*3 input size of ``fc`` matches 1x32x32 inputs
    (32 -> 30 -> 26 -> 13 -> 7 -> 3 with unpadded convolutions).
    """

    def __init__(self, num_classes):
        """Build the layers; ``num_classes`` is the number of output scores per sample."""
        super().__init__()
        nn = torch.nn
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding='valid')
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv2d(16, 8, kernel_size=5, padding='valid')
        self.relu2 = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        # No activation is applied after the third convolution.
        self.conv3 = nn.Conv2d(8, 16, kernel_size=7, padding='valid')
        self.maxpool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc = nn.Linear(16 * 3 * 3, num_classes)

    def forward(self, x):
        """Run the feature extractor, flatten per sample, and return the fc outputs."""
        feature_stack = (
            self.conv1, self.relu1,
            self.conv2, self.relu2,
            self.maxpool1,
            self.conv3,
            self.maxpool2,
        )
        for layer in feature_stack:
            x = layer(x)
        return self.fc(x.view(x.size(0), -1))

class cnn_4(torch.nn.Module):
    """Four-convolution network followed by a single linear prediction layer.

    Layer order: conv 3x3 (1->16), ReLU, conv 3x3 (16->8), ReLU, conv 5x5 (8->16),
    ReLU, 2x2 max-pool, conv 5x5 (16->16), ReLU, 2x2 max-pool,
    Linear(16*4*4, num_classes).
    The 16*4*4 input size of ``fc`` matches 1x32x32 inputs
    (32 -> 30 -> 28 -> 24 -> 12 -> 8 -> 4 with unpadded convolutions).
    """

    def __init__(self, num_classes):
        """Build the layers; ``num_classes`` is the number of output scores per sample."""
        super().__init__()
        nn = torch.nn
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding='valid')
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv2d(16, 8, kernel_size=3, padding='valid')
        self.relu2 = nn.ReLU()
        self.conv3 = nn.Conv2d(8, 16, kernel_size=5, padding='valid')
        self.relu3 = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv4 = nn.Conv2d(16, 16, kernel_size=5, padding='valid')
        self.relu4 = nn.ReLU()
        self.maxpool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc = nn.Linear(16 * 4 * 4, num_classes)

    def forward(self, x):
        """Run the feature extractor, flatten per sample, and return the fc outputs."""
        feature_stack = (
            self.conv1, self.relu1,
            self.conv2, self.relu2,
            self.conv3, self.relu3,
            self.maxpool1,
            self.conv4, self.relu4,
            self.maxpool2,
        )
        for layer in feature_stack:
            x = layer(x)
        return self.fc(x.view(x.size(0), -1))

class cnn_5(torch.nn.Module):
    """Six-convolution network (all 3x3 kernels) followed by a linear prediction layer.

    Layer order: conv (1->8), ReLU, conv (8->16), ReLU, conv (16->8), ReLU,
    conv (8->16), ReLU, 2x2 max-pool, conv (16->16), ReLU, conv (16->8), ReLU,
    2x2 max-pool, Linear(8*4*4, num_classes).
    The 8*4*4 input size of ``fc`` matches 1x32x32 inputs
    (32 -> 30 -> 28 -> 26 -> 24 -> 12 -> 10 -> 8 -> 4 with unpadded convolutions).
    """

    def __init__(self, num_classes):
        """Build the layers; ``num_classes`` is the number of output scores per sample."""
        super().__init__()
        nn = torch.nn
        self.conv1 = nn.Conv2d(1, 8, kernel_size=3, padding='valid')
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv2d(8, 16, kernel_size=3, padding='valid')
        self.relu2 = nn.ReLU()
        self.conv3 = nn.Conv2d(16, 8, kernel_size=3, padding='valid')
        self.relu3 = nn.ReLU()
        self.conv4 = nn.Conv2d(8, 16, kernel_size=3, padding='valid')
        self.relu4 = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv5 = nn.Conv2d(16, 16, kernel_size=3, padding='valid')
        self.relu5 = nn.ReLU()
        self.conv6 = nn.Conv2d(16, 8, kernel_size=3, padding='valid')
        self.relu6 = nn.ReLU()
        self.maxpool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc = nn.Linear(8 * 4 * 4, num_classes)

    def forward(self, x):
        """Run the feature extractor, flatten per sample, and return the fc outputs."""
        feature_stack = (
            self.conv1, self.relu1,
            self.conv2, self.relu2,
            self.conv3, self.relu3,
            self.conv4, self.relu4,
            self.maxpool1,
            self.conv5, self.relu5,
            self.conv6, self.relu6,
            self.maxpool2,
        )
        for layer in feature_stack:
            x = layer(x)
        return self.fc(x.view(x.size(0), -1))


In [3]:
# Select the compute device: CUDA when available, otherwise CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"{device} is available")
print(torch.cuda.is_available())

# The remaining queries need an initialised CUDA runtime; calling them on a
# CPU-only machine raises, which would defeat the CPU fallback chosen above.
if torch.cuda.is_available():
    print(torch.cuda.current_device())
    print(torch.cuda.device(0))
    print(torch.cuda.device_count())
    print(torch.cuda.get_device_name(0))

cuda is available
True
0
<torch.cuda.device object at 0x000001DD7EBB1F90>
1
NVIDIA GeForce RTX 3050 Ti Laptop GPU


In [4]:
# Hyper-parameters shared by the training and evaluation cells.
BATCH_SIZE = 50
NUM_EPOCHS = 3
NUM_STEPS = 1
LEARNING_RATE = 0.01

# CIFAR-10 label names, indexed by class id.
classes = ('airplane', 'automobile', 'bird', 'cat','deer', 'dog', 'frog', 'horse', 'ship', 'truck')

# Per-image preprocessing: tensor conversion, per-channel normalisation of the
# three colour channels, then reduction to a single grayscale channel.
_tv_transforms = torchvision.transforms
transform = _tv_transforms.Compose([
    _tv_transforms.ToTensor(),
    _tv_transforms.Normalize((0.4914, 0.4822, 0.4465), (0.247, 0.243, 0.261)),
    _tv_transforms.Grayscale(),
])

# Training data; 10% of it is held out for validation with a fixed
# random_state so the split is repeatable. (An index-based stratified split
# via torch.utils.data.Subset was tried earlier and is not in use.)
trainset = torchvision.datasets.CIFAR10('./data', train=True, download=True, transform=transform)
trainset, valset = train_test_split(trainset, test_size=0.1, random_state=42)

# Held-out test data, read from the same directory.
testset = torchvision.datasets.CIFAR10('./data', train=False, transform=transform)

_DataLoader = torch.utils.data.DataLoader
trainloader = _DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True)
# The validation loader uses a 100x larger batch than training.
valloader = _DataLoader(valset, batch_size=BATCH_SIZE*100, shuffle=False)
testloader = _DataLoader(testset, batch_size=BATCH_SIZE, shuffle=False)

for _template, _split in (
    ('Training set has {} instances', trainset),
    ('Test set has {} instances', testset),
    ('Validation set has {} instances', valset),
):
    print(_template.format(len(_split)))

Files already downloaded and verified
Training set has 45000 instances
Test set has 10000 instances
Validation set has 5000 instances


In [5]:
def val(model_, outputs_, labels_, loss_, step_train_loss_, step_train_acc_, step_val_acc_):
    """Record the current training-batch metrics and the validation accuracy.

    Appends one entry to each of the three history lists:
    the loss of the current training batch, the accuracy (in percent) of the
    current training batch, and the accuracy (in percent) over every batch of
    the module-level ``valloader``.

    The model is switched to eval mode and left in that mode; callers that
    continue training must call ``model_.train()`` again.

    Args:
        model_: network to evaluate.
        outputs_: model outputs for the current training batch (one row of
            class scores per sample).
        labels_: target class indices for the current training batch.
        loss_: scalar loss tensor of the current training batch.
        step_train_loss_: list receiving the training-batch loss.
        step_train_acc_: list receiving the training-batch accuracy.
        step_val_acc_: list receiving the validation accuracy.

    Returns:
        None.
    """
    model_.eval()

    # Nothing here is back-propagated, so skip autograd bookkeeping; otherwise
    # the validation forward pass builds and holds a graph for a large batch.
    with torch.no_grad():
        _, predicted = torch.max(outputs_.detach(), 1)
        n_samples = labels_.size(0)
        n_correct = (predicted == labels_).sum().item()

        training_acc = 100.0 * n_correct / n_samples
        # This is the loss of the training batch, not a validation loss.
        train_loss = loss_.item()

        val_total = 0
        val_correct = 0
        for val_data in valloader:
            val_inputs, val_labels = val_data[0].to(device), val_data[1].to(device)

            val_outputs = model_(val_inputs)

            _, val_predicted = torch.max(val_outputs, 1)
            val_total += val_labels.size(0)
            val_correct += (val_predicted == val_labels).sum().item()

    val_acc = 100.0 * val_correct / val_total

    step_train_loss_.append(train_loss)
    step_train_acc_.append(training_acc)
    step_val_acc_.append(val_acc)
    return None

In [None]:
def test(model, step_test_acc_, best_weight_, best_acc_):
    """Evaluate ``model`` on the test set and keep the best first-layer weights.

    The accuracy (in percent) over the module-level ``testset`` is appended to
    ``step_test_acc_``. When it exceeds ``best_acc_``, the first-layer weight
    matrix (``fc1`` for mlp_1/mlp_2, ``conv1`` otherwise) is snapshotted.

    Args:
        model: network to evaluate; it is left in eval mode.
        step_test_acc_: list receiving the test accuracy.
        best_weight_: first-layer weights of the best model so far.
        best_acc_: best test accuracy so far.

    Returns:
        Tuple ``(best_weight, best_acc)``: the inputs unchanged when this
        model did not improve on ``best_acc_``, otherwise a NumPy copy of the
        first-layer weights and the new accuracy.
    """
    model.eval()
    testloader = torch.utils.data.DataLoader(testset, batch_size=BATCH_SIZE, shuffle=False)
    n_correct = 0
    n_samples = 0

    with torch.no_grad():
        for images, labels in testloader:
            images, labels = images.to(device), labels.to(device)

            outputs = model(images)

            _, predicted = torch.max(outputs, 1)
            n_samples += labels.size(0)
            n_correct += (predicted == labels).sum().item()

    test_acc = 100.0 * n_correct / n_samples
    step_test_acc_.append(test_acc)

    if test_acc > best_acc_:
        best_acc_ = test_acc
        if model.__class__.__name__ in ('mlp_1', 'mlp_2'):
            first_layer = model.fc1
        else:
            first_layer = model.conv1
        # Take an independent copy: Tensor.numpy() on a CPU tensor shares
        # memory with the parameter, so without .copy() later training would
        # silently overwrite the saved "best" weights. Copying only this one
        # tensor also avoids moving the whole model to the CPU and back.
        best_weight_ = first_layer.weight.detach().cpu().numpy().copy()

    return best_weight_, best_acc_


In [None]:
def train(model_, optimizer_fn, loss_fn, step_train_loss_, step_train_acc_, step_val_acc_,epoch_,step_):
    """Run one pass over the training set, logging metrics every 100 batches.

    Args:
        model_: network to optimise.
        optimizer_fn: optimizer bound to the model's parameters.
        loss_fn: callable computing the loss from (outputs, labels).
        step_train_loss_: history list passed on to ``val``.
        step_train_acc_: history list passed on to ``val``.
        step_val_acc_: history list passed on to ``val``.
        epoch_: accepted but not used inside this function.
        step_: accepted but not used inside this function.

    Returns:
        None.
    """
    log_every = 100
    loader = torch.utils.data.DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True)

    for batch_idx, batch in enumerate(loader):
        # val() switches the model to eval mode, so re-enable train mode on
        # every iteration.
        model_.train()
        inputs = batch[0].to(device)
        labels = batch[1].to(device)

        # Forward pass and loss.
        outputs = model_(inputs)
        loss = loss_fn(outputs, labels)

        # Backward pass and parameter update.
        optimizer_fn.zero_grad()
        loss.backward()
        optimizer_fn.step()

        # Log on the 100th, 200th, ... batch.
        if (batch_idx + 1) % log_every == 0:
            val(model_, outputs, labels, loss, step_train_loss_, step_train_acc_, step_val_acc_)


In [None]:
def hw3_1(model_names=('cnn_5',)):
    """Train, evaluate and export results for the selected architectures.

    For every requested model, NUM_STEPS independent runs of NUM_EPOCHS epochs
    are performed. The per-run curves recorded by ``train`` are averaged
    element-wise across runs and written to ``q2_<model_name>.json`` together
    with the best test accuracy and the first-layer weights of the run that
    achieved it. Those weights are also passed to ``visualizeWeights``.

    Args:
        model_names: name, or iterable of names, chosen from 'mlp_1', 'mlp_2',
            'cnn_3', 'cnn_4' and 'cnn_5'. The default trains only 'cnn_5',
            which is what the previous hard-coded model list effectively did.

    Raises:
        ValueError: if an unknown model name is requested.

    Returns:
        None.
    """
    # Factories instead of instances: only the requested models are built,
    # and every run starts from freshly initialised weights.
    factories = {
        'mlp_1': lambda: mlp_1(1024, 10),
        'mlp_2': lambda: mlp_2(1024, 10),
        'cnn_3': lambda: cnn_3(10),
        'cnn_4': lambda: cnn_4(10),
        'cnn_5': lambda: cnn_5(10),
    }

    if isinstance(model_names, str):
        model_names = (model_names,)
    model_names = tuple(model_names)
    unknown = [name for name in model_names if name not in factories]
    if unknown:
        raise ValueError(f"Unknown model name(s): {unknown}; expected any of {sorted(factories)}")

    for model_name in model_names:
        model_train_loss = []
        model_train_acc = []
        model_val_acc = []
        # None (rather than 0) marks "no run has improved yet", so the export
        # below cannot fail on a missing .tolist().
        best_weight = None
        best_acc = 0

        for step in range(NUM_STEPS):
            # Re-create the model for each run; reusing one instance would
            # make later runs continue from earlier weights and the averaged
            # curves would no longer describe independent runs.
            model = factories[model_name]().to(device)
            optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
            criterion = torch.nn.CrossEntropyLoss().to(device)

            step_train_loss = []
            step_train_acc = []
            step_val_acc = []
            step_test_acc = []

            for epoch in range(NUM_EPOCHS):
                print(f'Epoch {epoch+1}/{NUM_EPOCHS}  {step+1}/{NUM_STEPS} for {model_name} model')
                train(model, optimizer, criterion, step_train_loss, step_train_acc, step_val_acc, epoch, step)

            model_train_loss.append(step_train_loss)
            model_train_acc.append(step_train_acc)
            model_val_acc.append(step_val_acc)
            print(f"step_train_loss: {step_train_loss}, step_train_acc: {step_train_acc}, step_val_acc: {step_val_acc}")
            best_weight, best_acc = test(model, step_test_acc, best_weight, best_acc)

        # Element-wise mean of each curve across the runs.
        avg_train_loss = [sum(x)/len(x) for x in zip(*model_train_loss)]
        avg_train_acc = [sum(x)/len(x) for x in zip(*model_train_acc)]
        avg_valid_acc = [sum(x)/len(x) for x in zip(*model_val_acc)]
        model_result = {
            'name': model_name,
            'loss_curve': avg_train_loss,
            'train_acc_curve': avg_train_acc,
            'val_acc_curve': avg_valid_acc,
            'test_acc': best_acc,
            'weights': best_weight.tolist() if best_weight is not None else None,
        }

        with open("q2_"+model_name+".json", "w") as outfile:
            json.dump(model_result, outfile)

        if best_weight is not None:
            visualizeWeights(best_weight, save_dir='Q3', filename='input_weights_'+model_name)

    print("Training Done")

    return None

In [None]:
# Run the experiment defined in hw3_1().
hw3_1()