In [None]:
image_folder = 'untitled folder/gossipcop_images'

!pip install Pillow
!pip install torch
!pip install torchvision

from PIL import Image
import os
import json
from torch.utils.data import Dataset, DataLoader  # Import DataLoader
import torchvision.transforms as transforms



In [None]:
class CustomDataset(Dataset):
    """Dataset pairing each story's ``tweets.json`` with its image.

    Every entry of ``root_dir`` is treated as a story folder that should hold
    a ``tweets.json`` file. The text after the last ``-`` in the folder name
    is used as the image stem, so folder ``foo-123`` is paired with
    ``<image_folder>/123.jpg``. Folders whose JSON is missing or unreadable,
    or whose image is missing or fails verification, are skipped.

    Args:
        root_dir: Directory whose entries are scanned for ``tweets.json``.
        image_folder: Directory containing the ``<id>.jpg`` images.
        transform: Optional callable applied to the RGB PIL image in
            ``__getitem__``.
        max_folders: Maximum number of ``root_dir`` entries to scan
            (default 12000); ``None`` scans every entry.
    """

    def __init__(self, root_dir, image_folder, transform=None, max_folders=12000):
        self.root_dir = root_dir
        self.image_folder = image_folder
        self.transform = transform
        self.max_folders = max_folders
        self.data = self.load_data()

    def load_data(self):
        """Scan ``root_dir`` and return a list of ``{'text', 'image_path'}`` dicts.

        ``'text'`` holds the parsed content of ``tweets.json``; ``'image_path'``
        is the path of the matching, verified image.
        """
        data = []

        # os.listdir returns entries in arbitrary order; sorting first makes
        # the truncated subset the same on every run and every machine.
        folder_names = sorted(os.listdir(self.root_dir))[:self.max_folders]

        for folder_name in folder_names:
            folder_path = os.path.join(self.root_dir, folder_name)
            json_path = os.path.join(folder_path, 'tweets.json')

            if not os.path.exists(json_path):
                print(f"JSON file not found: {json_path}")
                continue

            try:
                # Explicit UTF-8 so decoding does not depend on the platform locale.
                with open(json_path, 'r', encoding='utf-8') as f:
                    tweets_data_list = json.load(f)
            except (json.JSONDecodeError, UnicodeDecodeError) as json_error:
                print(f"Error decoding JSON in file {json_path}: {json_error}")
                continue

            tweet_number = folder_name.split('-')[-1]
            image_path = os.path.join(self.image_folder, f'{tweet_number}.jpg')

            if os.path.exists(image_path) and self.is_valid_image(image_path):
                data.append({'text': tweets_data_list, 'image_path': image_path})

        return data

    def is_valid_image(self, file_path):
        """Return True if PIL can open and verify ``file_path``, else False.

        Any exception is reported on stdout and treated as "invalid".
        """
        try:
            # Context manager closes the file handle even when verify() raises.
            with Image.open(file_path) as img:
                img.verify()
            return True
        except Exception as e:
            print(f"Invalid image file: {file_path}. Error: {e}")
            return False

    def __len__(self):
        """Number of (text, image) pairs found by ``load_data``."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``{'text': ..., 'image': ...}`` for sample ``idx``.

        The image is loaded as RGB and passed through ``transform`` when one
        was given. If the image cannot be read (``OSError``) the error is
        printed and ``None`` is returned, so callers iterating the dataset
        must be prepared to skip ``None`` samples.
        """
        record = self.data[idx]
        text = record['text']
        image_path = record['image_path']

        try:
            # convert() loads the pixel data into a new image, so the source
            # file can be closed as soon as the with-block exits.
            with Image.open(image_path) as img:
                image = img.convert('RGB')

            if self.transform:
                image = self.transform(image)

            return {'text': text, 'image': image}

        except OSError as e:
            print(f"Error opening image file {image_path}: {e}")
            return None

In [None]:
# Build the dataset and loader for the stories under 'gossipcop_real'.
root_folder, image_folder = 'gossipcop_real', 'gossipcop_images'

# Every image is resized to 224x224 and then converted to a tensor.
transform = transforms.Compose(
    [transforms.Resize((224, 224)), transforms.ToTensor()]
)

dataset = CustomDataset(
    root_dir=root_folder,
    image_folder=image_folder,
    transform=transform,
)
dataloader = DataLoader(dataset=dataset, batch_size=1, shuffle=True)

In [None]:

