# Essential Global imports

In [22]:
import time

# Loading Data

In [1]:
import pandas as pd
import string
import re
import nltk
from nltk.corpus import stopwords
from collections import Counter

from sklearn.feature_extraction.text import CountVectorizer

In [42]:
class DataFrames:
    """Hold the train / test / evaluation splits and clean their text.

    Every split is read from ``dataset/<name>.csv`` (``;``-separated). Files
    that do not yet contain a ``content_clean`` column get a ``content``
    column built from ``title`` and ``text``; files that already contain it
    are used as loaded.
    """

    # Possessive suffix written with a straight or a curly apostrophe.
    _POSSESSIVE_RE = re.compile(r"['’]s\b")
    # ASCII punctuation plus straight and curly double quotes.
    _PUNCTUATION_RE = re.compile(f'[{re.escape(string.punctuation)}"”“]')

    def __init__(
        self,
        train_file_name: str,
        test_file_name: str,
        evaluation_file_name: str,
    ) -> None:
        """Load the three splits.

        Args:
            train_file_name: CSV base name (no directory, no extension).
            test_file_name: CSV base name of the test split.
            evaluation_file_name: CSV base name of the evaluation split.
        """
        # Column names must exist before load_data() runs, it reads them.
        self.content_col_name: str = "content"
        self.content_clean_col_name: str = "content_clean"
        self.content_clean_col_name_no_UNK: str = "content_clean_no_UNK"
        self.content_clean_col_name_truc: str = "content_clean_truc"
        self.label_col_name: str = "label"
        self.train: pd.DataFrame = self.load_data(train_file_name)
        self.test: pd.DataFrame = self.load_data(test_file_name)
        self.evaluation: pd.DataFrame = self.load_data(evaluation_file_name)

    def load_data(self, file_name: str) -> pd.DataFrame:
        """Read ``dataset/<file_name>.csv`` and drop rows with missing values.

        When the file has no ``content_clean`` column it is treated as raw
        data: ``content`` is built as ``title + " " + text`` and the
        ``Unnamed: 0`` column (if present) is removed.
        """
        df = pd.read_csv(f"dataset/{file_name}.csv", sep=";")
        df.dropna(inplace=True)
        if self.content_clean_col_name not in df.columns:
            df[self.content_col_name] = df["title"] + " " + df["text"]
            # errors="ignore": a raw file saved without an index column
            # should not make loading fail.
            df = df.drop(columns=["Unnamed: 0"], errors="ignore")
        return df

    def get_datasets(self) -> list[pd.DataFrame]:
        """Return the splits in the order train, test, evaluation."""
        return [self.train, self.test, self.evaluation]

    def get_info(self) -> str:
        """Return a printable summary of the shape of every split."""
        test_info = self.test.shape
        train_info = self.train.shape
        evaluation_info = self.evaluation.shape
        return f"DataFrame Shapes:\n\tTrain: {train_info}\n\tTest: {test_info}\n\tEvaluation: {evaluation_info}\n"

    def save_clean(self, file_name: str, token_limit: int = 10000, num_words_trunc: int = 256) -> None:
        """Clean all splits, restrict their vocabulary and write them to disk.

        The cleaned splits are written to
        ``dataset/{train,test,evaluation}_clean_<file_name>.csv``.

        Args:
            file_name: Suffix appended to the output file names.
            token_limit: Number of most frequent words kept in the vocabulary.
            num_words_trunc: Maximum number of words kept per document in the
                truncated column.
        """
        # CLEAN
        self._init_clean_content([self.train, self.test, self.evaluation])
        self.train = self.clean_df(self.train)
        self.test = self.clean_df(self.test)
        self.evaluation = self.clean_df(self.evaluation)

        # NOTE(review): the vocabulary is counted on test + train, so test
        # data influences the features; confirm that this is intended.
        most_common_words = self.get_most_common_words_counter(
            [self.test, self.train],
            token_limit,
            self.content_clean_col_name,
        )
        self.train = self._apply_vocabulary(
            self.train, most_common_words, num_words_trunc
        )
        self.test = self._apply_vocabulary(
            self.test, most_common_words, num_words_trunc
        )
        self.evaluation = self._apply_vocabulary(
            self.evaluation, most_common_words, num_words_trunc
        )

        # SAVE
        cols_to_save_clean: list[str] = [
            self.content_clean_col_name,
            self.content_clean_col_name_no_UNK,
            self.content_clean_col_name_truc,
            self.label_col_name,
        ]
        splits = (
            ("train", self.train),
            ("test", self.test),
            ("evaluation", self.evaluation),
        )
        for split_name, df in splits:
            # Same "<split>_clean_<suffix>.csv" pattern for every split so
            # the files can be read back through load_data().
            df.to_csv(
                f"dataset/{split_name}_clean_{file_name}.csv",
                sep=";",
                columns=cols_to_save_clean,
            )

    def _apply_vocabulary(
        self,
        df: pd.DataFrame,
        most_common_words: Counter,
        num_words_trunc: int,
    ) -> pd.DataFrame:
        """Mark rare words as <UNK>, then derive the no-UNK and truncated columns."""
        df = DataFrames.set_least_common_UNK(
            df, self.content_clean_col_name, most_common_words
        )
        df = DataFrames.drop_least_common(
            df,
            self.content_clean_col_name,
            self.content_clean_col_name_no_UNK,
            most_common_words,
        )
        return DataFrames.trunc_text(
            df,
            self.content_clean_col_name_no_UNK,
            self.content_clean_col_name_truc,
            num_words_trunc,
        )

    def num_unique_words(self, col_name: str) -> int:
        """Count distinct lower-cased, whitespace-separated words in ``col_name`` over all splits."""
        result: set = set()
        df = pd.concat([self.train, self.test, self.evaluation], ignore_index=True)
        for tokens in df[col_name].str.lower().str.split():
            result.update(tokens)
        return len(result)

    def get_vocab(self, col_name) -> dict[str, int]:
        """Return the CountVectorizer vocabulary of ``col_name`` over all splits."""
        vectorizer = CountVectorizer()
        # Fit once on the combined corpus: every fit call starts from scratch,
        # so fitting split by split would keep only the last split's words.
        corpus = pd.concat(
            [df[col_name] for df in self.get_datasets()], ignore_index=True
        )
        vectorizer.fit(corpus.values)
        return vectorizer.vocabulary_

    def _init_clean_content(self, dfs: list[pd.DataFrame]) -> list[pd.DataFrame]:
        """Seed the clean-content column of every frame with its raw content."""
        for df in dfs:
            df[self.content_clean_col_name] = df[self.content_col_name]
        return dfs

    def clean_df(self, df: pd.DataFrame) -> pd.DataFrame:
        """Lower-case, strip punctuation and remove stop words in the clean column."""
        df = self.to_lower(df, self.content_clean_col_name)
        df = self.remove_punctuation(df, self.content_clean_col_name)
        df = self.remove_stopword(df, self.content_clean_col_name)
        return df

    @staticmethod
    def to_lower(df: pd.DataFrame, col_name: str) -> pd.DataFrame:
        """Lower-case ``col_name`` in place and return ``df``."""
        df[col_name] = df[col_name].apply(lambda x: str(x).lower())
        return df

    @staticmethod
    def remove_punctuation(df: pd.DataFrame, col_name: str) -> pd.DataFrame:
        """Lower-case ``col_name``, drop possessive 's and blank out punctuation.

        The column is modified in place and ``df`` is returned.
        """
        possessive = DataFrames._POSSESSIVE_RE
        punctuation = DataFrames._PUNCTUATION_RE

        def _clean(value: object) -> str:
            text = str(value).lower()
            # The possessive must go first: once the apostrophe has been
            # replaced by a space there is no "'s" left to match.
            text = possessive.sub("", text)
            return punctuation.sub(" ", text)

        df[col_name] = df[col_name].apply(_clean)
        return df

    @staticmethod
    def remove_stopword(df: pd.DataFrame, col_name: str) -> pd.DataFrame:
        """Remove NLTK English stop words from ``col_name`` in place."""
        stop_words = set(stopwords.words("english"))
        df[col_name] = df[col_name].apply(
            lambda x: " ".join(
                word for word in str(x).split() if word not in stop_words
            )
        )
        return df

    @staticmethod
    def get_most_common_words_counter(
        dfs: list[pd.DataFrame],
        token_limit: int,
        col_name: str,
    ) -> Counter:
        """Count whitespace-separated words of ``col_name`` over ``dfs``.

        Returns:
            A Counter restricted to the ``token_limit`` most frequent words.

        Raises:
            ValueError: If any frame lacks ``col_name``.
        """
        word_counter: Counter = Counter()
        for df in dfs:
            if col_name not in df.columns:
                raise ValueError(f"Each DataFrame must have a '{col_name}' column")
            for text in df[col_name].astype(str):
                word_counter.update(text.split())
        return Counter(dict(word_counter.most_common(token_limit)))

    @staticmethod
    def set_least_common_UNK(
        df: pd.DataFrame,
        col_name: str,
        most_common_words: Counter,
    ) -> pd.DataFrame:
        """Replace every word of ``col_name`` not in ``most_common_words`` by ``<UNK>``."""
        df[col_name] = df[col_name].apply(
            lambda x: " ".join(
                [
                    word if word in most_common_words else "<UNK>"
                    for word in str(x).split()
                ]
            )
        )
        return df

    @staticmethod
    def drop_least_common(
        df: pd.DataFrame,
        col_name: str,
        col_name_no_unk: str,
        most_common_words: Counter,
    ) -> pd.DataFrame:
        """Write to ``col_name_no_unk`` the words of ``col_name`` found in ``most_common_words``."""
        df[col_name_no_unk] = df[col_name].apply(
            lambda x: " ".join(
                [word for word in str(x).split() if word in most_common_words]
            )
        )
        return df

    @staticmethod
    def trunc_text(
        df: pd.DataFrame,
        col_name: str,
        col_name_trunc: str,
        trunc_num: int,
    ) -> pd.DataFrame:
        """Write to ``col_name_trunc`` the first ``trunc_num`` words of ``col_name``."""
        df[col_name_trunc] = df[col_name].apply(
            lambda x: " ".join(str(x).split()[:trunc_num])
        )
        return df

    @staticmethod
    def label_to_str(label: int) -> str:
        """Map label ``1`` to ``"Fake"`` and anything else to ``"Not Fake"``."""
        return "Fake" if label == 1 else "Not Fake"

