In [None]:
#%reset -f
#importing libraries
import torchvision
import torch 
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset

from PIL import Image
import cv2 as cv
import numpy as np

from pathlib import Path, PurePath
import glob
import itertools

import time
import os
import matplotlib.pyplot as plt
import numpy as np

In [None]:
# Locate the CIFAR-10 python batches on disk.
# The data directory is looked up inside the current working directory.
DATA_FOLDER = os.path.join(Path.cwd(), "cifar-10-batches-py/")

# Training batches: every entry of the data directory whose name starts with
# "data", kept in sorted order.
batch_names = glob.glob(f"{DATA_FOLDER}/data*")
batch_names.sort()

# Path of the single test batch file.
test_names = os.path.join(DATA_FOLDER, "test_batch")

def unpickle(file):
    """Load and return the object pickled in ``file``.

    The file is opened in binary mode and decoded with ``encoding="bytes"``,
    which is why callers in this notebook index the result with byte keys
    such as ``b'data'`` and ``b'labels'``.

    NOTE: unpickling can execute arbitrary code; only use this on trusted files.
    """
    import pickle

    with open(file, "rb") as handle:
        return pickle.load(handle, encoding="bytes")

def _load_cifar_batch(filepath):
    """Unpickle one batch file and convert its images and labels to tensors."""
    unpacked = unpickle(filepath)
    raw = np.asarray(unpacked[b'data'])
    # One row per picture: (n_pictures, 3072) -> (n_pictures, 3, 32, 32).
    # Reshaping the whole array at once avoids building a Python list of
    # per-picture arrays, which is very slow to turn into a tensor.
    images = raw.reshape(raw.shape[0], 3, 32, 32)
    data = torch.as_tensor(images, dtype=torch.float32)
    labels = torch.tensor(unpacked[b'labels'])
    return {"data": data, "labels": labels}


def read_data_label_set(filepaths):
    """Read one or several pickled CIFAR batches into tensors.

    Each batch file is unpickled and its ``b'data'`` rows (3072 values per
    picture) are reshaped to ``(n_pictures, 3, 32, 32)`` float tensors; the
    ``b'labels'`` entry becomes a tensor as well. Pixel values are NOT
    rescaled here.

    Args:
        filepaths: a single batch file path, or a list/tuple of paths.

    Returns:
        A ``{"data": tensor, "labels": tensor}`` dict for a single path, or a
        list of such dicts (one per file, in the given order) for a list/tuple.
    """
    if isinstance(filepaths, (list, tuple)):
        return [_load_cifar_batch(path) for path in filepaths]
    return _load_cifar_batch(filepaths)


In [None]:
# Per-channel statistics used to normalise the images.
# The training batches are loaded without any rescaling ...
unnorm_batches = read_data_label_set(batch_names)

# ... stacked along the picture axis, then rearranged so that every row holds
# all the values of one colour channel: (N, 3, 32, 32) -> (3, 32, 32, N) -> (3, 32*32*N).
all_pictures = torch.cat([batch["data"] for batch in unnorm_batches])
concatChan = all_pictures.permute(1, 2, 3, 0).reshape(3, -1)

# Mean and standard deviation per channel, divided by 255 to match the
# 1/255 scaling applied to the pictures by the dataset class.
mean = concatChan.mean(dim=1) / 255
st_dev = concatChan.std(dim=1) / 255

# Transformation pipeline for the datasets: normalisation only.
# (ToTensor is left out; the dataset hands tensors to the transform.)
normalize_step = torchvision.transforms.Normalize(mean=mean, std=st_dev)
transf = torchvision.transforms.Compose([normalize_step])

In [None]:
#Dataset class for cifar10 to put into a dataloader later
class Cif10Set(Dataset):
    """CIFAR-10 pictures and labels held in memory as tensors.

    The batches are loaded with ``read_data_label_set``, concatenated along
    the picture axis and the pictures are divided by 255. Both the single-file
    and the multi-file case go through the same code path, so ``data`` and
    ``label`` are always torch tensors.

    Args:
        data_path: one batch file path, or a list of batch file paths.
        transform: optional callable applied to each picture tensor when it
            is fetched; ``None`` returns the scaled picture unchanged.

    Raises:
        ValueError: if ``data_path`` yields no batch at all.
    """

    def __init__(self, data_path, transform=None):
        loaded = read_data_label_set(data_path)
        # Treat a single batch as a one-element list so there is only one path.
        batches = loaded if isinstance(loaded, list) else [loaded]
        if not batches:
            raise ValueError(
                "Cif10Set needs at least one data batch, got none for %r" % (data_path,))

        # pictures divided by 255, stacked over all batches
        self.data = torch.cat([torch.as_tensor(batch["data"]) for batch in batches]) / 255
        # class indices, in the same order as the pictures
        self.label = torch.cat([torch.as_tensor(batch["labels"]) for batch in batches])
        # number of pictures
        self.length = self.data.shape[0]
        self.transform = transform

    def __getitem__(self, index):
        """Return ``(picture, label)`` at ``index``, transformed if a transform is set."""
        picture = self.data[index]
        if self.transform:
            picture = self.transform(picture)
        return picture, self.label[index]

    def __len__(self):
        """Return the number of pictures in the dataset."""
        return self.length

