# Assignment 2

You will need to find an image set that you will use for classification with Convolutional Neural Networks. You will need to describe the process you followed to solve the problem. You will need to submit the Jupyter Notebook and any additional files that may be needed to run the program.

Please, only one submission per group.

**Team**\
Abhinay Kotla (1002195827)\
Wonjun Park (1002237177)

## Intel Image Classification

https://www.kaggle.com/datasets/puneet6060/intel-image-classification

The dataset consists of images of various scenes, such as natural scenes. Specifically, the labels are:

* buildings
* forest
* glacier
* mountain
* sea
* street

The dataset makes it possible to train a neural network model that classifies images into those 6 labels.

It was originally published for a competition held on an [online platform](https://www.analyticsvidhya.com/datahack/).

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

from torchvision import transforms
from torchvision.datasets import ImageFolder

import os
import time

import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.metrics import confusion_matrix

from kagglehub import dataset_download

# Seed PyTorch's global random number generator so repeated runs are reproducible.
torch.manual_seed(42)

In [None]:
# Select the compute device: the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Report basic GPU facts so the notebook output documents the environment.
if torch.cuda.is_available():
    print(f"PyTorch CUDA version: {torch.version.cuda}")

    print(f"GPU device name: {torch.cuda.get_device_name(0)}")

    # Bytes currently occupied by tensors of this process.
    allocated_memory = torch.cuda.memory_allocated(0)
    print(f"GPU memory allocated: {allocated_memory / 1024**2:.2f} MB")

    # Bytes held by PyTorch's caching allocator (always >= allocated).
    reserved_memory = torch.cuda.memory_reserved(0)
    print(f"GPU memory reserved: {reserved_memory / 1024**2:.2f} MB")

    total_memory = torch.cuda.get_device_properties(0).total_memory
    print(f"GPU total memory available: {total_memory / 1024**2:.2f} MB")

    # Ask the driver for the real amount of free memory. The previous
    # `total - allocated` estimate ignored both the allocator's cache and
    # memory used by other processes, so it over-reported what is available.
    free_memory = torch.cuda.mem_get_info(0)[0]
    print(f"GPU free memory: {free_memory / 1024**2:.2f} MB")
else:
    print("CUDA is not available on this system.")

In [None]:
# Preprocessing applied to every image, in order:
#   1. resize to a fixed 150x150 so all inputs share one shape;
#   2. convert to a tensor, which also rescales pixel values from [0, 255] to [0, 1].
transform = transforms.Compose(
    [transforms.Resize(size=(150, 150)), transforms.ToTensor()]
)

The `transform` pipeline defines how each image is preprocessed before it is fed to the network. Each step is explained below:

1. **`transforms.Resize((150, 150))`**: Resizes the input image to a fixed size of 150x150 pixels. This ensures all images have the same dimensions (although the dataset description states that its images are already 150x150), allowing the neural network model to be trained uniformly across the various images.

2. **`transforms.ToTensor()`**: Converts the image from a PIL image or NumPy array into a PyTorch tensor. It also scales the pixel values from the range [0, 255] to [0, 1].

In [None]:
# Download the Kaggle dataset and locate its train/test folders.
path = dataset_download("puneet6060/intel-image-classification")
print("Path to dataset files:", path)

# Each split is nested one level deep: seg_train/seg_train and seg_test/seg_test.
train_path = os.path.join(path, "seg_train", "seg_train")
test_path = os.path.join(path, "seg_test", "seg_test")

print("Classes in training folder:", os.listdir(train_path))

# ImageFolder derives the class labels from the sub-directory names.
train_dataset = ImageFolder(root=train_path, transform=transform)
test_dataset = ImageFolder(root=test_path, transform=transform)

batch_size = 64
# Use up to 6 loader processes, but never more than the machine has CPU cores
# (DataLoader warns and may slow down when oversubscribed).
num_workers = min(6, os.cpu_count() or 1)

# Options shared by both loaders. Pinned host memory only speeds up
# host-to-GPU copies, and requesting it without CUDA just emits a warning.
loader_kwargs = {
    "batch_size": batch_size,
    "num_workers": num_workers,
    "pin_memory": torch.cuda.is_available(),
}

# Shuffle only the training data; keep the test order fixed.
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_kwargs)

print("✅ Dataset loaded successfully!")
print("Classes:", train_dataset.classes)

In [None]:
# Sanity check: inspect the dataset type and display the first training sample.
print("Dataset type:", type(train_dataset))

first_image, first_label = train_dataset[0]

print(f"Shape of the first image: {first_image.shape}")

# matplotlib expects channels-last (H, W, C); the tensor is channels-first (C, H, W).
image_np = first_image.permute(1, 2, 0).numpy()
# No rescaling here: `transform` contains no Normalize step, so ToTensor has
# already left the pixels in [0, 1], the float range imshow expects. The former
# `* 0.5 + 0.5` "un-normalisation" squeezed values into [0.5, 1] and produced a
# washed-out preview.

