# Projeto Spark

Entrega: 16 de novembro de 2025

## Introdu√ß√£o

Neste projeto vamos construir um classificador Naive-Bayes para determinar o sentimento de um coment√°rio.

## Grupos

O projeto pode ser individual ou em duplas. Criem os grupos em https://classroom.github.com/a/YQj6i16S

## Instalando o ambiente

O jeito mais simples de come√ßar a trabalhar com Spark √© instalar um container com tudo pronto! No site https://hub.docker.com/r/jupyter/pyspark-notebook vemos uma imagem Docker que j√° vem com `pyspark` e `jupyter lab`. Instale a imagem com o comando:

```bash
docker pull jupyter/pyspark-notebook
```


Vamos iniciar o ambiente de trabalho com o comando `docker run`. Para isso precisamos tomar alguns cuidados:

1) Temos que mapear nosso diretorio local de trabalho para um diret√≥rio interno do container, de modo que altera√ß√µes feitas dentro do container (nesta pasta escolhida) sejam gravadas no nosso diretorio local. No container temos um usu√°rio padr√£o com *username* `jovyan`. No *homedir* desse usuario temos uma pasta vazia `work`, que vai servir como local de mapeamento do nosso diretorio local de trabalho. Podemos ent√£o fazer esse mapeamendo com a op√ß√£o `-v` do comando `docker run` da seguinte forma:

```bash
-v <diretorio>:/home/jovyan/work
```

onde `<diretorio>` representa seu diretorio local de trabalho.

2) Para acessar o `jupyter notebook` e o *dashboard* do Spark a partir do nosso *browser* favorito temos que abrir algumas portas do container com a op√ß√£o `-p`. As portas s√£o `8888` (para o pr√≥prio `jupyter notebook`) e `4040` (para o *dashboard* do Spark). Ou seja, adicionaremos √†s op√ß√µes do `docker run`o seguinte:

```bash
-p 8888:8888 -p 4040:4040
```

Desta forma, ao acessar `localhost:8888` na nossa m√°quina, estaremos acessando o servidor Jupyter na porta 8888 interna do container.

3) Vamos iniciar o container no modo interativo, e vamos especificar que o container deve ser encerrado ao fechar o servidor Jupyter. Faremos isso com as op√ß√µes `-it` e `-rm`

Antes de executar, garanta que as portas 4040 e 8888 est√£o livres (sem jupyter j√° executando) ou altere o comando. Ainda, esteja na pasta da aula ao executar, assim apenas ela ser√° exposta ao container.

Portanto, o comando completo que eu uso na minha m√°quina Linux para iniciar o container √©:

```bash
docker run \
    -it \
    --rm \
    -p 8888:8888 \
    -p 4040:4040 \
    -v "`pwd`":/home/jovyan/work \
    jupyter/pyspark-notebook

```

Se estiver no Windows estes comandos, utilize:

- No Powershell: `docker run -it --rm -p 8888:8888 -p 4040:4040 -v ${PWD}:/home/jovyan/work jupyter/pyspark-notebook`

- No Prompt de comando: `docker run -it --rm -p 8888:8888 -p 4040:4040 -v %cd%:/home/jovyan/work jupyter/pyspark-notebook`

## Iniciando o Spark

Vamos iniciar o ambiente Spark. Para isso vamos:

1) Criar um objeto de configura√ß√£o do ambiente Spark. Nossa configura√ß√£o ser√° simples: vamos especificar que o nome da nossa aplica√ß√£o Spark √© "Minha aplica√ß√£o", e que o *master node* √© a m√°quina local, usando todos os *cores* dispon√≠veis. Aplica√ß√µes reais de Spark s√£o configuradas de modo ligeiramente diferente: ao especificar o *master node* passamos uma URL real, com o endere√ßo do n√≥ gerente do *cluster* Spark.

2) Vamos criar um objeto do tipo `SparkContext` com essa configura√ß√£o

In [1]:
import pyspark

conf = pyspark.SparkConf()
conf.setAppName("Meu projeto Spark")
conf.setMaster("local[*]")

sc = pyspark.SparkContext(conf=conf)

O `SparkContext` √© a nossa porta de entrada para o cluster Spark, ele ser√° a raiz de todas as nossas opera√ß√µes com o Spark.

In [2]:
sc

O link acima provavelmente n√£o funcionar√° porque ele se refere √† porta 4040 interna do container (portanto a URL est√° com endere√ßo interno). Por√©m fizemos o mapeamento da porta 4040 interna para a porta 4040 externa, logo voc√™ pode acessar o *dashboard* do Spark no endere√ßo http://localhost:4040

