<a href="https://colab.research.google.com/github/RedBatProject/ufo-works/blob/main/Untitled107.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import re
import string
import zipfile
import requests
import numpy as np
import pandas as pd
import tensorflow as tf
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from tensorflow.keras.models import Model
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.initializers import Constant
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from torch.utils.data import Dataset, DataLoader, random_split
from transformers import DistilBertTokenizer, DistilBertModel, TFDistilBertModel
from tqdm import tqdm


In [None]:
class DataProcessor:
    """Download, clean, split and vectorise the fake-news dataset.

    The download helpers assume a Google Colab runtime whose working
    directory is ``/content`` (the hard-coded archive paths rely on it).
    """

    def __init__(self):
        # Length every tokenised article is padded/truncated to by callers.
        self.max_len = 1500
        # Dimensionality of the GloVe vectors loaded by load_glove().
        self.embed_dim = 300
        # Keras Tokenizer; created by prepare_embeddings().
        self.tokenizer = None
        # word -> float32 vector mapping; filled by load_glove().
        self.glove_embeddings = {}

    def setup_kaggle(self):
        """Install the Kaggle CLI, register an API key and fetch the dataset.

        The key is taken from the file uploaded through the Colab dialog;
        if nothing is uploaded, the copy at
        ``/content/drive/MyDrive/kaggle.json`` is used instead.

        Raises:
            subprocess.CalledProcessError: if pip or the kaggle CLI fails.
            FileNotFoundError: if no key was uploaded and the Drive copy
                does not exist.
        """
        # Local imports keep the block self-contained; google.colab only
        # exists inside a Colab runtime.
        import shutil
        import subprocess
        import sys

        from google.colab import files

        subprocess.run(
            [sys.executable, '-m', 'pip', 'install', '-q', 'kaggle'],
            check=True,
        )

        print("Upload your kaggle API key to download DATASET")
        uploaded = files.upload()  # {filename: bytes}

        # '~' is not expanded by os.makedirs, so expand it explicitly;
        # otherwise a literal './~/.kaggle' directory is created and the
        # CLI never finds its credentials.
        kaggle_dir = os.path.expanduser('~/.kaggle')
        os.makedirs(kaggle_dir, exist_ok=True)
        key_path = os.path.join(kaggle_dir, 'kaggle.json')

        # Colab may rename the upload (e.g. "kaggle (1).json"), so accept
        # any uploaded .json file as the key.
        key_bytes = next(
            (data for name, data in uploaded.items() if name.endswith('.json')),
            None,
        )
        if key_bytes is not None:
            with open(key_path, 'wb') as key_file:
                key_file.write(key_bytes)
        else:
            shutil.copy('/content/drive/MyDrive/kaggle.json', key_path)
        # The kaggle CLI warns unless the key is readable by the owner only.
        os.chmod(key_path, 0o600)

        subprocess.run(
            ['kaggle', 'datasets', 'download', '-d',
             'bhavikjikadara/fake-news-detection'],
            check=True,
        )

        with zipfile.ZipFile('/content/fake-news-detection.zip', 'r') as zip_ref:
            zip_ref.extractall('data')

    def load_data(self):
        """Read both CSVs and return a shuffled frame of ``text``/``label``.

        Rows from ``data/fake.csv`` get label 0, rows from
        ``data/true.csv`` get label 1.
        """
        fake = pd.read_csv('data/fake.csv')
        true = pd.read_csv('data/true.csv')

        fake['label'] = 0
        true['label'] = 1

        df = pd.concat([fake, true], axis=0)
        # Seeded so that the seeded splits in preprocess_data() are
        # actually reproducible from run to run.
        df = df.sample(frac=1, random_state=42).reset_index(drop=True)
        # .copy() so that later column assignment does not hit pandas'
        # chained-assignment warning.
        return df[['text', 'label']].copy()

    def clean_text(self, text):
        """Lower-case *text* and keep only ASCII letters and whitespace.

        @-mentions and URLs are removed first. Non-string input (e.g. a
        NaN from an empty CSV cell) yields an empty string.
        """
        if not isinstance(text, str):
            return ''
        text = text.lower()
        text = re.sub(r'@\w+', '', text)  # Remove usernames
        text = re.sub(r'http\S+', '', text)  # Remove URLs
        # Removes digits, punctuation and every non-English character, so
        # no separate punctuation pass is needed afterwards.
        text = re.sub(r'[^a-zA-Z\s]', '', text)
        return text.strip()

    def preprocess_data(self, df):
        """Clean ``df['text']`` in place and split 70/15/15.

        Returns:
            ``(train_text, train_label), (val_text, val_label),
            (test_text, test_label)`` as NumPy arrays.
        """
        df['text'] = df['text'].apply(self.clean_text)
        texts = df['text'].values
        labels = df['label'].values

        train_text, temp_text, train_label, temp_label = train_test_split(
            texts, labels, test_size=0.3, random_state=42)
        # Halve the held-out 30% into validation and test parts.
        val_text, test_text, val_label, test_label = train_test_split(
            temp_text, temp_label, test_size=0.5, random_state=42)

        return (train_text, train_label), (val_text, val_label), (test_text, test_label)

    def load_glove(self):
        """Download (once) and parse the GloVe 6B vectors of ``embed_dim``.

        Fills ``self.glove_embeddings`` with ``word -> float32 vector``.

        Raises:
            subprocess.CalledProcessError: if the download fails.
        """
        import subprocess

        zip_path = '/content/glove.6B.zip'
        if not os.path.exists(zip_path):
            try:
                subprocess.run(
                    ['wget', '-O', zip_path,
                     'http://nlp.stanford.edu/data/glove.6B.zip'],
                    check=True,
                )
            except subprocess.CalledProcessError:
                # Do not leave a truncated archive behind: the next run
                # would treat it as already downloaded.
                if os.path.exists(zip_path):
                    os.remove(zip_path)
                raise

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall('data')

        vectors_path = f"/content/data/glove.6B.{self.embed_dim}d.txt"
        with open(vectors_path, 'rb') as f:
            for line in f:
                values = line.decode('utf-8').split()
                word = values[0]
                coefs = np.asarray(values[1:], dtype='float32')
                self.glove_embeddings[word] = coefs

    def prepare_embeddings(self, train_text):
        """Fit the tokenizer on *train_text* and build the embedding matrix.

        Row ``i`` holds the GloVe vector of the word with tokenizer index
        ``i``; words without a GloVe vector (and row 0) stay all-zero.

        Returns:
            ``(vocab_size, embedding_matrix)``.
        """
        self.tokenizer = Tokenizer()
        self.tokenizer.fit_on_texts(train_text)

        word_index = self.tokenizer.word_index
        # +1 because Tokenizer indices start at 1.
        vocab_size = len(word_index) + 1

        embedding_matrix = np.zeros((vocab_size, self.embed_dim))
        for word, i in word_index.items():
            embedding_vector = self.glove_embeddings.get(word)
            if embedding_vector is not None:
                embedding_matrix[i] = embedding_vector

        return vocab_size, embedding_matrix

