# Install all the required libraries

In [None]:
!pip install --upgrade pip
!pip install requests
!pip install io
!pip install torch
!pip install numpy
!pip install pandas
!pip install transformers
!pip install torchvision
!pip install tqdm

# Import all the necessary libraries

In [None]:
import re
import requests
from io import BytesIO
import torch
import pandas as pd
import numpy as np
from torch import nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader,random_split
import transformers
from transformers import AutoTokenizer, AutoModel
import torch.nn.functional as F
from transformers import ViTModel,ViTImageProcessor
from PIL import Image
import torchvision.transforms as transforms

In [None]:
# Run on the first CUDA GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
device

# Class initialization of model architecture

In [None]:
class transformermodel(nn.Module):
    """Image/text regressor built on frozen ViT and BERTweet backbones.

    Both backbones are loaded with pretrained weights and frozen. Their token
    sequences pass through three rounds of (self-attention encoder layer ->
    cross-attention decoder layer); every decoder layer attends to the *other*
    modality's first-round encoder output. The first token of each final
    sequence and the two backbone pooler outputs are each reduced to 64
    features, concatenated with five scalar metadata features, and regressed
    to a single value by a small MLP.

    Note: ``fc2`` and ``fc3`` are created but never used in ``forward``.
    """

    def __init__(self):
        super().__init__()

        # Pretrained backbones: ``base_model`` encodes images, ``base_model_1`` encodes text.
        self.base_model = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')
        self.base_model_1 = AutoModel.from_pretrained('vinai/bertweet-base')

        # Freeze both backbones; only the layers created below stay trainable.
        for backbone in (self.base_model, self.base_model_1):
            for weight in backbone.parameters():
                weight.requires_grad = False

        def make_encoder(num_heads):
            # Self-attention layer over a 768-wide, batch-first sequence.
            return nn.TransformerEncoderLayer(768, num_heads, activation='gelu', batch_first=True)

        def make_decoder(num_heads):
            # Self-attention plus cross-attention layer over 768-wide, batch-first sequences.
            return nn.TransformerDecoderLayer(768, num_heads, activation='gelu', batch_first=True)

        # Encoder layers (creation order is kept identical to the original definition).
        self.img_enc1 = make_encoder(8)
        self.txt_enc1 = make_encoder(8)
        self.txt_enc2 = make_encoder(12)
        self.img_enc2 = make_encoder(12)
        self.txt_enc3 = make_encoder(12)
        self.img_enc3 = make_encoder(12)

        # Decoder layers used for cross-modal attention.
        self.img_dec1 = make_decoder(12)
        self.txt_dec1 = make_decoder(12)
        self.txt_dec2 = make_decoder(8)
        self.img_dec2 = make_decoder(8)
        self.img_dec3 = make_decoder(12)
        self.txt_dec3 = make_decoder(12)

        # Projections applied to the first token of each cross-attended sequence.
        self.fc = nn.Linear(768, 768)
        self.fc1 = nn.Linear(768, 768)
        self.fc2 = nn.Linear(768, 768)  # not used in forward
        self.fc3 = nn.Linear(768, 768)  # not used in forward

        # 768 -> 256 -> 64 reduction heads for the two cross-attended vectors ...
        self.corr1 = nn.Linear(768, 256)
        self.corr1_1 = nn.Linear(256, 64)
        self.corr2 = nn.Linear(768, 256)
        self.corr2_1 = nn.Linear(256, 64)
        # ... and for the two backbone pooler outputs.
        self.img = nn.Linear(768, 256)
        self.img_1 = nn.Linear(256, 64)
        self.txt = nn.Linear(768, 256)
        self.txt_1 = nn.Linear(256, 64)

        # Final regressor: four 64-wide vectors plus five scalar metadata features.
        self.FC = nn.Linear(64 * 4 + 5, 128)
        self.FC1 = nn.Linear(128, 64)
        self.FC2 = nn.Linear(64, 1)

    def forward(self, txt, mask, img, followers,time, is_photo, is_gif, is_video):
        """Predict one value per sample from text, image and metadata.

        Args:
            txt: token ids passed to the text backbone.
            mask: attention mask passed to the text backbone.
            img: pixel tensor passed to the image backbone.
            followers, time, is_photo, is_gif, is_video: metadata tensors that
                are concatenated along ``dim=1`` with the 64-wide feature
                vectors; presumably each has shape (batch, 1) -- TODO confirm
                against the caller.

        Returns:
            The output of the last linear layer with every size-1 dimension
            squeezed away (a 0-d tensor when the batch holds one sample).
        """
        text_features = self.base_model_1(txt, attention_mask=mask)
        image_features = self.base_model(img)

        # Round 1: self-attention per modality. These two sequences also serve
        # as the cross-attention memory for every decoder layer below.
        img_memory = self.img_enc1(image_features['last_hidden_state'])
        txt_memory = self.txt_enc1(text_features['last_hidden_state'])

        img_seq = self.img_dec1(img_memory, txt_memory)
        txt_seq = self.txt_dec1(txt_memory, img_memory)

        # Round 2.
        img_seq = self.img_enc2(img_seq)
        txt_seq = self.txt_enc2(txt_seq)
        img_seq = self.img_dec2(img_seq, txt_memory)
        txt_seq = self.txt_dec2(txt_seq, img_memory)

        # Round 3.
        img_seq = self.img_enc3(img_seq)
        txt_seq = self.txt_enc3(txt_seq)
        img_seq = self.img_dec3(img_seq, txt_memory)
        txt_seq = self.txt_dec3(txt_seq, img_memory)

        # Summarise each sequence by its first token, then reduce to 64 features.
        img_corr = F.gelu(self.fc(img_seq[:, 0, :]))
        txt_corr = F.gelu(self.fc1(txt_seq[:, 0, :]))
        img_corr = F.gelu(self.corr1_1(F.elu(self.corr1(img_corr))))
        txt_corr = F.gelu(self.corr2_1(F.elu(self.corr2(txt_corr))))

        # Reduce the backbone pooler outputs to 64 features each.
        img_pooled = image_features['pooler_output']
        txt_pooled = text_features['pooler_output']
        img_pooled = F.gelu(self.img_1(F.elu(self.img(img_pooled))))
        txt_pooled = F.gelu(self.txt_1(F.elu(self.txt(txt_pooled))))

        # Fuse learned features with the metadata and regress to one value.
        fused = torch.cat(
            [img_corr, txt_corr, img_pooled, txt_pooled, followers, is_photo, is_gif, is_video, time],
            dim=1,
        )
        hidden = F.gelu(self.FC1(F.leaky_relu(self.FC(fused))))
        return torch.squeeze(self.FC2(hidden))

