In [8]:
"""
This is a starter file to get you going. You may also include other files if you feel it's necessary.

Make sure to follow the code convention described here:
https://github.com/UWARG/computer-vision-python/blob/main/README.md#naming-and-typing-conventions

Hints:
* The internet is your friend! Don't be afraid to search for tutorials/intros/etc.
* We suggest using a convolutional neural network.
* TensorFlow Keras has the CIFAR-10 dataset as a module, so you don't need to manually download and unpack it.
"""

# Import whatever libraries/modules you need

from ast import arg
from matplotlib import style
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tqdm import tqdm
from re import L
from zmq import device

# Your working code here

class Net(nn.Module):
    def __init__(self):
        #create convolutional layers
        super().__init__()
        #kernel size of 5
        self.conv1 = nn.Conv2d(1, 16, 5)
        self.conv2 = nn.Conv2d(16, 32, 5)
        #flatten to go through linear layers
        nn.Flatten()
        self.fc1 = nn.Linear(800, 512)
        #10 classes
        self.fc2 = nn.Linear(512, 10)
    def convs(self, x):
        #maxpooling simplifies the picture with window size of 2
        x = F.max_pool2d(F.relu(self.conv1(x)), (2, 2))
        x = F.max_pool2d(F.relu(self.conv2(x)), (2, 2))
        return x
    def forward(self, x):
        x = self.convs(x)
        x = x.view(-1, 800)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        #run through the neural network
        return F.softmax(x, dim = 1)

def organize_data():
    #manage data from Tf dataset
    RGB_TO_GREY = [0.299, 0.587, 0.114]
    PIXEL_VALUE = 255.0
    #load CIFAR10 data set into np arrays
    (xTrain, yTrain), (xTest, yTest) = tf.keras.datasets.cifar10.load_data()
    #grey scale
    xTrain = np.dot(xTrain, RGB_TO_GREY)
    #conver to tensor
    xTrain = torch.Tensor(xTrain) 
    #convert to decimal values from 0-1
    xTrain = xTrain/PIXEL_VALUE
    #repeat for test set
    xTest = np.dot(xTest, RGB_TO_GREY)
    xTest = torch.Tensor(xTest) 
    xTest = xTest/PIXEL_VALUE
    #convert to tensor
    yTrain = torch.Tensor(yTrain)
    yTest = torch.Tensor(yTest)
    #make a new tensor of correct dimensions to input both images and labels into loss function
    ytrain = torch.zeros(len(xTrain), 10)
    ytest = torch.zeros(len(xTest), 10)
    #input correct value into correct index value for entire tensor
    for i in range(len(xTrain)):
        ytrain[i][(yTrain[i] - 1).long()] = 1.0
    for i in range(len(xTest)):
        ytest[i][(yTest[i] - 1).long()] = 1.0
    yTrain = ytrain
    yTest = ytest
    return xTrain, yTrain, xTest, yTest

def select_device():
    #choose device if you can use nvidea gpu
    if torch.cuda.is_available():
        device = torch.device('cuda:0')
    else:
        device = torch.device('cpu')
    return device

def train(net, xTrain, yTrain, xTest, yTest, device, loss_function, optimizer, EPOCHS, BATCH_SIZE, MODEL_NAME): 
    #use for graph later
    with open("model.log", "a") as f: 
        for epoch in range(EPOCHS):
            #use for loading bar
            for i in tqdm(range(0, len(xTrain), BATCH_SIZE)): 
                #array splice for each batch size
                #32, 32 because cnn can take 2d array
                x_batch = xTrain[i:i+BATCH_SIZE].view(-1, 1, 32, 32)
                y_batch = yTrain[i:i+BATCH_SIZE]
                #send to gpu
                x_batch, y_batch = x_batch.to(device), y_batch.to(device)
                #get accuracy and loss through helper function
                acc, loss = fwd_pass(x_batch, y_batch, net, loss_function, optimizer, train = True)
                if i % 10 == 0:
                    #for the graph
                    val_acc, val_loss = test(xTest, yTest, net, loss_function, optimizer, device, size = 100)
                    f.write(f"{MODEL_NAME}, {epoch}, {round(float(acc), 2)}, {round(float(loss), 4)}, {round(float(val_acc), 2)}, {round(float(val_loss), 4)}\n")
            print(f"Epoch: {epoch}. Loss: {loss}")

