In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
from PIL import Image
import os
import pandas as pd
from tqdm import tqdm

In [12]:
IMG_SIZE = (224, 224)
BATCH_SIZE = 32
EPOCHS = 20
NUM_CLASSES = 2

DATASET_PATH = "dataset"
CSV_PATH = "train_clean.csv"
IMAGE_DIR = None

MODEL_SAVE_PATH = "cnn_pipeline_model.pth"

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")


Using device: cpu


In [3]:
preprocessing_transform = transforms.Compose([
    transforms.Resize(IMG_SIZE),
    transforms.ToTensor()
])


In [4]:
class ImageFolderDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.images = []
        self.labels = []

        for class_name in sorted(os.listdir(root_dir)):
            class_path = os.path.join(root_dir, class_name)
            if os.path.isdir(class_path):
                class_label = int(class_name)
                for img_name in os.listdir(class_path):
                    if img_name.lower().endswith(('.jpg', '.jpeg', '.png')):
                        self.images.append(os.path.join(class_path, img_name))
                        self.labels.append(class_label)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = Image.open(self.images[idx]).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, torch.tensor(self.labels[idx], dtype=torch.long)


In [5]:
class CSVImageDataset(Dataset):
    def __init__(self, csv_path, image_dir, transform=None):
        self.df = pd.read_csv(csv_path, dtype={"ID": str})
        self.image_dir = image_dir
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        img_id = self.df.iloc[idx]["ID"]
        label = int(self.df.iloc[idx]["label"])

        img_path = None
        for ext in (".jpg", ".jpeg", ".png"):
            candidate = os.path.join(self.image_dir, img_id + ext)
            if os.path.exists(candidate):
                img_path = candidate
                break

        if img_path is None:
            raise FileNotFoundError(f"Image not found: {img_id}")

        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)

        return image, torch.tensor(label, dtype=torch.long)


In [6]:
print("ðŸ“‚ Loading dataset...")

if os.path.exists(DATASET_PATH):
    full_dataset = ImageFolderDataset(DATASET_PATH, preprocessing_transform)
    print(f"Loaded {len(full_dataset)} images from folder structure")

elif os.path.exists(CSV_PATH):
    if IMAGE_DIR is None:
        possible_dirs = [
            "Combined_Resized_256",
            "Normalised_Image_256",
            "Standardized_Image_256",
            "Renamed_Ok",
            "Renamed_Not_OK"
        ]
        for d in possible_dirs:
            if os.path.exists(d):
                IMAGE_DIR = d
                break

    if IMAGE_DIR is None:
        raise FileNotFoundError("Could not auto-detect IMAGE_DIR")

    full_dataset = CSVImageDataset(CSV_PATH, IMAGE_DIR, preprocessing_transform)
    print(f"Loaded {len(full_dataset)} images from CSV using {IMAGE_DIR}")

else:
    raise FileNotFoundError("Neither dataset folder nor CSV file found")


ðŸ“‚ Loading dataset...
Loaded 5701 images from CSV using Combined_Resized_256


In [7]:
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size

train_dataset, val_dataset = random_split(
    full_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(42)
)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

print(f"Train: {train_size}, Validation: {val_size}")


Train: 4560, Validation: 1141


In [8]:
class CNNModel(nn.Module):
    def __init__(self, num_classes=2):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(128 * 28 * 28, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        return self.classifier(self.features(x))


In [9]:
model = CNNModel(NUM_CLASSES).to(DEVICE)
print(model)


CNNModel(
  (features): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU()
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (classifier): Sequential(
    (0): Flatten(start_dim=1, end_dim=-1)
    (1): Linear(in_features=100352, out_features=128, bias=True)
    (2): ReLU()
    (3): Linear(in_features=128, out_features=2, bias=True)
  )
)


In [10]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())


In [11]:
best_val_acc = 0.0

for epoch in range(EPOCHS):
    model.train()
    train_correct = 0
    train_total = 0

    for images, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS} [Train]"):
        images, labels = images.to(DEVICE), labels.to(DEVICE)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        train_correct += (outputs.argmax(1) == labels).sum().item()
        train_total += labels.size(0)

    train_acc = 100 * train_correct / train_total

    model.eval()
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for images, labels in tqdm(val_loader, desc=f"Epoch {epoch+1}/{EPOCHS} [Val]"):
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            outputs = model(images)
            val_correct += (outputs.argmax(1) == labels).sum().item()
            val_total += labels.size(0)

    val_acc = 100 * val_correct / val_total
    print(f"Epoch {epoch+1}: Train Acc={train_acc:.2f}%, Val Acc={val_acc:.2f}%")

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model, "best_model_complete.pth")


Epoch 1/20 [Train]:   0%|          | 0/143 [00:00<?, ?it/s]


RuntimeError: mat1 and mat2 shapes cannot be multiplied (32x131072 and 100352x128)

In [None]:
print("Best validation accuracy:", best_val_acc)


In [None]:
def predict_single_image(image_path):
    model = torch.load("best_model_complete.pth", map_location=DEVICE)
    model.eval()

    img = Image.open(image_path).convert("RGB")
    img_tensor = preprocessing_transform(img).unsqueeze(0).to(DEVICE)

    with torch.no_grad():
        probs = torch.softmax(model(img_tensor), dim=1)
        pred = probs.argmax(1).item()

    return pred, probs[0][pred].item()
