# Projeto Spark

Alunos:

- Davi Reis
- Guilherme Rameh
- Nicolas Queiroga

Entrega: 6 de dezembro de 2022

## Introdução

Neste projeto vamos construir um classificador Naive-Bayes para determinar o sentimento de um comentário.

## Instalando o ambiente

O jeito mais simples de começar a trabalhar com Spark é instalar um container com tudo pronto! No site https://hub.docker.com/r/jupyter/pyspark-notebook vemos uma imagem Docker que já vem com `pyspark` e `jupyter lab`. Instale a imagem com o comando:

```bash
docker pull jupyter/pyspark-notebook
```

Vamos iniciar o ambiente de trabalho com o comando `docker run`. Para isso precisamos tomar alguns cuidados:

1) Temos que mapear nosso diretorio local de trabalho para um diretório interno do container, de modo que alterações feitas dentro do container (nesta pasta escolhida) sejam gravadas no nosso diretorio local. No container temos um usuário padrão com *username* `jovyan`. No *homedir* desse usuario temos uma pasta vazia `work`, que vai servir como local de mapeamento do nosso diretorio local de trabalho. Podemos então fazer esse mapeamendo com a opção `-v` do comando `docker run` da seguinte forma:

```bash
-v <diretorio>:/home/jovyan/work
```

onde `<diretorio>` representa seu diretorio local de trabalho.

2) Para acessar o `jupyter notebook` e o *dashboard* do Spark a partir do nosso *browser* favorito temos que abrir algumas portas do container com a opção `-p`. As portas são `8888` (para o próprio `jupyter notebook`) e `4040` (para o *dashboard* do Spark). Ou seja, adicionaremos às opções do `docker run`o seguinte:

```bash
-p 8888:8888 -p 4040:4040
```

Desta forma, ao acessar `localhost:8888` na nossa máquina, estaremos acessando o servidor Jupyter na porta 8888 interna do container.

3) Vamos iniciar o container no modo interativo, e vamos especificar que o container deve ser encerrado ao fechar o servidor Jupyter. Faremos isso com as opções `-it` e `-rm`

Portanto, o comando completo que eu uso na minha máquina Linux para iniciar o container é:

```bash
docker run \
    -it \
    --rm \
    -p 8888:8888 \
    -p 4040:4040 \
    -v `pwd`:/home/jovyan/work \
    jupyter/pyspark-notebook


```

Para facilitar a vida eu coloco esse comando em um arquivo `inicia.sh`. Engenheiros, façam do jeito que preferirem!

Agora abra esse notebook lá no container!


## Iniciando o Spark

Vamos iniciar o ambiente Spark. Para isso vamos:

1) Criar um objeto de configuração do ambiente Spark. Nossa configuração será simples: vamos especificar que o nome da nossa aplicação Spark é "Minha aplicação", e que o *master node* é a máquina local, usando todos os *cores* disponíveis. Aplicações reais de Spark são configuradas de modo ligeiramente diferente: ao especificar o *master node* passamos uma URL real, com o endereço do nó gerente do *cluster* Spark.

2) Vamos criar um objeto do tipo `SparkContext` com essa configuração

In [None]:
import pyspark

conf = pyspark.SparkConf()
conf.setAppName('Projeto Spark')
conf.setMaster('local[*]')

sc = pyspark.SparkContext(conf=conf)

O `SparkContext` é a nossa porta de entrada para o cluster Spark, ele será a raiz de todas as nossas operações com o Spark.

In [None]:
sc

O link acima provavelmente não funcionará porque ele se refere à porta 4040 interna do container (portanto a URL está com endereço interno). Porém fizemos o mapeamento da porta 4040 interna para a porta 4040 externa, logo você pode acessar o *dashboard* do Spark no endereço http://localhost:4040

<center><img src="./spark_dashboard.png" width=800/></center>

## Lendo os dados

Vamos começar lendo o arquivo de reviews e gravando o resultado em formato pickle, mais amigável.

In [None]:
def parse_line(line):
    parts = line[1:-1].split('","')
    sentiment = int(parts[0])
    title = parts[1].replace('""', '"')
    body = parts[2].replace('""', '"')
    return (sentiment, title, body)

rdd = sc.textFile('train.csv').map(parse_line)

In [None]:
rdd.count()

In [None]:
rdd.take(1)

