In [None]:
# Mount Google Drive at /content/drive; the CSV files and image folders used
# below are all read from (and the feature tensors written to) that mount.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import pandas as pd
import numpy as np

import random
import json
from tqdm.auto import tqdm
import warnings
from PIL import Image
warnings.filterwarnings('ignore')
%matplotlib inline

import torch
from transformers import BertTokenizer, BertModel, RobertaTokenizer, RobertaModel
import torchvision.transforms as transforms
import torchvision.models as models


In [None]:
# Fix the seed of every random number generator this notebook imports so that
# a fresh "Restart & Run All" reproduces the same results.
random_seed = 42
random.seed(random_seed)
# NumPy keeps its own global RNG, independent of Python's `random` module
np.random.seed(random_seed)

# Seed PyTorch on the CPU and, when CUDA is present, on every GPU as well
torch.manual_seed(random_seed)
if torch.cuda.is_available():
    print('available')
    torch.cuda.manual_seed_all(random_seed)

available


In [None]:
# Pick the compute device: the CUDA GPU when one is visible, the CPU otherwise
device = torch.device("cpu") if not torch.cuda.is_available() else torch.device("cuda")

In [None]:
def create_image_urls(trainOrTest, length):
    """Build the sequential image file names for one dataset split.

    Args:
        trainOrTest: Split tag inserted into each name (e.g. 'train' or 'test').
        length: Number of names to generate; indices run from 0 to length - 1.

    Returns:
        A list such as ['image_train_0.jpg', 'image_train_1.jpg', ...].
    """
    prefix = f"image_{trainOrTest}_"
    image_strings = []
    for idx in range(length):
        image_strings.append(prefix + str(idx) + ".jpg")
    return image_strings

In [None]:
def preprocessDataframe(news, trainOrTest):
    """Report the label balance and attach an image file name to every row.

    Args:
        news: DataFrame for one split; must contain a '2_way_label' column.
        trainOrTest: Split tag ('train' or 'test') used in the image names.

    Returns:
        A new DataFrame equal to `news` plus an 'image_main' column holding
        'image_<split>_<row position>.jpg' for each row, in row order.
    """
    print(news.shape, news['2_way_label'].value_counts())
    image_strings = create_image_urls(trainOrTest, news.shape[0])
    # Assign by position rather than concatenating on axis=1: concat aligns on
    # the index, so a frame whose index is not 0..n-1 (e.g. after filtering)
    # would end up with NaN-padded rows and misplaced image names.
    return news.assign(image_main=image_strings)

In [None]:
# Load the training split from Drive and attach the per-row image file names
df_train = preprocessDataframe(
    pd.read_csv('/content/drive/MyDrive/Colab Notebooks/fakeddit_train.csv'),
    'train',
)

(3374, 16) 2_way_label
1    1941
0    1433
Name: count, dtype: int64


In [None]:
# Load the test split from Drive and attach the per-row image file names
df_test = preprocessDataframe(
    pd.read_csv('/content/drive/MyDrive/Colab Notebooks/fakeddit_test.csv'),
    'test',
)

(665, 16) 2_way_label
1    390
0    275
Name: count, dtype: int64


In [None]:
class BertFeatureExtractor():
    """Turn a list of sentences into one fixed-size BERT embedding per sentence.

    Usage: construct with the sentences, call `encoder()` to tokenize, then
    `generate_embeddings()` to run the model. `generate_embeddings()` tokenizes
    on demand if `encoder()` has not been called yet.
    """

    def __init__(self, sentences):
        """Load the 'bert-base-uncased' tokenizer and model.

        Args:
            sentences: List of input texts to embed.
        """
        super(BertFeatureExtractor, self).__init__()
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        self.model = BertModel.from_pretrained('bert-base-uncased')
        self.sentences = sentences
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)
        # Feature extraction only: make sure dropout is disabled
        self.model.eval()
        # Filled in by encoder()
        self.input_ids = None
        self.attention_mask = None

    def encoder(self):
        """Tokenize all sentences as one padded batch.

        Stores the token IDs in `self.input_ids` and the padding mask in
        `self.attention_mask`, both moved to `self.device`. Returns nothing.
        """
        # Tokenize and encode text using batch_encode_plus
        encoding = self.tokenizer.batch_encode_plus(
            self.sentences,            # List of input texts
            padding=True,              # Pad shorter sentences so the batch is rectangular
            truncation=True,           # Truncate to the model's maximum length if necessary
            return_tensors='pt',       # Return PyTorch tensors
            add_special_tokens=True    # Add special tokens CLS and SEP
        )

        self.input_ids = encoding['input_ids'].to(self.device)  # Token IDs
        self.attention_mask = encoding['attention_mask'].to(self.device)  # Attention mask

    def generate_embeddings(self):
        """Run BERT and mean-pool the token embeddings of each sentence.

        Only real tokens (attention mask == 1) contribute to the mean, so a
        sentence's embedding does not depend on how much padding it received.

        Returns:
            Tensor with one row per sentence and one column per hidden unit.
        """
        # Tokenize lazily so calling this before encoder() does not crash
        if getattr(self, 'input_ids', None) is None or getattr(self, 'attention_mask', None) is None:
            self.encoder()

        with torch.no_grad():
            outputs = self.model(self.input_ids, attention_mask=self.attention_mask)
            word_embeddings = outputs.last_hidden_state  # This contains the word embeddings
            # Broadcast the mask over the hidden dimension and zero out padding
            mask = self.attention_mask.unsqueeze(-1).to(word_embeddings.dtype)
            summed = (word_embeddings * mask).sum(dim=1)
            # clamp guards against division by zero for an all-padding row
            token_counts = mask.sum(dim=1).clamp(min=1)
            sentence_embeddings = summed / token_counts
        print('Finished generating embeddings')
        return sentence_embeddings