# Load the model
Replace '/kaggle/input/final-model/final.pth' with the actual path to your model file.

In [None]:
# Load the trained model onto the selected device.
# SECURITY NOTE: the file is unpickled as a complete model object (it is used directly
# as a module afterwards), and unpickling can execute arbitrary code -- only load
# checkpoints from a source you trust.
# `weights_only=False` is passed explicitly because PyTorch >= 2.6 defaults to
# `weights_only=True`, which refuses to unpickle full module objects.
# The `transformermodel` class presumably has to be defined before this cell runs so
# that unpickling can resolve it.
model = torch.load('/kaggle/input/final-model/final.pth', map_location=device, weights_only=False)

# Function to remove emojis and other special characters (excluding punctuation marks) from the tweet text
The BERTweet tokenizer cannot tokenize these special characters, so the preprocess function strips them beforehand to avoid tokenizer warnings.

In [None]:
# Characters removed in the first cleaning pass. Compiled once instead of on every call.
_EMOJI_PATTERN = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F700-\U0001F77F"  # alchemical symbols
    "\U0001F780-\U0001F7FF"  # Geometric Shapes Extended
    "\U0001F800-\U0001F8FF"  # Supplemental Arrows-C
    "\U0001F900-\U0001F9FF"  # Supplemental Symbols and Pictographs
    "\U0001FA00-\U0001FA6F"  # Chess Symbols
    "\U0001FA70-\U0001FAFF"  # Symbols and Pictographs Extended-A
    "\U00002702-\U000027B0"  # Dingbats
    # NOTE: this is one range from U+24C2 to U+1F251. It is far wider than the
    # emoji blocks above and also strips e.g. CJK characters. It is kept
    # unchanged so the text seen at inference matches this function's
    # historical output.
    "\U000024C2-\U0001F251"
    "]+",
    flags=re.UNICODE,
)