Agora vamos gravar no formato pickle, para facilitar os trabalhos futuros. Após gravar o arquivo, não mais rode as células desta primeira etapa!

In [None]:
rdd.saveAsPickleFile('reviews.pickle')

## Um classificador Naive-Bayes

Vamos ler o arquivo pickle gravado anteriormente:

In [None]:
rdd = sc.pickleFile('reviews.pickle')

In [None]:
rdd.count()

In [None]:
rdd.take(1)

Agora, complete as tarefas em sequencia para construir o classificador Naive-Bayes:

### Fase 1

In [None]:
import numpy as np

#### Tarefa

Construa uma função que recebe um RDD no formato do RDD original e retorna um RDD no qual cada item é um par (palavra, contagem).

Na função abaixo, importante notar que precisamos percorrer tanto o título como o corpo de cada avaliação do RDD original. O "reduceByKey" agrupa todas as palavras com a contagem que o map faz.

In [None]:
def word_count(rdd):
    return rdd.flatMap(lambda x: x[1].split() + x[2].split()).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

#### Tarefa

Construa uma função que recebe o RDD (palavra, contagem) construido anteriormente e retorna um RDD no qual cada item é um par (palavra, $\log_{10}\left(c \, / \, T\right)$), onde $c$ é a contagem daquela palavra e $T$ é a soma das contagens de palavra.

Essa função apenas usa o reduce para contar o total de palavras, e depois faz a conta com o log.

In [None]:
def word_log_count(rdd):
    total_count = rdd.map(lambda x: x[1]).reduce(lambda x, y: x + y)
    return rdd.map(lambda x: (x[0], np.log10(x[1] / total_count)))

#### Tarefa

Separe o RDD original em dois RDDs: o dos reviews positivos e o dos negativos. Em seguida, use as funções anteriores para construir RDDs que contem os pares (palavra, $\log_{10}\left(c \, / \, T\right)$)

Para separar os RDDs, basta ver a classificação do RDD original, se for 1 é positivo, se for 2 é negativo.

In [None]:
def split_by_sentiment(rdd):
    positive = rdd.filter(lambda x: x[0] == 1)
    negative = rdd.filter(lambda x: x[0] == 2)
    return positive, negative

In [None]:
rdd_positive, rdd_negative = split_by_sentiment(rdd)

In [None]:
rdd_positive_log_count = word_log_count(word_count(rdd_positive))
rdd_negative_log_count = word_log_count(word_count(rdd_negative))

### Tarefa

Use o `.fullOuterJoin()` dos RDDs para construir um RDD unificado, no qual cada item é da forma (palavra, log_prob_positivo, log_prob_negativo). "Baixe" esse resultado final usando `.collect()`.

In [None]:
rdd_unified = rdd_positive_log_count.fullOuterJoin(rdd_negative_log_count).map(lambda x: (x[0], x[1][0], x[1][1]))

rdd_unified.take(10)

- Testando para a palavra 'Slow'

In [None]:
rdd_positive_log_count.filter(lambda x: x[0] == 'Slow').collect()

In [None]:
rdd_negative_log_count.filter(lambda x: x[0] == 'Slow').collect()

#### Tarefa

Para uma dada string, determine se ela é um review positivo ou negativo usando os RDDs acima. Lembre-se de como funciona o classificador Naive-Bayes: http://stanford.edu/~jurafsky/slp3/slides/7_NB.pdf, consulte tambem suas notas de aula de Ciência dos Dados!

Na célula abaixo fazemos uma classe que contem todas as funções que usamos anteriormente para calcular o log das palavras, e uma função chamada 'classify', que acumula o valor dos logs positivo e negativo de cada palavra dado uma frase passada.