# Build the dataset and loader for the stories under 'gossipcop_fake'.
root_folder = 'gossipcop_fake'
image_folder = 'gossipcop_images'
# Every image is resized to 224x224 and then converted to a tensor.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

dataset2 = CustomDataset(root_folder, image_folder, transform=transform)
# The loader must wrap dataset2 (the fake stories), not `dataset` (the real ones).
dataloader2 = DataLoader(dataset2, batch_size=1, shuffle=True)

In [None]:
import numpy as np

# Collect one text string and one image per sample of `dataset`.
texts = []
images = []

for batch in dataset:
    # CustomDataset.__getitem__ returns None when an image cannot be opened;
    # skip those samples instead of failing on batch['text'].
    if batch is None:
        continue

    text = batch['text']
    image = batch['image']

    # Use the text of the first tweet; fall back to "" when the parsed JSON
    # is not a dict or carries no (or an empty) 'tweets' entry.
    tweets = text.get('tweets') if isinstance(text, dict) else None
    if tweets:
        texts.append(tweets[0]['text'])
    else:
        texts.append("")

    images.append(image)

texts_true = np.array(texts)
images_true = np.array(images)

In [None]:
# Collect one text string and one image per sample of `dataset2`.
texts2 = []
images2 = []
for batch2 in dataset2:
    # CustomDataset.__getitem__ returns None when an image cannot be opened;
    # skip those samples instead of failing on batch2['text'].
    if batch2 is None:
        continue

    text2 = batch2['text']
    image2 = batch2['image']

    # Use the text of the first tweet; fall back to "" when the parsed JSON
    # is not a dict or carries no (or an empty) 'tweets' entry.
    tweets2 = text2.get('tweets') if isinstance(text2, dict) else None
    if tweets2:
        texts2.append(tweets2[0]['text'])
    else:
        texts2.append("")
    images2.append(image2)

texts_fake = np.array(texts2)
images_fake = np.array(images2)

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from transformers import AutoTokenizer, TFRobertaModel

# The tokenizer and the encoder are both loaded from the same checkpoint id.
BERTWEET_CHECKPOINT = 'vinai/bertweet-base'
tokenizer = AutoTokenizer.from_pretrained(BERTWEET_CHECKPOINT)
bert = TFRobertaModel.from_pretrained(BERTWEET_CHECKPOINT)

In [None]:
max_len = 128


def _tokenize_texts(texts):
    """Tokenize an array of strings into fixed-width TensorFlow tensors.

    Sequences longer than ``max_len`` are truncated and shorter ones are
    padded up to exactly ``max_len``. Padding to a fixed width (rather than
    to the longest sequence of each call) guarantees that separately
    tokenized corpora have the same number of columns, so they can be
    stacked row-wise and fed to a model with a fixed sequence length.

    Args:
        texts: Array of strings; converted with ``.tolist()`` before tokenizing.

    Returns:
        The tokenizer output holding ``input_ids`` and ``attention_mask``.
    """
    return tokenizer(
        text=texts.tolist(),
        add_special_tokens=True,
        max_length=max_len,
        truncation=True,
        padding='max_length',
        return_tensors='tf',
        return_token_type_ids=False,
        return_attention_mask=True,
        verbose=True
    )


texts_true_truncate = _tokenize_texts(texts_true)
texts_fake_truncate = _tokenize_texts(texts_fake)

In [None]:
# Stack the real rows first and the fake rows second.
# np.vstack replaces np.row_stack, which is a deprecated alias of it.
text_all_input_ids = np.vstack((texts_true_truncate['input_ids'], texts_fake_truncate['input_ids']))
text_all_attention_mask = np.vstack((texts_true_truncate['attention_mask'], texts_fake_truncate['attention_mask']))
print(text_all_attention_mask.shape)

In [None]:
# Labels: 0.0 for each entry of texts_true followed by 1.0 for each entry of
# texts_fake. y_all itself stays a plain list; the array below is only shown
# as the cell output.
y_all = [*np.zeros(len(texts_true)), *np.ones(len(texts_fake))]
np.array(y_all)

In [None]:
# Stack the real images first and the fake images second along the first axis.
# np.vstack replaces np.row_stack, which is a deprecated alias of it.
img_all = np.vstack((images_true, images_fake))

