
# Imports

In [None]:
import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.utils import resample
from PIL import Image
from datasets import load_metric
import copy
import xgboost as xgb
import torch.nn as nn
import torch
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from transformers import Trainer, TrainingArguments, AutoModelForImageClassification, AutoImageProcessor, EarlyStoppingCallback
import gradio as gr

# Data Preprocessing

In [None]:
# Set device (CPU or GPU if available)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
train_metadata = pd.read_csv('/mnt/SSD/hasan/Agile-Styling-ai/CSV_Clean_Data/Masked_CSV/All_R_C_P_4th_3rd_2nd_1st_Molmo_Train-v1.csv')
val_metadata = pd.read_csv('/mnt/SSD/hasan/Agile-Styling-ai/CSV_Clean_Data/Masked_CSV/All_R_C_P_4th_3rd_2nd_1st_Molmo_Val-v1.csv')
test_metadata = pd.read_csv('/mnt/SSD/hasan/Agile-Styling-ai/CSV_Clean_Data/Masked_CSV/All_R_C_P_4th_3rd_2nd_1st_Molmo_Test-v1.csv')

In [None]:
class AgileDataset(Dataset):
    """Image-classification dataset backed by a metadata table.

    Each row of ``metadata`` describes one sample. Columns are read by
    position: column 0 is the image file path and column 1 is the class
    label (assumed to be the ``Season`` column -- confirm against the CSVs).

    Args:
        metadata: ``pandas.DataFrame`` with the image path in its first
            column and the label in its second column.
        transform: Optional callable applied to the RGB ``PIL.Image``
            before it is returned (e.g. a ``torchvision`` transform).
    """

    def __init__(self, metadata, transform=None):
        self.metadata = metadata
        self.transform = transform

    def __len__(self):
        """Return the number of samples (rows in ``metadata``)."""
        return len(self.metadata)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``{"image": ..., "label": ...}``.

        The image is always returned in RGB mode (before ``transform``), so
        RGBA / palette / grayscale files yield three channels as well.
        """
        img_path = self.metadata.iloc[idx, 0]  # Image path column
        label = self.metadata.iloc[idx, 1]  # Label column

        # Image.open is lazy and keeps the file open; convert() forces the
        # pixel data to load and returns an independent copy, so the handle
        # can be closed immediately instead of leaking one per sample.
        with Image.open(img_path) as img:
            image = img.convert('RGB')

        if self.transform is not None:
            image = self.transform(image)

        return {"image": image, "label": label}


In [None]:
# Define augmentation transforms using torchvision.transforms
transform = transforms.Compose([
    # transforms.RandomResizedCrop(224),
    # transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    # transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [None]:
# Function to balance the dataset
def balance_dataset(df, label_col='Season', random_state=42):
    """Oversample minority classes until every class matches the largest one.

    Rows of each under-represented class are re-drawn with replacement and
    appended to the originals. The original index values are kept, so the
    result can contain duplicate index labels. Rows whose label is NaN are
    not counted by ``value_counts`` and therefore do not appear in the output.

    Args:
        df: DataFrame containing a label column.
        label_col: Name of the label column.
        random_state: Seed passed to ``sklearn.utils.resample``.

    Returns:
        A new DataFrame with the same columns and dtypes as ``df`` in which
        every class has as many rows as the most frequent class. An empty
        frame (same columns) is returned when there are no labelled rows.
    """
    class_counts = df[label_col].value_counts()
    if class_counts.empty:
        return df.iloc[0:0].copy()

    target_count = class_counts.max()

    # Collect the pieces and concatenate once: this avoids quadratic copying
    # and keeps dtypes intact (seeding the concat with an empty object-dtype
    # frame would degrade them).
    parts = []
    for label, count in class_counts.items():
        class_df = df[df[label_col] == label]
        parts.append(class_df)
        if count < target_count:
            parts.append(
                resample(
                    class_df,
                    replace=True,
                    n_samples=target_count - count,
                    random_state=random_state,
                )
            )

    return pd.concat(parts)

In [None]:
path_counts = dict()
index_to_drop = list()

In [None]:
for index, row in train_metadata.iterrows():
    path = row['Image Path']
    label = row['Season']
    dirname = os.path.dirname(path)
    if path_counts.get(dirname, 0) >= 2:
        index_to_drop.append(index)
    else:
        path_counts.setdefault(dirname, 0)
        path_counts[dirname] += 1

In [None]:
train_metadata = train_metadata[~train_metadata.index.isin(index_to_drop)]

In [None]:
(train_metadata['Season'].value_counts() / len(train_metadata) * 6)

In [None]:
# Class names in sorted order define the integer ids used by the models
labels = sorted(train_metadata['Season'].unique())
label2id = {name: idx for idx, name in enumerate(labels)}
id2label = {idx: name for idx, name in enumerate(labels)}

In [None]:
# Check for unique values in 'Season' column
print(train_metadata['Season'].unique())


In [None]:
# Display rows where 'Season' is NaN
nan_rows = train_metadata[train_metadata['Season'].isna()]
print("Rows with NaN values in 'Season' column:")
print(nan_rows)

# Display the count of NaN values
print(f"\nNumber of NaN values in 'Season' column: {nan_rows.shape[0]}")

In [None]:
id2label

In [None]:
label2id

In [None]:
train_metadata = train_metadata.sample(frac=1, random_state=42).reset_index(drop=True)  # Shuffle the data

In [None]:
val_metadata = val_metadata.sample(frac=1, random_state=42).reset_index(drop=True)  # Shuffle the data

In [None]:
test_metadata = test_metadata.sample(frac=1, random_state=42).reset_index(drop=True)  # Shuffle the data

In [None]:
# Create DataLoader for the training dataset (with augmentation)
# train_df_balanced = balance_dataset(train_metadata)
train_dataset = AgileDataset(train_metadata, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

In [None]:
train_metadata['Season'].value_counts()

In [None]:
train_dataset.transform

In [None]:
# Create DataLoader for the validation dataset (without augmentation)
val_dataset = AgileDataset(val_metadata, transform=transform)  # No augmentation for validation
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

In [None]:
val_metadata['Season'].value_counts()

In [None]:
# Create DataLoader for the test dataset
test_dataset = AgileDataset(test_metadata, transform=transform)  # No augmentation for testing
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

In [None]:
test_dataset

In [None]:
len(test_metadata)

In [None]:
test_metadata['Season'].value_counts()

In [None]:
# Check dataset sizes
print(len(train_dataset), len(val_dataset), len(test_dataset))

In [None]:
# Get an item from the dataset
index = 10  # You can change this to any valid index within your dataset
sample = train_dataset[index]

# Print the output in the desired format
print("Output:")
print(sample)  # This will show the image tensor and label directly



# Metrics

In [None]:
# eval_pred.predictions

In [None]:
metric = load_metric("seqeval")

def compute_metrics(eval_pred):
    """Compute per-label and support-weighted overall metrics for the Trainer.

    Args:
        eval_pred: Object exposing ``predictions`` (scores, arg-maxed over
            axis 1 to obtain class ids) and ``label_ids`` (true class ids).

    Returns:
        Flat dict with one ``"<label>_<field>"`` entry per field of every
        per-label result returned by the metric, plus ``overall_f1``,
        ``overall_precision`` and ``overall_recall``. Each overall value is
        the mean of the per-label values weighted by the label's ``number``
        (support); it is ``0.0`` when the total support is zero.
    """
    pred_ids = np.argmax(eval_pred.predictions, axis=1)
    predictions = [id2label[i] for i in pred_ids]
    references = [id2label[i] for i in eval_pred.label_ids]

    # NOTE(review): `metric` is loaded as "seqeval", an entity-level metric for
    # sequence labelling; the whole evaluation set is passed as one sequence.
    # Confirm this is really intended for single-label image classification.
    results = metric.compute(predictions=[predictions], references=[references])

    # The metric's own overall_* scalars are discarded; everything else is a
    # per-label dict with 'precision', 'recall', 'f1' and 'number'.
    builtin_overall = ('overall_precision', 'overall_recall', 'overall_f1', 'overall_accuracy')
    per_label = {key: value for key, value in results.items() if key not in builtin_overall}

    final_results = {}
    for label_name, scores in per_label.items():
        for field, value in scores.items():
            final_results[f"{label_name}_{field}"] = value

    total_support = sum(scores['number'] for scores in per_label.values())
    for m in ('f1', 'precision', 'recall'):
        # Each metric gets its own accumulator and reads its own per-label
        # value (previously all three shared one dict and always read 'f1').
        weighted_sum = sum(scores[m] * scores['number'] for scores in per_label.values())
        final_results[f'overall_{m}'] = weighted_sum / total_support if total_support else 0.0

    return final_results

In [None]:
# def get_collate_fn(processor):
#   Process = lambda x: processor(x,labelidz return_tensors='pt')['pixel_values']
def collate_fn(examples):
  """Batch dataset samples into the keyword arguments passed to the model.

  Stacks each sample's "image" tensor into ``pixel_values`` and maps each
  sample's "label" through ``label2id`` to build the ``labels`` tensor.
  """
  images = [sample["image"] for sample in examples]
  batch = {"pixel_values": torch.stack(images)}
  label_ids = [label2id[sample["label"]] for sample in examples]
  batch["labels"] = torch.tensor(label_ids)
  return batch
# return collate_fn

# Dinov2-base

In [None]:
model_checkpoint = "facebook/dinov2-base"
# model_checkpoint = "vit models/convnext_results/train-1/checkpoint-1000"
image_processor = AutoImageProcessor.from_pretrained(model_checkpoint)


In [None]:
# Create the DINOv2 image classifier with a head sized to our label set.
# Use the `device` selected at the top of the file instead of hard-coding CUDA,
# so the cell also runs on CPU-only machines.
model = AutoModelForImageClassification.from_pretrained(
    model_checkpoint,
    label2id = label2id,
    id2label = id2label,
    ignore_mismatched_sizes=True).to(device)

In [None]:
model

In [None]:
# Training arguments
batch_size = 16
epochs = 50
grad_accum_steps = 2
# max_steps counts optimizer updates, and each update consumes
# batch_size * grad_accum_steps samples. Ignoring the accumulation factor made
# the run last roughly twice the intended number of epochs.
# NOTE(review): assumes a single device; with several devices the effective
# batch is larger still.
steps_per_epoch = int(np.ceil(len(train_dataset) / (batch_size * grad_accum_steps)))
training_args = TrainingArguments(
    # output_dir="Vit-project-80-5-15/convnext_model/train-3",
    output_dir="/mnt/SSD/hasan/Agile-Styling-ai/Finetuned_Checpoint/Dinov2_Checkpoints/R_C_P_4th_3rd_2nd_1st/Test-1",
    remove_unused_columns=False,
    evaluation_strategy="steps",
    save_strategy="steps",
    eval_steps=5,
    save_steps=5,
    save_total_limit=5,
    logging_steps=5,
    learning_rate=0.00001,
    per_device_train_batch_size=batch_size,
    gradient_accumulation_steps=grad_accum_steps,
    per_device_eval_batch_size=12,
    num_train_epochs=epochs,
    max_steps=steps_per_epoch * epochs,
    load_best_model_at_end=True,
    metric_for_best_model="eval_overall_f1",
    )

In [None]:
# callback = EarlyStoppingCallback(early_stopping_patience=5, early_stopping_threshold=0.9)

In [None]:
# Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,
    data_collator=collate_fn,
    # callbacks=[callback]
    )

In [None]:
len(train_dataset)

In [None]:
# Training
train_results = trainer.train()

In [None]:
test_pred = trainer.predict(test_dataset)

In [None]:
test_pred.metrics

In [None]:
y_test = torch.tensor([label2id[test_dataset[i]['label']] for i in range(len(test_dataset))])
y_test.shape

In [None]:
print(len(y_test))  # true labels you have
print(len(test_pred))  # Predictions made

In [None]:
(test_pred)

In [None]:
# Check the length of both
print(f"Length of y_test: {len(y_test)}")
print(f"Length of test_pred: {len(test_pred.predictions)}")

# Optionally, check the first few values to make sure they look correct
print(f"First 5 true labels: {y_test[:5]}")
print(f"First 5 predictions: {test_pred[:5]}")

In [None]:
test_pred

In [None]:
y_test

In [None]:
test_pred.predictions

In [None]:
labelids = [np.argmax(a) for a in test_pred.predictions]

In [None]:
test_pred.label_ids

In [None]:
target = list(id2label.keys())  
target_names = list(id2label.values())
from sklearn import metrics
test_predictions = test_pred.predictions
# test_predictions = test_pred.label_ids
test_predictions = labelids
# threshold = np.round(metrics.roc_auc_score(y_test6, y_pred6,multi_class='ovo', average='weighted'),4)
# print('ROC AUC Score is :', threshold)
print('Accuracy score is :', np.round(metrics.accuracy_score(y_test,  test_predictions),4))
print('Precision score is :', np.round(metrics.precision_score(y_test, test_predictions, average='weighted'),4))
print('Recall score is :', np.round(metrics.recall_score(y_test,  test_predictions, average='weighted'),4))
print('F1 Score is :', np.round(metrics.f1_score(y_test,  test_predictions, average='weighted'),4))
print('Cohen Kappa Score:', np.round(metrics.cohen_kappa_score(y_test,  test_predictions),4))
print('\t\tClassification Report:\n', metrics.classification_report(y_test,  test_predictions, labels=target, target_names=target_names))

# Inference From Finetuned Checkpoints

In [None]:
# Replace this path with the correct path to your fine-tuned model
checkpoint_path = "/mnt/SSD/hasan/Agile-Styling-ai/Finetuned_Checpoint/Dinov2_Checkpoints/R_C_P_4th_3rd_2nd_1st/Test-1/checkpoint-255"

# Load the model and the processor (image processor)
model = AutoModelForImageClassification.from_pretrained(
    checkpoint_path,
    label2id=label2id,
    id2label=id2label,
    ignore_mismatched_sizes=True
).to(device)

image_processor = AutoImageProcessor.from_pretrained(checkpoint_path)

In [None]:
# Define the test dataset transforms (only resizing and normalization)
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Create DataLoader for the test dataset (without augmentation)
test_dataset = AgileDataset(test_metadata, transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)


In [None]:
from sklearn import metrics

In [None]:
# Predict on the test dataset with the checkpoint loaded above.
# The existing `trainer` still holds the model object it was built with, so
# calling it would not score this checkpoint; wrap the freshly loaded `model`
# in a prediction-only Trainer instead.
checkpoint_trainer = Trainer(
    model=model,
    args=TrainingArguments(
        output_dir=os.path.join(os.path.dirname(checkpoint_path), "inference_eval"),
        per_device_eval_batch_size=12,
        remove_unused_columns=False,
        report_to="none",
    ),
    data_collator=collate_fn,
    compute_metrics=compute_metrics,
)
test_pred = checkpoint_trainer.predict(test_dataset)

In [None]:
labelids = [np.argmax(a) for a in test_pred.predictions]

In [None]:
# Read the true labels straight from the metadata (column 1, the same column
# AgileDataset.__getitem__ returns) instead of decoding every image for them.
y_test = torch.tensor([label2id[label] for label in test_dataset.metadata.iloc[:, 1]])
y_test.shape

In [None]:
# 255
target = list(id2label.keys())  
target_names = list(id2label.values())
from sklearn import metrics
test_predictions = test_pred.predictions
# test_predictions = test_pred.label_ids
test_predictions = labelids
# threshold = np.round(metrics.roc_auc_score(y_test6, y_pred6,multi_class='ovo', average='weighted'),4)
# print('ROC AUC Score is :', threshold)
print('Accuracy score is :', np.round(metrics.accuracy_score(y_test,  test_predictions),4))
print('Precision score is :', np.round(metrics.precision_score(y_test, test_predictions, average='weighted'),4))
print('Recall score is :', np.round(metrics.recall_score(y_test,  test_predictions, average='weighted'),4))
print('F1 Score is :', np.round(metrics.f1_score(y_test,  test_predictions, average='weighted'),4))
print('Cohen Kappa Score:', np.round(metrics.cohen_kappa_score(y_test,  test_predictions),4))
print('\t\tClassification Report:\n', metrics.classification_report(y_test,  test_predictions, labels=target, target_names=target_names))

# Swinv2

In [None]:
# Load the image processor for the specified model checkpoint
model_checkpoint = "microsoft/swinv2-base-patch4-window12-192-22k"
# model_checkpoint = "microsoft/swinv2-base-patch4-window12-192-22k"
image_processor = AutoImageProcessor.from_pretrained("microsoft/swinv2-base-patch4-window12-192-22k")

In [None]:
# Create the Swin v2 image classifier with a head sized to our label set.
# Use the `device` selected at the top of the file instead of hard-coding CUDA.
model = AutoModelForImageClassification.from_pretrained(
    model_checkpoint,
    label2id = label2id,
    id2label = id2label,
    ignore_mismatched_sizes=True).to(device)

In [None]:
# callback = EarlyStoppingCallback(early_stopping_patience=8)

In [None]:
# Training arguments
batch_size = 4
epochs = 50
training_args = TrainingArguments(
    output_dir="/mnt/SSD/hasan/Agile-Styling-ai/Finetuned_Checpoint/Swin_v2_Checkpoints/R_C_P_4th_3rd_2nd_1st/Test-1",
    remove_unused_columns=False,
    evaluation_strategy="steps",
    save_strategy="steps",
    eval_steps=5,
    save_steps=5,
    save_total_limit=1,
    logging_steps=100,
    learning_rate=0.00001,
    per_device_train_batch_size=batch_size,
    gradient_accumulation_steps=1,
    per_device_eval_batch_size=12,
    num_train_epochs=epochs,
    max_steps=int(np.ceil(len(train_dataset)/batch_size*epochs)),
    load_best_model_at_end=True,
    metric_for_best_model="eval_overall_f1",
    )

In [None]:
# Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    # tokenizer=image_processor,
    compute_metrics=compute_metrics,
    data_collator=collate_fn,
    # callbacks=[callback]
    )

In [None]:
# Training
train_results = trainer.train()

In [None]:
test_pred = trainer.predict(test_dataset)

In [None]:
test_pred.metrics

In [None]:
y_test = torch.tensor([label2id[test_dataset[i]['label']] for i in range(len(test_dataset))])
y_test.shape

In [None]:
print(len(y_test))  # true labels you have
print(len(test_pred))  # Predictions made

In [None]:
(test_pred)

In [None]:
# Check the length of both
print(f"Length of y_test: {len(y_test)}")
print(f"Length of test_pred: {len(test_pred.predictions)}")

# Optionally, check the first few values to make sure they look correct
print(f"First 5 true labels: {y_test[:5]}")
print(f"First 5 predictions: {test_pred[:5]}")


In [None]:
test_pred

In [None]:
y_test

In [None]:
test_pred.predictions

In [None]:
labelids = [np.argmax(a) for a in test_pred.predictions]

In [None]:
test_pred.label_ids

In [None]:
target = list(id2label.keys())  
target_names = list(id2label.values())
from sklearn import metrics
test_predictions = test_pred.predictions
# test_predictions = test_pred.label_ids
test_predictions = labelids
# threshold = np.round(metrics.roc_auc_score(y_test6, y_pred6,multi_class='ovo', average='weighted'),4)
# print('ROC AUC Score is :', threshold)
print('Accuracy score is :', np.round(metrics.accuracy_score(y_test,  test_predictions),4))
print('Precision score is :', np.round(metrics.precision_score(y_test, test_predictions, average='weighted'),4))
print('Recall score is :', np.round(metrics.recall_score(y_test,  test_predictions, average='weighted'),4))
print('F1 Score is :', np.round(metrics.f1_score(y_test,  test_predictions, average='weighted'),4))
print('Cohen Kappa Score:', np.round(metrics.cohen_kappa_score(y_test,  test_predictions),4))
print('\t\tClassification Report:\n', metrics.classification_report(y_test,  test_predictions, labels=target, target_names=target_names))

## Swin Inference from Finetuned Checkpoints

In [None]:
# Replace this path with the correct path to your fine-tuned model
checkpoint_path = "/mnt/SSD/hasan/Agile-Styling-ai/Finetuned_Checpoint/Swin_v2_Checkpoints/R_C_P_4th_3rd_2nd_1st/Test-1/checkpoint-955"

# Load the model and the processor (image processor)
model = AutoModelForImageClassification.from_pretrained(
    checkpoint_path,
    label2id=label2id,
    id2label=id2label,
    ignore_mismatched_sizes=True
).to(device)

image_processor = AutoImageProcessor.from_pretrained(checkpoint_path)

In [None]:
# Define the test dataset transforms (only resizing and normalization)
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Create DataLoader for the test dataset (without augmentation)
test_dataset = AgileDataset(test_metadata, transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)


In [None]:
from sklearn import metrics

In [None]:
# Predict on the test dataset with the checkpoint loaded above.
# The existing `trainer` still holds the model object it was built with, so
# calling it would not score this checkpoint; wrap the freshly loaded `model`
# in a prediction-only Trainer instead.
checkpoint_trainer = Trainer(
    model=model,
    args=TrainingArguments(
        output_dir=os.path.join(os.path.dirname(checkpoint_path), "inference_eval"),
        per_device_eval_batch_size=12,
        remove_unused_columns=False,
        report_to="none",
    ),
    data_collator=collate_fn,
    compute_metrics=compute_metrics,
)
test_pred = checkpoint_trainer.predict(test_dataset)

In [None]:
labelids = [np.argmax(a) for a in test_pred.predictions]

In [None]:
# Read the true labels straight from the metadata (column 1, the same column
# AgileDataset.__getitem__ returns) instead of decoding every image for them.
y_test = torch.tensor([label2id[label] for label in test_dataset.metadata.iloc[:, 1]])
y_test.shape

In [None]:
# On New Data
target = list(id2label.keys())  
target_names = list(id2label.values())
from sklearn import metrics
test_predictions = test_pred.predictions
# test_predictions = test_pred.label_ids
test_predictions = labelids
# threshold = np.round(metrics.roc_auc_score(y_test6, y_pred6,multi_class='ovo', average='weighted'),4)
# print('ROC AUC Score is :', threshold)
print('Accuracy score is :', np.round(metrics.accuracy_score(y_test,  test_predictions),4))
print('Precision score is :', np.round(metrics.precision_score(y_test, test_predictions, average='weighted'),4))
print('Recall score is :', np.round(metrics.recall_score(y_test,  test_predictions, average='weighted'),4))
print('F1 Score is :', np.round(metrics.f1_score(y_test,  test_predictions, average='weighted'),4))
print('Cohen Kappa Score:', np.round(metrics.cohen_kappa_score(y_test,  test_predictions),4))
print('\t\tClassification Report:\n', metrics.classification_report(y_test,  test_predictions, labels=target, target_names=target_names))

# Beit

In [None]:
# Load the image processor for the specified model checkpoint
# model_checkpoint = "Vit-project-80-5-15/beit_model/train-3/checkpoint-2600"
model_checkpoint = "microsoft/beit-base-patch16-224"
image_processor = AutoImageProcessor.from_pretrained("microsoft/beit-base-patch16-224")

In [None]:
# Create the BEiT image classifier with a head sized to our label set.
# Use the `device` selected at the top of the file instead of hard-coding CUDA.
model = AutoModelForImageClassification.from_pretrained(
    model_checkpoint,
    label2id = label2id,
    id2label = id2label,
    ignore_mismatched_sizes=True).to(device)

In [None]:
# callback = EarlyStoppingCallback(early_stopping_patience=30)

In [None]:
# Training arguments
batch_size = 16
epochs = 50
grad_accum_steps = 2
# max_steps counts optimizer updates, and each update consumes
# batch_size * grad_accum_steps samples. Ignoring the accumulation factor made
# the run last roughly twice the intended number of epochs.
# NOTE(review): assumes a single device; with several devices the effective
# batch is larger still.
steps_per_epoch = int(np.ceil(len(train_dataset) / (batch_size * grad_accum_steps)))
training_args = TrainingArguments(
    output_dir="/mnt/SSD/hasan/Agile-Styling-ai/Finetuned_Checpoint/Beit_Checkpoints/R_C_P_4th_3rd_2nd_1st/Test-1",
    remove_unused_columns=False,
    evaluation_strategy="steps",
    save_strategy="steps",
    eval_steps=10,
    save_steps=10,
    save_total_limit=3,
    logging_steps=10,
    learning_rate=0.00001,
    per_device_train_batch_size=batch_size,
    gradient_accumulation_steps=grad_accum_steps,
    per_device_eval_batch_size=12,
    num_train_epochs=epochs,
    max_steps=steps_per_epoch * epochs,
    load_best_model_at_end=True,
    metric_for_best_model="eval_overall_f1",
    )

In [None]:
# Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    # tokenizer=image_processor,
    compute_metrics=compute_metrics,
    data_collator=collate_fn,
    # callbacks=[callback]
    )

In [None]:
# Training
train_results = trainer.train()

In [None]:
test_pred = trainer.predict(test_dataset)

In [None]:
test_pred.metrics

In [None]:
test_pred.metrics

In [None]:
y_test = torch.tensor([label2id[test_dataset[i]['label']] for i in range(len(test_dataset))])
y_test.shape

In [None]:
print(len(y_test))  # true labels you have
print(len(test_pred))  # Predictions made

In [None]:
# (test_pred)

In [None]:
# Check the length of both
print(f"Length of y_test: {len(y_test)}")
print(f"Length of test_pred: {len(test_pred.predictions)}")

# Optionally, check the first few values to make sure they look correct
print(f"First 5 true labels: {y_test[:5]}")
print(f"First 5 predictions: {test_pred[:5]}")


In [None]:
test_pred

In [None]:
y_test

In [None]:
test_pred.predictions

In [None]:
labelids = [np.argmax(a) for a in test_pred.predictions]

In [None]:
test_pred.label_ids

In [None]:
target = list(id2label.keys())  
target_names = list(id2label.values())
from sklearn import metrics
test_predictions = test_pred.predictions
# test_predictions = test_pred.label_ids
test_predictions = labelids
# threshold = np.round(metrics.roc_auc_score(y_test6, y_pred6,multi_class='ovo', average='weighted'),4)
# print('ROC AUC Score is :', threshold)
print('Accuracy score is :', np.round(metrics.accuracy_score(y_test,  test_predictions),4))
print('Precision score is :', np.round(metrics.precision_score(y_test, test_predictions, average='weighted'),4))
print('Recall score is :', np.round(metrics.recall_score(y_test,  test_predictions, average='weighted'),4))
print('F1 Score is :', np.round(metrics.f1_score(y_test,  test_predictions, average='weighted'),4))
print('Cohen Kappa Score:', np.round(metrics.cohen_kappa_score(y_test,  test_predictions),4))
print('\t\tClassification Report:\n', metrics.classification_report(y_test,  test_predictions, labels=target, target_names=target_names))

## Beit Inference from Finetuned Checkpoints

In [None]:
test_pred = trainer.predict(test_dataset)

In [None]:
labelids = [np.argmax(a) for a in test_pred.predictions]

In [None]:
# Read the true labels straight from the metadata (column 1, the same column
# AgileDataset.__getitem__ returns) instead of decoding every image for them.
y_test = torch.tensor([label2id[label] for label in test_dataset.metadata.iloc[:, 1]])
y_test.shape

In [None]:
# Replace this path with the correct path to your fine-tuned model
checkpoint_path = "/mnt/SSD/hasan/Agile-Styling-ai/Finetuned_Checpoint/Beit_Checkpoints/R_C_P_4th_3rd_2nd_1st/Test-1/checkpoint-1120"

# Load the model and the processor (image processor)
model = AutoModelForImageClassification.from_pretrained(
    checkpoint_path,
    label2id=label2id,
    id2label=id2label,
    ignore_mismatched_sizes=True
).to(device)

image_processor = AutoImageProcessor.from_pretrained(checkpoint_path)

In [None]:
# Define the test dataset transforms (only resizing and normalization)
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Create DataLoader for the test dataset (without augmentation)
test_dataset = AgileDataset(test_metadata, transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)


In [None]:
from sklearn import metrics

In [None]:
# Predict on the test dataset with the checkpoint loaded above.
# The existing `trainer` still holds the model object it was built with, so
# calling it would not score this checkpoint; wrap the freshly loaded `model`
# in a prediction-only Trainer instead.
checkpoint_trainer = Trainer(
    model=model,
    args=TrainingArguments(
        output_dir=os.path.join(os.path.dirname(checkpoint_path), "inference_eval"),
        per_device_eval_batch_size=12,
        remove_unused_columns=False,
        report_to="none",
    ),
    data_collator=collate_fn,
    compute_metrics=compute_metrics,
)
test_pred = checkpoint_trainer.predict(test_dataset)

In [None]:
labelids = [np.argmax(a) for a in test_pred.predictions]

In [None]:
# Read the true labels straight from the metadata (column 1, the same column
# AgileDataset.__getitem__ returns) instead of decoding every image for them.
y_test = torch.tensor([label2id[label] for label in test_dataset.metadata.iloc[:, 1]])
y_test.shape

In [None]:
# 1120
target = list(id2label.keys())  
target_names = list(id2label.values())
from sklearn import metrics
test_predictions = test_pred.predictions
# test_predictions = test_pred.label_ids
test_predictions = labelids
# threshold = np.round(metrics.roc_auc_score(y_test6, y_pred6,multi_class='ovo', average='weighted'),4)
# print('ROC AUC Score is :', threshold)
print('Accuracy score is :', np.round(metrics.accuracy_score(y_test,  test_predictions),4))
print('Precision score is :', np.round(metrics.precision_score(y_test, test_predictions, average='weighted'),4))
print('Recall score is :', np.round(metrics.recall_score(y_test,  test_predictions, average='weighted'),4))
print('F1 Score is :', np.round(metrics.f1_score(y_test,  test_predictions, average='weighted'),4))
print('Cohen Kappa Score:', np.round(metrics.cohen_kappa_score(y_test,  test_predictions),4))
print('\t\tClassification Report:\n', metrics.classification_report(y_test,  test_predictions, labels=target, target_names=target_names))

# Ensemble

## Load Dino

In [None]:
# model_checkpoint = "/content/drive/MyDrive/re/swinv2_results/swinv2-checkpoint"
model_checkpoint = "/mnt/SSD/hasan/Agile-Styling-ai/Finetuned_Checpoint/Dinov2_Checkpoints/R_C_P_4th_3rd_2nd_1st/Test-1/checkpoint-255"

In [None]:
# Load the fine-tuned DINOv2 classifier from its checkpoint.
# Use the `device` selected at the top of the file instead of hard-coding CUDA.
model = AutoModelForImageClassification.from_pretrained(
    model_checkpoint,
    label2id = label2id,
    id2label = id2label,
    ignore_mismatched_sizes=True).to(device)

In [None]:
import torch.nn as nn

In [None]:
dino_model = nn.Sequential(*list(model.children())[:-1])
# new_model = model.children()[:-1]
# new_model

## Load Swin

In [None]:
# model_checkpoint = "/content/drive/MyDrive/re/swinv2_results/swinv2-checkpoint"
model_checkpoint = "/mnt/SSD/hasan/Agile-Styling-ai/Finetuned_Checpoint/Swin_v2_Checkpoints/R_C_P_4th_3rd_2nd_1st/Test-1/checkpoint-955"

In [None]:
# Load the fine-tuned Swin v2 classifier from its checkpoint.
# Use the `device` selected at the top of the file instead of hard-coding CUDA.
model = AutoModelForImageClassification.from_pretrained(
    model_checkpoint,
    label2id = label2id,
    id2label = id2label,
    ignore_mismatched_sizes=True).to(device)

In [None]:
swin_model = nn.Sequential(*list(model.children())[:-1])
# new_model = model.children()[:-1]
# new_model

## Load Beit

In [None]:
# model_checkpoint = "/content/drive/MyDrive/re/swinv2_results/swinv2-checkpoint"
model_checkpoint = "/mnt/SSD/hasan/Agile-Styling-ai/Finetuned_Checpoint/Beit_Checkpoints/R_C_P_4th_3rd_2nd_1st/Test-1/checkpoint-1120"

In [None]:
# Load the fine-tuned BEiT classifier from its checkpoint.
# Use the `device` selected at the top of the file instead of hard-coding CUDA.
model = AutoModelForImageClassification.from_pretrained(
    model_checkpoint,
    label2id = label2id,
    id2label = id2label,
    ignore_mismatched_sizes=True).to(device)

In [None]:
beit_model = nn.Sequential(*list(model.children())[:-1])
# new_model = model.children()[:-1]
# new_model

# Extracting Features

In [None]:
def extract_features(dataset, model, batch_size=32, device=None):
    """Run ``model`` over every sample and collect its pooled features.

    Args:
        dataset: Indexable collection whose items are dicts holding an
            ``"image"`` tensor (all images must share one shape).
        model: Callable module whose output exposes ``pooler_output``.
        batch_size: Number of images sent through the model per forward pass.
        device: Device the inputs are moved to. Defaults to the device of the
            model's first parameter, or CUDA-if-available when the model has
            no parameters.

    Returns:
        CPU tensor with one row of features per sample, in dataset order.
        An empty tensor is returned for an empty dataset.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        if first_param is not None:
            device = first_param.device
        else:
            device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    total = len(dataset)
    if total == 0:
        return torch.empty(0)

    chunks = []
    with torch.no_grad():
        for start in range(0, total, batch_size):
            stop = min(start + batch_size, total)
            # Samples are read in order, one batch at a time, so memory stays
            # bounded while the model runs far fewer forward passes.
            batch = torch.stack([dataset[i]['image'] for i in range(start, stop)])
            pooled = model(batch.to(device)).pooler_output
            chunks.append(pooled.cpu())

    return torch.cat(chunks)

