<a href="https://colab.research.google.com/github/Jatin020408/Multi-Modal-Fake-News-Detector/blob/main/FakeNews.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install kagglehub


In [None]:
import kagglehub

# Download latest version of Fakeddit dataset
# `path` holds whatever kagglehub.dataset_download returns; it is used below as the
# directory prefix when the TSV splits are read.
path = kagglehub.dataset_download("vanshikavmittal/fakeddit-dataset")

# Print the path to the dataset files
print("Path to dataset files:", path)


In [None]:
import pandas as pd

def _read_tsv(tsv_path):
    """Read one tab-separated split file into a DataFrame."""
    return pd.read_csv(tsv_path, sep='\t')


# The three multimodal splits sit side by side under the downloaded dataset path.
train_df = _read_tsv(f'{path}/multimodal_only_samples/multimodal_train.tsv')
val_df = _read_tsv(f'{path}/multimodal_only_samples/multimodal_validate.tsv')
test_df = _read_tsv(f'{path}/multimodal_only_samples/multimodal_test_public.tsv')

# Preview the top rows of the training split (last expression, so it is displayed)
train_df.head()


In [None]:
from transformers import BertTokenizer

# Load BERT tokenizer
# The 'bert-base-uncased' checkpoint name is the same one passed to TFBertModel later on.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Max token length for BERT input
# Used both as the tokenizer's max_length and as the shape of the model's text inputs.
MAX_LEN = 128

# Function to tokenize titles
def tokenize_titles(df):
    """Tokenize the ``clean_title`` column of ``df`` with the notebook's BERT tokenizer.

    Args:
        df: DataFrame with a ``clean_title`` column.

    Returns:
        The tokenizer's output for all rows, as TensorFlow tensors. Every
        sequence is truncated or padded to exactly ``MAX_LEN`` tokens.
    """
    # A missing title is a float NaN inside an object column, and the tokenizer only
    # accepts strings. Map missing values to the empty string and force everything
    # else to text so one bad row cannot abort tokenization of the whole split.
    titles = df['clean_title'].fillna('').astype(str).tolist()
    return tokenizer(
        titles,
        truncation=True,
        padding='max_length',
        max_length=MAX_LEN,
        return_tensors='tf'
    )

# Tokenize all three datasets
# NOTE(review): every row of each split is tokenized here, but the later cells only
# slice the first 1000 training rows and the first 500 validation rows — consider
# tokenizing just the rows that are used if this step is slow or memory-hungry.
train_encodings = tokenize_titles(train_df)
val_encodings = tokenize_titles(val_df)
test_encodings = tokenize_titles(test_df)

# Check tokenized shapes
print("Train Input IDs Shape:", train_encodings['input_ids'].shape)
print("Validation Input IDs Shape:", val_encodings['input_ids'].shape)


In [None]:
import nest_asyncio
import asyncio
import aiohttp
from PIL import Image
import numpy as np
from io import BytesIO
import tensorflow as tf
from tqdm.notebook import tqdm

# Patch asyncio so that run_until_complete() can be called from notebook cells even
# though the notebook kernel already has an event loop running.
nest_asyncio.apply()
async def fetch_image(session, url):
    """Download one image and return it preprocessed for EfficientNet.

    Args:
        session: An open ``aiohttp.ClientSession`` used to issue the request.
        url: Address of the image to download.

    Returns:
        An array of shape ``(224, 224, 3)`` on success, or ``None`` when the
        server does not answer with HTTP 200, the request fails or times out,
        or the payload cannot be decoded as an image.
    """
    try:
        # Explicit ClientTimeout instead of a bare number for the per-request timeout.
        request_timeout = aiohttp.ClientTimeout(total=10)
        async with session.get(url, timeout=request_timeout) as response:
            if response.status != 200:
                return None
            img_data = await response.read()

        # Decode after the response has been released so the connection is
        # returned to the pool before the CPU-bound image work starts.
        img = Image.open(BytesIO(img_data)).convert('RGB')
        img = img.resize((224, 224))
        img = tf.keras.utils.img_to_array(img)
        img = tf.keras.applications.efficientnet.preprocess_input(img)

        # Defensive check: callers stack results, so the shape must be exact.
        if img.shape != (224, 224, 3):
            return None
        return img
    except Exception:
        # Best effort: any network or decoding failure yields None. `Exception`
        # (rather than a bare except) lets task cancellation and KeyboardInterrupt
        # propagate instead of being silently swallowed.
        return None

