# In [None]:  (Jupyter cell marker left over from the notebook export)
import os
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
import pandas as pd
from tqdm import tqdm
from sklearn.metrics import accuracy_score
import numpy as np
import random
from sklearn.model_selection import train_test_split  # Added import for train_test_split

# Preprocessing pipeline handed to the datasets below: resize every image to
# 224x224, apply light random augmentation (horizontal flip, colour jitter,
# rotation of up to 10 degrees), then convert the PIL image to a tensor.
_preprocessing_steps = [
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
]
data_transform = transforms.Compose(_preprocessing_steps)

# Seed every random number generator the script touches (Python's `random`,
# NumPy, PyTorch CPU and, when present, PyTorch CUDA) with one fixed value.
_SEED = 42

for _seed_rng in (torch.manual_seed, np.random.seed, random.seed):
    _seed_rng(_SEED)

if torch.cuda.is_available():
    torch.cuda.manual_seed(_SEED)


class LabelSmoothingCrossEntropy(torch.nn.Module):
    """Cross-entropy loss with uniform label smoothing.

    The per-sample loss is a blend of the usual negative log-likelihood of the
    target class (weight ``1 - smoothing``) and the mean negative
    log-probability over all classes (weight ``smoothing``).

    Args:
        smoothing: Weight given to the uniform term.
        reduction: ``'mean'`` or ``'sum'`` to reduce over samples; any other
            value returns the unreduced per-sample losses.
    """

    def __init__(self, smoothing=0.1, reduction='mean'):
        super().__init__()
        self.smoothing = smoothing
        self.reduction = reduction

    def forward(self, pred, target):
        """Return the smoothed loss for logits ``pred`` and class ids ``target``.

        The softmax is taken over the last dimension of ``pred``; ``target``
        indexes into that dimension.
        """
        log_probs = pred.log_softmax(dim=-1)

        # Negative log-probability assigned to the true class of each sample.
        target_term = -log_probs.gather(dim=-1, index=target.unsqueeze(-1)).squeeze(-1)
        # Negative log-probability averaged uniformly over all classes.
        uniform_term = -log_probs.mean(dim=-1)

        keep_weight = 1.0 - self.smoothing
        per_sample = keep_weight * target_term + self.smoothing * uniform_term

        if self.reduction == 'sum':
            return per_sample.sum()
        if self.reduction == 'mean':
            return per_sample.mean()
        return per_sample


# Custom dataset class
class BirdDataset(Dataset):
    """Image dataset read from a folder on disk.

    In training mode (``is_test=False``) ``folder_path`` is expected to hold
    one sub-directory per class; every file inside a class directory becomes a
    sample labelled with that class. In test mode (``is_test=True``) the files
    directly inside ``folder_path`` are the samples and there are no labels.

    Class indices are assigned in sorted directory-name order and are dense
    (``0 .. num_classes - 1``), so the mapping is reproducible across runs and
    machines. NOTE: a checkpoint trained with a different (unsorted) class
    order will not line up with this mapping.

    Args:
        folder_path: Root directory to scan.
        transform: Optional callable applied to each loaded RGB PIL image.
        is_test: Select test mode (no labels) instead of training mode.

    Attributes set after construction:
        image_paths: Full path of every sample.
        labels: Class index per sample (empty in test mode).
        class_to_idx / idx_to_class: Class-name <-> index mappings
            (empty in test mode).
    """

    def __init__(self, folder_path, transform=None, is_test=False):
        self.folder_path = folder_path
        self.transform = transform
        self.image_paths = []
        self.labels = []
        self.class_to_idx = {}
        self.idx_to_class = {}
        self.is_test = is_test

        self._load_data()

    @staticmethod
    def _list_files(directory):
        """Return the regular files in ``directory`` as full paths, sorted by name."""
        # os.listdir order is arbitrary, so sort for a reproducible sample order.
        candidates = (os.path.join(directory, name) for name in sorted(os.listdir(directory)))
        return [path for path in candidates if os.path.isfile(path)]

    def _load_data(self):
        """Scan ``folder_path`` and fill the path, label and class-mapping attributes."""
        if self.is_test:
            # Test set: image paths only, no labels.
            self.image_paths = self._list_files(self.folder_path)
            return

        # Keep directories only *before* enumerating, so a stray file in the
        # root cannot leave a gap in the label indices.
        class_names = sorted(
            name for name in os.listdir(self.folder_path)
            if os.path.isdir(os.path.join(self.folder_path, name))
        )
        for class_idx, class_name in enumerate(class_names):
            self.class_to_idx[class_name] = class_idx
            self.idx_to_class[class_idx] = class_name
            class_files = self._list_files(os.path.join(self.folder_path, class_name))
            self.image_paths.extend(class_files)
            self.labels.extend([class_idx] * len(class_files))

    def __len__(self):
        """Return the number of samples."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            ``(image, image_name)`` in test mode, where ``image_name`` is the
            file's base name; otherwise ``(image, label, idx)``.
        """
        image_path = self.image_paths[idx]
        # The context manager closes the underlying file handle promptly;
        # convert() returns an independent, fully loaded image.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')

        if self.transform:
            image = self.transform(image)

        if self.is_test:
            return image, os.path.basename(image_path)

        # The index is returned as well so callers can track individual samples.
        return image, self.labels[idx], idx