class ModelBuilder:
    """Factory for the four Keras binary classifiers compared in this file.

    Every model starts with a frozen embedding layer initialised from
    ``embedding_matrix`` and ends with ``Dense(32, relu) -> Dense(1, sigmoid)``,
    compiled with binary cross-entropy, Adam and an accuracy metric.
    """

    def __init__(self, vocab_size, embedding_matrix, max_len, embed_dim=300):
        self.vocab_size = vocab_size
        self.embedding_matrix = embedding_matrix
        self.max_len = max_len
        self.embed_dim = embed_dim

    def _embedded_input(self):
        """Return ``(input_layer, embedded)`` shared by all four models."""
        input_layer = Input(shape=(self.max_len,))
        # `input_length` is omitted: the sequence length is already fixed
        # by the Input layer, and the argument is deprecated in Keras 3.
        embedded = Embedding(
            self.vocab_size, self.embed_dim,
            embeddings_initializer=Constant(self.embedding_matrix),
            trainable=False)(input_layer)
        return input_layer, embedded

    def _finish(self, input_layer, x):
        """Attach the dense head to *x* and return the compiled model."""
        x = Dense(32, activation='relu')(x)
        output = Dense(1, activation='sigmoid')(x)
        model = Model(inputs=input_layer, outputs=output)
        model.compile(loss='binary_crossentropy', optimizer=Adam(), metrics=['accuracy'])
        return model

    def _build_conv_bilstm(self, num_conv_layers):
        """Build ``num_conv_layers`` x Conv1D -> Dropout -> BiLSTM -> head."""
        input_layer, x = self._embedded_input()
        for _ in range(num_conv_layers):
            x = Conv1D(32, 2, activation='relu')(x)
        x = Dropout(0.6)(x)
        x = Bidirectional(LSTM(32))(x)
        return self._finish(input_layer, x)

    def build_model_1(self):
        """One Conv1D layer followed by a bidirectional LSTM."""
        return self._build_conv_bilstm(1)

    def build_model_2(self):
        """Two stacked Conv1D layers followed by a bidirectional LSTM."""
        return self._build_conv_bilstm(2)

    def build_model_3(self):
        """Three stacked Conv1D layers followed by a bidirectional LSTM."""
        return self._build_conv_bilstm(3)

    def build_model_4(self):
        """Three stacked bidirectional LSTMs, then dropout and the head."""
        input_layer, x = self._embedded_input()
        # The first two LSTMs return full sequences so the next recurrent
        # layer has a time axis to consume.
        x = Bidirectional(LSTM(32, return_sequences=True))(x)
        x = Bidirectional(LSTM(32, return_sequences=True))(x)
        x = Bidirectional(LSTM(32))(x)
        x = Dropout(0.6)(x)
        return self._finish(input_layer, x)