In [None]:
# Build the two datasets; both share the same normalisation transform.
# Training data: all the "data*" batch files.
train_set = Cif10Set(data_path=batch_names, transform=transf)
# Test data: the single test batch file.
test_set = Cif10Set(data_path=test_names, transform=transf)

In [None]:
# Wrap the datasets in dataloaders: shuffled mini-batches of 100 pictures.
# Loader feeding the training loop.
train_load = DataLoader(dataset=train_set, batch_size=100, shuffle=True)
# Loader feeding the evaluation cell.
test_load = DataLoader(dataset=test_set, batch_size=100, shuffle=True)

In [None]:
# Convolutional classifier for 3*32*32 inputs
class ConvNet(nn.Module):
    """Five conv/max-pool/ReLU stages followed by a linear layer with 10 outputs.

    Spatial size after a convolution: (W - F + 2P) / S + 1
    (W: input size, F: filter size, P: padding, S: stride).
    Every 3x3 convolution uses padding 1 and stride 1, so it keeps the
    spatial size; every 2x2 max-pool with stride 2 halves it:
    32 -> 16 -> 8 -> 4 -> 2 -> 1.
    """

    def __init__(self):
        super().__init__()

        # (in_channels, out_channels) of the five convolution stages
        channel_plan = [(3, 128), (128, 512), (512, 512), (512, 512), (512, 512)]

        layers = []
        for c_in, c_out in channel_plan:
            layers.append(nn.Conv2d(c_in, c_out, kernel_size=3, stride=1, padding=1))
            layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
            layers.append(nn.ReLU())

        # 512*1*1 -> 512 -> 10
        layers.append(nn.Flatten())
        layers.append(nn.Linear(512, 10))

        # Same layer order (and therefore the same Sequential indices) as a
        # hand-written list of the 17 layers.
        self.network = nn.Sequential(*layers)

    def forward(self, data):
        """Run ``data`` through the whole stack and return the 10 raw outputs per sample."""
        return self.network(data)

In [None]:
# Build the network and report its layout and parameter counts
model = ConvNet()
print(model)
numel_list = list(map(lambda param: param.numel(), model.parameters()))
print(sum(numel_list), numel_list)

# Hyperparameters
learning_rate = 0.009
epochs = 3

# Loss function and optimizer (SGD with momentum)
loss_meas = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(
    params=model.parameters(),
    lr=learning_rate,
    momentum=0.9,
)

In [None]:
# Restore a previously saved checkpoint, if there is one.
# This cell runs before the manual save cell further down, so on a fresh
# checkout the checkpoint file does not exist yet; in that case the model and
# optimizer created above are kept instead of aborting a "Run All".
checkpoint_file = Path("./cifar_m_FromCell.pt")

if checkpoint_file.is_file():
    # The model in this notebook is never moved off the CPU, so map the
    # stored tensors to the CPU regardless of where they were saved from.
    check_poi = torch.load(checkpoint_file, map_location="cpu")

    # Fresh model/optimizer pair that receives the stored state.
    model = ConvNet()
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)

    model.load_state_dict(check_poi["model_state_dict"])
    optimizer.load_state_dict(check_poi["optimizer_state_dict"])
else:
    print(f"No checkpoint found at {checkpoint_file}; keeping the current model and optimizer.")

In [None]:
#TODO improve outputs, shuffle, add dropout, add batch norm, add augment, add cross validation
# training loop
log_every = 20  # number of mini-batches between two progress lines