In [102]:
# Raw splits, read from dataset/train.csv, dataset/test.csv and
# dataset/evaluation.csv.
data_frames = DataFrames("train", "test", "evaluation")
# Clean the splits with a 1000-word vocabulary and write them to disk.
data_frames.save_clean("1000tok", token_limit=1000)

# NOTE(review): save_clean("1000tok") writes its train split to
# dataset/train_clean_1000tok.csv, whereas this line loads
# dataset/train_clean.csv (and test_clean.csv / evaluation_clean.csv).
# Confirm that these files exist and hold the intended 1000-token data.
data_frames_1000tok = DataFrames("train_clean", "test_clean", "evaluation_clean")

# Logistic Regression Classifier

In [25]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score

In [39]:
class LogisticRegressionClassifier:
    """TF-IDF features followed by logistic regression, wrapped in a Pipeline."""

    def __init__(
        self,
        num_features_tfidf: int,
        ngram_range: tuple[int, int],
        logistic_regression_max_iter: int,
    ) -> None:
        """Build the (unfitted) pipeline.

        Args:
            num_features_tfidf: ``max_features`` of the TF-IDF vectorizer.
            ngram_range: ``ngram_range`` of the TF-IDF vectorizer.
            logistic_regression_max_iter: ``max_iter`` of the classifier.
        """
        self.num_features_tfidf = num_features_tfidf
        self.ngram_range = ngram_range
        self.logistic_regression_max_iter = logistic_regression_max_iter
        self.tfidf = TfidfVectorizer(
            stop_words="english",
            max_features=num_features_tfidf,
            ngram_range=ngram_range,
        )
        self.logistic_regression = LogisticRegression(
            max_iter=logistic_regression_max_iter,
        )
        self.pipeline = Pipeline(
            [
                ("tfidf", self.tfidf),
                ("logistic-regression", self.logistic_regression),
            ]
        )
        # Placeholder: nothing in this class assigns a real value to it.
        self.y_prediction = ...

    def __str__(self) -> str:
        """Return a one-line summary of the hyper-parameters."""
        # The earlier format ended each part with an unmatched ")".
        tfidf_info: str = f"TFIDF: {self.num_features_tfidf}, {self.ngram_range}"
        log_reg_info: str = f"L-REG: {self.logistic_regression_max_iter}"
        return f"{tfidf_info} {log_reg_info}"

    def print_info(self) -> None:
        """Print the hyper-parameter summary."""
        print(self)

    def train(self, X_train, y_train) -> None:
        """Fit the vectorizer and the classifier on the training data."""
        self.pipeline.fit(X_train, y_train)

    def predict(self, X_test):
        """Return the pipeline's predictions for ``X_test``."""
        return self.pipeline.predict(X_test)

