In this project, we train a fully connected MLP and a CNN model on the CIFAR-10 dataset, then compare their performance.


In [None]:
import torch
import torch.nn as n
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split

import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from IPython.display import Image

In [None]:
# First pass over the data: images are only converted to tensors here
# (no normalization is applied in this cell).
transform = transforms.Compose([transforms.ToTensor()])

# CIFAR-10 training split, stored under ./data
trainset = CIFAR10(root='./data', train=True, download=True, transform=transform)

# Hold out 10% of the training images for validation
num_val = int(0.1 * len(trainset))
num_train = len(trainset) - num_val
train_dataset, val_dataset = random_split(trainset, [num_train, num_val])

# CIFAR-10 test split, using the same transform
testset = CIFAR10(root='./data', train=False, download=True, transform=transform)

# Class names; the notebook indexes this tuple with the integer label
classes = (
    'plane', 'car', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import random

# Helper for displaying a single image tensor
def imshow(img):
    """Plot ``img`` with matplotlib and hide the axes.

    The tensor is converted with ``.numpy()`` and its axes are reordered
    from (0, 1, 2) to (1, 2, 0) before plotting.
    Assumes ``img`` is laid out as (channels, height, width) -- TODO confirm.
    """
    as_array = img.numpy().transpose(1, 2, 0)
    plt.imshow(as_array)
    plt.axis('off')  # no ticks or frame around the picture

# Display five random training images along with their labels
fig, axs = plt.subplots(1, 5, figsize=(15, 3))
for ax in axs:
    idx = random.randint(0, len(trainset) - 1)
    image, label = trainset[idx]
    # Move axis 0 to the end before plotting (same reordering as imshow above)
    ax.imshow(np.transpose(image.numpy(), (1, 2, 0)))
    ax.set_title(classes[label])
    ax.axis('off')

plt.show()

# Count how many training images belong to each class.
# The labels are read from `trainset.targets` rather than by iterating the
# dataset: iterating would load and transform every image just to discard it.
class_counts = {classname: 0 for classname in classes}
for label in trainset.targets:
    class_counts[classes[label]] += 1

# Bar chart of the class distribution (lists, not dict views, are handed to
# matplotlib so the call does not depend on how it converts view objects)
plt.figure(figsize=(10, 6))
plt.bar(list(class_counts.keys()), list(class_counts.values()))
plt.xlabel('Classes')
plt.ylabel('Number of Images')
plt.title('Distribution of Classes in CIFAR-10 Dataset')
plt.show()

In [None]:
# Per-channel statistics passed to Normalize below
mean = [0.4914, 0.4822, 0.4465]
std = [0.2023, 0.1994, 0.2010]

# Tensor conversion followed by channel-wise standardization
transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(mean=mean, std=std)]
)

# Reload the training split so that it uses the normalizing transform
trainset = CIFAR10(root='./data', train=True, download=True, transform=transform)

# Redo the 90/10 train/validation split on the reloaded data
num_val = int(0.1 * len(trainset))
num_train = len(trainset) - num_val
train_dataset, val_dataset = random_split(trainset, [num_train, num_val])

# Reload the test split with the same transform
testset = CIFAR10(root='./data', train=False, download=True, transform=transform)

# Class names; the notebook indexes this tuple with the integer label
classes = (
    'plane', 'car', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
)

In [None]:
from torch.utils.data import DataLoader

# Loader settings shared by all three splits
batch_size = 64
num_workers = 4

# Only the training loader shuffles; validation and test are read in order
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=num_workers,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=num_workers,
)
test_loader = DataLoader(
    testset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=num_workers,
)


In [None]:
import torch.nn as nn
import torch.nn.functional as F

