In [None]:
from helpers import (
    preprocessing
)
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, IntegerType, DateType, FloatType, ArrayType, LongType, MapType
import warnings
import numpy as np 
import tensorflow as tf
from tensorflow.keras import layers
warnings.filterwarnings('ignore')

In [None]:
# Twitter dataset already labelled with sentiments (raw Kaggle extract, semicolon-separated CSV)
path = (
    "/home/daholive/Documents/twitter_ellection_brazil_v2/datasource/raw_kaggle/TweetsWithTheme_v2.csv"
)

In [None]:
# Spark session: local mode, Delta Lake extension/catalog, fixed memory/cores,
# and Arrow enabled for Spark <-> pandas conversions.
_spark_conf = {
    "spark.jars.packages": "io.delta:delta-core_2.12:1.1.0",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.executor.memory": "4G",
    "spark.driver.memory": "4G",
    "spark.executor.cores": "12",
    "spark.sql.execution.arrow.pyspark.enabled": "true",
}

_builder = SparkSession.builder.master("local[*]")
for _conf_key, _conf_value in _spark_conf.items():  # applied in the order listed above
    _builder = _builder.config(_conf_key, _conf_value)
spark = _builder.getOrCreate()

In [None]:
# Twitter dataframe with pre-classified sentiments: ';'-delimited CSV with a header row
dataframe = (
    spark.read
    .options(delimiter=';', header='True')
    .csv(path)
)

In [None]:
# label adjust: sentiment "Negativo" -> 0, any other value -> 1 (binary target)
is_negative = F.col("sentiment") == "Negativo"
dataframe = dataframe.withColumn("sentiment_map", F.when(is_negative, 0).otherwise(1))

In [None]:
# dataframe features: cleaned text, its token count and the binary label
def _to_feature_row(row):
    """Clean one tweet and return the tuple (features, tokens_count, label).

    `preprocessing` is called once per row and its result is reused for the
    whitespace token count, instead of cleaning the same text twice.
    """
    cleaned = preprocessing(row.tweet_text)
    return (cleaned, len(cleaned.split()), row.sentiment_map)


rdd2 = dataframe.rdd.map(_to_feature_row)

schema = StructType([
    StructField('features', StringType(), True),
    StructField('tokens_count', IntegerType(), True),
    StructField('label', IntegerType(), True),
])

df_features_raw = spark.createDataFrame(rdd2, schema=schema)

# Drop tweets whose cleaned text is "-" (NOTE(review): presumably the placeholder
# `preprocessing` returns for text with nothing left — confirm in helpers), keep tweets
# with fewer than 30 tokens, and keep one row per distinct cleaned text.
# The token filter reads the precomputed tokens_count column (the same
# len(features.split()) value) instead of re-splitting every text in a Python UDF.
df_features = (
    df_features_raw
    .filter(F.col("features") != "-")
    .filter(F.col("tokens_count") < 30)
    .dropDuplicates(subset=['features'])
)

In [None]:
df_features.groupBy('label').count().show()  # rows per label after cleaning and de-duplication

In [None]:
# Stratified sample: keep every label-0 row and ~87% of label-1 rows (seeded).
# NOTE(review): presumably chosen to bring the two classes closer in size.
label_fractions = {0: 1, 1: 0.87}
train = df_features.sampleBy("label", fractions=label_fractions, seed=10)

In [None]:
train.groupBy('label').count().show()  # rows per label after sampling

In [None]:
# features and labels, collected together in ONE Spark job.
# Collecting the two columns with separate collect() calls gives no guarantee that both
# jobs return rows in the same order (`train` comes out of a dropDuplicates shuffle), so
# texts and labels could be silently misaligned; it would also recompute the pipeline twice.
# `features` is unique after dropDuplicates, so sorting by it gives a total, reproducible order.
train_rows = train.select('features', 'label').orderBy('features').collect()
features = [row['features'] for row in train_rows]
labels = np.array([row['label'] for row in train_rows])

In [None]:
# tensorflow - tokenization: learn a subword vocabulary from the training texts
import tensorflow_datasets as tfds
import random