async def process_images_async(urls, max_concurrency=16):
    """Download and preprocess many images concurrently, preserving input order.

    Args:
        urls: Iterable of image URLs.
        max_concurrency: Upper bound on the number of downloads in flight at
            once. Values below 1 are treated as 1.

    Returns:
        A float32 array of shape ``(len(urls), 224, 224, 3)``. Entries whose
        download or decoding failed are all-zero placeholder images, so row
        ``i`` always corresponds to ``urls[i]``.
    """
    urls = list(urls)
    if not urls:
        # Keep the trailing image dimensions even when there is nothing to fetch.
        return np.zeros((0, 224, 224, 3), dtype=np.float32)

    placeholder = np.zeros((224, 224, 3), dtype=np.float32)  # fallback for broken/bad images
    limiter = asyncio.Semaphore(max(1, int(max_concurrency)))

    with tqdm(total=len(urls)) as progress:

        async def _fetch_or_placeholder(session, url):
            # The semaphore bounds how many requests are open at the same time.
            async with limiter:
                img = await fetch_image(session, url)
            progress.update(1)
            return placeholder if img is None else img

        async with aiohttp.ClientSession() as session:
            # gather() returns results in the order the awaitables were passed,
            # so images stay aligned with their URLs (and hence their labels).
            images = await asyncio.gather(
                *(_fetch_or_placeholder(session, url) for url in urls)
            )

    return np.array(images, dtype=np.float32)


In [None]:

# Only a leading subset of each split is downloaded: 1000 training URLs and
# 500 each for validation and test.
train_urls = train_df['image_url'].head(1000).tolist()
val_urls = val_df['image_url'].head(500).tolist()
test_urls = test_df['image_url'].head(500).tolist()


def _download_as_tensor(urls):
    """Run the async downloader to completion and return the batch as a float32 tensor."""
    batch = asyncio.get_event_loop().run_until_complete(process_images_async(urls))
    return tf.convert_to_tensor(batch, dtype=tf.float32)


train_images = _download_as_tensor(train_urls)
val_images = _download_as_tensor(val_urls)
test_images = _download_as_tensor(test_urls)

print("All images processed")
print("Train image shape:", train_images.shape)


In [None]:
# Save for reuse
# Each image batch is written to its own .npy file under /content; the next cell
# reads these same three files back.
np.save('/content/train_images.npy', train_images)
np.save('/content/val_images.npy', val_images)
np.save('/content/test_images.npy', test_images)


In [None]:
# Reload the image batches from the .npy files written by the previous cell.
# After this cell the three variables hold whatever np.load returns, replacing the
# tensors produced by the download cell.
train_images = np.load('/content/train_images.npy')
val_images = np.load('/content/val_images.npy')
test_images = np.load('/content/test_images.npy')

In [None]:
import tensorflow as tf
from tensorflow.keras import layers

