# Feather In Focus Kaggle Challenge

Model implementation

## Data Exploration

In [None]:
import os
import tqdm
import torch
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
import matplotlib.pyplot as plt

from PIL import Image
from pathlib import Path
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    f1_score, 
    precision_score, 
    recall_score, 
    confusion_matrix, 
    classification_report,
    accuracy_score
)
from transformers import (
    ViTForImageClassification, 
    ViTImageProcessor, 
    AdamW, 
    get_linear_schedule_with_warmup
)

In [None]:
# Dataset root and the tables / arrays shipped with it.
base_dir = Path('aml-2024-feather-in-focus')
# class_names.npy stores a pickled dict, hence allow_pickle + .item().
# Only load pickled files that come from a trusted source.
class_names = np.load(base_dir / "class_names.npy", allow_pickle=True).item()
train_df = pd.read_csv(base_dir / 'train_images.csv')
test_df = pd.read_csv(base_dir / 'test_images_path.csv')
attributes = np.load(base_dir / 'attributes.npy', allow_pickle=True)

# One attribute name per line.
attributes_names = (base_dir / 'attributes.txt').read_text().splitlines()

# class_names keys contain a '.'; the part after the first '.' is used as the species name.
label_to_name = {
    label: full_name.split('.')[1]
    for full_name, label in class_names.items()
}
train_df['bird_name'] = train_df['label'].map(label_to_name)

# Number of training images per species (by name).
fig, ax = plt.subplots(figsize=(15, 6))
sns.countplot(data=train_df, y='bird_name', ax=ax)
ax.set_title('Distribution of Bird Species in Training Set')
ax.set_xlabel('Count')
ax.set_ylabel('Species Name')
plt.show()

# Same distribution, keyed by numeric label.
hist_fig, hist_ax = plt.subplots(figsize=(15, 6))
sns.histplot(data=train_df, x='label', ax=hist_ax)
hist_ax.set_title('Distribution of Classes')
hist_ax.set_xlabel('Species Label')
hist_ax.set_ylabel('Count')

In [None]:
# Count the distinct species names present in the training set.
print(f'Number of classes: {len(train_df["bird_name"].unique())}')

In [None]:
# Display the first training image as a sanity check on the path layout.
# The CSV paths start with '/', which pathlib would otherwise treat as an
# absolute path when joining, so the leading slash is stripped first.
sample_path = train_df['image_path'].iloc[0].lstrip('/')
sample_image_path = base_dir / 'train_images' / sample_path
# Load with PIL (already imported and used by the datasets below) instead of the
# deprecated tf.keras.preprocessing loader; convert to RGB as that loader did.
sample_image = Image.open(sample_image_path).convert('RGB')
sample_image



In [None]:
# Check image sizes on the first 100 training images.
image_sizes = []
for path in train_df['image_path'][:100]:
    full_path = base_dir / 'train_images' / path.lstrip('/')
    # Only the size is needed; the context manager closes the file right away.
    with Image.open(full_path) as img:
        image_sizes.append(img.size)  # PIL reports size as (width, height)

# Plot widths and heights as two labeled series; passing the raw list of tuples
# draws the same two series but without telling them apart.
widths, heights = zip(*image_sizes)
plt.hist([widths, heights], label=['Width', 'Height'])
plt.xlabel('Image Size')
plt.ylabel('Count')
plt.title('Distribution of Image Sizes')
plt.legend()
plt.show()

## Model training and Dataset functions definition

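Note: the training dataset applies data augmentation. With `augment=True`, the dataset length is multiplied by `augment_multiplier`: the first pass over the images returns them unmodified, and every further pass applies a random rotation and color jitter.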

In [3]:
class BirdDataset(Dataset):
    """Bird image dataset that can be enlarged with gently augmented copies."""

    def __init__(self, image_paths, labels, processor, augment=False, augment_multiplier=3):
        """
        Store the samples and build the augmentation pipeline

        Args:
            image_paths (list): List of paths to bird images
            labels (list): Label of each image, in the same order as image_paths
            processor (ViTImageProcessor): Image processor applied to every image
            augment (bool): Whether the dataset is enlarged with augmented copies
            augment_multiplier (int): Factor applied to the dataset length when
                augment is True
        """
        self.image_paths = image_paths
        self.labels = labels
        self.processor = processor
        self.augment = augment
        self.augment_multiplier = augment_multiplier

        # Conservative augmentation: rotation of at most 10 degrees followed by
        # small brightness / contrast / saturation changes and a tiny hue shift.
        rotation = transforms.RandomRotation(10)
        color_jitter = transforms.ColorJitter(
            brightness=0.2,
            contrast=0.2,
            saturation=0.15,
            hue=0.05,
        )
        self.augmentation = transforms.Compose([rotation, color_jitter])

    def __len__(self):
        copies = self.augment_multiplier if self.augment else 1
        return len(self.image_paths) * copies

    def __getitem__(self, idx):
        if self.augment:
            # Indices beyond the first pass wrap around to the same images;
            # `variant` counts how many full passes precede this index.
            variant, source_idx = divmod(idx, len(self.image_paths))
        else:
            variant, source_idx = 0, idx

        image = Image.open(self.image_paths[source_idx]).convert('RGB')

        # Variant 0 is the untouched image; later variants are augmented.
        if variant > 0:
            image = self.augmentation(image)

        # Process image using ViT image processor
        encoded = self.processor(images=image, return_tensors='pt')
        return {
            'pixel_values': encoded['pixel_values'].squeeze(),
            'labels': torch.tensor(self.labels[source_idx]),
        }

def process_image_paths(df, base_path, subdir='train_images'):
    """
    Build full image paths from the relative paths listed in a dataset CSV

    Args:
        df (iterable of str): Image paths as stored in the CSV, e.g. the
            ``image_path`` column of a DataFrame (pass the column, not the frame)
        base_path (str or Path): Dataset root directory
        subdir (str, optional): Folder under base_path that contains the images

    Returns:
        list: One pathlib.Path per input entry, in input order
    """
    image_root = Path(base_path) / subdir
    # Leading slashes are stripped because pathlib would otherwise treat the
    # entry as absolute and discard image_root when joining.
    return [image_root / str(path).lstrip('/') for path in df]

def prepare_data(image_paths, labels=None, train=True, base_path=None):
    """
    Resolve image paths and, when labels are given, encode them as integers

    Args:
        image_paths (iterable of str): Image paths as listed in the dataset CSV
        labels (iterable, optional): Corresponding labels for images; omit for
            unlabeled (test) data
        train (bool, optional): Resolve paths under 'train_images' when True,
            otherwise under 'test_images'
            (NOTE(review): the test folder name mirrors the train layout - confirm)
        base_path (str or Path, optional): Dataset root; defaults to the
            module-level ``base_dir``

    Returns:
        tuple: full image paths, integer labels, and a dictionary mapping each
            label to its index. Without labels the last two are empty.
    """
    root = base_dir if base_path is None else base_path
    subdir = 'train_images' if train else 'test_images'
    full_paths = process_image_paths(image_paths, root, subdir)

    if labels is None:
        return full_paths, [], {}

    # Sorted unique labels give a deterministic label -> index mapping
    unique_labels = sorted(set(labels))
    class_to_idx = {label: idx for idx, label in enumerate(unique_labels)}

    # Convert string labels to integer indices
    integer_labels = [class_to_idx[label] for label in labels]

    return full_paths, integer_labels, class_to_idx

def fine_tune_vit(num_epochs=5, batch_size=16, learning_rate=2e-5):
    """
    Fine-tune Vision Transformer for bird classification

    Trains on the module-level ``train_df`` (columns ``image_path`` and
    ``bird_name``), holding out 20% of the images as a stratified validation
    split. After every epoch the model and processor are saved to
    ``./bird_classification_model_{epoch}`` (``epoch`` is 0-based), and once
    training finishes the per-epoch metrics are written to ``metrics.csv``.

    Args:
        num_epochs (int): Number of training epochs
        batch_size (int): Batch size for training and validation
        learning_rate (float): Initial learning rate; it is decayed linearly to
            zero over all training steps (no warmup)

    Returns:
        tuple: (model, processor, class_to_idx), where class_to_idx maps each
            species name to the integer index used as its training label
    """
    # Load pre-trained model and processor
    checkpoint = 'google/vit-base-patch16-224-in21k'
    processor = ViTImageProcessor.from_pretrained(checkpoint)
    model = ViTForImageClassification.from_pretrained(
        checkpoint,
        num_labels=len(train_df['bird_name'].unique()),
        ignore_mismatched_sizes=True
    )

    # Prepare data
    image_paths, labels, class_to_idx = prepare_data(train_df['image_path'], train_df['bird_name'])

    # Split into train and validation sets (fixed seed so the split can be
    # reproduced later for evaluation)
    train_paths, val_paths, train_labels, val_labels = train_test_split(
        image_paths, labels, test_size=0.2, random_state=42, stratify=train_df['bird_name']
    )

    # Create datasets: only the training split is augmented
    train_dataset = BirdDataset(train_paths, train_labels, processor, augment=True, augment_multiplier=2)
    val_dataset = BirdDataset(val_paths, val_labels, processor, augment=False)

    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)

    # Prepare optimizer and learning rate scheduler.
    # torch.optim.AdamW replaces the deprecated transformers.AdamW; eps and
    # weight_decay are set explicitly to the values that optimizer defaulted to.
    optimizer = torch.optim.AdamW(
        model.parameters(), lr=learning_rate, eps=1e-6, weight_decay=0.0
    )
    total_steps = len(train_loader) * num_epochs
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=0,
        num_training_steps=total_steps
    )

    # Training loop
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f'Using {device}')
    model.to(device)

    metrics_epoch = []
    for epoch in range(num_epochs):
        model.train()
        total_train_loss = 0

        for batch in tqdm.tqdm(train_loader):
            optimizer.zero_grad()

            inputs = {k: v.to(device) for k, v in batch.items()
                      if k != 'labels'}
            batch_labels = batch['labels'].to(device)

            # Passing labels makes the model compute the loss itself
            outputs = model(**inputs, labels=batch_labels)
            loss = outputs.loss
            total_train_loss += loss.item()

            loss.backward()
            optimizer.step()
            # The schedule is defined per optimizer step, not per epoch
            scheduler.step()

        # Validation
        model.eval()
        total_val_loss = 0
        correct_predictions = 0
        total_predictions = 0

        with torch.no_grad():
            for batch in val_loader:
                inputs = {k: v.to(device) for k, v in batch.items()
                          if k != 'labels'}
                batch_labels = batch['labels'].to(device)

                outputs = model(**inputs, labels=batch_labels)
                total_val_loss += outputs.loss.item()

                predictions = torch.argmax(outputs.logits, dim=1)
                correct_predictions += (predictions == batch_labels).sum().item()
                total_predictions += batch_labels.size(0)

        # Losses are averaged over batches, accuracy over individual samples
        avg_train_loss = total_train_loss / len(train_loader)
        avg_val_loss = total_val_loss / len(val_loader)
        val_accuracy = correct_predictions / total_predictions

        print(f"Epoch {epoch+1}/{num_epochs}")
        print(f"Train Loss: {avg_train_loss:.4f}")
        print(f"Validation Loss: {avg_val_loss:.4f}")
        print(f"Validation Accuracy: {val_accuracy:.4f}")

        metrics_epoch.append({
            'epoch': epoch,
            'train_loss': avg_train_loss,
            'val_loss': avg_val_loss,
            'val_accuracy': val_accuracy,
        })

        # Save a checkpoint for this epoch so any epoch can be reloaded later
        model.save_pretrained(f'./bird_classification_model_{epoch}')
        processor.save_pretrained(f'./bird_classification_model_{epoch}')

    metrics_df = pd.DataFrame(metrics_epoch)
    metrics_df.to_csv('metrics.csv', index=False)

    return model, processor, class_to_idx

## Model fine-tuning

In [None]:
# Run the fine-tuning for 15 epochs; fine_tune_vit saves a checkpoint after every epoch.
model, proc, class_to_idx = fine_tune_vit(num_epochs=15, batch_size=32, learning_rate=1e-4)

Validation set metrics

In [None]:
def evaluate_model(model, val_loader, device=None):
    """
    Run the model over a labeled data loader and score its predictions

    Args:
        model (torch.nn.Module): Trained model
        val_loader (torch.utils.data.DataLoader): Loader yielding dict batches
            that contain a 'labels' entry next to the model inputs
        device (torch.device, optional): Device to run evaluation on; CUDA is
            used when available if not given

    Returns:
        tuple: (metrics, all_preds, all_labels) - a dict with accuracy and
            macro / micro / weighted F1, precision and recall, followed by the
            predicted and the true labels
    """
    if device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    model.eval()
    model.to(device)

    predicted = []
    expected = []

    # No gradients are needed for inference
    with torch.no_grad():
        for batch in val_loader:
            # Everything except the labels is forwarded to the model
            features = {
                name: tensor.to(device)
                for name, tensor in batch.items()
                if name != 'labels'
            }
            targets = batch['labels'].to(device)

            batch_preds = model(**features).logits.argmax(dim=1)

            predicted.extend(batch_preds.cpu().numpy())
            expected.extend(targets.cpu().numpy())

    # Accuracy first, then each score under every averaging mode
    metrics = {'accuracy': accuracy_score(expected, predicted)}
    scorers = (
        ('f1', f1_score),
        ('precision', precision_score),
        ('recall', recall_score),
    )
    for prefix, scorer in scorers:
        for average in ('macro', 'micro', 'weighted'):
            metrics[f'{prefix}_{average}'] = scorer(expected, predicted, average=average)

    return metrics, predicted, expected

def plot_confusion_matrix(all_labels, all_preds, class_names, normalize=True):
    """
    Create and plot confusion matrix

    Args:
        all_labels (list): True labels
        all_preds (list): Predicted labels
        class_names (list): List of class names, used as tick labels on both axes
        normalize (bool): Whether to normalize each row (true class) to sum to 1

    Returns:
        matplotlib.figure.Figure: Confusion matrix plot
    """
    # Compute confusion matrix (rows: true labels, columns: predictions)
    cm = confusion_matrix(all_labels, all_preds)

    # Normalize if requested
    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True)
        # A class that only occurs among the predictions has an all-zero row;
        # leave it at 0 instead of dividing by zero and plotting NaN.
        cm = np.divide(
            cm,
            row_totals,
            out=np.zeros(cm.shape, dtype=float),
            where=row_totals != 0,
        )

    # Create plot
    fig = plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.tight_layout()

    return fig

def generate_classification_report(all_labels, all_preds, class_names):
    """
    Build the per-class classification report

    Args:
        all_labels (list): True labels
        all_preds (list): Predicted labels
        class_names (list): List of class names, passed on as target_names

    Returns:
        dict: The report in dictionary form (output_dict=True)
    """
    report = classification_report(
        all_labels, all_preds, target_names=class_names, output_dict=True
    )
    return report

# Example usage
def main(model, processor, class_to_idx, val_dataset):
    """
    Evaluate a model on a dataset, print its metrics and save a confusion matrix

    Args:
        model: Trained model
        processor: Image processor (not used by this function)
        class_to_idx (dict): Mapping of class names to integer indices
        val_dataset (Dataset): Labeled dataset to evaluate on

    Returns:
        dict: Classification report
    """
    loader = DataLoader(val_dataset, batch_size=32)

    # Class names ordered by their integer index
    ordered_names = sorted(class_to_idx, key=class_to_idx.get)

    metrics, all_preds, all_labels = evaluate_model(model, loader)

    print("Model Evaluation Metrics:")
    for metric_name in metrics:
        print(f"{metric_name}: {metrics[metric_name]:.4f}")

    # The figure is written to the working directory
    figure = plot_confusion_matrix(all_labels, all_preds, ordered_names)
    figure.savefig('confusion_matrix.png')

    return generate_classification_report(all_labels, all_preds, ordered_names)

# Reload the checkpoint that fine_tune_vit saved at epoch index 2 (after the third epoch).
proc = ViTImageProcessor.from_pretrained('bird_classification_model_2')
model = ViTForImageClassification.from_pretrained(
    'bird_classification_model_2',
    num_labels=len(train_df['bird_name'].unique()),
    ignore_mismatched_sizes=True
)

# test_df['bird_name'] = test_df['label'].map(label_to_name)
# Rebuild the full image paths, integer labels and the name -> index mapping.
tot_paths, tot_labels, class_to_idx = prepare_data(train_df['image_path'], train_df['bird_name'])

# Same test_size, random_state and stratify as in fine_tune_vit, so this should
# reproduce the validation split that was held out during training.
train_paths, val_paths, train_labels, val_labels = train_test_split(
    tot_paths, tot_labels, test_size=0.2, random_state=42, stratify=train_df['bird_name']
)
# Evaluate on the validation split only, without augmentation.
val_dataset = BirdDataset(val_paths, val_labels, proc)
report_dict = main(model, proc, class_to_idx, val_dataset)

In [None]:
# Per-species F1 scores, in order of first appearance in train_df.
species_names = train_df['bird_name'].unique()
f1_scores = [report_dict[name]['f1-score'] for name in species_names]
min_f1, max_f1 = min(f1_scores), max(f1_scores)
print(f"Min F1 Score: {min_f1}")
print(f"Max F1 Score: {max_f1}")

In [None]:
# Species whose F1 score equals the minimum / maximum found in the previous cell.
species_f1 = {
    name: report_dict[name]['f1-score']
    for name in train_df['bird_name'].unique()
}
bird_with_min_f1 = [name for name, score in species_f1.items() if score == min_f1]
bird_with_max_f1 = [name for name, score in species_f1.items() if score == max_f1]
print(bird_with_min_f1)
print(bird_with_max_f1)

In [None]:
# Larger font for all following plots.
font = {'size': 16}
plt.rc('font', **font)

# Single-axes figure with a fully transparent background patch.
fig, ax = plt.subplots(figsize=(6, 6))
fig.patch.set_alpha(0)

# One violin at x=0 showing how the per-species F1 scores are distributed.
ax.violinplot(f1_scores, positions=[0])
ax.set_xlim(0, 0.30)
ax.set_xlabel('Proportion of species')
ax.set_ylabel('F1 Score')
ax.set_title('F1 Score Distribution by Bird Species')

## Predictions

In [None]:
class TestBirdDataset(Dataset):
    """Unlabeled image dataset that returns each image together with its path."""

    def __init__(self, image_paths, processor):
        # Paths are stored as plain strings (they may arrive as Path objects)
        self.image_paths = list(map(str, image_paths))
        self.processor = processor

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        path = self.image_paths[idx]
        rgb_image = Image.open(path).convert('RGB')
        encoded = self.processor(images=rgb_image, return_tensors='pt')

        # The path travels with the tensor so predictions can be matched to files
        return {
            'pixel_values': encoded['pixel_values'].squeeze(),
            'image_path': path,
        }

def predict_images(model, processor, image_paths, class_to_idx, batch_size=16):
    """
    Predict a class label for every image in a list of files

    Args:
        model (ViTForImageClassification): Trained model
        processor (ViTImageProcessor): Image processor
        image_paths (list): List of image file paths
        class_to_idx (dict): Mapping of class labels to integer indices
        batch_size (int, optional): Batch size for prediction

    Returns:
        dict: Mapping of image file names (base name only) to predicted labels
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.eval()

    # Keep file order: no shuffling
    loader = DataLoader(
        TestBirdDataset(image_paths, processor),
        batch_size=batch_size,
        shuffle=False,
    )

    # Invert the mapping so predicted indices can be turned back into labels
    idx_to_class = {idx: name for name, idx in class_to_idx.items()}
    predictions = {}

    with torch.no_grad():
        for batch in loader:
            # 'image_path' holds strings and is not a model input
            inputs = {
                key: value.to(device)
                for key, value in batch.items()
                if key != 'image_path'
            }
            class_ids = model(**inputs).logits.argmax(dim=1).cpu().numpy()

            for path, class_id in zip(batch['image_path'], class_ids):
                predictions[os.path.basename(path)] = idx_to_class[class_id]

    return predictions

def test_prediction(model_dir='bird_classification_model_14'):
    """
    Predict a species name for every image listed in the module-level ``test_df``

    Args:
        model_dir (str, optional): Directory of the saved model and processor;
            defaults to the checkpoint written after the last of 15 epochs

    Returns:
        tuple: (results_df, class_to_idx) - a DataFrame with columns 'filename'
            and 'predicted_label' (species name), and the name -> index mapping
            used to decode the model outputs
    """
    # Same sorted-name -> index mapping that prepare_data builds for training
    unique_labels = sorted(train_df['bird_name'].unique())
    class_to_idx = {label: idx for idx, label in enumerate(unique_labels)}

    # Load model and processor
    processor = ViTImageProcessor.from_pretrained(model_dir)
    model = ViTForImageClassification.from_pretrained(
        model_dir,
        num_labels=len(unique_labels),
        ignore_mismatched_sizes=True
    )

    # Resolve the test image paths directly: prepare_data requires labels and
    # resolves against the training images, so it cannot be used for test data.
    # NOTE(review): assumes test images live under base_dir / 'test_images',
    # mirroring the 'train_images' layout - confirm against the dataset.
    image_paths = [
        base_dir / 'test_images' / path.lstrip('/')
        for path in test_df['image_path']
    ]

    # Get predictions
    predictions = predict_images(model, processor, image_paths, class_to_idx)

    # Create DataFrame with predictions (one row per file name)
    results_df = pd.DataFrame.from_dict(predictions, orient='index', columns=['predicted_label'])
    results_df.index.name = 'filename'
    results_df.reset_index(inplace=True)

    return results_df, class_to_idx

Get predictions for the test set:

In [None]:
# Predict a species name for every test image; df has one row per image file.
df, class_to_idx = test_prediction()

In [None]:
# Species names in order of first appearance in train_df. Despite the name this is
# an array of names, not a dict: a species' "index" is its position in the array.
true_class_to_idx = train_df['bird_name'].unique()

In [None]:
# Spot-check: the species name stored at position 66.
true_class_to_idx[66]

In [None]:
# Preview the first rows of the predictions table.
df.head()

In [None]:
# Spot-check the prediction stored for the file named '5.jpg'.
df[df['filename'] == '5.jpg']['predicted_label']

In [None]:
# Convert predicted species names back to the dataset's numeric labels.
# The mapping is the inverse of label_to_name (built from class_names.npy), so it
# does not depend on the order in which species happen to appear in train_df.
# The column is assigned in one step; chained indexing such as
# df['col'][mask] = value may write to a temporary copy instead of df.
name_to_label = {name: label for label, name in label_to_name.items()}
# Values that are not species names (e.g. already converted on a re-run) pass through.
df['predicted_label'] = df['predicted_label'].map(
    lambda value: name_to_label.get(value, value)
)

In [None]:
# Preview the predictions again after the label conversion above.
df.head()

In [None]:
# write all_pred to test_images_sample.csv
pred_df = df.copy()
pred_df['predicted_label'] = pred_df['predicted_label']
pred_df = pred_df.rename(columns={'filename': 'id', 'predicted_label': 'label'})
pred_df['id'] = np.linspace(1, 4000, 4000, dtype=int)

In [None]:
# Preview the first rows of the submission table.
pred_df.head()

In [None]:
# Number of rows in the submission table.
len(pred_df.index)

In [None]:
# Write the submission file without the DataFrame index column.
pred_df.to_csv('test_images_sample5.csv', index=False)