# Análise de Sentimentos de Avaliações do IMDB 

Os scripts abaixo realizam a limpeza e transformação dos dados. Além disso, são criados vários modelos de machine learning, com o objetivo de prever se o texto de avalição de um filme foi postivo ou negativo.

Por último, todos o desempenho de todos os modelos são comparados, com base na acurácia.

## Importando Bibliotecas

### Criação da Instância Spark

In [3]:
# Criação da instância Spark
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("NLP").config("spark.executor.heartbeatInterval","700000s") \
.config("spark.network.timeout", "900000s").config("spark.driver.memory", "8g").config("spark.memory.offHeap.enabled","true") \
.config("spark.memory.offHeap.size","10g").config("spark.executor.memory","6g") \
.config("spark.task.cpus", 2).config("spark.executor.cores","3").getOrCreate()

spark

### Importação de Outras Bibliotecas

In [4]:
# from pyspark.ml import Pipeline 
from pyspark.ml.feature import * #CountVectorizer,StringIndexer, RegexTokenizer,StopWordsRemover
from pyspark.sql import functions
from pyspark.sql.functions import * #col, udf,regexp_replace,isnull
from pyspark.sql.types import StringType,IntegerType

from pyspark.ml.classification import *
from pyspark.ml.classification import NaiveBayes

from pyspark.ml.evaluation import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.ml.feature import StopWordsRemover

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import os

### Importando os dados

In [5]:
path = "../data/dados_brutos/"

# CSV
reviews = spark.read.csv(path+'IMDB Dataset.csv',header=True, escape="\"")

### Verificando características dos dados

In [None]:
reviews.limit(10).toPandas()

In [None]:
reviews.printSchema()

In [None]:
print("Número de avaliações: ", reviews.count())

In [14]:
print(reviews.show(2))

+--------------------+---------+
|              review|sentiment|
+--------------------+---------+
|One of the other ...| positive|
|A wonderful littl...| positive|
+--------------------+---------+
only showing top 2 rows

None


##### Verifica se existe algum valor nulo entre os dados

In [None]:

def null_value_calc(df):
    null_columns_counts = []
    numRows = df.count()
    for k in df.columns:
        nullRows = df.where(col(k).isNull()).count()
        if(nullRows > 0):
            temp = k,nullRows,(nullRows/numRows)*100
            null_columns_counts.append(temp)
    return(null_columns_counts)

null_columns_calc_list = null_value_calc(reviews)

if (len(null_columns_calc_list) > 0):
    spark.createDataFrame(null_columns_calc_list, ['Column_Name', 'Null_Values_Count','Null_Value_Percent']).show()



##### Como não existe nenhuma linha de dados nula, não é necessário nenhum tratamento adicional. 

##### Verifica o balanceamento entre as classes

In [None]:
reviews.groupBy("sentiment").count().orderBy(col("count").desc()).show()

##### Como as classes estão balanceadas, não é necessário nenhum tratamento adicional

### Transformação dos Dados

##### Transforma os textos usados na coluna "sentiment" em labels númericos

In [None]:
df = reviews
indexer = StringIndexer(inputCol="sentiment", outputCol="label")
df = indexer.fit(df).transform(df)
df.limit(10).toPandas()

In [None]:
# Remove as tags das avaliações
df = df.withColumn("review",regexp_replace(df["review"], '<.*/>', ''))
# Remove tudo que não seja uma letra
df = df.withColumn("review",regexp_replace(df["review"], '[^A-Za-z ]+', ''))
# Remove múltiplos espaços em branco
df = df.withColumn("review",regexp_replace(df["review"], ' +', ' '))
# Coloca todo o texto em minúsculo
df = df.withColumn("review",lower(df["review"]))

In [None]:
df.limit(10).toPandas()

##### Cria coluna "words" que contém cada palavra do texto completo da avaliação

In [None]:
regex_tokenizer = RegexTokenizer(inputCol="review", outputCol="words", pattern="\\W")
df = regex_tokenizer.transform(df)

df.limit(10).toPandas()

#####  Exclui todas as palavras que são consideradas irrelevantes para o entedimento do sentido de um texto

In [None]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
feature_data = remover.transform(df)
    
feature_data.limit(10).toPandas()