class CrossAttention(tf.keras.layers.Layer):
    """Cross-attention block: attention + residual, then feed-forward + residual.

    ``query`` attends over ``key_value`` (used as both key and value). The
    attention output is added back to ``query`` and normalised, then passed
    through a two-layer feed-forward network with its own residual connection
    and normalisation.

    Args:
        embed_dim: Width of the feed-forward layers; also passed to
            ``MultiHeadAttention`` as ``key_dim``.
        num_heads: Number of attention heads.
        **kwargs: Standard Keras ``Layer`` keyword arguments (``name``,
            ``dtype``, ...). Accepting them is required for the layer to be
            rebuilt from its config.
    """

    def __init__(self, embed_dim, num_heads=4, **kwargs):
        super().__init__(**kwargs)
        # Kept so get_config() can report the constructor arguments.
        self.embed_dim = embed_dim
        self.num_heads = num_heads

        self.mha = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        # One normalisation layer per residual sub-block. Reusing a single instance
        # would force both sub-blocks to share the same learned scale and offset.
        self.layernorm = tf.keras.layers.LayerNormalization()
        self.ffn_layernorm = tf.keras.layers.LayerNormalization()
        self.ffn = tf.keras.Sequential([
            layers.Dense(embed_dim, activation='relu'),
            layers.Dense(embed_dim)
        ])

    def call(self, query, key_value):
        """Return ``query`` enriched with information attended from ``key_value``."""
        # Attention + residual
        attn_output = self.mha(query=query, key=key_value, value=key_value)
        out1 = self.layernorm(query + attn_output)

        # Feedforward + residual
        ffn_output = self.ffn(out1)
        out2 = self.ffn_layernorm(out1 + ffn_output)

        return out2

    def get_config(self):
        """Return the constructor arguments so a saved model can rebuild this layer."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
        })
        return config


In [None]:
# Expand dims to [batch, 1, dim] so attention sees it as sequence
text_embedding_exp = tf.expand_dims(text_embedding, axis=1)     # [batch, 1, 768]
effnet_output_exp = tf.expand_dims(effnet_output, axis=1)       # [batch, 1, 1280]

# Project EfficientNet features to match BERT dimension (768)
image_projected = layers.Dense(768)(effnet_output_exp)          # [batch, 1, 768]

# Apply cross-attention
cross_attn_layer = CrossAttention(embed_dim=768)
attended_text = cross_attn_layer(text_embedding_exp, image_projected)

# Flatten back to [batch, 768]
attended_text_flat = layers.Flatten()(attended_text)


In [None]:
from tensorflow.keras import layers, models
from transformers import TFBertModel

# Load BERT
bert_model = TFBertModel.from_pretrained('bert-base-uncased')



# Inputs
text_input = layers.Input(shape=(MAX_LEN,), dtype=tf.int32, name="input_ids")
attention_mask = layers.Input(shape=(MAX_LEN,), dtype=tf.int32, name="attention_mask")

# Lambda wrapper for BERT CLS token
def get_bert_cls_embedding(inputs):
    input_ids, attention_mask = inputs
    outputs = bert_model(input_ids=input_ids, attention_mask=attention_mask)
    return outputs.last_hidden_state[:, 0, :]  # [CLS] token

# Explicit shape defined here
text_embedding = layers.Lambda(
    get_bert_cls_embedding,
    output_shape=(768,)
)([text_input, attention_mask])

# EfficientNetB0 for image features
image_input = layers.Input(shape=(224, 224, 3), name="image_input")
effnet_model = tf.keras.applications.EfficientNetB0(include_top=False, weights='imagenet')
effnet_output = effnet_model(image_input)
effnet_output = layers.GlobalAveragePooling2D()(effnet_output)

# Combine features
combined = attended_text_flat  # Already attended to image info
x = layers.Dense(512, activation='relu')(combined)
x = layers.Dropout(0.5)(x)
output = layers.Dense(1, activation='sigmoid')(x)

# Define and compile model
model = models.Model(inputs=[text_input, attention_mask, image_input], outputs=output)
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

model.summary()


In [None]:
# Cut labels and token tensors down to the rows that have images: the first
# 1000 training samples and the first 500 validation samples.
train_labels = train_df['2_way_label'].values[:1000]
val_labels = val_df['2_way_label'].values[:500]

train_input_ids, train_attention = (
    train_encodings[field][:1000] for field in ('input_ids', 'attention_mask')
)

val_input_ids, val_attention = (
    val_encodings[field][:500] for field in ('input_ids', 'attention_mask')
)


In [None]:
from tensorflow.keras.callbacks import EarlyStopping

# Halt training once the validation loss stops improving.
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=2,             # stop if val_loss doesn't improve for 2 epochs
    restore_best_weights=True  # roll the model back to its best monitored epoch when stopping
)


In [None]:
from tensorflow.keras.callbacks import ReduceLROnPlateau

# Halve the learning rate when the validation loss plateaus.
# The patience is deliberately shorter than the EarlyStopping patience (2) defined
# in this notebook on the same metric. With equal values the learning rate would
# only be reduced on the very epoch training stops, so the reduction could never
# take effect.
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,       # new_lr = lr * 0.5
    patience=1,       # react after one epoch without improvement
    min_lr=1e-6,      # never go below this learning rate
    verbose=1
)


In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint

# Write the model to 'best_model.h5' whenever validation accuracy reaches a new best.
# NOTE(review): save_weights_only=False asks for the full model, which here includes
# a Lambda layer wrapping BERT and the custom CrossAttention layer — confirm that the
# saved file can actually be loaded back.
checkpoint_cb = ModelCheckpoint(
    filepath='best_model.h5',
    monitor='val_accuracy',
    save_best_only=True,
    save_weights_only=False,
    verbose=1
)


In [None]:
callbacks = [early_stop, reduce_lr, checkpoint_cb]


def _as_dataset(input_ids, attention, images, labels):
    """Pair the model's three named inputs with their labels, one example per element."""
    features = {
        'input_ids': input_ids,
        'attention_mask': attention,
        'image_input': images
    }
    return tf.data.Dataset.from_tensor_slices((features, labels))


