In [1]:
import numpy as np
from sklearn import datasets
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error,accuracy_score,classification_report
import matplotlib.pyplot as plt
import torch.nn.functional as F
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import random_split, TensorDataset, DataLoader
from torchvision import datasets, transforms
import pandas
import os

In [2]:
# Dataset locations, resolved relative to the notebook's working directory.
base_dir = os.getcwd()
data_dir = os.path.join(base_dir, 'split_dataset')

train_dir = os.path.join(data_dir, 'train')
test_studio_dir = os.path.join(data_dir, 'test_studio')
test_realworld_dir = os.path.join(data_dir, 'test_realworld')

# Image preprocessing: resize every image to 128x128 and convert it to a tensor.
transform = transforms.Compose([
    transforms.Resize((128,128)),
    transforms.ToTensor()
])

train_dataset = datasets.ImageFolder(root=train_dir, transform=transform)
test_studio_dataset = datasets.ImageFolder(root=test_studio_dir, transform=transform)
test_realworld_dataset = datasets.ImageFolder(root=test_realworld_dir, transform=transform)

# Each ImageFolder derives its label indices from its own sub-folder names.
# If a test root has a different set of class folders, its integer labels no
# longer mean the same thing as the training labels and every accuracy computed
# on it is silently wrong, so fail loudly here instead.
for split_name, split_dataset in (
    ("test_studio", test_studio_dataset),
    ("test_realworld", test_realworld_dataset),
):
    if split_dataset.class_to_idx != train_dataset.class_to_idx:
        raise ValueError(
            f"Class folders of '{split_name}' do not match the training set: "
            f"{split_dataset.class_to_idx} vs {train_dataset.class_to_idx}"
        )

# Hold out 10% of the training images for validation.
val_size = int(0.1 * len(train_dataset))
train_size = len(train_dataset) - val_size

