#### Installing and loading libraryes

In [None]:
!pip install torch torchvision scikit-learn matplotlib ranger-adabelief

In [None]:
# Standard libraries
import os
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

# PyTorch core
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# torchvision
from torchvision import datasets, transforms, models
from torchvision.models import resnet152, ResNet152_Weights

# Optimization and scheduling
from torch.optim.lr_scheduler import ReduceLROnPlateau, CosineAnnealingLR
from ranger_adabelief import RangerAdaBelief  # Make sure to install

# Scikit-learn
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, ConfusionMatrixDisplay


#### Device Configuration and Data Augmentation


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

#### Transformations (Data Augmentation and Preprocessing)

In [None]:
# Data augmentation for training images
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    # Optional additional augmentations:
    # transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    # transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],  # ImageNet statistics
                         std=[0.229, 0.224, 0.225])
])

# Validation/test preprocessing only (no augmentation)
val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])


#### Load Dataset

In [None]:
dataset_path = "/path/to/your/images"  # Update this path
dataset = datasets.ImageFolder(root=dataset_path, transform=train_transform)

#### Training Configuration

In [None]:
Image.MAX_IMAGE_PIXELS = None

In [None]:
 Cross-Validation configuration (80/20)
k_folds = 5
skf = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=42)

# Define the device (GPU or CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Dataset converted to tensors for StratifiedKFold
all_images = []
all_labels = []

# Define transformations to resize images
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize images to 224x224
    transforms.ToTensor(),          # Convert image to a tensor
])

for img, label in dataset:
    try:
        # Check if img is a file path or already a loaded image object
        if isinstance(img, str):  # If it's a file path
            with Image.open(img) as im:
                # Check image size to avoid DecompressionBombError
                if im.size[0] * im.size[1] > 178956970:  # Allowed pixel limit
                    continue  # Ignore images that are too large
                
                # Apply transformation to resize and convert to tensor
                im = transform(im)

                # If the image is valid, add it to the lists
                all_images.append(im)
                all_labels.append(label)
        else:  # If img is already a loaded image
            im = transform(img)  # Apply transformation
            all_images.append(im)
            all_labels.append(label)
    
    except Exception as e:
        # Capture decompression and other exceptions here
        continue  # Ignore images with other errors

# Convert images to tensors
# No need to use torch.stack, as 'im' is already a tensor due to the transformation

all_labels = torch.tensor(all_labels)

# Original model
weights = ResNet152_Weights.IMAGENET1K_V2  # Optimized weights for ImageNet
model = resnet152(weights=weights)

# Freeze the first layers
for name, param in model.named_parameters():
    if "layer1" in name or "layer2" in name:
        param.requires_grad = False  # Freeze initial layers (2)

# Get features from the final layer
num_features = model.fc.in_features

# Modify the final layer: add a dense layer and adjust output
model.fc = nn.Sequential(
    nn.Dropout(0.5),                      # Regularization with Dropout
    nn.Linear(num_features, 128),         # Additional dense layer with 128 units
    nn.ReLU(),                            # ReLU activation
    nn.Linear(128, len(dataset.classes))  # Output adapted to the number of classes
)

# Move the model to the device (GPU or CPU)
model = model.to(device)

# Set up loss criterion
criterion = nn.CrossEntropyLoss()

# Evaluate optimizer
use_ranger = True  # Change to False if you want to use AdamW instead of Ranger
if use_ranger:
    try:
        from ranger_adabelief import RangerAdaBelief
        optimizer = RangerAdaBelief(model.parameters(), lr=0.001, weight_decay=1e-4)
    except ImportError:
        raise ImportError("Ranger optimizer is not installed. Install it with `pip install ranger-adabelief`.")
else:
    optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)

# Improve the scheduler
scheduler = CosineAnnealingLR(optimizer, T_max=10, eta_min=1e-6)  # Cosine Annealing Scheduler


# Mixup function
def mixup_data(x, y, alpha=1.0):
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1.0
    batch_size = x.size()[0]
    index = torch.randperm(batch_size).to(x.device)
    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

# Freeze initial layers and adjust the rest
def freeze_layers(model):
    for name, param in model.named_parameters():
        if "layer1" in name or "layer2" in name:
            param.requires_grad = False
        else:
            param.requires_grad = True

freeze_layers(model)

#### Full Cross-Validated Training Pipeline with Mixup and ResNet-152

In [None]:
# Custom dataset class
class CustomImageDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        """
        :param image_paths: List of image file paths or PIL images
        :param labels: List of image labels
        :param transform: Transformations to apply to the images
        """
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img = self.image_paths[idx]
        label = self.labels[idx]

        if isinstance(img, str):  # If it's a file path
            img = Image.open(img)
        elif not isinstance(img, Image.Image):  # If it's neither a path nor a PIL image
            raise TypeError(f"Expected PIL Image or file path, got {type(img)}")

        if self.transform:
            img = self.transform(img)

        return img, label

# Define transformations for images
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # ImageNet mean and standard deviation
])

# Cross-Validation configuration
k_folds = 5
skf = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=42)

# Prepare lists of images and labels
all_images = []
all_labels = []

for img, label in dataset:  # This is where you should add your actual dataset
    all_images.append(img)
    all_labels.append(label)

# Create the custom dataset
custom_dataset = CustomImageDataset(all_images, all_labels, transform=transform)

