In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import numpy as np
import pandas as pd
import os
import torch.nn.functional as F

from sklearn.utils.class_weight import compute_class_weight
from torch.cuda.amp import autocast, GradScaler

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Data Augmentation and Normalization for Training and Testing
train_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(45),
    transforms.ColorJitter(brightness=0.3, contrast=0.3),
    transforms.ToTensor(),
    transforms.RandomErasing(p=0.3),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

test_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Custom Dataset for Unlabeled Test Data
class TestDataset(Dataset):
    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        self.image_files = [f for f in sorted(os.listdir(image_dir)) if f.endswith('.jpg') or f.endswith('.png')]

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        img_path = os.path.join(self.image_dir, self.image_files[idx])
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, self.image_files[idx]

# Load Data
train_data = datasets.ImageFolder('C:\MLCPS\WP\Project 1 Data\Project 1 Data\Train_Data', transform=train_transforms)

# Compute class weights to handle imbalance
targets = [label for _, label in train_data.samples]
class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(targets), y=targets)
class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)

# Create a validation dataset (15% of the data)
val_size = int(0.15 * len(train_data))
train_size = len(train_data) - val_size
train_dataset, val_dataset = torch.utils.data.random_split(train_data, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

test_data = TestDataset('C:\MLCPS\WP\Project 1 Data\Project 1 Data\Test_Data', transform=test_transforms)
test_loader = DataLoader(test_data, batch_size=16, shuffle=False)

# Define the Models (VGG16, ResNet50, EfficientNet)
vgg16_model = models.vgg16(pretrained=True)
for param in vgg16_model.features[-10:]:
    param.requires_grad = True
vgg16_model.classifier[6] = nn.Linear(vgg16_model.classifier[6].in_features, 5)  # Adjust for 5 classes
vgg16_model = vgg16_model.to(device)

resnet50_model = models.resnet50(pretrained=True)
for param in resnet50_model.layer4.parameters():
    param.requires_grad = True
resnet50_model.fc = nn.Linear(resnet50_model.fc.in_features, 5)  # Adjust for 5 classes
resnet50_model = resnet50_model.to(device)

efficientnet_model = models.efficientnet_b0(pretrained=True)
for param in efficientnet_model.features[-2:]:
    param.requires_grad = True
efficientnet_model.classifier[1] = nn.Linear(efficientnet_model.classifier[1].in_features, 5)  # Adjust for 5 classes
efficientnet_model = efficientnet_model.to(device)

# Function to ensemble predictions by averaging softmax scores with weights
def ensemble_predict_weighted(models, loader, weights):
    model_predictions = []
    for i, model in enumerate(models):
        model.eval()  # Set model to evaluation mode
        predictions = []
        with torch.no_grad():
            for inputs, _ in loader:
                inputs = inputs.to(device)
                outputs = model(inputs)
                probs = F.softmax(outputs, dim=1)  # Get softmax probabilities
                predictions.append(probs.cpu().numpy() * weights[i])  # Apply weight to predictions
        model_predictions.append(np.vstack(predictions))
    
    avg_predictions = np.sum(model_predictions, axis=0)
    return np.argmax(avg_predictions, axis=1)  # Return the class with the highest weighted average probability

# Gradient Accumulation and Mixed Precision Training
scaler = GradScaler()  # For mixed precision training
accumulation_steps = 4  # Accumulate gradients over 4 steps

# Early stopping parameters
early_stopping_patience = 5
best_val_loss = float('inf')
epochs_no_improve = 0

# Train each model
def train_model(model, train_loader, val_loader, num_epochs=50):
    criterion = nn.CrossEntropyLoss(weight=class_weights)  # Use class weights in loss function
    optimizer = optim.Adam(model.parameters(), lr=1e-4)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.1)

    global epochs_no_improve, best_val_loss
    for epoch in range(num_epochs):
        model.train()
        optimizer.zero_grad()
        running_loss = 0.0
        correct_train = 0
        total_train = 0
        for i, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(device), labels.to(device)
            
            with autocast():  # Mixed precision
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss = loss / accumulation_steps  # Gradient accumulation
                scaler.scale(loss).backward()

            if (i + 1) % accumulation_steps == 0:  # Update weights after gradient accumulation
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()

            # Track training accuracy
            running_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            correct_train += torch.sum(preds == labels).item()
            total_train += labels.size(0)
        
        # Calculate training loss and accuracy
        epoch_loss = running_loss / len(train_loader.dataset)
        train_acc = 100 * correct_train / total_train

        # Validation phase
        model.eval()
        correct_val = 0
        total_val = 0
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)
                correct_val += torch.sum(preds == labels).item()
                total_val += labels.size(0)

        # Calculate validation accuracy and loss
        val_acc = 100 * correct_val / total_val
        val_loss = val_loss / len(val_loader.dataset)

        scheduler.step(val_loss)  # Step learning rate scheduler
        print(f'Epoch {epoch+1}/{num_epochs}, '
              f'Training Loss: {epoch_loss:.4f}, Training Accuracy: {train_acc:.2f}%, '
              f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.2f}%')

        # Early stopping check
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
            if epochs_no_improve == early_stopping_patience:
                print("Early stopping triggered")
                break

    print('Training Complete')

