# Criando sessão spark

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder\
  .master('local[*]')\
  .appName('analise_nlp')\
  .getOrCreate()

## Dados

In [3]:
dados = spark.read.csv("imdb-reviews-pt-br.csv", sep=",", inferSchema=True, header=True, escape='\"')

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/d:/Estudo/ALURA/Apache Spark com Python/NLP/imdb-reviews-pt-br.csv.

## Explorando dados

In [None]:
dados.show()

In [None]:
# Quantidade de linhas
dados.count()

In [None]:
# Tipo
dados.printSchema()

In [None]:
# Conteúdo
dados.show()

In [None]:
dados.groupBy("sentiment").count().show()

## Limpeza de dados

### WordCloud

In [None]:
# Importação
from wordcloud import WordCloud
import matplotlib.pyplot as plt

# Amostra de 10% dos dados
amostra = dados.select('text_pt').sample(fraction = 0.1, seed=101)

# Pega os dados na amostra diretamente do texto sem ser objetos 
tudo = [texto['text_pt'] for texto in amostra.collect()]

# Nuvem de palavra 
wordcloud = WordCloud(collocations=False, 
                      background_color="white",
                      prefer_horizontal=1,
                      width=1000,
                      height=600).generate(str(tudo))

In [None]:
plt.figure(figsize=(20, 8))
plt.imshow(wordcloud)
plt.axis("off")
plt.show()

### **Limpeza**: caracteres especiais

In [None]:
# Queremos remover esses caracteres 
import string

string.punctuation

In [None]:
import pyspark.sql.functions as f

In [None]:
dados = dados.withColumn("texto_regex", f.regexp_replace("text_en", "[\$#,\"!%&'()*+-./;;<=>?@^_`´{|}~\\\\]", ""))

dados.limit(2).show(truncate = False)

In [None]:
dados = dados.withColumn("texto_limpo", f.trim(dados.texto_regex))

### Tokenização divisão em tokens

In [None]:
# Tokenizando os dados
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="texto_limpo", outputCol="tokens")

tokenizado = tokenizer.transform(dados)

In [None]:
tokenizado.select("texto_limpo", "tokens").show()

### StopWords: Remoção de ruído

In [None]:
import nltk
nltk.download("stopwords")

In [None]:
from nltk.corpus import stopwords
from pyspark.ml.feature import StopWordsRemover

In [None]:
remover = StopWordsRemover(inputCol="tokens", outputCol="texto_final")

feature_data = remover.transform(tokenizado)

In [None]:
feature_data.show()

### Vetorização

#### CountVectorizer

In [None]:
from pyspark.ml.feature import HashingTF

hashingTF = HashingTF(inputCol="texto_final", outputCol="hashingTF")
hashingTF.setNumFeatures(50)

HTFfeaturizedData = hashingTF.transform(feature_data)

In [None]:
HTFfeaturizedData.show()

### TD-IDF

In [None]:
from pyspark.ml.feature import IDF
idf = IDF(inputCol="hashingTF", outputCol="features")
idfModel = idf.fit(HTFfeaturizedData)
TFIDFfeaturizedData = idfModel.transform(HTFfeaturizedData)

In [None]:
TFIDFfeaturizedData.select('texto_final', 'features').limit(5).show()

### Codificando

In [None]:
from pyspark.sql.functions import when

# Sim virou 1 e não 0
TFIDFfeaturizedData = TFIDFfeaturizedData.withColumn("sentiment", when(TFIDFfeaturizedData["sentiment"] == "pos", 1).otherwise(0))

TFIDFfeaturizedData.groupBy("sentiment").count().show()

## PIPELINE COM TUDO 

In [None]:
# Importação
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.sql.functions import when

# Passos de pré-processamento
tokenizer = Tokenizer(inputCol="texto_limpo", outputCol="tokens")
stopwords = StopWordsRemover(inputCol="tokens", outputCol="texto_final")
hashingTF = HashingTF(inputCol=stopwords.getOutputCol(), outputCol="HTF", numFeatures=1000)
tfidf = IDF(inputCol="HTF", outputCol="features")

# Criar um único pipeline com todas as etapas
pipeline = Pipeline(stages=[tokenizer, stopwords, hashingTF, tfidf])

# Ajustar o pipeline aos dados
pipeline_model = pipeline.fit(dados)

# Transformar os dados com o pipeline
dados_transformados = pipeline_model.transform(dados)

# Renomeando coluna sentiment e transformando em binário
dados_transformados = dados_transformados.withColumn("sentiment", when(dados_transformados["sentiment"] == "pos", 1.0).otherwise(0.0)).withColumnRenamed("sentiment", "label")

# Dividir os dados em treino e teste
train, test = dados_transformados.randomSplit([0.7, 0.3], seed=101)

# Modelo de Machine Learning (Árvore de Decisão)
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")

# Treinar o modelo
modelo_dt = dt.fit(train)

# Previsão nos dados de teste
previsao_dt = modelo_dt.transform(test)

# Mostrar as previsões
previsao_dt.show()

# Para prever novos comentários
nova_base = spark.createDataFrame([
    (1, "This is without doubt the worst movie I have ever seen, I hated the acting of the actor."),
    (0, "I loved the movie, excellent acting!")
], ["id", "texto_limpo"])

# Transformar a nova base com o pipeline
nova_base_transformada = pipeline_model.transform(nova_base)

# Fazer previsões na nova base
nova_class = modelo_dt.transform(nova_base_transformada)

In [None]:
# Mostrar as previsões da nova base
nova_class.show()

### Métricas

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

acuracia = evaluator.evaluate(previsao_dt)

In [None]:
acuracia

## Nova analise

In [None]:
nova_base = spark.createDataFrame([
        (1, "This is without doubt the worst movie i have ever seen, I hated the acting of the actor."),
        (0, "I loved the movie, excellent acting!"),
    ], ["texto_limpo"])

In [None]:
modelo_dt.transform(nova_base)