## Datasets

In [None]:
!pip install git+https://github.com/GoloMarcos/FKTC/

from FakeNewsTextCollections import datasets

datasets_dictionary = datasets.load()

In [None]:
# Pick the 'fcn' collection out of the dictionary returned by datasets.load().
fcn = datasets_dictionary['fcn']

In [None]:
from sklearn.model_selection import train_test_split

# Interest class (class == 1): fold 0 is used for training, every other fold for testing.
train = fcn[(fcn['class'] == 1) & (fcn['fold'] == 0)]
test = fcn[(fcn['class'] == 1) & (fcn['fold'] != 0)]
# All documents labelled -1 are treated as outliers, regardless of fold.
outlier = fcn[fcn['class'] == -1]

# One-Class Learning

In [None]:
from sklearn.metrics import classification_report

def evaluation_one_class(preds_interest, preds_outliers):
  """Return a text classification report for a one-class model.

  Args:
    preds_interest: predictions made for documents of the interest class;
      their expected label is 1.
    preds_outliers: predictions made for outlier documents; their expected
      label is -1.

  Returns:
    The string produced by sklearn's classification_report.
  """
  n_interest, n_outliers = len(preds_interest), len(preds_outliers)
  expected = [1] * n_interest + [-1] * n_outliers
  predicted = [*preds_interest, *preds_outliers]
  return classification_report(expected, predicted, output_dict=False)

# BERTs

In [None]:
!pip install sentence-transformers==1.0.4 #version used in the fake news collections

Collecting sentence-transformers==1.0.4
  Downloading sentence-transformers-1.0.4.tar.gz (74 kB)