<center><img src="./spark_dashboard.png" width=800/></center>

## Lendo os dados

Utilize os dados (`train.csv`) da aula 22. Caso queira fazer download novamente, utilize um dos links:

- https://www.kaggle.com/datasets/kritanjalijain/amazon-reviews.
- https://bigdata-22-2.s3.us-east-2.amazonaws.com/amazon_sentiment/train.csv.gz

Vamos come√ßar lendo o arquivo de reviews e gravando o resultado em formato pickle, mais amig√°vel.

In [4]:
def parse_line(line):
    parts = line[1:-1].split('","')
    sentiment = int(parts[0])
    title = parts[1].replace('""', '"')
    body = parts[2].replace('""', '"')
    return (sentiment, title, body)

rdd = sc.textFile("train.csv").map(parse_line)

In [7]:
rdd.count()

3600000

In [6]:
rdd.take(1)

[(2,
  'Stuning even for the non-gamer',
  'This sound track was beautiful! It paints the senery in your mind so well I would recomend it even to people who hate vid. game music! I have played the game Chrono Cross but out of all of the games I have ever played it has the best music! It backs away from crude keyboarding and takes a fresher step with grate guitars and soulful orchestras. It would impress anyone who cares to listen! ^_^')]

Agora vamos gravar no formato pickle, para facilitar os trabalhos futuros. Ap√≥s gravar o arquivo, n√£o mais rode as c√©lulas desta primeira etapa!

In [None]:
rdd.saveAsPickleFile("reviews.pickle")

## Um classificador Naive-Bayes

Vamos ler o arquivo pickle gravado anteriormente:

In [None]:
rdd = sc.pickleFile('reviews.pickle')

In [None]:
rdd.count()

In [None]:
rdd.take(1)

Agora, complete as tarefas em sequencia para construir o classificador Naive-Bayes:

### Fase 1

#### Tarefa

Construa uma fun√ß√£o que recebe um RDD no formato do RDD original e retorna um RDD no qual cada item √© um par (palavra, contagem).

In [None]:
import re

def words_count(rdd):
    """
    Recebe um RDD no formato (sentiment, title, body)
    Retorna um RDD (palavra, contagem)
    """
    
    def tokenize(text):
        # transforma em min√∫sculas e remove caracteres n√£o alfab√©ticos
        return re.findall(r"[a-zA-Z]+", text.lower())
    
    return (rdd
        # extrair apenas o texto: t√≠tulo + corpo
        .flatMap(lambda x: tokenize(x[1] + " " + x[2]))
        # criar pares (palavra, 1)
        .map(lambda word: (word, 1))
        # somar as ocorr√™ncias
        .reduceByKey(lambda a, b: a + b)
    )

#### Tarefa

Construa uma fun√ß√£o que recebe o RDD (palavra, contagem) construido anteriormente e retorna um RDD no qual cada item √© um par (palavra, $\log_{10}\left(c \, / \, T\right)$), onde $c$ √© a contagem daquela palavra e $T$ √© a soma das contagens de palavra.

In [None]:
import math

def to_log_prob(word_count_rdd):
    """
    Recebe um RDD (palavra, contagem)
    Retorna um RDD (palavra, log10(c/T))
    onde T √© a soma das contagens de todas as palavras.
    """
    # Soma total das contagens no Corpus (T)
    T = word_count_rdd.map(lambda x: x[1]).sum()

    # Calcula log10(c/T) para cada palavra
    return word_count_rdd.map(
        lambda x: (x[0], math.log10(x[1] / T))
    )


#### Tarefa

Separe o RDD original em dois RDDs: o dos reviews positivos e o dos negativos. Em seguida, use as fun√ß√µes anteriores para construir RDDs que contem os pares (palavra, $\log_{10}\left(c \, / \, T\right)$)

In [None]:
# Supondo que o RDD original se chama `rdd` no formato:
# (sentiment:int, title:str, body:str)

# 1Ô∏è‚É£ Separar por classe
rdd_pos = rdd.filter(lambda x: x[0] == 1)   # positivos
rdd_neg = rdd.filter(lambda x: x[0] == 0)   # negativos

# 2Ô∏è‚É£ Gerar contagem das palavras por classe
word_counts_pos = words_count(rdd_pos)
word_counts_neg = words_count(rdd_neg)