In [99]:
def run_lrc(
    data_frames: DataFrames,
    content_col_name: str,
    label_col_name: str,
    num_features_tfidf=2000,
    ngram_range=(1, 2),
    logistic_regression_max_iter=1000,
) -> list:
    """Train and score a logistic-regression classifier, timing the fit.

    Args:
        data_frames: Provider of the ``train`` and ``test`` frames.
        content_col_name: Text column used as model input.
        label_col_name: Column holding the target labels.
        num_features_tfidf: ``max_features`` of the TF-IDF vectorizer.
        ngram_range: ``ngram_range`` of the TF-IDF vectorizer.
        logistic_regression_max_iter: ``max_iter`` of the classifier.

    Returns:
        ``["LRC", "cpu", fit_seconds, test_accuracy, description,
        content_col_name]``.
    """
    lrc = LogisticRegressionClassifier(
        num_features_tfidf,
        ngram_range,
        logistic_regression_max_iter,
    )

    # perf_counter is monotonic, so the interval cannot be skewed by
    # system clock adjustments.
    fit_started = time.perf_counter()
    lrc.train(
        data_frames.train[content_col_name],
        data_frames.train[label_col_name],
    )
    mtime = time.perf_counter() - fit_started

    y_hat = lrc.predict(data_frames.test[content_col_name])
    # accuracy_score expects (y_true, y_pred).
    acc = accuracy_score(data_frames.test[label_col_name], y_hat)
    return ["LRC", "cpu", mtime, acc, str(lrc), content_col_name]

