In [15]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from transformers import BertTokenizer, BertModel
from PIL import Image
import json
from torchvision.transforms.functional import to_tensor
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import numpy as np

In [2]:
# Tokenizer and BERT model
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert_model = BertModel.from_pretrained('bert-base-uncased')
bert_model.eval()

# Image transformation
image_transform = transforms.Compose([
    transforms.Resize((224, 224)), # pads or shrinks the image to 224*224
    transforms.ToTensor(),
])

In [3]:
# MMF dataset class
class MmfDataset(Dataset):
    def __init__(self, data, image_folder, image_transform, tokenizer):
        self.data = data
        self.image_folder = image_folder
        self.image_transform = image_transform
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        entry = self.data[idx]

        # Load and preprocess image
        image_path = self.image_folder + entry["image"]
        image = Image.open(image_path).convert("RGB")
        image = self.image_transform(image)

        # Tokenize and obtain text embeddings using BERT
        text = entry["text"]
        tokens = self.tokenizer(text, return_tensors='pt', truncation=True, padding=True)
        with torch.no_grad():
            text_embedding = bert_model(**tokens).last_hidden_state.mean(dim=1)

        # Label encoding
        label = entry["labels"][0]
        if label == "not harmful":
            encoded_label = 0
        elif label == "somewhat harmful":
            encoded_label = 1
        elif label == "very harmful":
            encoded_label = 2

        # Convert encoded_label to a PyTorch tensor
        encoded_label_tensor = torch.tensor(encoded_label)

        return image, text_embedding, encoded_label_tensor

In [4]:
def collate_fn(batch):
    # Unpack the batch into separate lists for images, text_embeddings, and labels
    images, text_embeddings, labels = zip(*batch)

    # Stack images and text_embeddings into tensors
    images = torch.stack(images)
    text_embeddings = torch.stack(text_embeddings)

    # Stack labels into a tensor
    labels = torch.stack(labels)

    return images, text_embeddings, labels

In [12]:
train_dataset_path = "C:\\Users\\aysen\\Documents\\GitHub\\harmful_meme_models\\data\\datasets\\memes\\defaults\\annotations\\train.jsonl"
test_dataset_path = "C:\\Users\\aysen\\Documents\\GitHub\\harmful_meme_models\\data\\datasets\\memes\\defaults\\annotations\\test.jsonl"
image_folder = "C:\\Users\\aysen\\Documents\\GitHub\\harmful_meme_models\\data\\datasets\\memes\\defaults\\images\\"

# Read the JSON string for training dataset from the file
with open(train_dataset_path, "r", encoding='cp437') as file:
    train_dataset_str = file.read()
    file.close()

# Read the JSON string for test dataset from the file
with open(test_dataset_path, "r", encoding='cp437') as file:
    test_dataset_str = file.read()
    file.close()

# Parse the JSON string
train_dataset = [json.loads(entry) for entry in train_dataset_str.strip().split('\n')]
test_dataset = [json.loads(entry) for entry in test_dataset_str.strip().split('\n')]

# Split the training set into training and validation sets
train_dataset, val_dataset = train_test_split(train_dataset, test_size=0.2, random_state=42)

# Create DataLoader instances for both training and validation sets
mmf_dataset_train = MmfDataset(data=train_dataset, image_folder=image_folder, image_transform=image_transform, tokenizer=tokenizer)
data_loader_train = DataLoader(mmf_dataset_train, batch_size=32, shuffle=True, collate_fn=collate_fn)

mmf_dataset_val = MmfDataset(data=val_dataset, image_folder=image_folder, image_transform=image_transform, tokenizer=tokenizer)
data_loader_val = DataLoader(mmf_dataset_val, batch_size=32, shuffle=False, collate_fn=collate_fn)

# Create DataLoader for the test set
mmf_dataset_test = MmfDataset(data=test_dataset, image_folder=image_folder, image_transform=image_transform, tokenizer=tokenizer)
data_loader_test = DataLoader(mmf_dataset_test, batch_size=32, shuffle=False, collate_fn=collate_fn)

In [6]:
# Model definition
class MmfClassifier(nn.Module):
    
    def __init__(self, image_feature_size, text_feature_size, num_classes):
        
        super(MmfClassifier, self).__init__()
        self.shared_layer = nn.Linear(image_feature_size + text_feature_size, 256)
        self.dropout = nn.Dropout(0.25)
        self.relu = nn.LeakyReLU()
        self.output_layer = nn.Linear(256, num_classes)

    def forward(self, image_data, text_data):

        # Reshape to (batch_size, channels*height*width)
        flattened_image_data = image_data.view(image_data.size(0), -1)

        # Reshape to (batch_size, sequence_length*embedding_size)
        flattened_text_data = text_data.view(text_data.size(0), -1)

        # Combine visual and textual features 
        combined_features = torch.cat((flattened_image_data, flattened_text_data), dim=1)
        shared_output = self.relu(self.shared_layer(combined_features))
        x = self.dropout(shared_output)
        output = self.output_layer(x)

        return output