model.train()
for epoch in range(epochs):
    # Sum of the batch losses over the current epoch. After the loop it holds
    # the total of the last epoch, which the manual save cell stores as 'loss'.
    running_loss = 0.0
    # Sum of the batch losses since the last progress line.
    window_loss = 0.0
    # Number of pictures processed so far in this epoch.
    samples_seen = 0

    # iterates over batches
    for batch_num, (imageData, labels) in enumerate(train_load):
        # forward pass
        model_out = model(imageData)
        loss = loss_meas(model_out, labels)

        # optimization and backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        running_loss += batch_loss
        window_loss += batch_loss
        samples_seen += labels.size(0)

        # Print statistics once a full window of `log_every` batches has been
        # accumulated, and average over exactly that many batches.
        if (batch_num + 1) % log_every == 0:
            print('[%d, %5d] loss: %.4f' % (epoch + 1, samples_seen, window_loss / log_every))
            window_loss = 0.0

    # Saving a model every epoch is disabled to save disk space. To enable it,
    # uncomment the following lines (they belong inside the epoch loop):
    # savename = "cifar_m"+str(epoch)+".pt"
    # torch.save({
    #         'epoch': epoch,
    #         'model_state_dict': model.state_dict(),
    #         'optimizer_state_dict': optimizer.state_dict(),
    #         'loss': running_loss,
    #         }, savename)

In [None]:
# Manually checkpoint the current training state to disk
savename = "cifar_m_FromCell.pt"

# Everything needed to resume: last epoch index, model weights,
# optimizer state and the current value of the loss accumulator.
checkpoint = {
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': running_loss,
}
torch.save(checkpoint, savename)

In [None]:
#validation of the model
classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

total = 0
correct = 0
class_correct = [0.0] * len(classes)
class_total = [0.0] * len(classes)

# Evaluate in eval mode and restore the previous mode afterwards, so a later
# training run is not silently left in eval mode.
was_training = model.training
model.eval()
with torch.no_grad():
    for images, labels in test_load:
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        hits = predicted == labels

        # Count EVERY sample of the batch per class (not just the first few),
        # whatever the batch size is.
        for label, hit in zip(labels.tolist(), hits.tolist()):
            class_correct[label] += hit
            class_total[label] += 1

        total += labels.size(0)
        correct += hits.sum().item()
model.train(was_training)

for i in range(len(classes)):
    if class_total[i] > 0:
        print('Accuracy of %5s : %2d %%' % (classes[i], 100 * class_correct[i] / class_total[i]))
    else:
        # no sample of this class was seen; avoid dividing by zero
        print('Accuracy of %5s : n/a (no test samples)' % classes[i])

if total > 0:
    print("Accuracy: ", (correct/total)*100,"%")
else:
    print("Accuracy: n/a (the test loader produced no samples)")

In [None]:
#A cell to test taken pictures in the same folder as this file
imgNames = sorted(glob.glob(os.path.join(Path.cwd(), "*.png")))
print(imgNames)

# cv.imread returns None for files it cannot decode; drop those and keep the
# file names aligned with the pictures that were actually loaded.
loaded = [(name, cv.imread(name)) for name in imgNames]
imgNames = [name for name, picture in loaded if picture is not None]
img = [picture for _, picture in loaded if picture is not None]

if img:
    # Resize to the 32x32 network input and convert OpenCV's BGR channel
    # order to RGB, the channel order the network was trained on.
    resized = np.stack([
        cv.cvtColor(
            cv.resize(picture, dsize=(32, 32), interpolation=cv.INTER_CUBIC),
            cv.COLOR_BGR2RGB,
        )
        for picture in img
    ])  # (N, 32, 32, 3): height, width, channel

    # Channels first WITHOUT swapping height and width: (N, 3, 32, 32)
    transposed = torch.from_numpy(resized).permute(0, 3, 1, 2)

    # Same preprocessing as the datasets: scale by 1/255, then normalise.
    inp = transf(transposed.float() / 255)

    with torch.no_grad():
        outputs = model(inp)
    print(outputs)

    _, predicted = torch.max(outputs, 1)

    print([os.path.basename(name) for name in imgNames])
    print(predicted)
else:
    print("No readable .png pictures found in", Path.cwd())

In [None]:
# Display the first picture of the first unnormalised training batch
first_batch = unnorm_batches[0]
img = first_batch["data"][0] / 255
# channels first -> channels last before handing the picture to matplotlib
channels_last = img.permute(1, 2, 0)
plt.imshow(channels_last)

# Quick checks of the container types held by the two datasets
isinstance(train_set.data, list)
torch.is_tensor(test_set.data)