class Trainer:
    """Fit and score a collection of compiled Keras models."""

    def __init__(self, max_len=1500, epochs=1):
        self.batch_size = 32
        self.epochs = epochs
        self.max_len = max_len

    def train_models(self, models, train_data, val_data):
        """Fit each model in turn and return the list of History objects.

        ``train_data`` is indexed as ``(features, labels)``; ``val_data`` is
        forwarded unchanged as ``validation_data``.
        """
        print("train and evaluate Deep Learning Models ...")
        return [
            net.fit(
                train_data[0], train_data[1],
                validation_data=val_data,
                epochs=self.epochs,
                batch_size=self.batch_size,
                # A fresh callback per model: early-stopping state is not shared.
                callbacks=[EarlyStopping(patience=3, restore_best_weights=True)],
            )
            for net in models
        ]

    def evaluate_models(self, models, test_data):
        """Return one metrics dict per model for ``test_data``.

        ``test_data`` is indexed as ``(features, labels)``. Class predictions
        use a 0.5 threshold; ROC AUC is computed on the raw model output.
        """
        results = []
        for net in models:
            scores = net.predict(test_data[0])
            decisions = (scores > 0.5).astype(int)
            truth = test_data[1]
            results.append({
                'accuracy': accuracy_score(truth, decisions),
                'precision': precision_score(truth, decisions),
                'recall': recall_score(truth, decisions),
                'f1': f1_score(truth, decisions),
                'roc_auc': roc_auc_score(truth, scores),
            })
        return results


# Use the GPU when torch reports one is available; otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

class NewsDataset(Dataset):
    """Map-style dataset pairing article texts with float labels.

    With a tokenizer, items are dicts holding ``input_ids``,
    ``attention_mask`` and ``label``; without one, items are
    ``(text, label)`` tuples.
    """

    def __init__(self, texts, labels, tokenizer=None, max_len=512):
        """Store the samples.

        Raises:
            ValueError: if ``texts`` and ``labels`` differ in length.
        """
        if len(texts) != len(labels):
            raise ValueError(
                f"texts and labels differ in length: {len(texts)} != {len(labels)}"
            )
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        # float32 so the label can be fed directly to a BCE-style loss.
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        # Compare with None explicitly: truth-testing a tokenizer object
        # would go through its __len__/__bool__ instead.
        if self.tokenizer is None:
            return text, label

        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_len,
            return_tensors='pt'
        )
        # return_tensors='pt' adds a leading batch axis of size 1; drop it
        # so the DataLoader can stack the items itself.
        return {
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0),
            'label': label
        }

class DistilBERTClassifier(nn.Module):
    """DistilBERT encoder with a single linear + sigmoid head."""

    def __init__(self, num_classes=1):
        super().__init__()
        self.bert = DistilBertModel.from_pretrained('distilbert-base-uncased')
        self.fc = nn.Linear(self.bert.config.hidden_size, num_classes)

    def forward(self, input_ids, attention_mask):
        """Return sigmoid scores for a batch.

        The result has shape ``(batch,)`` when ``num_classes == 1`` and
        ``(batch, num_classes)`` otherwise.
        """
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # Hidden state at sequence position 0 serves as the pooled
        # representation (the slot where the DistilBERT tokenizer puts [CLS]).
        pooled_output = outputs.last_hidden_state[:, 0, :]
        # Squeeze only the class axis. A bare squeeze() would also drop the
        # batch axis for a batch of one, yielding a 0-d tensor that no longer
        # matches the (1,)-shaped labels.
        return torch.sigmoid(self.fc(pooled_output)).squeeze(-1)

