<a href="https://colab.research.google.com/github/Tatianakami/spark-sentiment-analysis/blob/main/Sentiment_Analysis_Pipeline_ipyn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Instala√ß√£o do PySpark**

In [None]:
!pip install pyspark

**Iniciando uma sess√£o**

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("analise_nlp") \
    .getOrCreate()

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("analise_nlp") \
    .getOrCreate()

Leitura dos **dados**

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("analise_nlp") \
    .getOrCreate()

# Definindo o caminho direto para o arquivo dentro da pasta que encontramos
caminho_imdb = "/content/drive/MyDrive/ModuloSpark2_processamento_de_linguagem_natural/imdb-reviews-pt-br.csv"

# Lendo os dados com as configura√ß√µes para texto (NLP)
dados = spark.read.csv(caminho_imdb,
                       escape="\\",
                       header=True,
                       inferSchema=True)

# Mostrando o resultado para confirmar
print(f"Sucesso! O dataset tem {dados.count()} avalia√ß√µes.")
dados.show(5, truncate=50) # truncate=50 ajuda a ler o texto sem cortar muito

**Explorando os dados(estrutura, tamanho dos dados)**

In [None]:
print(f'Numero de linhas:{dados.count()}', f'numero de colunas:{len(dados.columns)}')

 **Campos e Tipos**

In [None]:
dados.printSchema()

**Tamanho dos dados**

In [None]:
print(f'N¬∫ linhas:{dados.count()}',f'\nN¬∫ colunas:{len(dados.columns)}')

In [None]:
# Conteudo
# dados.head() n√£o traz a estrutura desejada

dados.limit(99).show()

In [None]:
# Apresentando alguns coment√°rios ditos como negativo ou positivo.

print("Negativo")
dados.filter(dados.id == 190).select('text_pt').show(truncate=False)

print("Positivo")
dados.filter(dados.id == 12427).select('text_pt').show(truncate=False)

In [None]:
# Lendo o arquivo com "paredes refor√ßadas" (par√¢metros extras)
dados = spark.read.csv(caminho_imdb,
                       header=True,
                       inferSchema=True,
                       quote="\"",       # Diz que o texto est√° protegido por aspas
                       escape="\"",      # Diz para n√£o se assustar com aspas duplas ""
                       multiLine=True)   # Avisa que um coment√°rio pode pular de linha

# AGORA TESTE O NEGATIVO NOVAMENTE:
print("Negativo (ID 190) - Agora deve estar em Portugu√™s:")
dados.filter(dados.id == 190).select('text_pt').show(truncate=False)

In [None]:
# Contabilizando os tipos de coment√°rios
dados.groupBy('sentiment').count().show()

In [None]:
# Propor√ß√£o dos coment√°rios
# conseguimos perceber que nossos dados s√£o balanceados.
import pyspark.sql.functions as f
from pyspark.sql.window import Window

conta_classe=dados.groupBy('sentiment').count()
conta_classe.withColumn("(%)", f.round(f.col('count')/f.sum('count').over(Window.partitionBy())*100, 2)).show()

In [None]:
from wordcloud import WordCloud
import matplotlib.pyplot as plt

In [None]:
dados.limit(10).show()

**WordCloud**

In [None]:
amostra = dados.select('text_pt').sample(fraction = 0.1, seed = 101)

In [None]:
# 3. Transformar em uma lista que o Python entenda (Aten√ß√£o aos colchetes!)
tudo = [texto['text_pt'] for texto in amostra.collect()]

In [None]:
# 4. Criar a Nuvem de Palavras

wordcloud = WordCloud(background_color='white',
                      width=1000,
                      height=600,
                      collocations=False,
                      prefer_horizontal=1.0).generate(str(tudo))

In [None]:
# 5. Mostrar na tela
plt.figure(figsize=(20,8))
plt.imshow(wordcloud)
plt.axis("off") # Esconde os n√∫meros dos eixos para ficar limpo
plt.show()

