# Convolutional Neural Network

Let's practice building a CNN by training a model to predict the number of coins in each image.

In [None]:
import kagglehub

# Download the coin-count dataset from Kaggle and report where the files are.
dataset_handle = "balabaskar/count-coins-image-dataset"
path_to_dataset = kagglehub.dataset_download(dataset_handle)
print("Path to dataset files:", path_to_dataset)

In [None]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Subset
from torchvision import transforms
from torchvision.utils import make_grid

from lightning.pytorch import LightningDataModule
from lightning.pytorch import LightningModule
import lightning.pytorch as L

print(L.__version__)

import matplotlib.pyplot as plt
%matplotlib inline
from PIL import Image

In [None]:
# Read the label table shipped with the dataset and preview its first rows.
csv_path = os.path.join(path_to_dataset, 'coins_count_values.csv')
df = pd.read_csv(csv_path)
display(df.head(3))

# Lookup table: image file name -> number of coins shown in that image.
counts_by_name = df.set_index("image_name")["coins_count"]
img2ct = counts_by_name.to_dict()

In [None]:
coins_dir = os.path.join(path_to_dataset, 'coins_images/coins_images')
print(coins_dir)

# Collect one (image_path, coins_count) pair per labelled image under coins_dir.
path_label = []
for dirpath, dirnames, filenames in os.walk(coins_dir):
    # os.walk yields entries in filesystem order, which differs between machines.
    # Sorting dirnames in place orders the traversal itself, and sorting the
    # file names below makes the resulting list reproducible.
    dirnames.sort()
    if not filenames:
        continue
    folder_name = os.path.basename(dirpath)
    print(f"Found {len(filenames)} images in {folder_name}")
    for filename in sorted(filenames):
        label = img2ct.get(filename)
        if label is None:
            # A file without a row in the CSV (e.g. a stray non-image file) is
            # skipped instead of aborting the whole scan with a KeyError.
            print(f"Skipping {filename}: no entry in coins_count_values.csv")
            continue
        path_label.append((os.path.join(dirpath, filename), label))

print(path_label[0:3])