def train_model(model, train_loader, val_loader, epochs=0, lr=2e-5):
    """Fine-tune *model* with Adam and binary cross-entropy.

    Args:
        model: module called as ``model(input_ids, attention_mask)`` that
            returns probabilities in [0, 1].
        train_loader: yields dict batches with ``input_ids``,
            ``attention_mask`` and ``label``.
        val_loader: same batch format; when not ``None`` its mean loss is
            printed after every epoch.
        epochs: number of passes over ``train_loader``. NOTE: the default
            of 0 performs no optimisation steps at all.
        lr: Adam learning rate.

    Returns:
        The same model object, moved to ``device``.
    """
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.BCELoss()
    model.to(device)

    for epoch in range(epochs):
        model.train()
        train_loss = 0.0
        train_batches = 0
        for batch in tqdm(train_loader):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)
            optimizer.zero_grad()
            # Align with the label shape so a model that squeezes a batch
            # of one down to a scalar still fits BCELoss.
            outputs = model(input_ids, attention_mask).reshape(labels.shape)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            train_batches += 1

        # An empty loader would otherwise divide by zero.
        mean_train_loss = train_loss / train_batches if train_batches else float('nan')
        print(f"Epoch {epoch+1}, Loss: {mean_train_loss}")

        if val_loader is not None:
            model.eval()
            val_loss = 0.0
            val_batches = 0
            with torch.no_grad():
                for batch in val_loader:
                    input_ids = batch['input_ids'].to(device)
                    attention_mask = batch['attention_mask'].to(device)
                    labels = batch['label'].to(device)
                    outputs = model(input_ids, attention_mask).reshape(labels.shape)
                    val_loss += criterion(outputs, labels).item()
                    val_batches += 1
            if val_batches:
                print(f"Epoch {epoch+1}, Val Loss: {val_loss/val_batches}")

    return model

def evaluate_model(model, test_loader):
    """Score *model* on ``test_loader``, print the metrics and return them.

    Class metrics use a 0.5 threshold on the model output; ROC AUC is
    computed from the un-thresholded scores.

    Returns:
        dict with keys ``accuracy``, ``precision``, ``recall``, ``f1`` and
        ``roc_auc``.
    """
    model.eval()
    scores, true_labels = [], []
    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            outputs = model(input_ids, attention_mask)
            # reshape(-1) keeps this iterable even when the model returns a
            # 0-d tensor for a batch of one.
            scores.extend(outputs.reshape(-1).cpu().numpy())
            true_labels.extend(batch['label'].reshape(-1).cpu().numpy())

    scores = np.array(scores)
    preds = scores > 0.5
    accuracy = accuracy_score(true_labels, preds)
    precision = precision_score(true_labels, preds)
    recall = recall_score(true_labels, preds)
    f1 = f1_score(true_labels, preds)
    # ROC AUC ranks samples by score; thresholded 0/1 predictions would
    # collapse the curve to a single operating point.
    roc_auc = roc_auc_score(true_labels, scores)
    print(f"Accuracy: {accuracy}, Precision: {precision}, Recall: {recall}, F1: {f1}, ROC AUC: {roc_auc}")
    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'roc_auc': roc_auc,
    }


In [None]:
def main():
    """Run the full experiment: four Keras models, then DistilBERT.

    Downloads the dataset and GloVe vectors, trains and evaluates the
    GloVe-based Keras models, then fine-tunes and evaluates DistilBERT on
    the same train/validation/test split.
    """
    # Setup and process data
    processor = DataProcessor()
    processor.setup_kaggle()
    df = processor.load_data()
    (train_texts, train_labels), (val_texts, val_labels), (test_texts, test_labels) = processor.preprocess_data(df)

    # Prepare GloVe embeddings
    processor.load_glove()
    vocab_size, embedding_matrix = processor.prepare_embeddings(train_texts)

    # Tokenize and pad sequences
    train_seq = processor.tokenizer.texts_to_sequences(train_texts)
    val_seq = processor.tokenizer.texts_to_sequences(val_texts)
    test_seq = processor.tokenizer.texts_to_sequences(test_texts)

    train_pad = pad_sequences(train_seq, maxlen=processor.max_len)
    val_pad = pad_sequences(val_seq, maxlen=processor.max_len)
    test_pad = pad_sequences(test_seq, maxlen=processor.max_len)

    # Build models
    model_builder = ModelBuilder(vocab_size, embedding_matrix, processor.max_len)
    models = [
        model_builder.build_model_1(),
        model_builder.build_model_2(),
        model_builder.build_model_3(),
        model_builder.build_model_4()
    ]

    # Train models
    trainer = Trainer(max_len=processor.max_len)
    trainer.train_models(
        models,
        (train_pad, train_labels),
        (val_pad, val_labels)
    )

    # Evaluate models
    metrics = trainer.evaluate_models(models, (test_pad, test_labels))
    for i, metric in enumerate(metrics):
        print(f"Model {i+1} Metrics:")
        print(metric)

    # Train and evaluate DistilBERT
    print("Train and evaluate DistilBERT")
    tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')

    train_dataset = NewsDataset(train_texts.tolist(), train_labels.tolist(), tokenizer)
    val_dataset = NewsDataset(val_texts.tolist(), val_labels.tolist(), tokenizer)
    test_dataset = NewsDataset(test_texts.tolist(), test_labels.tolist(), tokenizer)

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32)
    test_loader = DataLoader(test_dataset, batch_size=32)

    model = DistilBERTClassifier()
    # epochs must be > 0: with 0 the classification head stays randomly
    # initialised and the reported metrics are meaningless.
    model = train_model(model, train_loader, val_loader, epochs=3)

    evaluate_model(model, test_loader)

