In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import seaborn as sns


In [3]:
# Seed the PyTorch and NumPy global RNGs from one shared constant so that
# repeated runs of the notebook draw the same random numbers.
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

### Task 1: Data Loading and Exploration

In [6]:
# Preprocessing pipeline applied to every image: convert to a tensor, then
# standardise with the MNIST mean and standard deviation.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.1307,), std=(0.3081,)),
    ]
)

In [7]:
# Build the MNIST train and test splits, stored under the current directory
# (downloaded on first use), with the shared preprocessing pipeline attached.
train_dataset = torchvision.datasets.MNIST(
    root='.',
    train=True,
    download=True,
    transform=transform,
)
test_dataset = torchvision.datasets.MNIST(
    root='.',
    train=False,
    download=True,
    transform=transform,
)


100%|██████████| 9.91M/9.91M [00:38<00:00, 259kB/s]
100%|██████████| 28.9k/28.9k [00:00<00:00, 111kB/s]
100%|██████████| 1.65M/1.65M [00:06<00:00, 254kB/s]
100%|██████████| 4.54k/4.54k [00:00<00:00, 4.44MB/s]


In [8]:
# Mini-batch loaders: the training split is reshuffled on every pass, the test
# split is iterated in its stored order.
batch_size = 64
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,
)
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    shuffle=False,
)

In [9]:
# Save a grid of sample images from the dataset to 'mnist_samples.png'
def show_sample_images():
    """Save the first ten training digits as a 2x5 grid in 'mnist_samples.png'.

    Images are read from ``train_dataset.data`` and labels from
    ``train_dataset.targets`` (module-level dataset). The figure is closed
    right after saving, so nothing is rendered inline.
    """
    n_rows, n_cols = 2, 5
    plt.figure(figsize=(10, 5))
    for idx in range(n_rows * n_cols):
        digit_image = train_dataset.data[idx]
        digit_label = train_dataset.targets[idx].item()
        plt.subplot(n_rows, n_cols, idx + 1)
        plt.imshow(digit_image, cmap='gray')
        plt.title(f"Label: {digit_label}")
        plt.axis('off')
    plt.tight_layout()
    plt.savefig('mnist_samples.png')
    plt.close()

# Calculate and report class distribution
def analyze_class_distribution():
    """Report how many training samples each digit class has.

    Reads the labels of the module-level ``train_dataset``, prints one line
    per class with its absolute count and percentage share, and saves a bar
    chart of the counts to 'class_distribution.png'.

    Returns:
        tuple: ``(unique_labels, counts)`` as returned by ``np.unique`` with
        ``return_counts=True``.
    """
    labels = train_dataset.targets.numpy()
    n_samples = len(labels)
    unique_labels, counts = np.unique(labels, return_counts=True)

    # Textual summary, one row per digit.
    print("Class Distribution in Training Set:")
    for digit, n_digit in zip(unique_labels, counts):
        share = n_digit / n_samples * 100
        print(f"Digit {digit}: {n_digit} samples ({share:.2f}%)")

    # Bar chart of the same counts, written to disk and then closed.
    plt.figure(figsize=(10, 5))
    plt.bar(unique_labels, counts)
    plt.title('Class Distribution in MNIST Training Set')
    plt.xlabel('Digit')
    plt.ylabel('Number of Samples')
    plt.xticks(unique_labels)
    plt.grid(axis='y', alpha=0.3)
    plt.savefig('class_distribution.png')
    plt.close()

    return unique_labels, counts

### Task 2: CNN Implementation

