In [None]:
import os
import numpy as np
import pandas as pd
from collections import Counter
from tqdm import tqdm
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoProcessor, AutoModelForImageClassification
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score
from sklearn.base import clone
from scipy.optimize import minimize
from xgboost import XGBClassifier

In [None]:
# Training annotations, restricted to the 'Kurtis' category and re-indexed from 0.
train_df = (
    pd.read_csv('/kaggle/input/visual-taxonomy/train.csv')
    .loc[lambda frame: frame['Category'] == 'Kurtis']
    .reset_index(drop=True)
)

# Directory that holds the training images.
image_dir = '/kaggle/input/visual-taxonomy/train_images'

# Attribute columns attr_1 .. attr_10.
attr_columns = [f'attr_{i}' for i in range(1, 11)]

In [None]:
# Test annotations, restricted to the 'Kurtis' category and re-indexed from 0.
test_df = (
    pd.read_csv('/kaggle/input/visual-taxonomy/test.csv')
    .loc[lambda frame: frame['Category'] == 'Kurtis']
    .reset_index(drop=True)
)

# Directory that holds the test images.
test_image_dir = '/kaggle/input/visual-taxonomy/test_images'

In [None]:
# Use the GPU when one is visible to torch, otherwise stay on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Local directory containing the pretrained backbone and its processor config.
model_name = './pvt_v2_b2'
feature_extractor = AutoProcessor.from_pretrained(model_name)
model = AutoModelForImageClassification.from_pretrained(
    model_name,
    output_hidden_states=True,
    ignore_mismatched_sizes=True,
)
# Move to the target device, then switch to inference mode; both calls return the module.
model.to(device)
model.eval()

In [None]:
class FeatureExtractionDataset(Dataset):
    """Dataset that yields preprocessed image tensors together with their ids.

    Args:
        dataframe: Frame with an 'id' column; its index is reset so that
            positional indices 0..len-1 can be used for lookup.
        image_dir: Directory containing the images, named ``<id zero-padded
            to 6 digits>.jpg``.
        feature_extractor: Callable invoked as
            ``feature_extractor(images=..., return_tensors="pt")`` whose result
            exposes a ``'pixel_values'`` entry.
        image_size: Size passed to ``Image.resize`` before preprocessing.
    """

    def __init__(self, dataframe, image_dir, feature_extractor, image_size=(224, 224)):
        self.data = dataframe.reset_index(drop=True)
        self.image_dir = image_dir
        self.feature_extractor = feature_extractor
        self.image_size = image_size

    def __len__(self):
        """Number of rows (images) in the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(pixel_values, img_id)`` for the row at position ``idx``."""
        img_id = self.data.loc[idx, 'id']
        img_path = os.path.join(self.image_dir, f"{str(img_id).zfill(6)}.jpg")
        # Open inside a context manager so the underlying file handle is
        # released deterministically; convert() returns an independent image.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("RGB")
        # Resize the image to a consistent size
        image = image.resize(self.image_size)
        inputs = self.feature_extractor(images=image, return_tensors="pt")
        # Drop the leading batch axis added by return_tensors="pt".
        return inputs['pixel_values'].squeeze(0), img_id


In [None]:
# Start the submission frame from the test ids and categories, plus a constant
# 'len' column (fixed at 9 for every row).
submission_df = test_df.loc[:, ['id', 'Category']].assign(len=9)

In [None]:
def harmonic_f1_score(y_true, y_pred):
    """Return ``(macro F1, micro F1, harmonic mean of the two)``.

    A small epsilon (1e-10) in the denominator avoids division by zero when
    both F1 scores are zero.
    """
    scores = {avg: f1_score(y_true, y_pred, average=avg) for avg in ('macro', 'micro')}
    macro, micro = scores['macro'], scores['micro']
    numerator = 2 * (macro * micro)
    denominator = macro + micro + 1e-10
    return macro, micro, numerator / denominator

def threshold_rounder(probs, thresholds):
    """Pick, for every sample, the class whose probability most exceeds its threshold.

    Args:
        probs: 2-D array-like of shape (num_samples, num_classes).
        thresholds: 1-D array-like with one offset per class; it is subtracted
            from every row of ``probs`` before taking the argmax.

    Returns:
        1-D integer array of length num_samples with the selected class indices.
        Ties resolve to the lowest class index.

    Raises:
        ValueError: if ``probs`` is not two-dimensional.
    """
    probs = np.asarray(probs)
    if probs.ndim != 2:
        raise ValueError(f"probs must be 2-D (num_samples, num_classes), got shape {probs.shape}")
    # One vectorised argmax instead of a Python loop over rows: this function is
    # called on every objective evaluation during threshold optimisation.
    adjusted_probs = probs - np.asarray(thresholds)
    return np.argmax(adjusted_probs, axis=1).astype(int)

