# Introduction
Image classification is the process of taking an input (like a picture) and outputting a class (like “cat”) or a probability that the input is a particular class (“there’s a 90% probability that this input is a cat”). You can look at a picture and know that you’re looking at a terrible shot of your own face, but how can a computer learn to do that? With a convolutional neural network!

-----
# Goals
We would like you to build a neural network from standard DNN modules (e.g. convolution layers, ReLU, pooling and fully connected layers) that predicts the category of an input image.

-------------
## Packages
Let's first import the necessary packages:

In [108]:
from __future__ import division

import warnings
from collections import namedtuple
import torch
import torch.nn as nn
from torch.jit.annotations import Optional, Tuple
from torch import Tensor
import os
import numpy as np
import os.path
from glob import glob
from PIL import Image
from tqdm import tqdm
import torchvision.datasets as dset
import torch.utils.data as data
from ipywidgets import IntProgress
from torchvision import transforms
import logging 
import matplotlib.pyplot as plt
import torch.optim as optim

-----
## GPU Device Configuration
Use the torch.device() and torch.cuda.is_available() functions to select the GPU when one is available and fall back to the CPU otherwise.

In [109]:
# Select the GPU when one is available; otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


-----
## Configuration
### Hyperparameters
We then set up the hyperparameters for our model:
1. learning rate
2. batch size when training
3. batch size when testing
4. number of epochs

In [110]:
learning_rate = 0.01
batch_train_size = 20
batch_test_size = 20
num_epochs = 50

Create a directory if it does not exist.
You can use os.path.exists() to check whether it exists and os.makedirs() to create it.
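
The check-then-create step described above can be sketched as follows. The helper name `ensure_dir` and the directory name `checkpoints` are hypothetical, chosen only for illustration; the notebook does not say which directory is meant. A temporary base directory is used so the example leaves the working directory untouched.

```python
import os
import tempfile


def ensure_dir(path):
    """Create `path` (and any missing parent directories) if it does not exist yet."""
    if not os.path.exists(path):
        os.makedirs(path)
    return path


# Hypothetical directory name, placed under a temporary base directory
base_dir = tempfile.mkdtemp()
model_dir = ensure_dir(os.path.join(base_dir, "checkpoints"))
print(os.path.isdir(model_dir))
```

Calling `os.makedirs(path, exist_ok=True)` performs the check and the creation in a single call.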

-----
###  Image processing
Then, we define an image preprocessing object that our dataloader will use to preprocess our data. We use the PyTorch API to perform the data processing.
1. Use transforms.Compose()
2. Use .RandomHorizontalFlip()
3. Add any extra transforms you like.
4. Create a transform for both the train set and the test set. Note that the test set needs no augmentation; converting the images to tensors with transforms.ToTensor() is enough.

In [111]:
train_transform = transforms.Compose([transforms.RandomHorizontalFlip(), transforms.ToTensor()])
test_transform = transforms.Compose([transforms.ToTensor()])

-----
### We then download and prepare the data with the transforms defined above:
1. Use torchvision.datasets.CIFAR10() with the root, train, download and transform keyword arguments.
2. Use the same command to create both the train split and the test split.
3. Use torch.utils.data.DataLoader() to create a data loader from each dataset.
4. Do this for both the training split and the test split.

In [112]:
train_set = dset.CIFAR10(root='./data', train=True, download=True, transform=train_transform)
train_loader = data.DataLoader(dataset=train_set, batch_size=batch_train_size, shuffle=True)
test_set = dset.CIFAR10(root='./data', train=False, download=True, transform=test_transform)
test_loader = data.DataLoader(dataset=test_set, batch_size=batch_test_size, shuffle=False)

Files already downloaded and verified
Files already downloaded and verified


------
### Inception Module with dimension reductions
1. Create a Python class called InceptionBlock which inherits from nn.Module.

2. Create an init function to initialize this class.
    1. It takes 7 arguments: in_planes, kernel_1_x, kernel_3_in, kernel_3_x, kernel_5_in, kernel_5_x and pool_planes.

    2. There are 4 Sequential blocks: b1, b2, b3 and b4.

    3. b1 consists of a 1x1 2D convolution, a 2D batch normalization layer and a ReLU activation function.

    4. b2 consists of two 2D convolutions (1x1 followed by 3x3), two 2D batch normalization layers and two ReLU activation functions.

    5. b3 consists of two 2D convolutions (1x1 followed by 5x5), two 2D batch normalization layers and two ReLU activation functions.

    6. b4 consists of a max pooling layer, a 1x1 2D convolution, a 2D batch normalization layer and a ReLU activation function.

3. Create the forward function: it passes the input through every block and returns the concatenation of all four outputs along the channel dimension.

In [113]:
class InceptionBlock(nn.Module):
    def __init__(self, in_planes, kernel_1_x, kernel_3_in, kernel_3_x, kernel_5_in, kernel_5_x, pool_planes):
        super(InceptionBlock, self).__init__()
        # 1x1 conv branch
        self.b1 = nn.Sequential(
            nn.Conv2d(in_planes, kernel_1_x, kernel_size=1),
            nn.BatchNorm2d(kernel_1_x),
            nn.ReLU(True),
        )

        # Block 2: 1x1 convolution followed by 3x3 convolution
        self.b2 = nn.Sequential(
            nn.Conv2d(in_planes, kernel_3_in, kernel_size=1),
            nn.BatchNorm2d(kernel_3_in),
            nn.ReLU(),
            nn.Conv2d(kernel_3_in, kernel_3_x, kernel_size=3, padding=1),
            nn.BatchNorm2d(kernel_3_x),
            nn.ReLU()
        )

        # Block 3: 1x1 convolution followed by 5x5 convolution
        self.b3 = nn.Sequential(
            nn.Conv2d(in_planes, kernel_5_in, kernel_size=1),
            nn.BatchNorm2d(kernel_5_in),
            nn.ReLU(),
            nn.Conv2d(kernel_5_in, kernel_5_x, kernel_size=5, padding=2),
            nn.BatchNorm2d(kernel_5_x),
            nn.ReLU()
        )
        
        # Block 4: MaxPooling followed by 1x1 convolution
        self.b4 = nn.Sequential(
            nn.MaxPool2d(kernel_size=3, stride=1, padding=1),
            nn.Conv2d(in_planes, pool_planes, kernel_size=1),
            nn.BatchNorm2d(pool_planes),
            nn.ReLU()
        )

    def forward(self, x):
        # Apply all blocks to the input
        out1 = self.b1(x)
        out2 = self.b2(x)
        out3 = self.b3(x)
        out4 = self.b4(x)
        # Concatenate along the channel dimension
        return torch.cat([out1, out2, out3, out4], dim=1)
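
The concatenation in forward only works because all four branches keep the spatial size of their input (stride 1, with padding matched to the kernel size), so only the channel counts differ. The sketch below checks that arithmetic and shows torch.cat on stand-in tensors. The branch widths and the 8x8 size are illustrative values, and `branch_out_size` is a hypothetical helper.

```python
import torch


def branch_out_size(n, kernel_size, padding, stride=1):
    """Output side length of a conv or pooling layer (floor mode, dilation 1)."""
    return (n + 2 * padding - kernel_size) // stride + 1


# (kernel_size, padding) of the last spatial layer in b1, b2, b3 and b4
branch_layers = [(1, 0), (3, 1), (5, 2), (3, 1)]
sizes = [branch_out_size(8, k, p) for k, p in branch_layers]
print(sizes)  # every branch keeps the 8x8 input size

# Stand-ins for the four branch outputs: same batch and spatial size,
# different channel counts (illustrative widths)
branch_channels = [64, 128, 32, 32]
outs = [torch.zeros(2, c, 8, 8) for c in branch_channels]
merged = torch.cat(outs, dim=1)
print(merged.shape)  # channels add up: 64 + 128 + 32 + 32 = 256
```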


-----
### GoogLeNet Module: the structure is in the lab manual

In [114]:
class GoogLeNet(nn.Module):
    def __init__(self, num_classes=10):  # Default for CIFAR-10
        super(GoogLeNet, self).__init__()
        
        # Initial convolutional layers
        self.pre_layers = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )
        
        # Inception Blocks
        self.inception3a = InceptionBlock(64, 64, 96, 128, 16, 32, 32)
        self.inception3b = InceptionBlock(256, 128, 128, 192, 32, 96, 64)
        self.maxpool1 = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.inception4a = InceptionBlock(480, 192, 96, 208, 16, 48, 64)
        self.inception4b = InceptionBlock(512, 160, 112, 224, 24, 64, 64)
        self.inception4c = InceptionBlock(512, 128, 128, 256, 24, 64, 64)
        self.inception4d = InceptionBlock(512, 112, 144, 288, 32, 64, 64)
        self.inception4e = InceptionBlock(528, 256, 160, 320, 32, 128, 128)
        self.maxpool2 = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.inception5a = InceptionBlock(832, 256, 160, 320, 32, 128, 128)
        self.inception5b = InceptionBlock(832, 384, 192, 384, 48, 128, 128)
        
        # Final average pooling and fully connected layer
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))  # Adaptive pooling to output 1x1
        self.dropout = nn.Dropout(0.4)
        self.fc = nn.Linear(1024, num_classes)

    def forward(self, x):
        # Initial layers
        x = self.pre_layers(x)
        
        # Inception blocks with intermediate max pooling
        x = self.inception3a(x)
        x = self.inception3b(x)
        x = self.maxpool1(x)
        x = self.inception4a(x)
        x = self.inception4b(x)
        x = self.inception4c(x)
        x = self.inception4d(x)
        x = self.inception4e(x)
        x = self.maxpool2(x)
        x = self.inception5a(x)
        x = self.inception5b(x)
        # Final layers
        x = self.avgpool(x)
        x = torch.flatten(x, 1)  # Flatten for the fully connected layer
        x = self.dropout(x)
        x = self.fc(x)
        
        return x
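
As a sanity check on the constructor arguments, the following sketch verifies in plain Python that each Inception block receives exactly the number of channels the previous block emits, and traces the spatial size of a 32x32 CIFAR-10 image through the strided layers. The helper `out_size` is hypothetical; the tuples are copied from the GoogLeNet constructor.

```python
def out_size(n, kernel_size, stride, padding):
    """Output side length of a conv or max-pool layer (floor mode, dilation 1)."""
    return (n + 2 * padding - kernel_size) // stride + 1


# (in_planes, kernel_1_x, kernel_3_in, kernel_3_x, kernel_5_in, kernel_5_x, pool_planes)
blocks = [
    (64, 64, 96, 128, 16, 32, 32),       # inception3a
    (256, 128, 128, 192, 32, 96, 64),    # inception3b
    (480, 192, 96, 208, 16, 48, 64),     # inception4a
    (512, 160, 112, 224, 24, 64, 64),    # inception4b
    (512, 128, 128, 256, 24, 64, 64),    # inception4c
    (512, 112, 144, 288, 32, 64, 64),    # inception4d
    (528, 256, 160, 320, 32, 128, 128),  # inception4e
    (832, 256, 160, 320, 32, 128, 128),  # inception5a
    (832, 384, 192, 384, 48, 128, 128),  # inception5b
]

# Each block must accept exactly the channels the previous stage produces
channels = 64  # output channels of pre_layers
for in_planes, k1, _, k3, _, k5, pool in blocks:
    assert in_planes == channels, (in_planes, channels)
    channels = k1 + k3 + k5 + pool  # concatenation along the channel dimension
print(channels)  # input width expected by the final nn.Linear

# Spatial size of a 32x32 input; the Inception blocks leave it unchanged
size = 32
size = out_size(size, 7, 2, 3)  # 7x7 convolution, stride 2
size = out_size(size, 3, 2, 1)  # max pool in pre_layers
size = out_size(size, 3, 2, 1)  # maxpool1
size = out_size(size, 3, 2, 1)  # maxpool2
print(size)  # side length entering the adaptive average pool
```

The adaptive average pool then reduces whatever spatial size remains to 1x1, which is why the flattened vector has one entry per channel.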

### Next, we create the network and send it to the target device

In [115]:
num_classes = 10 
model = GoogLeNet(num_classes=num_classes)

# Move the model to the target device
model = model.to(device)

# Print the model to confirm it's correctly initialized
print(model)

GoogLeNet(
  (pre_layers): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  )
  (inception3a): InceptionBlock(
    (b1): Sequential(
      (0): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1))
      (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
    )
    (b2): Sequential(
      (0): Conv2d(64, 96, kernel_size=(1, 1), stride=(1, 1))
      (1): BatchNorm2d(96, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
      (3): Conv2d(96, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (4): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (5): ReLU()
    )
    (b3): Sequential(
      (0): Conv2d(64, 16, kerne

### Finally, we create:
 1. An optimizer (we use the Adam optimizer here)
 2. A criterion (cross-entropy loss) function
 3. A scheduler which decays the learning rate of each parameter group by gamma once the number of epochs reaches one of the milestones.

In [116]:
gamma = 0.5
milestones = [50, 100]

# Optimizer: Adam
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Criterion: Cross-Entropy Loss
criterion = nn.CrossEntropyLoss()

# Scheduler: MultiStepLR
scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=milestones, gamma=gamma)
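
To see what the milestone rule means in numbers, the sketch below reproduces the schedule in plain Python: the learning rate is the base rate multiplied by gamma once for every milestone that has been reached. The function name `multistep_lr` is hypothetical; the values of gamma, milestones and the base rate are the ones set in the cells above.

```python
from bisect import bisect_right


def multistep_lr(base_lr, milestones, gamma, epoch):
    """Learning rate in effect after `epoch` calls to scheduler.step()."""
    reached = bisect_right(sorted(milestones), epoch)  # milestones <= epoch
    return base_lr * gamma ** reached


base_lr, gamma, milestones = 0.01, 0.5, [50, 100]
schedule = {epoch: multistep_lr(base_lr, milestones, gamma, epoch)
            for epoch in (0, 49, 50, 99, 100)}
for epoch, lr in schedule.items():
    print(epoch, lr)
```

Note that with one scheduler.step() per epoch and num_epochs = 50, the step counter only reaches the first milestone after the last epoch has finished, so under this reading the decay would not take effect during a 50-epoch run.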


-----
##  Training
Then, we are going to train our network.

1. Set our network to training mode.
2. Initialize the train loss, total data size, and number of correct predictions.
3. For each batch in the training split:
    1. Move the data to the correct device using .to()
    2. Reset the gradients of the optimizer
    3. Feed the data forward through GoogLeNet
    4. Use the criterion function to compute the loss term
    5. Backpropagate the loss
    6. Update the network parameters using the optimizer
    7. Accumulate the training loss
    8. Find the prediction. Hint: use torch.max()
    9. Increment the total data size
    10. Increment the number of correct predictions
    11. Print the log
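
The steps above can be sketched as a single function. This is a minimal sketch, not the notebook's own implementation: the function name `train_one_epoch` is hypothetical, and a tiny linear model with random data stands in for GoogLeNet and CIFAR-10 so the example runs quickly on the CPU.

```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset


def train_one_epoch(model, loader, criterion, optimizer, device):
    """Run one pass over `loader`; return (average loss, accuracy in percent)."""
    model.train()                                  # training mode
    train_loss, total, correct = 0.0, 0, 0         # running statistics
    for inputs, labels in loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()                      # reset gradients
        outputs = model(inputs)                    # forward pass
        loss = criterion(outputs, labels)          # loss term
        loss.backward()                            # backpropagation
        optimizer.step()                           # parameter update
        train_loss += loss.item()
        _, predicted = torch.max(outputs, 1)       # index of the largest logit
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    return train_loss / len(loader), 100.0 * correct / total


# Tiny synthetic stand-in for the real model and data (illustration only)
torch.manual_seed(0)
toy_device = torch.device("cpu")
toy_model = nn.Linear(8, 3).to(toy_device)
toy_data = TensorDataset(torch.randn(40, 8), torch.randint(0, 3, (40,)))
toy_loader = DataLoader(toy_data, batch_size=20, shuffle=True)
toy_optimizer = optim.Adam(toy_model.parameters(), lr=0.01)

avg_loss, acc = train_one_epoch(
    toy_model, toy_loader, nn.CrossEntropyLoss(), toy_optimizer, toy_device
)
print(f"Training Loss: {avg_loss:.4f}, Training Accuracy: {acc:.2f}%")
```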
    
-----
##  Testing
Then, we are going to test our model.

1. Set our network to evaluation mode.
2. Initialize the test loss, total data size, and number of correct predictions.
3. For each batch in the testing split, with the loop wrapped in torch.no_grad():
    1. Move the data to the correct device using .to()
    2. Feed the data forward through GoogLeNet
    3. Use the criterion function to compute the loss term
    4. Accumulate the testing loss
    5. Find the prediction. Hint: use torch.max()
    6. Increment the data size
    7. Increment the number of correct predictions
    8. Print the log
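
The prediction and counting steps in the list above can be illustrated on a hand-written batch. The logits and labels below are made up for the example; in the notebook they would come from the model and the test loader.

```python
import torch

# Made-up logits for a batch of 4 samples and 3 classes (illustration only)
outputs = torch.tensor([[2.0, 0.5, 0.1],
                        [0.2, 1.5, 0.3],
                        [0.1, 0.2, 3.0],
                        [1.0, 0.9, 0.8]])
labels = torch.tensor([0, 1, 2, 1])

with torch.no_grad():  # no gradient bookkeeping is needed during evaluation
    # torch.max along dim 1 returns (largest value, index of that value) per row
    max_values, predicted = torch.max(outputs, 1)
    correct = (predicted == labels).sum().item()
    total = labels.size(0)

print(predicted.tolist())       # [0, 1, 2, 0]
print(100.0 * correct / total)  # 75.0
```

Only the indices are needed for accuracy, which is why the loop in this notebook discards the first return value with `_`.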

-----
##  Epochs:
For each epoch:
1. Train the model
2. Step the scheduler
3. Test our model
4. Record the accuracies and losses
5. Save the model whenever the test accuracy improves, and print the accuracy

In [None]:
gamma = 0.5
milestones = [50, 100]
best_accuracy = 0.0  # To track the best test accuracy

learning_rate = 0.001  # overrides the 0.01 set in the configuration cell
batch_train_size = 20
batch_test_size = 20
num_epochs = 50
train_accuracy=[]
test_accuracy=[]
train_losses=[]
test_losses=[]
epochs=list(range(1, 1+num_epochs))

# Optimizer, criterion, and scheduler
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()
scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=milestones, gamma=gamma)

# Training and testing loop
for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}/{num_epochs}")
    # Training 
    model.train()
    train_loss = 0.0
    total_train = 0
    correct_train = 0
    for inputs, labels in train_loader: 
        inputs, labels = inputs.to(device), labels.to(device)
        
        # Zero gradients
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        # Accumulate training loss and predictions
        train_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total_train += labels.size(0)
        correct_train += (predicted == labels).sum().item()
    
    # Epoch-level training metrics
    train_acc = (correct_train / total_train) * 100
    train_accuracy.append(train_acc)
    train_losses.append(train_loss / len(train_loader))  # Average train loss

    print(f"Training Loss: {train_loss/len(train_loader):.4f}, Training Accuracy: {train_acc:.2f}%")
    # Step the scheduler
    scheduler.step()
    # Testing phase
    model.eval()
    test_loss = 0.0
    total_test = 0
    correct_test = 0
    
    with torch.no_grad():
        for inputs, labels in test_loader:  # test_loader was created in the data-loading cell
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            # Accumulate test loss and calculate predictions
            test_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total_test += labels.size(0)
            correct_test += (predicted == labels).sum().item()
    
    # Epoch-level test metrics
    test_acc = (correct_test / total_test) * 100
    test_accuracy.append(test_acc)
    test_losses.append(test_loss / len(test_loader))  # Average test loss
    print(f"Test Loss: {test_loss/len(test_loader):.4f}, Test Accuracy: {test_acc:.2f}%")

    # Update best accuracy and save model
    if test_acc > best_accuracy:
        best_accuracy = test_acc
        torch.save(model.state_dict(), "best_googlenet_model.pth")
        print(f"New best model saved with accuracy: {best_accuracy:.2f}%")

# Final summary
print(f"Training complete. Best Test Accuracy: {best_accuracy:.2f}%")

Epoch 1/50
Training Loss: 1.5740, Training Accuracy: 43.03%
Test Loss: 1.2536, Test Accuracy: 53.46%
New best model saved with accuracy: 53.46%
Epoch 2/50
Training Loss: 1.1086, Training Accuracy: 61.38%
Test Loss: 1.0397, Test Accuracy: 63.64%
New best model saved with accuracy: 63.64%
Epoch 3/50
Training Loss: 0.9066, Training Accuracy: 68.99%
Test Loss: 0.8594, Test Accuracy: 70.39%
New best model saved with accuracy: 70.39%
Epoch 4/50
Training Loss: 0.7837, Training Accuracy: 73.67%
Test Loss: 0.8642, Test Accuracy: 71.54%
New best model saved with accuracy: 71.54%
Epoch 5/50
Training Loss: 0.6917, Training Accuracy: 76.67%
Test Loss: 0.7581, Test Accuracy: 73.73%
New best model saved with accuracy: 73.73%
Epoch 6/50
Training Loss: 0.6120, Training Accuracy: 79.35%
Test Loss: 0.6884, Test Accuracy: 76.43%
New best model saved with accuracy: 76.43%
Epoch 7/50
Training Loss: 0.5613, Training Accuracy: 81.06%
Test Loss: 0.6420, Test Accuracy: 78.57%
New best model saved with accuracy:

In [None]:
plt.plot(epochs, train_accuracy, label='Train Accuracy', marker='o')
plt.plot(epochs, test_accuracy, label='Test Accuracy', marker='x')
plt.title('Train and Test Accuracy Over Epochs')
plt.xlabel('Epochs')
plt.ylabel('Accuracy (%)')
plt.legend()
plt.grid(True)
plt.show()

plt.figure(figsize=(10, 6))
plt.plot(epochs, train_losses, label='Train Loss', marker='o')
plt.plot(epochs, test_losses, label='Test Loss', marker='x')
plt.title('Loss Convergence Over Epochs')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)
plt.show()
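
The training loop saves only the model's state_dict, so restoring the best model later means rebuilding the architecture first and then loading the parameters into it. The sketch below shows that round trip on a small stand-in model; the file name `toy_model.pth` and the temporary directory are hypothetical, and in the notebook the model would be GoogLeNet(num_classes=10) with the checkpoint written by the loop.

```python
import os
import tempfile

import torch
import torch.nn as nn

# Small stand-in model (illustration only)
toy_model = nn.Linear(4, 2)

# Save only the parameters, as the training loop does
ckpt_path = os.path.join(tempfile.mkdtemp(), "toy_model.pth")  # hypothetical path
torch.save(toy_model.state_dict(), ckpt_path)

# Rebuild the same architecture, then load the saved parameters into it
restored = nn.Linear(4, 2)
restored.load_state_dict(torch.load(ckpt_path, map_location="cpu"))
restored.eval()  # evaluation mode before running inference

# Both models should now produce identical outputs
x = torch.ones(1, 4)
with torch.no_grad():
    same = torch.equal(toy_model(x), restored(x))
print(same)
```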