# Training examples are shuffled before batching; validation order is left untouched.
train_dataset = (
    _as_dataset(train_input_ids, train_attention, train_images, train_labels)
    .shuffle(1000)
    .batch(16)
)

val_dataset = _as_dataset(val_input_ids, val_attention, val_images, val_labels).batch(16)

In [None]:


# Train for at most 5 epochs; the callbacks list built above is passed in unchanged.
# `history` is kept because the plotting cell reads history.history.
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=5,
    callbacks=callbacks
)


In [None]:
# Save the model as it stands after training to 'final_model.h5'.
# NOTE(review): the model contains a Lambda layer and a custom layer — confirm the
# saved file can be loaded back before relying on it.
model.save("final_model.h5")


In [None]:
# Limit text to match image count
# These are the same three slices (first 500 validation rows) that were taken before
# training; recomputing them here lets the evaluation cells run on their own.
val_input_ids = val_encodings['input_ids'][:500]
val_attention = val_encodings['attention_mask'][:500]
val_labels = val_df['2_way_label'].iloc[:500].values


In [None]:
# Evaluate on the validation subset. The model was compiled with
# metrics=['accuracy'], so evaluate() yields the loss followed by the accuracy.
val_loss, val_accuracy = model.evaluate(
    [val_input_ids, val_attention, val_images],
    val_labels
)
# Accuracy is a fraction, hence the scaling by 100 for display.
print(f" Validation Accuracy: {val_accuracy * 100:.2f}%")


In [None]:
# Binary cross-entropy is an unbounded loss value, not a fraction of anything, so it
# is reported as a plain number rather than scaled by 100 with a percent sign.
print(f" Validation Loss: {val_loss:.4f}")

In [None]:
import matplotlib.pyplot as plt

def _plot_history(train_key, val_key, train_label, val_label, title, ylabel):
    """Draw the training and validation curves of one recorded metric and show the figure."""
    plt.plot(history.history[train_key], label=train_label)
    plt.plot(history.history[val_key], label=val_label)
    plt.title(title)
    plt.xlabel('Epoch')
    plt.ylabel(ylabel)
    plt.legend()
    plt.grid(True)
    plt.show()


