<a href="https://colab.research.google.com/github/MertYuksek/CIFAR-With-CNN/blob/main/cifarWithCnn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [12]:
%reset
# import libraries
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader,TensorDataset
import copy
from sklearn.model_selection import train_test_split


import torchvision.transforms as transforms
import torchvision.datasets as datasets
# model summary info
from torchsummary import summary

import matplotlib.pyplot as plt
from IPython import display
display.set_matplotlib_formats('svg')

import torchvision.transforms as T
import torchvision
from torch.utils.data import DataLoader,Subset

Once deleted, variables cannot be recovered. Proceed (y/[n])? y


In [13]:
# Pick the compute device once: the first CUDA GPU when one is visible,
# otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = torch.device('cuda:0')
else:
  device = torch.device('cpu')

In [14]:
# Convert each image to a float tensor and normalize every RGB channel with
# mean 0.5 and std 0.5.
transform  = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5))])

# CIFAR10 train split for fitting, the official test split as the dev set
# (same constructor for both, for consistency).
trainset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
devset   = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

# Raw arrays are stored as (N, 32, 32, 3); the loaders yield batches shaped
# (batch, 3, 32, 32) once the transform has been applied.
print(trainset.data.shape)
print(devset.data.shape)

batchsize    = 32
# Training batches are reshuffled every epoch; the incomplete last batch is dropped.
train_loader = DataLoader(trainset,batch_size=batchsize,shuffle=True,drop_last=True)
# The dev set is only evaluated, never trained on: keep a fixed order and
# every sample so the reported metrics are reproducible and complete.
dev_loader = DataLoader(devset,batch_size=batchsize,shuffle=False,drop_last=False)


Files already downloaded and verified
Files already downloaded and verified
(50000, 32, 32, 3)
(10000, 32, 32, 3)


In [15]:
# create a class for the model
def makeTheNet(printtoggle=False):
  """Build a fresh CIFAR CNN together with its loss function and optimizer.

  Args:
    printtoggle: when True, the network prints the tensor shape after each
      stage of the forward pass (useful for checking layer sizes).

  Returns:
    (net, lossfun, optimizer): a new `cifarnet` instance, an
    `nn.CrossEntropyLoss`, and an Adam optimizer (lr=0.001) bound to the
    network's parameters.
  """

  class cifarnet(nn.Module):
    """Three conv blocks followed by three fully connected layers (10 outputs)."""

    def __init__(self,printtoggle):
      super().__init__()

      # print toggle
      self.print = printtoggle

      ### -------------- feature map layers -------------- ###
      # first convolution layer
      self.conv1  = nn.Conv2d(3,64,3,padding=1)
      self.bnorm1 = nn.BatchNorm2d(64) # input the number of channels in this layer
      # output size: (32+2*1-3)/1 + 1 = 32/2 = 16 (/2 b/c maxpool)

      # second convolution layer
      self.conv2  = nn.Conv2d(64,128,3,padding=0)
      self.bnorm2 = nn.BatchNorm2d(128) # input the number of channels in this layer
      # output size: (16+2*0-3)/1 + 1 = 14/2 = 7 (/2 b/c maxpool)

      # third convolution layer
      self.conv3  = nn.Conv2d(128,256,3,padding=0)
      self.bnorm3 = nn.BatchNorm2d(256) # input the number of channels in this layer
      # output size: (7+2*0-3)/1 + 1 = 5/2 = 2 (/2 b/c maxpool)

      ### -------------- linear decision layers -------------- ###
      # 2*2*256 = flattened size of the third block's output
      self.fc1 = nn.Linear(2*2*256,256)
      self.bn1 = nn.BatchNorm1d(num_features=256)
      self.fc1_2 = nn.Linear(256,64)
      self.bn2 = nn.BatchNorm1d(num_features=64)
      self.fc2 = nn.Linear(64,10)

    def forward(self,x):
      """Map a batch of images to raw class scores (no softmax applied)."""

      if self.print: print(f'Input: {list(x.shape)}')

      # first block: convolution -> maxpool -> batchnorm -> leaky relu
      x = F.max_pool2d(self.conv1(x),2)
      x = F.leaky_relu(self.bnorm1(x))
      if self.print: print(f'First CPR block: {list(x.shape)}')

      # second block: convolution -> maxpool -> batchnorm -> leaky relu
      x = F.max_pool2d(self.conv2(x),2)
      x = F.leaky_relu(self.bnorm2(x))
      if self.print: print(f'Second CPR block: {list(x.shape)}')

      # third block: convolution -> maxpool -> batchnorm -> leaky relu
      x = F.max_pool2d(self.conv3(x),2)
      x = F.leaky_relu(self.bnorm3(x))
      # label fixed: this message used to repeat "Second CPR block"
      if self.print: print(f'Third CPR block: {list(x.shape)}')

      # reshape for linear layer: keep the batch dimension, flatten the rest
      x = torch.flatten(x,start_dim=1)
      if self.print: print(f'Vectorized: {list(x.shape)}')

      # linear layers: linear -> leaky relu -> batchnorm -> dropout (twice),
      # then the output layer; dropout is only active while self.training
      x = F.leaky_relu(self.fc1(x))
      x = self.bn1(x)
      x = F.dropout(x, p=0.5, training=self.training)
      x = F.leaky_relu(self.fc1_2(x))
      x = self.bn2(x)
      x = F.dropout(x, p=0.5, training=self.training)
      x = self.fc2(x)
      if self.print: print(f'Final output: {list(x.shape)}')

      return x

  # create the model instance
  net = cifarnet(printtoggle)

  # loss function
  lossfun = nn.CrossEntropyLoss()

  # optimizer
  optimizer = torch.optim.Adam(net.parameters(),lr=.001)

  return net,lossfun,optimizer