In [None]:
dino_train_features = torch.concat([extract_features(train_dataset, dino_model), extract_features(val_dataset, dino_model)])
dino_test_features = extract_features(test_dataset, dino_model)

In [None]:
dino_train_features.shape, dino_test_features.shape

In [None]:
swin_train_features = torch.concat([extract_features(train_dataset, swin_model), extract_features(val_dataset, swin_model)])
swin_test_features = extract_features(test_dataset, swin_model)

In [None]:
swin_train_features.shape, swin_test_features.shape

In [None]:
beit_train_features = torch.concat([extract_features(train_dataset, beit_model), extract_features(val_dataset, beit_model)])
beit_test_features = extract_features(test_dataset, beit_model)

In [None]:
beit_train_features.shape, beit_test_features.shape

## Concatenating Features

In [None]:
concatenated_train_features = torch.concat([dino_train_features, swin_train_features, beit_train_features], axis=1)

In [None]:
concatenated_train_features = torch.concat([dino_train_features, swin_train_features, beit_train_features], axis=1)

In [None]:
concatenated_train_features.shape

In [None]:
concatenated_test_features = torch.concat([dino_test_features, swin_test_features, beit_test_features], axis=1)

In [None]:
np.save('4th_X_train.npy', concatenated_train_features.numpy())
np.save('4th_X_test.npy', concatenated_test_features.numpy())

