In [None]:
import torch
import torch.nn as nn
from transformers import RobertaModel, RobertaTokenizer
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import pandas as pd
from imblearn.under_sampling import RandomUnderSampler
from collections import Counter
from torch.optim.lr_scheduler import ExponentialLR
from sklearn.model_selection import train_test_split

import re
from bs4 import BeautifulSoup
from nltk.tokenize import WordPunctTokenizer

In [None]:
# Select the compute device: first CUDA GPU when one is visible, CPU otherwise
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(device)

cpu


In [None]:
class FakeBERT(nn.Module):
    """RoBERTa encoder followed by a small 1-D CNN classification head.

    Raw strings are tokenized inside ``forward``; the RoBERTa encoder is run
    under ``torch.no_grad()`` (so it receives no gradients), and its token
    embeddings feed three parallel convolutions whose pooled outputs are
    concatenated, passed through two further conv/pool stages, flattened and
    classified by two linear layers.

    Args:
        device: torch device on which every sub-module is placed and to which
            tokenized inputs are moved.
        roberta_model_path: name or path given to ``from_pretrained`` for both
            the model and the tokenizer.
        num_classes: width of the final linear layer.
        inductor: when true, wrap the RoBERTa encoder with
            ``torch.compile(..., backend="inductor")``.
    """

    def __init__(
        self,
        device,
        roberta_model_path='roberta-base',
        num_classes=1,
        inductor=True
        ):
        super().__init__()

        # Remember the target device so forward() does not depend on a
        # notebook-level global that happens to be called `device`.
        self.device = device

        # Load pre-trained RoBERTa model
        self.roberta = RobertaModel.from_pretrained(roberta_model_path).to(device=device)
        if inductor:
            self.roberta = torch.compile(self.roberta, backend="inductor")
        self.tokenizer = RobertaTokenizer.from_pretrained(roberta_model_path)

        # CNN: three parallel branches (p1-p3) over the 768-channel embeddings,
        # then two sequential stages (s1, s2) over the concatenated branches
        self.conv1d_p1 = nn.Conv1d(in_channels=768, out_channels=128, kernel_size=5).to(device=device)
        self.conv1d_p2 = nn.Conv1d(in_channels=768, out_channels=128, kernel_size=4).to(device=device)
        self.conv1d_p3 = nn.Conv1d(in_channels=768, out_channels=128, kernel_size=3).to(device=device)
        self.conv1d_s1 = nn.Conv1d(in_channels=128, out_channels=128, kernel_size=5).to(device=device)
        self.conv1d_s2 = nn.Conv1d(in_channels=128, out_channels=128, kernel_size=5).to(device=device)

        # Pooling
        self.max_pool_p1 = nn.MaxPool1d(kernel_size=5).to(device=device)
        self.max_pool_p2 = nn.MaxPool1d(kernel_size=5).to(device=device)
        self.max_pool_p3 = nn.MaxPool1d(kernel_size=5).to(device=device)
        self.max_pool_s1 = nn.MaxPool1d(kernel_size=5).to(device=device)
        self.max_pool_s2 = nn.MaxPool1d(kernel_size=10).to(device=device)

        # Fully connected layers.
        # 640 = 5 positions x 128 channels, which is what the conv/pool stack
        # above yields for a 512-token padded input (presumably the tokenizer's
        # max length for roberta-base -- confirm before switching checkpoints).
        self.linear1 = nn.Linear(640, 128).to(device=device)
        self.linear2 = nn.Linear(128, num_classes).to(device=device)
        self.sigmoid = nn.Sigmoid().to(device=device)

    def forward(self, x):
        """Classify a batch of raw sentences.

        Args:
            x: a string or list of strings accepted by the tokenizer.

        Returns:
            A tuple ``(output, output_l1)``: the sigmoid of the final linear
            layer, and the ReLU activations of ``linear1`` (128 features per
            sample) that produced it.
        """
        # Tokenize, pad to the tokenizer's max length, and move to the device
        # this module was built for
        tokenized_sentences = self.tokenizer(
            x, truncation=True, padding='max_length', return_tensors='pt'
        ).to(device=self.device)

        # The encoder is used as a frozen feature extractor
        with torch.no_grad():
            model_output = self.roberta(**tokenized_sentences)

        # Conv1d wants channels before positions: swap the last two axes once
        # and share the result across the three parallel branches
        channels_first = model_output.last_hidden_state.permute(0, 2, 1)

        output_p1 = self.max_pool_p1(F.relu(self.conv1d_p1(channels_first)))
        output_p2 = self.max_pool_p2(F.relu(self.conv1d_p2(channels_first)))
        output_p3 = self.max_pool_p3(F.relu(self.conv1d_p3(channels_first)))

        # Concatenate the branches along the position axis
        output_s = torch.cat((output_p1, output_p2, output_p3), dim=2)
        output_s1 = self.max_pool_s1(F.relu(self.conv1d_s1(output_s)))
        output_s2 = self.max_pool_s2(F.relu(self.conv1d_s2(output_s1)))

        # Flatten position-major (positions, then channels) for the linear head
        output_s2 = output_s2.permute(0, 2, 1)
        output_f = output_s2.reshape(output_s2.size(0), -1)
        output_l1 = torch.relu(self.linear1(output_f))
        output_l2 = self.linear2(output_l1)
        output = self.sigmoid(output_l2)

        return output, output_l1