##### Cria coluna "rawfeatures" que contém tupla com 3 itens. O primeiro item é a quantidade de grupos de palavras passados como parâmetro. O segundo item representa o índice da palavra. Como as palavras são agrupadas, o índice representa, o grupo onde a palavra foi inserido. O terceiro item é a frequência absoluta de cada palavra

In [None]:
# Existe a Count Vector que faz a mesma coisa que a HashingTF, porém a quantidade de grupos será igual a quantidade total de palavras
hashingTF = HashingTF(inputCol="filtered", outputCol="rawfeatures", numFeatures=250)
HTFfeaturizedData = hashingTF.transform(feature_data)

HTFfeaturizedData.limit(10).toPandas()


In [None]:
HTFfeaturizedData.show(3,False)

##### Cria coluna "features" que contém tupla com 3 itens. O retorno é bem parecido com a HashingTF, porém, aqui o terceito item será o resultado do seguinte cálculo:
##### idf = log((m + 1) / (d(t) + 1)), onde m é o número total de linhas e d(t) é o número de linhas que contém a palavra t

In [None]:
# TF-IDF
idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(HTFfeaturizedData)
TFIDFfeaturizedData = idfModel.transform(HTFfeaturizedData)
TFIDFfeaturizedData.name = 'TFIDFfeaturizedData'

# Renomeia coluna "rawfetatures" para "features" para consistência
HTFfeaturizedData = HTFfeaturizedData.withColumnRenamed("rawfeatures","features")
HTFfeaturizedData.name = 'HTFfeaturizedData' #We will use later for printing

if os.path.isdir("../data/dados_transformados/HTFfeaturizedData"):
    HTFfeaturizedData.write.mode("Overwrite").partitionBy("label").parquet("../data/dados_transformados/HTFfeaturizedData")
else:
    HTFfeaturizedData.write.partitionBy("label").parquet("../data/dados_transformados/HTFfeaturizedData")
    
if os.path.isdir("../data/dados_transformados/TFIDFfeaturizedData"):
    TFIDFfeaturizedData.write.mode("Overwrite").partitionBy("label").parquet("../data/dados_transformados/TFIDFfeaturizedData")
else:
    TFIDFfeaturizedData.write.partitionBy("label").parquet("../data/dados_transformados/TFIDFfeaturizedData")

TFIDFfeaturizedData.limit(10).toPandas()

##### As tabelas HTFfeaturizedData e TFIDFfeaturizedData serão usadas na construção de difentes modelos de machine learning. Os dados dessas duas tabelas são salvos

In [None]:
TFIDFfeaturizedData.show(3,False)

##### O Word2Vec cria a coluna "filtered" que retorna uma lista de valores baseada na lista de palavras. O parâmetro "vectorSize" determina o número de valores da lista e o número de grupos , e o "minCount" determina a quantidade mínima de aparições que uma palavra deve ter para ser considerada no cálculo. Logo, cada valor da lista mostra o grau de similaridade da lista de palavras com cada um dos grupos

##### O Word2Vec retorna alguns valores negativos. Para corrigir isso, o MinMaxScaler  fará o seguinte cálculo:
##### Formula = ((VALOR-COLUNA_MIN)/(COLUNA_MAX-COLUNA_MIN)) *(max - min) + min

In [None]:
# Word2Vec
word2Vec = Word2Vec(vectorSize=250, minCount=5, inputCol="filtered", outputCol="features")
model = word2Vec.fit(feature_data)

W2VfeaturizedData = model.transform(feature_data)

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

scalerModel = scaler.fit(W2VfeaturizedData)

scaled_data = scalerModel.transform(W2VfeaturizedData)
W2VfeaturizedData = scaled_data.select('sentiment','review','label','scaledFeatures')
# Renomeia coluna "scaledFeatures" para "features" para consistência
W2VfeaturizedData = W2VfeaturizedData.withColumnRenamed('scaledFeatures','features')

W2VfeaturizedData.name = 'W2VfeaturizedData'

if os.path.isdir("../data/dados_transformados/W2VfeaturizedData"):
    W2VfeaturizedData.write.mode("Overwrite").partitionBy("label").parquet("../data/dados_transformados/W2VfeaturizedData")
