In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tqdm
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [None]:
if(torch.cuda.is_available()):
    device = torch.device('cuda')
elif(torch.backends.mps.is_available()):
    device = torch.device('mps')
else:
    device = torch.device('cpu')

In [None]:
IMAGE_SIZE = 32
batch_size = 32
mean, std = torch.tensor([0.4914, 0.4822, 0.4465]), torch.tensor([0.247, 0.243, 0.261])
# These values are mostly used by researchers as found to very useful in fast convergence


transform = transforms.Compose([
    transforms.RandomRotation(0.2) ,
    transforms.RandomHorizontalFlip() ,
    transforms.RandomVerticalFlip() ,
    transforms.ColorJitter(),
    transforms.ToTensor(),
    transforms.Normalize(mean, std)
])



train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True,download=True, transform=transform)
val_dataset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size , shuffle = True)

val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size , shuffle = False)

classes = [ 'airplane', 'automobile', 'bird', 'cat','deer', 'dog', 'frog', 'horse', 'ship', 'truck']





In [None]:
def imshow(img):
    img = img / 2 + 0.5     # unnormalize
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()


# get some random training images
dataiter = iter(train_loader)
images, labels = next(dataiter)

# show images
imshow(torchvision.utils.make_grid(images))
# print labels
print(' '.join(f'{classes[labels[j]]:5s}' for j in range(batch_size)))

In [None]:
print(train_dataset.data.shape  , " - " , val_dataset.data.shape)
print(len(train_dataset.targets))

In [None]:
# functions to show an image

def unnormalize(img):
        # unnormalize
    img[0]= img[0]*std[0] + mean[0]
    img[1]= img[1]*std[1] + mean[1]
    img[2]= img[2]*std[2] + mean[2]
    return img
    
def imshow(img):
    # unnormalize
    img = unnormalize(img)
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()
    
def im_convert(tensor):
    img = tensor.cpu().clone().detach().numpy() #
    img = img.transpose(1, 2, 0)
    img = img * np.array(tuple(mean)) + np.array(tuple(std))
    img = img.clip(0, 1) # Clipping the size to print the images later
    return img


In [None]:
fig = plt.figure(figsize=(20, 10)) 

for i in range(len(classes)):
    dataiter = iter(train_loader)
    images, labels = next(dataiter)
    for j in range (len(labels)):
        if(labels[j].item() == i):
            ax = fig.add_subplot(2, 5, i+1, xticks=[], yticks=[])
            plt.imshow(im_convert(images[j]))
            ax.set_title(classes[i])



In [None]:
for i in range(len(classes)):
    dataiter = iter(train_loader)
    images, labels = next(dataiter)
    for j in range (len(labels)):
        if(labels[j].item() == i):
            print(classes[i])
            imshow(images[j])
            break

In [None]:
class conv_block(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, **kwargs):
        super(conv_block, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, **kwargs)
        self.relu = nn.ReLU(inplace=True)
    
    def forward(self, x):
        x = self.conv(x)
        x = self.relu(x)
        return x
#red --> reduction 
class Inception_block(nn.Module):
    def __init__(self, in_channels, ch1x1, ch3x3red, ch3x3, ch5x5red, ch5x5, pool_proj):
        super(Inception_block, self).__init__()
        # 1x1 convolution branch
        self.branch1 = conv_block(in_channels, ch1x1, kernel_size=1, stride=1, padding=0)
        
        # 1x1 convolution followed by 3x3 convolution branch
        self.branch2 = nn.Sequential(
            conv_block(in_channels, ch3x3red, kernel_size=1, stride=1, padding=0),
            conv_block(ch3x3red, ch3x3, kernel_size=3, stride=1, padding=1)
        )
        
        # 1x1 convolution followed by 5x5 convolution branch
        self.branch3 = nn.Sequential(
            conv_block(in_channels, ch5x5red, kernel_size=1, stride=1, padding=0),
            conv_block(ch5x5red, ch5x5, kernel_size=5, stride=1, padding=2)
        )
        
        # 3x3 max pooling followed by 1x1 convolution branch
        self.branch4 = nn.Sequential(
            nn.MaxPool2d(kernel_size=3, stride=1, padding=1),
            conv_block(in_channels, pool_proj, kernel_size=1, stride=1, padding=0)
        )
    
    def forward(self, x):
        branch1_output = self.branch1(x)
        branch2_output = self.branch2(x)
        branch3_output = self.branch3(x)
        branch4_output = self.branch4(x)
        output = torch.cat([branch1_output, branch2_output, branch3_output, branch4_output], 1)
        return output
