<a href="https://colab.research.google.com/github/blanco-herrero/Interviews/blob/main/classifier_RNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Text classification for FakeDetector using RNN

In [4]:
#https://www.tensorflow.org/tutorials/load_data/text
import tensorflow as tf
import keras
import os

In [5]:
import matplotlib.pyplot as plt

def plot_graphs(history, metric):
  """Plot the training and validation curves of one metric over the epochs.

  Args:
    history: object returned by ``model.fit``; its ``history`` dict must hold
      both ``metric`` and ``'val_' + metric``.
    metric: name of the metric to plot, e.g. ``'loss'``.

  Returns:
    None. The figure is displayed with ``plt.show()``.
  """
  val_metric = 'val_' + metric
  curves = history.history
  plt.plot(curves[metric])
  plt.plot(curves[val_metric], '')
  plt.xlabel("Epochs")
  plt.ylabel(metric)
  plt.legend([metric, val_metric])
  plt.show()

In [None]:
# Upload the input files; later cells read true_tweets.txt and false_tweets.txt
# from the working directory.
# NOTE(review): google.colab is presumably only importable inside Colab — confirm before running elsewhere.
from google.colab import files
files.upload()

In [None]:
#PARAMETERS

# Input text files, read line by line. The position of a file in this list is
# used as its class label: 0 -> true_tweets.txt, 1 -> false_tweets.txt.
FILE_NAMES = ['true_tweets.txt', 'false_tweets.txt']
# Line / word / byte counts of each input file (sanity check of the upload)
!wc true_tweets.txt
!wc false_tweets.txt

In [19]:
#PARAMETERS

BUFFER_SIZE = 50000  # Shuffle buffer size passed to Dataset.shuffle (randomness)
BATCH_SIZE = 16  # NOTE(review): not referenced in the visible cells (batching uses a literal 32) - check if necessary
TAKE_SIZE = 500 # Number of examples held out as test data

In [20]:
def labeler(example, index):
  """Pair a dataset element with its class label.

  Args:
    example: one element of a text-line dataset.
    index: integer class id (the file's position in FILE_NAMES).

  Returns:
    Tuple ``(example, label)`` where ``label`` is ``index`` cast to tf.int64.
  """
  return example, tf.cast(index, tf.int64)


def _labeled_lines(file_name, label):
  """Return the lines of ``file_name`` as a dataset of (line, label) pairs."""
  lines_dataset = tf.data.TextLineDataset(file_name)
  # `label` is a parameter of this helper, so every dataset captures its own
  # value; a lambda closing over the loop variable would depend on `map`
  # evaluating it before the next loop iteration.
  return lines_dataset.map(lambda ex: labeler(ex, label))


# One labeled dataset per input file; the label is the file's index in FILE_NAMES.
labeled_data_sets = [
    _labeled_lines(file_name, i) for i, file_name in enumerate(FILE_NAMES)]

In [21]:
# Labels follow the file order in FILE_NAMES: 0 = true_tweets.txt, 1 = false_tweets.txt
# (the prediction cells call label 1 "Positive" / FALSE_NEWS and label 0 "Negative" / TRUE_NEWS)

# Chain the per-file datasets into one dataset.
all_labeled_data = labeled_data_sets[0]
for labeled_dataset in labeled_data_sets[1:]:
  all_labeled_data = all_labeled_data.concatenate(labeled_dataset)

# Mix the classes. NOTE(review): reshuffle_each_iteration=False presumably keeps
# the order identical on every pass, which the later take()/skip() split relies
# on to keep train and test disjoint - confirm.
all_labeled_data = all_labeled_data.shuffle(
    BUFFER_SIZE, reshuffle_each_iteration=False)


In [None]:
# Peek at the first 10 (text, label) pairs of the shuffled dataset.
for ex in all_labeled_data.take(10):
  print(ex)

In [23]:
all_labeled_data = all_labeled_data.prefetch(2)

# Everything after the first TAKE_SIZE examples is training data (shuffled again).
train_data = all_labeled_data.skip(TAKE_SIZE).shuffle(BUFFER_SIZE)
train_data = train_data.prefetch(2)

# The first TAKE_SIZE examples are held out as test data.
test_data = all_labeled_data.take(TAKE_SIZE)
test_data = test_data.prefetch(2)