In [None]:
# Smoke-test FakeBERT on a single sentence, with torch.compile disabled
sentences = ["I love this product!"]
fakebert = FakeBERT(device, inductor=False)

# forward() returns the sigmoid output and the 128-feature activations of linear1
outputs, before_sigmoid = fakebert(sentences)

print(outputs.shape, before_sigmoid.shape, sep="\n")

config.json:   0%|          | 0.00/481 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.weight', 'roberta.pooler.dense.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

torch.Size([1, 1])
torch.Size([1, 128])


In [None]:
# Write the same `fakebert` weights to both checkpoint files that TFN loads
for checkpoint_path in ('/content/fakebert-sentiment.pth', '/content/fakebert-news.pth'):
    torch.save(fakebert.state_dict(), checkpoint_path)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

class TFN(nn.Module):
    """Tensor-fusion head over two FakeBERT encoders (sentiment and news).

    Each encoder yields a hidden vector per sample; the per-sample outer
    product of the two vectors is flattened and passed through dropout and
    two ReLU-activated linear layers.

    Args:
        device: torch device for the encoders, the loaded checkpoints and the
            post-fusion layers.
        inductor: forwarded to both ``FakeBERT`` constructors.
        fakebert_out: width of the hidden vector returned by each encoder.
        post_fusion_dim: width of both post-fusion linear layers.
        post_fusion_dropout: dropout probability applied to the fused tensor.
        sentiment_weights_path: state-dict file for the sentiment encoder.
        news_weights_path: state-dict file for the news encoder.
        verbose: when true (the default), print intermediate tensor shapes on
            every forward pass.
    """

    def __init__(
        self,
        device,
        inductor=True,
        fakebert_out=128,
        post_fusion_dim=256,
        post_fusion_dropout=0,
        sentiment_weights_path='/content/fakebert-sentiment.pth',
        news_weights_path='/content/fakebert-news.pth',
        verbose=True
        ):
        super().__init__()

        self.fakebert_out = fakebert_out
        self.post_fusion_dim = post_fusion_dim
        self.post_fusion_dropout = nn.Dropout(p=post_fusion_dropout)
        self.verbose = verbose

        # Load pre-trained FakeBERT models. map_location lets a checkpoint
        # saved on one device (e.g. a GPU) be loaded on another (e.g. CPU).
        self.fakebert_sentiment = FakeBERT(device=device, inductor=inductor)
        self.fakebert_sentiment.load_state_dict(
            torch.load(sentiment_weights_path, map_location=device)
        )

        self.fakebert_news = FakeBERT(device=device, inductor=inductor)
        self.fakebert_news.load_state_dict(
            torch.load(news_weights_path, map_location=device)
        )

        # Define the post-fusion layers on the same device as the encoders;
        # otherwise the fused tensor and these weights live on different
        # devices whenever `device` is not the CPU.
        self.post_fusion_layer_1 = nn.Linear(
            self.fakebert_out * self.fakebert_out, self.post_fusion_dim
        ).to(device=device)
        self.post_fusion_layer_2 = nn.Linear(
            self.post_fusion_dim, self.post_fusion_dim
        ).to(device=device)

    def _log_shape(self, label, tensor):
        """Print ``label`` and the tensor's shape when verbose output is on."""
        if self.verbose:
            print(label, tensor.shape)

    def forward(self, x):
        """Fuse both encoders' hidden vectors for a batch of sentences.

        Args:
            x: input forwarded unchanged to both FakeBERT encoders.

        Returns:
            Tensor of shape (batch_size, post_fusion_dim).
        """
        # Only the hidden vectors are fused; the encoders' own predictions
        # are not used here.
        _, sentiment_h = self.fakebert_sentiment(x)
        self._log_shape('sentiment_h', sentiment_h)
        _, fakenews_h = self.fakebert_news(x)
        self._log_shape('fakenews_h', fakenews_h)

        # sentiment_h and fakenews_h both have shape (batch_size, fakebert_out).
        # Unsqueezing them to (batch_size, fakebert_out, 1) and
        # (batch_size, 1, fakebert_out) turns bmm into a per-sample outer
        # product of shape (batch_size, fakebert_out, fakebert_out).
        fusion_tensor = torch.bmm(sentiment_h.unsqueeze(2), fakenews_h.unsqueeze(1))
        self._log_shape('fusion_tensor', fusion_tensor)

        # Flatten each sample's outer product into one feature vector
        fusion_tensor = fusion_tensor.flatten(start_dim=1)
        self._log_shape('fusion_tensor', fusion_tensor)

        post_fusion_dropped = self.post_fusion_dropout(fusion_tensor)
        self._log_shape('post_fusion_dropped', post_fusion_dropped)
        post_fusion_y_1 = F.relu(self.post_fusion_layer_1(post_fusion_dropped))
        self._log_shape('post_fusion_y_1', post_fusion_y_1)
        post_fusion_y_2 = F.relu(self.post_fusion_layer_2(post_fusion_y_1))
        self._log_shape('post_fusion_y_2', post_fusion_y_2)

        return post_fusion_y_2


