## Read and Split Data

In [1]:
# Read data here
# Extract zip file if folder does not already exist
import os
import zipfile
import json

folder_path = 'data/CT23_1A_checkworthy_multimodal_english_v2'
zip_file_path = 'data/CT23_1A_checkworthy_multimodal_english_v2.zip'

def zip_extration(folder_path, zip_file_path):
    print('Zip file extraction started')
    if not os.path.exists(folder_path):
        print('Folder does not exist, extracting zip file')
        os.makedirs(folder_path)

        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(folder_path)
    
    print('Zip file extracted')

zip_extration(folder_path, zip_file_path)

train_path = folder_path + '/CT23_1A_checkworthy_multimodal_english_train.jsonl'
test_path = folder_path + '/CT23_1A_checkworthy_multimodal_english_test.jsonl'

def split_json(data):
    text_data = {
        'tweet_id': data['tweet_id'],
        'tweet_url': data['tweet_url'],
        'text': data['tweet_text'] + data['ocr_text'],
        'class_label': data['class_label']
    }

    image_data = {
        'tweet_id': data['tweet_id'],
        'tweet_url': data['tweet_url'],
        'class_label': data['class_label'],
        'image_path': data['image_path'],
        'image_url': data['image_url']
    }

    return text_data, image_data


# Read data from the folder
def read_data(file_path):
    text_data = []
    image_data = []
    with open(file_path, 'r') as file:
        for line in file:
            json_obj = json.loads(line)
            text, image = split_json(json_obj)
            text_data.append(text)
            image_data.append(image)
    return text_data, image_data

train_text_data, train_image_data = read_data(train_path)
print(f'Text: {train_text_data[0]}')
print(f'Image: {train_image_data[0]}')


Zip file extraction started
Zip file extracted
Image: {'tweet_id': '1222845188567003136', 'tweet_url': 'https://twitter.com/user/status/1222845188567003136', 'class_label': 'Yes', 'image_path': 'images_labeled/train/1222845188567003136.jpg', 'image_url': 'http://pbs.twimg.com/media/EPhrN-SU4AAKvGf.jpg'}


## Clean Data

In [2]:
# Clean data here (maybe lemmatizer and such)


## Run Text Analyzer Model

In [3]:
try:
    import transformers
except ImportError:
    %pip install transformers
# Train model here
# Use RoBERTa model or GPT model from Hugging Face

  from .autonotebook import tqdm as notebook_tqdm


## Run Image Analyser Model

In [4]:
# # Train model here
# # Use VIT model from Hugging Face
# try:
#     from transformers import ViTFeatureExtractor, ViTModel, RobertaTokenizer, RobertaModel
#     import torch
#     import torch.nn as nn
# except ImportError:
#     %pip install transformers
#     %pip install torch
#     %pip install pillow

# from PIL import Image
# # Train model here

# # Text model
# tokenizer = RobertaTokenizer.from_pretrained("roberta-base")
# text_model = RobertaModel.from_pretrained("roberta-base")

# # ViT model from Hugging Face
# image_processor = ViTFeatureExtractor.from_pretrained("google/vit-base-patch16-224-in21k")
# image_model = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')


# image_path = folder_path + '/' + train_image_data[0]['image_path']
# image = Image.open(image_path)

# text = train_text_data[0]['tweet_text'] + ' ' + train_text_data[0]['ocr_text']

# # Text input and output
# text_inputs = tokenizer(text, return_tensors="pt", padding=True)
# text_outputs = text_model(**text_inputs)

# # Image input and output
# image_inputs = image_processor(images=image, return_tensors="pt")
# image_outputs = image_model(pixel_values=image_inputs.pixel_values)

# # Combine text and image features (e.g., concatenation)
# combined_features = torch.cat([text_outputs.last_hidden_state.mean(dim=1), image_outputs.last_hidden_state.mean(dim=1)], dim=1)


# # Define the classification model
# classification_model = nn.Sequential(
#     nn.Linear(combined_features.shape[-1], 2)  # Output size is 2 for binary classification
# )

# # Forward pass to obtain logits
# logits = classification_model(combined_features)

# # Apply softmax activation function to obtain probabilities
# probabilities = nn.functional.softmax(logits, dim=-1)

# # Get the predicted class
# predicted_class = torch.argmax(probabilities, dim=-1)

# # Convert predicted class to "Yes" or "No"
# predicted_label = ["Yes", "No"][predicted_class.item()]

# print("Predicted label:", predicted_label)
# print("Ground Truth:", train_text_data[0]['class_label'])




In [5]:
# from PIL import Image

# from transformers import CLIPProcessor, CLIPModel, RobertaTokenizer, RobertaForSequenceClassification