import copy  # only needed here, to make a second view of the training dataset

# Load every labelled training image once; this instance carries the
# augmenting transform and is the source of the training split.
all_dataset = BirdDataset('/kaggle/input/mlfinalproject/data/train', transform=data_transform, is_test=False)

# Deterministic preprocessing for validation and test data. data_transform
# contains random flips/jitter/rotation, which would make validation metrics
# noisy and test predictions differ from run to run. Only the resize and
# tensor conversion are kept, so inputs have the same size and value range
# as the training inputs.
eval_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Split the sample indices 80% / 20% into training and validation.
train_size = int(0.8 * len(all_dataset))
val_size = len(all_dataset) - train_size
train_dataset, _val_split = torch.utils.data.random_split(all_dataset, [train_size, val_size])

# The validation subset reads the same samples (a shallow copy shares the
# path and label lists) but through the deterministic transform.
_eval_view = copy.copy(all_dataset)
_eval_view.transform = eval_transform
val_dataset = torch.utils.data.Subset(_eval_view, _val_split.indices)

# Unlabelled test images, also without augmentation.
test_dataset = BirdDataset('/kaggle/input/mlfinalproject/data/test', transform=eval_transform, is_test=True)

# Only the training loader shuffles; evaluation order stays fixed.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Build the classifier: a ResNet-50 initialised from torchvision's default
# pretrained weights, with its final fully connected layer replaced by a
# fresh 200-output head to match the number of bird classes.
pretrained_model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
pretrained_model.fc = torch.nn.Linear(pretrained_model.fc.in_features, 200)

# All layers are fine-tuned. To train the new head only, set
# requires_grad = False on every parameter except pretrained_model.fc's
# weight and bias before the optimizer is created.

# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
pretrained_model = pretrained_model.to(device)

# Label-smoothed cross-entropy loss, optimised with Adam over every parameter.
criterion = LabelSmoothingCrossEntropy(smoothing=0.1)
optimizer = torch.optim.Adam(pretrained_model.parameters(), lr=0.0005)

# Number of epochs to run in this session, counted from the resume point.
num_epochs = 1

# Checkpoint to resume from; edit the path to pick a different epoch.
checkpoint_path = '/kaggle/working/model_epoch_34.pth'

# Default when nothing is resumed, or the checkpoint carries no epoch counter.
start_epoch = 0

if not os.path.isfile(checkpoint_path):
    print("Checkpoint not found. Starting training from scratch.")
