In [None]:
# @title
# Install the project's Python dependencies from the repository's requirements file.
# NOTE(review): the URL below carries a `?token=...` query parameter that looks like a GitHub
# access token. Credentials should not be committed in a notebook — confirm whether it is still
# valid, revoke it if so, and prefer a public URL or a secret supplied at runtime.
!pip install -r https://raw.githubusercontent.com/LizaAmatya/FakeNewsDetection/main/requirements.txt?token=GHSAT0AAAAAACJI6OXTVIRQEN524EL4R3GIZLDRYUA

In [None]:
# @title
import os
import pandas as pd
import tensorflow as tf
import tensorflow_text
from transformers import BertTokenizer, TFBertForSequenceClassification, DistilBertTokenizer, TFDistilBertForSequenceClassification
from tensorflow.keras.utils import to_categorical
import numpy as np

In [None]:
# Mount the user's Google Drive inside the Colab VM at /content/drive.
# NOTE(review): the cells visible here read and write under BASE_DIR (/content/sample_data),
# not under /content/drive — confirm the mount is still required.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from tensorflow.keras import mixed_precision
# Make 'mixed_float16' the global Keras dtype policy for every layer created after this point.
mixed_precision.set_global_policy('mixed_float16')

In [None]:
# Root folder for this run: the TSV splits are read from BASE_DIR/dataset and checkpoints /
# saved models are written below it. Hard-coded to a Colab path; the script-style alternative
# would be os.path.dirname(os.path.abspath(__file__)).
BASE_DIR = "/content/sample_data"

In [None]:
# NOTE(review): TF_AUTOTUNE does not appear to be a documented TensorFlow environment
# variable — confirm that setting it has any effect.
os.environ["TF_AUTOTUNE"] = "1"

# Keep the Keras default float type at float32. The 'mixed_float16' global policy already runs
# layer computations in half precision; forcing floatx to 'float16' as well pushes helpers that
# read backend.floatx() into half precision too, which the TensorFlow documentation warns
# causes numeric-stability problems during training. Setting it explicitly also resets a
# kernel in which the old float16 setting had already been applied.
tf.keras.backend.set_floatx('float32')

In [None]:
# # Optional (disabled): restrict TensorFlow to one intra-op and one inter-op thread.
# # Note: these calls pin thread counts; they do not enable autotuning.
# tf.config.threading.set_intra_op_parallelism_threads(1)
# tf.config.threading.set_inter_op_parallelism_threads(1)

In [None]:
# Absolute locations of the three TSV splits stored under BASE_DIR/dataset.
train_path, test_path, validation_path = (
    os.path.join(BASE_DIR, f'dataset/{split}.tsv')
    for split in ('train', 'test', 'validation')
)

In [None]:
# Column names applied to the header-less TSV files, in file order.
column_labels = [
    'row',
    'json_ids',
    'label',
    'statement',
    'subject',
    'speaker',
    'job_title',
    'state',
    'affiliation',
    'barely_true_counts',
    'false_counts',
    'half_true_counts',
    'mostly_true_counts',
    'lies_counts',
    'context',
    'justification',
]

In [None]:
# Load the three splits as DataFrames. The files are tab-separated and carry no header row,
# so the column names come from `column_labels`.
train, test, valid = (
    pd.read_csv(tsv_path, sep="\t", header=None, names=column_labels)
    for tsv_path in (train_path, test_path, validation_path)
)

In [None]:
# Replace missing values (NaN) in every column with the string 'None' (not with 0).
train = train.fillna('None')
test = test.fillna('None')
# The cleaned validation frame is bound to `val`; `valid` keeps referring to the raw frame.
val = valid.fillna('None')
# Quick look at the label columns of the training and validation splits.
print(train['label'])
print(val['label'])

In [None]:
# Build the label -> integer-id mapping from the labels of the full training split, numbering
# them in order of first appearance.
labels = train['label']
label_mapping = {label: idx for idx, label in enumerate(labels.unique())}
# Make sure 'pants-fire' has an id. setdefault() leaves an existing id untouched and otherwise
# assigns the next free one. Unconditionally forcing the id to 5 would collide with another
# label (and leave an id unused) whenever 'pants-fire' is not the sixth distinct label seen.
label_mapping.setdefault('pants-fire', len(label_mapping))

num_of_classes = len(label_mapping)

