In [None]:
#!pip install fugue
#!pip install pycaret[full]

In [None]:
import numpy as np 
import unidecode
import pandas as pd 
import re
import nltk 
import string
from nltk.corpus import stopwords
from nltk.stem.rslp import RSLPStemmer


import warnings
# Silence every warning for the rest of the session (process-wide side effect).
warnings.filterwarnings('ignore')

# RSLPStemmer() and stopwords.words() raise LookupError when their NLTK data
# packages are missing, so fetch them only when they are not already installed.
for _resource_path, _package_id in (
    ("stemmers/rslp", "rslp"),
    ("corpora/stopwords", "stopwords"),
):
    try:
        nltk.data.find(_resource_path)
    except LookupError:
        nltk.download(_package_id, quiet=True)

# NOTE: this object is a stemmer (RSLP), not a lemmatizer; the name is kept
# because clean_text() refers to it.
lemmatizer = RSLPStemmer()

In [None]:
import pyspark 
from pyspark.sql import SparkSession

# Local Spark session using all available cores, with the Delta Lake package,
# SQL extension and catalog configured.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.1.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.executor.memory", "4G")
    .config("spark.driver.memory", "4G")
    .config("spark.executor.cores", "12")
    .getOrCreate()
)

In [None]:
import os

# Location of the labelled tweets CSV. Set the TWEETS_CSV_PATH environment
# variable to run on another machine; the default is the original local path.
TWEETS_CSV_PATH = os.environ.get(
    "TWEETS_CSV_PATH",
    "/home/daholive/Documents/twitter_ellection_brazil/datasource/TweetsWithTheme_v2.csv",
)

# Semicolon-delimited file whose first row holds the column names.
dataframe = spark.read.options(
    delimiter=';',
    header='True').csv(TWEETS_CSV_PATH)

In [None]:
# Preview the first 10 rows of the loaded tweets.
dataframe.show(10)

In [None]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col

def translate(mapping):
    """Build a Spark UDF that looks each column value up in ``mapping``.

    Values that are not keys of ``mapping`` yield ``None`` (the ``dict.get``
    default). The UDF is declared with a ``StringType`` return type.
    """
    def translate_(value):
        return mapping.get(value)

    lookup_udf = udf(translate_, StringType())
    return lookup_udf

# Sentiment label -> class code.
# NOTE(review): the values are ints while the UDF returned by translate() is
# declared StringType — confirm the column ends up holding the form ("0"/"1")
# that the modelling cells expect.
mapping= {
    "Negativo": 0,
    "Positivo": 1  
}

# Add the encoded label as a new column, looked up from the "sentiment" column.
dataframe = dataframe.withColumn("sentiment_map", translate(mapping)("sentiment"))

dataframe.show(10)

In [None]:
# Cache for the Portuguese stop-word set, filled on first use so the NLTK
# corpus is read once instead of once per token.
_STOPWORDS_PT_CACHE = {}


def _portuguese_stopwords():
    """Return the NLTK Portuguese stop words in original and accent-stripped form."""
    if "words" not in _STOPWORDS_PT_CACHE:
        originals = stopwords.words("portuguese")
        _STOPWORDS_PT_CACHE["words"] = frozenset(originals) | frozenset(
            unidecode.unidecode(word) for word in originals
        )
    return _STOPWORDS_PT_CACHE["words"]


def clean_text(texto):
    """Normalise a raw tweet into a space-separated string of stemmed tokens.

    Steps, in order: lower-case; drop digits and the ``&gt;``/``&lt;``
    entities; drop URLs, @mentions and #hashtags; drop a leading ``rt``
    marker; replace ASCII punctuation, newlines and curly quotes with spaces;
    transliterate to ASCII with ``unidecode``; drop Portuguese stop words and
    non-alphanumeric tokens; collapse repeated letters and repeated
    ``ha``/``uha`` laughter; stem each token with the module-level
    ``lemmatizer``.

    Args:
        texto: The raw tweet. Non-string values are converted with ``str``;
            ``None`` (e.g. a null cell) is treated as an empty tweet.

    Returns:
        The cleaned text, or ``""`` when ``texto`` is ``None`` or nothing
        survives the filters.
    """
    if texto is None:
        return ""

    # Translation table: every ASCII punctuation character becomes a space.
    punct = string.punctuation
    trantab = str.maketrans(punct, len(punct) * ' ')

    texto = str(texto).lower()
    texto = re.sub(r'\d+', '', texto).replace("&gt;", " ").replace("&lt;", " ")
    texto = re.sub(r"https?:\/\/\S+", "", texto)
    texto = re.sub(r"@[A-Za-z0-9\w]+", "", texto)
    texto = re.sub(r"#[A-Za-z0-9\w]+", "", texto)
    # The text is already lower-cased, so the retweet marker appears as "rt".
    texto = re.sub('^rt ', ' ', texto)
    texto = texto.translate(trantab).replace("\n", " ")
    texto = texto.replace("“", " ").replace("”", " ")
    texto = unidecode.unidecode(texto)

    # The text is accent-free at this point, so it is compared against a
    # stop-word set that also contains the accent-stripped forms.
    stop_words = _portuguese_stopwords()

    tokens = []
    for word in texto.split():
        if word in stop_words or not word.isalnum():
            continue
        word = re.sub(r'([a-z])\1+', r'\1', word)  # "booom" -> "bom"
        word = re.sub(r'(ha)\1+', r'\1', word)     # "hahaha" -> "ha"
        word = re.sub(r'(uha)\1+', r'\1', word)    # "uhauha" -> "uha"
        tokens.append(lemmatizer.stem(word))

    return ' '.join(tokens).strip()