# 3Ô∏è‚É£ Converter para probabilidades logar√≠tmicas
log_probs_pos = to_log_prob(word_counts_pos)
log_probs_neg = to_log_prob(word_counts_neg)

### Tarefa

Use o `.fullOuterJoin()` dos RDDs para construir um RDD unificado, no qual cada item √© da forma (palavra, log_prob_positivo, log_prob_negativo). "Baixe" esse resultado final usando `.collect()`.

In [None]:
# Junta as probabilidades usando fullOuterJoin
joined_rdd = log_probs_pos.fullOuterJoin(log_probs_neg)

# Ajusta o formato para (palavra, log_prob_pos, log_prob_neg)
# Substituindo None por 0.0 (para evitar erros futuros)
model_rdd = joined_rdd.map(
    lambda x: (
        x[0],
        x[1][0] if x[1][0] is not None else 0.0,  # log_prob_pos
        x[1][1] if x[1][1] is not None else 0.0   # log_prob_neg
    )
)

# Baixa para o driver (‚ö†Ô∏è pode ser grande!)
modelo = model_rdd.collect()

# Exibe os primeiros itens como exemplo
print(modelo[:10])

#### Tarefa

Para uma dada string, determine se ela √© um review positivo ou negativo usando os RDDs acima. Lembre-se de como funciona o classificador Naive-Bayes: http://stanford.edu/~jurafsky/slp3/slides/7_NB.pdf, consulte tambem suas notas de aula de Ci√™ncia dos Dados!

In [None]:
import re, math

# --- util: mesma tokeniza√ß√£o usada antes ---
def _tokenize(text: str):
    return re.findall(r"[a-zA-Z]+", text.lower())

# --- preparar priors (contagem de documentos por classe) ---
N_pos = rdd_pos.count()
N_neg = rdd_neg.count()
N_tot = N_pos + N_neg
log_prior_pos = math.log10(N_pos / N_tot) if N_pos > 0 else float("-inf")
log_prior_neg = math.log10(N_neg / N_tot) if N_neg > 0 else float("-inf")

# --- totais por classe e tamanho do vocabul√°rio (para smoothing) ---
T_pos = word_counts_pos.map(lambda x: x[1]).sum()
T_neg = word_counts_neg.map(lambda x: x[1]).sum()
V = (word_counts_pos.keys().union(word_counts_neg.keys())).distinct().count()

# --- dicion√°rio/broadcast com (palavra -> (logp_pos, logp_neg)) ---
# Se voc√™ j√° tem `modelo` (coletado), reaproveite; sen√£o, colete agora:
try:
    modelo  # s√≥ verifica se existe
    _pairs = modelo
except NameError:
    _pairs = (log_probs_pos
              .fullOuterJoin(log_probs_neg)
              .map(lambda x: (x[0],
                              x[1][0] if x[1][0] is not None else 0.0,
                              x[1][1] if x[1][1] is not None else 0.0))
              .collect())

_model_dict = {w: (lp_pos, lp_neg) for (w, lp_pos, lp_neg) in _pairs}
bc_model = sc.broadcast(_model_dict)

def classify_review(text: str, alpha: float = 1.0):
    """
    Classifica um texto como positivo (1) ou negativo (0) usando Naive Bayes (multinomial).
    - Usa log-somas para estabilidade num√©rica.
    - Aplica Laplace smoothing (alpha) para palavras fora do vocabul√°rio.
    Retorna: dict com classe, scores, e probabilidades.
    """
    tokens = _tokenize(text)

    # backoff (palavra OOV) com smoothing multinomial:
    # P(w|classe) = (alpha) / (T_classe + alpha*V)
    # trabalhamos diretamente no log10:
    if alpha > 0:
        log_backoff_pos = math.log10(alpha / (T_pos + alpha * V))
        log_backoff_neg = math.log10(alpha / (T_neg + alpha * V))
    else:
        # sem smoothing: ignorar OOV (contribui√ß√£o 0 no log-sum ‚Üí n√£o somar)
        log_backoff_pos = None
        log_backoff_neg = None

    log_score_pos = log_prior_pos
    log_score_neg = log_prior_neg
    mdict = bc_model.value

    for w in tokens:
        vals = mdict.get(w)
        if vals is not None:
            lp_pos, lp_neg = vals
            log_score_pos += lp_pos
            log_score_neg += lp_neg
        else:
            if alpha > 0:
                log_score_pos += log_backoff_pos
                log_score_neg += log_backoff_neg
            # se alpha == 0 e palavra n√£o existe, simplesmente n√£o somamos nada

    # decis√£o
    predicted = 1 if log_score_pos >= log_score_neg else 0

    # probabilidades a partir dos log-scores (base 10):
    # p_pos ‚àù 10^{log_score_pos}; p_neg ‚àù 10^{log_score_neg}
    # normaliza√ß√£o est√°vel via diferen√ßa:
    diff = log_score_neg - log_score_pos
    # p_pos = 1 / (1 + 10^{diff})
    p_pos = 1.0 / (1.0 + (10 ** diff))
    p_neg = 1.0 - p_pos

    return {
        "predicted": predicted,          # 1 = positivo, 0 = negativo
        "log_score_pos": log_score_pos,
        "log_score_neg": log_score_neg,
        "p_pos": p_pos,
        "p_neg": p_neg,
        "tokens_used": len(tokens)
    }

