# Prediction of the "classification" label 

In [None]:
import torch
from transformers import AutoTokenizer, AutoModel, RobertaModel, RobertaTokenizer, ViTModel, BlipProcessor, BlipForQuestionAnswering
import pickle 
import torch.nn as nn
import pandas as pd
import numpy as np
import os

from sklearn.model_selection import train_test_split
import torch.optim as optim
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import TensorDataset, DataLoader, Dataset
from torch.nn import CrossEntropyLoss
from PIL import Image
from torchvision.transforms import Compose, Resize, ToTensor, Normalize

In [None]:
# Load the combined dataset. Later cells read its `text`, `id`,
# `classification` and `classification_by_editorial` columns.
df_combined = pd.read_csv('df_combined.csv')
df_combined

## NLP prediction

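Embedding model used below: https://huggingface.co/BAAI/bge-small-en-v1.5 (related model from the same family: https://huggingface.co/BAAI/bge-reranker-large)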

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
# Number of texts embedded per forward pass.
size_limit: int = 60

final_text_embeddings = []

# Load model from HuggingFace Hub
tokenizer = AutoTokenizer.from_pretrained('BAAI/bge-small-en-v1.5')
model = AutoModel.from_pretrained('BAAI/bge-small-en-v1.5').to(device)
model.eval()

# On-disk cache holding the list of per-batch embedding tensors.
filename = 'text_embeddings.obj'

if not os.path.exists(filename):
    # Materialise the text column once instead of once per batch.
    all_texts = df_combined.text.to_list()

    for index in range(0, len(all_texts), size_limit):
        # Slicing clamps at the end of the list, so the final (shorter) batch
        # needs no explicit bounds handling.
        sentences = all_texts[index:index + size_limit]

        # Tokenize sentences
        encoded_input = tokenizer(sentences, padding=True, truncation=True, return_tensors='pt').to(device)

        # Compute token embeddings
        with torch.no_grad():
            model_output = model(**encoded_input)
            # Perform pooling. In this case, cls pooling.
            sentence_embeddings = model_output["last_hidden_state"][:, 0]
        # L2-normalise so that a dot product of two rows is their cosine similarity.
        sentence_embeddings = torch.nn.functional.normalize(sentence_embeddings, p=2, dim=1)
        final_text_embeddings.append(sentence_embeddings)
        print(index)

    # Write the cache once, after every batch succeeded. Dumping inside the
    # loop re-serialised the whole growing list on each batch and, if the run
    # was interrupted, left a partial file that the `else` branch would later
    # load as if it were complete.
    with open(filename, 'wb') as filehandler:
        pickle.dump(final_text_embeddings, filehandler)
else:
    # NOTE: pickle.load can execute arbitrary code; only load a cache file
    # that this notebook produced itself.
    with open(filename, 'rb') as filehandler:
        final_text_embeddings = pickle.load(filehandler)

final_text_embeddings

In [None]:
type(model)

In [None]:
#sentence_embeddings.shape

In [None]:
final_text_embeddings

In [None]:
final_text_embeddings[0].shape

In [None]:
# Similarity between the first two texts of the first batch. Both operands
# are 1-D vectors, so this is a plain dot product; the former `.T` on a 1-D
# tensor was a no-op that PyTorch flags as deprecated.
scores = torch.dot(final_text_embeddings[0][0], final_text_embeddings[0][1])
scores

In [None]:
scores.shape

In [None]:
df_combined["classification_by_editorial"].value_counts()

In [None]:
# Number of distinct values in the `classification` column.
len(df_combined["classification"].unique())

In [None]:
# TODO: TEMPORARY FIX, NEED TO RECOMPUTE EMBEDDINGS

#non_latin_rows = df_combined[(df_combined['text'].str.contains(r'[^\x00-\x7F]')) | (df_combined['title'].str.contains(r'[^\x00-\x7F]'))]
#df_combined = non_latin_rows
#df_combined.__len__()

In [None]:
# Concatenate the list of per-batch embedding tensors along dim 0 into one
# torch tensor (the result is a tensor, not a numpy array).
final_text_embeddings = torch.cat(final_text_embeddings, dim=0)
final_text_embeddings.shape

In [None]:
class Multiclass(nn.Module):
    """Three-layer MLP classification head that returns log-probabilities.

    Args:
        in_features: Width of the input feature vectors. Defaults to 384, the
            value this notebook originally hard-coded.
        n_classes: Number of output classes (default 4).
        hidden_features: Width of the first hidden layer (default 256).
        bottleneck_features: Width of the second hidden layer (default 64).

    The defaults reproduce the original fixed architecture, so
    ``Multiclass()`` behaves exactly as before.
    """

    def __init__(self, in_features: int = 384, n_classes: int = 4,
                 hidden_features: int = 256, bottleneck_features: int = 64):
        super().__init__()
        # Attribute names and creation order are unchanged, so state_dict keys
        # and seeded initialisation stay the same.
        self.hidden = nn.Linear(in_features, hidden_features)
        self.act = nn.ReLU()
        self.hidden2 = nn.Linear(hidden_features, bottleneck_features)
        self.act2 = nn.ReLU()
        self.output = nn.Linear(bottleneck_features, n_classes)

    def forward(self, x):
        """Return log-probabilities over the classes for ``x``.

        ``x`` has the feature dimension last. The result is log-softmax
        output, which pairs with ``nn.NLLLoss``.
        """
        x = self.act(self.hidden(x))
        x = self.act2(self.hidden2(x))
        # Normalise over the class axis, which nn.Linear always puts last.
        # For the usual 2-D (batch, features) input this equals dim=1.
        return torch.nn.functional.log_softmax(self.output(x), dim=-1)


nn_model = Multiclass()

In [None]:
# Target column: the editorial class of every row in df_combined.
labels = df_combined["classification_by_editorial"]

# Map the class names to integer ids; le.classes_ holds the id -> name table.
le = LabelEncoder()
labels_encoded = le.fit_transform(labels)

# Convert labels_encoded to a tensor
labels_encoded = torch.tensor(labels_encoded).to(device)

# 80/20 split with a fixed seed so the evaluation is reproducible.
train_data, test_data, train_labels, test_labels = train_test_split(final_text_embeddings, labels_encoded, test_size=0.2, random_state=42)

# Convert the training and test sets into PyTorch Datasets
train_dataset = TensorDataset(train_data, train_labels)
test_dataset = TensorDataset(test_data, test_labels)

# Create DataLoaders for the training and test sets
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Initialize the model
nn_model = Multiclass().to(device)

# Multiclass.forward returns log-probabilities, so NLLLoss is the matching
# criterion (CrossEntropyLoss would apply log-softmax a second time).
criterion = nn.NLLLoss()
# The optimizer must own the classifier's parameters. It used to be built
# from `model` (the text encoder), whose parameters receive no gradient in
# this loop, so the classifier was never updated.
optimizer = torch.optim.Adam(nn_model.parameters(), lr=0.001)

# Training loop
nn_model.train()
for epoch in range(20):  # Number of epochs
    # `batch_labels` rather than `labels`, so the Series above is not clobbered.
    for inputs, batch_labels in train_loader:
        inputs = inputs.to(device)
        batch_labels = batch_labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = nn_model(inputs)
        loss = criterion(outputs, batch_labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

# Evaluation
nn_model.eval()
correct = 0
total = 0
predictions = []
true_labels = []
with torch.no_grad():
    for inputs, batch_labels in test_loader:
        inputs = inputs.to(device)
        batch_labels = batch_labels.to(device)
        outputs = nn_model(inputs)
        _, predicted = torch.max(outputs, 1)
        total += batch_labels.size(0)
        correct += (predicted == batch_labels).sum().item()
        predictions.extend(predicted.tolist())
        true_labels.extend(batch_labels.tolist())

print('Accuracy of the model on the test set: %d %%' % (100 * correct / total))

# Generate the confusion matrix
cm = confusion_matrix(true_labels, predictions)

# Plot the confusion matrix
plt.figure(figsize=(10, 10))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Confusion Matrix')
plt.show()

# Print the classification report
print(classification_report(true_labels, predictions))
le.classes_

In [None]:
df_combined["classification_by_editorial"].values

In [None]:
# Label encoding
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(df_combined["classification_by_editorial"])

# Splitting the data with encoded labels
X_train, X_test, y_train, y_test = train_test_split(final_text_embeddings, encoded_labels, test_size=0.2, random_state=42)

# as_tensor converts dtype/device without re-wrapping an existing tensor.
# torch.tensor(<tensor>) makes an extra copy and emits a UserWarning.
X_train = torch.as_tensor(X_train, dtype=torch.float32, device=device)
X_test = torch.as_tensor(X_test, dtype=torch.float32, device=device)
y_train = torch.as_tensor(y_train, dtype=torch.long, device=device)
y_test = torch.as_tensor(y_test, dtype=torch.long, device=device)

# Initialize the model
nn_model = Multiclass().to(device)

# Define loss function and optimizer (NLLLoss pairs with the model's
# log-softmax output)
criterion = nn.NLLLoss()
optimizer = optim.Adam(nn_model.parameters(), lr=0.001)

# Training the model: one full-batch gradient step per epoch
epochs = 100
train_losses = []
test_losses = []

for epoch in range(epochs):
    nn_model.train()
    optimizer.zero_grad()
    output = nn_model(X_train)
    loss = criterion(output, y_train)
    loss.backward()
    optimizer.step()
    train_losses.append(loss.item())

    # Test loss is for monitoring only, so skip building an autograd graph.
    nn_model.eval()
    with torch.no_grad():
        output = nn_model(X_test)
        loss = criterion(output, y_test)
    test_losses.append(loss.item())

    print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_losses[-1]}, Test Loss: {test_losses[-1]}")

# Plotting the training and testing losses
plt.plot(train_losses, label='Training loss')
plt.plot(test_losses, label='Testing loss')
plt.legend()
plt.show()

# Evaluation
with torch.no_grad():
    nn_model.eval()
    output = nn_model(X_test)
    # scikit-learn metrics need CPU data.
    y_test = y_test.cpu()
    _, preds = torch.max(output, 1)
    preds = preds.cpu()
    accuracy = accuracy_score(y_test, preds)
    print("Accuracy:", accuracy)
    print("Classification Report:")
    print(classification_report(y_test, preds))

    # Confusion Matrix
    cm = confusion_matrix(y_test, preds)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False)
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.show()

label_encoder.classes_

## Version 2 of the NLP

In [None]:
# Pick the compute device: GPU when CUDA is present, CPU otherwise.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# RoBERTa-base tokenizer (vocabulary).
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

# RoBERTa-base encoder (weights), placed on `device` and switched to
# inference mode.
model = RobertaModel.from_pretrained('roberta-base').to(device)
model.eval()

# Define a custom classifier
class CustomClassifier(torch.nn.Module):
    """Linear classification head on top of a RoBERTa-style encoder.

    Args:
        roberta_model: Encoder module. It is called as ``encoder(**inputs)``
            and must return an object with a ``last_hidden_state`` tensor.
        n_classes: Number of output classes (default 4, the original value).

    The head's input width is read from ``roberta_model.config.hidden_size``
    when available and falls back to 768 (the original hard-coded value).
    """

    def __init__(self, roberta_model, n_classes: int = 4):
        super().__init__()
        self.roberta = roberta_model
        encoder_config = getattr(roberta_model, "config", None)
        hidden_size = getattr(encoder_config, "hidden_size", 768)
        self.dense = torch.nn.Linear(hidden_size, n_classes)

    def forward(self, inputs):
        """Return class logits for a dict of encoder keyword arguments.

        ``inputs`` is expanded with ``**``, so it must be a mapping (for
        example the tokenizer output), not a bare tensor.
        """
        outputs = self.roberta(**inputs).last_hidden_state
        # Classify from the hidden state of the first token of each sequence.
        return self.dense(outputs[:, 0, :])

# Instantiate the custom classifier around the encoder loaded in this cell
# and move it (encoder plus linear head) to the selected device.
classifier = CustomClassifier(model).to(device)

In [None]:
# Number of texts embedded per forward pass.
size_limit: int = 80
filename = 'text_embeddings_roberta.obj'

final_text_embeddings = []

if not os.path.exists(filename):
    # Materialise the text column once instead of once per batch.
    all_texts = df_combined.text.to_list()

    for index in range(0, len(all_texts), size_limit):
        # Slicing clamps at the end of the list, so the last batch may be shorter.
        sentences = all_texts[index:index + size_limit]

        # Tokenize sentences. The encoder lives on `device`, so its inputs
        # must be moved there too.
        encoded_input = tokenizer(sentences, padding=True, truncation=True, return_tensors='pt').to(device)

        # Compute token embeddings
        with torch.no_grad():
            model_output = model(**encoded_input).last_hidden_state
        # Keep only the first-token vector of every text. Storing the full
        # (batch, seq_len, hidden) states is far larger, and seq_len varies
        # from batch to batch (padding=True pads to the longest text in the
        # batch), which breaks concatenating the batches afterwards.
        final_text_embeddings.append(model_output[:, 0, :])
        print(index)

    # Write the cache once, after all batches are done. Dumping inside the
    # loop re-serialised the growing list every time and could leave a
    # partial cache behind after an interruption.
    with open(filename, 'wb') as filehandler:
        pickle.dump(final_text_embeddings, filehandler)
else:
    # NOTE: a cache written before this change holds full per-token states;
    # delete the file to regenerate it. pickle.load can execute arbitrary
    # code, so only load a file this notebook produced itself.
    with open(filename, 'rb') as filehandler:
        final_text_embeddings = pickle.load(filehandler)

final_text_embeddings

In [None]:
final_text_embeddings[0].shape

In [None]:
final_text_embeddings[0]

In [None]:
# Concatenate the per-batch tensors along the batch axis (dim 0). torch.cat
# requires every batch to agree on all remaining dimensions.
final_text_embeddings = torch.cat(final_text_embeddings, dim=0)
final_text_embeddings.shape

In [None]:
# Label encoding
label_encoder = LabelEncoder()
labels = label_encoder.fit_transform(df_combined["classification_by_editorial"])

embeddings = final_text_embeddings
# Accept both cache layouts: per-token states (N, seq_len, hidden) are
# reduced to the first-token vector, and (N, hidden) is used as is.
if embeddings.dim() == 3:
    embeddings = embeddings[:, 0, :]
embeddings = embeddings.to(device)

# The cache may cover only a prefix of the dataframe (the original code
# hard-coded 800 rows), so pair labels and embeddings over the common length.
n_samples = min(len(embeddings), len(labels))
embeddings = embeddings[:n_samples]
all_labels = torch.as_tensor(labels[:n_samples], dtype=torch.long, device=device)

# Split row indices so features and labels stay aligned.
train_idx, test_idx = train_test_split(np.arange(n_samples), test_size=0.2)
train_idx = torch.as_tensor(train_idx, device=device)
test_idx = torch.as_tensor(test_idx, device=device)

train_texts, test_texts = embeddings[train_idx], embeddings[test_idx]
train_labels, test_labels = all_labels[train_idx], all_labels[test_idx]

# Train the custom model. The features are already encoder outputs, so only
# the linear head is applied and optimised. Feeding them back through the
# encoder as `input_ids` fails, because the encoder expects integer token ids.
classifier.train()
optimizer = torch.optim.Adam(classifier.dense.parameters())
loss_fn = CrossEntropyLoss()

for epoch in range(10):  # Number of epochs
    optimizer.zero_grad()
    logits = classifier.dense(train_texts)
    loss = loss_fn(logits, train_labels)
    loss.backward()
    optimizer.step()

# Evaluate the model's performance using the test set
classifier.eval()
with torch.no_grad():
    logits = classifier.dense(test_texts)
    predictions = torch.argmax(logits, dim=-1)
    correct_predictions = (predictions == test_labels).sum().item()
    total_predictions = test_labels.size(0)
    accuracy = correct_predictions / total_predictions

print(f'Test Accuracy: {accuracy * 100:.2f}%')


## Version 3

In [None]:
# Text-only dataset used by the RoBERTa classifier
class ClassifierDataset(Dataset):
    """Tokenise one text per item and pair it with its label.

    Args:
        texts: Object with a ``to_numpy()`` method (e.g. a pandas Series).
        labels: Indexable collection of integer class ids, aligned with ``texts``.
        tokenizer: Object exposing ``encode_plus``.
        max_len: Length every text is padded or truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_len):
        self.texts = texts.to_numpy()
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the raw text, its token ids, attention mask and label tensor."""
        raw_text = str(self.texts[idx])
        target = self.labels[idx]
        tokenizer_options = dict(
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        encoded = self.tokenizer.encode_plus(raw_text, **tokenizer_options)

        sample = {'text': raw_text}
        # The tokenizer returns (1, max_len) tensors; flatten to (max_len,).
        for field in ('input_ids', 'attention_mask'):
            sample[field] = encoded[field].flatten()
        sample['labels'] = torch.tensor(target, dtype=torch.long)
        return sample

# Create data loaders
def create_data_loader(texts, labels, tokenizer, max_len, batch_size, *, shuffle=False, num_workers=4):
    """Wrap texts and labels in a ``ClassifierDataset`` and return a DataLoader.

    Args:
        texts: Texts handed to ``ClassifierDataset``.
        labels: Integer class ids aligned with ``texts``.
        tokenizer: Tokenizer handed to ``ClassifierDataset``.
        max_len: Token length each text is padded or truncated to.
        batch_size: Number of samples per batch.
        shuffle: Reshuffle the samples every epoch. Defaults to False, the
            previous fixed behaviour; pass True for a training loader.
        num_workers: Worker processes used for loading. Defaults to 4, the
            previous fixed value; use 0 to load in the main process.

    Returns:
        A ``torch.utils.data.DataLoader`` over the dataset.
    """
    ds = ClassifierDataset(
        texts=texts,
        labels=labels,
        tokenizer=tokenizer,
        max_len=max_len
    )
    return DataLoader(ds, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers)

class Classifier(nn.Module):
    """RoBERTa encoder followed by dropout and a two-layer head.

    ``forward`` returns log-probabilities over ``n_classes``.
    """

    def __init__(self, n_classes):
        super().__init__()
        self.roberta = RobertaModel.from_pretrained('roberta-base')
        self.drop = nn.Dropout(p=0.3)
        encoder_width = self.roberta.config.hidden_size
        # 128 is the hidden layer size; change it here if needed.
        self.hidden = nn.Linear(encoder_width, 128)
        self.out = nn.Linear(128, n_classes)

    def forward(self, input_ids, attention_mask):
        encoded = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        # Represent each sequence by the hidden state of its first token.
        first_token = encoded.last_hidden_state[:, 0, :]
        activated = torch.relu(self.hidden(self.drop(first_token)))
        return torch.log_softmax(self.out(activated), dim=1)

In [None]:
# Compute device: GPU if CUDA is usable, else CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Tokenizer matching the RoBERTa encoder used inside `Classifier`.
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
# model = RobertaModel.from_pretrained('roberta-base').to(device)

# Batching and truncation settings.
BATCH_SIZE = 6
MAX_LEN = 96

# Work on the first 500 rows only.
data_subset = df_combined[:500]

# Map the class names to integer ids.
le = LabelEncoder()
encoded_labels = le.fit_transform(data_subset['classification_by_editorial'])

# 80/20 train/test split with a fixed seed.
train_texts, test_texts, train_labels, test_labels = train_test_split(
    data_subset['text'],
    encoded_labels,
    test_size=0.2,
    random_state=42,
)

train_data_loader = create_data_loader(train_texts, train_labels, tokenizer, MAX_LEN, BATCH_SIZE)
test_data_loader = create_data_loader(test_texts, test_labels, tokenizer, MAX_LEN, BATCH_SIZE)

# One output per class seen in the subset; the whole network is optimised.
model = Classifier(len(le.classes_)).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)

In [None]:
from transformers import get_linear_schedule_with_warmup
from torch.nn import CrossEntropyLoss

# Number of passes over the training set
EPOCHS = 2

# Loss function
loss_fn = CrossEntropyLoss().to(device)

# Learning rate decays linearly to zero over all optimiser steps, no warm-up.
total_steps = len(train_data_loader) * EPOCHS
scheduler = get_linear_schedule_with_warmup(
    optimizer, num_warmup_steps=0, num_training_steps=total_steps
)

train_losses, test_losses = [], []
TEXT_KEYS = ("input_ids", "attention_mask", "labels")

for epoch in range(EPOCHS):
    print(f'STARTING Epoch {epoch + 1}/{EPOCHS}')

    # ---- training pass ----
    model.train()
    total_loss = 0
    for batch in train_data_loader:
        input_ids, attention_mask, labels = (batch[key].to(device) for key in TEXT_KEYS)

        model.zero_grad()
        outputs = model(input_ids, attention_mask)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

        total_loss += loss.item()

    avg_train_loss = total_loss / len(train_data_loader)
    train_losses.append(avg_train_loss)

    # ---- held-out pass ----
    model.eval()
    total_loss = 0
    for batch in test_data_loader:
        input_ids, attention_mask, labels = (batch[key].to(device) for key in TEXT_KEYS)

        with torch.no_grad():
            outputs = model(input_ids, attention_mask)

        total_loss += loss_fn(outputs, labels).item()

    avg_test_loss = total_loss / len(test_data_loader)
    test_losses.append(avg_test_loss)

    print(f"Epoch {epoch+1}/{EPOCHS}, Train Loss: {avg_train_loss}, Test Loss: {avg_test_loss}")

# Loss curves, one point per epoch
plt.plot(train_losses, label='Training loss')
plt.plot(test_losses, label='Testing loss')
plt.legend()
plt.show()

# Final evaluation on the test loader
model.eval()
predictions = []
true_labels = []

for batch in test_data_loader:
    input_ids, attention_mask, labels = (batch[key].to(device) for key in TEXT_KEYS)

    with torch.no_grad():
        outputs = model(input_ids, attention_mask)

    # Index of the largest log-probability is the predicted class.
    preds = torch.max(outputs, 1).indices
    predictions.extend(preds.cpu().numpy())
    true_labels.extend(labels.cpu().numpy())

accuracy = accuracy_score(true_labels, predictions)
print("Accuracy:", accuracy)
print("Classification Report:")
print(classification_report(true_labels, predictions))

# Confusion matrix heat-map
cm = confusion_matrix(true_labels, predictions)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()

2 epochs with Roberta model:

```
Accuracy: 0.9721559074299635
Classification Report:
              precision    recall  f1-score   support

           0       0.88      0.97      0.92      3908
           1       0.99      0.98      0.98     17162
           2       0.93      0.87      0.90      2410
           3       0.99      0.98      0.98     17570

    accuracy                           0.97     41050
   macro avg       0.95      0.95      0.95     41050
weighted avg       0.97      0.97      0.97     41050
```

## Multimodality

In [None]:
# Dataset pairing each text with its image and label
class ClassifierDataset(Dataset):
    """Yield a tokenised text, a preprocessed image tensor and a label.

    The image for item ``idx`` is read from ``./images/<images[idx]>.jpg``.
    """

    def __init__(self, texts, labels, images, tokenizer, max_len):
        self.texts = texts.to_numpy()
        self.labels = labels
        self.images = images.tolist()
        self.tokenizer = tokenizer
        self.max_len = max_len
        # Resize to 224x224, convert to a tensor, then normalise per channel.
        normalise = Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        self.transform = Compose([Resize((224, 224)), ToTensor(), normalise])

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        image_path = './images/' + str(self.images[idx]) + '.jpg'
        pixels = self.transform(Image.open(image_path).convert('RGB'))

        raw_text = str(self.texts[idx])
        target = self.labels[idx]
        tokenizer_options = dict(
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        encoded = self.tokenizer.encode_plus(raw_text, **tokenizer_options)

        return {
            'text': raw_text,
            'input_ids': encoded['input_ids'].flatten(),
            'attention_mask': encoded['attention_mask'].flatten(),
            'image': pixels,
            'labels': torch.tensor(target, dtype=torch.long),
        }

# Build a DataLoader over the text + image dataset
def create_data_loader(texts, labels, tokenizer, images, max_len, batch_size):
    """Return a 4-worker DataLoader over a ``ClassifierDataset`` of the inputs."""
    dataset = ClassifierDataset(texts=texts, labels=labels, images=images,
                                tokenizer=tokenizer, max_len=max_len)
    return DataLoader(dataset, batch_size=batch_size, num_workers=4)

class Classifier(nn.Module):
    """Text + image classifier that fuses RoBERTa and ViT features by concatenation.

    ``forward`` returns log-probabilities over ``n_classes``.
    """

    def __init__(self, n_classes):
        super().__init__()
        self.roberta = RobertaModel.from_pretrained('roberta-base')
        self.vit = ViTModel.from_pretrained('google/vit-base-patch16-224')
        self.drop = nn.Dropout(p=0.3)
        # The fused vector is the two encoder outputs side by side.
        fused_width = self.roberta.config.hidden_size + self.vit.config.hidden_size
        # 128 is the hidden layer size; change it here if needed.
        self.hidden = nn.Linear(fused_width, 128)
        self.out = nn.Linear(128, n_classes)

    def forward(self, input_ids, attention_mask, image):
        text_states = self.roberta(input_ids=input_ids, attention_mask=attention_mask).last_hidden_state
        image_states = self.vit(image).last_hidden_state

        # First-token state of each encoder, concatenated along the feature axis.
        fused = torch.cat((text_states[:, 0, :], image_states[:, 0, :]), dim=1)

        activated = torch.relu(self.hidden(self.drop(fused)))
        return torch.log_softmax(self.out(activated), dim=1)

In [None]:
# Create a PyTorch dataset
class ClassifierDataset(Dataset):
    """Dataset of (tokenised text, image tensor, label) triples.

    Args:
        texts: Object with ``to_numpy()`` (e.g. a pandas Series) holding the texts.
        labels: Indexable collection of integer class ids, aligned with ``texts``.
        images: Object with ``tolist()`` holding one image identifier per row.
        tokenizer: Tokenizer exposing ``encode_plus``.
        max_len: Token length each text is padded or truncated to.
        image_dir: Directory containing the image files. Defaults to
            ``'./images'``, the location that used to be hard-coded.
        image_ext: File extension appended to each identifier (default ``'.jpg'``).
    """

    def __init__(self, texts, labels, images, tokenizer, max_len,
                 image_dir='./images', image_ext='.jpg'):
        self.texts = texts.to_numpy()
        self.labels = labels
        self.images = images.tolist()
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.image_dir = image_dir
        self.image_ext = image_ext
        # Resize to 224x224, convert to a tensor, then normalise per channel.
        self.transform = Compose([Resize((224, 224)), ToTensor(), Normalize(
            mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])])

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Load, transform and tokenise sample ``idx``.

        Returns a dict with keys ``text``, ``input_ids``, ``attention_mask``,
        ``image`` and ``labels``.
        """
        image_path = os.path.join(self.image_dir, str(self.images[idx]) + self.image_ext)
        # Close the file promptly; convert() yields an independent image.
        with Image.open(image_path) as handle:
            image = self.transform(handle.convert('RGB'))

        text = str(self.texts[idx])
        label = self.labels[idx]
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        return {
            'text': text,
            # The tokenizer returns (1, max_len) tensors; flatten to (max_len,).
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'image': image,
            'labels': torch.tensor(label, dtype=torch.long)
        }

# Create data loaders


def create_data_loader(texts, labels, tokenizer, images, max_len, batch_size, *, shuffle=False, num_workers=4):
    """Wrap texts, images and labels in a ``ClassifierDataset`` and return a DataLoader.

    Args:
        texts: Texts handed to ``ClassifierDataset``.
        labels: Integer class ids aligned with ``texts``.
        tokenizer: Tokenizer handed to ``ClassifierDataset``.
        images: Image identifiers aligned with ``texts``.
        max_len: Token length each text is padded or truncated to.
        batch_size: Number of samples per batch.
        shuffle: Reshuffle the samples every epoch. Defaults to False, the
            previous fixed behaviour; pass True for a training loader.
        num_workers: Worker processes used for loading. Defaults to 4, the
            previous fixed value; use 0 to load in the main process.

    Returns:
        A ``torch.utils.data.DataLoader`` over the dataset.
    """
    ds = ClassifierDataset(
        texts=texts,
        labels=labels,
        tokenizer=tokenizer,
        images=images,
        max_len=max_len
    )
    return DataLoader(ds, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers)


class Classifier(nn.Module):
    """Text + image classifier that fuses RoBERTa and ViT features by addition.

    Element-wise addition keeps the fused vector at the text encoder's width,
    which is why the hidden layer is sized from the RoBERTa config alone.
    ``forward`` returns log-probabilities over ``n_classes``.
    """

    def __init__(self, n_classes):
        super().__init__()
        self.roberta = RobertaModel.from_pretrained('roberta-base')
        self.vit = ViTModel.from_pretrained('google/vit-base-patch16-224')
        self.drop = nn.Dropout(p=0.3)
        # 128 is the hidden layer size; change it here if needed.
        self.hidden = nn.Linear(self.roberta.config.hidden_size, 128)
        self.out = nn.Linear(128, n_classes)

    def forward(self, input_ids, attention_mask, image):
        text_states = self.roberta(input_ids=input_ids, attention_mask=attention_mask).last_hidden_state
        image_states = self.vit(image).last_hidden_state

        text_vector = text_states[:, 0, :]
        image_vector = image_states[:, 0, :]

        # Fusion alternatives that were tried:
        #   concatenation:  torch.cat((text_vector, image_vector), dim=1)
        #   element-wise product:  text_vector.mul(image_vector)
        #   TODO: einsum variant (still has bugs to work out):
        #       torch.einsum('ij,ij->ij', text_vector, image_vector)
        # Active choice: element-wise sum.
        fused = text_vector + image_vector

        activated = torch.relu(self.hidden(self.drop(fused)))
        return torch.log_softmax(self.out(activated), dim=1)

In [None]:
# Compute device: GPU if CUDA is usable, else CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Tokenizer matching the RoBERTa encoder used inside `Classifier`.
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
# model = RobertaModel.from_pretrained('roberta-base').to(device)

# Batching and truncation settings.
BATCH_SIZE = 6
MAX_LEN = 96

# Work on the first 100 rows only.
data_subset = df_combined[:100]

# Map the class names to integer ids.
le = LabelEncoder()
encoded_labels = le.fit_transform(data_subset['classification_by_editorial'])

# 80/20 split of texts, image ids and labels with a fixed seed. The "id"
# column doubles as the image file name.
(train_texts, test_texts,
 train_images, test_images,
 train_labels, test_labels) = train_test_split(
    data_subset['text'],
    data_subset["id"],
    encoded_labels,
    test_size=0.2,
    random_state=42,
)

train_data_loader = create_data_loader(train_texts, train_labels, tokenizer, train_images, MAX_LEN, BATCH_SIZE)
test_data_loader = create_data_loader(test_texts, test_labels, tokenizer, test_images, MAX_LEN, BATCH_SIZE)

# One output per class seen in the subset; the whole network is optimised.
model = Classifier(len(le.classes_)).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)

In [None]:
from transformers import get_linear_schedule_with_warmup
from torch.nn import CrossEntropyLoss

# Number of passes over the training set
EPOCHS = 2

# Loss function
loss_fn = CrossEntropyLoss().to(device)

# Learning rate decays linearly to zero over all optimiser steps, no warm-up.
total_steps = len(train_data_loader) * EPOCHS
scheduler = get_linear_schedule_with_warmup(
    optimizer, num_warmup_steps=0, num_training_steps=total_steps
)

train_losses, test_losses = [], []

for epoch in range(EPOCHS):
    print(f'STARTING Epoch {epoch + 1}/{EPOCHS}')

    # ---- training pass ----
    model.train()
    total_loss = 0
    for batch in train_data_loader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)
        images = batch["image"].to(device)

        model.zero_grad()
        outputs = model(input_ids, attention_mask, images)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

        total_loss += loss.item()

    avg_train_loss = total_loss / len(train_data_loader)
    train_losses.append(avg_train_loss)

    # ---- held-out pass ----
    model.eval()
    total_loss = 0
    for batch in test_data_loader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)
        images = batch["image"].to(device)

        with torch.no_grad():
            outputs = model(input_ids, attention_mask, images)

        total_loss += loss_fn(outputs, labels).item()

    avg_test_loss = total_loss / len(test_data_loader)
    test_losses.append(avg_test_loss)

    print(f"Epoch {epoch+1}/{EPOCHS}, Train Loss: {avg_train_loss}, Test Loss: {avg_test_loss}")

# Loss curves, one point per epoch
plt.plot(train_losses, label='Training loss')
plt.plot(test_losses, label='Testing loss')
plt.legend()
plt.show()

# Final evaluation on the test loader
model.eval()
predictions = []
true_labels = []

for batch in test_data_loader:
    input_ids = batch["input_ids"].to(device)
    attention_mask = batch["attention_mask"].to(device)
    labels = batch["labels"].to(device)
    images = batch["image"].to(device)

    with torch.no_grad():
        outputs = model(input_ids, attention_mask, images)

    # Index of the largest log-probability is the predicted class.
    preds = torch.max(outputs, 1).indices
    predictions.extend(preds.cpu().numpy())
    true_labels.extend(labels.cpu().numpy())

accuracy = accuracy_score(true_labels, predictions)
print("Accuracy:", accuracy)
print("Classification Report:")
print(classification_report(true_labels, predictions))

# Confusion matrix heat-map
cm = confusion_matrix(true_labels, predictions)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()

### Pretrained multimodal models:

In [None]:
model = RobertaModel.from_pretrained('roberta-base')
model.config

In [None]:
# Exploratory cell: load the BLIP visual-question-answering checkpoint and
# ask a free-form question about the image of the first dataset row.
processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-capfilt-large")
model = BlipForQuestionAnswering.from_pretrained(
    "Salesforce/blip-vqa-capfilt-large")

model.config.output_hidden_states = True

# Attach a 4-class head (log-softmax output) as a new attribute.
# NOTE(review): nothing in this cell routes features through
# `model.classifier`, and the 768 input width is assumed to match the
# feature it would consume. Confirm both before relying on this head.
model.classifier = nn.Sequential(
    nn.Linear(768, 512),
    nn.ReLU(),
    nn.Dropout(0.2),
    nn.Linear(512, 4),
    nn.LogSoftmax(dim=1)
)

# The "id" column of the first row doubles as the image file name.
raw_image = Image.open(
    './images/' + str(df_combined['id'][0]) + '.jpg').convert('RGB')

question = "how many dogs are in the picture?"
inputs = processor(raw_image, question, return_tensors="pt")

# Generate an answer and decode it back to text.
out = model.generate(**inputs)
print(processor.decode(out[0], skip_special_tokens=True))

In [None]:
dir(out)

In [None]:
model(**inputs, labels=torch.tensor([0]))

In [None]:
# Exploratory cell: score two candidate captions against the image of the
# first dataset row with CLIP.
from transformers import CLIPProcessor, CLIPModel

model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

model.config.output_hidden_states = True

# The "id" column of the first row doubles as the image file name.
image = Image.open(
    './images/' + str(df_combined['id'][0]) + '.jpg').convert('RGB')

inputs = processor(text=["a photo of a cat", "a photo of a dog"],
                   images=image, return_tensors="pt")

outputs = model(**inputs)
# this is the image-text similarity score
logits_per_image = outputs.logits_per_image
# we can take the softmax to get the label probabilities
# (`probs` is computed here but not displayed or used in this cell)
probs = logits_per_image.softmax(dim=1)

In [None]:
dir(outputs)

In [None]:
outputs.text_model_output

In [None]:
dir(outputs)

In [None]:
from transformers import VisualBertModel, AutoTokenizer
import torch

# Load pretrained model and tokenizer
model = VisualBertModel.from_pretrained('uclanlp/visualbert-vcr')
tokenizer = AutoTokenizer.from_pretrained("google-bert/bert-base-uncased")

# Sample text
text = "What color is the cat?"

# VisualBERT does not take raw pixels. It expects pre-extracted region
# features shaped (batch, num_regions, visual_embedding_dim), passed as
# `visual_embeds` (there is no `visual_feats` argument). Random features
# stand in here; replace them with the output of an image feature extractor.
num_regions = 36
visual_embeds = torch.randn(1, num_regions, model.config.visual_embedding_dim)

# Prepare the inputs: text tokens plus the visual stream with its own
# token-type ids and attention mask.
inputs = tokenizer(text, return_tensors='pt')
inputs['visual_embeds'] = visual_embeds
inputs['visual_token_type_ids'] = torch.ones(visual_embeds.shape[:-1], dtype=torch.long)
inputs['visual_attention_mask'] = torch.ones(visual_embeds.shape[:-1], dtype=torch.float)

# Forward pass
outputs = model(**inputs)

# Get the last hidden state and the pooler output
last_hidden_state = outputs.last_hidden_state
pooler_output = outputs.pooler_output

# Now you can add your custom MLP classifier head on top of these outputs

In [None]:
# Dataset of raw text / raw image / label triples for the CLIP experiments
class ClassifierDataset(Dataset):
    """Yield the untokenised text, the PIL image and the label of each row.

    The image for item ``idx`` is read from ``./images/<images[idx]>.jpg``.
    NOTE(review): items carry a raw PIL image, which the DataLoader's default
    collation cannot batch. A custom ``collate_fn`` or an in-dataset
    transform is needed.
    """

    def __init__(self, texts, labels, images):
        self.texts = texts.to_numpy()
        self.labels = labels
        self.images = images.tolist()

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        image_path = './images/' + str(self.images[idx]) + '.jpg'
        picture = Image.open(image_path).convert('RGB')

        sample = {'text': str(self.texts[idx])}
        sample['image'] = picture
        sample['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample

def _collate_raw_batch(samples):
    """Batch dataset items while keeping texts and PIL images as plain lists."""
    return {
        'text': [sample['text'] for sample in samples],
        'image': [sample['image'] for sample in samples],
        'labels': torch.stack([sample['labels'] for sample in samples]),
    }


def create_data_loader(texts, labels, images, batch_size):
    """Return a DataLoader over a ``ClassifierDataset`` of raw texts and images.

    The dataset yields PIL images, which the default collation cannot stack
    into a batch. Batches are therefore dicts whose ``'text'`` and ``'image'``
    entries are lists (to be fed to a processor) and whose ``'labels'`` entry
    is a 1-D tensor.

    Args:
        texts: Texts handed to ``ClassifierDataset``.
        labels: Integer class ids aligned with ``texts``.
        images: Image identifiers aligned with ``texts``.
        batch_size: Number of samples per batch.
    """
    ds = ClassifierDataset(
        texts=texts,
        labels=labels,
        images=images,
    )
    return DataLoader(ds, batch_size=batch_size, num_workers=4,
                      collate_fn=_collate_raw_batch)


class CustomCLIPClassifier(nn.Module):
    """CLIP backbone with an MLP head over the fused text and image embeddings.

    Args:
        num_classes: Number of output classes.

    ``forward`` returns ``(loss, logits)``; ``loss`` is ``None`` when no
    labels are given.
    """

    def __init__(self, num_classes):
        super().__init__()
        # forward() reshapes the logits with this value; it was never assigned
        # before, so any call with labels raised AttributeError.
        self.num_labels = num_classes
        self.clip = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        # The top-level CLIP config has no `hidden_size`. Both projected
        # embeddings have width `projection_dim`, and the head consumes the
        # two of them concatenated.
        embed_dim = self.clip.config.projection_dim
        self.classifier = nn.Sequential(
            nn.Linear(2 * embed_dim, 512),
            nn.ReLU(),
            nn.Linear(512, num_classes)
        )

    def forward(self, input_ids, pixel_values, labels=None):
        outputs = self.clip(input_ids=input_ids, pixel_values=pixel_values)
        # The CLIP output has no `pooler_output`; it exposes one projected
        # embedding per modality. Fuse them by concatenation.
        fused = torch.cat((outputs.text_embeds, outputs.image_embeds), dim=1)
        logits = self.classifier(fused)
        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
        return loss, logits

In [None]:
# Check if CUDA is available and set PyTorch to use GPU or CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Processor bundling the CLIP tokenizer and image preprocessing
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# Work on the first 500 rows only.
data_subset = df_combined[:500]

# Encode labels
le = LabelEncoder()
encoded_labels = le.fit_transform(data_subset['classification_by_editorial'])

# Split data into training and test sets (texts, image ids and labels together)
train_texts, test_texts, train_images, test_images, train_labels, test_labels = train_test_split(
    data_subset['text'], data_subset["id"], encoded_labels, test_size=0.2, random_state=42)

BATCH_SIZE = 6
# MAX_LEN is not used in this cell.
MAX_LEN = 96

train_data_loader = create_data_loader(
    train_texts, train_labels, train_images, BATCH_SIZE)
test_data_loader = create_data_loader(
    test_texts, test_labels, test_images, BATCH_SIZE)

# Initialize the classifier and optimizer. The head is sized from the classes
# actually present in the subset, as in the other setup cells, instead of a
# hard-coded 4.
model = CustomCLIPClassifier(num_classes=len(le.classes_)).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)

In [None]:
# Define your hyperparameters
num_epochs = 1


def _clip_inputs(batch):
    """Turn a raw batch into ``(input_ids, pixel_values, labels)`` on ``device``.

    ``batch`` is expected to be a mapping with ``'text'``, ``'image'`` and
    ``'labels'`` entries, the keys each dataset item carries. Unpacking the
    dict directly, as before, yields its key names rather than tensors.
    """
    encoded = processor(
        text=list(batch["text"]),
        images=list(batch["image"]),
        return_tensors="pt",
        padding=True,
        truncation=True,
    )
    return (
        encoded["input_ids"].to(device),
        encoded["pixel_values"].to(device),
        batch["labels"].to(device),
    )


# Training loop
for epoch in range(num_epochs):
    # Train
    model.train()
    total_train_loss = 0
    for batch in train_data_loader:
        optimizer.zero_grad()
        input_ids, pixel_values, labels = _clip_inputs(batch)
        loss, _ = model(input_ids=input_ids,
                        pixel_values=pixel_values, labels=labels)
        total_train_loss += loss.item()
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch+1}/{num_epochs}")
    print(f"Train loss: {total_train_loss/len(train_data_loader)}")

# Evaluation loop
model.eval()
predictions, true_labels = [], []

with torch.no_grad():
    for batch in test_data_loader:
        input_ids, pixel_values, labels = _clip_inputs(batch)
        _, logits = model(input_ids=input_ids, pixel_values=pixel_values)
        preds = torch.argmax(logits, dim=1)
        predictions.extend(preds.cpu().numpy())
        true_labels.extend(labels.cpu().numpy())

# Compute the metrics and report them (the accuracy used to be computed but
# never shown)
accuracy = accuracy_score(true_labels, predictions)
print("Accuracy:", accuracy)

# Testing fusion approaches

In [None]:
# Create two random tensors of shape (6, 768): a batch of 6 vectors of
# width 768 each.
a = torch.randn(6, 768)
b = torch.randn(6, 768)

In [None]:
# Basic element-wise multiplication

d = a.mul(b)
d.shape

In [None]:
# NOTE: reshape(768, 6) re-reads b's elements in row-major order. It is not
# the transpose of b, so this (6, 6) product differs from
# torch.matmul(a, b.T).
e = torch.matmul(a, b.reshape(768, 6))
e

In [None]:
torch.matmul(a, b.T)

In [None]:
(a * b) == d

In [None]:
(a + b).shape

In [None]:
f = torch.einsum('ik,jk->ij', a, b)
f

In [None]:
# NOTE(review): `f` comes from einsum 'ik,jk->ij' on two (6, 768) tensors, so
# it is (6, 6), while `a * b` is (6, 768). These shapes do not broadcast, so
# this comparison raises. Compare `f` with torch.matmul(a, b.T) if the goal
# is to check the einsum.
f == (a * b)

In [None]:
# compute einsum and normalise it
g = torch.einsum('ij,jk->ik', a, b.T)
g = torch.nn.functional.normalize(g, p=2, dim=1)
g

In [None]:
# Matrix product of `a` with `b` reshaped to (768, 6), then L2-normalise each
# row. The normalisation previously read `g` (the result of another cell)
# instead of the `i` computed on the line above.
i = torch.einsum('ij,jk->ik', a, b.reshape(768, 6))
i = torch.nn.functional.normalize(i, p=2, dim=1)
i

In [None]:
# Outer product of the first vector of `a` with the first vector of `b`.
# torch.ger is a deprecated alias of torch.outer.
h = torch.outer(a[0], b[0])
h.shape