else:
    W2VfeaturizedData.write.partitionBy("label").parquet("../data/dados_transformados/W2VfeaturizedData")

##### A tabela W2VfeaturizedData será usada na construção de difentes modelos de machine learning. Os dados dessa tabela são salvos

In [None]:
W2VfeaturizedData.show(3,False)

### Criação dos modelos de machine learning

Serão testados 3 tabelas e um conjunto de várias técnicas de classificação. Cada combinação irá gerar um modelo de machine learning que serão comparados pela acurácia

##### A função abaixo fará a criação do modelo de machine learning para cada combinação de tabelas e tipos de modelo

In [None]:
def ClassTrainEval(classifier,features,classes,train,test):

    def FindMtype(classifier):
        M = classifier
        # Retorna texto com o tipo de classificador usado
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(classifier)
    

    def IntanceFitModel(Mtype,classifier,classes,features,train):
        
        if Mtype == "OneVsRest":
            # Instância classificador
            lr = LogisticRegression()
            # Cria instância do classificador OneVsRest
            OVRclassifier = OneVsRest(classifier=lr)
            # Adiciona parâmetros de livre escolha
            paramGrid = ParamGridBuilder() \
                .addGrid(lr.regParam, [0.1, 0.01]) \
                .build()
            # Usa o CrossValidator para validar o melhor modelo dentre a lista de parâmetros escolhidos
            crossval = CrossValidator(estimator=OVRclassifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2)
            fitModel = crossval.fit(train)
            return fitModel
        if Mtype == "MultilayerPerceptronClassifier":
            # Especifica camadas da rede neural
            # Camada de entrada terá o tamanho igual a quantidade de valores da coluna "features"
            # As camadas intermediárias terão o mesmo tamanho mais 1 e o mesmo tamanho usado na camada de entrada
            # A camada de saída terá o tamanho igual ao número de classes
            # O CrossValidator não é usado aqui
            features_count = len(features[0][0])
            layers = [features_count, features_count+1, features_count, classes]
            MPC_classifier = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
            fitModel = MPC_classifier.fit(train)
            return fitModel
        if Mtype in("LogisticRegression","NaiveBayes","RandomForestClassifier","GBTClassifier","LinearSVC","DecisionTreeClassifier"):
  
            # Adiciona parâmetros de livre escolha
            if Mtype in("LogisticRegression"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .addGrid(classifier.maxIter, [10, 15,20])
                             .build())
                
            # Adiciona parâmetros de livre escolha
            if Mtype in("NaiveBayes"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.smoothing, [0.0, 0.2, 0.4, 0.6]) \
                             .build())
                
            # Adiciona parâmetros de livre escolha
            if Mtype in("RandomForestClassifier"):
                paramGrid = (ParamGridBuilder() \
                               .addGrid(classifier.maxDepth, [2, 5, 10])
#                                .addGrid(classifier.maxBins, [5, 10, 20])
#                                .addGrid(classifier.numTrees, [5, 20, 50])
                             .build())
                
            # Adiciona parâmetros de livre escolha
            if Mtype in("GBTClassifier"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
#                              .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .addGrid(classifier.maxIter, [10, 15,50,100])
                             .build())
                
           # Adiciona parâmetros de livre escolha
            if Mtype in("LinearSVC"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.maxIter, [10, 15]) \
                             .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .build())
            
            # Adiciona parâmetros de livre escolha
            if Mtype in("DecisionTreeClassifier"):
                paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
                             .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .build())
            
            # Usa o CrossValidator para validar o melhor modelo dentre a lista de parâmetros escolhidos
            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2)
            fitModel = crossval.fit(train)
            return fitModel
    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features,train)
    
    # Mostra na tela alguma métricas de cada tipo de classificador
    if fitModel is not None:
        
        if Mtype in("OneVsRest"):
            # Obtém o melhor modelo do CrossValidator
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype + '\033[0m')
            # Extraí a lista de coeficientes e a interseção do modelo
            models = BestModel.models
            for model in models:
                print('\033[1m' + 'Interseção: '+ '\033[0m',model.intercept,'\033[1m' + '\nCoeficientes:'+ '\033[0m',model.coefficients)

        if Mtype == "MultilayerPerceptronClassifier":
            print("")
            print('\033[1m' + Mtype," Pesos"+ '\033[0m')
            print('\033[1m' + "Pesos do Modelo: "+ '\033[0m',fitModel.weights.size)
            print("")

        if Mtype in("DecisionTreeClassifier", "GBTClassifier","RandomForestClassifier"):
            # FEATURE IMPORTANCES
            # Obtém o melhor modelo do CrossValidator
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Importância das entradas"+ '\033[0m')
            print("(A pontuação mais próxima de 1 é a mais importante)")
            print("Pontuação mais baixa é a menos importante")
            print(" ")
            print(BestModel.featureImportances)
            
            if Mtype in("DecisionTreeClassifier"):
                global DT_featureimportances
                DT_featureimportances = BestModel.featureImportances.toArray()
                global DT_BestModel
                DT_BestModel = BestModel
            if Mtype in("GBTClassifier"):
                global GBT_featureimportances
                GBT_featureimportances = BestModel.featureImportances.toArray()
                global GBT_BestModel
                GBT_BestModel = BestModel
            if Mtype in("RandomForestClassifier"):
                global RF_featureimportances
                RF_featureimportances = BestModel.featureImportances.toArray()
                global RF_BestModel
                RF_BestModel = BestModel

        if Mtype in("LogisticRegression"):
            # Obtém o melhor modelo do CrossValidator
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Matrix de coeficientes"+ '\033[0m')
            print("Coeficientes: \n" + str(BestModel.coefficientMatrix))
            print("Interseção: " + str(BestModel.interceptVector))
            global LR_coefficients
            LR_coefficients = BestModel.coefficientMatrix.toArray()
            global LR_BestModel
            LR_BestModel = BestModel

        if Mtype in("LinearSVC"):
            # Obtém o melhor modelo do CrossValidator
            BestModel = fitModel.bestModel
            print(" ")
            print('\033[1m' + Mtype," Coeficientes"+ '\033[0m')
            print("Coeficientes: \n" + str(BestModel.coefficients))
            global LSVC_coefficients
            LSVC_coefficients = BestModel.coefficients.toArray()
            global LSVC_BestModel
            LSVC_BestModel = BestModel
        
   
    # Estabelece colunas da tabela que irá comparar os resultados de cada classificador
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] 
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        # Avalia o modelo pela acurácia
        MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
        accuracy = (MC_evaluator.evaluate(predictions))*100
        Mtype = [Mtype]
        score = [str(accuracy)]
        # Insere na tabela os resultados do modelo
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        # Salva modelo
        if os.path.isdir('../output/' + Mtype[0] + '_' + train.name):
            fitModel.write().overwrite().save('../output/' + Mtype[0] + '_' + train.name )
        else:
            fitModel.save('../output/' + Mtype[0] + '_' + train.name)
        
    return result
    # Retorna tabela com os resultados do modelo