In [7]:
# Hyperparameters
image_feature_size = 3*224*224 # Image feature size
text_feature_size = 768  # Text feature size
num_classes = 3  # Number of classes

# Instantiate the model
model = MmfClassifier(image_feature_size, text_feature_size, num_classes)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

In [8]:
print("First entry in the dataset:")
print(json.dumps(dataset[0], indent=2))

First entry in the dataset:
{
  "id": "covid_memes_18",
  "image": "covid_memes_18.png",
  "labels": [
    "somewhat harmful",
    "individual"
  ],
  "text": "Bernie or Elizabeth?\nBe informed.Compare them on the issues that matter.\nIssue: Who makes the dankest memes?\n"
}


In [9]:
# Hyperparameters for training
num_epochs = 100
patience = 5  # Number of epochs with no improvement after which training will be stopped
best_val_loss = float('inf')
current_patience = 0

# Training and Validation loops
for epoch in range(num_epochs):
    
    # Training loop
    model.train()
    for batch in data_loader_train:
        images, text_embeddings, labels = batch
        outputs = model(images, text_embeddings)
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Validation loop
    model.eval()
    
    val_loss = 0.0
    correct_predictions = 0
    total_samples = 0
    
    with torch.no_grad():
        for batch_val in data_loader_val:
            images_val, text_embeddings_val, labels_val = batch_val
            outputs_val = model(images_val, text_embeddings_val)
            loss_val = criterion(outputs_val, labels_val)
            val_loss += loss_val.item()

            _, predicted = torch.max(outputs_val, 1)
            correct_predictions += (predicted == labels_val).sum().item()
            total_samples += labels_val.size(0)

    avg_val_loss = val_loss / len(data_loader_val)
    accuracy = correct_predictions / total_samples

    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {loss.item()}, Validation Loss: {avg_val_loss}, Validation Accuracy: {accuracy}")

    # Check for early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        current_patience = 0
        # Save the model if needed
        torch.save(model.state_dict(), 'best_model.pth')
    else:
        current_patience += 1

    if current_patience >= patience:
        print(f"Early stopping after {epoch} epochs without improvement.")
        break



#######################################################
    

Epoch 1/100, Training Loss: 1.0272610187530518, Validation Loss: 0.9543550202721044, Validation Accuracy: 0.30016583747927034
Epoch 2/100, Training Loss: 0.984973132610321, Validation Loss: 0.8530751905943218, Validation Accuracy: 0.6417910447761194
Epoch 3/100, Training Loss: 1.633434534072876, Validation Loss: 0.9965877909409372, Validation Accuracy: 0.6135986733001658
Epoch 4/100, Training Loss: 0.8003036379814148, Validation Loss: 0.8768869669813859, Validation Accuracy: 0.6417910447761194
Epoch 5/100, Training Loss: 0.8227801322937012, Validation Loss: 0.8548082113265991, Validation Accuracy: 0.6417910447761194
Epoch 6/100, Training Loss: 0.5822281837463379, Validation Loss: 0.9917216677414743, Validation Accuracy: 0.6417910447761194
Epoch 7/100, Training Loss: 0.6507325768470764, Validation Loss: 0.8963533200715718, Validation Accuracy: 0.6417910447761194
Early stopping after 6 epochs without improvement.


NameError: name 'MyModel' is not defined

In [16]:
# Load the best model
best_model = MmfClassifier(image_feature_size=image_feature_size, text_feature_size=text_feature_size, num_classes=num_classes)
best_model.load_state_dict(torch.load('best_model.pth'))
best_model.eval()

all_predictions = []
all_ground_truth = []

with torch.no_grad():
    for batch_test in data_loader_test:
        images_test, text_embeddings_test, labels_test = batch_test
        outputs_test = best_model(images_test, text_embeddings_test)
        _, predicted_test = torch.max(outputs_test, 1)
        all_predictions.extend(predicted_test.cpu().numpy())
        all_ground_truth.extend(labels_test.cpu().numpy())

# Convert predictions and ground truth to numpy arrays
all_predictions = np.array(all_predictions)
all_ground_truth = np.array(all_ground_truth)

# Calculate various metrics
accuracy = accuracy_score(all_ground_truth, all_predictions)
precision = precision_score(all_ground_truth, all_predictions, average='weighted')
recall = recall_score(all_ground_truth, all_predictions, average='weighted')
f1 = f1_score(all_ground_truth, all_predictions, average='weighted')
conf_matrix = confusion_matrix(all_ground_truth, all_predictions)

# Print the results
print(f"Accuracy: {accuracy * 100:.2f}%")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1-score: {f1:.2f}")
print("Confusion Matrix:")
print(conf_matrix)

Accuracy: 64.97%
Precision: 0.42
Recall: 0.65
F1-score: 0.51
Confusion Matrix:
[[230   0   0]
 [103   0   0]
 [ 21   0   0]]


  _warn_prf(average, modifier, msg_start, len(result))