# Work on a small subset for quick experiments. .copy() detaches the subsets from the full
# frames so that adding columns to them later does not raise pandas' SettingWithCopyWarning.
train = train[:100].copy()
val = val[:50].copy()

{'false': 0, 'half-true': 1, 'mostly-true': 2, 'true': 3, 'barely-true': 4, 'pants-fire': 5}


In [None]:
# Add an integer class id per training row by looking each label string up in label_mapping.
# Series.map yields NaN for any label that is missing from the mapping.
train['label_encoded'] = train['label'].map(label_mapping)
print(train['label_encoded'])

In [None]:
# Same encoding for the validation rows, using the mapping derived from the training labels.
# Series.map yields NaN for any label that is missing from the mapping.
val['label_encoded'] = val['label'].map(label_mapping)
print(val['label_encoded'])

In [None]:
# train_one_hot_labels = to_categorical(train['label_encoded'], num_classes=num_of_classes)
# val_one_hot_labels = to_categorical(val['label_encoded'], num_classes=num_of_classes)

In [None]:
# Pretrained checkpoint shared by the tokenizer and the classifier.
# Alternatives kept for reference:
# model_name = 'experts_wiki_books'
# model_name = 'bert-base-uncased'
model_name = 'distilbert-base-uncased'
# tokenizer = BertTokenizer.from_pretrained(model_name, model_max_length=128)
# `model_max_length` is the tokenizer attribute that caps sequence length whenever
# truncation=True is requested without an explicit max_length. The previous `max_length=128`
# keyword is an encoding-time argument, not a tokenizer attribute, so it did not set this cap.
tokenizer = DistilBertTokenizer.from_pretrained(model_name, model_max_length=128)
# Sizes intended for the (currently commented-out) custom embedding layer.
vocab_size = 10000
embedding_dim = 32

In [None]:
# Create a custom embedding layer
# custom_embedding_layer = tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=embedding_dim)

In [None]:
# Raw (string) labels of the training subset.
train_labels = train['label'].tolist()

# First experiment: the statement text is the only model input.
# Tokenise every training statement, padding to the longest one and truncating at 128 tokens;
# the result holds TensorFlow tensors.
train_encoded_statement_data = tokenizer(
    train['statement'].tolist(),
    return_tensors='tf',
    max_length=128,
    truncation=True,
    padding=True,
)

print(train_encoded_statement_data)

In [None]:
# Tokenise the validation statements exactly like the training statements. max_length=128 is
# passed explicitly so both splits are truncated at the same length; without it, truncation
# falls back to the tokenizer's own model maximum.
val_encoded_statement_data = tokenizer(
    val['statement'].tolist(),
    padding=True,
    truncation=True,
    max_length=128,
    return_tensors='tf'
)

# Raw (string) labels of the validation subset.
val_labels = val['label'].tolist()

In [None]:
# TensorFlow dataset for training: each element is (features dict, label).
# The labels are the integer ids from train['label_encoded'] (0, 1, 2 .. 5), which is what
# SparseCategoricalCrossentropy expects; one-hot labels would be needed for
# CategoricalCrossentropy instead.
train_dataset = tf.data.Dataset.from_tensor_slices(
    (
        {
            feature: train_encoded_statement_data[feature]
            for feature in ('input_ids', 'attention_mask')
        },
        train['label_encoded'],
    )
)

In [None]:
# TensorFlow dataset for validation: each element is (features dict, label).
# The labels are the integer ids from val['label_encoded'] (0, 1, 2 .. 5), which is what
# SparseCategoricalCrossentropy expects; one-hot labels would be needed for
# CategoricalCrossentropy instead.
val_dataset = tf.data.Dataset.from_tensor_slices(
    (
        {
            feature: val_encoded_statement_data[feature]
            for feature in ('input_ids', 'attention_mask')
        },
        val['label_encoded'],
    )
)

In [None]:
# Limiting the dataset
# `limit` is the number of elements kept by take(). train_dataset is still unbatched at this
# point, so it counts individual examples, not batches. The same `limit` is applied to the
# test dataset in the evaluation cell.
limit = 10
limited_train_dataset = train_dataset.take(limit)

In [None]:
# model = TFBertForSequenceClassification.from_pretrained(model_name, num_labels=num_of_classes)
# Size the classification head for this task. Without num_labels the head defaults to two
# outputs, which would squeeze every example through a 2-value bottleneck before the
# six-class layers that are stacked on top of the logits.
model = TFDistilBertForSequenceClassification.from_pretrained(model_name, num_labels=num_of_classes)