# Count the examples by iterating each dataset once in full
# (train_size is used later to derive steps_per_epoch).
train_size = len(list(train_data))
test_size = len(list(test_data))

In [None]:
# For each dataset print its name, Python type, repr and number of elements
# (counting iterates the whole dataset once).
for dataset_name, dataset in (("all_labeled_data", all_labeled_data),
                              ("train_data", train_data),
                              ("test_data", test_data)):
    print(dataset_name + ":")
    print(type(dataset))
    print(dataset)
    print(len(list(dataset)))

In [None]:
#Examples of train data
# NOTE: X_batch / y_batch stay defined after this loop and are reused by the next cells.
for X_batch, y_batch in train_data.batch(25).take(1):
    for review, label in zip(X_batch.numpy(), y_batch.numpy()):
        # Each text is a bytes object; decode it for display (capped at 10000 characters).
        print("Tweet / Headline:", review.decode("utf-8")[:10000])
        print("Label:", label, "= FALSE_NEWS" if label == 1 else "= TRUE_NEWS")
        print()

In [None]:
# Inspect the type and contents of the batch left over from the previous cell.
print(type(X_batch))
print(X_batch)
print(type(y_batch))
print(y_batch)

In [27]:
#PARAMETERS (AT LEAST THE NUMBER OF CHARACTERS)

#Function to preprocess the train data
def preprocess(X_batch, y_batch):
    """Normalise and tokenise a batch of raw texts.

    Steps: keep the first 300 characters, lower-case, replace ``<br>`` tags,
    URLs and punctuation with a space, split on whitespace and pad every
    example to the length of the longest one in the batch.

    Args:
        X_batch: batch of UTF-8 encoded strings (tf.string).
        y_batch: labels of the batch; returned unchanged.

    Returns:
        Tuple ``(tokens, y_batch)`` where ``tokens`` is a dense tf.string
        tensor padded with ``b"<pad>"``.
    """
    # unit="UTF8_CHAR": count characters, not bytes, so a multi-byte character
    # (e.g. an accented letter) is never cut in half.
    X_batch = tf.strings.substr(X_batch, 0, 300, unit="UTF8_CHAR")
    # encoding="utf-8": also lower-case non-ASCII letters; the default only handles ASCII.
    X_batch = tf.strings.lower(X_batch, encoding="utf-8")
    X_batch = tf.strings.regex_replace(X_batch, rb"<br\s*/?>", b" ")  # Remove <br> tags
    X_batch = tf.strings.regex_replace(X_batch, rb"http\S+", b" ")  # Remove URLs
    # [^\P{P}] is a double negation: it matches exactly the punctuation characters.
    X_batch = tf.strings.regex_replace(X_batch, rb"[^\P{P}]+", b" ")  # Remove punctuation
    X_batch = tf.strings.split(X_batch)  # Split on whitespace (ragged result)
    return X_batch.to_tensor(default_value=b"<pad>"), y_batch

In [None]:
#Example of preprocessing, applied to the X_batch / y_batch left over from the example cell above
preprocess(X_batch, y_batch)

In [29]:
#Construct Vocabulary

from collections import Counter

# Count every token of the preprocessed training data.
# NOTE: preprocess pads with b"<pad>", so the padding token is counted as well.
# NOTE: this loop rebinds X_batch / y_batch to preprocessed batches.
vocabulary = Counter()
for X_batch, y_batch in train_data.batch(32).map(preprocess):
    for review in X_batch:
        vocabulary.update(list(review.numpy()))

In [None]:
# The 20 most frequent tokens with their counts.
# TODO: check whether to skip stop words, keep only nouns/verbs/adjectives via POS tagging, or unify word forms.
vocabulary.most_common(20)

In [None]:
# Number of distinct tokens counted in the training data
len(vocabulary)

In [32]:
#PARAMETER

# Truncate the vocabulary, keeping only the `vocab_size` most common tokens
# (ordered from most to least frequent)
vocab_size = 2000
truncated_vocabulary = [token for token, _ in vocabulary.most_common(vocab_size)]

