In [None]:
!pip install torch torchvision timm scikit-learn pandas numpy albumentations>=1.1.0

# Import necessary libraries
import os
import random
import numpy as np
import pandas as pd
from PIL import Image
from sklearn.metrics import f1_score
from sklearn.model_selection import StratifiedKFold
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import timm
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch.cuda.amp import GradScaler, autocast

In [None]:
# Keep cell output readable by hiding library warnings
import warnings
warnings.filterwarnings('ignore')

# ---- Run configuration ----
SEED = 42         # single seed shared by every random number generator below
FOLDS = 5         # number of cross-validation folds
EPOCHS = 15       # training epochs per fold
BATCH_SIZE = 24   # batch size for the training and validation loaders
IMG_SIZE = 416    # images are resized to IMG_SIZE x IMG_SIZE

# ---- Reproducibility: seed Python, NumPy and PyTorch (CPU and all GPUs) ----
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

# ---- Compute device: use the GPU when one is visible, otherwise the CPU ----
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")

In [None]:
# Training-time augmentation pipeline.
# Geometric and colour perturbations come first; normalisation and tensor
# conversion are always the last two steps.
_train_steps = []
# Random crop covering 80-100% of the image, resized to IMG_SIZE x IMG_SIZE
_train_steps.append(A.RandomResizedCrop(size=(IMG_SIZE, IMG_SIZE), scale=(0.8, 1.0)))
# Mirror left/right half of the time
_train_steps.append(A.HorizontalFlip(p=0.5))
# Small random shift / zoom / rotation (up to 15 degrees), applied half of the time
_train_steps.append(A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.1, rotate_limit=15, p=0.5))
# Mild colour jitter, applied half of the time
_train_steps.append(A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1, p=0.5))
# Up to 8 dropped-out patches, each at most IMG_SIZE//20 pixels per side
_train_steps.append(A.CoarseDropout(max_holes=8, max_height=IMG_SIZE//20, max_width=IMG_SIZE//20, p=0.5))
# Per-channel normalisation with the mean/std given here
_train_steps.append(A.Normalize(mean=(0.485,0.456,0.406), std=(0.229,0.224,0.225)))
# Convert the augmented array into a PyTorch tensor
_train_steps.append(ToTensorV2())

train_aug = A.Compose(_train_steps)

# Define Test Time Augmentation (TTA) transformations
# Both pipelines are deterministic, so they are built once here instead of on
# every call (tta_transforms is invoked once per image, for every fold).
_TTA_PIPELINES = (
    # Base view: resize and normalize
    A.Compose([
        A.Resize(height=IMG_SIZE, width=IMG_SIZE),
        A.Normalize(mean=(0.485,0.456,0.406), std=(0.229,0.224,0.225)),
        ToTensorV2()
    ]),
    # Mirrored view: always flip horizontally, then resize and normalize
    A.Compose([
        A.HorizontalFlip(p=1.0),
        A.Resize(height=IMG_SIZE, width=IMG_SIZE),
        A.Normalize(mean=(0.485,0.456,0.406), std=(0.229,0.224,0.225)),
        ToTensorV2()
    ]),
)


def tta_transforms(image):
    """Return the TTA views of one image.

    Args:
        image: image array passed to each albumentations pipeline as `image=`.

    Returns:
        A list with one transformed image per TTA pipeline, in the order
        [base view, horizontally flipped view].
    """
    return [pipeline(image=image)['image'] for pipeline in _TTA_PIPELINES]

In [None]:
# Define a custom dataset class for loading sheep images and labels
class SheepDataset(Dataset):
    """Image-classification dataset backed by a dataframe of filenames and labels.

    Args:
        df: dataframe with a 'filename' column (path relative to `img_dir`)
            and a 'label' column.
        img_dir: directory that contains the image files.
        transforms: optional callable invoked as `transforms(image=array)`;
            its result's 'image' entry is returned as the sample.
        label_map: optional mapping from label value to integer class index.
            Pass the same mapping to every split so that train and validation
            datasets agree on class indices. When omitted, the mapping is
            built from the sorted unique labels found in `df`.

    Attributes:
        label_map: label value -> integer index.
        inv_map: integer index -> label value.
    """

    def __init__(self, df, img_dir, transforms=None, label_map=None):
        self.df = df.reset_index(drop=True)
        self.img_dir = img_dir
        self.transforms = transforms
        if label_map is None:
            # Fallback: derive the mapping from this dataframe only. Two
            # subsets with different label sets would get different indices.
            label_map = {lbl: idx for idx, lbl in enumerate(sorted(df['label'].unique()))}
        # Copy so later changes to the caller's dict cannot alter this dataset
        self.label_map = dict(label_map)
        self.inv_map = {v: k for k, v in self.label_map.items()}

    def __len__(self):
        """Return the number of samples."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return `(image, label_index)` for the sample at position `idx`."""
        row = self.df.iloc[idx]
        img_path = os.path.join(self.img_dir, row['filename'])
        # The context manager releases the file handle as soon as the pixels
        # have been copied into the array.
        with Image.open(img_path) as pil_img:
            img = np.array(pil_img.convert('RGB'))
        if self.transforms:
            img = self.transforms(image=img)['image']
        label = self.label_map[row['label']]
        return img, label

In [None]:
# Define the Mixup data augmentation function
def mixup_data(x, y, alpha=0.4):
    """Blend each sample of a batch with another, randomly chosen sample.

    Args:
        x: batch tensor whose first dimension indexes the samples.
        y: label tensor indexed the same way as `x`.
        alpha: parameter of the Beta(alpha, alpha) distribution the mixing
            weight is drawn from. Values <= 0 disable mixing (weight 1).

    Returns:
        (mixed_x, y_a, y_b, lam): `mixed_x = lam * x + (1 - lam) * x[perm]`,
        `y_a` are the original labels, `y_b` the labels of the permuted
        partners, and `lam` is the mixing weight as a Python float.
    """
    # float() keeps the weight a plain Python scalar instead of numpy.float64
    lam = float(np.random.beta(alpha, alpha)) if alpha > 0 else 1.0
    batch_size = x.size(0)
    # Put the permutation on the same device as the batch itself rather than
    # on a global device, so the function works wherever `x` lives.
    index = torch.randperm(batch_size).to(x.device)
    mixed_x = lam * x + (1 - lam) * x[index]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

In [None]:
# Function to get the deep learning model
def get_model(num_classes):
    """Create the classifier and place it on DEVICE.

    Args:
        num_classes: size of the classification head.

    Returns:
        The timm model "hf_hub:timm/efficientnet_b5.sw_in12k", loaded with
        pre-trained weights, average global pooling and `num_classes`
        outputs, moved to DEVICE.
    """
    backbone = timm.create_model(
        "hf_hub:timm/efficientnet_b5.sw_in12k",
        num_classes=num_classes,
        pretrained=True,
        global_pool='avg',
    )
    backbone = backbone.to(DEVICE)
    return backbone

# Loss used for training: cross-entropy, averaged over the batch
# (reduction='mean' is the default, written out here for clarity)
criterion = nn.CrossEntropyLoss(reduction='mean')

In [None]:
# Function to train the model for one epoch
def train_one_epoch(model, loader, optimizer, scaler):
    """Run one Mixup training pass over `loader` under mixed precision.

    Args:
        model: network to train; switched to training mode here.
        loader: iterable of `(images, labels)` batches; `loader.dataset` must
            support `len()`.
        optimizer: optimizer stepped once per batch through `scaler`.
        scaler: gradient scaler used to scale the loss and step the optimizer.

    Returns:
        Sum of per-batch losses weighted by batch size, divided by the
        number of samples in `loader.dataset`.
    """
    model.train()
    running_loss = 0
    for batch_x, batch_y in loader:
        batch_x = batch_x.to(DEVICE)
        batch_y = batch_y.to(DEVICE)
        # Mix every sample with a randomly chosen partner from the same batch
        batch_x, targets_a, targets_b, lam = mixup_data(batch_x, batch_y)
        optimizer.zero_grad()
        with autocast():
            logits = model(batch_x)
            # Mixup loss: blend the losses against both label sets with the
            # same weight used to blend the images
            loss_a = criterion(logits, targets_a)
            loss_b = criterion(logits, targets_b)
            loss = lam * loss_a + (1 - lam) * loss_b
        # Backward on the scaled loss, then step through the scaler and let it
        # adjust its scale factor for the next iteration
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        running_loss += loss.item() * batch_x.size(0)
    n_samples = len(loader.dataset)
    return running_loss / n_samples

In [None]:
# Function to validate the model
def validate(model, loader):
    """Score `model` on `loader` with the weighted F1 metric.

    Args:
        model: network to evaluate; switched to evaluation mode here.
        loader: iterable of `(images, labels)` batches.

    Returns:
        Weighted F1 score between the true labels and the arg-max predictions.
    """
    model.eval()
    y_pred, y_true = [], []
    # No gradients are needed for scoring
    with torch.no_grad():
        for batch_x, batch_y in loader:
            logits = model(batch_x.to(DEVICE))
            # Predicted class = index of the largest logit in each row
            y_pred += logits.argmax(dim=1).cpu().tolist()
            y_true += batch_y.tolist()
    return f1_score(y_true, y_pred, average='weighted')

In [None]:
# Perform cross-validation and calculate Out-of-Fold (OOF) predictions
TRAIN_DIR = '/content/drive/MyDrive/data/train/'

# Load the training labels
train_df = pd.read_csv('/content/drive/MyDrive/data/train_labels.csv')
# Initialize an array to store OOF predictions
oof_preds = np.zeros(len(train_df), dtype=int)
# Set up StratifiedKFold for cross-validation
skf = StratifiedKFold(n_splits=FOLDS, shuffle=True, random_state=SEED)

# One label <-> index mapping for the whole run, built from ALL training
# labels. Each SheepDataset otherwise derives its own mapping from the subset
# it receives, so a class missing from one split would shift the indices and
# silently corrupt the validation / OOF scores.
label_map = {lbl: idx for idx, lbl in enumerate(sorted(train_df['label'].unique()))}
inv_label_map = {idx: lbl for lbl, idx in label_map.items()}

# Deterministic preprocessing for validation images (built once, reused by every fold)
val_aug = A.Compose([
    A.Resize(height=IMG_SIZE, width=IMG_SIZE),
    A.Normalize(mean=(0.485,0.456,0.406), std=(0.229,0.224,0.225)),
    ToTensorV2()
])

# Iterate through each fold
for fold, (train_idx, val_idx) in enumerate(skf.split(train_df, train_df['label'])):
    print(f"\n=== Fold {fold+1}/{FOLDS} ===")
    # Split the training data into training and validation sets for the current fold
    df_train, df_val = train_df.iloc[train_idx], train_df.iloc[val_idx]

    # Training split uses augmentation, validation split only resize + normalize
    train_set = SheepDataset(df_train, TRAIN_DIR, transforms=train_aug)
    val_set = SheepDataset(df_val, TRAIN_DIR, transforms=val_aug)
    # Force both splits onto the shared mapping (see note above)
    for fold_set in (train_set, val_set):
        fold_set.label_map = label_map
        fold_set.inv_map = inv_label_map

    # Create data loaders for training and validation
    train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
    val_loader = DataLoader(val_set, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)

    # Get the model, optimizer, learning rate scheduler, and GradScaler
    model = get_model(len(label_map))
    optimizer = optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-2)
    # The scheduler is stepped once per epoch below, so the one-cycle schedule
    # must span EPOCHS steps. Sizing it as epochs * steps_per_epoch while
    # stepping per epoch would leave the learning rate stuck in early warm-up.
    scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=3e-3, total_steps=EPOCHS)
    scaler = GradScaler()

    # Start below any attainable F1 so the first epoch always writes a
    # checkpoint, even when its score is exactly 0.
    best_f1 = float('-inf')
    ckpt_path = f"model_fold{fold}.pth"
    # Train the model for the specified number of epochs
    for epoch in range(1, EPOCHS+1):
        loss = train_one_epoch(model, train_loader, optimizer, scaler)
        scheduler.step()
        val_f1 = validate(model, val_loader)
        print(f"Epoch {epoch}: Loss={loss:.4f}, Val F1={val_f1:.4f}")
        # Save the model if the validation F1 score improves
        if val_f1 > best_f1:
            best_f1 = val_f1
            torch.save(model.state_dict(), ckpt_path)
    print(f"Fold {fold+1} Best F1: {best_f1:.4f}")

    # Perform OOF predictions with Test Time Augmentation (TTA),
    # using the best checkpoint of this fold
    model.load_state_dict(torch.load(ckpt_path, map_location=DEVICE))
    model.eval()
    with torch.no_grad():
        for idx, row in enumerate(df_val.itertuples()):
            # Load the image and apply TTA
            with Image.open(os.path.join(TRAIN_DIR, row.filename)) as pil_img:
                img = np.array(pil_img.convert('RGB'))
            tts = tta_transforms(img)
            # Softmax probabilities of every TTA view
            outs = [model(t.unsqueeze(0).to(DEVICE)).softmax(1).cpu().numpy() for t in tts]
            # Average the views and store the arg-max class at the sample's
            # original position in train_df
            oof_preds[val_idx[idx]] = np.argmax(np.mean(outs, axis=0))