# C-Support Vector Classification Implementation

In [56]:
from sklearn.svm import SVC

In [57]:
class SVCClassifier:
    """TF-IDF features followed by a C-support vector classifier."""

    def __init__(
        self,
        num_features_tfidf: int,
        ngram_range: tuple[int, int],
        svc_kernel: str,
        svc_c: float,
        svc_probability: bool = True,
    ) -> None:
        """Build the (unfitted) pipeline.

        Args:
            num_features_tfidf: ``max_features`` of the TF-IDF vectorizer.
            ngram_range: ``ngram_range`` of the TF-IDF vectorizer.
            svc_kernel: ``kernel`` of the SVC.
            svc_c: ``C`` (regularisation) of the SVC.
            svc_probability: ``probability`` of the SVC. Defaults to True to
                keep existing behaviour. NOTE(review): nothing in this class
                calls ``predict_proba``, and per the scikit-learn docs the
                option slows down ``fit``; consider passing False when only
                ``predict`` is needed.
        """
        self.num_features_tfidf = num_features_tfidf
        self.ngram_range = ngram_range
        self.svc_kernel = svc_kernel
        self.svc_c = svc_c
        self.svc_probability = svc_probability
        self.tfidf = TfidfVectorizer(
            stop_words="english",
            max_features=num_features_tfidf,
            ngram_range=ngram_range,
        )
        self.svc = SVC(
            kernel=svc_kernel,
            C=svc_c,
            probability=svc_probability,
        )
        self.pipeline = Pipeline(
            [
                ("tfidf", self.tfidf),
                ("svc", self.svc),
            ]
        )

    def __str__(self) -> str:
        """Return a one-line summary of the hyper-parameters."""
        # The earlier format ended each part with an unmatched ")".
        tfidf_info: str = f"TFIDF: {self.num_features_tfidf}, {self.ngram_range}"
        svc_info: str = f"SVC: {self.svc_kernel}, C: {self.svc_c}"
        return f"{tfidf_info} {svc_info}"

    def print_info(self) -> None:
        """Print the hyper-parameter summary."""
        print(self)

    def train(self, X_train, y_train) -> None:
        """Fit the vectorizer and the classifier on the training data."""
        self.pipeline.fit(X_train, y_train)

    def predict(self, X_test):
        """Return the pipeline's predictions for ``X_test``."""
        return self.pipeline.predict(X_test)