**Limpeza:caracteres especiais**

In [None]:
import string
string.punctuation

In [None]:
amostra = spark.createDataFrame([
                ("Oi, JP! Blz?",),
                ("$$$\\ |~ Parab√©ns ~| \\$$$",),
                ("(#amovc #paz&amor ^.^)",),
                ("\"bora *_* \"",),
                ("=>->'...``` vc foi selecionad@ ¬¥¬¥¬¥...'<=<-",),
                ("{comprar: arroz; feij√£o e pepino} //",),
                ("!\"#$&'()*+,-./:;<=>?@[\]^_`{|}~",),
                ("ana@gmail.com",)
        ], ["textos"])

In [None]:
import pyspark.sql.functions as f

In [None]:
amostra = amostra.withColumn("texto_regex", f.regexp_replace("textos", "\$", ""))

In [None]:
amostra = amostra.withColumn("texto_regex", f.regexp_replace("textos", "[\$#,\"!%&'()*+-./;:<=>?@^_`¬¥{|}~\\\\]", ""))
amostra.show(truncate = False)

In [None]:
amostra = amostra.withColumn("texto_limpo", f.trim(amostra.texto_regex))
amostra.show(truncate = False)

In [None]:
dados = dados.withColumn("texto_regex", f.regexp_replace("text_en", "[\$#,\"!%&'()*+-./;;<=>?@^_`¬¥{|}~\\\\]", ""))

dados.limit(2).show(truncate = False)

In [None]:
dados = dados.withColumn("texto_limpo", f.trim(dados.texto_regex))

In [None]:
dados.limit(2).show(truncate=False, vertical=True)

In [None]:
# Criando uma lista de palavras que "entregam" que o texto √© ingl√™s
stopwords_ingles = [" the ", " and ", " is ", " of ", " to ", " with "]

# Filtrando para manter APENAS o que N√ÉO tem essas palavras
dados_portugues = dados
for palavra in stopwords_ingles:
    # O sinal ~ significa "N√ÉO"
    dados_portugues = dados_portugues.filter(~f.col("text_pt").contains(palavra))

print(f"Agora voc√™ tem {dados_portugues.count()} linhas apenas em portugu√™s!")

In [None]:
# Comparando o antes e o depois
print(f"Total de linhas antes: {dados.count()}")
print(f"Total de linhas s√≥ em Portugu√™s: {dados_portugues.count()}")

# Ver o resultado
dados_portugues.select("text_pt").show(5, truncate=100)

**O Fatiador de Palavras (Tokeniza√ß√£o)**

In [None]:
from pyspark.ml.feature import Tokenizer
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as f

# 1. Preparando a m√°quina de fatiar.
# Ela pega o 'texto_limpo' e joga o resultado na coluna 'tokens'.
tokenizer = Tokenizer(inputCol="texto_limpo", outputCol="tokens")

# 2. Passo os meus dados pela m√°quina.
# O legal √© que o Tokenizer j√° deixa tudo MIN√öSCULO sozinho! Menos um trabalho pra mim.
tokenizado = tokenizer.transform(dados)

# 3. Vou criar uma regrinha r√°pida para contar quantas palavras tem em cada frase.
# √â um contador de palavras autom√°tico.
countTokens = f.udf(lambda lista: len(lista), IntegerType())

# 4. Agora eu mostro o texto, a lista de palavras e o total de palavras (frequ√™ncia).
tokenizado.select("texto_limpo", "tokens") \
    .withColumn("Freq_tokens", countTokens(f.col("tokens"))).show()

**StopWords**

In [None]:
from pyspark.ml.feature import StopWordsRemover
import nltk
from nltk.corpus import stopwords

In [None]:
# 1. Baixando a "lista de lixo" oficial do portugu√™s (NLTK)
nltk.download("stopwords")
stop_portugues = stopwords.words("portuguese")

