## Import packages

In [None]:
import random as python_random
import numpy as np
import pandas as pd
import os
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.python.keras.losses import BinaryCrossentropy
from transformers import TFAutoModelForSequenceClassification
from transformers import AutoTokenizer
from tensorflow.keras.callbacks import EarlyStopping, CSVLogger
import tensorflow as tf
from tensorflow.keras import backend as K
from tqdm.keras import TqdmCallback
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.utils import shuffle


## custom F1 score 

In [None]:
def f1_score(y_true, y_pred):
    """Return the F1 score of the Fake class.

    Both tensors are first binarised (clipped to [0, 1] and rounded) and
    then flipped (0 <-> 1), so that entries whose binarised value is 0
    become the positives being counted. Precision and recall are computed
    from those flipped indicators and combined into F1; ``K.epsilon()`` is
    added to every denominator to avoid division by zero.
    """

    def _fake_indicator(values):
        """Binarise ``values`` and swap 0 and 1."""
        binary = K.round(K.clip(values, 0, 1))
        return K.round(K.clip((binary - 1) * -1, 0, 1))

    def _count(indicator):
        """Sum an indicator tensor after clipping and rounding it."""
        return K.sum(K.round(K.clip(indicator, 0, 1)))

    fake_true = _fake_indicator(y_true)
    fake_pred = _fake_indicator(y_pred)

    # correctly predicted Fake entries
    true_positives = _count(fake_true * fake_pred)

    # precision: share of predicted-Fake entries that are truly Fake
    predicted_fakes = _count(fake_pred)
    precision = true_positives / (predicted_fakes + K.epsilon())

    # recall: share of truly-Fake entries that were predicted Fake
    actual_fakes = _count(fake_true)
    recall = true_positives / (actual_fakes + K.epsilon())

    harmonic = (precision * recall) / (precision + recall + K.epsilon())
    return 2 * harmonic

## custom weighted loss function

In [None]:
def weighted_loss_function(labels, logits, weight=0.33):
    """Return the mean weighted cross-entropy between labels and logits.

    ``weight`` is forwarded as ``pos_weight`` to
    ``tf.nn.weighted_cross_entropy_with_logits``; the element-wise losses
    it returns are averaged into a single scalar.
    """
    positive_weight = tf.constant(weight)
    elementwise_loss = tf.nn.weighted_cross_entropy_with_logits(
        labels=labels,
        logits=logits,
        pos_weight=positive_weight,
    )
    return tf.reduce_mean(elementwise_loss)


## load Data

In [None]:
def load_data(sample_size=100, random_state=50):
    """Load the authentic/fake CSVs and return train/dev/test texts and labels.

    The two CSV files are concatenated, shuffled, and reduced to the
    ``articleID``, ``content`` and ``label`` columns. The data is split
    80:20 into train:test, and the train part is split again 90:10 into
    train:dev.

    Args:
        sample_size: Maximum number of rows kept from each split. The
            default of 100 is a small debugging subset; pass ``None`` to
            use every row.
        random_state: Seed used for the shuffle and for both splits, so
            that repeated calls return the same partition.

    Returns:
        ``(X_train, Y_train, X_dev, Y_dev, X_test, Y_test)`` where each
        ``X_*`` is a list taken from the ``content`` column and each
        ``Y_*`` is the ``label`` column one-hot encoded with depth 2.
    """
    # load data
    df_true = pd.read_csv("./dataset/Authentic-48K.csv")
    df_fake = pd.read_csv("./dataset/Fake-1K.csv")
    df = pd.concat([df_fake, df_true])
    df = shuffle(df, random_state=random_state)
    df.reset_index(inplace=True, drop=True)

    df = df[["articleID", "content", "label"]]

    # split data into 80:20 (train:test) ratio; seeded so the partition
    # is reproducible across runs
    train, test = train_test_split(df, test_size=0.20, random_state=random_state)

    # split the train set into further 90:10 (train:dev) ratio
    train, dev = train_test_split(train, test_size=0.10, random_state=random_state)

    def _texts_and_labels(split):
        """Return (list of texts, one-hot labels) for the first rows of a split."""
        subset = split.iloc[:sample_size]
        # Series.tolist() already yields a flat Python list; the former
        # .ravel() call was redundant and is deprecated in recent pandas.
        texts = subset["content"].tolist()
        # convert Y into one hot encoding
        labels = tf.one_hot(subset["label"], depth=2)
        return texts, labels

    X_train, Y_train = _texts_and_labels(train)
    X_dev, Y_dev = _texts_and_labels(dev)
    X_test, Y_test = _texts_and_labels(test)

    return X_train, Y_train, X_dev, Y_dev, X_test, Y_test


## Classifier: train, test, and save the model