In [100]:
def run_svc(
    data_frames: DataFrames,
    content_col_name: str,
    label_col_name: str,
    num_features_tfidf=1000,
    ngram_range=(1, 2),
) -> list:
    """Train and score a linear-kernel SVC (C=1.0), timing the fit.

    Args:
        data_frames: Provider of the ``train`` and ``test`` frames.
        content_col_name: Text column used as model input.
        label_col_name: Column holding the target labels.
        num_features_tfidf: ``max_features`` of the TF-IDF vectorizer.
        ngram_range: ``ngram_range`` of the TF-IDF vectorizer.

    Returns:
        ``["SVC", "cpu", fit_seconds, test_accuracy, description,
        content_col_name]``.
    """
    svc = SVCClassifier(
        num_features_tfidf=num_features_tfidf,
        ngram_range=ngram_range,
        svc_kernel="linear",
        svc_c=1.0,
    )

    # perf_counter is monotonic, so the interval cannot be skewed by
    # system clock adjustments.
    fit_started = time.perf_counter()
    svc.train(
        data_frames.train[content_col_name],
        data_frames.train[label_col_name],
    )
    mtime = time.perf_counter() - fit_started

    y_hat = svc.predict(data_frames.test[content_col_name])
    # accuracy_score expects (y_true, y_pred).
    acc = accuracy_score(data_frames.test[label_col_name], y_hat)
    return ["SVC", "cpu", mtime, acc, str(svc), content_col_name]

# Neural Network Implementation

In [63]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.feature_extraction.text import TfidfVectorizer