In [None]:
# Overall Out-of-Fold (OOF) score.
# Rebuild the label -> index mapping from the sorted unique training labels,
# encode the true labels with it and compare against the OOF predictions.
class_names = sorted(train_df['label'].unique())
label_map = {name: position for position, name in enumerate(class_names)}
oof_true = [label_map[name] for name in train_df['label']]
oof_f1 = f1_score(oof_true, oof_preds, average='weighted')
print("OOF F1:", oof_f1)

In [None]:
# Perform inference on the test set and ensemble predictions from different folds
TEST_DIR = '/content/drive/MyDrive/data/test/'

# Create a dataframe of test filenames. os.listdir returns entries in arbitrary
# order and also lists sub-directories, so keep regular files only and sort
# them to make the submission order reproducible.
test_files = pd.DataFrame(
    sorted(f for f in os.listdir(TEST_DIR) if os.path.isfile(os.path.join(TEST_DIR, f))),
    columns=['filename'],
)

# Class names in index order, derived from the full training labels instead of
# whichever dataset object the last cross-validation iteration left behind.
class_names = sorted(train_df['label'].unique())

# Accumulator for the class probabilities of every test file
all_preds = np.zeros((len(test_files), len(class_names)))

# Iterate through each trained model from the cross-validation folds
for fold in range(FOLDS):
    # Get a new model and load the trained weights for the current fold;
    # map_location lets the checkpoint load on whichever device is in use now
    model = get_model(len(class_names))
    model.load_state_dict(torch.load(f"model_fold{fold}.pth", map_location=DEVICE))
    model.eval()
    # Disable gradient calculation during inference
    with torch.no_grad():
        for idx, fname in enumerate(test_files['filename']):
            # Load the test image and apply TTA
            with Image.open(os.path.join(TEST_DIR, fname)) as pil_img:
                img = np.array(pil_img.convert('RGB'))
            tts = tta_transforms(img)
            # Softmax probabilities of every TTA view
            outs = [model(t.unsqueeze(0).to(DEVICE)).softmax(1).cpu().numpy() for t in tts]
            # Accumulate the view-averaged probabilities for ensembling
            all_preds[idx] += np.mean(outs, axis=0).flatten()

