In [None]:
import pandas as pd

# Load the dataset from the local CSV and preview the first three rows.
df = pd.read_csv("data/imdb_top_1000.csv")
df.head(3)

In [None]:
# Keep only the columns used below. The explicit copy makes `df` an independent
# frame, so assigning into it later does not raise SettingWithCopyWarning on
# pandas versions without copy-on-write.
df = df[["Overview", "Series_Title", "Genre"]].copy()

In [None]:
from sklearn.svm import LinearSVC
from sklearn.preprocessing import LabelEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix

label_column = "Genre"

# A movie may list several comma-separated genres; keep only the first one so
# the task is single-label. The vectorised `.str` accessor tolerates missing
# values, and `.assign` builds a new frame instead of writing into a slice
# (which would raise SettingWithCopyWarning).
df = df.assign(**{label_column: df[label_column].str.split(",").str[0]})

# Restrict the data to the five most frequent genres. `.copy()` detaches the
# filtered rows so later column assignments are unambiguous.
top_genres = df[label_column].value_counts().nlargest(5).index
df = df[df[label_column].isin(top_genres)].copy()

# Stratified 80/20 split so both sets keep the same genre proportions.
X_train, X_test, y_train, y_test = train_test_split(
    df["Overview"],
    df[label_column],
    test_size=0.2,
    random_state=42,
    stratify=df[label_column],
)

# Encode genre names as integers; fit on the training labels only.
le = LabelEncoder()
y_train = le.fit_transform(y_train)
y_test = le.transform(y_test)

# TF-IDF features; the vocabulary and IDF weights are learned from the
# training texts only and then applied to the test texts.
tfidf = TfidfVectorizer()
X_train = tfidf.fit_transform(X_train)
X_test = tfidf.transform(X_test)

# Seed the classifier so repeated runs give the same model.
model = LinearSVC(random_state=42)
model.fit(X_train, y_train)

# make predictions
y_pred = model.predict(X_test)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

def draw_classification_report(y_test, y_pred, target_names=None):
    """Plot scikit-learn's classification report as an annotated heatmap.

    Args:
        y_test: True labels, encoded the same way as ``y_pred``.
        y_pred: Predicted labels.
        target_names: Display names for the classes, in label order. Defaults
            to ``le.classes_`` of the notebook's current label encoder.
    """
    if target_names is None:
        target_names = le.classes_

    cr = classification_report(
        y_test,
        y_pred,
        zero_division=0,
        target_names=target_names,
        output_dict=True,
    )
    # Drop the last row of the report table (the "support" counts in
    # scikit-learn's layout) and transpose, so that every class/average is a
    # row and every metric is a column.
    report_df = pd.DataFrame(cr).iloc[:-1, :].T

    fig, ax = plt.subplots(figsize=(10, 6))
    sns.heatmap(
        report_df,
        annot=True,
        fmt=".2f",
        cmap="Greens",
        cbar=False,
        ax=ax,
    )
    ax.set_title("Classification Report")
    # After the transpose, metrics run along x and classes along y.
    ax.set_xlabel("Metrics")
    ax.set_ylabel("Classes")
    plt.xticks(rotation=45)
    plt.yticks(rotation=0)
    plt.tight_layout()
    plt.show()

# Plot the report for the labels/predictions currently held in y_test / y_pred.
draw_classification_report(y_test, y_pred)


In [None]:

import seaborn as sns
import matplotlib.pyplot as plt


# Confusion matrix normalised over the true labels (rows), so each cell is the
# fraction of a true class that was predicted as the column class.
cm = confusion_matrix(y_test, y_pred, labels=range(len(le.classes_)), normalize="true")

# draw confusion matrix
fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(
    cm,
    annot=True,
    fmt=".2f",
    cmap="Blues",
    xticklabels=le.classes_,
    yticklabels=le.classes_,
    ax=ax,
)
# Label the axes: rows are true classes, columns are predictions.
ax.set_title("Confusion Matrix (normalized by true class)")
ax.set_xlabel("Predicted genre")
ax.set_ylabel("True genre")
plt.tight_layout()
plt.show()


## Train a feedforward network using GloVe embeddings

In [None]:
import torch.nn as nn
import torch
import torch.nn.functional as F
from torchtext.vocab import GloVe


class GloveModel(nn.Module):
    """Max-pooled bag-of-embeddings classifier on pretrained GloVe vectors.

    Token indices are embedded, passed through dropout, max-pooled over
    dimension 1 and projected to class logits by a single linear layer.
    """

    def __init__(self, embed_dim, num_classes):
        """Build the model.

        Args:
            embed_dim: Dimensionality of the GloVe "6B" vectors to load.
            num_classes: Number of output classes (size of the logit vector).
        """
        super().__init__()
        # GloVe() downloads/caches the vectors on first use and exposes the
        # weight matrix as `.vectors`, so the cache file does not have to be
        # re-read from a hard-coded relative path.
        global_vectors = GloVe(name="6B", dim=embed_dim)
        # from_pretrained keeps the embedding weights frozen by default.
        self.embedding = nn.Embedding.from_pretrained(global_vectors.vectors)
        self.linear1 = nn.Linear(embed_dim, num_classes)
        self.dropout = nn.Dropout(p=0.2)

    def forward(self, x):
        """Return class logits for a batch of token-index sequences.

        Args:
            x: Integer tensor of token indices. Pooling over dim=1 assumes a
                (batch, seq_len) layout, as produced by
                ``pad_sequence(..., batch_first=True)``.
        """
        x = self.embedding(x)
        # In training mode, each embedding value is zeroed with probability 0.2.
        x = self.dropout(x)
        # Max pooling over the sequence dimension: keep the largest value of
        # every embedding feature.
        x = x.max(dim=1)[0]
        # Project the pooled vector to one logit per class.
        out = self.linear1(x)
        return out