In [None]:
# Training hyper-parameters: number of passes over the data and examples per batch.
num_epochs, batch_size = 1, 32

In [None]:
# Create a new model with the BERT base and the custom output layer.

# Symbolic inputs: token ids and attention mask, both int32 with a variable sequence length.
input_ids = tf.keras.layers.Input(shape=(None,), dtype=tf.int32, name='input_ids')
attention_mask = tf.keras.layers.Input(shape=(None,), dtype=tf.int32, name='attention_mask')

# Custom embedding layer (disabled).
# `custom_embedding_layer` only exists in commented-out code, so calling it here raised a
# NameError, and the bare text heading that used to sit above it was a SyntaxError. Re-enable
# both the layer definition and this call together if the experiment is resumed.
# custom_embeddings = custom_embedding_layer(input_ids)

In [None]:
# Adding a dense layer for the output
# One unit per class with a softmax activation, so its output is a probability per class.
dense_layer = tf.keras.layers.Dense(num_of_classes, activation='softmax', name='dense_output')
# Call the pretrained classifier on the symbolic inputs defined above.
bert_output = model([input_ids, attention_mask])
# NOTE: despite its name, `cls_token` holds the `.logits` of the sequence-classification
# output, not a [CLS] token embedding.
cls_token = bert_output.logits
dense_output = dense_layer(cls_token)
# dense_output_expanded = tf.keras.layers.Reshape((1, 6))(dense_output)

In [None]:
# combined_embeddings = tf.keras.layers.Concatenate(axis=-1)([dense_output_expanded, custom_embeddings])
# `dense_output` already carries softmax probabilities, so feeding it through another
# Dense+softmax layer would squash the distribution a second time. Expose the probabilities
# directly instead, cast to float32: under the 'mixed_float16' policy the model output would
# otherwise be float16, and the mixed-precision guide recommends float32 outputs so the loss
# is computed at full precision.
output = tf.keras.layers.Activation('linear', dtype='float32', name='predictions')(dense_output)

In [None]:
# Create the final model
# Wire it to the explicit Input tensors rather than `model.input`. That attribute only exists
# as a by-product of calling `model` on symbolic tensors, and Keras refuses to resolve it once
# the layer has been called more than once (e.g. after re-running the cell that builds the
# head), so relying on it makes the notebook fragile.
custom_model = tf.keras.Model(inputs=[input_ids, attention_mask], outputs=output)

In [None]:
# Configure training for the six-class problem.
custom_model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    # Targets are integer class ids, hence the sparse loss. BinaryCrossentropy would be the
    # choice for a two-class setup; for now the six dataset classes are predicted.
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    # or use ['accuracy']
    metrics=tf.keras.metrics.SparseCategoricalAccuracy(),
)

In [None]:
# Print the layer-by-layer summary of the assembled model.
custom_model.summary()

In [None]:
# Checkpointing: write the model to BASE_DIR/model_checkpoint whenever the validation loss
# reaches a new minimum, and report each save.
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=os.path.join(BASE_DIR, 'model_checkpoint'),  # where the checkpoint is written
    monitor='val_loss',   # quantity that decides what "best" means
    mode='min',           # lower is better for a loss
    save_best_only=True,  # keep only the best model seen so far
    verbose=1,            # print a message on every save
)

In [None]:
from tqdm import tqdm
from tensorflow.keras.callbacks import Callback