In [None]:
# Map every kept token to its frequency rank (0 = most frequent token).
word_to_id = {word: index for index, word in enumerate(truncated_vocabulary)}
for word in b"el psoe tacha de nausebunda la".lower().split():
    # Use the default argument of dict.get rather than `or`: id 0 is falsy, so
    # `get(word) or vocab_size` would report the most frequent token as unknown.
    print(word_to_id.get(word, vocab_size))

In [34]:
#PARAMETER

# Build a lookup table token -> id: the i-th token of truncated_vocabulary gets id i.
words = tf.constant(truncated_vocabulary)
word_ids = tf.range(len(truncated_vocabulary), dtype=tf.int64)
vocab_init = tf.lookup.KeyValueTensorInitializer(words, word_ids)
# Number of out-of-vocabulary buckets; the Embedding layer is sized
# vocab_size + num_oov_buckets to make room for them.
num_oov_buckets = 100
table = tf.lookup.StaticVocabularyTable(vocab_init, num_oov_buckets)

In [None]:
# Example lookup on a lower-cased, whitespace-split sentence.
# NOTE: preprocess() is not applied here, so punctuation stays attached to the words.
table.lookup(tf.constant([u"This is a test... I love the movie but it's too long".lower().split()]))

In [36]:
def encode_words(X_batch, y_batch):
    """Replace every token of a batch by its id from the lookup ``table``.

    Args:
        X_batch: batch of tokens, as produced by ``preprocess``.
        y_batch: labels of the batch; returned unchanged.

    Returns:
        Tuple ``(token_ids, y_batch)``.
    """
    token_ids = table.lookup(X_batch)
    return token_ids, y_batch

In [37]:
# Training pipeline: repeat indefinitely -> batches of 32 -> preprocess -> token ids.
# The dataset never ends on its own, so fit() needs steps_per_epoch.
train_set = train_data.repeat().batch(32).map(preprocess)
train_set = train_set.map(encode_words).prefetch(1)

In [38]:
# Validation pipeline, built the same way as train_set. It also repeats
# indefinitely, so fit() needs validation_steps.
test_set = test_data.repeat().batch(32).map(preprocess)
test_set = test_set.map(encode_words).prefetch(1)

In [None]:
# Show one encoded training batch (token ids). Rebinds X_batch / y_batch.
for X_batch, y_batch in train_set.take(1):
    print(X_batch)


In [None]:
#PARAMETERS

embed_size = 6 #128 in the original example

# Embedding -> two stacked GRU layers -> one sigmoid unit for the binary label.
model = keras.models.Sequential([
    # One embedding row per vocabulary id plus one per out-of-vocabulary bucket.
    keras.layers.Embedding(vocab_size + num_oov_buckets, embed_size,
                           mask_zero=False, # Check: no masking, so padded positions are fed to the GRUs like any other token
                           input_shape=[None]),
    keras.layers.GRU(128, return_sequences=True),
    keras.layers.GRU(128),
    keras.layers.Dense(1, activation="sigmoid")
])
# The metrics get explicit names so the history keys ("val_auc", "val_precision",
# "val_recall") stay the same when this cell is re-run; auto-generated names can
# get a numeric suffix (e.g. "precision_1") on later instantiations.
model.compile(loss="binary_crossentropy", optimizer="adam",
              metrics=["accuracy",
                       tf.keras.metrics.AUC(name="auc"),
                       tf.keras.metrics.Precision(name="precision"),
                       tf.keras.metrics.Recall(name="recall")])
model.summary()

In [None]:
#PARAMETER
#Train the model
# steps_per_epoch: one pass over the training examples in batches of 32 (train_set repeats forever).
# validation_steps=30 batches of 32 is more than TAKE_SIZE examples, so with the
# repeating test_set some test examples are evaluated more than once per epoch.
history = model.fit(train_set, steps_per_epoch=train_size // 32, epochs=5, validation_data=test_set,
                    validation_steps=30)


In [None]:
# Report the validation metrics of the last epoch.
# The F1 Score is 2*((precision*recall)/(precision+recall)).
loss = history.history["val_loss"][-1]
print("Loss =", loss)
accuracy = history.history["val_accuracy"][-1]
print("Accuracy =", accuracy)
precision = history.history["val_precision"][-1]
print("Precision = ", precision)
recall = history.history["val_recall"][-1]
print("Recall =", recall)
# Precision and recall are both 0 when no positive example is predicted
# correctly; report F1 = 0 instead of dividing by zero.
if precision + recall > 0:
    f1 = 2*((precision*recall)/(precision+recall))
