<a href="https://colab.research.google.com/github/Jiyeong-Oh/CIFAR-10-Classification-with-MLP-and-ResNet-20/blob/main/%5BCode%5D%20CIFAR_10_Classification_with_MLP_and_ResNet_20.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Data Setting

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import pandas as pd


# ToTensor scales uint8 pixels to [0, 1]; Normalize then maps every RGB channel to [-1, 1].
# NOTE(review): this transform only takes effect when samples are read through
# the Dataset interface (trainset[i] or a DataLoader). Code that reads
# `trainset.data` / `testset.data` directly gets the raw, un-normalised pixels.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])
batch_size = 64

# Keep the root free of stray whitespace: './ data' would create a directory
# that is literally named ' data'.
data_root = './data'
trainset = torchvision.datasets.CIFAR10(root=data_root, train=True, download=True, transform=transform)
testset = torchvision.datasets.CIFAR10(root=data_root, train=False, download=True, transform=transform)

# Human-readable class names, in label-index order.
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./ data/cifar-10-python.tar.gz


  0%|          | 0/170498071 [00:00<?, ?it/s]

Extracting ./ data/cifar-10-python.tar.gz to ./ data
Files already downloaded and verified


In [None]:
# Sanity check: shape of the raw training image array (displayed as the cell output).
trainset.data.shape

(50000, 32, 32, 3)

In [None]:
# Device handle used by val_split() to place the split tensors on the GPU.
cuda = torch.device('cuda')
# Displayed as the cell output. The training code in this notebook calls .cuda()
# unconditionally, so it needs this to be True.
torch.cuda.is_available()

True

In [None]:
def val_split(attr, tar, val_per_class=500, device=None):
  """Split a labelled dataset into class-stratified training and validation tensors.

  For every distinct label the sample indices are shuffled with ``np.random``;
  the first ``val_per_class`` of them go to the validation set and the rest to
  the training set, so every class contributes the same number of validation
  samples (fewer only if the class itself is smaller than ``val_per_class``).

  Args:
    attr: Array-like of features, indexed by sample along axis 0.
    tar: 1-D array-like of target labels, one per row of ``attr``.
    val_per_class: Number of validation samples taken from each label.
    device: torch device for the returned tensors. When omitted, the
      notebook-level ``cuda`` device is used.

  Returns:
    ``(X_train, y_train, X_val, y_val)`` as torch tensors on ``device``.
    Within each split the samples are grouped by label, in ascending label order.

  Raises:
    ValueError: if ``val_per_class`` is negative or ``attr`` and ``tar`` have
      different lengths.
  """
  if val_per_class < 0:
    raise ValueError("val_per_class must be non-negative, got {}".format(val_per_class))

  attr = np.asarray(attr)
  tar = np.asarray(tar)
  if len(attr) != len(tar):
    raise ValueError("attr and tar must have the same length, got {} and {}".format(len(attr), len(tar)))

  if device is None:
    device = cuda  # notebook-level default device

  train_idx = []
  val_idx = []
  # np.unique returns the labels sorted, which fixes the per-class ordering of both splits.
  for label in np.unique(tar):
    members = np.flatnonzero(tar == label)
    shuffled = np.random.choice(members, len(members), replace=False) # Shuffling
    val_idx.extend(shuffled[:val_per_class].tolist())    # validation: first val_per_class samples
    train_idx.extend(shuffled[val_per_class:].tolist())  # training: leftovers

  # Explicit integer dtype keeps the fancy indexing valid even when a split is empty.
  train_idx = np.asarray(train_idx, dtype=np.int64)
  val_idx = np.asarray(val_idx, dtype=np.int64)

  # Converting into Tensor Type
  X_train, y_train = torch.tensor(attr[train_idx], device=device), torch.tensor(tar[train_idx], device=device)
  X_val, y_val = torch.tensor(attr[val_idx], device=device), torch.tensor(tar[val_idx], device=device)

  return X_train, y_train, X_val, y_val

# Problem 1: MLP for CIFAR-10

In [None]:
###############################
# Hyperparameters ("remote controller") for the MLP experiment
###############################
epochs_num = 5 # number of passes over the training split
batch_size = 64 # samples per optimisation step
optimizer_choose = 'Adam' # 'SGD' or 'Adam'
learning_rate = 0.0001 # learning rate handed to the optimizer
final_test = True # only true for final test accuracy