plt.imshow(image_np)
plt.title(f"Label: {train_dataset.classes[first_label]}")
plt.axis('off')
plt.show()

In [None]:
class ResidualBlock(nn.Module):
    """Residual unit: two 3x3 conv + batch-norm layers plus a skip connection.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of channels produced by the block.
        stride: Stride of the first convolution (and of the projection
            shortcut, when one is needed). Defaults to 1.

    The skip path is an empty ``nn.Sequential`` (identity) when the input and
    output shapes agree; otherwise it is a 1x1 convolution followed by batch
    norm so the two paths can be added.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        # Main path. Only the first convolution may change resolution/width.
        self.conv1 = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=3,
            stride=stride,
            padding=1,
            bias=False,
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(
            out_channels,
            out_channels,
            kernel_size=3,
            stride=1,
            padding=1,
            bias=False,
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Skip path: identity when shapes match, 1x1 projection otherwise.
        shapes_match = stride == 1 and in_channels == out_channels
        if shapes_match:
            self.shortcut = nn.Sequential()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        # Add the (possibly projected) input before the final activation.
        out += self.shortcut(x)
        return self.relu(out)

In [None]:
class CustomSceneCNN(nn.Module):
    """Small ResNet-style classifier for 3-channel scene images.

    Architecture: a 3x3 stem convolution (3 -> 16 channels), five residual
    blocks widening 16 -> 32 -> 64 channels with two stride-2 down-sampling
    steps, global average pooling, and a three-layer fully connected head
    with batch norm and dropout.

    Args:
        num_classes: Number of output logits. Defaults to 6, the number of
            scene categories used in this notebook, so existing calls of
            ``CustomSceneCNN()`` behave exactly as before.
    """

    def __init__(self, num_classes=6):
        super().__init__()

        # Stem: lift the RGB input to 16 feature maps at full resolution.
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)

        # Residual trunk; the stride-2 blocks halve the spatial resolution.
        self.layer1 = ResidualBlock(16, 16)
        self.layer2 = ResidualBlock(16, 32, stride=2)
        self.layer3 = ResidualBlock(32, 32)
        self.layer4 = ResidualBlock(32, 64, stride=2)
        self.layer5 = ResidualBlock(64, 64)

        # Pool every feature map down to 1x1, giving a 64-value vector per image.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

        # Classification head.
        self.fc1 = nn.Linear(64, 128)
        self.fc_bn1 = nn.BatchNorm1d(128)
        self.fc2 = nn.Linear(128, 64)
        self.fc_bn2 = nn.BatchNorm1d(64)
        self.fc3 = nn.Linear(64, num_classes)

        # One dropout module is reused after both hidden FC layers.
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return unnormalised class scores (logits), one row per input image."""
        x = self.relu(self.bn1(self.conv1(x)))

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.layer5(x)

        x = self.avgpool(x)
        # Flatten (N, 64, 1, 1) to (N, 64) for the fully connected head.
        x = x.view(x.size(0), -1)

        x = self.relu(self.fc_bn1(self.fc1(x)))
        x = self.dropout(x)
        x = self.relu(self.fc_bn2(self.fc2(x)))
        x = self.dropout(x)
        x = self.fc3(x)

        return x


In [None]:
# Build the network, move it to the selected device and show its structure.
model = CustomSceneCNN()
model = model.to(device)
print(model)

# Count the convolutional layers, including the 1x1 shortcut projections.
conv_layers = len([m for m in model.modules() if isinstance(m, nn.Conv2d)])
print(f"Number of layers: {conv_layers}")

# Multi-class classification loss and an Adam optimiser over all parameters.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=0.001)

In [None]:
def train(model, train_loader, optimizer, criterion, device):
    """Run one training epoch.

    Args:
        model: Network to optimise; it is switched to training mode.
        train_loader: Iterable of ``(inputs, labels)`` batches that also
            supports ``len()``.
        optimizer: Optimiser stepped once per batch.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Tuple ``(train_loss, train_accuracy)``: the mean of the per-batch
        losses and the percentage of correctly classified samples.
    """
    model.train()
    loss_sum = 0.0
    num_correct = 0
    num_seen = 0

    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Standard step: clear old gradients, forward, backward, update.
        optimizer.zero_grad()
        logits = model(inputs)
        batch_loss = criterion(logits, labels)
        batch_loss.backward()
        optimizer.step()

        # Book-keeping for the epoch statistics.
        loss_sum += batch_loss.item()
        predictions = logits.max(dim=1).indices
        num_seen += labels.size(0)
        num_correct += (predictions == labels).sum().item()

    mean_loss = loss_sum / len(train_loader)
    accuracy_pct = 100. * num_correct / num_seen
    return mean_loss, accuracy_pct