# Train the models individually
print("Training VGG16...")
train_model(vgg16_model, train_loader, val_loader, num_epochs=50)

print("Training ResNet50...")
train_model(resnet50_model, train_loader, val_loader, num_epochs=50)

print("Training EfficientNet...")
train_model(efficientnet_model, train_loader, val_loader, num_epochs=50)

# Ensemble the models with weighted averaging
models = [vgg16_model, resnet50_model, efficientnet_model]
weights = [0.4, 0.3, 0.3]  # Adjust weights based on individual model performance




Using device: cuda


  scaler = GradScaler()  # For mixed precision training
  with autocast():  # Mixed precision


Training VGG16...
Epoch 1/50, Training Loss: 0.4014, Training Accuracy: 29.87%, Validation Loss: 1.5616, Validation Accuracy: 44.83%
Epoch 2/50, Training Loss: 0.3747, Training Accuracy: 34.74%, Validation Loss: 1.4865, Validation Accuracy: 42.97%
Epoch 3/50, Training Loss: 0.3631, Training Accuracy: 41.37%, Validation Loss: 1.4310, Validation Accuracy: 41.11%
Epoch 4/50, Training Loss: 0.3411, Training Accuracy: 43.29%, Validation Loss: 1.3738, Validation Accuracy: 40.05%
Epoch 5/50, Training Loss: 0.3169, Training Accuracy: 51.38%, Validation Loss: 1.2930, Validation Accuracy: 51.46%
Epoch 6/50, Training Loss: 0.3126, Training Accuracy: 50.49%, Validation Loss: 1.3534, Validation Accuracy: 50.40%
Epoch 7/50, Training Loss: 0.2970, Training Accuracy: 54.04%, Validation Loss: 1.2649, Validation Accuracy: 53.58%
Epoch 8/50, Training Loss: 0.2807, Training Accuracy: 55.91%, Validation Loss: 1.2851, Validation Accuracy: 52.52%
Epoch 9/50, Training Loss: 0.2736, Training Accuracy: 55.63%, 

In [4]:
# Generate predictions on the test set
test_preds = ensemble_predict_weighted(models, test_loader, weights)

# Prepare submission
test_image_ids = [os.path.splitext(fname)[0] for _, fname in test_data]
sample_submission = pd.DataFrame({'ID': test_image_ids, 'Predictions': test_preds + 1})  # Adjust class numbering if needed
sample_submission = sample_submission.sort_values(by='ID')
sample_submission.to_csv('F_1.csv', index=False)
print("Submission file saved as 'Trial_7")

Submission file saved as 'Trial_7