target_vocab_size = 2 ** 16  # requested (approximate) vocabulary size
tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
    features,
    target_vocab_size=target_vocab_size,
)

In [None]:
# tensorflow - padding: encode every text to subword ids, then right-pad with 0
# up to the longest encoded text so every row has the same length (no truncation)
encoded_texts = [tokenizer.encode(sentence) for sentence in features]
max_len = max(len(ids) for ids in encoded_texts)

data_inputs = tf.keras.preprocessing.sequence.pad_sequences(
    encoded_texts,
    maxlen=max_len,
    padding='post',
    value=0,
)

In [None]:
# data split: 70% train / 30% test, stratified by label.
# A fixed random_state makes the split reproducible across kernel restarts. Without it,
# every re-run draws a different test set while the checkpoint cell further down
# restores weights trained on an earlier split, so the reported test metrics would
# include tweets the restored model had already been trained on.
from sklearn.model_selection import train_test_split

split_seed = 10  # same seed as the sampleBy call that builds `train`
train_inputs, test_inputs, train_labels, test_labels = train_test_split(
    data_inputs,
    labels,
    test_size=0.3,
    stratify=labels,
    random_state=split_seed,
)

In [None]:
# model build
class DCNN(tf.keras.Model):
    """Convolutional text classifier with three parallel n-gram branches.

    Token ids are embedded and passed through Conv1D branches of width 2, 3 and 4
    (ReLU, 'same' padding). Each branch is global-max-pooled, the three pooled
    vectors are concatenated, then fed through a ReLU dense layer, dropout and the
    output layer.

    Args:
        vocab_size: number of distinct token ids accepted by the embedding layer.
        emb_dim: embedding dimension.
        nb_filters: number of filters in each convolution branch.
        ffn_units: units of the hidden dense layer.
        nb_classes: 2 builds a single sigmoid output unit; any other value builds a
            softmax output layer with `nb_classes` units.
        dropout_rate: dropout rate applied after the hidden dense layer.
        training: not used by the model; accepted so existing calls keep working.
        name: Keras model name.
    """

    def __init__(self,
                 vocab_size,
                 emb_dim=128,
                 nb_filters=50,
                 ffn_units=512,
                 nb_classes=2,
                 dropout_rate=0.1,
                 training=True,
                 name="dcnn"):
        super(DCNN, self).__init__(name=name)

        # NOTE: attribute names are part of the tf.train.Checkpoint object graph used to
        # save/restore this model; renaming them would break restoring saved checkpoints.
        self.embedding = layers.Embedding(vocab_size, emb_dim)

        # one convolution per n-gram width
        self.bigram = layers.Conv1D(filters=nb_filters, kernel_size=2, padding='same', activation='relu')
        self.trigram = layers.Conv1D(filters=nb_filters, kernel_size=3, padding='same', activation='relu')
        self.fourgram = layers.Conv1D(filters=nb_filters, kernel_size=4, padding='same', activation='relu')

        # global max pooling has no weights, so one instance is shared by all branches
        self.pool = layers.GlobalMaxPool1D()

        self.dense_1 = layers.Dense(units=ffn_units, activation='relu')
        self.dropout = layers.Dropout(rate=dropout_rate)

        if nb_classes == 2:
            self.last_dense = layers.Dense(units=1, activation='sigmoid')
        else:
            self.last_dense = layers.Dense(units=nb_classes, activation='softmax')

    def call(self, inputs, training=None):
        """Forward pass.

        Args:
            inputs: batch of token-id sequences (NOTE(review): presumably an integer
                array of shape (batch, seq_len) — TODO confirm).
            training: True enables dropout. When omitted, Keras fills it in from the
                calling context (fit passes True; predict/evaluate pass False).

        Returns:
            (batch, 1) sigmoid probabilities when built with nb_classes == 2, otherwise
            (batch, nb_classes) softmax probabilities.
        """
        x = self.embedding(inputs)
        x_1 = self.pool(self.bigram(x))
        x_2 = self.pool(self.trigram(x))
        x_3 = self.pool(self.fourgram(x))

        merged = tf.concat([x_1, x_2, x_3], axis=-1)  # (batch_size, 3 * nb_filters)

        merged = self.dense_1(merged)
        merged = self.dropout(merged, training=training)
        return self.last_dense(merged)