class MLP(nn.Module):
    """Fully connected classifier: FC1 -> ReLU -> FC2 -> ReLU -> FC3.

    Args:
        in_features: Number of values per flattened sample. The default,
            3072, is the size the network was originally hard-coded for.
        hidden1: Output size of the first fully connected layer.
        hidden2: Output size of the second fully connected layer.
        num_classes: Number of output units (one logit per class).
    """

    def __init__(self, in_features=3072, hidden1=128, hidden2=64, num_classes=10):
        super().__init__()
        self.in_features = in_features
        self.fc1 = nn.Linear(in_features, hidden1)
        self.fc2 = nn.Linear(hidden1, hidden2)
        self.fc3 = nn.Linear(hidden2, num_classes)

    def forward(self, x):
        """Flatten ``x`` to rows of ``in_features`` values and return the logits."""
        # reshape (unlike view) also works when x is not contiguous in memory,
        # e.g. after a permute; for contiguous inputs it is equivalent to view.
        x = x.reshape(-1, self.in_features)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        # No activation on the output layer: raw logits are returned
        return self.fc3(x)


model_MLP = MLP()

In [None]:
import torch.optim as optim
# Train the MLP with plain SGD.
# The loaders built earlier in the notebook (train_loader / val_loader /
# test_loader) are reused here rather than being constructed a second time.

# Optimizer settings
learning_rate = 0.01
num_epochs = 10
log_every = 200  # number of mini-batches between progress reports

# Loss function and SGD optimizer with weight decay (created once)
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.SGD(model_MLP.parameters(), lr=learning_rate, weight_decay=1e-4)

# Use the GPU when one is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_MLP.to(device)

# Holds the loss of the last mini-batch of the most recent epoch; it is kept
# as a tensor because later cells call .item() on it.
loss_MLP = torch.tensor(0.0)

