# Text Classification Using Machine Learning - Pyspark
    - Objetivo: prever a classe de sentimento de qualquer crítica (positiva ou negativa). 
Dados: revisão do Movie Lens rotulados

    - O dataframe apresenta as resenhas e classificação (label) positiva ou negativa
    - Então, o algoritmo de ML tentará prever os novos casos como positivo ou negativo tomando como base as resenhas    

In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
# Load DataFrame
url='C:/Users/Pc/Desktop/Hartb/machineLearning/PYSPARK/Movie_reviews.txt'
text_df = spark.read.csv(url,inferSchema=True,header=True,sep=',')

text_df.printSchema()

root
 |-- Review: string (nullable = true)
 |-- Sentiment: string (nullable = true)



In [3]:
text_df.show(5, False)

+------------------------------------------------------------------------+---------+
|Review                                                                  |Sentiment|
+------------------------------------------------------------------------+---------+
|The Da Vinci Code book is just awesome.                                 |1        |
|this was the first clive cussler i've ever read, but even books like Rel|1        |
|i liked the Da Vinci Code a lot.                                        |1        |
|i liked the Da Vinci Code a lot.                                        |1        |
|I liked the Da Vinci Code but it ultimatly didn't seem to hold it's own.|1        |
+------------------------------------------------------------------------+---------+
only showing top 5 rows



- Você pode observar a coluna Sentiment em StringType, e precisaremos dela para convertê-la em um tipo Integer ou float daqui para frente

In [4]:
text_df.count()

7087

- Temos cerca de sete mil registros dos quais alguns podem não estar devidamente rotulados. Portanto, filtramos apenas os registros rotulados corretamente.

In [5]:
text_df = text_df.filter(((text_df.Sentiment =='1') | (text_df.Sentiment =='0')))

text_df.count()

6990

- Alguns dos registros foram filtrados e agora ficamos com 6.990 registros para análise. O próximo passo é validar uma série de revisões para cada classe.

In [6]:
text_df.groupBy('Sentiment').count().show()

+---------+-----+
|Sentiment|count|
+---------+-----+
|        0| 3081|
|        1| 3909|
+---------+-----+



- Estamos lidando com um conjunto de dados equilibrado aqui, pois ambas as classes têm um número quase semelhante de revisões. Vejamos alguns dos registros no conjunto de dados.

In [7]:
from pyspark.sql.functions import rand

In [8]:
text_df.orderBy(rand()).show(10,False)

+------------------------------------------------------------------------+---------+
|Review                                                                  |Sentiment|
+------------------------------------------------------------------------+---------+
|I loved the Harry Potter books before I discovered fanfiction but fanfic|1        |
|This quiz sucks and Harry Potter sucks ok bye..                         |0        |
|Brokeback Mountain is fucking horrible..                                |0        |
|Brokeback Mountain is fucking horrible..                                |0        |
|I love Harry Potter..                                                   |1        |
|Mission Impossible 3 et al. can be said to be boring!                   |0        |
|This quiz sucks and Harry Potter sucks ok bye..                         |0        |
|Ok brokeback mountain is such a horrible movie.                         |0        |
|The Da Vinci Code is awesome!!                                  

- Na próxima etapa, criamos uma nova coluna de rótulo como um tipo Integer e descartamos a coluna Sentiment original, que era do tipo String.

In [9]:
text_df = text_df.withColumn("Label", text_df.Sentiment.cast('float')).drop('Sentiment')

text_df.orderBy(rand()).show(10,False)

+-----------------------------------------------+-----+
|Review                                         |Label|
+-----------------------------------------------+-----+
|I love Harry Potter.                           |1.0  |
|I LOVE BROKEBACK MOUNTAIN!                     |1.0  |
|Gotta Love Harry Potter icons..                |1.0  |
|mission impossible 2 rocks!!....               |1.0  |
|I love Harry Potter.                           |1.0  |
|This quiz sucks and Harry Potter sucks ok bye..|0.0  |
|Brokeback Mountain was awesome.                |1.0  |
|Mission Impossible 3 was AWESOME...            |1.0  |
|I love Harry Potter.                           |1.0  |
|So Brokeback Mountain was really depressing.   |0.0  |
+-----------------------------------------------+-----+
only showing top 10 rows



- Também incluímos uma coluna adicional que captura o length da revisão.
        - Ou seja conta o total de letras (incluindo espaço) no documento

In [10]:
from pyspark.sql.functions import length

In [11]:
text_df = text_df.withColumn('length',length(text_df['Review']))

text_df.orderBy(rand()).show(10,False)