class MLP_multi(nn.Module):
  """Fully connected classifier with two 256-unit ReLU hidden layers.

  ``forward`` returns the raw output of the last linear layer (one score per
  class); no softmax is applied inside the model.
  """

  def __init__(self, input_dim, output_dim):
    super().__init__()
    hidden_units = 256
    # input -> hidden 1 -> hidden 2 -> output (output_dim = number of classes)
    self.module1 = nn.Linear(input_dim, hidden_units, bias=True)
    self.module2 = nn.Linear(hidden_units, hidden_units, bias=True)
    self.module3 = nn.Linear(hidden_units, output_dim, bias=True)

  def forward(self, x):
    hidden = x
    for layer in (self.module1, self.module2):
      hidden = F.relu(layer(hidden))
    # Softmax is left to the cross-entropy loss, so the scores are returned as-is.
    return self.module3(hidden)

In [None]:
def multiMLP(trainSet, testSet, epochs_num=epochs_num, batch_size=batch_size, optimizer_choose=optimizer_choose, learning_rate=learning_rate, final_test=final_test):
  """Train an ``MLP_multi`` classifier and print loss / accuracy figures.

  The training set is split by ``val_split`` into training and validation
  parts, every image is flattened to a vector, and after each epoch the summed
  training loss and the validation accuracy are printed. Requires a CUDA device.

  Args:
    trainSet: Dataset exposing ``.data`` (image array) and ``.targets`` (labels).
    testSet: Dataset with the same attributes; used only for the final test accuracy.
    epochs_num: Number of passes over the training split.
    batch_size: Number of samples per optimisation step.
    optimizer_choose: ``'SGD'`` or ``'Adam'``.
    learning_rate: Learning rate handed to the optimizer.
    final_test: When truthy, the accuracy on ``testSet`` is printed after training.

  Raises:
    ValueError: if ``optimizer_choose`` is not one of the supported names.
  """
  # Fail fast on an unknown optimizer instead of hitting a NameError mid-training.
  optimizer_classes = {'SGD': optim.SGD, 'Adam': optim.Adam}
  if optimizer_choose not in optimizer_classes:
    raise ValueError("optimizer_choose must be one of {}, got {!r}".format(sorted(optimizer_classes), optimizer_choose))

  # Data Splitting
  # NOTE(review): `.data` holds the raw pixel values (presumably 0-255); the
  # torchvision transform attached to the dataset is not applied on this path.
  X_train, y_train, X_val, y_val = val_split(trainSet.data.astype('float32'), np.array(trainSet.targets, dtype=np.int64))
  X_test, y_test = torch.tensor(testSet.data.astype('float32')), torch.tensor(np.array(testSet.targets, dtype=np.int64))

  #########################################
  ## reshaping Xs, only for MLP
  #########################################
  # One flat feature vector per image, whatever the image size is.
  X_train = torch.flatten(X_train, start_dim=1)
  X_val = torch.flatten(X_val, start_dim=1)
  X_test = torch.flatten(X_test, start_dim=1)
  data_size = len(X_train)

  # Model Training
  model = MLP_multi(X_train.shape[1], 10).cuda()
  optimizer = optimizer_classes[optimizer_choose](model.parameters(), lr=learning_rate)
  criterion = nn.CrossEntropyLoss() # loss function

  def accuracy(features, labels):
    # Fraction of samples whose highest-scoring class equals the label.
    with torch.no_grad():
      predicted = model(features.cuda()).argmax(dim=1)
    return (predicted == labels.cuda()).sum().item() / len(labels)

  for epoch in range(epochs_num):
    # Validation below switches to eval mode, so re-enable training mode every epoch.
    model.train()
    cost = 0.0
    order = torch.randperm(data_size) # batch mixing
    for start in range(0, data_size, batch_size):
      idx = order[start:start+batch_size].to(X_train.device)
      features = X_train[idx].cuda()
      answer = y_train[idx]
      optimizer.zero_grad()
      prediction = model(features)
      loss = criterion(prediction, answer)
      loss.backward()
      optimizer.step()
      cost += loss.item() # plain float: no tensor accumulates across steps

    # Validation Check
    model.eval()
    val_acc = accuracy(X_val, y_val)
    # Train Loss & Validation Accuracy
    print('Epoch Train Loss {}: {: .2f}'.format(epoch,cost),  ", Validation Accuracy: {: .2f}".format(val_acc))

  # Final Test Accuracy Check
  if final_test:
    model.eval()
    print("Final Test Accuracy: {: .2f}".format(accuracy(X_test, y_test)))

In [None]:
# Train the MLP with the hyperparameters set above; progress is printed per epoch.
multiMLP(trainset, testset, epochs_num, batch_size, optimizer_choose, learning_rate, final_test)

Epoch Train Loss 0:  1757.92 , Validation Accuracy:  0.32
Epoch Train Loss 1:  1309.56 , Validation Accuracy:  0.37
Epoch Train Loss 2:  1243.85 , Validation Accuracy:  0.37
Epoch Train Loss 3:  1198.21 , Validation Accuracy:  0.39
Epoch Train Loss 4:  1174.15 , Validation Accuracy:  0.40
Final Test Accuracy:  0.40


