### Name: Le Hien Hieu
### ID: HE181040

In [None]:
# Install or upgrade (-U) the third-party packages this notebook needs, with quiet output (-q).
!pip install -q -U datasets evaluate albumentations 

In [None]:
import os
import numpy as np
import pandas as pd
import random

import cv2
from PIL import Image
import matplotlib.pyplot as plt

import albumentations as A

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

import evaluate
from datasets import load_dataset
from transformers import AutoFeatureExtractor, ViTForImageClassification, SwinForImageClassification
from transformers import TrainingArguments, Trainer, EarlyStoppingCallback

#### Data

In [None]:
# Create the extraction directory (no error if it already exists), then unpack the competition archive into it.
# NOTE(review): the archive path assumes Google Drive is mounted under /content/drive; no mount call is visible in this notebook.
!mkdir -p /content/original_data
!unzip -q /content/drive/MyDrive/dat-301-m-ai-1802-ads-butterfly-classification.zip -d /content/original_data

In [None]:
# Report how many entries the extracted train folder holds
train_entries = os.listdir("/content/original_data/train/train")
print("Number of images in train folder: ", len(train_entries))

# Report how many entries the extracted test folder holds
test_entries = os.listdir("/content/original_data/test/test")
print("Number of images in test folder: ", len(test_entries))

In [None]:
def split2folder(path: os.PathLike, saved_path: os.PathLike, df: pd.DataFrame):
  """
  Re-save every image listed in a dataframe into a per-label sub-folder.

  The first dataframe column is read as the image file name and the second
  as its label. Each label is upper-cased and used as the name of a
  sub-folder of ``saved_path``; the image is opened from ``path`` and saved
  again as ``saved_path/<LABEL>/<file name>``.

  Args:
    path (os.PathLike): folder containing the source images. A trailing
      separator is optional.
    saved_path (os.PathLike): destination root; created if it is missing.
    df (pd.DataFrame): dataframe whose column 0 holds image file names and
      column 1 holds string labels.
  """

  # exist_ok replaces the separate "exists?" check and tolerates re-runs.
  os.makedirs(saved_path, exist_ok=True)

  for img_name, raw_label in zip(df.iloc[:, 0], df.iloc[:, 1]):
    label_dir = os.path.join(saved_path, raw_label.upper())
    os.makedirs(label_dir, exist_ok=True)

    # The context manager closes the source file as soon as the copy is
    # written, so open handles do not pile up while looping over the dataset.
    with Image.open(os.path.join(path, img_name)) as img:
      img.save(os.path.join(label_dir, img_name))

  print("Done!")

In [None]:
# Load the training metadata table; the trailing expression shows its first rows in the notebook.
image_df = pd.read_csv("/content/original_data/Training_set.csv")
image_df.head()

In [None]:
# EDA on image_df: print the dataframe summary
image_df.info()

In [None]:
# Count of each label and plot the distribution
label_count = image_df["label"].value_counts()
label_count.plot(kind="bar", figsize=(10, 5))
plt.title("Distribution of labels")
plt.show()

In [None]:
# Re-save the training images into one sub-folder per upper-cased label under /content/train_dataset/,
# which is the directory later passed to load_dataset("imagefolder", ...).
split2folder(path="/content/original_data/train/train/", saved_path="/content/train_dataset/", df=image_df)

In [None]:
# Define the pretrained checkpoints to fine-tune and the dataset location
# Checkpoint loaded with ViTForImageClassification (first model)
model_name_1 = "google/vit-base-patch16-224-in21k"
# Checkpoint loaded with SwinForImageClassification (second model)
model_name_2 = "microsoft/swin-base-patch4-window7-224"

# Root of the per-label image folders written by split2folder
root_dir = "/content/train_dataset/"

In [None]:
# Create dataset based on Huggingface format dataset
ds = load_dataset("imagefolder", data_dir=root_dir)

# Split the dataset into train (80%) and validation (20%).
# The fixed seed makes the shuffle reproducible, so re-running the notebook keeps
# the same held-out images and both models are validated on the same split.
train_valid_dataset = ds['train'].train_test_split(test_size=0.2, shuffle=True, seed=42)
validation_dataset = train_valid_dataset['test']
train_dataset = train_valid_dataset['train']