def batch_test(net, xTest, yTest, device, BATCH_SIZE): 
    with torch.no_grad():
        #test a batch of the test tensor
        x_batch = xTest[:BATCH_SIZE].view(-1, 1, 32, 32)
        y_batch = yTest[:BATCH_SIZE]
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)
        #zero gradient
        net.zero_grad()
        #output of the cnn
        outputs = net(x_batch)
        #find matches of outputs and labels
        matches = [torch.argmax(i) == torch.argmax(j) for i, j in zip(outputs, y_batch)]
        #find accuracy of matches
        acc = matches.count(True)/len(matches)
        print("Test Accuracy: ", round(acc, 3))

def fwd_pass(X, y, net, loss_function, optimizer, train = False):
    if train:
        #if we are training, zero the gradient
        net.zero_grad()
    #outputs of the cnn
    outputs = net(X)
    #find matches of outputs and labels
    matches = [torch.argmax(i) == torch.argmax(j) for i, j in zip(outputs, y)]
    #find accuracy of matches
    acc = matches.count(True)/len(matches)
    #run through loss function 
    loss = loss_function(outputs, y)
    if train: 
        #if we are training, optimize the network
        loss.backward()
        optimizer.step()
    return acc, loss

def test(xTest, yTest, net, loss_function, optimizer, device, size = 32):
    #test for points on the graph
    X, y = xTest[:size], yTest[:size]
    val_acc, val_loss = fwd_pass(X.view(-1, 1, 32, 32).to(device), y.to(device), net, loss_function, optimizer)
    return val_acc, val_loss


def create_acc_loss_graph(model_name, loss_or_acc):
    contents = open("model.log", "r").read().split("\n")
    epochs = []
    accuracies = []
    losses = []
    test_accs = []
    test_losses = []
    for c in contents: 
        if model_name in c:
            #read the name of the model
            name, epoch, acc, loss, test_acc, test_loss = c.split(",")
            epochs.append(float(epoch))
            accuracies.append(float(acc))
            losses.append(float(loss))
            test_accs.append(float(test_acc))
            test_losses.append(float(test_loss))
    fig = plt.figure()
    ax1 = plt.subplot2grid((2,1), (0,0))
    #make the correct graph 
    if(loss_or_acc == "acc"):
        ax1.plot(epochs, accuracies, label = "acc")
        ax1.plot(epochs, test_accs, label = "test_acc")
    elif(loss_or_acc == "loss"):
        ax1.plot(epochs, losses, label = "loss")
        ax1.plot(epochs, test_losses, label = "test_loss")
    ax1.legend(loc = 2)
    plt.show() 

def main():
    #make all objects 
    device = select_device()
    net = Net().to(device)
    optimizer = optim.Adam(net.parameters(), lr = 0.001)
    loss_function = nn.MSELoss()
    xTrain, yTrain, xTest, yTest = organize_data()
    #make constants
    MODEL_NAME = f"model-{int(time.time())}"
    BATCH_SIZE = 100
    EPOCHS = 10  
    #train the model
    train(net, xTrain, yTrain, xTest, yTest, device, loss_function, optimizer, EPOCHS, BATCH_SIZE, MODEL_NAME)
    #test the model
    batch_test(net, xTest, yTest, device, BATCH_SIZE)
    style.use("ggplot")
    model_name = MODEL_NAME
    #show the accuracy/loss
    create_acc_loss_graph(model_name, "acc")

#run the program
main()

        



50000