# Problem 2: ResNet-20 for CIFAR-10

In [None]:
class Building_Block(nn.Module):
  """Residual unit: conv3x3 -> BN -> ReLU -> conv3x3 -> BN, plus shortcut, then ReLU.

  ``stride`` is applied by the first convolution only. ``downsample``, when
  given, is applied to the block input to build the shortcut; otherwise the
  input itself is added back.
  """

  def __init__(self, input_dim, output_dim, downsample=None, stride=1):
    super().__init__()
    # Settings shared by both 3x3 convolutions.
    conv_kwargs = dict(kernel_size=3, padding=1, bias=False)
    self.conv1 = nn.Conv2d(input_dim, output_dim, stride=stride, **conv_kwargs)
    self.batchNorm1 = nn.BatchNorm2d(output_dim)
    self.relu = nn.ReLU(inplace=True)
    self.conv2 = nn.Conv2d(output_dim, output_dim, stride=1, **conv_kwargs)
    self.batchNorm2 = nn.BatchNorm2d(output_dim)
    self.downsample = downsample
    self.stride = stride

  def forward(self, x):
    # Residual branch
    residual = self.relu(self.batchNorm1(self.conv1(x)))
    residual = self.batchNorm2(self.conv2(residual))

    # Skip connection: identity unless a downsample module was supplied.
    shortcut = x if self.downsample is None else self.downsample(x)
    residual += shortcut

    return self.relu(residual)

In [None]:
class ResNet_20(nn.Module):
  """ResNet-20-style CNN: a 3x3 stem, three stages of residual blocks, and a linear head.

  The stem maps 3 input channels to 16; the stages produce 16, 32 and 64
  channels (the 32- and 64-channel stages start with a stride-2 block). Global
  average pooling and a linear layer then yield 10 class scores.

  Input:  tensor of shape (N, 3, H, W).
  Output: tensor of shape (N, 10) with raw class scores (no softmax).
  """

  def __init__(self):
    super(ResNet_20, self).__init__()
    self.input_dim = 64  # not read anywhere in this class; kept for backward compatibility

    # Convolution applied to the raw input (stem).
    self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=1, padding=1, bias=False)
    self.batchNorm1 = nn.BatchNorm2d(16)
    self.relu = nn.ReLU(inplace=True)

    # Residual stages.
    self.conv2_x = self.make_layer(16, 16)
    self.conv4_x = self.make_layer(16, 32)
    self.conv6_x = self.make_layer(32, 64)

    self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
    self.fc = nn.Linear(64, 10)

  def make_layer(self, in_channel, out_channel, num_blocks=3):
    """Build one stage of ``num_blocks`` residual blocks.

    When the channel count changes, the first block uses stride 2 and gets a
    conv + batch-norm ``downsample`` module so the shortcut matches the new
    shape; all remaining blocks keep stride 1 and an identity shortcut.

    Args:
      in_channel: Channels entering the stage.
      out_channel: Channels produced by every block of the stage.
      num_blocks: Total number of blocks in the stage.

    Returns:
      An ``nn.Sequential`` holding the blocks in order.
    """
    if in_channel != out_channel:
      downsample = nn.Sequential(
          nn.Conv2d(in_channel, out_channel, kernel_size=3, stride=2, padding=1, bias=False),
          nn.BatchNorm2d(out_channel)
      )
      blocks = [Building_Block(in_channel, out_channel, downsample=downsample, stride=2)]
    else:
      blocks = [Building_Block(in_channel, out_channel, downsample=None, stride=1)]

    # The first block is already in the list.
    for _ in range(num_blocks - 1):
      blocks.append(Building_Block(out_channel, out_channel, downsample=None, stride=1))
    return nn.Sequential(*blocks)

  def forward(self, x):
    # Stem
    out = self.conv1(x)
    out = self.batchNorm1(out)
    out = self.relu(out)

    out = self.conv2_x(out)
    out = self.conv4_x(out)
    out = self.conv6_x(out)

    out = self.avgpool(out)
    # Flatten (N, C, 1, 1) -> (N, C). This must not depend on the training batch
    # size: reshaping to (-1, batch_size) only works while batch_size happens to
    # equal the number of feature channels.
    out = torch.flatten(out, 1)
    y_pred = self.fc(out)

    return y_pred

  def implement(self, x):
    """Alias for ``forward``."""
    return self.forward(x)