+------------------------------------------------------------------------+-----+------+
|Review                                                                  |Label|length|
+------------------------------------------------------------------------+-----+------+
|Brokeback Mountain is fucking horrible..                                |0.0  |40    |
|Mission Impossible 3 was quite boring.                                  |0.0  |38    |
|Harry Potter dragged Draco Malfoy ’ s trousers down past his hips and   |0.0  |69    |
|I loved Brokeback Mountain and will check out the short stories one of t|1.0  |72    |
|Brokeback Mountain was boring.                                          |0.0  |30    |
|Not because I hate Harry Potter, but because I am the type of person tha|0.0  |72    |
|I either LOVE Brokeback Mountain or think it's great that homosexuality |1.0  |71    |
|I absolutely LOVE Harry Potter, as you can tell already.                |1.0  |56    |
|I know you are all wondering no

In [12]:
text_df.groupBy('Label').agg({'Length':'mean'}).show()

+-----+-----------------+
|Label|      avg(Length)|
+-----+-----------------+
|  1.0|47.61882834484523|
|  0.0|50.95845504706264|
+-----+-----------------+



- Não há grande diferença entre a mean length das críticas positivas e negativas. O próximo passo é iniciar o processo de tokenização e remover as stopwords.

In [13]:
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover

In [14]:
# Remoção das  stopwords

# Primeiro, Tokeniza
tokenization=Tokenizer(inputCol='Review',outputCol='tokens')

tokenized_df=tokenization.transform(text_df)

# Depois remove a stopwords
stopword_removal=StopWordsRemover(inputCol='tokens',outputCol='refined_tokens')

refined_text_df=stopword_removal.transform(tokenized_df)

- Como agora estamos lidando apenas com tokens em vez de uma revisão inteira, faria mais sentido capturar vários tokens em cada revisão em vez de usar o legth da revisão. Criamos outra coluna (contagem de tokens) que fornece o número de tokens em cada linha.

In [15]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import *

In [16]:
# Cria uma função aplicada a colunas no Pyspark. Ela fornece a contagem de palavras e retorna um Inteiro
len_udf = udf(lambda s: len(s), IntegerType())


refined_text_df = refined_text_df.withColumn("token_count", len_udf(col('refined_tokens')))

refined_text_df.orderBy(rand()).show(10)