# Second pass: drop everything that is not a word character, whitespace, digit,
# '#', or one of the basic punctuation marks . , ! ? ; : ( ) ' "
_DISALLOWED_CHARS = re.compile(r'[^\w\s\d#.,!?;:()\'"]')


def preprocess(text):
    """Return ``text`` stripped of emojis and special characters.

    Steps: remove every character matched by ``_EMOJI_PATTERN``, remove every
    remaining character that is not a word character, whitespace, ``#`` or
    basic punctuation, then collapse all whitespace runs into single spaces
    (leading and trailing whitespace is dropped).

    Args:
        text: the raw tweet text. ``None`` and float NaN (what pandas yields
            for empty cells) are treated as empty text; any other non-string
            value is converted with ``str()``.

    Returns:
        The cleaned, single-space-separated string (possibly empty).
    """
    # Missing cells arrive as None or NaN; NaN is the only float not equal to itself.
    if text is None or (isinstance(text, float) and text != text):
        return ''
    if not isinstance(text, str):
        text = str(text)

    without_emojis = _EMOJI_PATTERN.sub('', text)
    cleaned = _DISALLOWED_CHARS.sub('', without_emojis)

    # split() with no argument splits on any whitespace run and discards empties.
    return ' '.join(cleaned.split())


# Dataset class

In [None]:
class CustomDataset(Dataset):
    """Torch dataset that turns rows of a tweet spreadsheet into model inputs.

    The Excel file is read once in ``__init__``; the columns ``date``,
    ``content``, ``followers``, ``media`` and ``id`` are used. All texts are
    cleaned with ``preprocess`` and tokenized up front. The image referenced
    by the ``media`` cell is downloaded lazily in ``__getitem__``.
    """

    def __init__(self, annotations_file, request_timeout=10.0):
        """Load the spreadsheet and precompute all non-image features.

        Args:
            annotations_file: path of the Excel file to read.
            request_timeout: seconds passed to ``requests.get`` as ``timeout``
                for each image download, so that an unresponsive server
                cannot block a worker forever.
        """
        ds = pd.read_excel(annotations_file)

        # Tweet age in days, measured from a fixed reference timestamp.
        curr = pd.Timestamp('2023-12-05 12:00:00')
        self.dates = (curr - pd.to_datetime(ds['date'])).dt.total_seconds() / (24 * 3600)

        # Matches the first "https...'" substring of a media cell, closing quote included.
        self.pattern =  r'https[^\' ]*\''
        self.request_timeout = request_timeout

        self.texts = [preprocess(text) for text in ds['content']]
        self.followers = torch.tensor(ds['followers'], dtype=torch.float)
        self.media = ds['media']
        self.id = ds['id']

        # Media type flags derived from the second character of the media cell
        # ('P', 'V' or 'G'); presumably these are the initials of Photo / Video /
        # Gif in the serialized media list -- TODO confirm against the data.
        self.is_photo = self._media_flag(ds['media'], 'P')
        self.is_video = self._media_flag(ds['media'], 'V')
        self.is_gif = self._media_flag(ds['media'], 'G')

        self.img_processor = ViTImageProcessor.from_pretrained('google/vit-base-patch16-224')
        self.txt_tokenizer = AutoTokenizer.from_pretrained('vinai/bertweet-base')

        # Tokenize all texts during initialization
        self.tokenized_texts = self.txt_tokenizer(self.texts, return_tensors="pt", padding="max_length", truncation=True)

    @staticmethod
    def _media_flag(media_column, initial):
        """Return a float tensor with 1 where a cell's second character is ``initial``."""
        return torch.tensor(
            [
                1 if isinstance(cell, str) and len(cell) >= 2 and cell[1] == initial else 0
                for cell in media_column  # non-string cells (e.g. NaN) count as 0
            ],
            dtype=torch.float,
        )

    def _load_image(self, media_cell):
        """Download and preprocess the first image URL found in ``media_cell``.

        Returns a ``(3, 224, 224)`` zero tensor when the cell is not a string,
        contains no URL, the download does not return HTTP 200, or anything
        goes wrong while fetching or decoding the image.
        """
        blank = torch.zeros(3, 224, 224)
        if not isinstance(media_cell, str):
            return blank
        match = re.search(self.pattern, media_cell)
        if match is None:
            return blank
        url = match[0][0:-1]  # drop the closing quote captured by the pattern

        try:
            response = requests.get(url, timeout=self.request_timeout)
            if response.status_code != 200:
                return blank
            image = Image.open(BytesIO(response.content))
            # The processor is fed three-channel images; convert every other
            # mode (RGBA, palette, grayscale, ...) instead of only RGBA.
            if image.mode != 'RGB':
                image = image.convert('RGB')
            return torch.tensor(self.img_processor(image)['pixel_values'][0], dtype=torch.float)
        except Exception:
            # Deliberate best-effort: a broken or unreachable image must not
            # abort the run, so the sample gets the blank placeholder.
            return blank

    def __len__(self):
        """Return the number of rows in the spreadsheet."""
        return len(self.dates)

    def __getitem__(self, index):
        """Build the sample dict for row ``index``.

        Scalar features are returned with a leading dimension of size 1.
        """
        img_tensor = self._load_image(self.media[index])
        time = torch.tensor(self.dates[index], dtype=torch.float)

        sample = {
            "Text": {
                # Access input_ids and attention_mask from tokenized_texts
                "input_ids": self.tokenized_texts['input_ids'][index],
                "attention_mask": self.tokenized_texts['attention_mask'][index],
            },
            "followers": torch.unsqueeze(self.followers[index], 0),
            "img": img_tensor,
            "time": torch.unsqueeze(time, 0),
            "is_photo": torch.unsqueeze(self.is_photo[index], 0),
            "is_video": torch.unsqueeze(self.is_video[index], 0),
            "is_gif": torch.unsqueeze(self.is_gif[index], 0),
            "id": self.id[index],
        }

        return sample

