# Imports

In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from torchvision import datasets, transforms

import matplotlib.pyplot as plt
from tqdm import tqdm

import os
import pandas as pd
from sklearn.preprocessing import LabelEncoder

# Put pandas' 'display.max_rows' option back to its default, so an earlier
# set_option call in the same kernel does not affect how frames are displayed here.
pd.reset_option('display.max_rows')


In [9]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

print(f"Using {device} device")

Using cpu device


In [None]:
# Preprocessing applied to every image, in this order:
#   1. Grayscale  -> a single output channel
#   2. Resize     -> 28 x 28 (the spatial size the CNN's first linear layer is sized for)
#   3. ToTensor   -> tensor conversion
#   4. Normalize  -> mean 0.5 / std 0.5 on the single channel
transform = transforms.Compose(
    [
        transforms.Grayscale(num_output_channels=1),
        transforms.Resize((28, 28)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ]
)

# Datasets: both splits are read from image folders and share the same preprocessing.
# NOTE: the two root paths are placeholders and must be replaced before running this cell.
train_dataset = datasets.ImageFolder(
    root='path_to_your_train_folder',
    transform=transform,
)
test_dataset = datasets.ImageFolder(
    root='path_to_your_test_folder',
    transform=transform,
)

# Loaders: batches of 200; only the training split is shuffled.
train_loader = DataLoader(dataset=train_dataset, batch_size=200, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=200, shuffle=False)


In [6]:
class CNN(nn.Module):
    """Small convolutional classifier for single-channel 28x28 images.

    Architecture (layer names are kept stable so saved ``state_dict``s still load):
        convolutional_layers:
            Conv2d(1 -> 32, 5x5, stride 1, padding 2) -> ReLU -> MaxPool2d(2, stride 2)
            Conv2d(32 -> 64, 5x5, stride 1, padding 2) -> ReLU -> MaxPool2d(2, stride 2)
        fully_connected_layer1: Linear(3136 -> 1024) -> ReLU
        dropout:                Dropout(p=0.4), applied only when ``drop`` is truthy
        fully_connected_layer2: Linear(1024 -> 10)

    Args:
        drop: Whether the dropout layer is applied between the two fully
            connected layers. (Dropout itself is only active in training mode.)
    """

    def __init__(self, drop):
        super().__init__()
        self.drop = drop

        self.convolutional_layers = nn.Sequential(
            # 32 kernels, 5x5 window; padding 2 with stride 1 keeps H and W unchanged.
            nn.Conv2d(1, 32, kernel_size=5, stride=1, padding=2),
            nn.ReLU(inplace=True),
            # 2x2 window, stride 2: halves H and W.
            nn.MaxPool2d(kernel_size=2, stride=2),
            # 64 kernels, 5x5 window; again size-preserving.
            nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        # Shape bookkeeping for a (1, 28, 28) input (C, H, W):
        #   after conv1 + pool1: C = 32, H = W = 28 / 2 = 14
        #   after conv2 + pool2: C = 64, H = W = 14 / 2 = 7
        # so each sample flattens to 64 * 7 * 7 = 3136 features.
        self.fully_connected_layer1 = nn.Sequential(
            nn.Linear(3136, 1024),
            nn.ReLU(inplace=True),
        )
        self.dropout = nn.Dropout(p=0.4)
        self.fully_connected_layer2 = nn.Linear(1024, 10)

    def forward(self, x):
        """Compute class logits for a batch of images.

        Args:
            x: Tensor of shape (N, 1, 28, 28), or a single unbatched (1, 28, 28) image.

        Returns:
            Logits of shape (N, 10); an unbatched input yields shape (1, 10).

        Raises:
            RuntimeError: if the spatial size of ``x`` does not produce 3136
                features per sample (raised by the first linear layer).
        """
        features = self.convolutional_layers(x)
        if features.dim() == 3:
            # Unbatched (C, H, W) input: add the batch axis so the result is (1, 10).
            features = features.unsqueeze(0)
        # Flatten per sample instead of reshape(-1, 3136): the latter silently
        # spills extra features into the batch dimension when the input size is
        # wrong, producing more output rows than input samples.
        features = features.flatten(start_dim=1)

        hidden = self.fully_connected_layer1(features)
        if self.drop:
            hidden = self.dropout(hidden)

        return self.fully_connected_layer2(hidden)

# Build the network with dropout enabled, move it to the selected device, then show its layout
net = CNN(drop=True)
net = net.to(device)
print(net)

CNN(
  (convolutional_layers): Sequential(
    (0): Conv2d(1, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(32, 64, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (4): ReLU(inplace=True)
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fully_connected_layer1): Sequential(
    (0): Linear(in_features=3136, out_features=1024, bias=True)
    (1): ReLU(inplace=True)
  )
  (dropout): Dropout(p=0.4, inplace=False)
  (fully_connected_layer2): Linear(in_features=1024, out_features=10, bias=True)
)


In [7]:
def train(train_dataset, net, to_train, opt, epochs=10, batch=200, learning_rate=1e-3):
    """Train ``net`` on ``train_dataset`` and record per-batch metrics.

    Args:
        train_dataset: Dataset yielding ``(input, label)`` pairs.
        net: Model to train; it is switched to training mode.
        to_train: Parameters handed to the optimizer (e.g. ``net.parameters()``).
        opt: ``'adam'`` selects Adam; any other value selects SGD with momentum 0.99.
        epochs: Number of full passes over the dataset.
        batch: Batch size used by the internal shuffled DataLoader.
        learning_rate: Learning rate passed to the optimizer.

    Returns:
        ``(losslist, acclist)``: two lists with one entry per processed batch,
        holding the cross-entropy loss and the fraction of correct predictions.
    """
    criterion = nn.CrossEntropyLoss()
    losslist = []
    acclist = []

    train_dataloader = DataLoader(train_dataset, batch_size=batch, shuffle=True)

    if opt == 'adam':
        optimizer = optim.Adam(to_train, lr=learning_rate)
    else:
        optimizer = optim.SGD(to_train, lr=learning_rate, momentum=0.99)

    # Move batches to the device the model actually lives on, rather than to a
    # notebook-level global: the function then works on a fresh kernel and can
    # never disagree with the model's placement. Parameter-free models use the CPU.
    first_param = next(net.parameters(), None)
    run_device = first_param.device if first_param is not None else "cpu"

    net.train()
    for _ in tqdm(range(epochs)):
        for X, y in train_dataloader:
            X, y = X.to(run_device), y.to(run_device)

            # Standard optimization step: clear stale gradients, forward pass,
            # loss, backward pass, parameter update.
            optimizer.zero_grad()
            pred_outputs = net(X)
            curr_loss = criterion(pred_outputs, y)
            curr_loss.backward()
            optimizer.step()

            losslist.append(curr_loss.item())

            # Batch accuracy: the predicted class is the index of the largest logit.
            correct_predictions = (pred_outputs.argmax(dim=1) == y).sum().item()
            acclist.append(correct_predictions / y.size(0))

    return losslist, acclist

# Used to test or evaluate your network. Already written for you.
def test(test_dataset, net):
    batch = 200
    test_dataloader = DataLoader(test_dataset, batch_size=batch)
    size = len(test_dataloader.dataset)

    # Set model to eval mode
    net.eval()

    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in test_dataloader:
            # Send to device
            X, y = X.to(device), y.to(device)

            # Prediction
            pred = net(X)

            # Calculate number of correct predictions in the batch
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    # Compute total accuracy
    acc = correct / size
    return acc