model_MLP.train()
for epoch in range(num_epochs):
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model_MLP(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Report the mean loss over the last `log_every` mini-batches
        running_loss += loss.item()
        if (i + 1) % log_every == 0:
            print(f'[Epoch: {epoch + 1}, Mini-batch: {i + 1:5d}] loss: {running_loss / log_every:.3f}')
            running_loss = 0.0
    # detach() so the stored value does not keep the autograd graph alive
    loss_MLP = loss.detach()
print('Finished Training')
print(loss_MLP.item())

Now we start to train the CNN model.

In [None]:
import torch.nn as nn

class ArchitecturalBlock(nn.Module):
    """Residual block: conv3x3 -> BN -> ReLU -> conv3x3 -> BN, plus a shortcut.

    The shortcut has no learnable parameters: extra channels are zero-padded
    and, when the block is strided, the spatial size is reduced by max pooling.

    Args:
        in_channels: Channels of the input tensor.
        out_channels: Channels produced by both convolutions.
        stride: Stride of the first convolution (and of the shortcut pooling).
        down_sample: Force the shortcut through ``down_sampling`` even when
            the channel counts already match.
    """

    def __init__(self, in_channels, out_channels, stride=1, down_sample=False):
        super().__init__()

        # First convolution carries the stride; bias is omitted because a
        # BatchNorm layer follows directly.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)

        # Second convolution keeps the spatial size
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Remembered for the shortcut path
        self.down_sample = down_sample
        self.stride = stride
        self.in_channels = in_channels
        self.out_channels = out_channels

    def down_sampling(self, x):
        """Make the shortcut tensor match the shape of the main branch.

        Channels are zero-padded from ``in_channels`` to ``out_channels``.
        Spatial pooling is applied only when ``stride > 1``: pooling with a
        2x2 window at stride 1 would shrink the shortcut by one pixel and make
        the residual addition fail. ``ceil_mode=True`` keeps the pooled size
        equal to the strided convolution's output for odd input sizes and
        changes nothing for even ones.
        """
        out = F.pad(x, (0, 0, 0, 0, 0, self.out_channels - self.in_channels))
        if self.stride > 1:
            out = F.max_pool2d(out, kernel_size=2, stride=self.stride, ceil_mode=True)
        return out

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        # Main branch
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        # Shortcut branch: identity unless the shape has to change
        shortcut = x
        if self.down_sample or self.in_channels != self.out_channels:
            shortcut = self.down_sampling(x)

        return F.relu(out + shortcut)



In [None]:
class Model(nn.Module):
    """Three-stage convolutional classifier assembled from ``block`` modules.

    Layout: conv3x3(3 -> 16) -> BN -> ReLU, then three stages of
    ``num_layers`` blocks each with 16, 32 and 64 channels (the second and
    third stage start with a stride-2 block), global average pooling and a
    linear classifier.

    Args:
        num_layers: Number of blocks in each stage.
        block: Callable building one block. It is called as
            ``block(in_channels, out_channels, stride, down_sample)`` for the
            first block of a stage and ``block(out_channels, out_channels)``
            for the remaining ones.
        num_classes: Number of output logits.
    """

    def __init__(self, num_layers, block, num_classes=10):
        super().__init__()
        self.num_layers = num_layers

        # Stem: input (3 channels) -> conv 3x3 -> BN -> ReLU -> 16 channels
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)

        # Stages (sizes in the comments assume a 32x32 input)
        self.layers_2n = self.get_layers(block, 16, 16, stride=1)  # 16 x 32 x 32
        self.layers_4n = self.get_layers(block, 16, 32, stride=2)  # 32 x 16 x 16
        self.layers_6n = self.get_layers(block, 32, 64, stride=2)  # 64 x 8 x 8

        # Head: averaging over the whole feature map gives the same result as
        # the former fixed 8x8 window on 8x8 maps, and also accepts other
        # input resolutions.
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc_out = nn.Linear(64, num_classes)

        # Weight initialization
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def get_layers(self, block, in_channels, out_channels, stride):
        """Build one stage of ``self.num_layers`` blocks as an ``nn.Sequential``.

        Only the first block receives ``stride`` and the channel change; it is
        told to down-sample when ``stride`` is 2 or the channel counts differ.
        """
        down_sample = stride == 2 or in_channels != out_channels

        stage = [block(in_channels, out_channels, stride, down_sample)]
        stage.extend(block(out_channels, out_channels) for _ in range(1, self.num_layers))
        return nn.Sequential(*stage)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``."""
        x = self.relu(self.bn1(self.conv1(x)))

        x = self.layers_2n(x)
        x = self.layers_4n(x)
        x = self.layers_6n(x)

        x = self.avg_pool(x)
        x = x.flatten(1)  # (batch, 64, 1, 1) -> (batch, 64)
        return self.fc_out(x)
def model():
    """Return a new ``Model`` built from ``ArchitecturalBlock`` with ``num_layers=3``."""
    return Model(3, ArchitecturalBlock)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torch.optim.lr_scheduler import MultiStepLR
# Initialize model, loss function, optimizer, and learning rate scheduler.
# NOTE: this rebinds the name `model` from the factory function to the
# network instance, so the factory cannot be called again after this line.
model = model()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Move the model to the chosen device
model.to(device)

criterion = nn.CrossEntropyLoss()

optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=1e-4)

# The learning rate is meant to drop after 32k and 48k iterations. The
# milestones below are expressed in whole epochs, so the scheduler must be
# stepped once per epoch (see the end of the epoch loop).
steps_per_epoch = len(train_loader)  # mini-batches per epoch
milestones = [int(32000 / steps_per_epoch), int(48000 / steps_per_epoch)]

# Initialize the MultiStepLR scheduler
scheduler = MultiStepLR(optimizer, milestones=milestones, gamma=0.1)

# Training loop
loss_shortcut = 0.0
num_epochs = 1
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for i, (inputs, labels) in enumerate(train_loader):
        # The batch has to live on the same device as the model
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        if (i + 1) % 100 == 0:  # Print every 100 mini-batches
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {running_loss / 100:.4f}')
            running_loss = 0.0

    # One scheduler step per epoch, matching the epoch-based milestones
    scheduler.step()
    # Last mini-batch loss of the epoch, detached from the autograd graph
    loss_shortcut = loss.detach()
torch.save(model.state_dict(), 'model.pth')

Plot the loss of the CNN model during training.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
def model():
    """Create a fresh ``Model`` that uses ``ArchitecturalBlock`` and ``num_layers=3``."""
    return Model(3, ArchitecturalBlock)
class EarlyStopping:
    """Track the validation loss, checkpoint the best model and signal when to stop.

    Call the instance once per epoch with the validation loss and the model.
    Whenever the loss improves, the model's ``state_dict`` is saved to
    ``path``; after ``patience`` consecutive calls without improvement,
    ``early_stop`` becomes ``True``.

    Args:
        patience: Number of consecutive non-improving calls tolerated.
        verbose: Print a message on every checkpoint and counter increment.
        delta: Margin added to the best score; a call counts as an
            improvement only if ``-val_loss >= best_score + delta``.
        path: File the best ``state_dict`` is written to.
    """

    def __init__(self, patience=5, verbose=False, delta=0, path='model_best.pth'):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # float('inf') instead of np.Inf: that alias was removed in NumPy 2.0
        self.val_loss_min = float('inf')
        self.delta = delta
        self.path = path

    def __call__(self, val_loss, model):
        """Record ``val_loss`` for this epoch and update the stopping state."""
        # Scores are negated losses, so a larger score is better
        score = -val_loss

        if self.best_score is None:
            # First call: always becomes the best so far
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            # No improvement
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            # Improvement: checkpoint and reset the counter
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        """Write ``model.state_dict()`` to ``self.path`` and remember ``val_loss``."""
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

# Initialize model, loss function, optimizer, and scheduler
model_earlystop = model()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_earlystop.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model_earlystop.parameters(), lr=0.1, momentum=0.9, weight_decay=1e-4)

# The learning rate is meant to drop after 32k and 48k iterations. The
# milestones are expressed in whole epochs, so the scheduler is stepped once
# per epoch below (stepping it per batch would trigger the drops after only a
# few dozen mini-batches).
steps_per_epoch = len(train_loader)  # mini-batches per epoch
milestones = [int(32000 / steps_per_epoch), int(48000 / steps_per_epoch)]

# Initialize the MultiStepLR scheduler
scheduler = MultiStepLR(optimizer, milestones=milestones, gamma=0.1)

# Training loop with early stopping
num_epochs = 10  # Set the number of epochs
best_model_path = 'model_best.pth'
early_stopping = EarlyStopping(patience=5, verbose=True, path=best_model_path)

# Last training mini-batch loss; kept as a tensor because the comparison cell
# at the end of the notebook calls .item() on it.
loss_earlystop = torch.tensor(0.0)
all_train_losses = []  # every mini-batch loss
train_losses = []      # mean training loss of each epoch
valid_losses = []      # mean validation loss of each epoch

for epoch in range(num_epochs):
    model_earlystop.train()
    batch_losses = []

    # Training phase
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()

        # Forward pass
        outputs = model_earlystop(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())
        loss_earlystop = loss.detach()

    # One scheduler step per epoch, matching the epoch-based milestones
    scheduler.step()
    all_train_losses.extend(batch_losses)
    train_losses.append(sum(batch_losses) / len(batch_losses))

    # Validation phase
    model_earlystop.eval()
    running_val_loss = 0.0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model_earlystop(inputs)
            val_loss = criterion(outputs, labels)
            running_val_loss += val_loss.item()

    epoch_val_loss = running_val_loss / len(val_loader)
    valid_losses.append(epoch_val_loss)

    # Early stopping call (also checkpoints the best model so far).
    # loss_earlystop is deliberately left untouched here so that it always
    # holds a training loss.
    early_stopping(epoch_val_loss, model_earlystop)
    if early_stopping.early_stop:
        print("Early stopping triggered")
        break

# Plotting training and validation loss per epoch; both curves hold one value
# per epoch so they share the same x-axis.
epoch_axis = range(1, len(valid_losses) + 1)
plt.figure(figsize=(10, 5))
plt.plot(epoch_axis, train_losses, label='Training Loss')
plt.plot(epoch_axis, valid_losses, label='Validation Loss')
plt.title("Training and Validation Loss per Epoch")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.show()

# Load the best checkpoint for evaluation on the test set
model_earlystop.load_state_dict(torch.load(best_model_path, map_location=device))

# Test phase
model_earlystop.eval()
running_test_loss = 0.0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model_earlystop(inputs)
        test_loss = criterion(outputs, labels)
        running_test_loss += test_loss.item()

average_test_loss = running_test_loss / len(test_loader)
print(f'Average Test Loss: {average_test_loss:.4f}')

Finally, we create a copy of the architectural block without the shortcut, then compare the three models.

In [None]:
class ArchitecturalBlockNoShortcut(nn.Module):
    """Plain (non-residual) block: conv3x3 -> BN -> ReLU -> conv3x3 -> BN -> ReLU.

    Same layers and constructor signature as the residual block, but nothing
    is added to the main branch. The final ReLU is kept so that the missing
    shortcut is the only difference between the two block types.

    Args:
        in_channels: Channels of the input tensor.
        out_channels: Channels produced by both convolutions.
        stride: Stride of the first convolution.
        down_sample: Stored for signature compatibility; not used by
            ``forward``.
    """

    def __init__(self, in_channels, out_channels, stride=1, down_sample=False):
        super().__init__()

        # First convolution carries the stride
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)

        # Second convolution keeps the spatial size
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Stored so the attributes match the residual block
        self.down_sample = down_sample
        self.stride = stride
        self.in_channels = in_channels
        self.out_channels = out_channels

    def down_sampling(self, x):
        """Zero-pad channels to ``out_channels`` and max-pool with ``self.stride``.

        Not called by ``forward``; retained only so the class exposes the same
        methods as the residual block.
        """
        out = F.pad(x, (0, 0, 0, 0, 0, self.out_channels - self.in_channels))
        out = F.max_pool2d(out, kernel_size=2, stride=self.stride)
        return out

    def forward(self, x):
        """Run both conv/BN pairs, each followed by ReLU, with no shortcut."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        # Final activation, applied where the residual block applies it after
        # its addition
        return F.relu(out)