In [16]:
# test the model with one batch
net,lossfun,optimizer = makeTheNet(True)

# Use the built-in next(): the Python-2-style .next() method is not available
# on DataLoader iterators in current PyTorch releases.
X,y = next(iter(train_loader))
yHat = net(X)

# check size of output
print('\nOutput size:')
print(yHat.shape)

# now let's compute the loss (labels are passed as in the training loop)
loss = lossfun(yHat,y.long())
print(' ')
print('Loss:')
print(loss)

Input: [32, 3, 32, 32]
First CPR block: [32, 64, 16, 16]
Second CPR block: [32, 128, 7, 7]
Second CPR block: [32, 256, 2, 2]
Vectorized: [32, 1024]
Final output: [32, 10]

Output size:
torch.Size([32, 10])
 
Loss:
tensor(2.6906, grad_fn=<NllLossBackward0>)


In [7]:
# Layer-by-layer summary (output shapes and parameter counts) for a single
# 3x32x32 input; the model is moved to the selected device first.
net_on_device = net.to(device)
summary(net_on_device,(3,32,32))

Input: [2, 3, 32, 32]
First CPR block: [2, 64, 16, 16]
Second CPR block: [2, 128, 7, 7]
Second CPR block: [2, 256, 2, 2]
Vectorized: [2, 1024]
Final output: [2, 10]
----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 64, 32, 32]           1,792
       BatchNorm2d-2           [-1, 64, 16, 16]             128
            Conv2d-3          [-1, 128, 14, 14]          73,856
       BatchNorm2d-4            [-1, 128, 7, 7]             256
            Conv2d-5            [-1, 256, 5, 5]         295,168
       BatchNorm2d-6            [-1, 256, 2, 2]             512
            Linear-7                  [-1, 256]         262,400
       BatchNorm1d-8                  [-1, 256]             512
            Linear-9                   [-1, 64]          16,448
      BatchNorm1d-10                   [-1, 64]             128
           Linear-11                   [-1, 10]             650
To

In [17]:
# a function that trains the model