In [None]:
concatenated_test_features.shape

## Encoding Labels

In [None]:
y_train = train_dataset.metadata['Season'].apply(lambda x: label2id[x]).to_list() \
+ val_dataset.metadata['Season'].apply(lambda x: label2id[x]).to_list()

In [None]:
np.save("4th_Y_train.npy", np.array(y_train))

In [None]:
y_train = torch.tensor(y_train)

In [None]:
# y_test = torch.tensor([label2id[test_dataset[i]['label']] for i in range(len(test_dataset))])
y_test = test_dataset.metadata['Season'].apply(lambda x: label2id[x]).to_list()

In [None]:
np.save("4th_Y_test.npy", np.array(y_test))

# XGBoost model performance

In [None]:
import xgboost as xgb

In [None]:
concatenated_train_features = np.load('4th_X_train.npy')
concatenated_test_features = np.load('4th_X_test.npy')
y_train = np.load('4th_Y_train.npy')
y_test = np.load('4th_Y_test.npy')

In [None]:
y_train, y_test = torch.tensor(y_train), torch.tensor(y_test)

In [None]:
y_train.shape, y_test.shape

In [None]:
# Create an XGBoost classifier instance
xgb_model = xgb.XGBClassifier(
    tree_method='gpu_hist',
    learning_rate=.1,    # Learning rate for boosting
    n_estimators=100000,     # Number of boosting rounds (trees)
    max_depth=4,          # Maximum depth of individual trees
    # objective='binary:logistic',  # Objective function for binary classification
    objective='multi:softmax',  # Objective function for multiclass classification
    num_class=len(label2id),  # Number of classes, derived from the label map instead of hard-coded
    subsample=0.2
)