[K     |████████████████████████████████| 74 kB 2.3 MB/s 
[?25hCollecting transformers<5.0.0,>=3.1.0
  Downloading transformers-4.12.5-py3-none-any.whl (3.1 MB)
[K     |████████████████████████████████| 3.1 MB 11.1 MB/s 
Collecting sentencepiece
  Downloading sentencepiece-0.1.96-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB)
[K     |████████████████████████████████| 1.2 MB 35.4 MB/s 
Collecting huggingface-hub<1.0,>=0.1.0
  Downloading huggingface_hub-0.1.2-py3-none-any.whl (59 kB)
[K     |████████████████████████████████| 59 kB 6.6 MB/s 
Collecting tokenizers<0.11,>=0.10.1
  Downloading tokenizers-0.10.3-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (3.3 MB)
[K     |████████████████████████████████| 3.3 MB 33.8 MB/s 
Collecting pyyaml>=5.1
  Downloading PyYAML-6.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64

In [None]:
from sentence_transformers import SentenceTransformer

In [None]:
def sentence_embedding(txts):
  """Embed texts with the 'distiluse-base-multilingual-cased' sentence model.

  Args:
    txts: iterable of strings.

  Returns:
    Whatever SentenceTransformer.encode returns for the cleaned sentences
    (one embedding per input text).
  """
  model = SentenceTransformer('distiluse-base-multilingual-cased')

  sentences = []

  for txt in txts:
    # str.replace returns a new string (strings are immutable), so the result
    # has to be assigned back; otherwise the clean-up silently does nothing.
    # NOTE(review): these patterns match a literal double backslash followed
    # by t/r/n, not real control characters -- confirm that this is how the
    # escapes appear in the raw texts.
    for escaped in ('\\\\t', '\\\\r', '\\\\n'):
      txt = txt.replace(escaped, ' ')
    sentences.append(txt)

  return model.encode(sentences)

# Density Information

In [None]:
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_samples

def make_density_information(cluster_list, df_train, df_test, df_outlier):
    """Build silhouette-based density features for the three data splits.

    For every k in cluster_list a KMeans model (random_state=0) is fitted on
    df_train only. Train rows are scored with their fitted labels; test and
    outlier rows are scored by stacking them under the train rows, labelling
    them with kmeans.predict, and keeping only the rows that belong to the
    extra split.

    Returns:
        Three arrays (train, test, outlier), each with one column per entry
        of cluster_list.
    """
    n_train = len(df_train)

    def _score_extra(model, extra):
        # Score train + extra together, then drop the leading train rows.
        stacked = np.concatenate([df_train, extra])
        stacked_labels = np.concatenate([model.labels_, model.predict(extra)])
        scores = silhouette_samples(stacked, stacked_labels).reshape(n_train + len(extra), 1)
        return scores[n_train:]

    train_columns, test_columns, outlier_columns = [], [], []

    for n_clusters in cluster_list:
        model = KMeans(n_clusters=n_clusters, random_state=0).fit(df_train)

        train_columns.append(silhouette_samples(df_train, model.labels_).reshape(n_train, 1))
        test_columns.append(_score_extra(model, df_test))
        outlier_columns.append(_score_extra(model, df_outlier))

    return (
        np.concatenate(train_columns, axis=1),
        np.concatenate(test_columns, axis=1),
        np.concatenate(outlier_columns, axis=1),
    )

In [None]:
def return_density_inf(df_train, df_new, clusters=None):
    """Compute silhouette-based density features for new examples.

    Each new example is scored on its own: it is appended to df_train,
    labelled with kmeans.predict, and its silhouette coefficient is taken
    from the combined set, so new examples never influence each other.

    Args:
        df_train: 2-D array of training vectors the KMeans models are fitted on.
        df_new: 2-D array of new vectors (any width; rows are reshaped to
            (1, -1) instead of a hard-coded 512).
        clusters: iterable of cluster counts. Defaults to the notebook-level
            ``cluster_list`` so existing two-argument calls keep working.

    Returns:
        Array of shape (len(df_new), number of cluster counts).
    """
    if clusters is None:
        # Backward-compatible default: the notebook-level configuration.
        clusters = cluster_list

    len_train = len(df_train)
    len_new = len(df_new)

    columns = []

    for cluster in clusters:
        kmeans = KMeans(n_clusters=cluster, random_state=0).fit(df_train)

        scores = []
        for example in df_new:
            example = example.reshape(1, -1)
            dfs = np.concatenate([df_train, example])
            labels = np.concatenate([kmeans.labels_, kmeans.predict(example)])

            # Only the last row belongs to the new example.
            scores.append(silhouette_samples(dfs, labels)[len_train:])

        # One column per cluster count; the explicit reshape also keeps an
        # empty df_new well-formed as a (0, 1) column.
        columns.append(np.array(scores).reshape(len_new, 1))

    return np.concatenate(columns, axis=1)

# LIWC

In [None]:
!gdown --id 1ybt-bi6H0gAHL0fQNaleDEtlCwmBeyPn

Downloading...
From: https://drive.google.com/uc?id=1ybt-bi6H0gAHL0fQNaleDEtlCwmBeyPn
To: /content/LiwcFeatures.zip
100% 8.57M/8.57M [00:00<00:00, 23.5MB/s]


In [None]:
!unzip LiwcFeatures.zip

In [None]:
from liwc.liwc import Liwc
liwc = Liwc('dictionaries/LIWC2007_Portugues_win.dic')
import pickle
import numpy as np
import pandas as pd
from sklearn import preprocessing
import collections

In [None]:
def return_LIWC(textual_documents):
  """Compute min-max normalized LIWC category counts for a batch of documents.

  Each document is converted with str(), split on single spaces and parsed
  with the module-level `liwc` object. The per-document results are collected
  into a DataFrame (missing categories filled with 0) and every column is
  rescaled with a MinMaxScaler fitted on this batch only.

  Side effects: overwrites the module-level globals `dict_types` and
  `data_features`.

  NOTE(review): the set and order of columns depend on which categories occur
  in `textual_documents`, and the scaling depends on this batch alone --
  confirm that the result lines up with the 'features_normalized' vectors the
  encoder was trained on.

  Returns:
    DataFrame with one row per document and one column per observed category.
  """
  
  global dict_types

  dict_types = {}

  for i in range(len(textual_documents)):
    txt = str(textual_documents[i]) 
    dict_liwc = liwc.parse(txt.split(' '))
    # No category matched: fall back to a single zero-valued 'cogmech' entry
    # so the document still yields a row in the DataFrame.
    if dict_liwc == collections.Counter():
      dict_liwc = {'cogmech' : 0.0}
    dict_types[i] = dict_liwc

  global data_features
  # Rows are documents, columns are categories; absent categories become 0.
  data_features = pd.DataFrame.from_dict(dict_types, orient='index').fillna(0)

  # Scale every column with a scaler fitted on this batch.
  x = data_features.values
  min_max_scaler = preprocessing.MinMaxScaler()
  x_scaled = min_max_scaler.fit_transform(x)
  data_normalized = pd.DataFrame(x_scaled, index=data_features.index, columns=data_features.columns)

  return data_normalized

# TripleVAE

In [None]:
import numpy as np
import pandas as pd
import tensorflow
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Dense, Input, concatenate, multiply, average, subtract, add, maximum, minimum
from tensorflow.keras.models import Model

In [None]:
class Sampling(layers.Layer):
    """Draws z from (z_mean, z_log_var) via z = mean + exp(log_var / 2) * eps."""

    def call(self, inputs):
        mean, log_var = inputs
        mean_shape = tf.shape(mean)
        # Gaussian noise shaped (batch, latent_dim), drawn with a fixed seed.
        noise = tf.keras.backend.random_normal(shape=(mean_shape[0], mean_shape[1]), seed=1)
        std = tf.exp(0.5 * log_var)
        return mean + std * noise

class TVAE(keras.Model):
    """Three-input variational autoencoder with a custom training step.

    The encoder maps (embedding, density, liwc) inputs to
    (z_mean, z_log_var, z); the decoder maps z back to the three inputs.
    The training loss is the sum of three weighted mean-squared
    reconstruction errors plus a KL term.
    """

    def __init__(self, encoder, decoder, factor_multiply_embedding, factor_multiply_density, factor_multiply_liwc,
                 **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        # Multipliers applied to the per-modality reconstruction losses.
        self.factor_multiply_embedding = factor_multiply_embedding
        self.factor_multiply_density = factor_multiply_density
        self.factor_multiply_liwc = factor_multiply_liwc

    @staticmethod
    def _weighted_mse(target, prediction, weight):
        # Mean squared error averaged over the batch, scaled by `weight`.
        return tf.reduce_mean(keras.losses.mean_squared_error(target, prediction)) * weight

    def train_step(self, data):
        # When a tuple is handed over (e.g. inputs and targets), only its
        # first element -- the inputs -- is used.
        inputs = data[0] if isinstance(data, tuple) else data
        embedding, density, liwc = inputs[0], inputs[1], inputs[2]

        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder((embedding, density, liwc))
            reconstruction = self.decoder(z)

            embedding_loss = self._weighted_mse(embedding, reconstruction[0], self.factor_multiply_embedding)
            density_loss = self._weighted_mse(density, reconstruction[1], self.factor_multiply_density)
            liwc_loss = self._weighted_mse(liwc, reconstruction[2], self.factor_multiply_liwc)

            # KL term, averaged over every element of the batch.
            kl_terms = 1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)
            kl_loss = tf.reduce_mean(kl_terms) * -0.5

            total_loss = embedding_loss + density_loss + liwc_loss + kl_loss

        gradients = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_weights))

        return {
            "total loss": total_loss,
            "embedding loss": embedding_loss,
            "density loss": density_loss,
            "liwc loss": liwc_loss,
            "kl loss": kl_loss
        }


def encoder_tvae(arq, embedding_dim, density_dim, liwc_dim, operator):
    """Build the three-input encoder of the TripleVAE.

    Each input is first projected by a linear Dense layer to a common width
    (the largest of the three input dimensions); the three projections are
    fused with `operator` and passed through linear hidden layers down to the
    latent space.

    Args:
        arq: sequence of layer sizes. All entries but the last are hidden
            layer widths; the last one is the latent dimension.
        embedding_dim, density_dim, liwc_dim: widths of the three inputs.
        operator: fusion operator, one of 'concatenate', 'multiply',
            'average', 'subtract', 'add', 'max', 'min'.

    Returns:
        A keras.Model mapping [embedding, density, liwc] to
        [z_mean, z_log_var, z].

    Raises:
        ValueError: if `operator` is not one of the supported names.
    """
    supported = ('concatenate', 'multiply', 'average', 'subtract', 'add', 'max', 'min')
    if operator not in supported:
        raise ValueError('Unknown fusion operator %r; expected one of %s' % (operator, ', '.join(supported)))

    embedding_inputs = keras.Input(shape=(embedding_dim,), name='first_input_encoder')
    density_inputs = keras.Input(shape=(density_dim,), name='second_input_encoder')
    liwc_inputs = keras.Input(shape=(liwc_dim,), name='third_input_encoder')

    # Project every modality to the same width so element-wise fusion is possible.
    projection_dim = np.max([embedding_dim, density_dim, liwc_dim])
    l1 = Dense(projection_dim, activation='linear')(embedding_inputs)
    l2 = Dense(projection_dim, activation='linear')(density_inputs)
    l3 = Dense(projection_dim, activation='linear')(liwc_inputs)
    branches = [l1, l2, l3]

    if operator == 'concatenate':
        fusion = concatenate(branches)
    elif operator == 'multiply':
        fusion = multiply(branches)
    elif operator == 'average':
        fusion = average(branches)
    elif operator == 'subtract':
        # The subtract merge takes exactly two tensors, hence the two steps.
        fusion = subtract([l1, l2])
        fusion = subtract([fusion, l3])
    elif operator == 'add':
        fusion = add(branches)
    elif operator == 'max':
        # All three branches take part; fusing only l1 and l2 would leave the
        # LIWC input disconnected from the latent space.
        fusion = maximum(branches)
    else:  # operator == 'min'
        fusion = minimum(branches)

    # Hidden layers (if any) followed by the latent heads. For len(arq) in
    # {1, 2, 3} this builds exactly the layers of the former explicit branches.
    hidden = fusion
    for units in arq[:-1]:
        hidden = Dense(units, activation="linear")(hidden)

    z_mean = layers.Dense(arq[-1], name="Z_mean")(hidden)
    z_log_var = layers.Dense(arq[-1], name="Z_log_var")(hidden)
    z = Sampling()([z_mean, z_log_var])

    encoder = keras.Model([embedding_inputs, density_inputs, liwc_inputs], [z_mean, z_log_var, z], name="encoder")

    return encoder


def decoder_tvae(arq, embedding_dim, density_dim, liwc_dim):
    """Build the TripleVAE decoder, the mirror image of the encoder.

    Args:
        arq: sequence of layer sizes. The last entry is the latent dimension
            (the decoder input); the remaining entries are hidden layer
            widths, applied in reverse order.
        embedding_dim, density_dim, liwc_dim: widths of the three
            reconstructed outputs.

    Returns:
        A keras.Model mapping the latent vector to
        [embedding_outputs, density_outputs, liwc_outputs].
    """
    latent_inputs = keras.Input(shape=(arq[-1],), name='input_decoder')

    # Walk the hidden widths backwards. For len(arq) in {1, 2, 3} this builds
    # exactly the layers of the former explicit branches; deeper
    # architectures are mirrored too instead of losing their hidden layers.
    hidden = latent_inputs
    for units in reversed(arq[:-1]):
        hidden = Dense(units, activation="linear")(hidden)

    # One linear reconstruction head per modality.
    embedding_outputs = Dense(embedding_dim, activation="linear")(hidden)

    density_outputs = Dense(density_dim, activation="linear")(hidden)

    liwc_outputs = Dense(liwc_dim, activation="linear")(hidden)

    decoder = keras.Model(latent_inputs, [embedding_outputs, density_outputs, liwc_outputs], name="decoder")

    return decoder


def triplevae(arq, embedding_dim, density_dim, liwc_dim, operator):
    """Assemble and compile a TripleVAE.

    The three input dimensions are also passed to TVAE as the multipliers of
    the corresponding reconstruction losses.

    Returns:
        Tuple (tvae, encoder, decoder); tvae is compiled with a default Adam
        optimizer.
    """
    dims = (embedding_dim, density_dim, liwc_dim)

    encoder = encoder_tvae(arq, *dims, operator)
    decoder = decoder_tvae(arq, *dims)

    tvae = TVAE(
        encoder,
        decoder,
        factor_multiply_embedding=embedding_dim,
        factor_multiply_density=density_dim,
        factor_multiply_liwc=liwc_dim,
    )
    tvae.compile(optimizer=keras.optimizers.Adam())

    return tvae, encoder, decoder

In [None]:
# Name of the dataset column that holds the sentence embeddings
# (presumably precomputed DistilBERT multilingual vectors -- confirm).
BERT = 'DistilBERT Multilingua'

# Stack the per-document embedding lists into 2-D arrays, one per split.
df_train = np.array(train[BERT].to_list())
df_test = np.array(test[BERT].to_list())
df_outlier = np.array(outlier[BERT].to_list())

# Same for the 'features_normalized' column, cast to float32
# (used below as the third, LIWC, input of the model).
df_train_fet = np.array(train['features_normalized'].to_list()).astype('float32')
df_test_fet = np.array(test['features_normalized'].to_list()).astype('float32')
df_outlier_fet = np.array(outlier['features_normalized'].to_list()).astype('float32')

# Experiment configuration.
cluster_list = [2, 4, 5]  # KMeans cluster counts; one density feature per entry
epoch = 10                # training epochs
arq = [256]               # layer sizes; a single entry means latent dimension only
operator = 'max'          # fusion operator handed to encoder_tvae

density_train, density_test, density_outlier = make_density_information(cluster_list, df_train, df_test, df_outlier)
# Seed TensorFlow before the model is built and trained.
tf.random.set_seed(1)

tvae, encoderTVAE, decoder = triplevae(arq, len(df_train[0]), len(cluster_list), len(df_train_fet[0]), operator)

# TVAE.train_step only uses the inputs; the targets passed here are ignored by it.
tvae.fit([df_train, density_train, df_train_fet], [df_train, density_train, df_train_fet], epochs=epoch, batch_size=32, verbose=0)

# The encoder returns [z_mean, z_log_var, z]; z_mean is kept as the representation.
x_train, _, _ = encoderTVAE.predict([df_train, density_train, df_train_fet])
x_test, _, _ = encoderTVAE.predict([df_test, density_test, df_test_fet])
x_outlier, _, _ = encoderTVAE.predict([df_outlier, density_outlier, df_outlier_fet])

# OCSVM

## Best parameters of the representation method

In [None]:
# One-Class SVM hyper-parameters, passed to OneClassSVM in the evaluation cell.
nu = 0.1
gamma = 'scale'
kernel= 'sigmoid'

## Evaluation

In [None]:
from sklearn.svm import OneClassSVM as OCSVM

ocsvm = OCSVM(kernel=kernel,nu=nu,gamma=gamma)

# Fit on the interest-class training representations only.
ocsvm.fit(x_train)

# y_pred_fake: predictions for the held-out interest-class documents (x_test).
# y_pred_true: predictions for the outlier documents (x_outlier).
# NOTE(review): the names suggest interest class = fake news -- confirm.
y_pred_fake = ocsvm.predict(x_test)
y_pred_true = ocsvm.predict(x_outlier)

In [None]:
print(evaluation_one_class(y_pred_fake,y_pred_true))

              precision    recall  f1-score   support

          -1       0.89      0.88      0.88      1020
           1       0.87      0.88      0.88       939

    accuracy                           0.88      1959
   macro avg       0.88      0.88      0.88      1959
weighted avg       0.88      0.88      0.88      1959



# Predict Proba Adaptation


In [None]:
import numpy as np
from sklearn.preprocessing import MinMaxScaler

In [None]:
import numpy as np

def representation(textual_documents):
  """Encode documents with the TripleVAE encoder, varying a single modality.

  Exactly one modality is recomputed from the given texts; the other two are
  frozen to the values of one reference document and repeated for every text.

  Reads these notebook-level globals:
    preproc: which modality is recomputed --
      'TripleVAE-BERT' (sentence embeddings),
      'TripleVAE-Density' (density information derived from the embeddings) or
      'TripleVAE-LIWC' (LIWC features).
    on_test, idx: the frozen modalities come from row `idx` of the test
      arrays when `on_test` is truthy, otherwise from the outlier arrays.
    encoderTVAE, df_train and the df_*/density_* arrays of both splits.

  Args:
    textual_documents: a list of texts, or a single text (wrapped in a list).

  Returns:
    The first output of encoderTVAE.predict (z_mean), one row per document.

  Raises:
    ValueError: if `preproc` is not one of the three supported modes.
  """
  if not isinstance(textual_documents, list):
    textual_documents = [textual_documents]

  modes = ('TripleVAE-BERT', 'TripleVAE-Density', 'TripleVAE-LIWC')
  if preproc not in modes:
    # Fail with a clear message instead of an UnboundLocalError at the end.
    raise ValueError('Unknown preproc %r; expected one of %s' % (preproc, ', '.join(modes)))

  n_docs = len(textual_documents)

  # Reference document whose modalities are held constant.
  if on_test:
    frozen_embedding, frozen_density, frozen_liwc = df_test[idx], density_test[idx], df_test_fet[idx]
  else:
    frozen_embedding, frozen_density, frozen_liwc = df_outlier[idx], density_outlier[idx], df_outlier_fet[idx]

  if preproc == 'TripleVAE-BERT':
    embeddings = sentence_embedding(textual_documents)  # vary the embeddings
  else:
    embeddings = np.array([frozen_embedding] * n_docs)  # freeze the embeddings

  if preproc == 'TripleVAE-Density':
    # Vary the density information, derived from the embeddings of the texts.
    density = np.array(return_density_inf(df_train, sentence_embedding(textual_documents)))
  else:
    density = np.array(n_docs * [frozen_density])  # freeze the density information

  if preproc == 'TripleVAE-LIWC':
    liwc_rep = np.array(return_LIWC(textual_documents))  # vary the LIWC features
  else:
    liwc_rep = np.array(n_docs * [frozen_liwc])  # freeze the LIWC features

  textual_documents_vec, _, _ = encoderTVAE.predict([embeddings, density, liwc_rep])

  return textual_documents_vec

In [None]:
def normalize_decision_function(minmax_less, minmax_geq, list_decision_function):
  """Rescale decision-function values with a sign-dependent scaler.

  Args:
    minmax_less: fitted scaler applied to values strictly below zero.
    minmax_geq: fitted scaler applied to every other value.
    list_decision_function: iterable of scalar decision-function values.

  Returns:
    List with one rescaled scalar per input value, in the same order.
  """
  def _rescale(score):
    scaler = minmax_less if score < 0 else minmax_geq
    # Scalers work on 2-D input: wrap the scalar, then unwrap the result.
    return scaler.transform([[score]])[0][0]

  return [_rescale(score) for score in list_decision_function]

In [None]:
def one_class_predict_proba(new_vecs):
  """Turn OCSVM decision values into two-column pseudo-probabilities.

  The decision values of the train, test and outlier representations and of
  `new_vecs` are pooled. Non-negative pooled values define a scaler onto
  [0.5, 1], negative ones a scaler onto [0, 0.5]; the values of `new_vecs`
  are then rescaled with the scaler matching their sign.

  Args:
    new_vecs: representations accepted by ocsvm.decision_function.

  Returns:
    Array with one row [p, 1 - p] per new vector, p being the rescaled value.
  """
  all_scores = [ocsvm.decision_function(vecs) for vecs in (x_train, x_test, x_outlier, new_vecs)]
  news_scores = all_scores[-1]
  pooled = np.concatenate(all_scores)

  non_negative = pooled[pooled >= 0].reshape(-1, 1)
  negative = pooled[pooled < 0].reshape(-1, 1)

  minmax_geq = MinMaxScaler(feature_range=(0.5,1)).fit(non_negative)
  minmax_less = MinMaxScaler(feature_range=(0,0.5)).fit(negative)

  rescaled = normalize_decision_function(minmax_less, minmax_geq, news_scores)

  return np.array([np.array([p, 1-p]) for p in rescaled])

In [None]:
def predict_proba(textual_documents):
  """Classifier function for LIME: texts in, two-column probabilities out.

  The texts are encoded with representation() and the resulting vectors are
  scored with one_class_predict_proba().
  """
  return one_class_predict_proba(representation(textual_documents))

# Multimodal LIME for One-Class Learning



In [None]:
!pip install lime

Collecting lime
  Downloading lime-0.2.0.1.tar.gz (275 kB)
[?25l[K     |█▏                              | 10 kB 25.2 MB/s eta 0:00:01[K     |██▍                             | 20 kB 26.0 MB/s eta 0:00:01[K     |███▋                            | 30 kB 12.8 MB/s eta 0:00:01[K     |████▊                           | 40 kB 9.9 MB/s eta 0:00:01[K     |██████                          | 51 kB 5.6 MB/s eta 0:00:01[K     |███████▏                        | 61 kB 5.7 MB/s eta 0:00:01[K     |████████▎                       | 71 kB 5.3 MB/s eta 0:00:01[K     |█████████▌                      | 81 kB 5.9 MB/s eta 0:00:01[K     |██████████▊                     | 92 kB 5.7 MB/s eta 0:00:01[K     |███████████▉                    | 102 kB 5.4 MB/s eta 0:00:01[K     |█████████████                   | 112 kB 5.4 MB/s eta 0:00:01[K     |██████████████▎                 | 122 kB 5.4 MB/s eta 0:00:01[K     |███████████████▌                | 133 kB 5.4 MB/s eta 0:00:01[K     |████████

In [None]:
import lime
import sklearn
import sklearn.ensemble
import sklearn.metrics
from __future__ import print_function
from lime import lime_text
from lime.lime_text import LimeTextExplainer

In [None]:
# Class names shown in the explanations. predict_proba returns rows [p, 1 - p],
# so 'Fake' labels the first column and 'Real' the second.
explainer = LimeTextExplainer(class_names=['Fake','Real'])

## Density


In [None]:
# Globals read by representation(): only the density information varies with
# the perturbed text; the other modalities stay fixed to document `idx`.
preproc = 'TripleVAE-Density'

# Position (iloc) of the explained document within the selected split.
idx = 796

# True: take the frozen modalities from the test split (False: outlier split).
on_test = True

# Explain the prediction for the chosen document using its 10 most relevant words.
exp = explainer.explain_instance(test['text'].iloc[idx], predict_proba, num_features=10)
print('Document id: %d' % idx)
print('Probability(fake/true) =', predict_proba([test['text'].iloc[idx]])[0])
print('True class: %s' % test['class'].iloc[idx])
exp.show_in_notebook(text=True)

In [None]:
exp.save_to_file('lime-tvae-density.html')

## DBERT

In [None]:
# Globals read by representation(): only the sentence embeddings vary with the
# perturbed text; the other modalities stay fixed to document `idx`.
preproc = 'TripleVAE-BERT'

# Position (iloc) of the explained document within the selected split.
idx = 796

# True: take the frozen modalities from the test split (False: outlier split).
on_test = True

# Explain the prediction for the chosen document using its 10 most relevant words.
exp = explainer.explain_instance(test['text'].iloc[idx], predict_proba, num_features=10)
print('Document id: %d' % idx)
print('Probability(fake/true) =', predict_proba([test['text'].iloc[idx]])[0])
print('True class: %s' % test['class'].iloc[idx])
exp.show_in_notebook(text=True)

In [None]:
exp.save_to_file('lime-tvae-dbertml.html')

## LIWC

In [None]:
# Globals read by representation(): only the LIWC features vary with the
# perturbed text; the other modalities stay fixed to document `idx`.
preproc = 'TripleVAE-LIWC'

# Position (iloc) of the explained document within the selected split.
idx = 796

# True: take the frozen modalities from the test split (False: outlier split).
on_test = True


# Explain the prediction for the chosen document using its 10 most relevant words.
exp = explainer.explain_instance(test['text'].iloc[idx], predict_proba, num_features=10)
print('Document id: %d' % idx)
print('Probability(fake/true) =', predict_proba([test['text'].iloc[idx]])[0])
print('True class: %s' % test['class'].iloc[idx])
exp.show_in_notebook(text=True)

In [None]:
exp.save_to_file('lime-tvae-liwc.html')