# Seed the split so the same images land in train/val after every kernel
# restart; otherwise validation numbers are not comparable between runs.
SPLIT_SEED = 42
train_subset, val_subset = random_split(
    train_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Single batch size shared by all four loaders.
LOADER_BATCH_SIZE = 32

train_loader = DataLoader(train_subset, batch_size=LOADER_BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_subset, batch_size=LOADER_BATCH_SIZE, shuffle=False)

test_studio_loader = DataLoader(test_studio_dataset, batch_size=LOADER_BATCH_SIZE, shuffle=False)
test_realworld_loader = DataLoader(test_realworld_dataset, batch_size=LOADER_BATCH_SIZE, shuffle=False)

In [None]:
# Disabled first draft of the network: it is wrapped in a bare string literal,
# so nothing in it is ever executed.
# NOTE(review): the inline shape comments describe a 1-channel 8x8 input, while
# in_channels defaults to 3 and the data cell resizes images to 128x128; fc1's
# 64 * 2 * 2 input size would not fit that pipeline if this were re-enabled.
'''class CNN(nn.Module):
    def __init__(self, in_channels=3, dropout_prob=0.5):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=8, kernel_size=3, padding=1)   # (1,8,8) → (8,8,8)
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=64, kernel_size=3, padding=1)  # (8,4,4) → (64,4,4)
        self.pool = nn.MaxPool2d(2, 2)                            # halves spatial size
        self.dropout = nn.Dropout(dropout_prob)
        self.fc1 = nn.Linear(64 * 2 * 2, 128)
        self.fc2 = nn.Linear(128, 10)  # 10 classes (digits 0–9)

    def forward(self, x):      # input x shape: (batch_size, 1, 8, 8)
        x = self.pool(F.relu(self.conv1(x)))  # conv1 + relu + pool → (8,4,4)
        x = self.pool(F.relu(self.conv2(x)))  # conv2 + relu + pool → (64,2,2)
        x = torch.flatten(x, 1)               # flatten except batch dim
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x'''

In [None]:
# Model 1 (disabled: kept inside a bare string literal, so it is never executed).
# cnn_weight_model1.pth; TRAIN_ACCURACY = 99%, TEST_STUDIO_ACCURACY = 70%, TEST_REALWORLD_ACCURACY = 70%
# Three conv + max-pool stages (128 -> 64 -> 32 -> 16) followed by two linear layers.

'''class CNN(nn.Module):
    def __init__(self, in_channels=3, dropout_prob=0.5, num_classes=10):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, 8, 3, padding=1)
        self.conv2 = nn.Conv2d(8, 64, 3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, 3, padding=1)  # new conv
        self.pool = nn.MaxPool2d(2, 2)
        self.dropout = nn.Dropout(dropout_prob)
        self.fc1 = nn.Linear(128 * 16 * 16, 128)  # adjust to output of last pool
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))  # 128→64
        x = self.pool(F.relu(self.conv2(x)))  # 64→32
        x = self.pool(F.relu(self.conv3(x)))  # 32→16
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x'''

In [3]:
# Model 2, weights file: cnn_weights_model2.pth
# TRAIN_ACCURACY, TEST_STUDIO_ACCURACY, TEST_REALWORLD_ACCURACY: not recorded yet

class SimpleCNN(nn.Module):
    """Small two-stage convolutional classifier.

    Layout: conv(3x3, 16) -> ReLU -> max-pool -> conv(3x3, 32) -> ReLU ->
    max-pool -> flatten -> linear(128) -> ReLU -> dropout -> linear(num_classes).
    The forward pass returns raw logits (no softmax).

    Args:
        in_channels: Number of channels of the input images.
        num_classes: Size of the output logit vector.
        dropout_prob: Drop probability applied after the first linear layer.
        input_size: Spatial size of the input images, either a single int for
            square images or an ``(height, width)`` pair. Defaults to 128,
            which reproduces the original fixed ``32 * 32 * 32`` feature size.

    Raises:
        ValueError: If ``input_size`` is too small to survive two 2x2 poolings.
    """

    def __init__(self, in_channels=3, num_classes=10, dropout_prob=0.5, input_size=128):
        super().__init__()

        if isinstance(input_size, int):
            height, width = input_size, input_size
        else:
            height, width = input_size

        # The 3x3 convolutions use padding=1 and keep the spatial size; each
        # 2x2 max-pool halves it, rounding down.
        feat_h, feat_w = height // 2 // 2, width // 2 // 2
        if feat_h < 1 or feat_w < 1:
            raise ValueError(
                f"input_size={input_size!r} is too small for two 2x2 max-pool stages"
            )

        self.conv1 = nn.Conv2d(in_channels, 16, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.dropout = nn.Dropout(dropout_prob)
        # 32 feature maps of feat_h x feat_w (32 x 32 for the default 128x128 input).
        self.fc1 = nn.Linear(32 * feat_h * feat_w, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Map a batch of images ``(N, in_channels, H, W)`` to logits ``(N, num_classes)``."""
        x = self.pool(F.relu(self.conv1(x)))  # H x W -> H/2 x W/2
        x = self.pool(F.relu(self.conv2(x)))  # H/2 x W/2 -> H/4 x W/4
        x = torch.flatten(x, 1)               # keep the batch dimension
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x


In [4]:
# Training hyperparameters.
num_epochs = 50   # upper bound on epochs; the training loop may stop earlier
lr = 1e-3         # learning rate passed to the optimizer
# NOTE(review): the DataLoaders are created in the data cell with a batch size
# of 32, so this value is not consumed by any cell visible in this notebook.
batch_size = 64

In [5]:
# Model, loss, optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Size the classifier head from the dataset rather than relying on the default
# of 10, so the trained weights match the model that the reload cell rebuilds
# with num_classes=len(train_dataset.classes).
model = SimpleCNN(num_classes=len(train_dataset.classes)).to(device)
criterion = nn.CrossEntropyLoss()  # multi-class classification on raw logits
optimizer = optim.Adam(model.parameters(), lr=lr)


In [None]:
# Disabled earlier training loop (no validation pass, no early stopping).
# It is wrapped in a bare string literal, so it is never executed; the active
# training loop is the one in the following cell.
'''# Training loop
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0

    for batch_x, batch_y in train_loader:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)

        logits = model(batch_x)
        loss = criterion(logits, batch_y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    avg_loss = epoch_loss / len(train_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")'''

In [6]:
# Train with early stopping on the validation loss, then put the best weights
# back into `model` so later cells save and evaluate the best epoch rather
# than the last (typically overfit) one.
best_val_loss = float('inf')
patience = 5                      # non-improving epochs tolerated before stopping
counter = 0                       # consecutive epochs without improvement
best_ckpt_path = "best_model.pth"
best_saved = False                # True once this run has written a checkpoint

for epoch in range(num_epochs):
    # --- training pass ---
    model.train()
    running_loss = 0.0
    for batch_x, batch_y in train_loader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        optimizer.zero_grad()
        logits = model(batch_x)
        loss = criterion(logits, batch_y)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    avg_train_loss = running_loss / len(train_loader)

    # --- validation pass (no gradients, dropout disabled by eval()) ---
    model.eval()
    val_loss = 0.0
    correct, total = 0, 0
    with torch.no_grad():
        for batch_x, batch_y in val_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            logits = model(batch_x)
            loss = criterion(logits, batch_y)
            val_loss += loss.item()
            preds = logits.argmax(dim=1)
            correct += (preds == batch_y).sum().item()
            total += batch_y.size(0)

    avg_val_loss = val_loss / len(val_loader)
    val_acc = correct / total

    print(f"Epoch [{epoch+1}/{num_epochs}] | Train Loss: {avg_train_loss:.4f} | "
          f"Val Loss: {avg_val_loss:.4f} | Val Acc: {val_acc:.4f}")

    # --- early stopping bookkeeping ---
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        counter = 0
        torch.save(model.state_dict(), best_ckpt_path)
        best_saved = True
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping triggered!")
            break

# Without this, `model` still holds the weights of the final epoch, which is
# `patience` epochs past the best one whenever early stopping fired.
# The flag (rather than a file-existence check) avoids loading a stale
# checkpoint left over from a previous run.
if best_saved:
    model.load_state_dict(torch.load(best_ckpt_path, map_location=device))
    print(f"Restored best weights (val loss {best_val_loss:.4f}) from {best_ckpt_path}")


Epoch [1/50] | Train Loss: 1.7277 | Val Loss: 1.4556 | Val Acc: 0.4766
Epoch [2/50] | Train Loss: 1.4325 | Val Loss: 1.2970 | Val Acc: 0.5608
Epoch [3/50] | Train Loss: 1.3061 | Val Loss: 1.2435 | Val Acc: 0.5677
Epoch [4/50] | Train Loss: 1.1968 | Val Loss: 1.1814 | Val Acc: 0.6101
Epoch [5/50] | Train Loss: 1.0906 | Val Loss: 1.0780 | Val Acc: 0.6380
Epoch [6/50] | Train Loss: 0.9855 | Val Loss: 1.0882 | Val Acc: 0.6557
Epoch [7/50] | Train Loss: 0.8971 | Val Loss: 1.0538 | Val Acc: 0.6627
Epoch [8/50] | Train Loss: 0.8122 | Val Loss: 1.1124 | Val Acc: 0.6487
Epoch [9/50] | Train Loss: 0.7499 | Val Loss: 1.0475 | Val Acc: 0.6772
Epoch [10/50] | Train Loss: 0.6965 | Val Loss: 1.0549 | Val Acc: 0.6703
Epoch [11/50] | Train Loss: 0.6340 | Val Loss: 1.1209 | Val Acc: 0.6703
Epoch [12/50] | Train Loss: 0.5919 | Val Loss: 1.1231 | Val Acc: 0.6816
Epoch [13/50] | Train Loss: 0.5337 | Val Loss: 1.1733 | Val Acc: 0.6873
Epoch [14/50] | Train Loss: 0.5096 | Val Loss: 1.2677 | Val Acc: 0.6892
Early stopping triggered!

In [None]:
# Persist the weights currently held by `model` under the model-2 file name.
torch.save(obj=model.state_dict(), f="cnn_weights_model2.pth")

In [None]:
# Recreate the same model architecture and load the weights written by the
# save cell ("cnn_weights_model2.pth"). The previous name, "cnn_weights.pth",
# is not produced by any cell in this notebook.
weights_path = "cnn_weights_model2.pth"

model = SimpleCNN(in_channels=3, num_classes=len(train_dataset.classes))
# map_location lets a checkpoint written on a GPU machine load on a CPU-only one.
model.load_state_dict(torch.load(weights_path, map_location=device))
model.to(device)
model.eval()

In [None]:
# Evaluate the current model on the real-world test split.
model.eval()
all_preds, all_labels = [], []

with torch.no_grad():
    for images, targets in test_realworld_loader:
        scores = model(images.to(device))
        # The predicted class is the index of the largest logit in each row.
        predicted = scores.argmax(dim=1)

        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(targets.cpu().numpy())

accuracy = accuracy_score(all_labels, all_preds)
print(f"Test Accuracy: {accuracy:.4f}")