In [None]:
xgb_model.fit(concatenated_train_features, y_train)

In [None]:
# 4th Round
print("Performance Report:")
y_pred = xgb_model.predict(concatenated_test_features)
target = list(id2label.keys())  
target_names = list(id2label.values())
from sklearn import metrics
# threshold = np.round(metrics.roc_auc_score(y_test6, y_pred6,multi_class='ovo', average='weighted'),4)
# print('ROC AUC Score is :', threshold)
print('Accuracy score is :', np.round(metrics.accuracy_score(y_test, y_pred), 4))
print('Precision score is :', np.round(metrics.precision_score(y_test, y_pred, average='weighted'), 4))
print('Recall score is :', np.round(metrics.recall_score(y_test, y_pred, average='weighted'), 4))
print('F1 Score is :', np.round(metrics.f1_score(y_test, y_pred, average='weighted'), 4))
print('Cohen Kappa Score:', np.round(metrics.cohen_kappa_score(y_test, y_pred), 4))
print('\t\tClassification Report:\n', metrics.classification_report(y_test, y_pred, labels=target, target_names=target_names))


In [None]:
# Save the trained XGBoost model
xgb_model.save_model("xgboost_model_4th.json")

# Gradio

In [None]:
# Assuming model checkpoints for DINO v2, Swin v2, BEiT
dino_model_checkpoint = "/mnt/SSD/hasan/Agile-Styling-ai/Finetuned_Checpoint/Dinov2_Checkpoints/Test_5/checkpoint-27"
swin_model_checkpoint = "/mnt/SSD/hasan/Agile-Styling-ai/Finetuned_Checpoint/Swin_v2_Checkpoints/Test_1/checkpoint-55"
beit_model_checkpoint = "/mnt/SSD/hasan/Agile-Styling-ai/Finetuned_Checpoint/Beit_Checkpoints/Test_1/checkpoint-209"