In [None]:
def classifier(X_train, X_dev, Y_train, Y_dev, X_test, Y_test, config, model_name):
    """Fine-tune a pre-trained sequence classifier, save it, and predict on the test set.

    Args:
        X_train, X_dev, X_test: Raw texts handed to the tokenizer.
        Y_train, Y_dev, Y_test: Labels for the matching texts. ``Y_test``
            is reduced with ``argmax`` over axis 1, so the labels are
            presumably one-hot with two columns (the model is built with
            ``num_labels=2``) -- confirm against the caller.
        config: Dict with the keys ``"model"``, ``"max_length"``,
            ``"learning_rate"``, ``"epochs"``, ``"patience"``,
            ``"batch_size"``, ``"seed"``, ``"loss"`` (``"custom"`` or
            ``"binary"``, case-insensitive) and ``"optimizer"``
            (``"adam"`` or ``"sgd"``, case-insensitive).
        model_name: Name used for the training-history CSV under ``log/``
            and for the saved model directory under ``Models/``.

    Returns:
        ``(Y_test, Y_pred)`` as arrays of class indices.

    Raises:
        ValueError: If ``config["loss"]`` or ``config["optimizer"]`` is
            not one of the supported names.
    """
    # Validate the string options first so a typo fails immediately with a
    # clear message instead of an UnboundLocalError after model loading.
    loss_name = config["loss"].upper()
    if loss_name not in ("CUSTOM", "BINARY"):
        raise ValueError(
            f"Unsupported loss {config['loss']!r}; expected 'custom' or 'binary'"
        )
    optimizer_name = config["optimizer"].upper()
    if optimizer_name not in ("ADAM", "SGD"):
        raise ValueError(
            f"Unsupported optimizer {config['optimizer']!r}; expected 'adam' or 'sgd'"
        )

    #set random seed to make results reproducible
    np.random.seed(config['seed'])
    tf.random.set_seed(config['seed'])
    python_random.seed(config['seed'])

    #set model parameters
    max_length = config['max_length']
    learning_rate = config["learning_rate"]
    epochs = config["epochs"]
    patience = config["patience"]
    batch_size = config["batch_size"]

    if loss_name == "CUSTOM":
        loss_function = weighted_loss_function
    else:
        loss_function = BinaryCrossentropy(from_logits=True)

    if optimizer_name == "ADAM":
        optim = Adam(learning_rate=learning_rate)
    else:
        optim = SGD(learning_rate=learning_rate)

    lm = config["model"]

    #set tokenizer according to pre-trained model
    tokenizer = AutoTokenizer.from_pretrained(lm)

    #get transformer text classification model based on pre-trained model
    model = TFAutoModelForSequenceClassification.from_pretrained(lm, num_labels=2)

    def _encode(texts):
        """Tokenize raw texts into the dict of numpy arrays the model consumes."""
        return tokenizer(texts,
                         padding=True,
                         max_length=max_length,
                         truncation=True,
                         return_tensors="np").data

    #transform raw texts into model input
    tokens_train = _encode(X_train)
    tokens_dev = _encode(X_dev)
    tokens_test = _encode(X_test)

    model.compile(loss=loss_function, optimizer=optim, metrics=['accuracy', f1_score])

    # Make sure the output locations exist even when the caller has not
    # created them; exist_ok keeps this a no-op otherwise.
    os.makedirs('log', exist_ok=True)
    os.makedirs('Models', exist_ok=True)

    #callbacks for early stopping and saving model history
    es = EarlyStopping(monitor="val_f1_score", patience=patience, restore_best_weights=True, mode='max')
    history_logger = CSVLogger('log/'+model_name+"-HISTORY.csv", separator=",", append=True)

    #train models
    model.fit(tokens_train,
              Y_train,
              verbose=0,
              epochs=epochs,
              batch_size=batch_size,
              validation_data=(tokens_dev, Y_dev),
              callbacks=[es, history_logger,
                         TqdmCallback(verbose=2)])

    #save models in directory
    model.save_pretrained(save_directory='Models/'+model_name)

    #return prediction on test data
    Y_pred = model.predict(tokens_test, batch_size=1)["logits"]

    # collapse logits / label rows to class indices
    Y_pred = np.argmax(Y_pred, axis=1)
    Y_test = np.argmax(Y_test, axis=1)

    return Y_test, Y_pred

## train & evaluate models

In [None]:
# Load the train/dev/test texts and labels.
X_train, Y_train, X_dev, Y_dev, X_test, Y_test = load_data()

# Hyper-parameters and model choice for this run.
config = {
    "model": "sagorsarker/bangla-bert-base",
    "max_length": 512,
    "learning_rate": 3e-4,
    "epochs": 1,
    "patience": 3,
    "batch_size": 1,
    "loss": "binary",
    "optimizer": "sgd",
    "seed": 1234,
}

# Create the output directories; exist_ok avoids the check-then-create race
# of testing os.path.exists() before os.mkdir().
for output_dir in ('log', 'Models'):
    os.makedirs(output_dir, exist_ok=True)

# classifier() takes both X arrays before the Y arrays, which differs from
# the order load_data() returns them in, so pass everything by keyword.
Y_test, Y_pred = classifier(
    X_train=X_train,
    X_dev=X_dev,
    Y_train=Y_train,
    Y_dev=Y_dev,
    X_test=X_test,
    Y_test=Y_test,
    config=config,
    model_name='bangla-bert-base',
)
print(classification_report(Y_test, Y_pred))