else:
    print(f"Loading checkpoint from {checkpoint_path}")

    # NOTE(review): torch.load unpickles the file - only load trusted checkpoints.
    checkpoint = torch.load(checkpoint_path, map_location=torch.device('cpu'))
    print("Keys in the checkpoint:", checkpoint.keys())

    # The file is either a full training checkpoint (a dict with a
    # 'model_state_dict' entry) or a bare model state dict.
    model_state = checkpoint['model_state_dict'] if 'model_state_dict' in checkpoint else checkpoint
    pretrained_model.load_state_dict(model_state)

    # Restore the optimizer state when present; otherwise start a fresh Adam.
    if 'optimizer_state_dict' not in checkpoint:
        print("Optimizer state not found in the checkpoint. Creating a new optimizer.")
        optimizer = torch.optim.Adam(pretrained_model.parameters(), lr=0.0005)
    else:
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    # Resume with the epoch after the one stored in the checkpoint.
    if 'epoch' not in checkpoint:
        print("Epoch information not found in the checkpoint. Starting from epoch 0.")
    else:
        start_epoch = checkpoint['epoch'] + 1

# Run `num_epochs` epochs starting at `start_epoch`. Each epoch trains on
# train_loader, evaluates on val_loader, writes a resumable checkpoint, and
# writes the test-set predictions for that epoch to a CSV file.
total_epochs = start_epoch + num_epochs

for epoch in range(start_epoch, total_epochs):
    epoch_label = f'Epoch {epoch+1}/{total_epochs}'

    # ---- Training pass ----
    pretrained_model.train()
    running_loss = 0.0
    predictions_train = []
    labels_train = []

    # The progress bar wraps the training batches only, so it is closed
    # before the summary lines below are printed.
    with tqdm(train_loader, desc=epoch_label, unit='batch') as pbar:
        for batch_number, (inputs, labels, _) in enumerate(pbar, start=1):
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = pretrained_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            # .tolist() yields plain Python ints for metrics and lookups.
            predictions_train.extend(outputs.argmax(dim=1).tolist())
            labels_train.extend(labels.tolist())

            # Show the mean loss over the batches processed so far.
            pbar.set_postfix({'Loss': running_loss / batch_number})

    average_loss = running_loss / len(train_loader)
    train_accuracy = accuracy_score(labels_train, predictions_train)
    print(f'{epoch_label}, Average Loss: {average_loss:.4f}, Training Accuracy: {train_accuracy:.4f}')

    # ---- Validation pass ----
    # eval() stays in effect for the test inference further down.
    pretrained_model.eval()
    running_val_loss = 0.0
    predictions_val = []
    labels_val = []

    with torch.no_grad():
        for inputs, labels, _ in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = pretrained_model(inputs)
            running_val_loss += criterion(outputs, labels).item()
            predictions_val.extend(outputs.argmax(dim=1).tolist())
            labels_val.extend(labels.tolist())

    average_val_loss = running_val_loss / len(val_loader)
    val_accuracy = accuracy_score(labels_val, predictions_val)
    print(f'{epoch_label}, Average Validation Loss: {average_val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')

    # ---- Checkpoint ----
    # 'epoch' stores the 0-based index of the epoch just finished, while the
    # file name uses the 1-based epoch number.
    torch.save({
        'epoch': epoch,
        'model_state_dict': pretrained_model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }, f'model_epoch_{epoch + 1}.pth')

    # ---- Test-set predictions for this epoch ----
    predictions_test = []
    image_names_test = []

    with torch.no_grad():
        for inputs, image_names in test_loader:
            outputs = pretrained_model(inputs.to(device))
            predictions_test.extend(outputs.argmax(dim=1).tolist())
            image_names_test.extend(image_names)

    # 'id' is the image file name without its extension; 'label' is the class
    # folder name rather than the numeric class index.
    test_predictions_df = pd.DataFrame({
        'id': [os.path.splitext(name)[0] for name in image_names_test],
        'label': [f'{all_dataset.idx_to_class[class_idx]}' for class_idx in predictions_test],
    })
    test_predictions_df.to_csv(f'epoch_{epoch + 1}.csv', index=False)


# Save the final model weights only (no optimizer state or epoch counter).
torch.save(pretrained_model.state_dict(), 'trained_model.pth')