In [86]:
class NNClassifier(nn.Module):
    """Feed-forward binary classifier: input -> 128 -> 64 -> 1 with a sigmoid output."""

    def __init__(self, input_size: int, *args, **kwargs) -> None:
        """Build the network.

        Args:
            input_size: Number of input features per sample.
            *args: Forwarded to ``nn.Module.__init__``.
            **kwargs: Forwarded to ``nn.Module.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.input_size: int = input_size
        self.model = nn.Sequential(
            nn.Linear(input_size, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid(),
        )

    def forward(self, x) -> torch.Tensor:
        """Return the sigmoid output of the network for ``x``."""
        return self.model(x)

    def info(self) -> str:
        """Return one ``Layer <i>: <type> - <repr>`` line per layer."""
        return "\n".join(
            f"Layer {i}: {type(layer).__name__} - {layer}"
            for i, layer in enumerate(self.model)
        )

In [101]:
def run_nnc(
    data_frames: DataFrames,
    content_col_name: str,
    label_col_name: str,
    num_features_tfidf=2000,
    num_epochs=2,
    batch_size=2,
) -> list:
    """Train and score the neural-network classifier, timing the epochs.

    The TF-IDF matrix of the *train* frame is split 80/20
    (``random_state=42``); the model is trained on the larger part and the
    reported accuracy is measured on the held-out part.

    NOTE(review): unlike run_lrc / run_svc this does not evaluate on
    ``data_frames.test`` -- confirm that the accuracies are meant to be
    compared.

    Args:
        data_frames: Provider of the ``train`` frame.
        content_col_name: Text column used as model input.
        label_col_name: Column holding the target labels.
        num_features_tfidf: Upper bound on the number of TF-IDF features.
        num_epochs: Number of passes over the training part.
        batch_size: Mini-batch size of the training loader.

    Returns:
        ``["NNC", device, train_seconds, accuracy, description,
        content_col_name]``.
    """
    device: str = "cuda" if torch.cuda.is_available() else "cpu"

    vectorizer = TfidfVectorizer(max_features=num_features_tfidf, stop_words="english")
    X = torch.tensor(
        vectorizer.fit_transform(data_frames.train[content_col_name]).toarray(),
        dtype=torch.float32,
    )
    y = torch.tensor(
        data_frames.train[label_col_name].values, dtype=torch.float32
    ).unsqueeze(1)

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # The tensors are moved to the device once, here, so batches need no
    # further transfer inside the training loop.
    train_loader = DataLoader(
        TensorDataset(X_train.to(device), y_train.to(device)),
        batch_size=batch_size,
        shuffle=True,
    )

    # max_features is only an upper bound: size the input layer from the
    # matrix actually produced, otherwise a small vocabulary breaks the
    # first Linear layer.
    model = NNClassifier(input_size=X.shape[1]).to(device)
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    total_time = 0.0
    for _ in range(num_epochs):
        epoch_started = time.perf_counter()
        model.train()
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            loss = criterion(model(inputs), labels)
            loss.backward()
            optimizer.step()
        if device == "cuda":
            # Wait for queued GPU work so the measured time covers it.
            torch.cuda.synchronize()
        total_time += time.perf_counter() - epoch_started
    # Drop the training tensors before the evaluation data goes to the device.
    del train_loader, X_train, y_train

    model.eval()
    with torch.no_grad():
        val_outputs = model(X_test.to(device))
        val_preds = (val_outputs >= 0.5).float()
    acc = accuracy_score(y_test.cpu(), val_preds.cpu())

    desc = f"Layers: {model.info()} epochs: {num_epochs}"
    return ["NNC", device, total_time, acc, desc, content_col_name]

# TESTS

### Init testing vars

In [105]:
# Collects the result rows of all model runs; it is extended with the
# per-model lists and turned into the results DataFrame further down.
ROWS = []
# How many times each model / dataset combination is run.
NUM_OF_TESTS: int = 3

### LRC

In [106]:
# Logistic regression: NUM_OF_TESTS runs on the raw content, then
# NUM_OF_TESTS runs on the cleaned, truncated 1000-token content.
LRC_ROWS = []
for _ in range(NUM_OF_TESTS):
    LRC_ROWS.append(
        run_lrc(
            data_frames,
            data_frames.content_col_name,
            # Was data_frames_clean.label_col_name, a name that is never
            # defined; the label column belongs to the frames being used.
            data_frames.label_col_name,
        )
    )
for _ in range(NUM_OF_TESTS):
    LRC_ROWS.append(
        run_lrc(
            data_frames_1000tok,
            data_frames_1000tok.content_clean_col_name_truc,
            data_frames_1000tok.label_col_name,
        )
    )

### SVC

In [107]:
# SVC: NUM_OF_TESTS runs on the raw content, then NUM_OF_TESTS runs on the
# cleaned, truncated 1000-token content. Rows are appended one at a time so
# that finished runs are kept if the cell is interrupted.
SVC_ROWS = []
for _ in range(NUM_OF_TESTS):
    raw_row = run_svc(
        data_frames,
        data_frames.content_col_name,
        data_frames.label_col_name,
    )
    SVC_ROWS.append(raw_row)
for _ in range(NUM_OF_TESTS):
    clean_row = run_svc(
        data_frames_1000tok,
        data_frames_1000tok.content_clean_col_name_truc,
        data_frames_1000tok.label_col_name,
    )
    SVC_ROWS.append(clean_row)

KeyboardInterrupt: 

### NNC

In [112]:
# Neural network: NUM_OF_TESTS runs on the raw content with 1000 TF-IDF
# features, then NUM_OF_TESTS runs on the cleaned, truncated content with
# 200 features. The CUDA cache is emptied after every run.
NNC_ROWS = []
for _ in range(NUM_OF_TESTS):
    raw_row = run_nnc(
        data_frames,
        data_frames.content_col_name,
        data_frames.label_col_name,
        1000,
    )
    NNC_ROWS.append(raw_row)
    torch.cuda.empty_cache()
for _ in range(NUM_OF_TESTS):
    clean_row = run_nnc(
        data_frames_1000tok,
        data_frames_1000tok.content_clean_col_name_truc,
        data_frames_1000tok.label_col_name,
        200,
    )
    NNC_ROWS.append(clean_row)
    torch.cuda.empty_cache()

# Results analysis

In [113]:
# Gather the rows of all three models into the shared ROWS list.
# NOTE(review): extend() appends, so running this cell again without
# resetting ROWS duplicates every row.
ROWS.extend(LRC_ROWS)
ROWS.extend(SVC_ROWS)
ROWS.extend(NNC_ROWS)

In [114]:
# Column order matches the lists returned by run_lrc / run_svc / run_nnc.
result_columns = [
    "Model",
    "device",
    "time (seconds)",
    "accuracy",
    "additional description",
    "data type",
]
result_df = pd.DataFrame(ROWS, columns=result_columns)
# Persist the table (with its index) as a ";"-separated file.
result_df.to_csv("results.csv", sep=";")

In [115]:
# Show the first rows of the results table.
result_df.head()

Unnamed: 0,Model,device,time (seconds),accuracy,additional description,data type
0,LRC,cpu,13.196943,0.971418,"TFIDF: 2000, (1, 2)) L-REG: 1000)",content
1,LRC,cpu,13.665311,0.971418,"TFIDF: 2000, (1, 2)) L-REG: 1000)",content
2,LRC,cpu,13.876855,0.971418,"TFIDF: 2000, (1, 2)) L-REG: 1000)",content
3,LRC,cpu,3.413579,0.970171,"TFIDF: 2000, (1, 2)) L-REG: 1000)",content_clean_truc
4,LRC,cpu,3.38513,0.970171,"TFIDF: 2000, (1, 2)) L-REG: 1000)",content_clean_truc