def model2():
    """Return a new ``Model`` built from ``ArchitecturalBlockNoShortcut`` with ``num_layers=3``."""
    return Model(3, ArchitecturalBlockNoShortcut)

# Train the no-shortcut network in the same way, with early stopping.
# Initialize model, loss function, optimizer, and scheduler
model_noshortcut = model2()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_noshortcut.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model_noshortcut.parameters(), lr=0.1, momentum=0.9, weight_decay=1e-4)

# The learning rate is meant to drop after 32k and 48k iterations. The
# milestones are expressed in whole epochs, so the scheduler is stepped once
# per epoch below (stepping it per batch would trigger the drops after only a
# few dozen mini-batches).
steps_per_epoch = len(train_loader)  # mini-batches per epoch
milestones = [int(32000 / steps_per_epoch), int(48000 / steps_per_epoch)]

# Initialize the MultiStepLR scheduler
scheduler = MultiStepLR(optimizer, milestones=milestones, gamma=0.1)

# Training loop with early stopping
num_epochs = 10  # Set the number of epochs
# A separate checkpoint file, so this run does not overwrite the best weights
# saved for the network with shortcuts.
early_stopping = EarlyStopping(patience=5, verbose=True, path='model_noshortcut_best.pth')

# Last training mini-batch loss; kept as a tensor because the comparison cell
# at the end of the notebook calls .item() on it.
loss_noshortcut = torch.tensor(0.0)
all_train_losses = []
valid_losses = []