else:
    f1 = 0.0
print("F1 Score = ", f1)
auc = history.history["val_auc"][-1]
print("ROC-AUC Score =", auc)

In [None]:
# Plot the training vs. validation curve of every tracked metric.
# plot_graphs shows the figure itself and returns None, so its result is not
# printed (printing it only added a "None" line after each plot).
for metric_name in ('loss', 'accuracy', 'precision', 'recall', 'auc'):
    plot_graphs(history, metric_name)

In [None]:
# TODO: check how to save the trained model so it can be reused, e.g.:
#model.save("rnn_model.h5")

In [None]:
def pad_to_size(vec, size):
  """Right-pad ``vec`` with zeros until it holds at least ``size`` items.

  The list is extended in place and the same object is returned. A list that
  is already ``size`` items or longer is returned unchanged (never truncated).

  Args:
    vec: list to pad; modified in place.
    size: minimum length of the result.

  Returns:
    The same list object ``vec``.
  """
  missing = size - len(vec)
  # range() of a non-positive number is empty, so nothing is appended then
  vec.extend(0 for _ in range(missing))
  return vec

In [None]:
def encode_frase(sample_pred_text):
    """Encode one raw sentence as the list of token ids the model was trained on.

    The sentence goes through ``preprocess`` (the same truncation, lower-casing,
    URL / punctuation removal and whitespace split used for the training data)
    before the vocabulary lookup, so inference sees the same kind of tokens as
    training did.

    Args:
        sample_pred_text: the sentence to encode (str).

    Returns:
        List of int token ids, one per token of the sentence.
    """
    # preprocess works on batches: wrap the sentence in a batch of one; no label is needed.
    tokens, _ = preprocess(tf.constant([sample_pred_text]), None)
    token_ids = table.lookup(tokens)
    # Drop the batch dimension and convert to a plain Python list.
    return token_ids.numpy()[0].tolist()

In [None]:
def sample_predict(sample_pred_text, pad):
  """Run the model on a single sentence.

  Args:
    sample_pred_text: the sentence to score.
    pad: when truthy, the encoded sentence is right-padded with zeros to 64 ids.

  Returns:
    The array returned by ``model.predict`` for a batch holding only this sentence.
  """
  token_ids = encode_frase(sample_pred_text)
  # NOTE(review): padding uses id 0, which presumably is the id of b"<pad>" - verify against the vocabulary.
  token_ids = pad_to_size(token_ids, 64) if pad else token_ids
  # Add a leading batch dimension of 1
  model_input = tf.expand_dims(tf.cast(token_ids, tf.float32), 0)
  return model.predict(model_input, verbose=0)

In [None]:
#PARAMETERS

# Predict on two sample texts; `pad` selects whether the encoded text is padded (True) or not (False)

sample_pred_text = "Vladimir Putin destruye a la ideología de género en 5 minutos"
sample_pred_text2 = "Pablo Casado nuevo líder del PP"
predictions = sample_predict(sample_pred_text, pad=True)
predictions2 = sample_predict(sample_pred_text2, pad=True)
# Each prediction is the sigmoid output of the model; the file-writing cell
# below treats scores above 0.5 as "Positive".
print(predictions)
print(predictions2)

In [None]:
#PARAMETERS

#WRITE PREDICTIONS TO A FILE
# Score every line of my_file.txt (read from the notebook's working directory)
# and write "<text> \t <score> \t <Positive|Negative>" lines to predictions.txt.
# Both files are managed by `with`, so they are closed even if a prediction
# fails, and print() writes straight to the output file, leaving sys.stdout untouched.
with open("my_file.txt", "r", encoding="utf8", errors='ignore') as input_file, \
        open('predictions.txt', 'w', encoding="utf8") as output_file:
    for line in input_file:
        score = sample_predict(line, pad=True)
        print(line.rstrip('\n \n'), "\t", *score.flatten(), "\t",
              "Positive" if score>0.5 else "Negative", file=output_file)

In [None]:
# Line / word / byte counts of the generated predictions file
!wc predictions.txt