In [1]:
import re
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, Bidirectional, LSTM,SimpleRNN, Dense, Dropout, Conv1D, MaxPooling1D, Flatten, MaxPooling2D
from keras.regularizers import l2
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import nltk
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords

2024-05-27 22:35:23.454226: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [None]:
class FakeNewsDetector:
    """Binary classifier for news titles built on frozen pre-trained word vectors.

    Typical use: construct with the path to a text embedding file (one
    ``word v1 v2 ... vN`` entry per line), call :meth:`train_model` with a
    DataFrame holding ``title`` and ``label`` columns, then call
    :meth:`predict` on individual titles.

    Attributes:
        embedding_path: Path of the text embedding file read during training.
        embedding_dim: Length of every embedding vector.
        max_length: Number of tokens each title is padded/truncated to.
        model: Trained Keras model, or ``None`` before training.
        tokenizer: Keras ``Tokenizer`` fitted on the training split, or ``None``.
        word_index: ``tokenizer.word_index`` after training, or ``None``.
        history: Keras ``History`` returned by the last ``fit``, or ``None``.
    """

    # Architectures accepted by ``train_model(model_type=...)``.
    SUPPORTED_MODEL_TYPES = ('default', 'rnn', 'lstm', 'bilstm', 'cnn', 'cnn-rnn')

    def __init__(self, embedding_path, embedding_dim=100, max_length=150):
        self.embedding_path = embedding_path
        self.embedding_dim = embedding_dim
        self.max_length = max_length
        self.model = None
        self.tokenizer = None
        self.word_index = None
        self.history = None

    def preprocess_text(self, text):
        """Normalise one piece of text into a space-separated token string.

        The text is tokenised with NLTK, tokens that are not purely alphabetic
        are dropped, the rest are lower-cased and lemmatised. Stop words are
        kept: the previous implementation loaded the stop-word list on every
        call but never applied it, so that (costly) load has been removed
        without changing the output.

        Args:
            text: Any value; it is converted with ``str()`` first.

        Returns:
            The cleaned tokens joined by single spaces.
        """
        lemmatizer = WordNetLemmatizer()
        tokens = [token.lower() for token in word_tokenize(str(text)) if token.isalpha()]
        return ' '.join(lemmatizer.lemmatize(token) for token in tokens)

    def create_embedding_matrix(self, word_index, embedding_dict, embedding_dim):
        """Build the ``(len(word_index) + 1, embedding_dim)`` weight matrix.

        Row ``i`` holds the vector of the word whose index is ``i``; words
        missing from ``embedding_dict`` (and row 0, which no word uses when the
        indices start at 1) stay all-zero.

        Args:
            word_index: Mapping ``word -> integer index``.
            embedding_dict: Mapping ``word -> vector``.
            embedding_dim: Expected length of every vector.

        Returns:
            A ``numpy.ndarray`` of zeros filled with the known vectors.

        Raises:
            ValueError: If a vector's shape is not ``(embedding_dim,)``, which
                usually means ``embedding_dim`` does not match the file.
        """
        embedding_matrix = np.zeros((len(word_index) + 1, embedding_dim))

        for word, i in word_index.items():
            # Indices beyond the matrix are ignored, as before.
            if i > len(word_index):
                continue
            embedding_vector = embedding_dict.get(word)
            if embedding_vector is None:
                continue
            if np.shape(embedding_vector) != (embedding_dim,):
                raise ValueError(
                    f"Embedding for {word!r} has shape {np.shape(embedding_vector)}, "
                    f"expected ({embedding_dim},); check embedding_dim against the embedding file."
                )
            embedding_matrix[i] = embedding_vector

        return embedding_matrix

    def _load_embeddings(self, vocabulary=None):
        """Read ``self.embedding_path`` into a ``word -> float32 vector`` dict.

        When ``vocabulary`` is given, only words contained in it are parsed and
        kept, which avoids holding the whole embedding file in memory.
        """
        embedding_dict = {}
        with open(self.embedding_path, 'r', encoding='utf-8') as f:
            for line in f:
                values = line.split()
                if not values:
                    # Blank line (e.g. trailing newline at end of file).
                    continue
                word = values[0]
                if vocabulary is not None and word not in vocabulary:
                    continue
                embedding_dict[word] = np.asarray(values[1:], dtype='float32')
        return embedding_dict

    def _encode(self, texts):
        """Turn preprocessed texts into post-padded/truncated index sequences."""
        sequences = self.tokenizer.texts_to_sequences(texts)
        return pad_sequences(sequences, maxlen=self.max_length, padding='post', truncating='post')

    def _build_model(self, model_type, embedding_matrix):
        """Assemble the (uncompiled) Keras model for ``model_type``."""
        # A Constant initializer loads the pre-trained vectors without relying
        # on the legacy ``weights=`` constructor argument.
        embedding = Embedding(
            len(self.word_index) + 1,
            self.embedding_dim,
            embeddings_initializer=tf.keras.initializers.Constant(embedding_matrix),
            trainable=False,
        )

        def conv_block():
            return [Conv1D(64, kernel_size=2, activation='relu'), MaxPooling1D(pool_size=2)]

        def dense_head():
            return [
                Dense(32, activation='relu', kernel_regularizer=l2(0.01)),
                Dropout(0.3),
                Dense(1, activation='sigmoid'),
            ]

        if model_type == 'default':
            body = conv_block() + [
                Bidirectional(LSTM(128, return_sequences=False, kernel_regularizer=l2(0.01)))
            ] + dense_head()
        elif model_type == 'rnn':
            body = [SimpleRNN(128, return_sequences=False, kernel_regularizer=l2(0.01))] + dense_head()
        elif model_type == 'lstm':
            body = [
                Dropout(0.2),
                LSTM(100, return_sequences=False, kernel_regularizer=l2(0.01)),
                Dense(4, activation='relu', kernel_regularizer=l2(0.01)),
                Dense(1, activation='sigmoid'),
            ]
        elif model_type == 'bilstm':
            body = [
                Bidirectional(LSTM(128, return_sequences=False, kernel_regularizer=l2(0.01)))
            ] + dense_head()
        elif model_type == 'cnn':
            body = conv_block() + [Flatten(), Dense(1, activation='sigmoid')]
        elif model_type == 'cnn-rnn':
            body = conv_block() + [
                LSTM(128, return_sequences=False, kernel_regularizer=l2(0.01))
            ] + dense_head()
        else:
            raise ValueError(
                f"Unknown model_type {model_type!r}; expected one of {self.SUPPORTED_MODEL_TYPES}."
            )

        return Sequential([embedding] + body)

    def train_model(self, data, model_type='default', random_state=None, epochs=15, batch_size=64):
        """Train a model on ``data`` and print metrics for a held-out test split.

        ``data`` is split 60/20/20 into train/validation/test, stratified on
        ``label``. The tokenizer is fitted on the training titles only.
        Training uses early stopping and learning-rate reduction, both
        monitoring ``val_loss``. The fitted model, tokenizer, word index and
        training history are stored on the instance.

        Args:
            data: DataFrame with a ``title`` column (text) and a ``label``
                column (binary target for the sigmoid output).
            model_type: One of ``SUPPORTED_MODEL_TYPES``.
            random_state: Seed forwarded to both ``train_test_split`` calls;
                ``None`` keeps the previous unseeded behaviour. It does not
                seed TensorFlow's weight initialisation.
            epochs: Maximum number of training epochs.
            batch_size: Mini-batch size used by ``fit``.

        Raises:
            ValueError: If ``model_type`` is not supported. This is checked
                before any data processing so a typo fails immediately.
        """
        if model_type not in self.SUPPORTED_MODEL_TYPES:
            raise ValueError(
                f"Unknown model_type {model_type!r}; expected one of {self.SUPPORTED_MODEL_TYPES}."
            )

        # 20% test, then 25% of the remaining 80% as validation.
        train_df, test_df = train_test_split(
            data, test_size=0.2, stratify=data['label'], random_state=random_state
        )
        train_df, val_df = train_test_split(
            train_df, test_size=0.25, stratify=train_df['label'], random_state=random_state
        )

        train_text = train_df['title'].apply(self.preprocess_text)
        val_text = val_df['title'].apply(self.preprocess_text)
        test_text = test_df['title'].apply(self.preprocess_text)

        # Fit on the training split only so validation/test vocabulary does not leak in.
        self.tokenizer = Tokenizer()
        self.tokenizer.fit_on_texts(train_text)
        self.word_index = self.tokenizer.word_index

        X_train = self._encode(train_text)
        X_val = self._encode(val_text)
        X_test = self._encode(test_text)

        y_train = train_df['label'].values
        y_val = val_df['label'].values
        y_test = test_df['label'].values

        embedding_dict = self._load_embeddings(vocabulary=self.word_index)
        embedding_matrix = self.create_embedding_matrix(self.word_index, embedding_dict, self.embedding_dim)

        self.model = self._build_model(model_type, embedding_matrix)
        self.model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

        early_stopping = tf.keras.callbacks.EarlyStopping(
            monitor='val_loss', patience=4, restore_best_weights=True
        )
        reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss', patience=2, factor=0.1, verbose=1
        )

        self.history = self.model.fit(
            X_train, y_train,
            validation_data=(X_val, y_val),
            epochs=epochs,
            callbacks=[early_stopping, reduce_lr],
            batch_size=batch_size,
            verbose=1
        )

        test_loss, test_accuracy = self.model.evaluate(X_test, y_test)
        # Threshold the sigmoid output at 0.5 and flatten (n, 1) -> (n,) for sklearn.
        test_predictions = (self.model.predict(X_test) >= 0.5).astype(int).ravel()
        test_precision = precision_score(y_test, test_predictions)
        test_recall = recall_score(y_test, test_predictions)
        test_f1 = f1_score(y_test, test_predictions)

        print("Test Loss:", test_loss)
        print("Test Accuracy:", test_accuracy)
        print("Test Precision:", test_precision)
        print("Test Recall:", test_recall)
        print("Test F1-score:", test_f1)

    def predict(self, text):
        """Return the model's sigmoid output for a single piece of text.

        Args:
            text: Raw title; it is preprocessed the same way as the training data.

        Returns:
            The scalar score ``prediction[0][0]`` produced by the model.

        Raises:
            RuntimeError: If called before :meth:`train_model`.
        """
        if self.model is None or self.tokenizer is None:
            raise RuntimeError("The model has not been trained yet; call train_model() first.")
        padded_sequence = self._encode([self.preprocess_text(text)])
        prediction = self.model.predict(padded_sequence)
        return prediction[0][0]

