In [None]:
# Import Packages
import pandas as pd
import numpy as np
import os
from PIL import Image
from tqdm import tqdm
import torch
from transformers import ViTImageProcessor, ViTForImageClassification, Trainer, TrainingArguments, ViTFeatureExtractor, ViTForImageClassification
from sklearn.metrics import f1_score, accuracy_score
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from datasets import Dataset, DatasetDict


In [None]:
import accelerate
import transformers

# Show the installed library versions. Only the last expression of a cell is
# displayed, so both versions are reported through a single tuple.
transformers.__version__, accelerate.__version__

In [None]:
# Names of the binarized genre columns; each one is read as an integer label later on
binarized_columns = [
    'Action', 'Adventure', 'Animation', 'Biography', 'Comedy', 'Crime',
    'Documentary', 'Drama', 'Family', 'Fantasy', 'Film-Noir', 'History',
    'Horror', 'Music', 'Musical', 'Mystery', 'Romance', 'Sci-Fi', 'Short',
    'Sport', 'Thriller', 'War', 'Western',
]

# Poster directory: sample_images for developing, downloaded_images for final
image_folder = "sample_images"

# Read the movie table from CSV
csv_file = "clean_MovieGenre.csv"  # Path to your CSV file
df = pd.read_csv(csv_file)


In [None]:
# Load every poster referenced by the CSV together with its genre labels.
batch_size = 128  # rows handled per progress-bar step (the inference cell reuses this name)

images = []
labels = []
skipped_files = []  # (filename, reason) for posters that could not be loaded

# range() with a step produces exactly ceil(len(df) / batch_size) slices, so no
# empty trailing batch is generated when len(df) is a multiple of batch_size.
for start_idx in tqdm(range(0, len(df), batch_size)):
    batch_df = df.iloc[start_idx:start_idx + batch_size]
    batch_images = []
    batch_labels = []

    for _, row in batch_df.iterrows():
        filename = str(row.iloc[0]) + ".jpg"  # filenames match the imdbIDs
        image_path = os.path.join(image_folder, filename)
        try:
            # The context manager releases the file handle once the pixels are copied out.
            with Image.open(image_path) as poster:
                image_array = np.array(poster.convert("RGB"))
        except Exception as exc:
            # Best effort: a missing or unreadable poster is skipped, but recorded
            # so the amount of dropped data is visible instead of silently lost.
            skipped_files.append((filename, repr(exc)))
            continue

        batch_images.append(image_array)
        # Binarized label for each genre column, in binarized_columns order
        batch_labels.append([int(row[column]) for column in binarized_columns])

    if batch_images:
        # NOTE(review): np.array() on a list of images assumes every poster has the
        # same height/width - confirm this holds for the full dataset.
        images.append(np.array(batch_images))
        labels.append(np.array(batch_labels))

if not images:
    # Without this check `images` would still be a list and the shape report
    # below would fail with an unrelated AttributeError.
    raise RuntimeError(
        f"No images could be loaded from '{image_folder}' "
        f"({len(skipped_files)} files skipped); check image_folder and the CSV ids."
    )

# Concatenate all batches into single numpy arrays
images = np.concatenate(images, axis=0)
labels = np.concatenate(labels, axis=0)

if skipped_files:
    print(f"Skipped {len(skipped_files)} of {len(df)} posters; first few: {skipped_files[:3]}")

# Print the shapes of the loaded data
print("Images shape:", images.shape)
print("Labels shape:", labels.shape)


In [None]:
# Display up to nine sample posters with the genres whose label equals 1
num_samples = min(9, len(images))  # fewer than nine posters must not raise IndexError
_fig, axes = plt.subplots(3, 3, figsize=(12, 12))
for i, ax in enumerate(axes.flat):
    if i < num_samples:
        genres = [column for column, label in zip(binarized_columns, labels[i]) if label == 1]
        ax.imshow(images[i])
        ax.set_title(str(genres), fontsize=15)
    # Hide ticks/frames on used panels and blank out unused ones
    ax.axis('off')
plt.tight_layout()
plt.show()


In [None]:
# Perform train-test split
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=42)  # change test_size if want quicker runtime

# Print the shapes of the split datasets
print("Training images shape:", X_train.shape)
print("Training labels shape:", y_train.shape)
print("Testing images shape:", X_test.shape)
print("Testing labels shape:", y_test.shape)

# Number of positive examples per genre in each split
train_label_distribution = np.sum(y_train, axis=0)
test_label_distribution = np.sum(y_test, axis=0)

combined_data = {
    "Label": binarized_columns,
    "Train Distribution": train_label_distribution,
    "Test Distribution": test_label_distribution
}

# Use a dedicated name: rebinding `df` here would replace the movie table read
# from the CSV, and re-running the image-loading cell would then iterate over
# this summary frame instead.
label_distribution_df = pd.DataFrame(combined_data)
print(label_distribution_df)

In [None]:
# Create Dataset objects from the numpy arrays (one row per poster)
train_dataset = Dataset.from_dict({
    "image": list(X_train),
    "label": list(y_train)
})