def evaluate_f1_thresholds(thresholds, y_true, probs):
    """Objective for threshold search: negated harmonic F1 of the thresholded predictions.

    The sign is flipped so that a minimiser maximises the harmonic mean of the
    macro and micro F1 scores.
    """
    predicted = threshold_rounder(probs, thresholds)
    scores = harmonic_f1_score(y_true, predicted)
    # scores is (macro, micro, harmonic mean); only the last entry is optimised.
    return -scores[2]

In [None]:
def TrainML(model_class, X, y, test_data, n_splits=5, SEED=42):
    """Cross-validate a classifier and tune per-class decision thresholds.

    For each stratified fold a fresh clone of ``model_class`` is fitted; its
    validation probabilities fill the out-of-fold (OOF) matrix and its test
    probabilities are averaged over folds. Per-class thresholds are then
    optimised with Nelder-Mead to maximise the harmonic mean of macro and micro
    F1 on the OOF probabilities, and applied to both OOF and test probabilities.

    Args:
        model_class: Unfitted estimator instance exposing ``fit`` and
            ``predict_proba``; it is cloned per fold and never fitted itself.
        X: Training features, indexed with integer index arrays (e.g. a NumPy array).
        y: Training labels, indexed the same way as ``X``. Assumed to be
            encoded as 0..num_classes-1 so that probability columns line up
            with label values — TODO confirm for callers other than this notebook.
        test_data: Test features passed to ``predict_proba``.
        n_splits: Number of stratified folds.
        SEED: Random state for the fold shuffling.

    Returns:
        Tuple ``(oof_preds, test_preds, optimized_thresholds)``.

    Notes:
        The printed scores are computed on the same OOF predictions the
        thresholds were tuned on, so they are optimistic.
    """
    import warnings  # local import: only needed for the non-convergence warning

    splitter = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=SEED)
    num_classes = len(np.unique(y))
    oof_probs = np.zeros((len(y), num_classes))
    test_probs = np.zeros((len(test_data), num_classes))

    for train_idx, val_idx in tqdm(splitter.split(X, y), desc="Training Folds", total=n_splits):
        X_train, X_val = X[train_idx], X[val_idx]
        y_train = y[train_idx]

        model = clone(model_class)
        model.fit(X_train, y_train)

        oof_probs[val_idx] = model.predict_proba(X_val)

        test_probs += model.predict_proba(test_data) / n_splits  # Averaging predictions

    # Optimize thresholds
    initial_thresholds = [0.5] * num_classes
    bounds = [(0, 1)] * num_classes

    optimized = minimize(
        evaluate_f1_thresholds,
        x0=initial_thresholds,
        args=(y, oof_probs),
        method='Nelder-Mead',
        bounds=bounds
    )

    # Do not use `assert` here: it disappears under `python -O`, and a failed
    # termination (e.g. iteration cap reached on this piecewise-constant
    # objective) still leaves the best simplex vertex in `optimized.x`.
    # Discarding every trained fold for that would be wasteful, so warn instead.
    if not optimized.success:
        warnings.warn(
            f"Threshold optimization did not converge ({optimized.message}); "
            "using the best thresholds found.",
            RuntimeWarning,
        )

    optimized_thresholds = optimized.x
    oof_preds = threshold_rounder(oof_probs, optimized_thresholds)
    test_preds = threshold_rounder(test_probs, optimized_thresholds)

    # Calculate and print the scores
    f1_macro, f1_micro, harmonic_mean = harmonic_f1_score(y, oof_preds)
    print(f"F1-Macro: {f1_macro:.4f}, F1-Micro: {f1_micro:.4f}, Harmonic Mean: {harmonic_mean:.4f}")

    return oof_preds, test_preds, optimized_thresholds


In [None]:
import torch.nn.functional as F