def function2trainTheModel(numepochs=10):
  """Train a fresh network on `train_loader` and evaluate it on `dev_loader`.

  A new model is created with makeTheNet() and moved to `device`. After every
  epoch the loss and accuracy are recorded for the training batches and for
  the complete dev loader, and the curves so far are plotted with pl().

  Args:
    numepochs: number of passes over the training data (default 10).

  Returns:
    (trainLoss, devLoss, trainAccr, devAccr, net): four 1-D tensors of length
    `numepochs` (losses, and accuracies in percent) and the trained network.
  """

  # create a new model
  net,lossfun,optimizer = makeTheNet()

  # send the model to the GPU
  net.to(device)

  # initialize losses
  trainLoss = torch.zeros(numepochs)
  devLoss   = torch.zeros(numepochs)
  trainAccr = torch.zeros(numepochs)
  devAccr   = torch.zeros(numepochs)

  # loop over epochs
  for epochi in range(numepochs):
    print("epoch : "+str(epochi))

    # loop over training data batches
    net.train()
    batchLoss = []
    batchAcc  = []
    for X,y in train_loader:

      # push data to GPU
      X = X.to(device)
      y = y.to(device)

      # forward pass and loss
      yHat = net(X)
      loss = lossfun(yHat,y.long())

      # backprop
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

      # loss and accuracy from this batch (measured in train mode)
      batchLoss.append(loss.item())
      batchAcc.append( torch.mean((torch.argmax(yHat,axis=1) == y.long()).float()).item() )
    # end of batch loop...

    # and get average loss and accuracy across the batches
    trainLoss[epochi] = np.mean(batchLoss)
    trainAccr[epochi] = 100*np.mean(batchAcc)

    ### dev performance: evaluate on the whole dev loader, not a single batch,
    ### so the estimate is not based on only one batch of images
    net.eval()
    devLossSum = 0.0
    devCorrect = 0
    devCount   = 0
    with torch.no_grad(): # deactivates autograd
      for X,y in dev_loader:

        # push data to GPU
        X = X.to(device)
        y = y.to(device).long()

        yHat = net(X)

        # lossfun returns the batch mean; weight by batch size so a smaller
        # final batch does not skew the average
        nInBatch    = y.shape[0]
        devLossSum += lossfun(yHat,y).item()*nInBatch
        devCorrect += (torch.argmax(yHat,axis=1) == y).sum().item()
        devCount   += nInBatch

    # average over every dev sample seen (NaN if the loader was empty)
    if devCount > 0:
      devLoss[epochi] = devLossSum/devCount
      devAccr[epochi] = 100*devCorrect/devCount
    else:
      devLoss[epochi] = float('nan')
      devAccr[epochi] = float('nan')

    # plot only the epochs completed so far; passing the full arrays would
    # draw the untrained epochs as zeros and report 0% in the plot title
    done = epochi+1
    pl(trainLoss[:done],devLoss[:done],trainAccr[:done],devAccr[:done])
  # end epochs

  # function output
  return trainLoss,devLoss,trainAccr,devAccr,net

In [18]:
def pl(trainLoss,devLoss,trainAccr,devAccr):
  """Plot loss and accuracy curves for the train and dev sets side by side.

  Args:
    trainLoss, devLoss: per-epoch loss values (one entry per epoch).
    trainAccr, devAccr: per-epoch accuracies in percent; the last entry of
      devAccr is shown in the title of the accuracy panel.
  """

  fig,ax = plt.subplots(1,2,figsize=(16,5))

  # left panel: losses
  ax[0].plot(trainLoss,'s-',label='Train')
  ax[0].plot(devLoss,'o-',label='Dev')
  ax[0].set_xlabel('Epochs')
  ax[0].set_ylabel('Loss (Cross Entropy)')
  ax[0].set_title('Model loss')
  ax[0].legend() # the line labels above were never shown without this

  # right panel: accuracies
  ax[1].plot(trainAccr,'s-',label='Train')
  ax[1].plot(devAccr,'o-',label='Dev')
  ax[1].set_xlabel('Epochs')
  ax[1].set_ylabel('Accuracy rates (%)')
  ax[1].set_title(f'Final model test accuracy rate: {devAccr[-1]:.2f}%')
  ax[1].legend()

  plt.show()
  # this function is called once per epoch; release the figure afterwards so
  # figures do not accumulate in memory
  plt.close(fig)