In [None]:
# Shuffle token ids, attention masks and labels with one shared permutation so
# the rows stay aligned. The generator is seeded so the shuffle is reproducible
# across kernel restarts, in line with random_state=42 used for the splits.
# NOTE(review): img_all is not permuted here, so after this cell its rows no
# longer line up with y_all -- confirm before using the images with these labels.
perm = np.random.default_rng(42).permutation(len(text_all_input_ids))
text_all_input_ids = text_all_input_ids[perm]
text_all_attention_mask = text_all_attention_mask[perm]
y_all = np.array(y_all)[perm]

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 20% of the samples as the test set; ids, masks and labels are split
# together so their rows stay aligned.
(
    train_input_ids, test_input_ids,
    train_attention_masks, test_attention_mask,
    y_train, y_test,
) = train_test_split(
    text_all_input_ids,
    text_all_attention_mask,
    y_all,
    test_size=0.20,
    random_state=42,
)

In [None]:
# Carve a validation set (20%) out of the current training portion; the
# train_* names are rebound to the remaining 80%.
(
    train_input_ids, val_input_ids,
    train_attention_masks, val_attention_masks,
    y_train, y_val,
) = train_test_split(
    train_input_ids,
    train_attention_masks,
    y_train,
    test_size=0.20,
    random_state=42,
)

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.initializers import TruncatedNormal
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.metrics import CategoricalAccuracy
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Input, Dense

# Width of the token-id / attention-mask inputs declared below; arrays fed to
# the model must have exactly this many columns.
SEQ_LEN = 128
# A single sigmoid output unit (see the Dense layer below).
NUM_CLASSES= 1
# Axes 1 and 2 of images_fake -- presumably channels and image height for
# tensors produced by ToTensor (TODO confirm). Neither constant is used again
# in this cell.
COLOR_CHANNELS=images_fake.shape[1]
IMG_SIZE=images_fake.shape[2]


# Text branch: token ids and attention mask go through the main layer of the
# pretrained `bert` model.
input_ids = Input(shape=(SEQ_LEN,), dtype=tf.int32, name="input_ids")
input_mask = Input(shape=(SEQ_LEN,), dtype=tf.int32, name="attention_mask")
input_bert = [input_ids,input_mask]
text_embedder = bert.roberta
input_text_embs = text_embedder(input_bert).last_hidden_state
# Keep only position 0 along the sequence axis (presumably the start-of-sequence
# token embedding -- TODO confirm) as the representation of the whole text.
input_text_embs = input_text_embs[:,0,:]
print(input_text_embs.shape)


# Classification head: one sigmoid unit on top of the text embedding.
probs = layers.Dense(NUM_CLASSES, activation="sigmoid")(input_text_embs)


# The model takes [input_ids, attention_mask] and outputs `probs`.
model = keras.Model(inputs=input_bert, outputs=probs)

model.summary()



In [None]:
import tensorflow_addons as tfa
from keras.callbacks import ModelCheckpoint

# Training hyper-parameters and the file the best weights are checkpointed to.
max_epochs, batch_size = 8, 32
best_weights_file = "fake_news_detection_weights_bertweet.h5"

opt = tfa.optimizers.RectifiedAdam(learning_rate=3e-5)
loss = keras.losses.BinaryCrossentropy()
auc = keras.metrics.AUC(curve="ROC")

# Save weights only, and only when the validation value of the AUC metric
# improves (higher is better).
m_ckpt = ModelCheckpoint(
    best_weights_file,
    monitor=f'val_{auc.name}',
    mode='max',
    verbose=2,
    save_weights_only=True,
    save_best_only=True,
)

model.compile(
    loss=loss,
    optimizer=opt,
    metrics=[auc, keras.metrics.BinaryAccuracy()],
)

In [None]:
# test the model (classification report)
from sklearn.metrics import classification_report
# NOTE(review): no visible cell calls model.fit or otherwise writes this file,
# so it must already exist on disk for load_weights to succeed -- confirm.
best_weights_file = "fake_news_detection_weights_bertweet.h5"
model.load_weights(best_weights_file)
opt = tfa.optimizers.RectifiedAdam(learning_rate=3e-5)
model.compile(loss=loss, optimizer=opt, metrics=[auc, keras.metrics.BinaryAccuracy()])

y_pred_probs = model.predict([test_input_ids,test_attention_mask])
# Flatten the predictions to one probability per sample and threshold them in
# a single vectorized step: 1 when the probability is >= 0.5, otherwise 0.
y_pred = (np.asarray(y_pred_probs).reshape(-1) >= 0.5).astype(int)

report = classification_report(y_test, y_pred, digits=3)
print(report)