In [None]:
# variables init: model and training hyper-parameters
vocab_size = tokenizer.vocab_size          # size of the learned subword vocabulary
nb_classes = len(set(train_labels))        # distinct labels present in the training split

emb_dim, nb_filters, ffn_units = 200, 100, 256   # embedding dim, filters per branch, hidden units
batch_size, nb_epochs = 64, 10
dropout_rate = 0.2

In [None]:
# tensorflow parameters for local GPU: a GPU is required; let every visible GPU
# allocate memory on demand instead of reserving all of it up front.
physical_devices = tf.config.list_physical_devices('GPU')
assert len(physical_devices) > 0, "Not enough GPU hardware devices available"
for gpu in physical_devices:
    try:
        tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as err:
        # Memory growth can only be set before the GPUs are initialized; this is raised
        # e.g. when the cell is re-run after the model has already been used.
        print(err)

In [None]:
# model instance built from the hyper-parameters defined above
dcnn_params = dict(
    vocab_size=vocab_size,
    emb_dim=emb_dim,
    nb_filters=nb_filters,
    ffn_units=ffn_units,
    nb_classes=nb_classes,
    dropout_rate=dropout_rate,
)
Dcnn = DCNN(**dcnn_params)

In [None]:
# model compile. DCNN builds a single sigmoid unit when nb_classes == 2, which pairs
# with binary cross-entropy; for any other class count it builds a softmax head, which
# needs categorical cross-entropy on integer labels (the labels here are integers).
loss_fn = 'binary_crossentropy' if nb_classes == 2 else 'sparse_categorical_crossentropy'
Dcnn.compile(
    loss=loss_fn,
    optimizer='adam',
    metrics=['accuracy'],
)

In [None]:
# model checkpoint: track the model, keep the 5 most recent checkpoints,
# and resume from the latest one when it exists
checkpoint_path = "/home/daholive/Documents/twitter_ellection_brazil_v2/model/checkpoints"
ckpt = tf.train.Checkpoint(Dcnn=Dcnn)  # object to save: the model
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

latest_ckpt = ckpt_manager.latest_checkpoint  # None when nothing has been saved yet
if latest_ckpt:
    ckpt.restore(latest_ckpt)
    print('Latest checkpoint restored')

In [None]:
# model fit: 10% of the training split is held out by Keras for validation;
# a checkpoint is written once training finishes
fit_kwargs = dict(
    batch_size=batch_size,
    epochs=nb_epochs,
    verbose=1,
    validation_split=0.10,
)
history = Dcnn.fit(train_inputs, train_labels, **fit_kwargs)
ckpt_manager.save()

In [None]:
# model evaluation on the held-out test split
results = Dcnn.evaluate(test_inputs, test_labels, batch_size=batch_size)

# predicted scores -> boolean class (True == label 1) using a 0.5 threshold
test_scores = Dcnn.predict(test_inputs)
y_pred_test = test_scores > 0.5

In [None]:
# confusion matrix of the true test labels vs the thresholded predictions
from sklearn.metrics import confusion_matrix
import seaborn as sns

cm = confusion_matrix(y_true=test_labels, y_pred=y_pred_test)
cm

In [None]:
# confusion matrix heatmap: rows are true labels, columns are predicted labels
# (sklearn's confusion_matrix convention). fmt='d' prints the raw integer counts;
# seaborn's default '.2g' format would show larger counts in scientific notation.
ax = sns.heatmap(cm, annot=True, fmt='d')
ax.set(xlabel='Predicted label', ylabel='True label', title='Confusion matrix (test set)');

In [None]:
# execution history: names of the metrics Keras recorded per epoch during fit
# (the charts below read 'loss', 'val_loss', 'accuracy' and 'val_accuracy')
history.history.keys()

In [None]:
# chart - training vs validation loss per epoch
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
for metric_name in ('loss', 'val_loss'):
    ax.plot(history.history[metric_name])
