In [None]:
import os
import torch
from torch.utils.data import Dataset
from torchvision import transforms
from PIL import Image
import numpy as np

class CustomDataset(Dataset):
    """Image classification dataset laid out as one sub-directory per class.

    Every sub-directory of ``data_folder`` is treated as a class; class indices
    follow the sorted order of the directory names. Only file paths and labels
    are collected at construction time. Images are decoded on demand in
    ``__getitem__``, so building the dataset is cheap and does not hold the
    whole image set in memory.

    Args:
        data_folder: Root directory containing one sub-directory per class.
        transform: Optional callable applied to the opened PIL image.

    Attributes:
        class_names: Sorted list of class directory names.
        class_to_idx: Mapping from class name to integer label.
        image_paths: File path of every sample, grouped by class and sorted
            by file name inside each class.
        labels: Integer label of every sample, aligned with ``image_paths``.
        data: List of images as numpy arrays, aligned with ``image_paths``.
            Built lazily on first access (see the ``data`` property).
    """

    def __init__(self, data_folder, transform=None):
        self.data_folder = data_folder
        self.transform = transform

        # Only directories are classes. A stray file in the root (for example
        # .DS_Store) would otherwise be listed as a class and make
        # os.listdir(class_folder) fail below.
        self.class_names = sorted(
            entry for entry in os.listdir(data_folder)
            if os.path.isdir(os.path.join(data_folder, entry))
        )
        self.class_to_idx = {class_name: idx for idx, class_name in enumerate(self.class_names)}
        self.image_paths = []
        self.labels = []
        # Backing store for the lazy ``data`` property; None means "not loaded yet".
        self._data = None

        for class_name in self.class_names:
            class_folder = os.path.join(data_folder, class_name)
            class_label = self.class_to_idx[class_name]
            # os.listdir order is arbitrary; sorting keeps sample indices
            # stable across runs and machines.
            for filename in sorted(os.listdir(class_folder)):
                img_path = os.path.join(class_folder, filename)
                if not os.path.isfile(img_path):
                    continue  # nested directories are not samples
                self.image_paths.append(img_path)
                self.labels.append(class_label)

    @property
    def data(self):
        """All images decoded to numpy arrays, loaded on first access and cached."""
        if self._data is None:
            loaded = []
            for img_path in self.image_paths:
                # The context manager releases the file handle once decoded.
                with Image.open(img_path) as image:
                    loaded.append(np.array(image))
            self._data = loaded
        return self._data

    @data.setter
    def data(self, value):
        # Keeps ``dataset.data = ...`` working as it did for a plain attribute.
        self._data = value

    def __len__(self):
        """Return the number of samples."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        ``image`` is the output of ``transform`` when one was given, otherwise
        a fully decoded PIL image. ``label`` is the integer class index.
        """
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        with Image.open(img_path) as img:
            # Decode while the file is still open so nothing reads from a
            # closed handle after the ``with`` block ends.
            img.load()
            image = self.transform(img) if self.transform else img.copy()

        return image, label

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class CustomModel(nn.Module):
    """Small three-block CNN classifier.

    Each feature block is Conv2d(3x3, padding=1) -> BatchNorm2d -> ReLU ->
    MaxPool2d(2) -> Dropout2d, with 32, 64 and 128 output channels. The
    classifier head adaptively pools to 7x7, flattens, and applies
    Linear(128*7*7 -> 512) -> BatchNorm1d -> ReLU -> Dropout -> Linear.

    Args:
        dropout_rate: Dropout probability used in the classifier head.
        num_classes: Number of output logits (default 2).
        in_channels: Number of channels of the input images (default 1).

    The layer order inside ``features`` and ``classifier`` is fixed, so
    ``state_dict`` keys (``features.0.weight``, ``classifier.6.weight``, ...)
    do not depend on the argument values.
    """

    def __init__(self, dropout_rate=0.5, num_classes=2, in_channels=1):
        super().__init__()

        self.features = nn.Sequential(
            *self._conv_block(in_channels, 32, 0.2),  # first block
            *self._conv_block(32, 64, 0.3),           # second block
            *self._conv_block(64, 128, 0.4),          # third block
        )

        self.classifier = nn.Sequential(
            # Adaptive pooling fixes the spatial size at 7x7, so the first
            # Linear layer's input width does not depend on the image size.
            nn.AdaptiveAvgPool2d((7, 7)),
            nn.Flatten(),
            nn.Linear(128 * 7 * 7, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout_rate),
            nn.Linear(512, num_classes)
        )

        # Initialize weights
        self._initialize_weights()

    @staticmethod
    def _conv_block(in_ch, out_ch, drop):
        """Return the layers of one conv block as a list, in application order."""
        return [
            nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.Dropout2d(drop),
        ]

    def _initialize_weights(self):
        """Apply the explicit initialization scheme to every sub-module.

        Conv2d: Kaiming-normal weights (fan_out, ReLU), zero bias.
        BatchNorm: weight 1, bias 0.
        Linear: weights ~ N(0, 0.01), zero bias.
        """
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, (nn.BatchNorm1d, nn.BatchNorm2d)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Map a batch of images ``(N, in_channels, H, W)`` to logits ``(N, num_classes)``."""
        x = self.features(x)
        x = self.classifier(x)
        return x

In [None]:
# IMPORTANT!
# These pipelines define how every image is preprocessed; adapt them to your data.
# Both begin with ToTensor -> ToPILImage, so the steps after them always receive
# a PIL image, and both end with ToTensor + single-channel Normalize.

# Training pipeline: resize, grayscale, random augmentation, then normalize.
transform_train = transforms.Compose([
    transforms.ToTensor(),
    transforms.ToPILImage(),
    transforms.Resize(size=(224, 224)),
    transforms.Grayscale(1),
    # Augmentation (random, training only)
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(degrees=15),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    transforms.Normalize([0.485], [0.229]),
])

# Validation pipeline: same resize / grayscale / normalize, no augmentation.
transform_valid = transforms.Compose([
    transforms.ToTensor(),
    transforms.ToPILImage(),
    transforms.Resize(size=(224, 224)),
    transforms.Grayscale(1),
    transforms.ToTensor(),
    transforms.Normalize([0.485], [0.229]),
])

In [7]:
import os 
import splitfolders

def split_dataset(input_folder, output_folder, ratio=(0.7, 0.2, 0.1), seed=42):
    """
    Copy a category-per-folder dataset into train / validation / test splits.

    The output directory is created when missing, then the split itself is
    delegated to ``splitfolders.ratio``.

    Parameters:
    input_folder (str): Folder holding one sub-folder per category
    output_folder (str): Destination folder for the split datasets
    ratio (tuple): Fractions for train, validation and test (default: 0.7, 0.2, 0.1)
    seed (int): Random seed passed to the splitter for reproducibility
    """
    os.makedirs(output_folder, exist_ok=True)

    split_options = {
        "output": output_folder,
        "seed": seed,
        "ratio": ratio,
        "group_prefix": None,  # no grouping prefix is used
    }
    splitfolders.ratio(input_folder, **split_options)

In [8]:
# Run this cell once to create the split datasets on disk.
# Source images are read from "../images"; the splits are written
# under "./datasets/split_images".
split_dataset(
    input_folder="../images",
    output_folder="./datasets/split_images",
)

Copying files: 5856 files [00:02, 2283.29 files/s]


In [None]:
# Training configuration: every tunable value used by the cells below.
config = dict(
    data_root='./datasets/split_images',
    batch_size=16,
    num_epochs=100,
    learning_rate=0.003,
    # Use the GPU when one is available, otherwise fall back to the CPU.
    device=torch.device('cuda' if torch.cuda.is_available() else 'cpu'),
    save_path='./model_checkpoints',
)

# Checkpoints are written here; create the directory up front.
os.makedirs(config['save_path'], exist_ok=True)

# Datasets: the training split gets the augmenting transform,
# the validation split gets the deterministic one.
train_dataset = CustomDataset(os.path.join(config['data_root'], 'train'), transform_train)
val_dataset = CustomDataset(os.path.join(config['data_root'], 'val'), transform_valid)

In [None]:
from torch.utils.data import DataLoader
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Data loaders: shuffle only the training data so validation order is stable.
train_loader = DataLoader(
    train_dataset,
    batch_size=config['batch_size'],
    shuffle=True,
    num_workers=2
)

val_loader = DataLoader(
    val_dataset,
    batch_size=config['batch_size'],
    shuffle=False,
    num_workers=2
)

# Initialize model, loss function, and optimizer
model = CustomModel().to(config['device'])
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=config['learning_rate'], weight_decay=0.01)
# mode='max' because the scheduler is stepped with validation accuracy.
# The `verbose` argument is deprecated in recent PyTorch releases, so it is
# not passed; read the current rate from optimizer.param_groups[0]['lr'].
scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=5)

In [None]:
from tqdm import tqdm

# Runs a single training epoch; the learning-rate scheduler is stepped by the outer training loop, not here.
def train_epoch(model, loader, criterion, optimizer, device, l2_lambda=0.001, max_grad_norm=1.0):
    """Train ``model`` for one pass over ``loader``.

    The optimized objective is ``criterion(outputs, labels)`` plus
    ``l2_lambda`` times the sum of the (unsquared) 2-norms of all parameter
    tensors, followed by gradient-norm clipping. The optimizer created in
    this notebook (AdamW) already applies weight decay; pass ``l2_lambda=0``
    to avoid regularizing twice.

    Args:
        model: Network to train; put into training mode here.
        loader: Iterable of ``(inputs, labels)`` batches.
        criterion: Loss function. Assumed to return the mean loss of a batch
            (the default reduction of ``nn.CrossEntropyLoss``).
        optimizer: Optimizer updating ``model``'s parameters.
        device: Device the batches are moved to.
        l2_lambda: Weight of the parameter-norm penalty; 0 disables it.
        max_grad_norm: Gradient clipping threshold; ``None`` disables clipping.

    Returns:
        ``(loss, accuracy)``: the per-sample average of the criterion loss and
        the accuracy in percent. The penalty term is excluded from the
        reported loss so it is directly comparable with the validation loss.
        Returns ``(0.0, 0.0)`` when ``loader`` yields no samples.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, labels in tqdm(loader, desc='Training'):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        data_loss = criterion(outputs, labels)

        # Parameter-norm penalty (sum of per-tensor 2-norms, not squared).
        loss = data_loss
        if l2_lambda:
            penalty = sum(torch.norm(param) for param in model.parameters())
            loss = data_loss + l2_lambda * penalty

        loss.backward()

        # Gradient clipping
        if max_grad_norm is not None:
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=max_grad_norm)

        optimizer.step()

        # Weight each batch by its size so a short final batch does not
        # distort the epoch average.
        batch_size = labels.size(0)
        running_loss += data_loss.item() * batch_size
        _, predicted = outputs.max(1)
        total += batch_size
        correct += predicted.eq(labels).sum().item()

    if total == 0:
        # Empty loader: nothing was trained, avoid dividing by zero.
        return 0.0, 0.0
    return running_loss / total, 100. * correct / total