# Trailing expression: echo both splits in the notebook output
train_dataset, validation_dataset

In [None]:
# Display an image from the dataset and label
# NOTE(review): the label is presumably the integer class index (class names are read
# separately from the dataset features further down) — confirm.
im = train_dataset[0]['image']
label = train_dataset[0]['label']
display(im)
print("Label: ", label)

In [None]:
# Augmentation
# The pipeline is identical for every sample, so it is built once on first use
# and reused instead of being re-created inside every train_augment call.
_TRAIN_AUG_PIPELINE = None


def _get_train_aug_pipeline():
    """Return the shared albumentations pipeline, creating it on the first call."""
    global _TRAIN_AUG_PIPELINE
    if _TRAIN_AUG_PIPELINE is None:
        _TRAIN_AUG_PIPELINE = A.Compose([
            A.RandomBrightnessContrast(brightness_limit=0.1, contrast_limit=0.1, p=0.2),
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.5),
            A.Rotate(limit=30, interpolation=cv2.INTER_CUBIC, p=0.5),
            A.CoarseDropout(num_holes_range=(1, 5),
                            hole_height_range=(32, 64),
                            hole_width_range=(32, 64),
                            fill=0,
                            p=0.5),
        ])
    return _TRAIN_AUG_PIPELINE


def train_augment(example: dict):
    """
    Apply the training augmentation to a single example.

    The image is randomly brightness/contrast-jittered, flipped horizontally
    and vertically, rotated by up to 30 degrees and has 1-5 rectangular
    regions (32-64 px per side) filled with zeros; each step is applied with
    its own probability.

    Args:
        example (dict): dictionary with a PIL image under 'image' and its
            label under 'label'.
    Returns:
        dict: the augmented PIL image under 'image' and the unchanged label
        under 'label'.
    """
    # Force 3-channel RGB so grayscale / palette / RGBA files yield the same
    # array layout as every other sample before augmentation.
    image = np.asarray(example['image'].convert("RGB"), dtype=np.uint8)

    augmented = _get_train_aug_pipeline()(image=image)

    return {
        "image": Image.fromarray(augmented['image']),
        "label": example["label"]
    }