test_dataset = Dataset.from_dict({
    "image": list(X_test),
    "label": list(y_test)
})

# Combine into a DatasetDict
dataset_dict = DatasetDict({
    "train": train_dataset,
    "test": test_dataset
})

print(dataset_dict)

# Summarise the first example instead of printing it: the "image" field comes
# back as a nested list holding every pixel value, which floods the cell output.
first_example = dataset_dict["train"][0]
print(
    "First training example -> image shape:", np.asarray(first_example["image"]).shape,
    "| label:", first_example["label"],
)

In [None]:
from transformers import ViTImageProcessor
from torchvision.transforms import Resize, Compose
from torchvision.transforms import ToPILImage
import torch

processor = ViTImageProcessor.from_pretrained("google/vit-base-patch16-224-in21k")
size = processor.size["height"]

# PIL-level transforms applied before the processor; kept as two pipelines so
# train-only augmentation can be added without touching evaluation.
_train_transforms = Compose([
    Resize((size, size)),
])

_val_transforms = Compose([
    Resize((size, size)),
])


def _images_to_pixel_values(batch, transforms):
    """Convert a batch of stored images into model-ready `pixel_values`.

    Args:
        batch: batched mapping from `Dataset.map(..., batched=True)`; `batch['image']`
            is a list of images (nested lists / arrays, height x width x channel).
        transforms: torchvision pipeline applied to each PIL image.

    Returns:
        dict with a single key, "pixel_values", holding the processor output.
    """
    # Images read back from the dataset are plain Python ints; ToPILImage needs uint8.
    pil_images = [ToPILImage()(np.asarray(image, dtype=np.uint8)) for image in batch['image']]
    resized_images = [transforms(image) for image in pil_images]
    # Resizing already happened above, so the processor only rescales and normalises.
    encoded = processor(resized_images, do_resize=False, return_tensors="np")
    return {"pixel_values": encoded["pixel_values"]}


def train_transforms(example):
    """Batched training transform: resize, then run the ViT processor."""
    return _images_to_pixel_values(example, _train_transforms)


def val_transforms(example):
    """Batched evaluation transform: resize, then run the ViT processor."""
    return _images_to_pixel_values(example, _val_transforms)


def _prepare_split(split, batch_transform):
    """Attach `pixel_values` to a split and expose them as torch tensors."""
    # Skip the map when the cell is re-run on an already processed split
    # (the raw "image" column is gone by then).
    if "pixel_values" not in split.column_names:
        split = split.map(
            batch_transform,
            batched=True,   # the transforms iterate over a list of images
            batch_size=32,  # small batches keep the nested-list images out of memory trouble
            remove_columns=["image"],
        )
    # pixel_values come back as tensors; "label" stays a plain Python list.
    return split.with_format("torch", columns=["pixel_values"], output_all_columns=True)


train_dataset = _prepare_split(train_dataset, train_transforms)
test_dataset = _prepare_split(test_dataset, val_transforms)

# The Trainer and the inference cell read their data from `dataset_dict`, so it
# has to point at the processed splits rather than the raw image lists.
dataset_dict = DatasetDict({"train": train_dataset, "test": test_dataset})

def collate_fn(examples):
    """Stack per-example dicts into one batch for the multi-label model.

    Args:
        examples: list of dicts, each with "pixel_values" (tensor or array-like)
            and "label" (list, array or tensor of 0/1 genre indicators).

    Returns:
        dict with "pixel_values" (stacked along a new first dimension) and
        "labels" (float32, one row per example).
    """
    pixel_values = torch.stack([torch.as_tensor(example["pixel_values"]) for example in examples])
    # The model is configured with problem_type="multi_label_classification",
    # whose binary cross-entropy loss needs floating-point targets; as_tensor
    # also accepts labels that are already tensors.
    labels = torch.stack(
        [torch.as_tensor(example["label"], dtype=torch.float32) for example in examples]
    )
    return {"pixel_values": pixel_values, "labels": labels}

# Both loaders share the same collate function and batch size
_loader_options = {"collate_fn": collate_fn, "batch_size": 16}
train_dataloader = torch.utils.data.DataLoader(train_dataset, **_loader_options)
test_dataloader = torch.utils.data.DataLoader(test_dataset, **_loader_options)

In [None]:
# Model Specification
# Pretrained ViT with one output per genre column, configured for multi-label targets
checkpoint_name = 'google/vit-base-patch16-224-in21k'
model = ViTForImageClassification.from_pretrained(
    checkpoint_name,
    problem_type="multi_label_classification",
    num_labels=len(binarized_columns),
)

# Use CUDA when torch reports it as available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print("Using device:", device)
model = model.to(device)


In [None]:
# TrainingArguments and Trainer
training_args = TrainingArguments(
    output_dir='./results',
    evaluation_strategy="epoch",  # evaluate and checkpoint on the same schedule,
    save_strategy="epoch",        # as load_best_model_at_end requires
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=5,
    weight_decay=0.01,
    logging_dir='./logs',
    load_best_model_at_end=True,
    # Must name a key returned by compute_metrics. That function reports only
    # "f1", so asking for "precision" left the Trainer without a value to rank
    # checkpoints by.
    metric_for_best_model="f1",
)