In [None]:
class CustomDataset(torch.utils.data.Dataset):
    """Dataset of images paired with their labels.

    Args:
        path_label: Sequence of ``(image_path, label)`` pairs.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, path_label, transform=None):
        self.path_label = path_label
        self.transform = transform

    def __len__(self):
        """Return the number of ``(image_path, label)`` pairs."""
        return len(self.path_label)

    def __getitem__(self, idx):
        """Load the image at position ``idx`` and return ``(image, label)``."""
        path, label = self.path_label[idx]
        # Image.open is lazy and holds the file handle; the context manager
        # releases it once convert() has produced an independent RGB copy.
        with Image.open(path) as raw:
            img = raw.convert('RGB')

        if self.transform is not None:
            img = self.transform(img)

        return img, label

In [None]:
class DataModule(LightningDataModule):
    """Lightning data module serving an 80/20 train/test split of the images.

    Args:
        path_label: Sequence of ``(image_path, label)`` pairs.
        mean: Per-channel mean handed to ``transforms.Normalize``.
        std: Per-channel standard deviation handed to ``transforms.Normalize``.
        batch_size: Batch size used by every dataloader.
        seed: Seed of the permutation that assigns samples to the train and
            test subsets; the same seed always yields the same split.
    """

    def __init__(self, path_label, mean, std, batch_size=32, seed=42):
        super().__init__()
        self.path_label = path_label
        self.batch_size = batch_size
        self.seed = seed

        # Resize the shorter side to 224, crop the central 224x224 window,
        # convert to a tensor and normalise with the supplied statistics.
        self.transform = transforms.Compose([
            transforms.Resize(224),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean, std),
        ])

        self.train_dataset = None
        self.test_dataset = None

    def setup(self, stage=None):
        """Create the train (80%) and test (20%) subsets.

        ``stage`` is accepted for Lightning compatibility but not used: both
        subsets are always rebuilt.
        """
        dataset = CustomDataset(self.path_label, self.transform)
        dataset_size = len(dataset)
        train_size = int(0.8 * dataset_size)

        # Do not rely on the input order being random (in this notebook it
        # comes from a directory walk, so a plain slice would put whole folders
        # into the test set). A seeded permutation shuffles the assignment
        # while keeping it identical across repeated setup() calls.
        generator = torch.Generator().manual_seed(self.seed)
        order = torch.randperm(dataset_size, generator=generator).tolist()

        self.train_dataset = Subset(dataset, order[:train_size])
        self.test_dataset = Subset(dataset, order[train_size:])

    def train_dataloader(self):
        """Return a shuffling dataloader over the training subset."""
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True)

    def test_dataloader(self):
        """Return a dataloader over the test subset."""
        return DataLoader(self.test_dataset, batch_size=self.batch_size)

    def predict_dataloader(self):
        """Return a dataloader over the test subset for prediction."""
        return DataLoader(self.test_dataset, batch_size=self.batch_size)

    def __len__(self):
        """Return the training-subset size, else the test-subset size, else 0."""
        if self.train_dataset is not None:
            return len(self.train_dataset)
        if self.test_dataset is not None:
            return len(self.test_dataset)
        return 0


In [None]:
def get_mean_std(path_label):
    """Compute the per-channel mean and standard deviation over all images.

    Each image is loaded as RGB, resized and centre-cropped to 224x224 and
    converted with ``ToTensor`` before its pixels enter the statistics.

    Args:
        path_label: Sequence of ``(image_path, label)`` pairs; only the paths
            are used.

    Returns:
        Tuple ``(mean, std)`` of NumPy arrays of shape ``(3,)`` holding the
        R, G and B statistics (population standard deviation).

    Raises:
        ValueError: If ``path_label`` is empty.
    """
    if not path_label:
        raise ValueError("path_label is empty; cannot compute mean and std")

    _transform = transforms.Compose([
        transforms.Resize(224),
        transforms.CenterCrop(224),
        transforms.ToTensor()
    ])

    # Running per-channel sums keep memory constant instead of stacking every
    # image; float64 accumulators avoid precision loss over many pixels.
    channel_sum = np.zeros(3, dtype=np.float64)
    channel_sq_sum = np.zeros(3, dtype=np.float64)
    pixel_count = 0
    for entry in path_label:
        with Image.open(entry[0]) as raw:
            img = _transform(raw.convert('RGB'))
        # ToTensor already returns channel-first (3, H, W) data, so axis 0 is
        # the colour channel. The former np.moveaxis(tensor, -1, 0) moved the
        # width axis to the front and mixed pixel columns up with channels.
        pixels = img.numpy().astype(np.float64).reshape(3, -1)
        channel_sum += pixels.sum(axis=1)
        channel_sq_sum += np.square(pixels).sum(axis=1)
        pixel_count += pixels.shape[1]

    mean = channel_sum / pixel_count
    # Population variance as E[x^2] - E[x]^2; clamp tiny negative round-off.
    variance = np.maximum(channel_sq_sum / pixel_count - mean ** 2, 0.0)
    std = np.sqrt(variance)
    return mean, std


# Channel statistics of the whole image set, printed as plain Python lists.
channel_stats = get_mean_std(path_label)
mean, std = channel_stats
print(f"Mean: {mean.tolist()}")
print(f"Std: {std.tolist()}")

## CNN v1

Here we stack multiple `nn.Conv2d` and `nn.Linear` layers in the model.

The model learns the spatial features of the coins in an image and reduces them to a single number.

The goal is for that output to match the number of coins in the image.

In [None]:
import math


class CNNv1(LightningModule):
    """Small CNN regressor mapping a (3, 224, 224) image to one number.

    Two convolution + max-pool stages are followed by four fully connected
    layers; the last one has a single output unit that is trained with MSE
    against the coin count.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, 3, 1)
        self.conv2 = nn.Conv2d(6, 16, 3, 1)
        self.fc1 = nn.Linear(16 * 54 * 54, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 20)
        self.fc4 = nn.Linear(20, 1)

    def forward(self, X):
        """Return one prediction per sample, shape ``(N,)`` for a batch of N."""
        # I=(3,224,224)
        # K=6, F=3, S=1, O=(6,222,222)
        X = F.relu(self.conv1(X))

        # F=2, S=2, O=(6,111,111)
        X = F.max_pool2d(X, 2, 2)

        # K=16, F=3, S=1, O=(16,109,109)
        X = F.relu(self.conv2(X))

        # F=2, S=2, O=(16,54,54)
        X = F.max_pool2d(X, 2, 2)

        # Flatten everything except the batch axis. Unlike view(-1, 16*54*54)
        # this can never fold a mis-sized input into a different batch size;
        # fc1 reports the shape mismatch instead.
        X = torch.flatten(X, start_dim=1)

        # I=(16,54,54), O=120
        X = F.relu(self.fc1(X))

        # I=120, O=84
        X = F.relu(self.fc2(X))

        # I=84, O=20
        X = F.relu(self.fc3(X))

        # I=20, O=1
        X = self.fc4(X)

        # reshape (N,1) to N, where N = batch_size
        return X.squeeze(1)

    def configure_optimizers(self):
        """Use Adam with a learning rate of 0.001 over all parameters."""
        return torch.optim.Adam(self.parameters(), lr=0.001)

    def _mse(self, batch):
        """Return the mean squared error between predictions and labels."""
        X, y = batch
        # Labels are cast to float so they match the dtype of the predictions.
        return F.mse_loss(self(X), y.float())

    def training_step(self, batch, batch_idx):
        """Log and return the batch MSE as the training loss."""
        loss = self._mse(batch)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """Log the batch MSE as ``val_loss``."""
        self.log("val_loss", self._mse(batch))

    def test_step(self, batch, batch_idx):
        """Log the batch MSE (``mse_loss``) and its square root (``avg_loss``)."""
        mse_loss = self._mse(batch)
        # torch.sqrt keeps the value a tensor on the batch's device instead of
        # converting it to a Python float through math.sqrt.
        # NOTE(review): this is a per-batch RMSE; the epoch value reported by
        # Lightning is presumably the average of these, not sqrt(epoch MSE).
        rmse = torch.sqrt(mse_loss)
        self.log("mse_loss", mse_loss)
        self.log("avg_loss", rmse)