In [10]:
# (a) Design and implement a CNN model
class MNISTClassifier(nn.Module):
    """Three-block convolutional classifier producing 10 class logits.

    Each block is Conv2d(3x3, padding=1) -> ReLU -> MaxPool2d(2), taking the
    channel count 1 -> 32 -> 64 -> 128. For 28x28 inputs the spatial size goes
    28 -> 14 -> 7 -> 3, which is what the ``128 * 3 * 3`` input width of
    ``fc1`` is sized for. The head is Linear(1152, 512) -> ReLU ->
    Dropout(0.5) -> Linear(512, 10); the output is raw logits (no softmax).
    """

    def __init__(self):
        super().__init__()
        # First convolutional block
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2)

        # Second convolutional block
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2)

        # Third convolutional block for better feature extraction
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.relu3 = nn.ReLU()
        self.pool3 = nn.MaxPool2d(kernel_size=2)

        # Fully connected head
        self.fc1 = nn.Linear(128 * 3 * 3, 512)
        self.relu4 = nn.ReLU()
        self.dropout = nn.Dropout(0.5)  # Prevent overfitting
        self.fc2 = nn.Linear(512, 10)   # 10 output classes for digits 0-9

        # Replace the default parameter initialisation
        self._initialize_weights()

    def forward(self, x):
        """Map a batch of images to class logits.

        Args:
            x: tensor of shape (batch, 1, H, W); ``fc1`` expects H = W = 28.

        Returns:
            Tensor of shape (batch, 10) holding unnormalised class scores.

        Raises:
            RuntimeError: if the pooled feature map does not flatten to
                ``128 * 3 * 3`` values per sample.
        """
        x = self.pool1(self.relu1(self.conv1(x)))
        x = self.pool2(self.relu2(self.conv2(x)))
        x = self.pool3(self.relu3(self.conv3(x)))

        # Flatten everything except the batch dimension. The previous
        # ``view(-1, 128 * 3 * 3)`` could fold surplus features into extra
        # rows, silently changing the batch size for wrongly sized inputs;
        # this form keeps one row per sample so the mismatch fails in ``fc1``.
        x = torch.flatten(x, 1)

        x = self.relu4(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

    def _initialize_weights(self):
        """Initialise conv weights with Kaiming-normal and linear weights with N(0, 0.01).

        All biases are set to zero.
        """
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)


### Task 3: Model Training

In [11]:
# (a) Train the model, reporting per-epoch loss and accuracy on the training loader and on a held-out evaluation loader
def train_model(model, train_loader, test_loader, epochs=15):
    """Train ``model`` with Adam and cross-entropy, scoring it after every epoch.

    Args:
        model: ``nn.Module`` that maps a batch of inputs to class logits.
        train_loader: iterable of ``(inputs, labels)`` batches used for the
            optimisation steps.
        test_loader: iterable of ``(inputs, labels)`` batches scored after
            each epoch; its metrics are the ones printed as "Val".
        epochs: number of passes over ``train_loader``.

    Returns:
        tuple: ``(model, train_losses, train_accuracies, test_losses,
        test_accuracies)``. Each list has one entry per epoch; losses are
        per-sample means and accuracies are percentages. Training metrics are
        accumulated during the optimisation pass, i.e. with the model in
        training mode.

    Raises:
        ValueError: if either loader yields no samples.

    Side effects:
        Prints one progress line per epoch, writes 'training_curves.png', and
        saves the final ``state_dict`` to 'mnist_classifier.pth'.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Build the optimizer after the model has been placed on its device.
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    train_losses, train_accuracies = [], []
    test_losses, test_accuracies = [], []

    for epoch in range(epochs):
        train_loss, train_accuracy = _run_epoch(model, train_loader, criterion, device, optimizer)
        val_loss, val_accuracy = _run_epoch(model, test_loader, criterion, device)

        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)
        test_losses.append(val_loss)
        test_accuracies.append(val_accuracy)

        print(f'Epoch {epoch+1}/{epochs}, '
              f'Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, '
              f'Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.2f}%')

    _plot_training_curves(train_losses, train_accuracies, test_losses, test_accuracies)

    torch.save(model.state_dict(), 'mnist_classifier.pth')

    return model, train_losses, train_accuracies, test_losses, test_accuracies


def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader``; optimise if ``optimizer`` is given, else only score.

    Returns ``(mean_loss, accuracy_percent)``, both computed per sample.
    """
    training = optimizer is not None
    model.train(training)

    loss_sum = 0.0
    correct = 0
    total = 0

    # Gradients are only tracked for the optimisation pass.
    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)

            if training:
                optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            if training:
                loss.backward()
                optimizer.step()

            # ``criterion`` returns the batch mean; weight it by the batch size
            # so a smaller final batch does not skew the epoch average.
            batch_size = labels.size(0)
            loss_sum += loss.item() * batch_size
            correct += (outputs.argmax(dim=1) == labels).sum().item()
            total += batch_size

    if total == 0:
        raise ValueError("data loader yielded no samples")

    return loss_sum / total, 100 * correct / total