def compute_metrics(p):
    """Micro-averaged F1 over all genre labels for one evaluation pass.

    Args:
        p: pair of (logits, labels) as handed over by the Trainer.

    Returns:
        dict with the single key "f1".
    """
    logits, labels = p
    # The model emits raw logits. sigmoid(logit) > 0.5 is the same as logit > 0,
    # which matches the 0.5 probability threshold used in the inference cell;
    # comparing the logits themselves against 0.5 applied a stricter cut-off.
    preds = (np.asarray(logits) > 0).astype(int)
    # Labels may arrive as floats (multi-label targets); compare as integers.
    targets = np.asarray(labels).astype(int)
    f1 = f1_score(targets, preds, average='micro')
    return {"f1": f1}

# Assemble the Trainer from the pieces defined above
_trainer_kwargs = {
    "model": model,
    "args": training_args,
    "train_dataset": dataset_dict["train"],
    "eval_dataset": dataset_dict["test"],
    "compute_metrics": compute_metrics,
}
trainer = Trainer(**_trainer_kwargs)


In [None]:
### Training
# Fine-tune, then evaluate; the evaluation metrics are the cell's output
train_result = trainer.train()
eval_metrics = trainer.evaluate()
eval_metrics


In [None]:
# Perform inference
# Test DataLoader (order preserved so predictions line up with the test labels)
test_loader = torch.utils.data.DataLoader(dataset_dict["test"], batch_size=batch_size, shuffle=False)

probability_chunks = []

model.eval()
with torch.no_grad():
    for batch in test_loader:
        # Everything except the label column is moved to the device and fed to the model
        model_inputs = {}
        for key, value in batch.items():
            if key == 'label':
                continue
            model_inputs[key] = value.to(device)
        logits = model(**model_inputs).logits
        probability_chunks.append(torch.sigmoid(logits).cpu().numpy())

# Concatenate predicted probabilities for all batches
predicted_probabilities = np.concatenate(probability_chunks, axis=0)


In [None]:
# Stack the test labels and predictions into 2-D NumPy arrays
y_test_np = np.vstack([np.array(item) for item in dataset_dict["test"]['label']])
predicted_probabilities_np = np.array(predicted_probabilities)

# Probabilities strictly above the threshold count as a positive prediction
threshold = 0.5
predicted_labels_np = (predicted_probabilities_np > threshold).astype(int)

# One F1 score and one accuracy per genre column
label_positions = range(len(binarized_columns))
f1_scores = [
    f1_score(y_test_np[:, pos], predicted_labels_np[:, pos], average='binary')
    for pos in label_positions
]
accuracies = [
    accuracy_score(y_test_np[:, pos], predicted_labels_np[:, pos])
    for pos in label_positions
]

df_metrics = pd.DataFrame({
    "Label": binarized_columns,
    "F1 Score": f1_scores,
    "Accuracy": accuracies
})
print(df_metrics)

# Unweighted averages across the genre columns
avg_f1_score = np.mean(f1_scores)
print("\nAverage F1 score:", avg_f1_score)

average_accuracy = sum(accuracies) / len(accuracies)
print("Average accuracy:", average_accuracy)


In [None]:
def display(X_test, y_test, predicted_probabilities, binarized_columns, threshold=0.5):
    """Plot up to nine posters in a 3x3 grid with their true and predicted genres.

    For each poster the predicted genres are the k highest-probability labels,
    where k is the number of true genres of that poster. The `threshold`
    argument is accepted but not used by the body.

    Args:
        X_test: indexable collection of images accepted by `imshow`.
        y_test: per-image label rows; entries equal to 1 mark a true genre.
        predicted_probabilities: per-image probability rows (NumPy arrays).
        binarized_columns: genre names, indexed like the label rows.
        threshold: unused.
    """
    fig, axes = plt.subplots(3, 3, figsize=(12, 12))
    available = len(X_test)

    for position, panel in enumerate(axes.flat):
        if position >= available:
            panel.axis('off')  # Hide empty subplots
            continue

        genre_scores = predicted_probabilities[position]
        true_labels = [
            binarized_columns[col]
            for col, flag in enumerate(y_test[position])
            if flag == 1
        ]

        # Rank genres by descending probability and keep as many as there are true labels
        ranked = np.argsort(genre_scores)[::-1]
        top_idx = ranked[:len(true_labels)]
        predicted_genre_labels = [binarized_columns[col] for col in top_idx]
        top_scores = genre_scores[top_idx]

        panel.imshow(X_test[position])
        panel.set_title(
            f"True Genres: {true_labels}\nPredicted Genres: {predicted_genre_labels}\nProbabilities: {top_scores}",
            fontsize=10,
        )
        panel.axis('off')

    plt.tight_layout()
    plt.show()

# Requires X_test, y_test and predicted_probabilities from the cells above.
display(
    X_test=X_test,
    y_test=y_test,
    predicted_probabilities=predicted_probabilities,
    binarized_columns=binarized_columns,
)