In [None]:
# Spark UDF wrapping clean_text; declared to return a string column.
udf_clean_txt = udf(lambda x: clean_text(x), StringType())

In [None]:
# Add the cleaned tweet text as a new column and preview the first 10 rows.
dataframe = dataframe.withColumn("tweet_text_clean", udf_clean_txt(col("tweet_text")))
dataframe.show(10)

In [None]:
# Bring the cleaned tweets to the driver as a plain Python list.
features = [row[0] for row in dataframe.select('tweet_text_clean').collect()]

In [None]:
# Bring the encoded sentiment labels to the driver as a plain Python list.
labels = [row[0] for row in dataframe.select('sentiment_map').collect()]

In [None]:
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer

In [None]:
# Unigram + bigram counts, vocabulary capped at 500 entries.
bigram_vectorizer = CountVectorizer(max_features=500, ngram_range=(1, 2))
X_train_bigram = bigram_vectorizer.fit_transform(features)

In [None]:
# Re-weight the n-gram counts with TF-IDF.
bigram_tf_idf_transformer = TfidfTransformer()
X_train_bigram_tf_idf = bigram_tf_idf_transformer.fit_transform(X_train_bigram)

In [None]:
# Dense TF-IDF matrix as a DataFrame, one column per n-gram.
# NOTE(review): get_feature_names() was removed in scikit-learn 1.2 in favour of
# get_feature_names_out(); confirm against the installed version.
count_vect_df = pd.DataFrame(
    data=X_train_bigram_tf_idf.toarray(),
    columns=bigram_vectorizer.get_feature_names(),
)
count_vect_df

In [None]:
# Modelling frame: the label column "sentiment" followed by the TF-IDF features.
df_setup = pd.concat(
    objs=[pd.Series(data=labels, name="sentiment"), count_vect_df],
    axis="columns",
)
df_setup

In [None]:
from pycaret.classification import *

In [None]:
# Initialise the PyCaret classification experiment on df_setup:
# target column 'sentiment', 75% of rows for training, 5 folds, fixed
# session_id, CPU only, all cores, with class-imbalance fixing and outlier
# removal switched on.
clf1 = setup(
    data = df_setup, 
    target = 'sentiment', 
    session_id = 5, 
    fold=5,
    train_size = 0.75,
    verbose=True,
    silent = True, 
    use_gpu = False,
    n_jobs=-1,
    fix_imbalance = True,
    remove_outliers = True,
    handle_unknown_categorical = True, 
    unknown_categorical_method = 'most_frequent')


In [None]:
# Compare the six listed estimators and keep the one with the highest F1.
best = compare_models(include=['gbc','lda','ridge','ada','lr','lightgbm'],sort = "F1")

In [None]:
# import parallel back-end
from pycaret.parallel import FugueBackend

In [None]:
# Compare models through the Fugue backend running on the Spark session.
# If lightgbm is missing it can be installed with:
#   conda install -c conda-forge lightgbm
best = compare_models(include=['lr'],parallel = FugueBackend(spark))

In [None]:
from pycaret.classification import *
import fugue_spark
from fugue import transform

In [None]:
# Output schema passed to Fugue's transform() for the frame returned by wrapper().
# NOTE(review): `sentiment:str` is listed here, but wrapper() returns the
# compare_models results table and never adds a `sentiment` column — confirm
# whether this field should be removed or populated.
schema = """Model:str, Accuracy:float, AUC:float, Recall:float, Prec:float, 
F1:float, Kappa:float, MCC:float, TT_Sec:float, sentiment:str"""

def wrapper(df: pd.DataFrame) -> pd.DataFrame:
    """Run a PyCaret model comparison on ``df`` and return its results table.

    Args:
        df: Frame containing a ``sentiment`` target column plus the features.

    Returns:
        The table produced by ``pull()`` after ``compare_models``, with the
        index reset and the ``"TT (Sec)"`` / ``"Prec."`` columns renamed to
        ``TT_Sec`` / ``Prec``.
    """

    # Same experiment settings as the notebook-level setup() call.
    clf = setup(
        data = df, 
        target = 'sentiment', 
        session_id = 5, 
        train_size = 0.75,
        fold=5,
        use_gpu = False,
        verbose=True,
        silent = True, 
        n_jobs=-1,
        fix_imbalance = True,
        remove_outliers = True,
        handle_unknown_categorical = True, 
        unknown_categorical_method = 'most_frequent')

    # The return value is not used; the score grid is fetched with pull() below.
    models = compare_models(include=['ridge','ada','lr','lightgbm'],sort = "F1")

    results = pull().reset_index(drop=True)

    # Fugue can't have spaces or . in column names
    results = results.rename(columns={"TT (Sec)": "TT_Sec","Prec.": "Prec"})

    return results