In [None]:
# Wire the collected samples and channel statistics into the data module and
# build the train/test subsets right away.
datamodule = DataModule(path_label, mean, std)
datamodule.setup()

In [None]:
# Train the first model for 30 epochs on the data module's training split.
model = CNNv1()
trainer = L.Trainer(max_epochs=30)
trainer.fit(model, datamodule=datamodule)

In [None]:
# Evaluate the trained model on the held-out split.
datamodule.setup(stage='test')
test_loader = datamodule.test_dataloader()
# Pass the model explicitly: when it is omitted, Lightning decides on its own
# which checkpoint of the previous fit to reload before testing, instead of
# using the weights currently held in `model`.
trainer.test(model, dataloaders=test_loader)

In [None]:
# Take the first training batch and tile its images into one grid.
images, labels = next(iter(datamodule.train_dataloader()))
im = make_grid(images, nrow=16)

# Figure 1: the grid as the network sees it (still normalised).
plt.figure(figsize=(12, 12))
plt.imshow(im.permute(1, 2, 0).numpy())

# Normalize computes (x - mean) / std, so applying it with mean=-mean/std and
# std=1/std inverts the earlier normalisation.
inv_normalize = transforms.Normalize(mean=-mean / std, std=1 / std)
im = inv_normalize(im)

# Figure 2: the same grid mapped back to the original colour range.
plt.figure(figsize=(12, 12))
plt.imshow(im.permute(1, 2, 0).numpy())

In [None]:
device = torch.device("cpu")  #"cuda:0"

# Keep the model on the same device as the inputs, so that changing `device`
# above is enough to run this cell on a GPU.
model.to(device)
model.eval()

# Gather the true and predicted coin counts of the whole test split.
y_true = []
y_pred = []
with torch.no_grad():
    for test_images, test_labels in datamodule.test_dataloader():
        pred = model(test_images.to(device))
        # tolist() converts a whole batch to Python scalars at once.
        y_true.extend(test_labels.tolist())
        y_pred.extend(pred.tolist())

In [None]:
# Smallest and largest predicted coin count on the test split.
(min(y_pred), max(y_pred))

In [None]:
import matplotlib.pyplot as plt