class GoogLeNet(nn.Module):
    def __init__(self , in_channels , num_classes):
        super(GoogLeNet ,self).__init__()
        
        self.conv1 = conv_block(in_channels , out_channels = 64 , kernel_size = (7,7)  ,stride = (2,2) ,padding  = (3,3))
        self.conv1 = conv_block(in_channels=3, out_channels=64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.maxpool1 = nn.MaxPool2d(kernel_size = 3 , stride = 2 , padding =1)
        self.conv2 = conv_block(64 , 192 , kernel_size = 3  ,stride = 1 ,padding  = 1)
        self.maxpool2 = nn.MaxPool2d(kernel_size = 3 , stride = 2 , padding =1)

        self.Inception3a = Inception_block(192 ,64 ,96 ,128 ,16 ,32 ,32)
        self.Inception3b = Inception_block(256 ,128 ,128 ,192 ,32 ,96 ,64)
        self.maxpool3 = nn.MaxPool2d(kernel_size = 3 , stride = 2 , padding =1)
        
        self.Inception4a = Inception_block(480 ,192 ,96 ,208 ,16 ,48 ,64)
        self.Inception4b = Inception_block(512 ,160 ,112 ,224 ,24 ,64 ,64)
        self.Inception4c = Inception_block(512 ,128 ,128 ,256 ,24 ,64 ,64)
        self.Inception4d = Inception_block(512 ,112 ,114 ,288 ,32 ,64 ,64)
        self.Inception4e = Inception_block(528 ,256 ,160 ,320 ,32 ,128 ,128)
        
        self.maxpool4 = nn.MaxPool2d(kernel_size = 3 ,stride = 2 , padding =1)
        self.Inception5a = Inception_block(832 ,256 ,160 ,320 ,32 ,128 ,128)
        self.Inception5b = Inception_block(832 ,384 ,192 ,384 ,48 ,128 ,128)
        self.maxpool1 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.maxpool2 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.maxpool3 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.maxpool4 = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)

        self.avgpool = nn.AvgPool2d(kernel_size = 7 , stride =  1)
        self.avgpool = nn.AvgPool2d(kernel_size=1, stride=1)
        self.dropout  = nn.Dropout2d(p=0.4)
        self.fc1 = nn.Linear(4096 , num_classes)
    
    def forward(self ,x):
        x = self.conv1(x)
        x = self.maxpool1(x)
        x = self.conv2(x)
        x = self.maxpool2(x)
        
        x = self.Inception3a(x)
        x = self.Inception3b(x)
        x = self.maxpool3(x)
        
        x = self.Inception4a(x)
        x = self.Inception4b(x)
        x = self.Inception4c(x)
        x = self.Inception4d(x)
        x = self.Inception4e(x)
        x = self.maxpool4(x)
        
        x = self.Inception5a(x)
        x = self.Inception5b(x)
        x = self.avgpool(x)
        
        x = x.reshape(x.shape[0], -1)
        x = self.dropout(x)
        x = self.fc1(x)
        return x


In [None]:
model = GoogLeNet(in_channels=3 , num_classes=10)
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001,momentum=0.9)

In [None]:
num_epochs = 5
# Training loop
train_acc = []
train_loss = []
val_acc = []
val_loss = []
for epoch in range(num_epochs):
    running_loss = 0.0
    correct_train = 0
    total_train = 0

    model.train()  # Set the model to training mode

    for images, labels in train_loader:
        images = images.to(device)  # Move the input tensor to the GPU
        labels = labels.to(device)  # Move the labels tensor to the GPU

        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Calculate training accuracy
        _, predicted = torch.max(outputs.data, 1)
        total_train += labels.size(0)
        correct_train += (predicted == labels).sum().item()

    training_loss = running_loss / len(train_loader)
    training_accuracy = 100 * correct_train / total_train
    

    # Evaluation on test set
    model.eval()  # Set the model to evaluation mode
    test_loss = 0.0
    correct_test = 0
    total_test = 0

    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)  # Move the input tensor to the GPU
            labels = labels.to(device)  # Move the labels tensor to the GPU

            outputs = model(images)
            loss = criterion(outputs, labels)
            test_loss += loss.item()

            # Calculate test accuracy
            _, predicted = torch.max(outputs.data, 1)
            total_test += labels.size(0)
            correct_test += (predicted == labels).sum().item()

    test_loss /= len(val_loader)
    test_accuracy = 100 * correct_test / total_test
    
    train_acc.append(training_accuracy)
    train_loss.append(training_loss)
    val_acc.append(test_accuracy)
    val_loss.append(test_loss)

    # Print the average loss and accuracy for this epoch
    print(f"Epoch {epoch+1}:")
    print(f"  Train Loss: {training_loss:.4f} | Train Accuracy: {training_accuracy:.2f}%")
    print(f"  Test Loss: {test_loss:.4f} | Test Accuracy: {test_accuracy:.2f}%")
    print("*************************")

In [None]:
final_state = model.state_dict()
torch.save(final_state, 'model_state.pth')

In [None]:
f, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
t = f.suptitle('Basic CNN Performance', fontsize=12)
f.subplots_adjust(top=0.85, wspace=0.3)

epoch_list = list(range(1,num_epochs+1))
ax1.plot(epoch_list, train_acc, label='Train Accuracy')
ax1.plot(epoch_list, val_acc, label='Validation Accuracy')
ax1.set_xticks(np.arange(0, 60, 5))
ax1.set_ylabel('Accuracy Value')
ax1.set_xlabel('Epoch')
ax1.set_title('Accuracy')
l1 = ax1.legend(loc="best")

ax2.plot(epoch_list, train_loss, label='Train Loss')
ax2.plot(epoch_list, val_loss, label='Validation Loss')
ax2.set_xticks(np.arange(0, 60, 5))
ax2.set_ylabel('Loss Value')
ax2.set_xlabel('Epoch')
ax2.set_title('Loss')
l2 = ax2.legend(loc="best")