In [None]:
# Hyperparameters for the ResNet experiment. These rebind the same global names
# used by the MLP section, and the `def` below captures them as default values.
epochs_num = 5 # number of passes over the training split
batch_size = 64 # samples per optimisation step
optimizer_choose = 'Adam' # 'SGD' or 'Adam'
learning_rate = 0.0001 # learning rate handed to the optimizer
final_test = True # only true for final test accuracy

def ResNet(trainSet, testSet, epochs_num=epochs_num, batch_size=batch_size, optimizer_choose=optimizer_choose, learning_rate=learning_rate, final_test=final_test):
  """Train a ``ResNet_20`` classifier and print loss / accuracy figures.

  The training set is split by ``val_split`` into training and validation
  parts. Images are stored channels-last and permuted to channels-first right
  before they enter the network. After each epoch the summed training loss and
  the validation accuracy are printed. Requires a CUDA device.

  Args:
    trainSet: Dataset exposing ``.data`` (image array) and ``.targets`` (labels).
    testSet: Dataset with the same attributes; used only for the final test accuracy.
    epochs_num: Number of passes over the training split.
    batch_size: Number of samples per optimisation step.
    optimizer_choose: ``'SGD'`` or ``'Adam'``.
    learning_rate: Learning rate handed to the optimizer.
    final_test: When truthy, the accuracy on ``testSet`` is printed after training.

  Raises:
    ValueError: if ``optimizer_choose`` is not one of the supported names.
  """
  # Fail fast on an unknown optimizer instead of hitting a NameError mid-training.
  optimizer_classes = {'SGD': optim.SGD, 'Adam': optim.Adam}
  if optimizer_choose not in optimizer_classes:
    raise ValueError("optimizer_choose must be one of {}, got {!r}".format(sorted(optimizer_classes), optimizer_choose))

  # Data Splitting
  # NOTE(review): `.data` holds the raw pixel values (presumably 0-255); the
  # torchvision transform attached to the dataset is not applied on this path.
  X_train, y_train, X_val, y_val = val_split(trainSet.data.astype('float32'), np.array(trainSet.targets, dtype=np.int64))
  X_test, y_test = torch.tensor(testSet.data.astype('float32')), torch.tensor(np.array(testSet.targets, dtype=np.int64))
  data_size = len(X_train)

  # Model Training
  model = ResNet_20().cuda()
  optimizer = optimizer_classes[optimizer_choose](model.parameters(), lr=learning_rate)
  criterion = nn.CrossEntropyLoss() # loss function

  # Evaluation runs in fixed-size chunks so GPU memory use does not grow with
  # the size of the evaluated set.
  eval_chunk = 1000

  def accuracy(features, labels):
    # Fraction of samples whose highest-scoring class equals the label.
    correct = 0
    with torch.no_grad():
      for start in range(0, len(features), eval_chunk):
        images = features[start:start+eval_chunk].cuda().permute(0,3,1,2)
        predicted = model(images).argmax(dim=1)
        correct += (predicted == labels[start:start+eval_chunk].cuda()).sum().item()
    return correct / len(labels)

  for epoch in range(epochs_num):
    # Validation below switches to eval mode. Without switching back, every
    # epoch after the first would train with BatchNorm in inference mode
    # (running statistics used and never updated).
    model.train()
    cost = 0.0
    order = torch.randperm(data_size) # batch mixing
    for start in range(0, data_size, batch_size):
      idx = order[start:start+batch_size].to(X_train.device)
      images = X_train[idx].cuda().permute(0,3,1,2)
      answer = y_train[idx]
      optimizer.zero_grad()
      prediction = model(images)
      loss = criterion(prediction, answer)
      loss.backward()
      optimizer.step()
      cost += loss.item() # plain float: no tensor accumulates across steps

    # Validation Check
    model.eval()
    val_acc = accuracy(X_val, y_val)
    # Train Loss & Validation Accuracy
    print('Epoch Train Loss {}: {: .2f}'.format(epoch,cost),  ", Validation Accuracy: {: .2f}".format(val_acc))

  # Final Test Accuracy Check
  if final_test:
    model.eval()
    print("Final Test Accuracy: {: .2f}".format(accuracy(X_test, y_test)))

In [None]:
# Train the ResNet with the hyperparameters set above; progress is printed per epoch.
ResNet(trainset, testset, epochs_num, batch_size, optimizer_choose, learning_rate, final_test)

Epoch Train Loss 0:  1133.53 , Validation Accuracy:  0.48
Epoch Train Loss 1:  949.43 , Validation Accuracy:  0.53
Epoch Train Loss 2:  857.16 , Validation Accuracy:  0.58
Epoch Train Loss 3:  798.46 , Validation Accuracy:  0.57
Epoch Train Loss 4:  749.50 , Validation Accuracy:  0.61
Final Test Accuracy:  0.61