ax.set_title('Model loss progress during training and validation')
ax.set_xlabel('Epoch')
ax.set_ylabel('Losses')
ax.legend(['Training loss', 'Validation loss'])

In [None]:
# chart - training vs validation accuracy per epoch
fig, ax = plt.subplots()
for metric_name in ('accuracy', 'val_accuracy'):
    ax.plot(history.history[metric_name])
ax.set_title('Model accuracy progress during training and validation')
ax.set_xlabel('Epoch')
ax.set_ylabel('Accuracy')
ax.legend(['Training accuracy', 'Validation accuracy'])

In [None]:
# predict - 1: model score for a single sample sentence
# (elsewhere in this notebook, scores above 0.5 are treated as label 1)
text = tokenizer.encode('Lula é o melhor presidente')  # subword ids
single_batch = np.array([text])  # batch containing one sequence
Dcnn(single_batch, training=False).numpy()

In [None]:
# predict - 2: model score for a second sample sentence
# (elsewhere in this notebook, scores above 0.5 are treated as label 1)
text = tokenizer.encode('Bolsonaro não é de nada')  # subword ids
single_batch = np.array([text])  # batch containing one sequence
Dcnn(single_batch, training=False).numpy()

## TWITTER - DADOS RECENTES - APLICAÇÃO DE REDES NEURAIS CONVOLUCIONAIS

In [None]:
# Recent tweets from the trusted layer (NOTE(review): presumably written by an earlier
# preprocessing step — it must provide the text_clean column used below).
# Distinct names are used so the CSV `path` defined at the top of the notebook is not
# overwritten: re-running the CSV-loading cell after this one would otherwise read a directory.
import os

project_root = os.path.abspath('..')  # parent of the notebook's working directory
tweets_trusted_path = os.path.join(project_root, 'datasource', 'trusted', 'tweets_preprocessing')
df_twitter = spark.read.parquet(tweets_trusted_path)

In [None]:
# Bring the fields needed for scoring to the driver as one dict per tweet
# (the `query` column is exposed under the key 'candidato').
tweet_columns = df_twitter.select(
    'twitter_id',
    F.col('query').alias('candidato'),
    'text',
    'created_at_tz',
    'text_clean',
)
features_ = [row.asDict() for row in tweet_columns.collect()]

In [None]:
def _sentiment_score(text_clean):
    """Return the model's output score (first unit) for one cleaned tweet, as a float."""
    encoded = np.array([tokenizer.encode(text_clean)])  # batch of one, unpadded
    return float(Dcnn(encoded, training=False).numpy()[0][0])


# Score every recent tweet once and derive the 0/1 label from that same score
# (running the model a second time just for the label doubles inference cost).
dict_ = []
for arr in features_:
    score = _sentiment_score(arr['text_clean'])
    dict_.append({
        'twitter_id': arr['twitter_id'],
        'created_at_tz': arr['created_at_tz'],
        'candidato': arr['candidato'],
        'text_original': arr['text'],
        'text_clean': arr['text_clean'],
        'sentiment_tax': score,
        'sentiment': 1 if score > 0.5 else 0,
    })

In [None]:
# Spark DataFrame of the scored tweets; the schema is inferred from the dicts.
# NOTE(review): PySpark deprecates schema inference from dicts (the warning is hidden by
# warnings.filterwarnings('ignore') at the top) — consider passing an explicit schema.
df = spark.createDataFrame(dict_)

In [None]:
df.groupBy('sentiment').count().show()  # number of recent tweets per predicted label

In [None]:
# preview the first rows of the scored tweets
df.show()

In [None]:
# save data: overwrite the refined-layer output with the scored tweets.
# NOTE(review): no .format(...) is set, so Spark's default data source is used, while
# 'mergeSchema'/'overwriteSchema' are Delta Lake writer options (the session loads the
# Delta extension). If a Delta table is intended, add .format('delta') — confirm with the
# downstream readers first.
(df
 .write
 .option('mergeSchema', 'true')
 .option('overwriteSchema', 'true')
 .save("/home/daholive/Documents/twitter_ellection_brazil_v2/datasource/refined/tweets_redes_neurais_convolucionais", mode='overwrite')) 