# Creating an instance of the dataset
Replace '/kaggle/input/test-time/behaviuor_simulation_test_time_followers.xlsx' with the actual path to your dataset Excel file.

In [None]:
# The same Excel file backs both objects below.
test_file = '/kaggle/input/test-time/behaviuor_simulation_test_time_followers.xlsx'

# Torch dataset instance that yields the model inputs.
ds = CustomDataset(test_file)
# Plain pandas dataframe of the same rows; the predictions are attached to it later.
df = pd.read_excel(test_file)

In [None]:
# Initialise the dataloader: one sample per batch, rows kept in their original
# order, and four worker processes preparing samples.
loader_options = dict(batch_size=1, shuffle=False, num_workers=4)
dataloader = DataLoader(ds, **loader_options)

# Predicting likes for the tweets

In [None]:
outputs = []  # one prediction per sample, in dataloader order
model.eval()  # switch the model to evaluation mode
c = 0  # number of samples fetched so far (stays 0 if the dataloader is empty)
with torch.no_grad():  # inference only, no gradients needed
    for c, data in enumerate(dataloader, start=1):
        try:
            # Move every model input of this batch to the target device.
            text = data['Text']['input_ids'].to(device)
            mask = data['Text']['attention_mask'].to(device)
            imgs = data['img'].to(device)
            followers = data['followers'].to(device)
            time = data['time'].to(device)
            is_photo = data['is_photo'].to(device)
            is_gif = data['is_gif'].to(device)
            is_video = data['is_video'].to(device)

            output = model(text, mask, imgs, followers, time, is_photo, is_gif, is_video)
            # .item() needs a single-element tensor, i.e. one sample per batch.
            outputs.append(output.item())
        except Exception as e:
            # Fail fast with context. Merely leaving the loop would produce an
            # `outputs` list shorter than the dataframe, and the run would only
            # fail later with an unrelated length-mismatch error.
            raise RuntimeError(
                f"Prediction failed for sample {c} (id: {data['id']})"
            ) from e
        # Lightweight progress report.
        if c % 500 == 499:
            print(c)

In [None]:
# Convert every prediction to an integer in place (int() discards the fractional part).
outputs[:] = [int(prediction) for prediction in outputs]

# Adding a new column 'predicted_likes' in the dataframe

In [None]:
# Attach the predictions as a new column; `outputs` is expected to hold one value per row of `df`.
df['predicted_likes'] = outputs

# Saving the dataframe in the form of excel file

In [None]:
# Write the dataframe to an Excel workbook in the current working directory.
output_file = 'behaviour_simulation_test_output.xlsx'
df.to_excel(output_file)