In [None]:
import gradio as gr
import torch
import numpy as np
import xgboost as xgb
from PIL import Image
import torchvision.transforms as transforms
from transformers import AutoImageProcessor, AutoModelForImageClassification
import torch.nn as nn

# Set device (CUDA if available)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# File holding the trained XGBoost classifier. It must name the file written
# by the training section's save_model("xgboost_model_4th.json") call;
# previously this variable was only present as a commented-out line, so the
# load below failed with a NameError unless it was defined elsewhere.
xgb_model_path = "xgboost_model_4th.json"


def _load_backbone(checkpoint):
    """Load a fine-tuned classifier and return it minus its last child module.

    Args:
        checkpoint: Path or hub id accepted by
            ``AutoModelForImageClassification.from_pretrained``.

    Returns:
        An ``nn.Sequential`` made of every child module except the last one
        (presumably the classification head -- confirm per architecture),
        moved to ``device`` and switched to eval mode.
    """
    classifier = AutoModelForImageClassification.from_pretrained(checkpoint)
    backbone = nn.Sequential(*list(classifier.children())[:-1])
    # eval() states the inference intent explicitly on the wrapper as well.
    return backbone.to(device).eval()


# Feature extractors built from the fine-tuned DINO, Swin and BEiT checkpoints.
dino_model = _load_backbone(dino_model_checkpoint)
swin_model = _load_backbone(swin_model_checkpoint)
beit_model = _load_backbone(beit_model_checkpoint)