# True vs. predicted coin counts; a perfect model would lie on the diagonal.
fig, ax = plt.subplots(figsize=(6, 6))
ax.scatter(y_true, y_pred, alpha=0.5, color='blue')
ax.set_xlabel("True Values")
ax.set_ylabel("Predicted Values")
ax.set_title("Scatter Plot of True vs Predicted (Coin Counts)")
# Both axes share the same fixed 0-40 window.
ax.set_xlim(0, 40)
ax.set_ylim(0, 40)
ax.grid(True)
plt.show()

## Evaluation
Our model has been trained to predict the number of coins in an image.

A test $MSE = 19.9$ indicates that the model lacks accuracy.

Taking the square root ($RMSE = \sqrt{19.9} \approx 4.5$), the model is expected to **miss by roughly 4 coins per image** on average.

One might argue that it's unfair to judge the model this way because the number of coins varies greatly across the images.

However, the plot above makes it clear that the model is overfitted to some degree.

## CNN v2

Work in progress...

In [None]:
import math


class CNNv2(LightningModule):
    """Small CNN regressor mapping a (3, 224, 224) image to one number.

    Two convolution + max-pool stages are followed by four fully connected
    layers; the last one has a single output unit that is trained with MSE
    against the coin count.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, 3, 1)
        self.conv2 = nn.Conv2d(6, 16, 3, 1)
        self.fc1 = nn.Linear(16 * 54 * 54, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 20)
        self.fc4 = nn.Linear(20, 1)

    def forward(self, X):
        """Return one prediction per sample, shape ``(N,)`` for a batch of N."""
        # I=(3,224,224)
        # K=6, F=3, S=1, O=(6,222,222)
        X = F.relu(self.conv1(X))

        # F=2, S=2, O=(6,111,111)
        X = F.max_pool2d(X, 2, 2)

        # K=16, F=3, S=1, O=(16,109,109)
        X = F.relu(self.conv2(X))

        # F=2, S=2, O=(16,54,54)
        X = F.max_pool2d(X, 2, 2)

        # Flatten everything except the batch axis. Unlike view(-1, 16*54*54)
        # this can never fold a mis-sized input into a different batch size;
        # fc1 reports the shape mismatch instead.
        X = torch.flatten(X, start_dim=1)

        # I=(16,54,54), O=120
        X = F.relu(self.fc1(X))

        # I=120, O=84
        X = F.relu(self.fc2(X))

        # I=84, O=20
        X = F.relu(self.fc3(X))

        # I=20, O=1
        X = self.fc4(X)

        # reshape (N,1) to N, where N = batch_size
        return X.squeeze(1)

    def configure_optimizers(self):
        """Use Adam with a learning rate of 0.001 over all parameters."""
        return torch.optim.Adam(self.parameters(), lr=0.001)

    def _mse(self, batch):
        """Return the mean squared error between predictions and labels."""
        X, y = batch
        # Labels are cast to float so they match the dtype of the predictions.
        return F.mse_loss(self(X), y.float())

    def training_step(self, batch, batch_idx):
        """Log and return the batch MSE as the training loss."""
        loss = self._mse(batch)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """Log the batch MSE as ``val_loss``."""
        self.log("val_loss", self._mse(batch))

    def test_step(self, batch, batch_idx):
        """Log the batch MSE (``mse_loss``) and its square root (``avg_loss``)."""
        mse_loss = self._mse(batch)
        # torch.sqrt keeps the value a tensor on the batch's device instead of
        # converting it to a Python float through math.sqrt.
        # NOTE(review): this is a per-batch RMSE; the epoch value reported by
        # Lightning is presumably the average of these, not sqrt(epoch MSE).
        rmse = torch.sqrt(mse_loss)
        self.log("mse_loss", mse_loss)
        self.log("avg_loss", rmse)

In [None]:
model = CNNv2()
# The Trainer used for CNNv1 has already run its 30 epochs and carries that
# progress (and its checkpoints) along. CNNv2 gets a fresh Trainer so that it
# is trained from epoch 0 and evaluated on its own weights.
trainer = L.Trainer(max_epochs=30)
trainer.fit(model, datamodule)

In [None]:
# Evaluate CNNv2 on the held-out split.
datamodule.setup(stage='test')
test_loader = datamodule.test_dataloader()
# Pass the model explicitly: when it is omitted, Lightning decides on its own
# which checkpoint of a previous fit to reload before testing, which may not
# correspond to the weights currently held in `model`.
trainer.test(model, dataloaders=test_loader)