# 2. Configurando a m√°quina de limpeza
# Ela pega meus 'tokens' e cria a coluna 'texto_final' sem a sujeira.
remover = StopWordsRemover(inputCol="tokens",
                           outputCol="texto_final",
                           stopWords=stop_portugues)

# 3. Rodando a limpeza nos meus dados tokenizados
feature_data = remover.transform(tokenizado)

# 4.  Vamos ver quanto "peso" a gente perdeu?
print("Comparando a dieta dos tokens:")
feature_data.select("tokens", "texto_final") \
    .withColumn("Antes", countTokens(f.col("tokens"))) \
    .withColumn("Depois", countTokens(f.col("texto_final"))) \
    .show(5)

**Bag of words**

In [None]:
from pyspark.ml.feature import CountVectorizer

In [None]:
# 1. Preparando a  "Sacola".
# Ela olha para o 'texto_final' (que j√° t√° limpo) e cria a coluna 'CountVec'.
cv = CountVectorizer(inputCol="texto_final", outputCol="CountVec")

# 2. O 'fit' √© o Spark aprendendo quais palavras existem no mundo.
# Ele cria um dicion√°rio (modelo) com todas as palavras √∫nicas.
modelo_sacola = cv.fit(feature_data)

# 3. O 'transform' √© onde ele realmente carimba os n√∫meros nos nossos dados.
dados_vetorizados = modelo_sacola.transform(feature_data)

# 4. as palavras que ele aprendeu?
# Isso aqui √© o nosso "Dicion√°rio do Spark".
print("As primeiras palavras do nosso dicion√°rio:")
print(modelo_sacola.vocabulary[:10])

# 5. Mostrando o resultado final (Palavras vs N√∫meros)
dados_vetorizados.select('texto_final', 'CountVec').limit(5).show(truncate=False)

**Hashing TF**

In [None]:
from pyspark.ml.feature import HashingTF

# 1. Preparando o "Funil" de palavras.
# Eu digo de onde vem (texto_final) e onde coloco o resultado (hashingTF).
htf = HashingTF(inputCol="texto_final", outputCol="hashingTF")

# 2. Aqui eu defino o limite.
# "Spark, eu s√≥ quero 50 gavetas para guardar tudo isso, ok?"
htf.setNumFeatures(50)

# 3. Rodando a transforma√ß√£o.
# Repare que aqui N√ÉO usamos o .fit(), vamos direto para o .transform().
# Isso acontece porque o Hashing √© uma conta matem√°tica, ele n√£o precisa "aprender" as palavras antes.
dados_hashing = htf.transform(feature_data)

# 4. Vendo o resultado compacto
dados_hashing.select("texto_final", "hashingTF").limit(5).show(truncate=False)

In [None]:
from pyspark.ml.feature import IDF

# 1. Preparando o "Juiz" (IDF).
# Ele olha para os n√∫meros do HashingTF e decide quem ganha mais peso.
# O resultado vai para a coluna 'features' (esse √© o nome padr√£o que a IA gosta).
idf = IDF(inputCol="hashingTF", outputCol="features")

# 2. O 'fit' aqui √© obrigat√≥rio!
# O Spark precisa ler a base toda para saber quais palavras s√£o "comuns" e quais s√£o "raras".
modelo_idf = idf.fit(dados_hashing)

# 3. Agora ele carimba os pesos nos nossos dados.
dados_finalizados = modelo_idf.transform(dados_hashing)

# 4. Vendo o resultado (Agora os n√∫meros n√£o s√£o mais inteiros, s√£o decimais/pesos!)
dados_finalizados.select('texto_final', 'features').limit(5).show(truncate=False)

**Pipeline**

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

# 1. PE√áA 1: O fatiador de palavras
tokenizer = Tokenizer(inputCol="texto_limpo", outputCol="tokens")