In [None]:
# Example: train the default architecture on the titles of the ISOT dataset.
data_path = 'ISOT'

# Only the headline is used as model input. `.assign` returns a new frame, so
# the label column is added without writing into a column-subset of the raw
# frame (which triggers pandas' SettingWithCopyWarning).
fake_df = pd.read_csv(data_path + '/Fake.csv')[['title']].assign(label=0)  # 0 = fake
true_df = pd.read_csv(data_path + '/True.csv')[['title']].assign(label=1)  # 1 = true

combined_df = pd.concat([true_df, fake_df], ignore_index=True)
# Shuffle with a fixed seed. Despite the name, no resampling happens here:
# the class proportions are whatever the two CSV files contain.
balanced_data = combined_df.sample(frac=1, random_state=42).reset_index(drop=True)

embedding_path = 'glove.6B.100d.txt'
detector = FakeNewsDetector(embedding_path)
detector.train_model(balanced_data)

In [None]:
# Interactive check: classify titles typed by the user until they enter 'quit'.
while True:
    try:
        news_text = input("Enter a news title to check its authenticity (or 'quit' to exit): ").strip()
    except (EOFError, KeyboardInterrupt):
        # Kernel interrupt or closed stdin: leave the loop without a traceback.
        break
    if news_text.lower() == 'quit':
        break
    if not news_text:
        # An empty title would be scored as an all-padding sequence; ask again instead.
        continue
    prediction = detector.predict(news_text)
    # Scores at or above 0.5 map to label 1 (true news), below to label 0 (fake).
    if prediction >= 0.5:
        print("True news.")
    else:
        print("False news.")
    print("Prediction:", prediction)