In [None]:
class NaiveBayes:
    def __init__(self, rdd):
        self.rdd = rdd
        self.positive, self.negative = self.split_by_sentiment(rdd)
        self.positive_count = self.positive.count()
        self.negative_count = self.negative.count()
        self.total_count = self.positive_count + self.negative_count
        self.positive_log_count = self.word_log_count(self.word_count(self.positive))
        self.negative_log_count = self.word_log_count(self.word_count(self.negative))
        self.unified = self.positive_log_count.fullOuterJoin(self.negative_log_count).map(lambda x: (x[0], x[1][0], x[1][1]))

    def split_by_sentiment(self, rdd):
        positive = rdd.filter(lambda x: x[0] == 1)
        negative = rdd.filter(lambda x: x[0] == 2)
        return positive, negative

    def word_log_count(self, rdd):
        total_count = rdd.map(lambda x: x[1]).reduce(lambda x, y: x + y)
        return rdd.map(lambda x: (x[0], np.log10(x[1] / total_count)))

    def word_count(self, rdd):
        return rdd.flatMap(lambda x: x[1].split()).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    def classify(self, text):
        positive = 0
        negative = 0
        for word in text.split():
            positive += self.unified.filter(lambda x: x[0] == word).map(lambda x: x[1]).collect()[0]
            negative += self.unified.filter(lambda x: x[0] == word).map(lambda x: x[2]).collect()[0]
        if positive < negative:
            return 2, positive
        else:
            return 1, negative

In [None]:
nb = NaiveBayes(rdd)

- Testando o classificador

In [None]:
test = "The best game for my kids. I love"
classification, score = nb.classify(test)
print('Classification: {}, Score: {}'.format(classification, score))

### Fase 2

Agora que temos um classificador Naive-Bayes, vamos explorá-lo um pouco:

### Tarefa

Quais são as 100 palavras que mais indicam negatividade, ou seja, onde a diferença entre a probabilidade da palavra no conjunto dos comentários negativos e positivos é máxima? E quais as 100 palavras de maior positividade? Mostre os resultados na forma de *word clouds*.

Para delecionar as palavras positivas e negativas, basta pegar o RDD unificado, filtrar qual o sentimento que queremos (com x[][] != None) e usar o takeOrdered para selecionar os 100 maiores valores (por isso o - na frente, já que o log é negativo, é necessário pegar o módulo).

In [None]:
rdd_unified_fase_2 = rdd_positive_log_count.fullOuterJoin(rdd_negative_log_count)

In [None]:
positivity_words = rdd_unified_fase_2.filter(lambda x: x[1][0] != None).filter(lambda x: x[1][1] != None).map(lambda x:(x[0], (x[1][1]) - (x[1][0]))).takeOrdered(100, lambda x: -x[1])
positivity_words

In [None]:
negativity_words = rdd_unified_fase_2.filter(lambda x: x[1][0] != None).filter(lambda x: x[1][1] != None).map(lambda x:(x[0], (x[1][0]) - (x[1][1]))).takeOrdered(100, lambda x: -x[1])
negativity_words

In [None]:
!pip install wordcloud

In [None]:
from wordcloud import WordCloud
import matplotlib.pyplot as plt

def plot_wordcloud(words):
    wordcloud = WordCloud().generate_from_frequencies(dict(words))
    plt.figure(figsize=(10, 10))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis("off")
    plt.show()

- Palavras Positivas

In [None]:
plot_wordcloud(positivity_words)

- Palavras Negativas

In [None]:
plot_wordcloud(negativity_words)

### Tarefa desafio!

Qual o desempenho do classificador (acurácia)? Para medir sua acurácia:

- Separe os reviews em dois conjuntos: treinamente e teste
- Repita o "treinamento" do classificador com o conjunto de treinamento
- Para cada review do conjunto de teste, determine se é positiva ou negativa de acordo com o classificador
- Determine a acurácia

Esta não é uma tarefa trivial. Não basta fazer um `for` para determinar a classe de cada review de teste: isso demoraria uma eternidade. Você tem que usar variáveis "broadcast" do Spark para enviar uma cópia da tabela de frequencias para cada *core* do executor.

Primeiro dividimos os dados:

In [None]:
train, test = rdd.randomSplit([0.8, 0.2], seed=42)

In [None]:
train.count()

In [None]:
test.count()

In [None]:
nb_train = NaiveBayes(train)

Após treinarmos o NB, precisamos de uma função que recebe o um texto e o broadcast, e retorna a previsão. Dividimos o texto em palavras, e pra cada palavra, acumulamos o valor treinado e verificamos qual valor em módulo é maior, o positivo ou negativo, e classificamos a palavra assim.

In [None]:
def classify_new(text, training_set):
    training = training_set.value
    
    positive = 0
    negative = 0
    for word in text.split():
        if word in training and training[word][0] != None and training[word][1] != None:
            positive += training[word][0]
            negative += training[word][1]
    if positive <negative:
        return 2, positive
    else:
        return 1, negative

In [None]:
from functools import partial

all_words = nb_train.unified.collect()