# 2. PE√áA 2: O filtro de palavras in√∫teis
stopwords = StopWordsRemover(inputCol="tokens", outputCol="texto_final")

# 3. PE√áA 3: O contador r√°pido (limitando a 1000 palavras para ser potente)
hashingTF = HashingTF(inputCol="texto_final", outputCol="HTF", numFeatures=1000)

# 4. PE√áA 4: O juiz que d√° peso ao que importa (TF-IDF)
tfidf = IDF(inputCol="HTF", outputCol="features")

# üöÄ A ESTEIRA (Pipeline): Coloco as pe√ßas na ordem certa
pipeline_da_tati = Pipeline(stages=[tokenizer, stopwords, hashingTF, tfidf])

# 5. Ligo a f√°brica!
# O .fit() aprende com os dados e o .transform() entrega o resultado pronto.
dados_finalizados = pipeline_da_tati.fit(dados).transform(dados)

# 6. Conferindo o final da linha de produ√ß√£o
dados_finalizados.select("sentiment", "features").show(5)

In [None]:
from pyspark.ml import Pipeline


In [None]:
# 1. Separando os dados (70% pra estudar, 30% pra testar)
# A 'seed' garante que o sorteio seja sempre igual toda vez que eu rodar.
treino, teste = dados.randomSplit([0.7, 0.3], seed=101)

# 2. O MOMENTO DO ESTUDO (.fit)
# Aqui o Pipeline vai rodar tudo: limpar, tokenizar, pesar e a √Årvore vai criar as regras.
# O resultado √© o 'modelo_pronto'.
modelo_pronto = pipeline.fit(treino)

# 3. O MOMENTO DA PROVA (.transform)
# Agora eu pe√ßo para o modelo tentar adivinhar o sentimento dos dados de TESTE.
previsoes = modelo_pronto.transform(teste)

# 4. Vendo o resultado (O que era real vs O que a IA achou)
previsoes.select("label", "prediction").show(10)

In [None]:

# 2.  √Årvore de Decis√£o,
# garantindo que ela use o 'label' que j√° est√° l√°:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol='features', labelCol='label', maxDepth=10)

# 3. o pipeline e para o fit:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords, hashingTF, tfidf, dt])

# 4. Agora sim, treine o modelo:
modelo_pronto = pipeline.fit(treino)

**Teste e m√©tricas**

In [None]:

predictions = modelo_pronto.transform(teste)

# 2. Agora o avaliador vai encontrar a vari√°vel!
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

prof_avaliador = MulticlassClassificationEvaluator(labelCol='label',
                                                   predictionCol='prediction',
                                                   metricName='accuracy')

# mesmo nome
nota_final = prof_avaliador.evaluate(predictions)

print(f"Acur√°cia: {nota_final:.2f}")

In [None]:
# 1. Crio as frases que eu quero que a IA classifique.
# IMPORTANTE: A coluna tem que ter o mesmo nome (texto_limpo) que o Pipeline espera!
frases_novas = spark.createDataFrame([
        (1, "This is without doubt the worst movie i have ever seen, I hated the acting of the actor."), # Ruim
        (0, "I loved the movie, excellent acting!"), # Bom
    ], ["id", "texto_limpo"])

# 2. Uso o meu modelo treinado para "adivinhar"
# Ele vai fazer a faxina, tokenizar, pesar e dar o veredito sozinho.
resultado_final = modelo_pronto.transform(frases_novas)

# 3. Mostro o resultado na tela
resultado_final.select("texto_limpo", "prediction").show(truncate=False)

In [None]:
#  salvar seu modelo e usar depois sem precisar treinar de novo
modelo_pronto.save("modelo_sentimentos_imdb_v1")

# Calcule o F1-Score (criterio)
evaluator_f1 = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='f1')
f1 = evaluator_f1.evaluate(predictions)
print(f"F1-Score do Modelo: {f1:.2f}")