### Fase 2

Agora que temos um classificador Naive-Bayes, vamos explor√°-lo um pouco:

### Tarefa

Quais s√£o as 100 palavras que mais indicam negatividade, ou seja, onde a diferen√ßa entre a probabilidade da palavra no conjunto dos coment√°rios negativos e positivos √© m√°xima? E quais as 100 palavras de maior positividade? Mostre os resultados na forma de *word clouds*.

In [None]:
from wordcloud import WordCloud
import matplotlib.pyplot as plt

# 1Ô∏è‚É£ Unir probabilidades logaritmicas em um √∫nico RDD
joined_rdd = log_probs_pos.fullOuterJoin(log_probs_neg)

# 2Ô∏è‚É£ Calcular a diferen√ßa:
# diff = log(P(w|neg)) - log(P(w|pos))
# Quanto maior diff, mais negativa a palavra
diff_rdd = joined_rdd.map(
    lambda x: (
        x[0],
        (x[1][1] if x[1][1] is not None else 0.0)
        - (x[1][0] if x[1][0] is not None else 0.0)
    )
)

# 3Ô∏è‚É£ Top-100 palavras mais negativas (maior diff)
top_neg_100 = diff_rdd.takeOrdered(100, key=lambda x: -x[1])

# 4Ô∏è‚É£ Top-100 palavras mais positivas (menor diff)
top_pos_100 = diff_rdd.takeOrdered(100, key=lambda x: x[1])

print("Exemplo negativas:", top_neg_100[:10])
print("Exemplo positivas:", top_pos_100[:10])

# ‚úÖ Converter para dicion√°rios de frequ√™ncias para visualiza√ß√£o em WordCloud
# Quanto maior o valor ‚Üí maior a palavra na nuvem
neg_dict = {word: float(diff) for word, diff in top_neg_100}
pos_dict = {word: float(-diff) for word, diff in top_pos_100}  # invertendo sinal

# 5Ô∏è‚É£ Word Clouds üìä‚ú®
fig, axes = plt.subplots(1, 2, figsize=(20, 10))

wc_neg = WordCloud(width=800, height=600, background_color="white")
wc_neg.generate_from_frequencies(neg_dict)
axes[0].imshow(wc_neg, interpolation="bilinear")
axes[0].set_title("Palavras mais Negativas", fontsize=20)
axes[0].axis("off")

wc_pos = WordCloud(width=800, height=600, background_color="white")
wc_pos.generate_from_frequencies(pos_dict)
axes[1].imshow(wc_pos, interpolation="bilinear")
axes[1].set_title("Palavras mais Positivas", fontsize=20)
axes[1].axis("off")

plt.show()


### Tarefa desafio!

Qual o desempenho do classificador (acur√°cia)? Para medir sua acur√°cia:

- Separe os reviews em dois conjuntos: treinamente e teste
- Repita o "treinamento" do classificador com o conjunto de treinamento
- Para cada review do conjunto de teste, determine se √© positiva ou negativa de acordo com o classificador
- Determine a acur√°cia

Esta n√£o √© uma tarefa trivial. N√£o basta fazer um `for` para determinar a classe de cada review de teste: isso demoraria uma eternidade. Voc√™ tem que usar vari√°veis "broadcast" do Spark para enviar uma c√≥pia da tabela de frequencias para cada *core* do executor.

In [None]:
# === Ajustes / utilidades ===
import re, math

def _tokenize(text: str):
    # Mesma tokeniza√ß√£o usada nas fases anteriores
    return re.findall(r"[a-zA-Z]+", (text or "").lower())