for epoch in range(num_epochs):
    model_noshortcut.train()
    batch_losses = []

    # Training phase
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()

        # Forward pass
        outputs = model_noshortcut(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())
        loss_noshortcut = loss.detach()

    # One scheduler step per epoch, matching the epoch-based milestones
    scheduler.step()
    all_train_losses.extend(batch_losses)

    # Validation phase
    model_noshortcut.eval()
    running_val_loss = 0.0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model_noshortcut(inputs)
            val_loss = criterion(outputs, labels)
            running_val_loss += val_loss.item()

    epoch_val_loss = running_val_loss / len(val_loader)
    valid_losses.append(epoch_val_loss)

    # Early stopping call. loss_noshortcut is deliberately left untouched
    # here so that it always holds a training loss.
    early_stopping(epoch_val_loss, model_noshortcut)
    if early_stopping.early_stop:
        print("Early stopping triggered")
        break




In [None]:
# Bar chart comparing the final loss recorded for each of the three models
models = ['With Shortcut', 'Without Shortcut', 'MLP']
# float() accepts both single-element tensors and plain Python floats; the
# recorded losses can be either, depending on whether early stopping fired.
losses = [float(loss_earlystop), float(loss_noshortcut), float(loss_MLP)]
plt.bar(models, losses)
plt.xlabel('Model Type')
plt.ylabel('Final Training Loss')
plt.title('Comparison of Training Loss Across Different Models')
plt.show()