In [None]:
from tqdm import tqdm
from torch.nn.utils.rnn import pad_sequence
from sklearn.preprocessing import LabelEncoder

# Define hyperparameters
embed_dim = 50  # dimensionality of the GloVe vectors to load
num_classes = len(le.classes_)
learning_rate = 0.001
num_epochs = 100  # upper bound; early stopping may end training sooner
batch_size = 32
early_stopping_patience = 3  # most recent epochs ignored by the early-stopping check

# Seed PyTorch so the linear layer's initial weights and the dropout masks are
# the same on every run.
torch.manual_seed(42)

# Initialize the model, loss function, and optimizer
model = GloveModel(embed_dim=embed_dim, num_classes=num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)


# Prepare data for training
def preprocess_text(texts, vocab):
    """Convert texts into lists of vocabulary indices.

    Each text is split on whitespace. A word is looked up as-is first, then
    lowercased, then lowercased with surrounding punctuation stripped. The
    fallbacks matter because the GloVe 6B vocabulary is lowercase and
    whitespace splitting leaves punctuation glued to words ("Crime," / "boss.").
    Words that match none of the forms are dropped.

    Args:
        texts: Iterable of strings.
        vocab: Object exposing a ``stoi`` mapping from token to integer index.

    Returns:
        A list with one list of indices per input text (possibly empty).
    """
    stoi = vocab.stoi
    # Punctuation that whitespace splitting leaves attached to a word.
    edge_punct = ".,;:!?\"'()[]{}"

    def lookup(word):
        lowered = word.lower()
        for candidate in (word, lowered, lowered.strip(edge_punct)):
            if candidate in stoi:
                return stoi[candidate]
        return None

    encoded = []
    for text in texts:
        ids = (lookup(word) for word in text.split())
        # Compare against None explicitly: index 0 is a valid token id.
        encoded.append([idx for idx in ids if idx is not None])
    return encoded


# split the data into train and test sets
# (same arguments as the earlier split; X_train / X_test were overwritten with
# TF-IDF matrices above, so the raw texts are recovered here)
X_train, X_test, y_train, y_test = train_test_split(
    df["Overview"],
    df[label_column],
    test_size=0.2,
    random_state=42,
    stratify=df[label_column],
)

# Encode labels
le = LabelEncoder()
y_train = le.fit_transform(y_train)
y_test = le.transform(y_test)

# Map every training text to a list of GloVe vocabulary indices, then to one
# 1-D integer tensor per text.
global_vectors = GloVe(name="6B", dim=embed_dim)
X_train_indices = preprocess_text(X_train, global_vectors)
X_train_tensor = [torch.tensor(x, dtype=torch.long) for x in X_train_indices]

# Pad all sequences to the length of the longest one -> (num_texts, max_len).
# NOTE(review): pad_sequence pads with 0 by default, and 0 is presumably also a
# real token index in the GloVe vocabulary, so padded positions would take part
# in the model's max pooling - confirm, and mask the padding if so.
X_train_padded = pad_sequence(X_train_tensor, batch_first=True)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)

# Training accuracy of every completed epoch, in order.
epoch_acc = []

# Training loop
for epoch in tqdm(range(num_epochs), desc="Epochs", unit="epoch"):
    model.train()
    epoch_loss = 0
    correct = 0
    total = 0

    # leave=False removes the finished per-epoch batch bar, so the output is
    # not flooded with one progress bar per epoch.
    for i in tqdm(
        range(0, len(X_train_padded), batch_size),
        desc="Batches",
        unit="batch",
        leave=False,
    ):
        batch_X = X_train_padded[i : i + batch_size]
        batch_y = y_train_tensor[i : i + batch_size]

        # Forward pass
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # epoch_loss is the sum of the per-batch mean losses.
        epoch_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()

    accuracy = correct / total

    # Early-stopping rule: stop when this epoch is no better than the best
    # accuracy seen more than `early_stopping_patience` epochs ago. The decision
    # is taken on the history *before* this epoch is appended, but the epoch is
    # still recorded and logged below so the final epoch is not silently lost.
    # NOTE(review): this monitors training accuracy; a held-out validation
    # split would be a more meaningful stopping signal.
    should_stop = (
        len(epoch_acc) > early_stopping_patience
        and max(epoch_acc[:-early_stopping_patience]) >= accuracy
    )
    epoch_acc.append(accuracy)

    # TODO: add learning curves

    print(
        f"Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {accuracy:.4f}"
    )

    if should_stop:
        # stop training if accuracy does not improve
        print(f"Early stopping after epoch {epoch + 1}: accuracy did not improve.")
        break

In [None]:
# predict on test set
model.eval()  # switch dropout off for inference
X_test_indices = preprocess_text(X_test, global_vectors)
X_test_tensor = [torch.tensor(x, dtype=torch.long) for x in X_test_indices]
X_test_padded = pad_sequence(X_test_tensor, batch_first=True)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# Inference needs no gradients; no_grad avoids building the autograd graph.
with torch.no_grad():
    y_pred_tensor = model(X_test_padded)

# Predicted class = index of the largest logit; convert to numpy for sklearn.
y_pred = y_pred_tensor.argmax(dim=1).numpy()
# Convert y_test_tensor to numpy array
y_test = y_test_tensor.numpy()

draw_classification_report(y_test, y_pred)