all_words_dict = {}
for word in all_words:
    all_words_dict[word[0]] = word[1:]

training_set = sc.broadcast(all_words_dict)

In [None]:
classify_new_final = partial(classify_new, training_set=training_set)

In [None]:
print(classify_new_final('The best game for my kids. I love'))

In [None]:
test_classified = test.map(lambda x: (x[0], x[1], classify_new_final(x[1])))
test_classified.take(5)

- Analisando a acurácia do classificador

In [None]:
correct = test_classified.filter(lambda x: x[0] == x[2][0]).count()
total = test_classified.count()
print(total)
accuracy = correct / total
print('Accuracy: {:.3f}%'.format(accuracy*100))

### Tarefa desafio!

Implemente Laplace smoothing

Função que implementa o Laplace Smoothing na associação dos logs para cada palavra.

In [None]:
def laplace_smoothing(rdd, T, v=171476, alpha=1):
    op = rdd.map(lambda x: (x[0], np.log10((x[1] + alpha) / (T[1] + alpha * v))))
    return op

Aplicamos o Laplace smoothing para a divisão positiva e negativa, e agrupamos ambos com um fullOuterJoin, para transformarmos esse RDD em um dicionário. A partir dele fazemos o broadcast que será usado novamente na classificação.

A partir daqui, repitimos todo o processo aterior, de treinamento e teste a partir da classe NB, checamos sua acurácia e verificamos as 100 palavras mais negativas e positivas.

In [None]:
count = word_count(rdd)
T = count.reduce(lambda x, y: ("all", x[1] + y[1]))
smoothing = laplace_smoothing(rdd=count, T=T).take(10)
smoothing

In [None]:
count = word_count(rdd_negative)
T = count.reduce(lambda x, y: ("all", x[1] + y[1]))
smoothing_negative = laplace_smoothing(rdd=count, T=T)
smoothing_negative.take(10)

In [None]:
count = word_count(rdd_positive)
T = count.reduce(lambda x, y: ("all", x[1] + y[1]))
smoothing_positive = laplace_smoothing(rdd=count, T=T)
smoothing_positive.take(10)

In [None]:
all_smoothings = smoothing_negative.fullOuterJoin(smoothing_positive).collect()

In [None]:
smoothing_dict = {a: [b] for a, b in all_smoothings}

In [None]:
training_set_laplace = sc.broadcast(smoothing_dict)

In [None]:
def classify_new(text, training, v=171476):
    positive = 0
    negative = 0
    for word in text.split():
        if word in training:
            if training[word][0][0] != None:
                positive += training[word][0][0]
            else:
                positivo += np.log10(1.0/v)
            if training[word][0][1] != None:
                negative += training[word][0][1]
            else:
                negative += np.log10(1.0/v)
        
    if positive < negative:
        return 2, positive
    else:
        return 1, negative

In [None]:
classify_new_final = partial(classify_new, training=training_set_laplace)

In [None]:
classify_new_final('i love it')

In [None]:
test_classified = test.map(lambda x: (x[0], x[1], classify_new_final(x[1])))
test_classified.take(5)

In [None]:
correct = test_classified.filter(lambda x: x[0] == x[2][0])
total = test_classified.count()
print(total)
accuracy = correct / total
print('Accuracy: {:.3f}%'.format(accuracy*100))

In [None]:
all_smoothings = smoothing_negative.fullOuterJoin(smoothing_positive)

In [None]:
positive_words = all_smoothings \
                    .filter(lambda x: x[1][0] != None and x[1][1] != None)\
                    .takeOrdered(100, lambda x: -x[1][0])
positive_words

In [None]:
negative_words = all_smoothings \
                    .filter(lambda x: x[1][0] != None and x[1][1] != None)\
                    .takeOrdered(100, lambda x: -x[1][1])
negative_words

In [None]:
negative_smoothing_text = " ".join(word[0] for word in negative_words)
plot_wordcloud(negative_smoothing_text)

In [None]:
positive_smoothing_text = " ".join(word[0] for word in positive_words)
plot_wordcloud(positive_smoothing_text)

## Rubrica de avaliação

- I: groselha, falha crítica, ou não entregou nada
- D: Fez uma tentativa honesta de fazer todos os itens da fase 1, mas tem erros
- C: Fase 1 completa OK
- B: Fase 2, faltando apenas um desafio OK
- A: Fase 2 completa OK