In [None]:
# Smoke-test the fusion model on a single sentence, with torch.compile disabled
sentences = ["I love this product!"]
tfn = TFN(device, inductor=False)

outputs = tfn(sentences)

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.weight', 'roberta.pooler.dense.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.weight', 'roberta.pooler.dense.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


sentiment_h torch.Size([1, 128])
fakenews_h torch.Size([1, 128])
fusion_tensor torch.Size([1, 128, 128])
fusion_tensor torch.Size([1, 16384])
post_fusion_dropped torch.Size([1, 16384])
post_fusion_y_1 torch.Size([1, 256])
post_fusion_y_2 torch.Size([1, 256])


#### Errors

In [None]:
# import torch
# import torch.nn as nn
# import torch.nn.functional as F
# from torch.autograd import Variable

# class TFN(nn.Module):
#     def __init__(self, device, fakebert_out, post_fusion_dim, post_fusion_dropout):
#         super(TFN, self).__init__()

#         self.fakebert_out = fakebert_out
#         self.post_fusion_dim = post_fusion_dim
#         self.post_fusion_dropout = nn.Dropout(p=post_fusion_dropout)

#         # Load pre-trained FakeBERT models
#         self.fakebert_sentiment = FakeBERT(device=device, inductor=True)
#         self.fakebert_sentiment.load_state_dict(torch.load('sentiment-path/fakebert-sentiment.pth'))

#         self.fakebert_news = FakeBERT(device=device, inductor=True)
#         self.fakebert_news.load_state_dict(torch.load('news-path/fakebert-news.pth'))

#         # define the post_fusion layers
#         self.post_fusion_layer_1 = nn.Linear((self.fakebert_out) * 2, self.post_fusion_dim)
#         self.post_fusion_layer_2 = nn.Linear(self.post_fusion_dim, self.post_fusion_dim)
#         self.post_fusion_layer_3 = nn.Linear(self.post_fusion_dim, 1)

#         # in TFN we are doing a regression with a constrained output range (-3, 3), hence we apply a sigmoid to the output
#         # to shrink it to (0, 1), then scale/shift it back to the range (-3, 3)
#         self.output_range = Parameter(torch.FloatTensor([6]), requires_grad=False)
#         self.output_shift = Parameter(torch.FloatTensor([-3]), requires_grad=False)


#     def forward(self, x):
#         sentiment, sentiment_h = self.fakebert_sentiment(x)
#         fakenews, fakenews_h = self.fakebert_news(x)

#         # sentiment_h and fakenews_h both have shape (batch_size, self.fakebert_out)
#         # we want the outer product of the two batches, hence we unsqueeze them to get
#         # (batch_size, self.fakebert_out, 1) x (batch_size, 1, self.fakebert_out)
#         # fusion_tensor will have shape (batch_size, self.fakebert_out, self.fakebert_out)
#         fusion_tensor = torch.bmm(sentiment_h.unsqueeze(2), fakenews_h.unsqueeze(1))

#         batch_size = sentiment_h.shape[0]
#         fusion_tensor = fusion_tensor.view(batch_size, -1)

#         # Perform tensor fusion on the outputs of the two models
#         fusion_tensor = self.perform_tensor_fusion(sentiment_output, news_output)

#         post_fusion_dropped = self.post_fusion_dropout(fusion_tensor)
#         post_fusion_y_1 = F.relu(self.post_fusion_layer_1(post_fusion_dropped))
#         post_fusion_y_2 = F.relu(self.post_fusion_layer_2(post_fusion_y_1))
#         post_fusion_y_3 = F.sigmoid(self.post_fusion_layer_3(post_fusion_y_2))
#         output = post_fusion_y_3 * self.output_range + self.output_shift

#         return output