In [None]:
class AugmentedDataset(Dataset):
    """
    Dataset wrapper that augments and preprocesses each example on access.

    Args:
        dataset (Dataset): indexable dataset whose items are dicts consumed
            by ``augment_fn``.
        augment_fn (callable): takes one example and returns a dict with the
            augmented image under 'image' and the label under 'label'.
        processor (AutoFeatureExtractor): called as
            ``processor(images=..., return_tensors='pt')``; its
            ``pixel_values`` attribute is expected to carry a leading batch
            axis of size 1.
    """
    def __init__(self, dataset, augment_fn, processor):
        self.dataset = dataset
        self.augment_fn = augment_fn
        self.processor = processor

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return {'pixel_values': tensor, 'labels': label} for item ``idx``."""
        augmented_example = self.augment_fn(self.dataset[idx])

        processed_inputs = self.processor(
            images=augmented_example['image'], return_tensors='pt'
        )

        return {
            # squeeze(0) removes only the batch axis added by the processor;
            # a bare squeeze() would also drop any other size-1 axis
            # (e.g. a single-channel image) and change the tensor rank.
            'pixel_values': processed_inputs.pixel_values.squeeze(0),
            'labels': augmented_example['label']
        }

In [None]:
# Create processor
# Loaded from the same checkpoint name as the ViT model (model_name_1)
processor_1 = AutoFeatureExtractor.from_pretrained(model_name_1)

In [None]:
def transform_1(example_batch):
    """
    Preprocess a batch of images with the module-level ``processor_1``.
    Args:
        example_batch (dict): batch with the images under 'image' and the labels under 'label'.
    Returns:
        The processor output (PyTorch tensors requested) with the batch labels stored under 'labels'.
    """
    images = list(example_batch['image'])
    encoded = processor_1(images, return_tensors='pt')
    encoded['labels'] = example_batch['label']
    return encoded

In [None]:
# Create transformed validation dataset 
# No need to augment validation dataset but only turn it into tensor
# (transform_1 runs processor_1 on the images and attaches the labels)
tf_validation_dataset = validation_dataset.with_transform(transform_1)

In [None]:
# Print the first element of the transformed validation dataset to check
tf_validation_dataset[0]

In [None]:
# Create transformed train dataset: apply augmentation and turn it into tensor
# (train_augment is applied first, then processor_1, each time an item is indexed)
tf_train_dataset = AugmentedDataset(train_dataset, train_augment, processor=processor_1)

In [None]:
# Print the first element of the transformed train dataset to check
tf_train_dataset[0]

In [None]:
def collate_fn(batch: list):
    """
    Merge a list of per-sample dicts into a single batch dict.
    Args:
        batch (list): examples, each providing a tensor under 'pixel_values' and a label under 'labels'.
    Returns:
        dict: 'pixel_values' stacked along a new leading axis and 'labels' gathered into one tensor.
    """
    images = []
    targets = []
    for sample in batch:
        images.append(sample['pixel_values'])
        targets.append(sample['labels'])

    collated = {}
    collated['pixel_values'] = torch.stack(images)
    collated['labels'] = torch.tensor(targets)
    return collated

In [None]:
# Accuracy metric used for evaluation
metric = evaluate.load("accuracy")

def compute_metrics(p):
    """
    Compute accuracy for one evaluation pass; labels are expected as integer class ids.
    Args:
        p: object exposing ``predictions`` (per-class scores, one row per sample)
           and ``label_ids`` (reference class ids).
    Returns:
        The result of ``metric.compute`` for the arg-max predictions.
    """
    predicted_ids = np.argmax(p.predictions, axis=1)
    return metric.compute(predictions=predicted_ids, references=p.label_ids)

In [None]:
# Check for labels
# Class names are read from the 'label' feature of the dataset; their count is printed as a sanity check
labels = tf_validation_dataset.features['label'].names
print(len(labels))

In [None]:
# Define model
# The head is sized to the number of dataset classes, and the index <-> class-name
# mappings (indices stored as strings) are passed to the model configuration.
# NOTE(review): ignore_mismatched_sizes=True presumably lets loading continue when the
# checkpoint's classifier shape differs from the requested one — confirm against the transformers docs.
model = ViTForImageClassification.from_pretrained(
    model_name_1,
    num_labels=len(labels),
    id2label={str(i): c for i, c in enumerate(labels)},
    label2id={c: str(i) for i, c in enumerate(labels)},
    ignore_mismatched_sizes=True
)

In [None]:
# Training configuration for the ViT run:
# - evaluation and checkpointing both happen every 500 steps, at most 5 checkpoints are kept
# - remove_unused_columns=False keeps the raw 'image' column that the on-the-fly transforms need
# - no metric_for_best_model is given here, unlike the Swin configuration further down
# NOTE(review): push_to_hub=True presumably needs Hugging Face Hub credentials; no login call is visible in this notebook.
training_args = TrainingArguments(
  output_dir="./vit-butterflies-google-final",
  per_device_train_batch_size=32,
  eval_strategy="steps",
  num_train_epochs=50,
  fp16=True,
  save_steps=500,
  eval_steps=500,
  logging_steps=10,
  learning_rate=2e-4,
  save_total_limit=5,
  remove_unused_columns=False,
  push_to_hub=True,
  report_to='tensorboard',
  load_best_model_at_end=True,
)

In [None]:
# Wire the model, datasets, collator and accuracy metric together.
# Early stopping is configured with a patience of 5.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=collate_fn,
    compute_metrics=compute_metrics,
    train_dataset=tf_train_dataset,
    eval_dataset=tf_validation_dataset,
    processing_class=processor_1,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=5)],
)

In [None]:
# Run fine-tuning; the returned object carries the metrics logged and saved in the next cell
train_results = trainer.train()

In [None]:
# Persist the model, the training metrics and the trainer state
trainer.save_model()
trainer.log_metrics("train", train_results.metrics)
trainer.save_metrics("train", train_results.metrics)
trainer.save_state()

In [None]:
# Create processor 2
# Loaded from the same checkpoint name as the Swin model (model_name_2)
processor_2 = AutoFeatureExtractor.from_pretrained(model_name_2)

In [None]:
def transform_2(example_batch):
    """
    Preprocess a batch of images with the module-level ``processor_2``.
    Args:
        example_batch (dict): batch with the images under 'image' and the labels under 'label'.
    Returns:
        The processor output (PyTorch tensors requested) with the batch labels stored under 'labels'.
    """
    images = list(example_batch['image'])
    encoded = processor_2(images, return_tensors='pt')
    encoded['labels'] = example_batch['label']
    return encoded

In [None]:
# Create transformed validation dataset for the second model: no augmentation, only preprocessing with processor_2
# (this rebinds the names used for the ViT run above)
tf_validation_dataset = validation_dataset.with_transform(transform_2)

# Create transformed train dataset: apply augmentation and turn it into tensor
tf_train_dataset = AugmentedDataset(train_dataset, train_augment, processor=processor_2)

In [None]:
# `from torchvision import transforms` does not by itself guarantee that the
# `v2` submodule is loaded, so import it explicitly instead of relying on
# another library having imported it first.
from torchvision.transforms import v2

# Take the class count from the dataset labels (as the model definitions do)
# rather than hard-coding it, so the mixed label width always matches the classifier.
cutmix = v2.CutMix(num_classes=len(labels))
mixup = v2.MixUp(num_classes=len(labels))
# Each call applies one of the two transforms, chosen at random
cutmix_or_mixup = v2.RandomChoice([cutmix, mixup])

# Collator wrapper that adds cutmix or mixup on top of a base collator
class CutMixOrMixUpCollator:
    """
    Wrap a base collator and pass its batch through a cutmix/mixup callable.
    Args:
        collator (callable): builds a batch dict with "pixel_values" and "labels" from a list of examples.
        cutmix_or_mixup (callable): called as ``f(pixel_values, labels)``; items 0 and 1 of its
            result replace the batch's "pixel_values" and "labels".

    NOTE(review): this object is the only data_collator handed to the Swin Trainer; if the
    Trainer also uses it for evaluation batches, validation images are mixed too — confirm.
    """
    def __init__(self, collator, cutmix_or_mixup):
        self.collator, self.cutmix_or_mixup = collator, cutmix_or_mixup

    def __call__(self, examples):
        # Build the plain batch first
        batch = self.collator(examples)
        images, targets = batch["pixel_values"], batch["labels"]

        # Anything that is not a 4-dimensional image batch is returned untouched
        if images.ndim != 4:
            return batch

        mixed = self.cutmix_or_mixup(images, targets)
        batch["pixel_values"], batch["labels"] = mixed[0], mixed[1]
        return batch

# Collator for the Swin run: plain batching via collate_fn, then a random choice of cutmix or mixup
cutmix_mixup_collector = CutMixOrMixUpCollator(collate_fn, cutmix_or_mixup)

In [None]:
metric = evaluate.load("accuracy")

def compute_metrics(p):
    """
    Compute accuracy when labels may be hard class ids or per-class weight rows.

    Cutmix and mixup return one row of class weights per sample; such rows are
    reduced to the index of their largest entry. Plain integer class ids
    (a 1-D label array) are used as they are, so the function keeps working if
    evaluation batches are not mixed.
    Args:
        p: object exposing ``predictions`` (per-class scores, one row per sample)
           and ``label_ids`` (hard ids or per-class weight rows).
    Returns:
        The result of ``metric.compute`` for the arg-max predictions.
    """
    references = np.asarray(p.label_ids)
    if references.ndim > 1:
        # Per-class weight rows -> hard label = heaviest class
        references = np.argmax(references, axis=1)
    predictions = np.argmax(p.predictions, axis=1)
    return metric.compute(predictions=predictions, references=references)

In [None]:
# Define the second model (Swin); this rebinds `model`, previously the ViT model.
# The head is sized to the number of dataset classes and the index <-> class-name
# mappings (indices stored as strings) are passed to the model configuration.
model = SwinForImageClassification.from_pretrained(
    model_name_2,
    num_labels=len(labels),
    id2label={str(i): c for i, c in enumerate(labels)},
    label2id={c: str(i) for i, c in enumerate(labels)},
    ignore_mismatched_sizes=True
)

In [None]:
# Freeze embeddings layer and encoder layers 0, 1, 2; every other parameter stays trainable
frozen_tags = (
    "swin.embeddings",
    "swin.encoder.layers.0",
    "swin.encoder.layers.1",
    "swin.encoder.layers.2",
)
for param_name, weight in model.named_parameters():
    # A parameter is frozen when its name contains any of the tags above
    weight.requires_grad = not any(tag in param_name for tag in frozen_tags)

In [None]:
# Summarise how many weights are trainable after the freezing step.
# NOTE: this cell runs before the classifier head is replaced in the following cell,
# so the numbers describe the model as loaded, not the final architecture.
total_params = sum(weight.numel() for weight in model.parameters())
trainable_params = sum(
    weight.numel() for weight in model.parameters() if weight.requires_grad
)

print(f"Total parameters: {total_params}")
print(f"Trainable parameters: {trainable_params}")
print(f"Frozen parameters: {total_params - trainable_params}")

In [None]:
# Replace the model's classifier with a small MLP head:
# hidden_size -> hidden_size // 2 -> hidden_size // 4 -> number of classes,
# with ReLU and dropout (p=0.2) after each of the first two linear layers.
hidden_size = model.config.hidden_size
new_classifier = nn.Sequential(
    nn.Linear(hidden_size, hidden_size // 2),
    nn.ReLU(),
    nn.Dropout(0.2),
    nn.Linear(hidden_size // 2, hidden_size // 4),
    nn.ReLU(),
    nn.Dropout(0.2),
    nn.Linear(hidden_size // 4, len(labels))
)
model.classifier = new_classifier

In [None]:
# Training configuration for the Swin run:
# - evaluation and checkpointing both happen every 100 steps, at most 3 checkpoints are kept
# - the best checkpoint is selected by the lowest eval_loss (greater_is_better=False)
# - remove_unused_columns=False keeps the raw 'image' column that the on-the-fly transforms need
# NOTE(review): push_to_hub=True presumably needs Hugging Face Hub credentials; no login call is visible in this notebook.
training_args = TrainingArguments(
    output_dir="./result/swin-base-patch4-window7-224",
    num_train_epochs=50,
    per_device_train_batch_size=32,
    learning_rate=1e-4,

    eval_strategy="steps",
    eval_steps=100,
    per_device_eval_batch_size = 32,

    fp16=True,
    save_steps=100,
    save_total_limit=3,
    warmup_steps=500,
    weight_decay=0.01,

    logging_steps=10,
    remove_unused_columns=False,
    push_to_hub=True,
    report_to='tensorboard',
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    greater_is_better=False
)

In [None]:
# Same wiring as the ViT run, but batches are built by the cutmix/mixup collator
# and compute_metrics is the variant that reduces label rows with argmax.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=cutmix_mixup_collector,
    compute_metrics=compute_metrics,
    train_dataset=tf_train_dataset,
    eval_dataset=tf_validation_dataset,
    processing_class=processor_2,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=5)]
)

In [None]:
# Run fine-tuning; the returned object carries the metrics logged and saved in the next cell
train_results = trainer.train()

In [None]:
# Persist the model, the training metrics and the trainer state
trainer.save_model()
trainer.log_metrics("train", train_results.metrics)
trainer.save_metrics("train", train_results.metrics)
trainer.save_state()

### Prediction

In [None]:
class ImageFolderDataset(Dataset):
    """
    Dataset over the image files located directly inside one folder.

    Args:
        image_folder: directory to scan (not recursive).
        transform (callable, optional): applied to each RGB PIL image before
            it is returned.

    Each item is a tuple ``(image, image_path)``.
    """
    # File extensions (compared case-insensitively) that count as images
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp', '.gif')

    def __init__(self, image_folder, transform=None):
        self.image_folder = image_folder
        # Keep image files only. os.listdir returns entries in arbitrary
        # order, so sort to make item order identical across runs and machines.
        self.image_paths = sorted(
            os.path.join(image_folder, file_name)
            for file_name in os.listdir(image_folder)
            if file_name.lower().endswith(self.IMAGE_EXTENSIONS)
        )
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        # convert() yields a new, fully loaded image, so the file handle can be
        # released right away instead of staying open until garbage collection.
        with Image.open(image_path) as source:
            image = source.convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, image_path

# Transform for Swin Transformer: resize to 224x224, convert to a tensor and normalize with the ImageNet statistics
image_size = 224 
mean = (0.485, 0.456, 0.406) # Mean ImageNet
std = (0.229, 0.224, 0.225)  # Std ImageNet

transform = transforms.Compose([
    transforms.Resize((image_size, image_size)),
    transforms.ToTensor(),
    transforms.Normalize(mean, std)
])

In [None]:
# Load the trained models (or pre-trained ones if you use pre-trained weights)
# These names rebind model_name_1 / model_name_2 to fine-tuned checkpoints.
# NOTE(review): the second name mentions "swinv2 ... window8-256", while the training section above
# fine-tunes "microsoft/swin-base-patch4-window7-224" — confirm this is the intended checkpoint.
model_name_1 = "hieulhwork24/vit-butterflies-google-final"
model_name_2 = "hieulhwork24/swinv2-base-patch4-window8-256"

model_1 = ViTForImageClassification.from_pretrained(model_name_1, num_labels=75)
model_2 = SwinForImageClassification.from_pretrained(model_name_2, num_labels=75)

# Use the GPU when available and switch both models to evaluation mode
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_1.to(device)
model_2.to(device)
model_1.eval()
model_2.eval()

In [None]:
# NOTE(review): the archive was extracted to /content/original_data/test/test above — confirm that
# /content/test is where the test images actually are.
test_folder = "/content/test"
batch_size = 256
num_workers = 4

test_dataset = ImageFolderDataset(test_folder, transform=transform)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

predictions = {}
# Class names are taken from the sorted sub-folder names of the training image folder.
# NOTE(review): the training section writes its folders to /content/train_dataset/ — confirm
# that /content/train_2 has the same class folders, otherwise indices map to wrong names.
class_names = sorted(os.listdir("/content/train_2"))

# `transform` normalizes with the ImageNet statistics (`mean` / `std`), which suits the Swin model.
# The ViT model was fine-tuned on images prepared by the processor of
# google/vit-base-patch16-224-in21k, which normalizes with mean = std = 0.5 per channel
# (verify against that checkpoint's preprocessor config). Its input is therefore
# re-normalized below instead of being fed the ImageNet-normalized batch.
imagenet_mean = torch.tensor(mean, device=device).view(1, 3, 1, 1)
imagenet_std = torch.tensor(std, device=device).view(1, 3, 1, 1)
vit_mean = torch.full((1, 3, 1, 1), 0.5, device=device)
vit_std = torch.full((1, 3, 1, 1), 0.5, device=device)

with torch.no_grad():
    for batch_images, image_paths in test_dataloader:
        batch_images = batch_images.to(device)

        # Undo the ImageNet normalization, then apply the ViT normalization
        vit_images = (batch_images * imagenet_std + imagenet_mean - vit_mean) / vit_std

        # Turn each model's logits into class probabilities first: raw logits of
        # different models are on different scales and must not be averaged directly.
        probs_1 = torch.softmax(model_1(vit_images).logits, dim=-1)
        probs_2 = torch.softmax(model_2(batch_images).logits, dim=-1)

        # Calculate average probabilities of each class, then choose the most likely label
        average_probs = (probs_1 + probs_2) / 2
        predicted_classes = torch.argmax(average_probs, dim=-1).tolist()

        for image_path, predicted_class_index in zip(image_paths, predicted_classes):
            predictions[os.path.basename(image_path)] = class_names[predicted_class_index]

In [None]:
# Build the submission table: one row per test image with its file name ("ID") and predicted class name ("label")
predition_df = pd.DataFrame(list(predictions.items()), columns=["ID", "label"])

# Write the table as CSV without the dataframe index column
predition_df.to_csv("submission.csv", index=False)