In [None]:
def validate(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without updating it.

    Args:
        model: Network to evaluate; put into evaluation mode here.
        loader: Iterable of ``(inputs, labels)`` batches.
        criterion: Loss function. Assumed to return the mean loss of a batch
            (the default reduction of ``nn.CrossEntropyLoss``).
        device: Device the batches are moved to.

    Returns:
        ``(loss, accuracy)``: the per-sample average loss and the accuracy in
        percent. Returns ``(0.0, 0.0)`` when ``loader`` yields no samples.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in tqdm(loader, desc='Validation'):
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Weight each batch by its size so a short final batch does not
            # distort the average.
            batch_size = labels.size(0)
            running_loss += loss.item() * batch_size
            _, predicted = outputs.max(1)
            total += batch_size
            correct += predicted.eq(labels).sum().item()

    if total == 0:
        # Empty loader: avoid dividing by zero.
        return 0.0, 0.0
    return running_loss / total, 100. * correct / total

In [None]:
# Training loop: train, validate, step the LR scheduler on validation
# accuracy, and checkpoint whenever validation accuracy improves.
best_val_acc = 0
# Per-epoch history, kept for later inspection/plotting.
train_losses = []
train_accs = []
val_losses = []
val_accs = []

print(f"Training on device: {config['device']}")

for epoch in range(config['num_epochs']):
    print(f"\nEpoch {epoch+1}/{config['num_epochs']}")

    # Train
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, config['device'])

    train_losses.append(train_loss)
    train_accs.append(train_acc)

    # Validate
    val_loss, val_acc = validate(model, val_loader, criterion, config['device'])
    val_losses.append(val_loss)
    val_accs.append(val_acc)

    print(f'Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}%')
    print(f'Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}%')

    # Step the plateau scheduler and report any learning-rate change here,
    # so the log does not depend on the scheduler's own (deprecated) verbosity.
    lr_before = optimizer.param_groups[0]['lr']
    scheduler.step(val_acc)
    lr_after = optimizer.param_groups[0]['lr']
    if lr_after != lr_before:
        print(f'Learning rate changed: {lr_before:.2e} -> {lr_after:.2e}')

    # Save best model
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save({
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            # Needed to resume with the same plateau counters / reduced LR.
            'scheduler_state_dict': scheduler.state_dict(),
            'val_acc': val_acc,
        }, os.path.join(config['save_path'], 'best_model.pth'))
        print(f'New best model saved with validation accuracy: {val_acc:.2f}%')