# Load the saved XGBoost model
xgb_model = xgb.XGBClassifier()
xgb_model.load_model(xgb_model_path)

# Define the image preprocessing pipeline
# NOTE(review): this must match the preprocessing used when the training
# features were extracted; the checkpoints' own image-processor settings may
# differ from these values -- TODO confirm.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Feature extraction function
def extract_features(image, model):
    """Run one image through ``model`` and return a flat feature vector.

    Args:
        image: PIL image. Non-RGB images are converted to RGB first, as
            ``AgileDataset`` does for the training data.
        model: Callable module whose output exposes ``pooler_output`` or
            ``logits``.

    Returns:
        A 1-D NumPy array of features, or ``None`` if anything fails (the
        error is printed; callers are expected to check for ``None``).
    """
    try:
        # The Normalize step uses three channel statistics, so grayscale or
        # RGBA input has to be converted before the transform runs.
        if image.mode != 'RGB':
            image = image.convert('RGB')

        # Transform the image to a tensor and add the batch dimension.
        batch = transform(image).unsqueeze(0).to(device)
        with torch.no_grad():
            output = model(batch)

        # Prefer pooler_output; fall back to logits when the attribute is
        # missing or present but None.
        features = getattr(output, 'pooler_output', None)
        if features is None:
            features = output.logits

        # Flatten so the vector can be concatenated and fed to XGBoost.
        return features.flatten().cpu().numpy()
    except Exception as e:
        # Deliberate best-effort boundary: report and signal failure via None.
        print(f"Error during feature extraction: {e}")
        return None