# Run the experiment only when this code is executed directly (script or
# notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()


cp: cannot create regular file '/root/.kaggle/': Not a directory
chmod: cannot access '/root/.kaggle/kaggle.json': No such file or directory
Dataset URL: https://www.kaggle.com/datasets/bhavikjikadara/fake-news-detection
License(s): Attribution 4.0 International (CC BY 4.0)
fake-news-detection.zip: Skipping, found more recently modified local copy (use --force to force download)
Train and evaluate DistilBERT
Accuracy: 0.22791388270230142, Precision: 0.20965353864376665, Recall: 0.2185859833281877, F1: 0.2140266021765417, ROC AUC: 0.22757102370070711


In [None]:
import os
import re
import string
import zipfile
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from transformers import DistilBertTokenizer, DistilBertModel
from tqdm import tqdm

# Use the GPU when torch reports one is available; otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

class NewsDataset(Dataset):
    """Map-style dataset pairing article texts with float labels.

    With a tokenizer, items are dicts holding ``input_ids``,
    ``attention_mask`` and ``label``; without one, items are
    ``(text, label)`` tuples.
    """

    def __init__(self, texts, labels, tokenizer=None, max_len=512):
        """Store the samples.

        Raises:
            ValueError: if ``texts`` and ``labels`` differ in length.
        """
        if len(texts) != len(labels):
            raise ValueError(
                f"texts and labels differ in length: {len(texts)} != {len(labels)}"
            )
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        # float32 so the label can be fed directly to a BCE-style loss.
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        # Compare with None explicitly: truth-testing a tokenizer object
        # would go through its __len__/__bool__ instead.
        if self.tokenizer is None:
            return text, label

        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_len,
            return_tensors='pt'
        )
        # return_tensors='pt' adds a leading batch axis of size 1; drop it
        # so the DataLoader can stack the items itself.
        return {
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0),
            'label': label
        }

class DistilBERTClassifier(nn.Module):
    """DistilBERT encoder with a single linear + sigmoid head."""

    def __init__(self, num_classes=1):
        super().__init__()
        self.bert = DistilBertModel.from_pretrained('distilbert-base-uncased')
        self.fc = nn.Linear(self.bert.config.hidden_size, num_classes)

    def forward(self, input_ids, attention_mask):
        """Return sigmoid scores for a batch.

        The result has shape ``(batch,)`` when ``num_classes == 1`` and
        ``(batch, num_classes)`` otherwise.
        """
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # Hidden state at sequence position 0 serves as the pooled
        # representation (the slot where the DistilBERT tokenizer puts [CLS]).
        pooled_output = outputs.last_hidden_state[:, 0, :]
        # Squeeze only the class axis. A bare squeeze() would also drop the
        # batch axis for a batch of one, yielding a 0-d tensor that no longer
        # matches the (1,)-shaped labels.
        return torch.sigmoid(self.fc(pooled_output)).squeeze(-1)