# Average the predictions across all folds
all_preds /= FOLDS
# Get the predicted labels by taking the argmax of the ensembled predictions
labels = [class_names[np.argmax(p)] for p in all_preds]
# Create the submission dataframe
submission = pd.DataFrame({'filename': test_files['filename'], 'label': labels})
# Save the submission file
submission.to_csv('submission.csv', index=False)
# Report the name of the file that was actually written
print("Ensembled submission saved to submission.csv")

In [None]:
# Visualize predicted images from the test set
import matplotlib.pyplot as plt

# Define a simple dataset class for displaying test images
class SimpleTestDataset(Dataset):
    """Dataset yielding `(image, filename)` pairs for unlabeled images.

    Args:
        df: dataframe with a 'filename' column (path relative to `img_dir`).
        img_dir: directory that contains the image files.
        transforms: optional callable invoked as `transforms(image=array)`;
            its result's 'image' entry is returned as the image.
    """

    def __init__(self, df, img_dir, transforms=None):
        self.transforms = transforms
        self.img_dir = img_dir
        # Positional indexing in __getitem__ relies on a clean 0..n-1 index
        self.df = df.reset_index(drop=True)

    def __len__(self):
        """Return the number of images."""
        return self.df.shape[0]

    def __getitem__(self, idx):
        """Return the RGB image at position `idx` together with its filename."""
        record = self.df.iloc[idx]
        filename = record['filename']
        pil_image = Image.open(os.path.join(self.img_dir, filename)).convert('RGB')
        img = np.array(pil_image)
        if self.transforms:
            img = self.transforms(image=img)['image']
        return img, filename