# Prediction function
def predict_season(image):
    """Predict the season label for an uploaded image.

    Features from the DINO, Swin and BEiT extractors are concatenated and
    classified by the loaded XGBoost model.

    Args:
        image: PIL image from the Gradio input, or ``None`` when no image is
            present.

    Returns:
        The predicted season name, or a human-readable error message string
        when no image was given or any step failed.
    """
    # A live interface may invoke the callback with None (e.g. when the image
    # is cleared); answer directly instead of running three failing extractions.
    if image is None:
        return "No image provided"

    try:
        # Extract features from all models (DINO, Swin, BEiT)
        dino_features = extract_features(image, dino_model)
        swin_features = extract_features(image, swin_model)
        beit_features = extract_features(image, beit_model)

        if dino_features is None or swin_features is None or beit_features is None:
            return "Error during feature extraction"

        # Debugging: Print shapes of the extracted features
        print(f"DINO Features Shape: {dino_features.shape}")
        print(f"Swin Features Shape: {swin_features.shape}")
        print(f"BEiT Features Shape: {beit_features.shape}")

        # Concatenate features from all models into one array
        concatenated_features = np.concatenate([dino_features, swin_features, beit_features], axis=0)
        print(f"Concatenated Features Shape: {concatenated_features.shape}")

        # Pass an explicit 2-D (1, n_features) array to the classifier rather
        # than a Python list wrapping a 1-D array.
        prediction = xgb_model.predict(concatenated_features.reshape(1, -1))[0]

        # Map prediction (index) to label (season)
        id2label = {0: 'True Autumn', 1: 'True Spring', 2: 'True Summer', 3: 'True Winter'}  # Modify as needed
        # predict() yields a NumPy scalar; look up with a plain int key.
        predicted_season = id2label[int(prediction)]

        return predicted_season

    except Exception as e:
        # Top-level boundary for the UI: surface the error text to the user.
        print(f"Error during prediction: {e}")
        return f"Error during prediction: {e}"

# Gradio Interface: one PIL image in, one text box with the predicted season out.
image_input = gr.Image(type="pil", label="Upload Image")
season_output = gr.Textbox(label="Predicted Season")
iface = gr.Interface(
    fn=predict_season,
    inputs=image_input,
    outputs=season_output,
    live=True,  # re-run the prediction whenever the input changes
)

# Launch the Gradio app on all interfaces; share=True also requests a public URL.
launch_options = {"server_name": "0.0.0.0", "server_port": 7999, "share": True}
iface.launch(**launch_options)