In [None]:
def test(model, test_loader, criterion, device):
    """Evaluate the model on a data loader without updating any weights.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        test_loader: Iterable of ``(inputs, labels)`` batches that also
            supports ``len()``.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Tuple ``(test_loss, test_accuracy, all_preds, all_targets)``: the mean
        of the per-batch losses, the accuracy in percent, and two flat lists
        holding every predicted and every true label in loader order.
    """
    model.eval()
    loss_sum = 0
    num_correct = 0
    num_seen = 0
    all_preds = []
    all_targets = []

    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            logits = model(inputs)
            loss_sum += criterion(logits, labels).item()

            predictions = logits.max(dim=1).indices
            num_seen += labels.size(0)
            num_correct += (predictions == labels).sum().item()

            # Keep per-sample results on the CPU, e.g. for a confusion matrix.
            all_preds.extend(predictions.cpu().numpy())
            all_targets.extend(labels.cpu().numpy())

    mean_loss = loss_sum / len(test_loader)
    accuracy_pct = 100. * num_correct / num_seen

    return mean_loss, accuracy_pct, all_preds, all_targets

In [None]:
# Train the model, or reuse a checkpoint from a previous run when one exists.
model_path = './intel_scene_cnn_model.pth'
history_path = './intel_scene_cnn_history.pth'

if os.path.exists(model_path):
    train_time_compute = 0
    print(f"Loading pre-trained model from {model_path}")
    # map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
    model.load_state_dict(torch.load(model_path, map_location=device, weights_only=True))

    if os.path.exists(history_path):
        # The history holds only plain lists of numbers, so the restricted
        # (weights_only) unpickler is sufficient and safer.
        history = torch.load(history_path, weights_only=True)
        train_losses = history['train_losses']
        train_accuracies = history['train_accuracies']
        test_losses = history['test_losses']
        test_accuracies = history['test_accuracies']
        print(f"Loaded training history for {len(train_losses)} epochs")
    else:
        print("No training history found. Evaluating current model performance...")
        test_loss, test_accuracy, _, _ = test(model, test_loader, criterion, device)
        print(f'Loaded Model Test Accuracy: {test_accuracy:.2f}%')
        # Training metrics are unknown for a bare checkpoint; 0 is a placeholder.
        train_losses = [0]
        test_losses = [test_loss]
        train_accuracies = [0]
        test_accuracies = [test_accuracy]
else:
    print("No previously trained model found. Training from scratch...")
    train_time_compute = 0
    start_time = time.time()
    max_epochs = 20
    patience = 3  # stop after this many consecutive epochs without improvement
    # Start below any reachable accuracy so the first epoch always writes a
    # checkpoint; the reload after the loop would otherwise fail.
    best_accuracy = float("-inf")
    no_improve_count = 0

    train_losses = []
    train_accuracies = []
    test_losses = []
    test_accuracies = []

    for epoch in range(max_epochs):
        train_loss, train_accuracy = train(model, train_loader, optimizer, criterion, device)
        # NOTE(review): the test split is also used for checkpoint selection and
        # early stopping, so the reported test accuracy is optimistically biased;
        # consider holding out a validation split.
        test_loss, test_accuracy, _, _ = test(model, test_loader, criterion, device)

        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)
        test_losses.append(test_loss)
        test_accuracies.append(test_accuracy)

        print(f'Epoch {epoch+1}/{max_epochs}, '
              f'Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, '
              f'Test Loss: {test_loss:.4f}, Test Acc: {test_accuracy:.2f}%')

        # Keep only the weights of the best epoch seen so far.
        if test_accuracy > best_accuracy:
            best_accuracy = test_accuracy
            torch.save(model.state_dict(), model_path)
            print(f"Model improved, saved to {model_path}")
            no_improve_count = 0
        else:
            no_improve_count += 1
            print(f"No improvement for {no_improve_count} epochs")

        # Persist the curves every epoch so an interrupted run keeps its history.
        history = {
            'train_losses': train_losses,
            'train_accuracies': train_accuracies,
            'test_losses': test_losses,
            'test_accuracies': test_accuracies
        }
        torch.save(history, history_path)

        if no_improve_count >= patience:
            print(f"Early stopping after {epoch+1} epochs")
            break

    end_time = time.time()
    training_time = end_time - start_time
    print(f'Training completed in {training_time:.2f} seconds')

    # Restore the best checkpoint; the last epoch is not necessarily the best.
    model.load_state_dict(torch.load(model_path, map_location=device, weights_only=True))
    print(f"Loaded best model with accuracy: {best_accuracy:.2f}%")
    print(f"Training history saved to {history_path}")


In [None]:
# Optional timing report; it only runs when `train_time_compute` is truthy.
if train_time_compute:
    end_time = time.time()
    training_time = end_time - start_time
    print(f'Training completed in {training_time:.2f} seconds')

# Final evaluation of the (best) model on the test split.
_, final_accuracy, all_preds, all_targets = test(model, test_loader, criterion, device)
print(f'Final Test Accuracy: {final_accuracy:.2f}%')

# Confusion matrix: rows are the true classes, columns the predicted ones.
class_names = train_dataset.classes
cm = confusion_matrix(all_targets, all_preds)

plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
)
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Confusion Matrix")
plt.tight_layout()
plt.show()
