In [12]:
import os
import pandas as pd
import torch
import torch.nn as nn
from torchvision import models
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.datasets import ImageFolder
from torchvision.transforms import ToTensor, Compose, Resize, Normalize
from sklearn.model_selection import train_test_split, StratifiedKFold, StratifiedGroupKFold
from torch.utils.data import Subset
from sklearn.metrics import f1_score
import time



In [13]:
# Configuration
DATA_DIR = "data"
BATCH_SIZE = 32
EPOCHS = 10
IMG_SIZE = (128, 128)
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
N_SPLITS = 5  # Number of folds for cross-validation

In [14]:
# Transformations
transform = Compose([
    Resize(IMG_SIZE),
    ToTensor(),
    Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

In [15]:
import timm
import torch.nn as nn

def build_pretrained_model(num_classes):
    model = timm.create_model('convnext_base', pretrained=True)
    model.head = nn.Linear(model.head.in_features, num_classes)
    return model


In [16]:
# def build_pretrained_model(num_classes):
#     model = models.efficientnet_b0(pretrained=True)
#     model.classifier[1] = nn.Linear(model.classifier[1].in_features,num_classes)
#     return model


In [17]:
# class SimpleCNN(nn.Module):
#     def __init__(self, num_classes=10):
#         super(SimpleCNN, self).__init__()
        
#         # Convolutional layers
#         self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, padding=1)
#         self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1)
        
#         # Pooling layer
#         self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        
#         # Calculate the size after convolutions and pooling
#         # Input size: IMG_SIZE[0] x IMG_SIZE[1]
#         # After first conv+pool: IMG_SIZE[0]/2 x IMG_SIZE[1]/2
#         # After second conv+pool: IMG_SIZE[0]/4 x IMG_SIZE[1]/4
#         conv_output_size = 32 * (IMG_SIZE[0] // 4) * (IMG_SIZE[1] // 4)
        
#         # Fully connected layers
#         self.fc1 = nn.Linear(conv_output_size, 128)
#         self.fc2 = nn.Linear(128, num_classes)
    
#     def forward(self, x):
#         x = self.pool(F.relu(self.conv1(x)))  # -> [B, 16, IMG_SIZE[0]/2, IMG_SIZE[1]/2]
#         x = self.pool(F.relu(self.conv2(x)))  # -> [B, 32, IMG_SIZE[0]/4, IMG_SIZE[1]/4]
#         x = x.view(x.size(0), -1)             # flatten
#         x = F.relu(self.fc1(x))
#         x = self.fc2(x)
#         return x

In [18]:
# Simple CNN Model
class CNN(nn.Module):
    def __init__(self, num_classes):
        super(CNN, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(3, 32, 
                      kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )
        self.fc_layers = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * (IMG_SIZE[0] // 4) * (IMG_SIZE[1] // 4), 128),
            nn.ReLU(),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = self.fc_layers(x)
        return x

In [19]:
# model = build_pretrained_model(num_classes).to(DEVICE)

In [20]:
# model_name = model.__class__.__name__  # Get model name
# print(model_name)

In [21]:
# Load full dataset
dataset = ImageFolder(root=DATA_DIR, transform=transform)

# Get indices and labels for stratified split
indices = list(range(len(dataset)))
labels = [dataset.targets[i] for i in indices]

# Initialize StratifiedKFold
skf = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=42)

# Lists to store metrics for each fold
fold_metrics = []
epoch_metrics = []  # List to store per-epoch metrics

# Training and evaluation for each fold
for fold, (train_idx, val_idx) in enumerate(skf.split(indices, labels)):
    print(f"\nFold {fold + 1}/{N_SPLITS}")
    
    # Create data loaders for this fold
    train_subsampler = Subset(dataset, train_idx)
    val_subsampler = Subset(dataset, val_idx)
    
    train_loader = DataLoader(train_subsampler, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_subsampler, batch_size=BATCH_SIZE, shuffle=False)
    
    # Initialize model for this fold
    num_classes = len(dataset.classes)
    model = build_pretrained_model(num_classes).to(DEVICE)
    model_name = model.__class__.__name__  # Get model name
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    # Training loop
    for epoch in range(EPOCHS):
        model.train()
        running_loss = 0.0
        epoch_start_time = time.time()
        
        for images, labels in train_loader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
        
        epoch_end_time = time.time()
        epoch_duration = epoch_end_time - epoch_start_time
        epoch_loss = running_loss/len(train_loader)
        print(f"Epoch [{epoch+1}/{EPOCHS}], Loss: {epoch_loss:.4f}, Time: {epoch_duration:.2f}s")
        
        # Store epoch metrics
        epoch_metrics.append({
            'fold': fold + 1,
            'epoch': epoch + 1,
            'running_loss': epoch_loss,
            'epoch_time': epoch_duration,
            'model_name': model_name
        })
        
        # Save epoch metrics after each epoch
        epoch_metrics_df = pd.DataFrame(epoch_metrics)
        try:
            epoch_metrics_df.to_csv('epoch_metrics.csv', index=False)
        except Exception as e:
            print(f"Error saving epoch metrics: {e}")
    
    # Validation loop
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    all_preds = []
    all_labels = []
    
    val_start_time = time.time()
    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    val_end_time = time.time()
    val_duration = val_end_time - val_start_time
    
    # Calculate metrics for this fold
    avg_val_loss = val_loss / len(val_loader)
    accuracy = 100 * correct / total
    f1 = f1_score(all_labels, all_preds, average='weighted')
    
    # Store metrics for this fold
    current_fold_metrics = {
        'fold': fold + 1,
        'running_loss': running_loss/len(train_loader),
        'val_loss': avg_val_loss,
        'val_accuracy': accuracy,
        'f1_score': f1,
        'validation_time': val_duration,
        'model_name': model_name
    }
    fold_metrics.append(current_fold_metrics)
    
    # Save fold metrics to CSV
    current_fold_df = pd.DataFrame([current_fold_metrics])
    try:
        current_fold_df.to_csv('model_metrics.csv', mode='a', header=not os.path.exists('model_metrics.csv'), index=False)
    except Exception as e:
        print(f"Error saving fold {fold + 1} metrics: {e}")
    
    print(f"Fold {fold + 1} Results:")
    print(f"Validation Loss: {avg_val_loss:.4f}")
    print(f"Validation Accuracy: {accuracy:.2f}%")
    print(f"F1 Score: {f1:.4f}")
    print(f"Validation Time: {val_duration:.2f}s")

# Convert metrics to DataFrame and calculate final statistics
metrics_df = pd.DataFrame(fold_metrics)
mean_metrics = metrics_df.mean()
std_metrics = metrics_df.std()

print("\nCross-validation Results:")
print(f"Average Loss: {mean_metrics['val_loss']:.4f} ± {std_metrics['val_loss']:.4f}")
print(f"Average Accuracy: {mean_metrics['val_accuracy']:.2f}% ± {std_metrics['val_accuracy']:.2f}%")
print(f"Average F1 Score: {mean_metrics['f1_score']:.4f} ± {std_metrics['f1_score']:.4f}")
print(f"Average Validation Time: {mean_metrics['validation_time']:.2f}s ± {std_metrics['validation_time']:.2f}s")


Fold 1/5


RuntimeError: mat1 and mat2 shapes cannot be multiplied (131072x4 and 1024x149)

In [8]:
# Artifact
# # Load full dataset
# dataset = ImageFolder(root=DATA_DIR, transform=transform)

# # Get indices and labels for stratified split
# indices = list(range(len(dataset)))
# labels = [dataset.targets[i] for i in indices]
# # Create subset of dataset
# train_dataset = torch.utils.data.Subset(dataset, train_indices)
# train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
# test_dataset = torch.utils.data.Subset(dataset, test_indices)
# test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [9]:
# # Initialize model
# num_classes = len(dataset.classes)
# model = CNN(num_classes=num_classes).to(DEVICE)

# # Loss and optimizer
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=0.001)

In [10]:
# # Training loop
# for epoch in range(1):
#     model.train()
#     running_loss = 0.0
#     for images, labels in train_loader:
#         images, labels = images.to(DEVICE), labels.to(DEVICE)

#         outputs = model(images)
#         loss = criterion(outputs, labels)

#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()

#         running_loss += loss.item()

#     print(f"Epoch [{epoch+1}/{EPOCHS}], Loss: {running_loss/len(train_loader):.4f}")

In [11]:
# # Testing loop
# model.eval()  # Set model to evaluation mode
# test_loss = 0.0
# correct = 0
# total = 0

# with torch.no_grad():  # Disable gradient computation for testing
#     for images, labels in test_loader:
#         images, labels = images.to(DEVICE), labels.to(DEVICE)
        
#         outputs = model(images)
#         loss = criterion(outputs, labels)
        
#         test_loss += loss.item()
#         _, predicted = torch.max(outputs.data, 1)
#         total += labels.size(0)
#         correct += (predicted == labels).sum().item()

# # Calculate and print test metrics
# avg_test_loss = test_loss / len(test_loader)
# accuracy = 100 * correct / total

# # Calculate F1 score

# f1 = f1_score(labels.cpu().numpy(), predicted.cpu().numpy(), average='weighted')

# print(f"Test Loss: {avg_test_loss:.4f}")
# print(f"Test Accuracy: {accuracy:.2f}%")
# print(f"F1 Score: {f1:.4f}")


In [12]:
# # Create a dictionary to store metrics
# metrics = {
#     'running_loss': running_loss/len(train_loader),
#     'test_loss': avg_test_loss,
#     'test_accuracy': accuracy,
#     'f1_score': f1
# }

# # Convert to DataFrame and save to CSV
# import pandas as pd
# metrics_df = pd.DataFrame([metrics])

# # Check if file exists and append or create new
# try:
#     existing_df = pd.read_csv('model_metrics.csv')
#     metrics_df = pd.concat([existing_df, metrics_df], ignore_index=True)
# except FileNotFoundError:
#     pass

# metrics_df.to_csv('model_metrics.csv', index=False)