# Apply wrapper() through Fugue on the Spark engine, replacing NaN with None first.
# NOTE(review): `dataframe` is the Spark frame of raw tweets, not the TF-IDF
# frame `df_setup` used for the other experiments — confirm the intended input.
res = transform(dataframe.replace({np.nan: None}), wrapper, schema=schema, engine="spark")


In [None]:
# Display the comparison table returned by the Fugue run.
res.show()

In [None]:
# Compare every estimator available in the experiment, ranked by F1.
best = compare_models(sort='F1')  

# Scratch notes from an earlier run — presumably fold=5 with
# include=['gbc','lda','ridge','ada','lr','lightgbm'] scored about 0.7637;
# TODO confirm which metric that number refers to.

In [None]:
# Output recorded from an earlier run, kept as a comment so that the cell's
# last expression is `best` and the live estimator is what gets displayed:
#   RidgeClassifier(alpha=1.0, class_weight=None, copy_X=True, fit_intercept=True,
#                   max_iter=None, normalize=False, random_state=5, solver='auto',
#                   tol=0.001)
best

In [None]:
# Hyper-parameter search on the selected model with 5-fold cross-validation.
# NOTE(review): no `optimize=` is passed here although the models were ranked
# by F1 — confirm the metric the tuning should target.
tuned_dt = tune_model(best, fold=5)

In [None]:
# ROC / AUC plot for the tuned model.
plot_model(tuned_dt, plot = 'auc')

In [None]:
# Precision-recall plot for the tuned model.
plot_model(tuned_dt, plot = 'pr')

In [None]:
# Feature-importance plot for the tuned model.
plot_model(tuned_dt, plot='feature')

In [None]:
# Confusion matrix for the tuned model.
plot_model(tuned_dt, plot = 'confusion_matrix')

In [None]:
# Run predict_model with no `data` argument; the trailing ';' hides the returned frame.
predict_model(tuned_dt);

In [None]:
# Finalise the tuned model.
# NOTE(review): the `_rf` suffix suggests a random forest, but the model comes
# from `best` — the name may be a leftover; confirm.
final_rf = finalize_model(tuned_dt)

In [None]:
# Persist the finalised model under the given name.
# NOTE(review): the name says "Linear Discriminant Analysis" while the recorded
# output of `best` in this notebook is a RidgeClassifier — confirm the name.
save_model(final_rf,'Final Linear Discriminant Analysis')

In [None]:
# Reload the model saved in the previous step.
saved_final_rf = load_model('Final Linear Discriminant Analysis')

In [None]:
# Display the reloaded model object.
saved_final_rf

In [None]:
# Two hand-written example tweets to score with the trained model.
texto = [
    "Bolsonaro você é um idiota e vai perder a eleição",
    "Lula é o melhor, e ganha no primeiro turno",
]

# NOTE: this rebinds `features`, which earlier held the cleaned training corpus.
features = list(map(clean_text, texto))
features

In [None]:
# TfidfVectorizer is not among the names imported in the visible import cells,
# so import it here to avoid a NameError on a fresh kernel.
from sklearn.feature_extraction.text import TfidfVectorizer

# min_df / max_df given as floats are proportions of documents.
vectorizer = TfidfVectorizer(
    min_df=0.004,
    max_df=0.7
)

In [None]:
# Vectorise the unseen texts with the objects fitted on the training corpus, so
# the columns match the features the model was trained on. Fitting a fresh
# vectorizer on these few texts would produce a different vocabulary.
# Later cells read the column names from `vectorizer`, so point it at the
# fitted training vectorizer.
vectorizer = bigram_vectorizer
processed_features = bigram_tf_idf_transformer.transform(vectorizer.transform(features))

In [None]:
# List the vocabulary terms of `vectorizer`.
vectorizer.get_feature_names()

In [None]:
# Dense feature frame for the unseen texts, one column per vocabulary term.
df_tfidf_v2 = pd.DataFrame(
    data=processed_features.toarray(),
    columns=vectorizer.get_feature_names(),
)

df_tfidf_v2

In [None]:

# Score the unseen texts.
# NOTE(review): this uses the in-memory `tuned_dt` rather than the finalised /
# reloaded model (`saved_final_rf`) — confirm which one is intended.
unseen_predictions = predict_model(tuned_dt, data=df_tfidf_v2)