In [None]:
def transform_image(image):
    """Return `image` in RGB mode.

    An image whose `mode` is already 'RGB' is returned unchanged (same
    object); any other mode is converted with `image.convert('RGB')`.
    """
    needs_conversion = image.mode != 'RGB'
    return image.convert('RGB') if needs_conversion else image

In [None]:
class ResnetFeatureExtractor():
    """Extract image features with a pre-trained ResNet-50 minus its last layer."""

    def __init__(self, image_urls, folder_url):
        """Load the model and set up the image preprocessing pipeline.

        Args:
            image_urls: List of image file names, relative to `folder_url`.
            folder_url: Directory prefix that is string-concatenated in front
                of each file name (so it should end with a path separator).
        """
        super(ResnetFeatureExtractor, self).__init__()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Load pre-trained ResNet-50 model
        backbone = models.resnet50(pretrained=True)
        # Drop the last child module (the final fully connected layer) so the
        # network outputs features instead of class scores
        self.model = torch.nn.Sequential(*list(backbone.children())[:-1]).to(self.device)
        # Set the model to evaluation mode
        self.model.eval()
        self.preprocess = transforms.Compose([
            transforms.Lambda(transform_image),  # force 3-channel RGB input
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
        self.image_urls = image_urls
        self.folder_url = folder_url

    def extract_features_from_urls(self, batch_size=32):
        """Compute one feature vector per image, processing `batch_size` at a time.

        Args:
            batch_size: Number of images per forward pass.

        Returns:
            2-D tensor on `self.device` with one row per entry of
            `self.image_urls`, in the same order.
        """
        features = []
        # Ceiling division so the final, partially filled batch is counted too
        num_batches = (len(self.image_urls) + batch_size - 1) // batch_size
        progress_bar = tqdm(total=num_batches)
        for i in range(0, len(self.image_urls), batch_size):
            batch_urls = self.image_urls[i:i + batch_size]
            batch_images = []
            for url in batch_urls:
                # Open the image file from the local folder; the context
                # manager releases the file handle once the tensor is built
                with Image.open(self.folder_url + url) as img:
                    img_tensor = self.preprocess(img)
                batch_images.append(img_tensor)
            # Stack images into a batch tensor
            batch_images = torch.stack(batch_images)
            # Forward pass through the model
            with torch.no_grad():
                batch_features = self.model(batch_images.to(self.device))
            # Flatten everything except the batch dimension. Unlike squeeze(),
            # this keeps the batch axis even when the batch holds one image,
            # so the concatenation below always sees 2-D tensors.
            features.append(batch_features.flatten(start_dim=1))
            progress_bar.update(1)
        progress_bar.close()
        features_tensor = torch.cat(features, dim=0)
        return features_tensor.to(self.device)

In [None]:
class FusionModel():
    """Combine BERT sentence embeddings and ResNet image features per sample."""

    def __init__(self, sentences_list, image_urls_train, image_folder_train):
        """Create the text and image feature extractors.

        Args:
            sentences_list: Texts to embed, one per sample.
            image_urls_train: Image file names, one per sample, in the same
                order as `sentences_list`.
            image_folder_train: Directory prefix prepended to each file name.
        """
        super(FusionModel, self).__init__()
        self.bertModel = BertFeatureExtractor(sentences_list)
        self.resnet50 = ResnetFeatureExtractor(image_urls_train, image_folder_train)

    def generate_embeddings(self):
        """Tokenize the sentences and return their sentence embeddings."""
        self.bertModel.encoder()
        return self.bertModel.generate_embeddings()

    def generate_image_features(self, batch_size=32):
        """Return the image features, extracted `batch_size` images at a time."""
        return self.resnet50.extract_features_from_urls(batch_size)

    def get_features(self):
        """Compute text, image and fused (concatenated) features.

        Returns:
            Tuple `(fused, sentence_features, image_features)` where `fused`
            is a float32 tensor whose rows are the sentence features followed
            by the image features of the same sample.

        Raises:
            ValueError: If the two extractors return different row counts,
                which would otherwise pair texts with the wrong images.
        """
        sentence_features = self.generate_embeddings()
        image_features = self.generate_image_features()
        if sentence_features.shape[0] != image_features.shape[0]:
            raise ValueError(
                f"Mismatched sample counts: {sentence_features.shape[0]} sentence "
                f"embeddings vs {image_features.shape[0]} image feature rows"
            )
        # One column-wise concatenation replaces the per-row Python loop
        combined_tensors = torch.cat((sentence_features, image_features), dim=1)
        combined_tensors = combined_tensors.float()
        return combined_tensors, sentence_features, image_features

In [None]:
# Inputs for the training split: image directory, image file names, titles
image_folder_train = '/content/drive/MyDrive/Colab Notebooks/fakeddit_train_new/'
image_urls_train = df_train['image_main'].to_list()
sentences_list_train = df_train['clean_title'].to_list()

In [None]:
# Build the extractors for the training split, then compute the fused,
# text-only and image-only feature tensors in one call
fusion_model_train = FusionModel(
    sentences_list=sentences_list_train,
    image_urls_train=image_urls_train,
    image_folder_train=image_folder_train,
)
(fused_features_train,
 text_features_train,
 image_features_train) = fusion_model_train.get_features()

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 149MB/s]


Finished generating embeddings


  0%|          | 0/105 [00:00<?, ?it/s]

In [None]:
# Sanity check: element type, number of samples, fused feature width
print(fused_features_train.dtype, fused_features_train.shape[0], fused_features_train.shape[1])

In [None]:
# Destinations on Drive for the three training feature tensors
file_path_fused_train = '/content/drive/MyDrive/Colab Notebooks/fused_features_train.pt'
file_path_text_train = '/content/drive/MyDrive/Colab Notebooks/text_features_train.pt'
file_path_image_train = '/content/drive/MyDrive/Colab Notebooks/image_features_train.pt'

# Persist the fused, text-only and image-only tensors, in that order
torch.save(fused_features_train, file_path_fused_train)
torch.save(text_features_train, file_path_text_train)
torch.save(image_features_train, file_path_image_train)

# Uncomment to reload the saved tensors instead of recomputing them
#fused_features_train = torch.load(file_path_fused_train)
#text_features_train = torch.load(file_path_text_train)
#image_features_train = torch.load(file_path_image_train)

In [None]:
# Inputs for the test split: image directory, image file names, titles
image_folder_test = '/content/drive/MyDrive/Colab Notebooks/fakeddit_test_new/'
image_urls_test = df_test['image_main'].to_list()
sentences_list_test = df_test['clean_title'].to_list()

In [None]:
# Build the extractors for the test split, then compute the fused,
# text-only and image-only feature tensors in one call
fusion_model_test = FusionModel(
    sentences_list=sentences_list_test,
    image_urls_train=image_urls_test,
    image_folder_train=image_folder_test,
)
(fused_features_test,
 text_features_test,
 image_features_test) = fusion_model_test.get_features()

Finished generating embeddings


  0%|          | 0/20 [00:00<?, ?it/s]

In [None]:
# Sanity check: element type, number of samples, fused feature width
print(fused_features_test.dtype, fused_features_test.shape[0], fused_features_test.shape[1])

torch.float32 665 2816


In [None]:
# Destinations on Drive for the three test feature tensors
file_path_fused_test = '/content/drive/MyDrive/Colab Notebooks/fused_features_test.pt'
file_path_text_test = '/content/drive/MyDrive/Colab Notebooks/text_features_test.pt'
file_path_image_test = '/content/drive/MyDrive/Colab Notebooks/image_features_test.pt'

# Persist the fused, text-only and image-only tensors, in that order
torch.save(fused_features_test, file_path_fused_test)
torch.save(text_features_test, file_path_text_test)
torch.save(image_features_test, file_path_image_test)

# Uncomment to reload the saved tensors instead of recomputing them
#fused_features_test = torch.load(file_path_fused_test)
#text_features_test = torch.load(file_path_text_test)
#image_features_test = torch.load(file_path_image_test)