In [None]:

classifiers = [
                LogisticRegression()
                ,OneVsRest()
               ,LinearSVC()
               ,NaiveBayes()
               ,RandomForestClassifier()
               ,GBTClassifier()
               ,DecisionTreeClassifier()
               ,MultilayerPerceptronClassifier()
              ] 

featureDF_list = [HTFfeaturizedData,TFIDFfeaturizedData,W2VfeaturizedData]

##### Para cada uma das 3 tabelas, cria modelos de machine learning beseados na lista de classificadores acima

##### Cada modelo criado é avaliado pela acurácia. No final, uma tabela compara o resultado de cada modelo

In [None]:

for featureDF in featureDF_list:
    print('\033[1m' + featureDF.name," Results:"+ '\033[0m')
    train, test = featureDF.randomSplit([0.7, 0.3],seed = 11)
    train.name = featureDF.name
    features = featureDF.select(['features']).collect()
    # Retorna o número de classes
    classes = featureDF.select("label").distinct().count()

    # Organiza tabela que receberá os resultados do modelo
    columns = ['Classifier', 'Result']
    vals = [("Place Holder","N/A")]
    results = spark.createDataFrame(vals, columns)

    for classifier in classifiers:
        new_result = ClassTrainEval(classifier,features,classes,train,test)
        results = results.union(new_result)
    results = results.where("Classifier!='Place Holder'")
    # Mostra na tela a comparação do resultado de cada classificador
    print(results.show(truncate=False))