+--------------------+-----+------+--------------------+--------------------+-----------+
|              Review|Label|length|              tokens|      refined_tokens|token_count|
+--------------------+-----+------+--------------------+--------------------+-----------+
|I like Harry Pott...|  1.0|    21|[i, like, harry, ...|[like, harry, pot...|          3|
|I hate Harry Pott...|  0.0|    23|[i, hate, harry, ...|[hate, harry, pot...|          3|
|I love Harry Pott...|  1.0|    21|[i, love, harry, ...|[love, harry, pot...|          3|
|friday hung out w...|  0.0|    72|[friday, hung, ou...|[friday, hung, ke...|          9|
|I love Harry Potter.|  1.0|    20|[i, love, harry, ...|[love, harry, pot...|          3|
|i love being a se...|  1.0|    71|[i, love, being, ...|[love, sentry, mi...|          6|
|I love The Da Vin...|  1.0|    27|[i, love, the, da...|[love, da, vinci,...|          4|
|dudeee i LOVED br...|  1.0|    37|[dudeee, i, loved...|[dudeee, loved, b...|          4|
|The Da Vi

- Após a etapa de stopwords precisamos converter o texto em um recurso numérico
    - Podemos utilizar o count vectorizer

In [17]:
from pyspark.ml.feature import CountVectorizer

In [18]:
refined_text_df.show(5)

+--------------------+-----+------+--------------------+--------------------+-----------+
|              Review|Label|length|              tokens|      refined_tokens|token_count|
+--------------------+-----+------+--------------------+--------------------+-----------+
|The Da Vinci Code...|  1.0|    39|[the, da, vinci, ...|[da, vinci, code,...|          5|
|this was the firs...|  1.0|    72|[this, was, the, ...|[first, clive, cu...|          9|
|i liked the Da Vi...|  1.0|    32|[i, liked, the, d...|[liked, da, vinci...|          5|
|i liked the Da Vi...|  1.0|    32|[i, liked, the, d...|[liked, da, vinci...|          5|
|I liked the Da Vi...|  1.0|    72|[i, liked, the, d...|[liked, da, vinci...|          8|
+--------------------+-----+------+--------------------+--------------------+-----------+
only showing top 5 rows



In [19]:
count_vec = CountVectorizer(inputCol='refined_tokens',outputCol='features')

cv_text_df = count_vec.fit(refined_text_df).transform(refined_text_df)

cv_text_df.select(['refined_tokens','token_count','features','Label']).show(10, False)

#pag 217

+-------------------------------------------------------------+-----------+----------------------------------------------------------------------------------+-----+
|refined_tokens                                               |token_count|features                                                                          |Label|
+-------------------------------------------------------------+-----------+----------------------------------------------------------------------------------+-----+
|[da, vinci, code, book, awesome.]                            |5          |(2301,[0,1,4,43,236],[1.0,1.0,1.0,1.0,1.0])                                       |1.0  |
|[first, clive, cussler, ever, read,, even, books, like, rel] |9          |(2301,[11,51,229,237,275,742,824,1087,2140],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|1.0  |
|[liked, da, vinci, code, lot.]                               |5          |(2301,[0,1,4,53,358],[1.0,1.0,1.0,1.0,1.0])                                       |1.0  |
|[liked, d

- O índice representa a pósição da palavra (coluna) única na matriz
- Antes de atribuir um índice as palavras são ordanadas em ordem alfabetica.
- então em vez de ter os nomes nas colunas, são atribuidos indices
- 'da': posicao 0
- 'vinci': posicao 1
- 'loved': posicao zero

Essas posições são definidas dentro de cada documento. Por isso existe duas palavras com a mesma posição

In [20]:

model_text_df = cv_text_df.select(['features','token_count','Label'])


- Assim que tivermos o vetor de recursos para cada linha, podemos usar o VectorAssembler para criar recursos de entrada para o modelo de aprendizado de máquina.

In [21]:
from pyspark.ml.feature import VectorAssembler

In [22]:

df_assembler = VectorAssembler(inputCols=['features', 'token_count'],outputCol='features_vec')

model_text_df = df_assembler.transform(model_text_df)

model_text_df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- token_count: integer (nullable = true)
 |-- Label: float (nullable = true)
 |-- features_vec: vector (nullable = true)



- Podemos usar qualquer um dos modelos de classificação nesses dados, mas continuamos com o treinamento do Modelo de Regressão Logística.

In [23]:
from pyspark.ml.classification import LogisticRegression

In [24]:
training_df,test_df = model_text_df.randomSplit([0.75,0.25])

- Para validar a presença de registros suficientes para ambas as classes no conjunto de dados train e test, podemos aplicar a função groupBy na coluna Label.

In [25]:
training_df.groupBy('Label').count().show()

+-----+-----+
|Label|count|
+-----+-----+
|  1.0| 2982|
|  0.0| 2306|
+-----+-----+



In [26]:
test_df.groupBy('Label').count().show()

+-----+-----+
|Label|count|
+-----+-----+
|  1.0|  927|
|  0.0|  775|
+-----+-----+



# NLP: Regressão logística

In [27]:
log_reg = LogisticRegression(featuresCol='features_vec',labelCol='Label').fit(training_df)

- Depois de treinar o modelo, avaliamos o desempenho do modelo no conjunto de dados de teste.

In [28]:
results = log_reg.evaluate(test_df).predictions
results.show()

+--------------------+-----------+-----+--------------------+--------------------+--------------------+----------+
|            features|token_count|Label|        features_vec|       rawPrediction|         probability|prediction|
+--------------------+-----------+-----+--------------------+--------------------+--------------------+----------+
|(2301,[0,1,4,5,64...|          6|  1.0|(2302,[0,1,4,5,64...|[-22.063774564624...|[2.61712494250314...|       1.0|
|(2301,[0,1,4,5,82...|          6|  1.0|(2302,[0,1,4,5,82...|[-18.573202819923...|[8.58539933273700...|       1.0|
|(2301,[0,1,4,10,1...|         10|  0.0|(2302,[0,1,4,10,1...|[45.9551864501695...|           [1.0,0.0]|       0.0|
|(2301,[0,1,4,11,1...|          6|  0.0|(2302,[0,1,4,11,1...|[0.22035687262570...|[0.55486738064878...|       0.0|
|(2301,[0,1,4,11,1...|          6|  0.0|(2302,[0,1,4,11,1...|[0.22035687262570...|[0.55486738064878...|       0.0|
|(2301,[0,1,4,12,1...|          8|  1.0|(2302,[0,1,4,12,1...|[-10.603077352117..

In [29]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [30]:
# VPP
true_postives = results[(results.Label == 1) & (results.prediction == 1)].count()

# VPN
true_negatives = results[(results.Label == 0) & (results.prediction == 0)].count()

In [31]:
# FP
false_positives = results[(results.Label == 0) & (results.prediction == 1)].count()

# FN
false_negatives = results[(results.Label == 1) & (results.prediction == 0)].count()

- O desempenho do modelo parece razoavelmente bom e é capaz de diferenciar facilmente entre avaliações positivas e negativas.

In [32]:
recall = float(true_postives)/(true_postives + false_negatives)

print(recall)

0.9838187702265372


In [33]:
precision = float(true_postives) / (true_postives + false_positives)

print(precision)

0.9702127659574468


In [34]:
accuracy=float((true_postives+true_negatives) /(results.count()))

print(accuracy)

0.9747356051703878
