In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
from tensorflow.keras.models import load_model
from tensorflow.keras import Input, Model

# --- Load CNN model ---
cnn_model_path = "/content/drive/MyDrive/fake-news-multimodal/models/fake_news_classification_CNN.h5"
cnn_model = load_model(cnn_model_path)

# --- Create CNN feature extractor ---
# Use input shape of CNN
cnn_input = Input(shape=(256, 256, 3))
x = cnn_input
for layer in cnn_model.layers[:-3]:  # skip Dropout and final Dense(1)
    x = layer(x)
cnn_feature_extractor = Model(inputs=cnn_input, outputs=x)





In [37]:
from transformers import AutoModelForSequenceClassification
from transformers import AutoTokenizer

# Pick one checkpoint, e.g., checkpoint-639
checkpoint_path = "/content/drive/MyDrive/fake-news-multimodal/models/xlm_roberta/checkpoint-639"

# Load the model and ensure it outputs hidden states
text_model = AutoModelForSequenceClassification.from_pretrained(checkpoint_path, output_hidden_states=True)
text_model.eval()
tokenizer = AutoTokenizer.from_pretrained(checkpoint_path)

In [4]:
# Imports from src
import os
import sys
sys.path.append('/content/drive/MyDrive/fake-news-multimodal/src')

In [6]:
import tensorflow as tf
def split_data_fusion(dataset, train_ratio=0.85):
    """
    Splits a tf.data.Dataset into train and test sets (85% / 15%).
    """
    # Convert TF dataset into list
    data_list = list(dataset)

    # Unpack into full arrays
    images = []
    labels = []

    for img_batch, label_batch in data_list:
        for i in range(len(img_batch)):
            images.append(img_batch[i])
            labels.append(label_batch[i])

    # Convert to tensors
    images = tf.stack(images)
    labels = tf.stack(labels)

    total = len(images)
    train_size = int(total * train_ratio)

    # Slice
    train_images = tf.data.Dataset.from_tensor_slices(
        (images[:train_size], labels[:train_size])
    )

    test_images = tf.data.Dataset.from_tensor_slices(
        (images[train_size:], labels[train_size:])
    )

    return train_images, test_images


# Load your raw image dataset
raw_images = load_dataset("/content/drive/MyDrive/fake-news-multimodal/data")  # returns tf.data.Dataset

# Normalize pixel values
normalized_images = normalize_data(raw_images)

# Split into train, val, test
train_images, test_images = split_data_fusion(normalized_images)


Found 4000 files belonging to 2 classes.


In [7]:
len(train_images)

3400

In [8]:
for x in train_images.take(1):
    print("Train element structure:", x)


