# Setup

## Installs

In [None]:
!pip install git+https://github.com/openai/CLIP.git

## Imports

In [None]:
import os
import re
import collections
import random
import json
import typing
import warnings
from collections import defaultdict
from typing import Dict, List, Tuple
from pathlib import Path
from logging import getLogger, Logger, INFO, StreamHandler, FileHandler, Formatter

from tqdm import tqdm
import pandas as pd
import numpy as np
from PIL import Image

import torch
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
from torch.nn import functional as F

import torchvision
from torchvision import transforms
import torchmetrics

import clip
# Silence every Python warning for the rest of the session. This keeps the notebook
# output compact, but it also hides deprecation notices from the libraries imported above.
warnings.filterwarnings('ignore')
# Visible confirmation that the import cell ran to completion.
print("Libraries Imported Successfully!")

## Utility

In [None]:
def save_model(model: nn.Module, target_dir: str, model_name: str):
    """
    Write ``model.state_dict()`` to ``target_dir / model_name``.

    Args:
        model: Module whose state dictionary is serialized.
        target_dir: Destination folder; created (with parents) when missing.
        model_name: File name of the checkpoint; must end in '.pt' or '.pth'.

    Raises:
        AssertionError: If ``model_name`` has a different extension. Note that
            the destination folder has already been created at that point.
    """
    # Make sure the destination folder (and any missing parents) exists first
    out_dir = Path(target_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    has_valid_suffix = model_name.endswith((".pth", ".pt"))
    assert has_valid_suffix, "model_name should end with '.pt' or '.pth'"

    destination = out_dir / model_name
    print(f"[INFO] Saving model to: {destination}")
    torch.save(obj=model.state_dict(), f=destination)


def load_model(model: nn.Module, target_dir: str, model_name: str):
    """
    Load a state dictionary from ``target_dir / model_name`` into ``model``.

    Args:
        model: Module that receives the weights (modified in place).
        target_dir: Folder containing the checkpoint.
        model_name: Checkpoint file name; must end in '.pt' or '.pth'.

    Returns:
        The same ``model`` object, with the loaded weights.

    Raises:
        AssertionError: If the extension is wrong or the file does not exist.
    """
    assert model_name.endswith(".pth") or model_name.endswith(".pt"), "model_name should end with '.pt' or '.pth'"
    target_dir_path = Path(target_dir)

    model_load_path = target_dir_path / model_name
    assert model_load_path.is_file(), f"Model file not found at: {model_load_path}"

    print(f"[INFO] Loading model from: {model_load_path}")
    # Deserialize onto the CPU first: a checkpoint saved from a GPU run would otherwise
    # fail to load on a machine without CUDA. `load_state_dict` copies the values into
    # the model's existing parameters, so the model stays on whatever device it is on.
    state_dict = torch.load(model_load_path, map_location="cpu")
    model.load_state_dict(state_dict)
    return model

In [None]:
def seed_everything(seed=42):
    """
    Seed Python, NumPy and PyTorch (CPU and CUDA) with the same value.

    Also exports the value as PYTHONHASHSEED and switches cuDNN to
    deterministic mode.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Each library exposes a one-argument seeding function; apply them in turn
    seeders = (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed)
    for apply_seed in seeders:
        apply_seed(seed)

    torch.backends.cudnn.deterministic = True

### Hyperparameters & Directories

In [None]:
# Training hyperparameters
NUM_EPOCHS, BATCH_SIZE = 20, 32
LEARNING_RATE = 1e-4
NUM_WORKERS = 4

# Fix the RNG seeds before anything stochastic runs
seed_everything(seed=42)

# Prefer the GPU when one is visible, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Device: {device}")

In [None]:
# Load the CLIP "ViT-B/32" checkpoint onto `device`. `clip.load` returns the model together
# with the image preprocessing transform that belongs to it; both are used as globals below.
clip_model, preprocess = clip.load("ViT-B/32", device=device)

In [None]:
# Root of the competition dataset and the files/folders derived from it
data_dir = "/kaggle/input/visual-taxonomy"
train_img_dir, train_path, test_img_dir, test_path, cat_path = (
    os.path.join(data_dir, leaf)
    for leaf in (
        "train_images",
        "train.csv",
        "test_images",
        "test.csv",
        "category_attributes.parquet",
    )
)

# Echo every resolved location, one per line
for location in (data_dir, train_img_dir, train_path, test_img_dir, test_path, cat_path):
    print(f"{location}")

# Data

In [None]:
# Read the competition tables: the two CSV splits plus the category/attribute schema
train_df, test_df = (pd.read_csv(csv_path) for csv_path in (train_path, test_path))
category_attributes = pd.read_parquet(cat_path)

In [None]:
# Per-category lookup: how many attributes the category has and what they are called
category_to_attributes = {
    record["Category"]: {
        "num_attributes": record["No_of_attribute"],
        "attributes": record["Attribute_list"],
    }
    for _, record in category_attributes.iterrows()
}
category_to_attributes

In [None]:
# Load the attribute information from `category_attributes.parquet`
category_attributes_df = pd.read_parquet(cat_path)

# vocabularies[category][attribute_name] -> {attribute value: integer class ID}
vocabularies = defaultdict(lambda: defaultdict(dict))

# Register every declared attribute of every category with an empty vocabulary.
# Attribute order matters: the i-th attribute name is matched to column `attr_{i+1}` below.
for _, row in category_attributes_df.iterrows():
    category = row["Category"]
    attributes = row["Attribute_list"]
    num_attributes = row["No_of_attribute"]

    for attribute_name in attributes[:num_attributes]:
        vocabularies[category][attribute_name] = {}

# Load training data
train_df = pd.read_csv(train_path)

# Populate the vocabularies with the distinct values seen in the training data.
# IDs follow order of first appearance within the category (`unique()` preserves it).
# Missing values are dropped so that NaN never becomes a class of its own, which would
# otherwise surface later as a literal "nan" prompt / prediction.
for category, category_rows in train_df.groupby("Category", sort=False):
    if category not in vocabularies:
        continue  # category absent from the schema: nothing to populate

    for i, attribute_name in enumerate(list(vocabularies[category])):
        observed_values = category_rows[f"attr_{i+1}"].dropna().unique()
        vocabularies[category][attribute_name] = {
            value: class_id for class_id, value in enumerate(observed_values)
        }

# Example output for vocabularies
print("Sample vocabulary for 'Sarees' category attributes:")
for attr_name, attr_vocab in vocabularies["Sarees"].items():
    print(f"{attr_name}: {attr_vocab}")

In [None]:
import pandas as pd

# Define the CategoryAttributes class
class CategoryAttributes:
    """Attribute vocabularies of one product category.

    ``attributes`` maps each registered attribute name to a dict of
    ``{value: integer ID}``; IDs are handed out in insertion order.
    """

    def __init__(self, name, num_attributes):
        self.name = name
        self.num_attributes = num_attributes
        # attribute name -> {value: ID}
        self.attributes = {}

    def add_attribute(self, attribute_name):
        """Register ``attribute_name`` with an empty vocabulary (no-op if already known)."""
        self.attributes.setdefault(attribute_name, {})

    def add_value(self, attribute_name, value):
        """Give ``value`` the next free ID of ``attribute_name``.

        Nothing happens when the attribute was never registered or the value
        already has an ID.
        """
        value_ids = self.attributes.get(attribute_name)
        if value_ids is None:
            return
        value_ids.setdefault(value, len(value_ids))

category_attributes_df = pd.read_parquet(cat_path)

# One CategoryAttributes instance per schema row, kept in schema order
category_attributes_list = []

# Parse through each category and register its declared attributes
for _, row in category_attributes_df.iterrows():
    category_name = row["Category"]
    num_attributes = row["No_of_attribute"]

    category_attr = CategoryAttributes(category_name, num_attributes)

    # Only the first `num_attributes` entries of the list are real attributes
    attributes = row["Attribute_list"]
    for attribute_name in attributes[:num_attributes]:
        category_attr.add_attribute(attribute_name)

    category_attributes_list.append(category_attr)

# Index the instances by category name so each training row is routed with one dict
# lookup instead of a scan over the whole list. A list per name keeps the behaviour of
# updating every instance that shares a name.
instances_by_name = defaultdict(list)
for category_attr in category_attributes_list:
    instances_by_name[category_attr.name].append(category_attr)

# Adjust the path accordingly
train_df = pd.read_csv(train_path)

# Populate the attributes with unique values from training data
for _, row in train_df.iterrows():
    category_name = row["Category"]

    for category_attr in instances_by_name.get(category_name, ()):
        # The i-th registered attribute is stored in column `attr_{i+1}`
        for i, attribute_name in enumerate(category_attr.attributes.keys()):
            attr_value = row[f"attr_{i+1}"]

            # Missing labels are not a class of their own; skip them
            if pd.isna(attr_value):
                continue

            category_attr.add_value(attribute_name, attr_value)

# Example output for vocabularies
for category_attr in category_attributes_list:
    print(f"Sample vocabulary for '{category_attr.name}' category attributes:")
    for attr_name, values in category_attr.attributes.items():
        print(f"{attr_name}: {values}")

In [None]:
# Prompt strings and their CLIP text embeddings, both indexed as [category][attribute]
attribute_prompts = defaultdict(lambda: defaultdict(list))
text_features = defaultdict(lambda: defaultdict(list))

for category, attributes in vocabularies.items():
    for attr_name, classes in attributes.items():
        # One prompt per class, in vocabulary order
        prompts = []
        for class_name in classes:
            prompts.append(f"a photo of a {class_name} for the {attr_name} in {category}")
        attribute_prompts[category][attr_name] = prompts

        # Embed the prompts without tracking gradients
        with torch.no_grad():
            tokens = clip.tokenize(prompts).to(device)
            encoded_text = clip_model.encode_text(tokens)
        text_features[category][attr_name] = encoded_text

In [None]:
# Dump every category with the class names known for each of its attributes
for category, attributes in vocabularies.items():
    print(f"Category: {category}")
    for attr_name, class_dict in attributes.items():
        class_names = [*class_dict]
        print(f"  Attribute: {attr_name} - Classes: {class_names}")

In [None]:
# Data augmentation and normalization for training.
# The augmented images are meant for the CLIP image encoder, so they are normalized with
# the mean/std used by CLIP's own preprocessing rather than a generic 0.5/0.5. A different
# normalization shifts the inputs away from what the pretrained encoder expects.
data_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.3),
    transforms.RandomRotation(degrees=15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1)),
    transforms.RandomPerspective(distortion_scale=0.2, p=0.5),
    transforms.ToTensor(),
    transforms.Normalize(
        (0.48145466, 0.4578275, 0.40821073),
        (0.26862954, 0.26130258, 0.27577711),
    ),
    # The two tensor-level augmentations below run after normalization
    transforms.RandomErasing(p=0.2, scale=(0.02, 0.33), ratio=(0.3, 3.3), value='random'),
    transforms.RandomApply([transforms.GaussianBlur(kernel_size=3)], p=0.3)
])

In [None]:
import torch
from torch.utils.data import Dataset

class FashionDataset(Dataset):
    """Image dataset yielding ``(image, category, attribute_label_ids)``.

    Args:
        dataframe: Table with an ``id`` column, a ``Category`` column and the
            positional label columns ``attr_1``, ``attr_2``, ...
        image_dir: Image folder, joined onto the global ``data_dir``.
        vocabularies: ``vocabularies[category][attribute_name]`` maps an
            attribute value to its integer class ID. The i-th attribute name
            of a category corresponds to column ``attr_{i+1}``.
        transform: Optional callable applied to the loaded PIL image.
        unknown_id: Label used for values missing from the vocabulary and for
            padding. The default ``-100`` is the default ``ignore_index`` of
            ``nn.CrossEntropyLoss``, so such positions add nothing to the loss.

    The label tensor always has ``max_attributes`` entries (the largest number
    of attributes of any category). Categories with fewer attributes are padded
    at the end with ``unknown_id``, so samples of different categories can be
    stacked by the default DataLoader collate function. The leading entries
    keep their original positions.
    """

    def __init__(self, dataframe, image_dir, vocabularies, transform=None, unknown_id=-100):
        self.data = dataframe
        self.image_dir = image_dir
        self.vocabularies = vocabularies
        self.transform = transform
        self.unknown_id = unknown_id
        # Common label length across categories (0 when there are no categories)
        self.max_attributes = max((len(attrs) for attrs in vocabularies.values()), default=0)

    def __len__(self):
        return len(self.data)

    def _encode_attributes(self, row):
        """Return the fixed-length int64 label tensor for one dataframe row."""
        category = row["Category"]
        label_ids = [self.unknown_id] * self.max_attributes
        for i, attr_name in enumerate(self.vocabularies[category].keys()):
            attr_value = row[f"attr_{i+1}"]
            # A value without an ID must not be mistaken for class 0, which is a real class
            label_ids[i] = self.vocabularies[category][attr_name].get(attr_value, self.unknown_id)
        # Explicit dtype: an empty list would otherwise become a float tensor
        return torch.tensor(label_ids, dtype=torch.long)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        img_id = row["id"]
        category = row["Category"]

        # Load image; the context manager closes the file once the pixels are copied
        image_path = os.path.join(data_dir, f"{self.image_dir}/{img_id:06d}.jpg")
        with Image.open(image_path) as image_file:
            image = image_file.convert("RGB")
        if self.transform:
            image = self.transform(image)

        return image, category, self._encode_attributes(row)

# Model

In [None]:
import torch.nn as nn

class MultiOutputCLIPModel(nn.Module):
    """CLIP image encoder (not trained) with one linear head per category attribute.

    Args:
        clip_model: Object exposing ``encode_image(images)`` and
            ``visual.output_dim``.
        category_to_attributes: Accepted for interface compatibility; not used.
        vocabularies: ``vocabularies[category][attribute_name]`` is the class
            vocabulary of that attribute; its length is the head's output size.
    """

    def __init__(self, clip_model, category_to_attributes, vocabularies):
        super().__init__()
        self.clip_model = clip_model
        self.vocabularies = vocabularies

        # classifiers[category] is a ModuleList with one Linear head per attribute,
        # in the iteration order of vocabularies[category]
        self.classifiers = nn.ModuleDict()
        for category, attributes in vocabularies.items():
            classifiers = nn.ModuleList()
            for attr_name in attributes:
                num_classes = len(attributes[attr_name])  # Size of vocabulary for each attribute
                classifier = nn.Linear(clip_model.visual.output_dim, num_classes)
                classifiers.append(classifier)
            self.classifiers[category] = classifiers

    def forward(self, images, categories):
        """Return ``{category: [logits per attribute]}`` for the given batch.

        Args:
            images: Batch passed unchanged to ``clip_model.encode_image``.
            categories: One category name per image, aligned with ``images``.
        """
        # The encoder only extracts features; no gradients flow into it
        with torch.no_grad():
            image_features = self.clip_model.encode_image(images)

        # The heads hold float32 weights, while the encoder may emit half-precision
        # features; without the cast the Linear layers raise a dtype mismatch.
        image_features = image_features.float()

        # NOTE(review): the result is keyed by category, so when a batch contains several
        # images of the same category only the last one's logits survive. Fixing this
        # changes the return contract and must be coordinated with the callers.
        outputs = {}
        for i, category in enumerate(categories):
            classifiers = self.classifiers[category]
            outputs[category] = [classifier(image_features[i]) for classifier in classifiers]

        return outputs

# Training

In [None]:
# # Prepare dataloaders
# train_dataset = FashionDataset(train_df, 'train_images', category_to_attributes, preprocess)
# train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# # Initialize model, optimizer, and loss function
# multi_output_model = MultiOutputCLIPModel(clip_model, category_to_attributes, attribute_prompts).to(device)
# optimizer = torch.optim.Adam(multi_output_model.parameters(), lr=1e-4)
# criterion = nn.CrossEntropyLoss()  # Use Cross-Entropy as each attribute is mutually exclusive

# # Training loop
# for epoch in range(NUM_EPOCHS):
#     multi_output_model.train()
#     total_loss = 0
    
#     with tqdm(train_loader, unit="batch") as tepoch:
#         tepoch.set_description(f"Epoch {epoch + 1}/{NUM_EPOCHS}")

#         for images, categories, attributes in tepoch:
#             images = images.to(device)
            
#             # Forward pass
#             outputs = multi_output_model(images, categories)
#             loss = 0

#             # Calculate loss for each category and attribute
#             for i, category in enumerate(categories):
#                 for j, attribute in enumerate(vocabularies[category].keys()):
#                     attr_id = attributes[i][j]
#                     output = outputs[category][j].unsqueeze(0)  # shape to [1, num_classes] if necessary
                    
#                     # Calculate loss for each attribute
#                     loss += criterion(output, attr_id.view(-1))

#             # Backpropagation
#             optimizer.zero_grad()
#             loss.backward()
#             optimizer.step()

#             # Update the total loss
#             total_loss += loss.item()
#             tepoch.set_postfix(loss=total_loss / len(train_loader))

#     print(f"Epoch {epoch + 1}, Average Loss: {total_loss / len(train_loader)}")

In [None]:
def extract_class_name(prompt, attr_name, category):
    """Recover the class name from 'a photo of a {class} for the {attr_name} in {category}'.

    Works for multi-word class names. A prompt that does not follow the
    template is returned unchanged.
    """
    prefix = "a photo of a "
    suffix = f" for the {attr_name} in {category}"
    follows_template = (
        prompt.startswith(prefix)
        and prompt.endswith(suffix)
        and len(prompt) >= len(prefix) + len(suffix)
    )
    if not follows_template:
        return prompt
    return prompt[len(prefix):len(prompt) - len(suffix)]


def zero_shot_predict(id, category, text_features, attribute_prompts):
    """Predict every attribute of one training image by CLIP zero-shot matching.

    Args:
        id: Integer image ID; the file ``{id:06d}.jpg`` is read from ``train_img_dir``.
        category: Category name used to select the prompts and text features.
        text_features: ``text_features[category][attr_name]`` holds one text
            embedding per class of that attribute.
        attribute_prompts: ``attribute_prompts[category][attr_name]`` holds the
            prompts, in the same order as the embeddings.

    Returns:
        Dict mapping attribute name to the predicted class name. Attributes
        without any class are left out.
    """
    # Generate the image path and load/preprocess the image (closing the file afterwards)
    image_path = os.path.join(train_img_dir, f"{id:06d}.jpg")
    with Image.open(image_path) as raw_image:
        image = preprocess(raw_image).unsqueeze(0).to(device)

    # Encode image with CLIP model
    with torch.no_grad():
        image_features = clip_model.encode_image(image)
    # Unit-normalize so the dot products below are cosine similarities
    image_features = image_features / image_features.norm(dim=-1, keepdim=True)

    predictions = {}

    # Iterate over each attribute and its associated class features
    for attr_name, class_features in text_features[category].items():
        prompts = attribute_prompts[category][attr_name]
        if not prompts:
            continue  # no classes to choose from; argmax over nothing would fail

        # Compute cosine similarity between image and each class feature vector
        class_features = class_features / class_features.norm(dim=-1, keepdim=True)
        similarities = (image_features @ class_features.T).squeeze(0)
        best_class_idx = similarities.argmax().item()

        # The class name sits in the middle of the prompt and may span several words,
        # so it is cut out by template instead of picking a single word.
        best_prompt = prompts[best_class_idx]
        best_class_name = extract_class_name(best_prompt, attr_name, category)

        # Debug: Print the prompt and class name to verify correctness
        print(f"Attribute: {attr_name}, Prompt: {best_prompt}, Predicted Class: {best_class_name}")

        # Store the prediction
        predictions[attr_name] = best_class_name

    return predictions

In [None]:
# Initialize an empty list to store results
results = []
max_examples = 20  # Limit the number of examples for testing

# Iterate over the first `max_examples` rows of train_df to generate predictions.
# `head` limits by position, so the cap holds even when the index labels are not 0..n-1.
for _, row in train_df.head(max_examples).iterrows():
    # Get image ID and category for the current row
    image_id = row["id"]
    category = row["Category"]

    # Perform zero-shot prediction with debugging information
    predictions = zero_shot_predict(image_id, category, text_features, attribute_prompts)

    # Build the row dictionary in the required format
    row_result = {
        "id": image_id,
        "Category": category
    }

    # Fill in each attribute prediction based on `vocabularies` keys
    for i, attr_name in enumerate(vocabularies[category].keys()):
        row_result[f"attr_{i+1}"] = predictions.get(attr_name, "Unknown")  # Set to "Unknown" if no prediction

    # Append the result to the results list
    results.append(row_result)

# Convert the list of results into a DataFrame
results_df = pd.DataFrame(results)

# Display the first few rows of the predictions DataFrame
print(results_df.head())

In [None]:
# Rebuild the prompts and their text embeddings for every attribute class.
# The prompt template is "a photo of a {class} for the {attribute} in {category}".
for category, attributes in vocabularies.items():
    for attr_name, classes in attributes.items():
        # One prompt per class, in vocabulary order
        prompts = []
        for class_name in classes:
            prompts.append(f"a photo of a {class_name} for the {attr_name} in {category}")
        attribute_prompts[category][attr_name] = prompts

        # Embed the prompts without tracking gradients
        with torch.no_grad():
            tokens = clip.tokenize(prompts).to(device)
            encoded_text = clip_model.encode_text(tokens)
        text_features[category][attr_name] = encoded_text

def extract_class_name(prompt, attr_name, category):
    """Recover the class name from 'a photo of a {class} for the {attr_name} in {category}'.

    Works for multi-word class names. A prompt that does not follow the
    template is returned unchanged.
    """
    prefix = "a photo of a "
    suffix = f" for the {attr_name} in {category}"
    follows_template = (
        prompt.startswith(prefix)
        and prompt.endswith(suffix)
        and len(prompt) >= len(prefix) + len(suffix)
    )
    if not follows_template:
        return prompt
    return prompt[len(prefix):len(prompt) - len(suffix)]


# Revised zero-shot prediction function with debug info
def zero_shot_predict(id, category, text_features, attribute_prompts):
    """Predict every attribute of one training image by CLIP zero-shot matching.

    Args:
        id: Integer image ID; the file ``{id:06d}.jpg`` is read from ``train_img_dir``.
        category: Category name used to select the prompts and text features.
        text_features: ``text_features[category][attr_name]`` holds one text
            embedding per class of that attribute.
        attribute_prompts: ``attribute_prompts[category][attr_name]`` holds the
            prompts, in the same order as the embeddings.

    Returns:
        Dict mapping attribute name to the predicted class name. Attributes
        without any class are left out.
    """
    # Load and preprocess image (the context manager closes the file afterwards)
    image_path = os.path.join(train_img_dir, f"{id:06d}.jpg")
    with Image.open(image_path) as raw_image:
        image = preprocess(raw_image).unsqueeze(0).to(device)

    # Encode image
    with torch.no_grad():
        image_features = clip_model.encode_image(image)
    # Unit-normalize so the dot products below are cosine similarities
    image_features = image_features / image_features.norm(dim=-1, keepdim=True)

    predictions = {}

    # Iterate over attributes to predict each one
    for attr_name, class_features in text_features[category].items():
        prompts = attribute_prompts[category][attr_name]
        if not prompts:
            continue  # no classes to choose from; argmax over nothing would fail

        class_features = class_features / class_features.norm(dim=-1, keepdim=True)
        similarities = (image_features @ class_features.T).squeeze(0)
        best_class_idx = similarities.argmax().item()

        # Cut the class name out of the prompt by template; taking a single word
        # would truncate multi-word class names.
        best_prompt = prompts[best_class_idx]
        best_class_name = extract_class_name(best_prompt, attr_name, category)

        # Debug print to confirm
        print(f"Attribute: {attr_name}, Prompt: {best_prompt}, Predicted Class: {best_class_name}")

        # Save prediction
        predictions[attr_name] = best_class_name

    return predictions

# Run a loop with debug checks over the first 20 rows.
# `head` limits by position, so the cap holds even when the index labels are not 0..n-1.
results = []
for _, row in train_df.head(20).iterrows():
    # Get predictions
    image_id = row["id"]
    category = row["Category"]
    predictions = zero_shot_predict(image_id, category, text_features, attribute_prompts)

    # Format row results: attr_{i+1} holds the prediction for the i-th attribute of the category
    row_result = {
        "id": image_id,
        "Category": category
    }
    for i, attr_name in enumerate(vocabularies[category].keys()):
        row_result[f"attr_{i+1}"] = predictions.get(attr_name, "Unknown")

    results.append(row_result)

# Convert to DataFrame for analysis
results_df = pd.DataFrame(results)
print(results_df.head())

## Metrics, Loss, Optimizer

In [None]:
# Smoke test of the scoring metric on random data.
# NOTE(review): `AttributeScoreMetric` is not defined or imported anywhere in the visible
# part of this notebook; this cell raises NameError unless it is defined elsewhere - TODO confirm.
# Example: assume there are 5 categories with [3, 4, 2, 5, 3] attributes respectively
category_attribute_counts = [3, 4, 2, 5, 3]
metric = AttributeScoreMetric(category_attribute_counts)

# preds and labels are built as integer tensors of shape (batch_size, total_attributes)
# Example: batch size of 10, total attributes = 3 + 4 + 2 + 5 + 3 = 17
batch_size = 10
total_attributes = sum(category_attribute_counts)
# torch.randint(0, 2, ...) draws values from {0, 1}; the upper bound is exclusive
preds = torch.randint(0, 2, (batch_size, total_attributes))  # Random predictions for binary classification
labels = torch.randint(0, 2, (batch_size, total_attributes))  # Random ground truth labels

# Compute the score; `.item()` below presumes the metric returns a one-element tensor - verify
score = metric(preds, labels)
print("Overall Score:", score.item())

# Inference

## Submission