In [None]:
# Train a new model; function2trainTheModel runs 10 epochs and plots the
# curves after each one.
# Author's timing note: ~2 minutes with 5 epochs on GPU.
trainLoss,devLoss,trainAccr,devAccr,net = function2trainTheModel()

In [None]:
### ----  Convert RGB to Grayscale ---- ###

#trainTransform  = transforms.Compose([transforms.Grayscale(num_output_channels=1),
 #                                   transforms.ToTensor(), 
  #                                  transforms.Normalize((0.5), (0.5))])
#trainset = datasets.CIFAR10(root='./data', train=True, download=True,
 #   transform=trainTransform)
 #trainset[0][0].shape

In [None]:
# next issue: do we need to normalize the images?

#plt.hist(images[:10,:,:,:].view(1,-1).detach(),40);
#plt.title('Raw values')
#plt.show()

# yarp.
#images /= torch.max(images)

#plt.hist(images[:10,:,:,:].view(1,-1).detach(),40);
#plt.title('After normalization')
#plt.show()

In [None]:
# transformations
#transform = T.Compose([ T.ToTensor(),
 #                       T.Normalize([.5,.5,.5],[.5,.5,.5])
  #                     ])

# import the data and simultaneously apply the transform
#trainset = torchvision.datasets.CIFAR10(root='./data', train=True,  download=True, transform=transform)
#devtest  = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

# split the devtest into two separate sets
#randidx = np.random.permutation(10000)   # a random permutation of indices
#devset  = Subset(devtest,randidx[:6000]) # Subset for devset
#testset = Subset(devtest,randidx[6000:]) # Subset for test

# transform to dataloaders
#batchsize    = 32
#train_loader = DataLoader(trainset,batch_size=batchsize,shuffle=True,drop_last=True)
#dev_loader   = DataLoader(devset,  batch_size=batchsize) # note: devtest in batches!
#test_loader  = DataLoader(testset, batch_size=len(testset))

In [None]:
# check size (should be images X channels X width X height)
#print( train_loader.dataset.tensors[0].shape )
#print( train_loader.dataset.tensors[1].shape )

In [None]:
# Step 1: use scikit-learn to split the data into train and dev sets
#train_data, dev_data, train_labels, dev_labels = train_test_split(images, labels, test_size=.1)

# Step 2: use scikit-learn to split the dev data into dev and test sets
#dev_data, test_data, dev_labels, test_labels = train_test_split(dev_data, dev_labels, test_size=.5)

# print(type(train_labels))
# print(type(dev_labels))
# print(type(test_labels))
# print(torch.FloatTensor(train_labels))

# Step 3: convert into PyTorch Datasets
#train_data = TensorDataset(train_data, torch.Tensor(train_labels))
#dev_data = TensorDataset(dev_data, torch.Tensor(dev_labels))
#test_data  = TensorDataset(test_data, torch.Tensor(test_labels))

# Step 4: translate into dataloader objects
#batchsize    = 32
#train_loader = DataLoader(train_data,batch_size=batchsize,shuffle=True,drop_last=True)
#dev_loader  = DataLoader(dev_data,batch_size=dev_data.tensors[0].shape[0])
#test_loader  = DataLoader(test_data,batch_size=test_data.tensors[0].shape[0])

In [None]:
# inspect a few random images

#fig,axs = plt.subplots(5,5,figsize=(10,10))

#for ax in axs.flatten():

  # select a random picture
  #randidx = np.random.choice(len(cdata.targets))

  # extract that image
  # using numpy array not tensor
  #pic = cdata.data[randidx,:,:,:]
  # and its label
  #label = cdata.classes[cdata.targets[randidx]]

  # and show!
  #ax.imshow(pic)
  #ax.text(16,0,label,ha='center',fontweight='bold',color='k',backgroundcolor='y')
  #ax.axis('off')

#plt.tight_layout()
#plt.show()

In [None]:
#classes = cdata.classes
#print(classes)
#labels = cdata.targets
#print(type(labels))
#images = torch.from_numpy(cdata.data).float()
#images = images.reshape(50000, 3, 32, 32)
#print(images.shape)