# Create the DataLoader
dataloader = DataLoader(custom_dataset, batch_size=32, shuffle=True)

# Cross-Validation results
results = []

# Iterate over the folds
for fold, (train_idx, val_idx) in enumerate(skf.split(all_images, all_labels)):
    print(f"Fold {fold+1}/{k_folds}")

    # Split data into training and validation
    train_images, train_labels = [all_images[i] for i in train_idx], [all_labels[i] for i in train_idx]
    val_images, val_labels = [all_images[i] for i in val_idx], [all_labels[i] for i in val_idx]

    # Create DataLoaders for the current fold
    train_dataset = CustomImageDataset(train_images, train_labels, transform=transform)
    val_dataset = CustomImageDataset(val_images, val_labels, transform=transform)

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

    # Load ResNet152 with pre-trained ImageNet weights
    weights = ResNet152_Weights.IMAGENET1K_V2
    model = resnet152(weights=weights)

    # Freeze the first layers
    for name, param in model.named_parameters():
        if "layer1" in name or "layer2" in name:
            param.requires_grad = False

    # Adjust model architecture
    num_features = model.fc.in_features
    model.fc = nn.Sequential(
        nn.Dropout(0.5),
        nn.Linear(num_features, 128),
        nn.ReLU(),
        nn.Linear(128, len(dataset.classes))
    )

    model = model.to(device)

    # Loss and optimizer configuration
    criterion = nn.CrossEntropyLoss()
    optimizer = RangerAdaBelief(model.parameters(), lr=0.001, weight_decay=1e-4)

    # Cosine Annealing Scheduler
    scheduler = CosineAnnealingLR(optimizer, T_max=10, eta_min=1e-6)

    # Mixup function
    def mixup_data(x, y, alpha=1.0):
        if alpha > 0:
            lam = np.random.beta(alpha, alpha)
        else:
            lam = 1.0
        batch_size = x.size()[0]
        index = torch.randperm(batch_size).to(x.device)
        mixed_x = lam * x + (1 - lam) * x[index, :]
        y_a, y_b = y, y[index]
        return mixed_x, y_a, y_b, lam

    def mixup_criterion(criterion, pred, y_a, y_b, lam):
        return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

    # Dataset Visualization
    dataloader = DataLoader(custom_dataset, batch_size=1, shuffle=True)
    examples_per_class = {}
    class_labels = dataset.classes

    for images, labels in dataloader:
        for img, label in zip(images, labels):
            class_name = class_labels[label.item()]
            if class_name not in examples_per_class:
                examples_per_class[class_name] = img
            if len(examples_per_class) == len(class_labels):
                break
        if len(examples_per_class) == len(class_labels):
            break

    fig, axes = plt.subplots(1, len(class_labels), figsize=(15, 5))
    for ax, (class_name, img) in zip(axes, examples_per_class.items()):
        ax.imshow(img.permute(1, 2, 0))
        ax.set_title(class_name)
        ax.axis('off')
    plt.tight_layout()
    plt.show()

    # Training and Validation
    epochs = 100
    early_stop_patience = 10
    best_val_loss = float('inf')
    early_stop_counter = 0

    for epoch in range(epochs):
        # Training
        model.train()
        train_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            # Apply Mixup
            mixed_images, labels_a, labels_b, lam = mixup_data(images, labels, alpha=0.4)

            optimizer.zero_grad()
            outputs = model(mixed_images)
            loss = mixup_criterion(criterion, outputs, labels_a, labels_b, lam)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        train_loss /= len(train_loader)

        # Validation
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        val_loss /= len(val_loader)
        val_accuracy = 100 * correct / total

        # Update the scheduler
        scheduler.step(val_loss)

        # Early Stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            early_stop_counter = 0
            torch.save(model, f"/path/to/your/images{fold+1}.pth")
        else:
            early_stop_counter += 1
            if early_stop_counter >= early_stop_patience:
                print(f"Early stopping triggered for fold {fold+1}.")
                break

        print(f"Fold {fold+1}, Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")

    # Save fold results
    results.append({
        'fold': fold+1,
        'val_loss': best_val_loss,
        'val_accuracy': val_accuracy
    })

# Print global results
print("\nCross-validation results:")
for result in results:
    print(f"Fold {result['fold']}: Val Loss = {result['val_loss']:.4f}, Val Accuracy = {result['val_accuracy']:.2f}%")

mean_loss = np.mean([r['val_loss'] for r in results])
mean_accuracy = np.mean([r['val_accuracy'] for r in results])
print(f"Mean Val Loss: {mean_loss:.4f}, Mean Val Accuracy: {mean_accuracy:.2f}%")

#### Best Model Selection

In [None]:
# Find the fold with the best val_loss
best_fold = min(results, key=lambda x: x['val_loss'])
best_val_loss = best_fold['val_loss']
best_fold_index = best_fold['fold']

print(f"The best model is found in fold {best_fold_index} with a val_loss of {best_val_loss:.4f}")


# Find the fold with the best val_accuracy
best_fold_accuracy = max(results, key=lambda x: x['val_accuracy'])
best_val_accuracy = best_fold_accuracy['val_accuracy']
best_fold_accuracy_index = best_fold_accuracy['fold']

print(f"The best model is found in fold {best_fold_accuracy_index} with a val_accuracy of {best_val_accuracy:.2f}%")