def train_model(model, train_loader, val_loader, epochs=5, lr=2e-5):
    """Fine-tune *model* with Adam and binary cross-entropy.

    Args:
        model: module called as ``model(input_ids, attention_mask)`` that
            returns probabilities in [0, 1].
        train_loader: yields dict batches with ``input_ids``,
            ``attention_mask`` and ``label``.
        val_loader: same batch format; when not ``None`` its mean loss is
            printed after every epoch.
        epochs: number of passes over ``train_loader``.
        lr: Adam learning rate.

    Returns:
        The same model object, moved to ``device``.
    """
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.BCELoss()
    model.to(device)

    for epoch in range(epochs):
        model.train()
        train_loss = 0.0
        train_batches = 0
        for batch in tqdm(train_loader):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)
            optimizer.zero_grad()
            # Align with the label shape so a model that squeezes a batch
            # of one down to a scalar still fits BCELoss.
            outputs = model(input_ids, attention_mask).reshape(labels.shape)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            train_batches += 1

        # An empty loader would otherwise divide by zero.
        mean_train_loss = train_loss / train_batches if train_batches else float('nan')
        print(f"Epoch {epoch+1}, Loss: {mean_train_loss}")

        if val_loader is not None:
            model.eval()
            val_loss = 0.0
            val_batches = 0
            with torch.no_grad():
                for batch in val_loader:
                    input_ids = batch['input_ids'].to(device)
                    attention_mask = batch['attention_mask'].to(device)
                    labels = batch['label'].to(device)
                    outputs = model(input_ids, attention_mask).reshape(labels.shape)
                    val_loss += criterion(outputs, labels).item()
                    val_batches += 1
            if val_batches:
                print(f"Epoch {epoch+1}, Val Loss: {val_loss/val_batches}")

    return model

def evaluate_model(model, test_loader):
    """Score *model* on ``test_loader``, print the metrics and return them.

    Class metrics use a 0.5 threshold on the model output; ROC AUC is
    computed from the un-thresholded scores.

    Returns:
        dict with keys ``accuracy``, ``precision``, ``recall``, ``f1`` and
        ``roc_auc``.
    """
    model.eval()
    scores, true_labels = [], []
    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            outputs = model(input_ids, attention_mask)
            # reshape(-1) keeps this iterable even when the model returns a
            # 0-d tensor for a batch of one.
            scores.extend(outputs.reshape(-1).cpu().numpy())
            true_labels.extend(batch['label'].reshape(-1).cpu().numpy())

    scores = np.array(scores)
    preds = scores > 0.5
    accuracy = accuracy_score(true_labels, preds)
    precision = precision_score(true_labels, preds)
    recall = recall_score(true_labels, preds)
    f1 = f1_score(true_labels, preds)
    # ROC AUC ranks samples by score; thresholded 0/1 predictions would
    # collapse the curve to a single operating point.
    roc_auc = roc_auc_score(true_labels, scores)
    print(f"Accuracy: {accuracy}, Precision: {precision}, Recall: {recall}, F1: {f1}, ROC AUC: {roc_auc}")
    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'roc_auc': roc_auc,
    }

def main():
    """Fine-tune and evaluate DistilBERT on the fake/true news CSVs.

    Expects ``data/fake.csv`` and ``data/true.csv`` to exist already. Fake
    rows are labelled 0 and true rows 1; the data is split 70/15/15 into
    train, validation and test sets.
    """
    df_fake = pd.read_csv('data/fake.csv')
    df_true = pd.read_csv('data/true.csv')

    df_fake['label'] = 0
    df_true['label'] = 1
    # Seed the shuffle as well: an unseeded shuffle changes the row order on
    # every run, which makes the seeded splits below irreproducible.
    df = pd.concat([df_fake, df_true]).sample(frac=1, random_state=42).reset_index(drop=True)

    train_texts, temp_texts, train_labels, temp_labels = train_test_split(df['text'], df['label'], test_size=0.3, random_state=42)
    # Halve the held-out 30% into validation and test parts.
    val_texts, test_texts, val_labels, test_labels = train_test_split(temp_texts, temp_labels, test_size=0.5, random_state=42)

    tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')

    train_dataset = NewsDataset(train_texts.tolist(), train_labels.tolist(), tokenizer)
    val_dataset = NewsDataset(val_texts.tolist(), val_labels.tolist(), tokenizer)
    test_dataset = NewsDataset(test_texts.tolist(), test_labels.tolist(), tokenizer)

    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=16)
    test_loader = DataLoader(test_dataset, batch_size=16)

    model = DistilBERTClassifier()
    model = train_model(model, train_loader, val_loader, epochs=3)

    evaluate_model(model, test_loader)

# Run the experiment only when this code is executed directly (script or
# notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()


  2%|▏         | 47/1965 [00:45<30:53,  1.03it/s]


KeyboardInterrupt: 