def _extract_pooled_features(model, loader, device, desc):
    """Return one pooled feature vector per sample in ``loader``.

    Each batch is run through ``model`` without gradients; the last entry of
    ``outputs.hidden_states`` is averaged down to 1x1 with adaptive pooling and
    flattened to (batch_size, -1). Batches are concatenated along axis 0.
    """
    pooled_batches = []
    with torch.no_grad():
        for pixel_values, _img_ids in tqdm(loader, desc=desc):
            outputs = model(pixel_values=pixel_values.to(device))
            # Assumed shape: (batch_size, channels, height, width) — TODO confirm for this backbone
            hidden_states = outputs.hidden_states[-1]
            # Apply adaptive pooling
            pooled = F.adaptive_avg_pool2d(hidden_states, output_size=(1, 1))
            pooled_batches.append(pooled.view(pooled.size(0), -1).cpu().numpy())
    return np.concatenate(pooled_batches, axis=0)


# Loop-invariant setup: the device, processor and test loader are the same for
# every attribute, so build them once instead of once per iteration.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_name = './pvt_v2_b2'
feature_extractor = AutoProcessor.from_pretrained(model_name)

test_feature_dataset = FeatureExtractionDataset(test_df, test_image_dir, feature_extractor)
test_feature_loader = DataLoader(test_feature_dataset, batch_size=8, shuffle=False, num_workers=4, pin_memory=True)

for attr_idx, attr in enumerate(attr_columns, start=1):
    print(f"\nProcessing {attr}...")

    # Drop rows with NaN
    df = train_df.dropna(subset=[attr]).reset_index(drop=True)
    if df.empty:
        print(f"No data available for {attr}, skipping.")
        submission_df[attr] = 'dummy_value'
        continue

    y_labels = df[attr].values

    # Encode labels
    label_encoder = LabelEncoder()
    y = label_encoder.fit_transform(y_labels)
    num_classes = len(np.unique(y))
    print(num_classes)

    # Handle cases where y has only one class
    if num_classes < 2:
        print(f"Only one class present in {attr}, skipping training.")
        # Assign the single class to all test samples
        test_preds_attr = np.full(len(test_df), y[0])
    else:
        # Check for the fine-tuned checkpoint BEFORE loading the backbone, so a
        # missing file does not cost a full model load.
        model_path = f"./model weights/Kurtis/pvt_classifier_kurtis_attr_{attr_idx}_best.pth"
        if not os.path.exists(model_path):
            print(f"Model path {model_path} does not exist for {attr}, skipping.")
            submission_df[attr] = 'dummy_value'
            continue

        # Load the model specific to the attribute
        model = AutoModelForImageClassification.from_pretrained(
            model_name, num_labels=num_classes, output_hidden_states=True, ignore_mismatched_sizes=True)

        # NOTE(review): torch.load unpickles the file; only load checkpoints
        # from a trusted source (consider weights_only=True where supported).
        state_dict = torch.load(model_path, map_location=device)
        model.load_state_dict(state_dict)
        model.to(device)
        model.eval()

        # Extract features for training data with adaptive pooling
        feature_dataset = FeatureExtractionDataset(df, image_dir, feature_extractor)
        feature_loader = DataLoader(feature_dataset, batch_size=8, shuffle=False, num_workers=4, pin_memory=True)
        X_attr = _extract_pooled_features(
            model, feature_loader, device, desc=f"Extracting Features for {attr}")

        # Extract features for test data using the same model
        test_features = _extract_pooled_features(
            model, test_feature_loader, device, desc=f"Extracting Test Features for {attr}")

        # NOTE(review): no class weighting is applied to the classifier; class
        # weights that were computed here but never used have been removed.

        # Define the classifier model; follow the torch device so the notebook
        # also runs on CPU-only machines instead of hard-coding 'cuda'.
        xgb_model = XGBClassifier(
            n_estimators=200,
            learning_rate=0.05,
            tree_method='hist',
            device=device.type,
            random_state=42
        )

        # Train the model
        oof_preds, test_preds_attr, _ = TrainML(xgb_model, X_attr, y, test_features)

    # Decode predictions
    decoded_predictions = label_encoder.inverse_transform(test_preds_attr)

    # Add predictions to the submission DataFrame
    submission_df[attr] = decoded_predictions


In [None]:
# Any attribute that never received a column gets the placeholder 'dummy_value'.
missing_attrs = [col for col in attr_columns if col not in submission_df.columns]
for attr in missing_attrs:
    submission_df[attr] = 'dummy_value'

In [None]:
# Put the columns in submission order: id, Category, len, then the attributes.
submission_df = submission_df.loc[:, ['id', 'Category', 'len', *attr_columns]]

In [None]:
# Write the submission to CSV in the working directory, without the index column.
submission_path = 'submission_kurtis.csv'
submission_df.to_csv(submission_path, index=False)
print("Submission file created successfully!")