# Accuracy
_plot_history('accuracy', 'val_accuracy', 'Train Accuracy', 'Val Accuracy', 'Model Accuracy', 'Accuracy')

# Loss
_plot_history('loss', 'val_loss', 'Train Loss', 'Val Loss', 'Model Loss', 'Loss')


In [None]:
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns

# Class names indexed by label value: position 0 names label 0, position 1 names label 1.
# This matches the single-sample prediction cell in this notebook, which reports
# "Fake News" for label 0 and "Real News" for label 1.
# NOTE(review): assumes Fakeddit's 2_way_label encodes 0 = fake, 1 = real — confirm
# against the dataset documentation.
class_ids = [0, 1]
class_names = ['Fake', 'Real']

# Get predicted classes
val_preds = model.predict([
    val_encodings['input_ids'][:len(val_images)],
    val_encodings['attention_mask'][:len(val_images)],
    val_images
])
# The sigmoid output is thresholded at 0.5 to obtain a hard 0/1 label.
val_pred_labels = (val_preds.flatten() > 0.5).astype(int)

# Confusion Matrix
# Passing `labels` pins the matrix to 2x2 in label order even if one class never
# appears in the targets or predictions.
cm = confusion_matrix(val_labels, val_pred_labels, labels=class_ids)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix (Validation)')
plt.show()

# Report
# `labels` keeps target_names aligned with the classes; without it the report fails
# when only one class is present.
print(classification_report(val_labels, val_pred_labels, labels=class_ids, target_names=class_names))


In [None]:
import requests
from PIL import Image
from io import BytesIO
def load_and_preprocess_image_from_url(url):
    """Fetch one image over HTTP and return it as a single-image batch for the model.

    Args:
        url: Address of the image to download.

    Returns:
        A float32 tensor of shape ``(1, 224, 224, 3)``. If anything goes wrong
        (request error, non-success status, undecodable payload) the failure is
        printed and an all-zero tensor of the same shape is returned instead.
    """
    try:
        reply = requests.get(url, timeout=5)
        reply.raise_for_status()  # any non-success status becomes an exception

        rgb = Image.open(BytesIO(reply.content)).convert('RGB').resize((224, 224))
        pixels = tf.keras.applications.efficientnet.preprocess_input(
            tf.keras.utils.img_to_array(rgb)
        )
        # Wrapping in a list adds the leading batch dimension.
        return tf.convert_to_tensor([pixels], dtype=tf.float32)

    except Exception as err:
        print(f"Failed to load image: {url}")
        print(f"Error: {err}")
        return tf.zeros((1, 224, 224, 3))  # blank stand-in image


In [None]:

# Row of the public test split to run a single prediction on.
test_idx = 95
test_title = test_df['clean_title'].iloc[test_idx]
test_url = test_df['image_url'].iloc[test_idx]

# Tokenize with the tokenizer's direct call interface, the same entry point
# tokenize_titles uses, rather than the legacy encode_plus method. A missing title
# is replaced by an empty string because the tokenizer only accepts text.
test_input = tokenizer(
    '' if pd.isna(test_title) else str(test_title),
    truncation=True,
    padding='max_length',
    max_length=MAX_LEN,
    return_tensors='tf'
)

# Returns a (1, 224, 224, 3) batch; falls back to a blank image if the download fails.
test_image = load_and_preprocess_image_from_url(test_url)

prediction = model.predict([test_input['input_ids'], test_input['attention_mask'], test_image])
# Threshold the sigmoid output at 0.5 to get a hard 0/1 label.
label = int(prediction[0][0] > 0.5)

print("Title:", test_title)
print("Image URL:", test_url)
print("Prediction:", "Fake News " if label == 0 else "Real News ")