Train element structure: (<tf.Tensor: shape=(256, 256, 3), dtype=float32, numpy=
array([[[0.77254903, 0.80784315, 0.8352941 ],
        [0.76862746, 0.8117647 , 0.8352941 ],
        [0.7607843 , 0.8117647 , 0.8352941 ],
        ...,
        [0.20428921, 0.2827206 , 0.09056372],
        [0.19252451, 0.27487746, 0.10625   ],
        [0.15330882, 0.23566176, 0.08272059]],

       [[0.77254903, 0.80784315, 0.8352941 ],
        [0.76862746, 0.8117647 , 0.8352941 ],
        [0.7607843 , 0.8117647 , 0.8352941 ],
        ...,
        [0.19718137, 0.26887256, 0.10085785],
        [0.17867647, 0.26102942, 0.09240196],
        [0.13664216, 0.22463235, 0.05539216]],

       [[0.77254903, 0.80784315, 0.8352941 ],
        [0.76862746, 0.8117647 , 0.8352941 ],
        [0.7607843 , 0.8117647 , 0.8352941 ],
        ...,
        [0.18615197, 0.25674018, 0.10404412],
        [0.16654412, 0.24889706, 0.08811274],
        [0.12156863, 0.21176471, 0.03921569]],

       ...,

       [[0.        , 0.14901961, 

In [41]:
from dataloader.dataloader import load_dataset
from dataloader.text_dataloader import load_text_data
from preprocessing.preprocessing_image import normalize_data, split_data, optimize_pipeline
from dataloader.fusion_dataloader import FusionDataset
from models.Late_fusion.fusion import FusionModel

# Load text data
train_text, test_text = load_text_data()

# Build fusion dataset (FIXED)
train_fusion = FusionDataset(train_images, train_text)
test_fusion  = FusionDataset(test_images,  test_text)

# DataLoaders
from torch.utils.data import DataLoader
train_loader = DataLoader(train_fusion, batch_size=4, shuffle=True)
test_loader  = DataLoader(test_fusion, batch_size=4)

Loaded training dataset with 3400 samples.
Loaded testing dataset with 600 samples.


In [12]:
print("Train images:", len(train_images))
print("Train text:  ", len(train_text))
print("Test images:", len(test_images))
print("Test text:  ", len(test_text))


Train images: 3400
Train text:   3400
Test images: 600
Test text:   600


In [42]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import tensorflow as tf
from tensorflow.keras.models import load_model
from tensorflow.keras import Input, Model
from transformers import XLMRobertaModel
import os
from tqdm.auto import tqdm # Import tqdm

class FusionModel(nn.Module):
    def __init__(self, cnn_feature_extractor_model, text_model, cnn_feature_dim, num_classes=2, device='cpu'):
        """
        cnn_feature_extractor_model: loaded Keras CNN feature extractor model
        text_model: loaded transformers AutoModelForSequenceClassification
        cnn_feature_dim: number of features from CNN before final dense
        """
        super(FusionModel, self).__init__()
        self.device = device

        # CNN feature extractor (already a feature extractor model)
        self.cnn_feature_extractor = cnn_feature_extractor_model

        # Freeze CNN if desired
        self.cnn_feature_extractor.trainable = False

        # HuggingFace XLM-R feature extractor
        self.text_model = text_model
        for param in self.text_model.parameters():
            param.requires_grad = False  # freeze text model

        # Fusion classifier
        self.fc = nn.Sequential(
            nn.Linear(cnn_feature_dim + 768, 256),  # CNN + XLM-R CLS
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes)
        )

    def forward(self, images, input_ids, attention_mask):
        # CNN features
        # Permute images to Keras/TensorFlow format (batch, height, width, channels)
        images = images.permute(0, 2, 3, 1) # from (batch, C, H, W) to (batch, H, W, C)
        cnn_features = self.cnn_feature_extractor(images)  # tf.Tensor
        if isinstance(cnn_features, tf.Tensor):
            cnn_features = torch.tensor(cnn_features.numpy(), dtype=torch.float32, device=self.device)

        # XLM-R features
        outputs = self.text_model(input_ids=input_ids, attention_mask=attention_mask)
        if hasattr(outputs, "logits"):  # sequence classifier outputs
            # For feature extraction, take hidden states
            text_features = outputs.hidden_states[-1][:,0,:]  # CLS token from last layer
        else:  # base model
            text_features = outputs.last_hidden_state[:,0,:]

        text_features = text_features.to(self.device)

        # Concatenate
        fused = torch.cat((cnn_features, text_features), dim=1)
        return self.fc(fused)



# --- Device ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# --- Load text model (already loaded as `text_model`) ---
# The actual cnn_feature_dim is 115200 based on the error message (115968 - 768 for text features)
fusion_model = FusionModel(cnn_feature_extractor, text_model, cnn_feature_dim=115200, num_classes=2, device=device)
fusion_model.to(device)

# --- Loss and optimizer ---
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(fusion_model.fc.parameters(), lr=1e-4)  # only train fusion layers

# --- Training loop ---
epochs = 5
for epoch in range(epochs):
    fusion_model.train()
    total_loss = 0
    # Wrap train_loader with tqdm for a progress bar
    for images, input_ids, attention_mask, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):
        images = images.to(device)
        input_ids = input_ids.to(device)
        attention_mask = attention_mask.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = fusion_model(images, input_ids, attention_mask)
        loss = criterion(outputs, labels);
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader):.4f}")

# --- Evaluation ---
fusion_model.eval()
correct, total = 0, 0
with torch.no_grad():
    for images, input_ids, attention_mask, labels in test_loader:
        images = images.to(device)
        input_ids = input_ids.to(device)
        attention_mask = attention_mask.to(device)
        labels = labels.to(device)

        outputs = fusion_model(images, input_ids, attention_mask)
        predicted = torch.argmax(outputs, dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Test Accuracy: {correct/total:.4f}")

# --- Save the trained model ---
save_path = "/content/drive/MyDrive/fake-news-multimodal/models/fusion"
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(save_path), exist_ok=True)
torch.save(fusion_model.state_dict(), save_path)
print(f"Fusion model saved to {save_path}")

Epoch 1/5:   0%|          | 0/850 [00:00<?, ?it/s]

Epoch 1/5, Loss: 0.4747


Epoch 2/5:   0%|          | 0/850 [00:00<?, ?it/s]

Epoch 2/5, Loss: 0.1546


Epoch 3/5:   0%|          | 0/850 [00:00<?, ?it/s]

Epoch 3/5, Loss: 0.1841


Epoch 4/5:   0%|          | 0/850 [00:00<?, ?it/s]

Epoch 4/5, Loss: 0.1685


Epoch 5/5:   0%|          | 0/850 [00:00<?, ?it/s]

Epoch 5/5, Loss: 0.1016
Test Accuracy: 0.9817


RuntimeError: File /content/drive/MyDrive/fake-news-multimodal/models/fusion cannot be opened.