# # CLIP ViT

# model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
# processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# image_path = folder_path + '/' + train_image_data[0]['image_path']
# image = Image.open(image_path)

# labels = ["Misleading information", "Out of context", "False connection", "Unverifiable information", "Satire or humor", "Opinion", "Not checkworthy", "True information", "Exaggeration", "Miscaptioned"]
# inputs = processor(text=labels, images=image, return_tensors="pt", padding=True)

# outputs = model(**inputs)
# logits_per_image = outputs.logits_per_image # this is the image-text similarity score
# probs_clip = logits_per_image.softmax(dim=1) # we can take the softmax to get the label probabilities


# # RoBERTa
# tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
# model = RobertaForSequenceClassification.from_pretrained('roberta-base', num_labels=len(labels))

# # Assume you have a list of texts
# text = train_text_data[0]['tweet_text'] + ' ' + train_text_data[0]['ocr_text']
# inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
# outputs = model(**inputs)

# logits = outputs.logits
# probs_roberta = logits.softmax(dim=1)

# # Assume `probs_roberta` and `probs_clip` are the probabilities from the RoBERTa and CLIP models, respectively
# probs_roberta_list = probs_roberta[0].tolist()
# probs_clip_list = probs_clip[0].tolist()

# # Average the probabilities for each label
# avg_probs = [(prob_roberta + prob_clip) / 2 for prob_roberta, prob_clip in zip(probs_roberta_list, probs_clip_list)]

# # Create a dictionary of the average probabilities
# avg_label_probs = dict(zip(labels, avg_probs))
# print("Average label probabilities:", avg_label_probs)

# # Determine if the content should be checked
# # You can define your own threshold and checkworthy labels
# threshold = 0.5
# checkworthy_labels = ["Misleading information", "Out of context", "Manipulated image"]
# should_check = any(avg_label_probs[label] > threshold for label in checkworthy_labels)

# print("Should check:", should_check)


In [6]:
# from transformers import RobertaTokenizer, RobertaForSequenceClassification

# tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
# model = RobertaForSequenceClassification.from_pretrained('roberta-base', num_labels=len(labels))

# # Assume you have a list of texts
# text = train_text_data[0]['tweet_text'] + ' ' + train_text_data[0]['ocr_text']
# inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
# outputs = model(**inputs)

# logits = outputs.logits
# probs = logits.softmax(dim=1)

# # Convert the probabilities to a list for each text
# probs_list = probs[0].tolist()

# label_probs = dict(zip(labels, probs_list))
# print("Label probabilities:", label_probs)

In [12]:
from torch.optim import AdamW
from PIL import Image
import torch
from sklearn.preprocessing import LabelEncoder

from transformers import CLIPProcessor, CLIPModel, RobertaTokenizer, RobertaForSequenceClassification

# Prepare the data
texts = [data['text'] for data in train_text_data]
labels = [data['class_label'] for data in train_text_data]  # You need to have labels for your training data

images = [Image.open(folder_path + '/' + data['image_path']) for data in train_image_data]

# Convert labels to numerical values
le = LabelEncoder()
labels_num = le.fit_transform(labels)

# CLIP ViT model
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# RoBERTa model
roberta_tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
roberta_model = RobertaForSequenceClassification.from_pretrained('roberta-base', num_labels=len(labels))


inputs_text = roberta_tokenizer(texts, return_tensors="pt", padding=True, truncation=True)
inputs_text["labels"] = torch.tensor(labels_num)

inputs_image = clip_processor(images=images, return_tensors="pt", padding=True, truncation=True)
inputs_image["labels"] = torch.tensor(labels_num)
# Define the optimizers
optimizer_roberta = AdamW(roberta_model.parameters(), lr=1e-5)
optimizer_clip = AdamW(clip_model.parameters(), lr=1e-5)

# Train the models
roberta_model.train()
clip_model.train()
for epoch in range(10):  # Number of epochs is a hyperparameter you can tune
    outputs_roberta = roberta_model(**inputs_text)
    loss_roberta = outputs_roberta.loss
    loss_roberta.backward()
    optimizer_roberta.step()
    optimizer_roberta.zero_grad()

    outputs_clip = clip_model(**inputs_image)
    loss_clip = outputs_clip.loss
    loss_clip.backward()
    optimizer_clip.step()
    optimizer_clip.zero_grad()

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Unused or unrecognized kwargs: padding, truncation.


RuntimeError: [enforce fail at alloc_cpu.cpp:114] data. DefaultCPUAllocator: not enough memory: you tried to allocate 3705667584 bytes.

## Concatenate The Two Models

In [None]:
# Concatenate models here two get the image and text results together