# Pair every test filename with its ensembled prediction
test_results_df = test_files[['filename']].assign(predicted_label=labels)

# Display-only preprocessing: resize to IMG_SIZE x IMG_SIZE, with no
# normalisation or tensor conversion
display_transforms = A.Compose([A.Resize(height=IMG_SIZE, width=IMG_SIZE)])

# Dataset that serves the resized test images for plotting
test_dataset_display = SimpleTestDataset(
    df=test_results_df,
    img_dir='/content/drive/MyDrive/data/test/',
    transforms=display_transforms,
)

# --- Display Predicted Images ---

# Function to display a grid of test images with predicted labels
def display_predicted_images(dataset, results_df, num_rows=3, num_cols=10):
    """Show a random sample of images with their predicted labels in a grid.

    Args:
        dataset: indexable dataset returning `(image, filename)`; must expose
            `img_dir` when it yields tensors.
        results_df: dataframe with 'filename' and 'predicted_label' columns.
        num_rows: number of grid rows.
        num_cols: number of grid columns.

    At most `num_rows * num_cols` images are drawn (fewer if the dataset is
    smaller); unused grid cells are left blank.
    """
    num_images = num_rows * num_cols
    # squeeze=False keeps `axes` a 2-D array even for a 1x1 grid, so that
    # flatten() always works
    fig, axes = plt.subplots(
        num_rows, num_cols, figsize=(num_cols * 3, num_rows * 3), squeeze=False
    )
    axes = axes.flatten()

    # Select random indices for displaying images
    display_indices = random.sample(range(len(dataset)), min(num_images, len(dataset)))

    for ax, idx in zip(axes, display_indices):
        img_item, filename = dataset[idx]

        # Look the label up in the dataframe passed in, not in a global one
        predicted_label = results_df.loc[
            results_df['filename'] == filename, 'predicted_label'
        ].iloc[0]

        if isinstance(img_item, torch.Tensor):
            # Tensor samples are not drawn directly: reload the original
            # file and apply the display-only resize instead
            original_img = np.array(Image.open(os.path.join(dataset.img_dir, filename)).convert('RGB'))
            img_display = display_transforms(image=original_img)['image']
        else:
            img_display = img_item

        # Display the image and set the title with the predicted label
        ax.imshow(img_display)
        ax.set_title(f"Pred: {predicted_label}")
        ax.axis('off')  # Hide axes

    # Hide any unused subplots; slicing by count also covers the case where
    # no image was drawn at all
    for ax in axes[len(display_indices):]:
        ax.axis('off')

    # Adjust layout and display the plot
    plt.tight_layout()
    plt.show()

# Render a 4 x 10 grid of randomly chosen test images with their predictions
display_predicted_images(
    dataset=test_dataset_display,
    results_df=test_results_df,
    num_rows=4,
    num_cols=10,
)