# === 1) Split: treino e teste ===
# rdd: (label:int, title:str, body:str)
rdd_train, rdd_test = rdd.randomSplit([0.8, 0.2], seed=42)
rdd_train = rdd_train.cache()
rdd_test  = rdd_test.cache()

# === 2) "Treinamento" no conjunto de treino ===
# Separar por classe (0 = neg, 1 = pos)
rdd_train_pos = rdd_train.filter(lambda x: x[0] == 1).cache()
rdd_train_neg = rdd_train.filter(lambda x: x[0] == 0).cache()

# Priors (por documentos no treino)
N_pos = rdd_train_pos.count()
N_neg = rdd_train_neg.count()
N_tot = N_pos + N_neg
log_prior_pos = math.log10(N_pos / N_tot) if N_pos > 0 else float("-inf")
log_prior_neg = math.log10(N_neg / N_tot) if N_neg > 0 else float("-inf")

# Contagens de palavras por classe (treino)
word_counts_pos = words_count(rdd_train_pos)  # (w, c_pos)
word_counts_neg = words_count(rdd_train_neg)  # (w, c_neg)

# Totais por classe (T_pos/T_neg) e vocabul√°rio |V| (treino)
T_pos = word_counts_pos.map(lambda x: x[1]).sum()
T_neg = word_counts_neg.map(lambda x: x[1]).sum()
V = (word_counts_pos.keys().union(word_counts_neg.keys())).distinct().count()

# Probabilidades log (sem smoothing aqui; smoothing entra na classifica√ß√£o para OOV)
log_probs_pos = to_log_prob(word_counts_pos)  # (w, log10(c/T_pos))
log_probs_neg = to_log_prob(word_counts_neg)  # (w, log10(c/T_neg))

# Dicion√°rios para broadcast
dict_log_pos = log_probs_pos.collectAsMap()   # palavra -> logP(w|pos)
dict_log_neg = log_probs_neg.collectAsMap()   # palavra -> logP(w|neg)

bc_log_pos = sc.broadcast(dict_log_pos)
bc_log_neg = sc.broadcast(dict_log_neg)

# Smoothing (Laplace) para palavras fora do vocabul√°rio do treino
alpha = 1.0
log_backoff_pos = math.log10(alpha / (T_pos + alpha * V)) if T_pos > 0 else float("-inf")
log_backoff_neg = math.log10(alpha / (T_neg + alpha * V)) if T_neg > 0 else float("-inf")

# === 3) Classificar o conjunto de teste de forma distribu√≠da (sem collect/for) ===
def _classify_record(rec):
    """
    rec: (label, title, body)
    retorna: (pred, label)
    """
    label, title, body = rec
    tokens = _tokenize((title or "") + " " + (body or ""))

    lp_pos = bc_log_pos.value
    lp_neg = bc_log_neg.value

    score_pos = log_prior_pos
    score_neg = log_prior_neg

    # Multinomial NB: soma log-prob por ocorr√™ncia (conta repeti√ß√µes)
    for w in tokens:
        score_pos += lp_pos.get(w, log_backoff_pos)
        score_neg += lp_neg.get(w, log_backoff_neg)

    pred = 1 if score_pos >= score_neg else 0
    return (pred, label)

pred_vs_true = rdd_test.map(_classify_record)

# === 4) Acur√°cia ===
n_total   = pred_vs_true.count()
n_correct = pred_vs_true.filter(lambda x: x[0] == x[1]).count()
accuracy  = n_correct / n_total if n_total > 0 else float("nan")

print(f"Acur√°cia no conjunto de teste: {accuracy:.4f}  (acertos: {n_correct} / {n_total})")

### Tarefa desafio!

Implemente Laplace smoothing

In [None]:
import math

def to_log_prob_smoothed(word_count_rdd, alpha, V):
    """
    Recebe um RDD (palavra, contagem)
    Retorna (palavra, log10((c+alpha)/(T + alpha*V)))
    Onde V = tamanho do vocabul√°rio
    """
    T = word_count_rdd.map(lambda x: x[1]).sum()

    return word_count_rdd.map(
        lambda x: (
            x[0],
            math.log10((x[1] + alpha) / (T + alpha * V))
        )
    )

## Rubrica de avalia√ß√£o

- I: groselha, falha cr√≠tica, ou n√£o entregou nada
- D: Fez uma tentativa honesta de fazer todos os itens da fase 1, mas tem erros
- C: Fase 1 completa
- B: Fase 2, faltando apenas um desafio
- A: Fase 2 completa