def _plot_training_curves(train_losses, train_accuracies, test_losses, test_accuracies):
    """Save side-by-side accuracy and loss curves to 'training_curves.png'."""
    epoch_axis = range(1, len(train_losses) + 1)
    panels = (
        ('Model Accuracy', 'Accuracy (%)', train_accuracies, test_accuracies),
        ('Model Loss', 'Loss', train_losses, test_losses),
    )

    plt.figure(figsize=(12, 5))
    for position, (title, ylabel, train_series, val_series) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(epoch_axis, train_series, label='Training')
        plt.plot(epoch_axis, val_series, label='Validation')
        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel(ylabel)
        plt.legend()
        plt.grid(True)

    plt.tight_layout()
    plt.savefig('training_curves.png')
    plt.close()

### Task 4: Evaluation and Analysis

In [12]:
def evaluate_model(model, test_loader):
    """Score ``model`` on ``test_loader`` and report classification metrics.

    Args:
        model: ``nn.Module`` that maps a batch of inputs to logits over the
            ten digit classes.
        test_loader: iterable of ``(inputs, labels)`` batches.

    Returns:
        tuple: ``(accuracy, precision, recall, f1, cm)`` where precision,
        recall and F1 are support-weighted averages and ``cm`` is the 10x10
        confusion matrix (rows: true digit, columns: predicted digit).

    Side effects:
        Prints the metrics and the five most frequent (true, predicted)
        confusions, and saves a heatmap to 'confusion_matrix.png'.
    """
    from collections import Counter  # stdlib; only needed for the error tally

    class_ids = list(range(10))

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs.to(device))
            all_preds.extend(outputs.argmax(dim=1).tolist())
            all_labels.extend(labels.tolist())

    # Calculate metrics
    accuracy = accuracy_score(all_labels, all_preds)
    precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average='weighted')

    # Print evaluation metrics
    print("\nEvaluation Metrics:")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")

    # Pin the class list so the matrix is always 10x10 and lines up with the
    # tick labels, even if some digit is absent from both labels and predictions.
    cm = confusion_matrix(all_labels, all_preds, labels=class_ids)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_ids, yticklabels=class_ids)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.savefig('confusion_matrix.png')
    plt.close()

    # Tally every (true, predicted) pair that disagrees.
    misclass_counts = Counter(
        (true_label, pred_label)
        for true_label, pred_label in zip(all_labels, all_preds)
        if true_label != pred_label
    )

    if misclass_counts:
        print("\nMost Common Misclassifications:")
        for (true_label, pred_label), count in misclass_counts.most_common(5):  # Show top 5
            print(f"True: {true_label}, Predicted: {pred_label}, Count: {count}")

    return accuracy, precision, recall, f1, cm

In [13]:
 # Task 1: Data Loading and Exploration
print("Performing exploratory data analysis...")
show_sample_images()
unique_labels, counts = analyze_class_distribution()

# Task 2: build the CNN and show its layer summary
print("\nImplementing CNN model...")
model = MNISTClassifier()
print(model)

# Task 3: train for 15 epochs, scoring on the test loader after each epoch
print("\nTraining the model...")
(
    model,
    train_losses,
    train_accuracies,
    test_losses,
    test_accuracies,
) = train_model(model, train_loader, test_loader, epochs=15)

# Task 4: final metrics and confusion matrix on the test loader
print("\nEvaluating the model...")
accuracy, precision, recall, f1, cm = evaluate_model(model, test_loader)

# Suggested follow-up improvements, printed one per line
print("\nSuggested Improvements:")
print("\n".join((
    "1. Data augmentation: Apply random rotations, shifts, and zoom to increase the diversity of training samples.",
    "2. Batch normalization: Add batch normalization layers after convolutions to stabilize training.",
    "3. Learning rate scheduling: Implement a learning rate schedule to reduce the learning rate over time.",
    "4. Architecture enhancements: Try residual connections or deeper architecture to improve feature extraction.",
    "5. Ensemble methods: Train multiple models and combine their predictions for better performance.",
)))

Performing exploratory data analysis...
Class Distribution in Training Set:
Digit 0: 5923 samples (9.87%)
Digit 1: 6742 samples (11.24%)
Digit 2: 5958 samples (9.93%)
Digit 3: 6131 samples (10.22%)
Digit 4: 5842 samples (9.74%)
Digit 5: 5421 samples (9.04%)
Digit 6: 5918 samples (9.86%)
Digit 7: 6265 samples (10.44%)
Digit 8: 5851 samples (9.75%)
Digit 9: 5949 samples (9.92%)

Implementing CNN model...
MNISTClassifier(
  (conv1): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (relu1): ReLU()
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (relu2): ReLU()
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (relu3): ReLU()
  (pool3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=1152, 