# Number of optimisation steps (batches) per epoch. len() of the unbatched dataset counts
# examples, so round the example count up to whole batches.
train_steps_per_epoch = -(-len(limited_train_dataset) // batch_size)

# Holds the progress bar of the epoch in progress so the three hooks below can share it.
_epoch_bar_state = {'bar': None}


def _open_epoch_bar(epoch, logs=None):
    """Start a fresh per-batch progress bar at the beginning of an epoch."""
    _epoch_bar_state['bar'] = tqdm(total=train_steps_per_epoch, position=0, desc="Epoch", unit="batch")


def _advance_epoch_bar(batch, logs=None):
    """Advance the current epoch's bar by one batch."""
    bar = _epoch_bar_state['bar']
    if bar is not None:
        bar.update(1)


def _close_epoch_bar(epoch, logs=None):
    """Close the epoch's bar and print a one-line summary of the epoch metrics."""
    bar = _epoch_bar_state['bar']
    if bar is not None:
        bar.close()
        _epoch_bar_state['bar'] = None
    # .get() keeps the summary printable when a metric (e.g. validation) is not reported.
    logs = logs or {}
    tqdm.write(f'Epoch {epoch + 1}/{num_epochs}, Loss: {logs.get("loss")}, Accuracy: {logs.get("sparse_categorical_accuracy")}, Val Loss: {logs.get("val_loss")}, Val Accuracy: {logs.get("val_sparse_categorical_accuracy")}')


# Per-batch progress reporting. The hooks operate on a stored tqdm *instance*: the previous
# lambdas created a bar without keeping a reference to it and then called `tqdm.update(1)` on
# the class itself, which fails as soon as the callback is actually used.
tqdm_callback = tf.keras.callbacks.LambdaCallback(
    on_epoch_begin=_open_epoch_bar,
    on_epoch_end=_close_epoch_bar,
    on_batch_end=_advance_epoch_bar
)

class ProgressBarCallback(Callback):
    """Keras callback that drives a single tqdm bar advancing once per finished epoch.

    The bar's total is taken from ``self.params['epochs']`` when training starts, and the logs
    of the most recent epoch are shown as the bar's postfix.
    """

    def on_train_begin(self, logs=None):
        """Reset the epoch counter and open the progress bar."""
        self.epochs = 0
        self.pbar = tqdm(total=self.params['epochs'], unit='epoch', position=0)

    def on_epoch_end(self, epoch, logs=None):
        """Count the finished epoch, advance the bar and display the epoch's logs."""
        epoch_logs = logs if logs else {}
        self.epochs = self.epochs + 1
        self.pbar.update(1)
        self.pbar.set_postfix(epoch_logs, refresh=True)

    def on_train_end(self, logs=None):
        """Close the progress bar once training has finished."""
        self.pbar.close()

In [None]:
print('Start training')
progress_bar_callback = ProgressBarCallback()

# Train on the limited training set (shuffled, batched, prefetched) and validate on the
# batched validation set after every epoch. `history` keeps the per-epoch metrics.
history = custom_model.fit(
    (
        limited_train_dataset
        .shuffle(10)
        .batch(batch_size)
        .prefetch(tf.data.AUTOTUNE)
    ),
    validation_data=(
        val_dataset
        .batch(batch_size)
        .prefetch(tf.data.AUTOTUNE)
    ),
    epochs=num_epochs,
    callbacks=[checkpoint_callback, progress_bar_callback],
    verbose=2,
)

In [None]:
# Save the trained model if needed: written to BASE_DIR/trained_model.
custom_model.save(os.path.join(BASE_DIR, 'trained_model'))

In [None]:
from keras.utils import Progbar

# Tokenize and preprocess the test data, truncating at 128 tokens like the training statements.
test_encoded_statement_data = tokenizer(
    test['statement'].tolist(),
    padding=True,
    truncation=True,
    max_length=128,
    return_tensors='tf'
)
# Encode the TEST labels. Mapping train['label'] here (as was done before) paired test
# statements with training labels and left NaN for every test row beyond the training subset.
test['label_encoded'] = test['label'].map(label_mapping)

# Create TensorFlow dataset for testing: (features dict, integer label) per example.
test_dataset = tf.data.Dataset.from_tensor_slices((
    {
        'input_ids': test_encoded_statement_data['input_ids'],
        'attention_mask': test_encoded_statement_data['attention_mask']
    },
    test['label_encoded']
))
# Evaluate on the first `limit` examples only, mirroring the limited training set.
limited_test_dataset = test_dataset.take(limit)
test_batches = limited_test_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
# Number of evaluation steps (batches), not examples.
test_steps = len(test_batches)

# Evaluate the model on the test dataset
results = custom_model.evaluate(test_batches)

# Display the results. The progress bar tracks how many metrics have been reported;
# Progbar.update() takes the absolute position, so it receives the running index.
progbar = Progbar(len(results))
for position, (metric_name, result) in enumerate(zip(custom_model.metrics_names, results), start=1):
    print(f'{metric_name}: {result}')
    progbar.update(position)

# Print the evaluation results (including accuracy)
print("Test Loss:", results[0])
print("Test Accuracy:", results[1])