# Projeto Spark

A ser realizado em duplas.

Entrega: 6 de dezembro de 2022

## Introdução

Neste projeto vamos construir um classificador Naive-Bayes para determinar o sentimento de um comentário.

## Instalando o ambiente

O jeito mais simples de começar a trabalhar com Spark é instalar um container com tudo pronto! No site https://hub.docker.com/r/jupyter/pyspark-notebook vemos uma imagem Docker que já vem com `pyspark` e `jupyter lab`. Instale a imagem com o comando:

```bash
docker pull jupyter/pyspark-notebook
```

Vamos iniciar o ambiente de trabalho com o comando `docker run`. Para isso precisamos tomar alguns cuidados:

1) Temos que mapear nosso diretorio local de trabalho para um diretório interno do container, de modo que alterações feitas dentro do container (nesta pasta escolhida) sejam gravadas no nosso diretorio local. No container temos um usuário padrão com *username* `jovyan`. No *homedir* desse usuario temos uma pasta vazia `work`, que vai servir como local de mapeamento do nosso diretorio local de trabalho. Podemos então fazer esse mapeamendo com a opção `-v` do comando `docker run` da seguinte forma:

```bash
-v <diretorio>:/home/jovyan/work
```

onde `<diretorio>` representa seu diretorio local de trabalho.

2) Para acessar o `jupyter notebook` e o *dashboard* do Spark a partir do nosso *browser* favorito temos que abrir algumas portas do container com a opção `-p`. As portas são `8888` (para o próprio `jupyter notebook`) e `4040` (para o *dashboard* do Spark). Ou seja, adicionaremos às opções do `docker run`o seguinte:

```bash
-p 8888:8888 -p 4040:4040
```

Desta forma, ao acessar `localhost:8888` na nossa máquina, estaremos acessando o servidor Jupyter na porta 8888 interna do container.

3) Vamos iniciar o container no modo interativo, e vamos especificar que o container deve ser encerrado ao fechar o servidor Jupyter. Faremos isso com as opções `-it` e `-rm`

Portanto, o comando completo que eu uso na minha máquina Linux para iniciar o container é:

```bash
docker run \
    -it \
    --rm \
    -p 8888:8888 \
    -p 4040:4040 \
    -v `pwd`:/home/jovyan/work \
    jupyter/pyspark-notebook


```

Para facilitar a vida eu coloco esse comando em um arquivo `inicia.sh`. Engenheiros, façam do jeito que preferirem!

Agora abra esse notebook lá no container!


## Iniciando o Spark

Vamos iniciar o ambiente Spark. Para isso vamos:

1) Criar um objeto de configuração do ambiente Spark. Nossa configuração será simples: vamos especificar que o nome da nossa aplicação Spark é "Minha aplicação", e que o *master node* é a máquina local, usando todos os *cores* disponíveis. Aplicações reais de Spark são configuradas de modo ligeiramente diferente: ao especificar o *master node* passamos uma URL real, com o endereço do nó gerente do *cluster* Spark.

2) Vamos criar um objeto do tipo `SparkContext` com essa configuração

In [1]:
import pyspark

conf = pyspark.SparkConf()
conf.setAppName('Minha aplicação')
conf.setMaster('local[*]')

sc = pyspark.SparkContext(conf=conf)

In [2]:
#outras bibliotecas
import numpy as np
from functools import partial

O `SparkContext` é a nossa porta de entrada para o cluster Spark, ele será a raiz de todas as nossas operações com o Spark.

In [3]:
sc

O link acima provavelmente não funcionará porque ele se refere à porta 4040 interna do container (portanto a URL está com endereço interno). Porém fizemos o mapeamento da porta 4040 interna para a porta 4040 externa, logo você pode acessar o *dashboard* do Spark no endereço http://localhost:4040

<center><img src="./spark_dashboard.png" width=800/></center>

## Lendo os dados

Vamos começar lendo o arquivo de reviews e gravando o resultado em formato pickle, mais amigável.

In [4]:
def parse_line(line):
    parts = line[1:-1].split('","')
    sentiment = int(parts[0])
    title = parts[1].replace('""', '"')
    body = parts[2].replace('""', '"')
    return (sentiment, title, body)

rdd = sc.textFile('train.csv').map(parse_line)

Agora vamos gravar no formato pickle, para facilitar os trabalhos futuros. Após gravar o arquivo, não mais rode as células desta primeira etapa!

In [5]:
rdd.saveAsPickleFile('reviews.pickle')

## Um classificador Naive-Bayes

Vamos ler o arquivo pickle gravado anteriormente:

In [4]:
rdd = sc.pickleFile('reviews.pickle')

Agora, complete as tarefas em sequencia para construir o classificador Naive-Bayes:

### Fase 1

#### Tarefa

Construa uma função que recebe um RDD no formato do RDD original e retorna um RDD no qual cada item é um par (palavra, contagem).

In [23]:
def splita_frase(item):
    return item[1].split() + item[2].split()

def palavra_contagem(rdd):
    return rdd.flatMap(splita_frase).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y)

In [24]:
new = palavra_contagem(rdd)

In [25]:
new.take(10)

[('book', 1398748),
 ('Healing', 893),
 ('pyramid', 286),
 ('like', 930710),
 ('When', 119667),
 ('out', 667825),
 ('Tianca', 1),
 ('way.', 30976),
 ('decides', 4141),
 ('ambulance', 133)]

#### Tarefa

Construa uma função que recebe o RDD (palavra, contagem) construido anteriormente e retorna um RDD no qual cada item é um par (palavra, $\log_{10}\left(c \, / \, T\right)$), onde $c$ é a contagem daquela palavra e $T$ é a soma das contagens de palavra.

In [8]:
total_ = new.reduce(lambda x, y: ("tudo", x[1] + y[1]))

In [9]:
def logaritmo(rdd, T):
    return rdd.map(lambda x: (x[0], np.log10(x[1]/T[1])))

In [10]:
rdd_with_log = logaritmo(new, total_)

In [11]:
rdd_with_log.take(3)

[('book', -2.3052565818700304),
 ('Healing', -5.500144601542316),
 ('pyramid', -5.994630027301819)]

#### Tarefa

Separe o RDD original em dois RDDs: o dos reviews positivos e o dos negativos. Em seguida, use as funções anteriores para construir RDDs que contem os pares (palavra, $\log_{10}\left(c \, / \, T\right)$)

In [12]:
#ou seja, vamos reiniciar o processo, começando pela divisão e só depois aplicaremos o log e etc
def filtra_categ(rdd, categ):
    return rdd.filter(lambda x: x[0] == categ)

In [13]:
positive = filtra_categ(rdd, 2)
negative = filtra_categ(rdd, 1)

### Verificando separação dos dados positivos

In [14]:
#vamos verificar para os positivos
positive.take(3)

[(2,
  'A great first novel by an American icon.',
  "Harrison's first novel is a great book. It shadows the irrevrant character of the novelist himself. From the purpose of the trip [to see wolves] to the final gorging of food in Ishpeming, Harrison takes one on a roller coaster of emotions all bundled up in the persona of a modern man trapped in a world he doesn't understand and that has long foresaken him. Wolf is a must read for Harrrison fans."),
 (2,
  'Product Ordered',
  'Order product and arrived on time and in great shape, would order again from this company - completely satisfied with book.'),
 (2,
  'Two stories',
  "Mackenzie's Pleasure (4 stars): Zane protects Barrie. I still don't really understand why he left the Seals though.A Game of Chance (3 stars): Chance sets up Sunny. He should be kicked.Overall the Mackenzie series is well worth reading. Ms. Howard does have a knack for romance.")]

### Verificando separação dos dados negativos

In [15]:
negative.take(3)

[(1,
  'Rambling and Incoherent',
  'I had read Sundog and Woman Lit by Fireflies and was excited to read more. This novel was a great disappointment. Harrison seemed to be exploring a prose style which mirrored the insanity of his alcohol deprived/cold turkey protagonist. It was a valiant effort but in the end - it failed misearably.'),
 (1,
  'Disappointing',
  'I too read this book and was deeply disappointed. A rambling story with no real satisfaction reached by the reader at the end. I rank this book right up there with "Farmer" and "A good day to die" as being one of Jim\'s lessor works.'),
 (1,
  'Dont waste your money',
  "I found this book to be a big disappointment. It is not an easy read and the majority of the book is full of what I considered useless information. The part of the book that is supposed to explain how to construct your own pyramid is the smallest section of the book and isn't even clearly defined. I have read 5 or 6 different pyramid books and this is the onl

Agora queremos que cada par positivo/negativo, tenha também o seu log(c/T)

In [16]:
#agora sim que já separamos cada um, podemos realizar o processo de manipulação algebrica para encontras as probabilidades de cada palavra no seu contexto (positive ou negative)
positive_ = palavra_contagem(positive)
negative_ = palavra_contagem(negative)

In [17]:
total_positive = positive_.reduce(lambda x, y: ("tudo", x[1] + y[1]))
total_negative = negative_.reduce(lambda x, y: ("tudo", x[1] + y[1]))

In [19]:
positive_log = logaritmo(positive_, total_positive)
negative_log = logaritmo(negative_, total_negative)

### Verificando o output 

Agora que já fizemos a separação de reviews em contexto e calculamos as probabilidades em relação a uma palavra ser negativa ou positiva, podemos verificar o resultado abaixo:

In [20]:
positive_log.take(3)

[('book', -2.272766745312638),
 ('Healing', -5.282237419514753),
 ('pyramid', -5.9961630875414125)]

In [21]:
negative_log.take(3)

[('book', -2.3376666325195163),
 ('pyramid', -5.993215643498072),
 ('like', -2.4720889730358304)]

### Tarefa

Use o `.fullOuterJoin()` dos RDDs para construir um RDD unificado, no qual cada item é da forma (palavra, log_prob_positivo, log_prob_negativo). "Baixe" esse resultado final usando `.collect()`.

In [22]:
palavra_concat = positive_log.fullOuterJoin(negative_log).collect()

Vamos verificar as 10 primeiras palavras:

In [112]:
palavra_concat[0:9]

[('out', (-2.6616432990632193, -2.596022446951644)),
 ('ambulance', (-6.340491965199566, -6.3151435631912705)),
 ('Stars', (-4.739659538336522, -4.854012962539754)),
 ('family', (-3.4798931704033724, -3.9203467150039017)),
 ('sound,', (-4.343373450607565, -4.484889325271383)),
 ('Holiday', (-5.022630737344416, -5.465698194765326)),
 ('everyone,', (-4.776093194346103, -5.33903263885652)),
 ('outdoes', (-5.877611149594514, -6.634922994868091)),
 ('(and', (-3.7824224191001767, -3.795149282785406))]

## De lista para dicionário

O resultado foi retornado em forma de lista, contudo, uma vez que vamos precisar consultar várias vezes probabilidades relacionadas a uma palavra que está contida na lista, teríamos que percorrer a lista até encontrar a palavra. No pior caso, a palavra poderia ser a última da lista, necessitando percorrer a lista inteira apenas para encontrar suas probabilidades.

Se convertermos isso para um dicionário em que as chaves são as palavras, podemos de maneira simples consultar as probabilidades (uma complexidade bem menor).

In [9]:
def coconvert_tupla_dict(list_tuplas):
    return {k[0]: (k[1][0], k[1][1]) for k in list_tuplas}

In [43]:
palavra_concat_dict = convert_tupla_dict(palavra_concat)

Pronto, com a mudança o processo se tornará muito mais rápido. Agora podemos prosseguir com a atividade!

#### Tarefa

Para uma dada string, determine se ela é um review positivo ou negativo usando os RDDs acima. Lembre-se de como funciona o classificador Naive-Bayes: http://stanford.edu/~jurafsky/slp3/slides/7_NB.pdf, consulte tambem suas notas de aula de Ciência dos Dados!

In [45]:
#vamos pegar um teste qualquer presente no dataset
phrase = '''A complete waste of time. Typographical errors, poor grammar, and a totally pathetic plot add up to absolutely nothing. 
I'm embarrassed for this author and very disappointed I actually paid for this book.'''

### Passo a passo 

Tendo a frase em mãos, precisaremos dividir ela em palavras e em seguida verificar a probabilidade para cada uma delas de estar presente em um contexto positivo ou negativo. Ao somar todas as probabilidade, se a probabilidade de ser negativo for maior, teremos uma frase negativa. Se for o contrário, teremos uma frase positiva =)

In [47]:
all_terms = phrase.split()
all_terms

['A',
 'complete',
 'waste',
 'of',
 'time.',
 'Typographical',
 'errors,',
 'poor',
 'grammar,',
 'and',
 'a',
 'totally',
 'pathetic',
 'plot',
 'add',
 'up',
 'to',
 'absolutely',
 'nothing.',
 "I'm",
 'embarrassed',
 'for',
 'this',
 'author',
 'and',
 'very',
 'disappointed',
 'I',
 'actually',
 'paid',
 'for',
 'this',
 'book.']

In [10]:
def naive_bayes(words, reference):
    #precisamos divir em palavras positivas e palavras negativas e somar a probabilidade total
    positive = 0
    negative = 0
    
    for word in words:
        if word in reference:
            if reference[word][0] != None:
                positive += reference[word][0]
            if reference[word][1] != None:
                negative += reference[word][1]
    
    print(positive)
    print(negative)
    if positive > negative:
        return "positive"

    return "negative"
    

In [49]:
result = naive_bayes(all_terms, palavra_concat_dict)
result

-112.3695899560691
-104.0231864327492


'negative'

Aparentemente deu tudo certo, porque o resultado foi negativo e a mensagem que estávamos tentando classificar era sobre um leitor indignado com uma obra e sobre sua expectativa não ter sido atingida durante a leitura.

### Fase 2

Agora que temos um classificador Naive-Bayes, vamos explorá-lo um pouco:

### Tarefa

Quais são as 100 palavras que mais indicam negatividade, ou seja, onde a diferença entre a probabilidade da palavra no conjunto dos comentários negativos e positivos é máxima? E quais as 100 palavras de maior positividade? Mostre os resultados na forma de *word clouds*.

In [79]:
palavra_rdd_concat = positive_log.fullOuterJoin(negative_log)

Abaixo estamos realizando várias ações que basicamente pegam a diferença da probabilidade de cada palavra e seleciona as mais negativas. Após isso, as tuplas são limpas para ficar apenas uma lista de **(palavra, diferença de prob)**

In [63]:
most_negative = palavra_rdd.filter(lambda x: x[1][0] != None).\
    filter(lambda x: x[1][1] != None).map(lambda x: (x[0], x[1][0], x[1][1], x[1][1] - x[1][0])).map(lambda x: (x[0], x[3])).takeOrdered(100, lambda x: -x[1])

Aqui estão as mais negativa:

In [64]:
most_negative

[('Worthless', 2.7038334385911877),
 ('Awful!', 2.63719133801127),
 ('Terrible!', 2.5171176927583643),
 ('Uninspired', 2.5055578415802504),
 ('Avoid!', 2.4941116436588127),
 ('Useless.', 2.491526549824319),
 ('Disappointed...', 2.491526549824319),
 ('JUNK!!!', 2.455032459287918),
 ('Junk!', 2.442636824735115),
 ('worthless!', 2.430374731773381),
 ('Worthless.', 2.4105265387055494),
 ('unfunny,', 2.404232305607861),
 ('Dull,', 2.390546268204961),
 ('Unreliable', 2.3502971087679043),
 ('JUNK!', 2.3466929844990796),
 ('Horrible.', 2.3181814431927394),
 ('Disappointing...', 2.3079162673656137),
 ('Horrible!', 2.3065952184933094),
 ('Tripe', 2.2782356038432283),
 ('Disappointment!', 2.259844297498919),
 ('Yawn', 2.2467565168608354),
 ('Boring,', 2.24440336171976),
 ('GARBAGE!', 2.2277446115799666),
 ('Atrocious', 2.2119944106016236),
 ('Unsatisfactory', 2.202010189695023),
 ('Disappointing.', 2.2011677171090014),
 ('Misrepresentation', 2.194368447401147),
 ('USELESS.', 2.186589830827529),
 

In [65]:
most_positive = palavra_rdd.filter(lambda x: x[1][0] != None).\
    filter(lambda x: x[1][1] != None).map(lambda x: (x[0], x[1][0], x[1][1], x[1][0] - x[1][1])).map(lambda x: (x[0], x[3])).takeOrdered(100, lambda x: -x[1])

O processo foi repetido de maneira totalmente análoga para as palavras mais positivas. Ṕodemos perceber que o resultado abaixo faz sentido, se não são adjetivos positivos, são palavras facilmente imagináveis em contexto positivos, por exemplo 'must-read'.

In [66]:
most_positive

[('Excellent!!!', 2.4226843415770585),
 ('must-read!', 2.2430332717551567),
 ('Inspiring!', 2.226642855566988),
 ('Outstanding!', 2.220333381659981),
 ('Excellent!!', 2.1827373698679065),
 ('Adorable!', 2.173397343613763),
 ('Underrated', 2.1491287688868264),
 ('Bookwatch', 2.1407282268603938),
 ('Fantastic!!', 2.137321978168483),
 ('Excellent!', 2.0897600634096705),
 ('Gem!', 2.086596700695946),
 ('must-have!', 2.080793124596706),
 ('Insightful!', 2.0749109423707512),
 ('Awesome!!!', 1.987760766651851),
 ('Excelente', 1.9845317965216527),
 ('MUST!', 1.9829082638574391),
 ('FANTASTIC!!!', 1.9780009293626941),
 ('Pleasantly', 1.973750052209037),
 ('Beautiful!', 1.9658470395875813),
 ('Superb!', 1.95650307292141),
 ('Captivating!', 1.9473321095962426),
 ('Mesmerizing', 1.9366082442044705),
 ('Impressive!', 1.9311453485029677),
 ('Eye-opening', 1.9256128599030067),
 ('Inspirational!', 1.9256128599030067),
 ('Underappreciated', 1.908579520604226),
 ('Addictive!', 1.9027499769435021),
 ('Mu

# Tarefa desafio!

Qual o desempenho do classificador (acurácia)? Para medir sua acurácia:

- Separe os reviews em dois conjuntos: treinamente e teste
- Repita o "treinamento" do classificador com o conjunto de treinamento
- Para cada review do conjunto de teste, determine se é positiva ou negativa de acordo com o classificador
- Determine a acurácia

Esta não é uma tarefa trivial. Não basta fazer um `for` para determinar a classe de cada review de teste: isso demoraria uma eternidade. Você tem que usar variáveis "broadcast" do Spark para enviar uma cópia da tabela de frequencias para cada *core* do executor.

### Parte 1 

Vamos dividir o dataset em treino e teste. Boa notícia, o pyspark já tem um método pŕoprio para isso! https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.DataFrame.randomSplit.html

In [11]:
train, test = rdd.randomSplit(weights=[80, 20], seed=42) #poderia ser adicionada aqui uma seed, facilitando debugs futuros

In [13]:
train.take(3)

[(2,
  'A great first novel by an American icon.',
  "Harrison's first novel is a great book. It shadows the irrevrant character of the novelist himself. From the purpose of the trip [to see wolves] to the final gorging of food in Ishpeming, Harrison takes one on a roller coaster of emotions all bundled up in the persona of a modern man trapped in a world he doesn't understand and that has long foresaken him. Wolf is a must read for Harrrison fans."),
 (1,
  'Rambling and Incoherent',
  'I had read Sundog and Woman Lit by Fireflies and was excited to read more. This novel was a great disappointment. Harrison seemed to be exploring a prose style which mirrored the insanity of his alcohol deprived/cold turkey protagonist. It was a valiant effort but in the end - it failed misearably.'),
 (1,
  'Disappointing',
  'I too read this book and was deeply disappointed. A rambling story with no real satisfaction reached by the reader at the end. I rank this book right up there with "Farmer" and 

Agora que já separamos em treino e teste, podemos começar a treinar o modelo com o dataset de treino! Além disso, haverão palavras presentes no dataset de teste que não estarão no de treino e o contrário também pode ocorrer, por isso, vamos utilizar também o laplace smoothing (poderíamos até mudar a ordem das questões para que esse desafio ficasse após a questão de laplace, mas vamos apenas copiar a função)

In [12]:
def naive_bayes_with_laplace(rdd, alfa, V, T):
    return rdd.map(lambda x: (x[0], np.log( (x[1] + alfa)/(T[1] + V*alfa) ) ) )

alfa = 100
V = 250000

In [13]:
#vamos criar uma função que realiza todo o processo de treinamento, como um pipeline. Podemos até mesmo aproveitar todas as funções que ja fizemos:

def treina_modelo(rdd, alfa, V):
    
    #primeiro separamos frases positivas e depois frases negativas
    positive_quotes = filtra_categ(rdd, 2)
    negative_quotes = filtra_categ(rdd, 1)
    
    #em seguida fazemos uma contagem da frequencia de cada palavra no seu contexto (positivo ou negativo)
    positive_words_count = palavra_contagem(positive_quotes)
    negative_words_count = palavra_contagem(negative_quotes)
    
    #agora precisamos do total de palavras no contexto 
    total_positives = positive_words_count.reduce(lambda x, y: ("total", x[1] + y[1])) #realizamos a soma da frequencia de cada palavra
    total_negatives = positive_words_count.reduce(lambda x, y: ("total", x[1] + y[1]))
    
    #a proxima parte é o inicio da aplicação do naive bayes e vamos colocar o laplace smoothing. Se não colocassemos ele, teriamos que substituir os Nones por 0 nas palavras que
    #nao aparecem
    
    positive_smoothing = naive_bayes_with_laplace(positive_words_count, alfa, V, total_positives)
    negative_smoothing = naive_bayes_with_laplace(negative_words_count, alfa, V, total_negatives)
    
    #lembrando que não podemos deixar Nones 
    return positive_smoothing.fullOuterJoin(negative_smoothing).\
        map(lambda x: (x[0], (x[1][0], np.log10(alfa/(alfa*V)))) if x[1][1] == None else (x[0], (x[1][0],x[1][1]))).\
        map(lambda x: (x[0], (np.log10(alfa/(alfa*V)), x[1][1])) if x[1][0] == None else (x[0], (x[1][0],x[1][1]))).\
        collect() 

    '''
        o collect vai retornar uma lista, na verdade poderiamos utilizar o collectAsMap 
        (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.collectAsMap.html),
        que foi uma dica do Ayres durante o atendimento. Mas como já haviamos feito nossa propria função de conversão, vamos continuar utilizando-a =)
    
    '''
    

As celulas abaixo foram para debug da pipeline, que inicialmente estava dando problemas. Preferimos deixar porque ela também representa a documentação do projeto ^^.

In [98]:
#positive_quotes = filtra_categ(train, 2)
#negative_quotes = filtra_categ(train, 1)

In [99]:
#positive_words_count = palavra_contagem(positive_quotes)
#negative_words_count = palavra_contagem(negative_quotes)

In [100]:
#total_positives = positive_words_count.reduce(lambda x, y: ("total", x[1] + y[1])) #realizamos a soma da frequencia de cada palavra
#total_negatives = positive_words_count.reduce(lambda x, y: ("total", x[1] + y[1]))

In [101]:
#positive_smoothing = naive_bayes_with_laplace(positive_words_count, alfa, V, total_positives)
#negative_smoothing = naive_bayes_with_laplace(negative_words_count, alfa, V, total_negatives)

Vamos verificar a saída da pipeline:

In [14]:
listao_modelo_treino = treina_modelo(train, alfa, V)

Tudo aparenta estar nos conformes, então vamos seguir transformando a lista em um dicionario.

In [15]:
dicionarizao_modelo_treino = coconvert_tupla_dict(listao_modelo_treino)

Como o proprio enunciado do desafio diz, vamos precisar realizar o broadcast de variaveis. Vamos fazer isso agora:

In [16]:
bc_treino = sc.broadcast(dicionarizao_modelo_treino)

In [17]:
bc_alfa = sc.broadcast(alfa)
bc_v = sc.broadcast(V)

O professor Ayres em aula sugeriu o uso do partial, então vamos fazer isso! Vamos precisar passar uma função pro partial e até fizemos uma função de NB que analisa as probabilidades quando realizamos o NB para uma frase de teste, mas precisaremos adaptar para utilizar no partial usando broadcast

In [18]:
#deixaremos essa para quando não formos utilizar smoothing
def naive_bayes_new(words, reference_bc):
    #precisamos divir em palavras positivas e palavras negativas e somar a probabilidade total
    positive = 0
    negative = 0
    
    reference = reference_bc.value
    
    for word in words:
        if word in reference:
            if reference[word][0] != None:
                positive += reference[word][0]
            if reference[word][1] != None:
                negative += reference[word][1]
    
    if positive > negative:
        return 2

    return 1

'''
essa sera utilizada para o smoothing. Aproveitamos para passar tanto alfa quanto v para encapsular, assim podemos testar varios valores 
que funcionem como hiperparametros do modelo e deixar o melhor.
'''
def naive_bayes_new_with_smoothing(words, reference_bc, v_bc, alfa_bc):
    #precisamos divir em palavras positivas e palavras negativas e somar a probabilidade total
    positive = 0
    negative = 0
    
    reference = reference_bc.value
    
    for word in words:
        if word in reference:
            if reference[word][0] != None:
                positive += reference[word][0]
            else:
                positive += np.log10(alfa_bc.value/(alfa_bc.value*v_bc.value))
                
            if reference[word][1] != None:
                negative += reference[word][1]
            else:
                negative += np.log10(alfa_bc.value/(alfa_bc.value*v_bc.value))
                
    if positive > negative:
        return 2

    return 1

Vamos criar o nosso partial e iniciar o processo de aplicação do modelo treinado nos testes

In [19]:
NB_smooth_model = partial(naive_bayes_new_with_smoothing, reference_bc=bc_treino, v_bc=bc_v, alfa_bc=bc_alfa)

In [20]:
#vamos aplicar nosso teste
result = test.map(lambda x: (x[0], splita_frase(x))).map(lambda x: (NB_smooth_model(words = x[1]), x[0]))

In [21]:
result.take(10)

[(1, 1),
 (1, 1),
 (1, 1),
 (1, 1),
 (1, 1),
 (2, 2),
 (1, 2),
 (1, 2),
 (1, 1),
 (1, 1)]

Agora temos que fazer uma contagem de quantas foram as que acertamos, porque com isso, basta fazer em relação ao total que teremos o quanto realmente foi a nossa acuracia. Para saber o quanto acertamos só precisamos de um filter no rdd comparando as tuplas.

## Observação importante:

Por algum motivo, realizar o collect direto ou o count do rdd filtrado apenas com os acertos, explode o consumo de memória RAM. Inicalmente estávamos no windows em que havia 6GB de RAM livre mas depois fizemos toda a mudança para o linux em que temos 12GB de RAM livre, mas nunca era suficiente e o processo explodia, matando o kernel. A única opção que se tornou viável foi a gambiarra abaixo, que possui todos os créditos para a dupla da Livia e do Bernardo. 

In [18]:
lista_correto = sc.broadcast(result.filter(lambda x: x[0] == x[1]).collect())

In [None]:
n_acertos = len(lista_correto.value)

In [23]:
n_total = test.count()

In [None]:
acuracia = n_acertos/n_total

# Tarefa desafio!

Implemente Laplace smoothing

O que é o laplace smoothing e para que ele serve?

Você pode ter percebido que logo no inicio do projeto, quando aplicamos o laplace, ocorreu que algumas palavras só apareciam em contextos positivos ou negativos, tendo uma probabilidade zero. O smoothing de laplace faz com que isso não ocorra, um fator de smoothing e o número total de palavras do dicionário na fórmula (uma estimativa do total de palavras)

$P(w/positive)$ = $\frac{\alpha + positive}{N + \alpha*V}$

In [56]:
#valores retirados do projeto de cdados (https://github.com/wilgnerl/Projeto1-Cdados)
alfa = 100
V = 250000

In [42]:
def naive_bayes_with_laplace(rdd, alfa, V, T):
    return rdd.map(lambda x: (x[0], np.log10( (x[1] + alfa)/(T[1] + V*alfa) ) ) )

In [43]:
#Agora que temos o laplace smoothing, basta reaplicar nos testes que fizemos para verificar a diferença
positive_smoothing = naive_bayes_with_laplace(positive_, alfa, V, total_positive)
negative_smoothing = naive_bayes_with_laplace(negative_, alfa, V, total_negative)

In [44]:
positive_smoothing.take(4)

[('book', -2.346095596243699),
 ('Healing', -5.298323913072647),
 ('pyramid', -5.831524088674815),
 ('like', -2.5666435281488233)]

In [45]:
negative_smoothing.take(4)

[('book', -2.40594650080964),
 ('pyramid', -5.838546852526434),
 ('like', -2.540345474299366),
 ('out', -2.664249960582503)]

### Atenção!

Opa! Mas dessa vez não é só juntar o smoothing dos negativos e dos positivos com o fullOuterJoin? Não, porque quando o join ocorrer, ele vai tentar juntar palavras que talvez só existam em um contexto, e por isso, uma das probabilidades será None! Nesse caso, é preciso reaplicar o laplace pra esses casos que são desconhecidos =)

In [59]:
palavra_concat_smooth = positive_smoothing.fullOuterJoin(negative_smoothing).\
    map(lambda x: (x[0], (x[1][0], np.log10(alfa/(alfa*V)))) if x[1][1] == None else (x[0], (x[1][0],x[1][1]))).\
    map(lambda x: (x[0], (np.log10(alfa/(alfa*V)), x[1][1])) if x[1][0] == None else (x[0], (x[1][0],x[1][1]))).\
    collect()

Sabemos que nenhum dado foi perdido porque o tamanho da lista continua igual a como era na fase 2 do projeto! 

In [60]:
len(palavra_concat_smooth)

4950593

As palavras que não existem em um contexto ficaram com a probabilidade de -5.3979400086720375. Consegue perceber abaixo? 

In [61]:
palavra_concat_smooth

[('out', (-2.734885365374987, -2.664249960582503)),
 ('ambulance', (-5.996757420142288, -6.001750089230017)),
 ('Stars', (-4.795832648474573, -4.901702169798683)),
 ('family', (-3.552317429857434, -3.9862334674012705)),
 ('sound,', (-4.409767483131622, -4.544284306375992)),
 ('Holiday', (-5.0635701889473035, -5.455149708364345)),
 ('everyone,', (-4.830791720066347, -5.347128899286434)),
 ('outdoes', (-5.7591144033427, -6.1076414012573625)),
 ('(and', (-3.85387764671081, -3.8616502125434433)),
 ('profanities),', (-6.201951060902276, -5.3979400086720375)),
 ('kinda', (-4.446680126038944, -4.479633933227099)),
 ('ALWAYS', (-5.017625138685202, -5.120802847315334)),
 ("BURTON'S", (-6.1934352099797465, -5.3979400086720375)),
 ('matter,', (-5.077550150346492, -4.984326197313276)),
 ('Calvary', (-6.004875310364468, -6.124156489322921)),
 ('cute', (-4.124817106862345, -4.2650507885036975)),
 ('serviceable', (-5.861880160999808, -5.756179704028327)),
 ('Olvidan"', (-6.201951060902276, -5.3979400

Vamos refazer o processo mas sem o collect dessa vez, para verificarmos se algo mudou com as palavras mais positivas ou negativas.

In [51]:
palavra_concat_smooth = positive_smoothing.fullOuterJoin(negative_smoothing).\
    map(lambda x: (x[0], (x[1][0], np.log10(alfa/(alfa*V)))) if x[1][1] == None else (x[0], (x[1][0],x[1][1]))).\
    map(lambda x: (x[0], (np.log10(alfa/(alfa*V)), x[1][1])) if x[1][0] == None else (x[0], (x[1][0],x[1][1])))

In [52]:
most_negative_with_smoothing = palavra_concat_smooth.map(lambda x: (x[0], x[1][0], x[1][1], x[1][1] - x[1][0])).map(lambda x: (x[0], x[3])).takeOrdered(100, lambda x: -x[1])

In [53]:
most_negative_with_smoothing

[('Disappointing', 1.8220457182801608),
 ('Waste', 1.7305894232232353),
 ('Disappointed', 1.6599676965922034),
 ('Terrible', 1.620164578913955),
 ('Poorly', 1.5998804062605947),
 ('Boring', 1.5864624905567304),
 ('WASTE', 1.5816392884229709),
 ('Horrible', 1.5811793460304422),
 ('Disappointment', 1.5455175658294849),
 ('Worst', 1.5090061700392052),
 ('Junk', 1.5081834081731094),
 ('Awful', 1.4913398102703184),
 ('refund.', 1.4844859150111187),
 ('waste', 1.4526240130314894),
 ('Avoid', 1.422531918545455),
 ('WORST', 1.4214686846339752),
 ('Worthless', 1.4022768053579142),
 ('Defective', 1.3851282699841914),
 ('junk.', 1.3751418624793867),
 ('Useless', 1.3673625526800377),
 ('garbage.', 1.3670083326446676),
 ('Buyer', 1.3579565971092853),
 ('Poor', 1.3478366261394847),
 ('awful.', 1.3323128509570452),
 ('refund', 1.3317332308939802),
 ('disappointment!', 1.3090471202601277),
 ('disappointment.', 1.3015407682374072),
 ('poorly', 1.2967569012165239),
 ('trash.', 1.2851942073100844),
 ('Bo

In [54]:
most_positive_with_smoothing = palavra_concat_smooth.map(lambda x: (x[0], x[1][0], x[1][1], x[1][0] - x[1][1])).map(lambda x: (x[0], x[3])).takeOrdered(100, lambda x: -x[1])

In [55]:
most_positive_with_smoothing

[('Excellent', 1.4894506925491404),
 ('Awesome', 1.4216832465162552),
 ('Outstanding', 1.4011812496639253),
 ('Excellent!', 1.3728181058943294),
 ('Awesome!', 1.2893398365510205),
 ('must-have', 1.2337027424750149),
 ('Wonderful', 1.1900803018596555),
 ('Wonderful!', 1.174357471786596),
 ('Excellent,', 1.1594210304715622),
 ('amazing!', 1.1397716604434098),
 ('Highly', 1.1346680801736397),
 ('Enjoy!', 1.1243188951512924),
 ('Fantastic', 1.11250691803885),
 ('Great!', 1.1106848711277983),
 ('Fantastic!', 1.1097266069900336),
 ('Superb', 1.1056086421094147),
 ('Terrific', 1.0659929123676992),
 ('awesome!', 1.0635873843535766),
 ('must-read', 1.0626692768800865),
 ('Enjoyable', 1.0551889530389937),
 ('pleasantly', 1.0507253421117877),
 ('Recommended.', 1.0330817071776055),
 ('recommended!', 1.0330614253636607),
 ('invaluable', 1.027150703801559),
 ('Perfect!', 1.0216001561035783),
 ('Amazing', 1.017396190158271),
 ('Amazing!', 1.0160289454829936),
 ('Perfect', 1.0153240405694586),
 ('Insp

### Algumas observações

Aparentemente, mudou em relação a tarefa inicial de predizer as 100 mais negativas e as 100 mais positivas, contudo, continua fazendo sentido. Inicialmente, nós (Jean e Fabricio) tivemos problemas porque os resultados não pareciam fazer muito sentido, apesar de estarmos fazendo as mesmas coisas que fizemos na primeira parte. Então percebemos que o alpha e o V (número estimado de palavras na língua) afetavam de maneira diferente uma palavra existente no contexto e uma palavra inexistente, então estimamos valores melhores que começaram a fazer mais sentido até chegar no resultado acima.

## Rubrica de avaliação

- I: groselha, falha crítica, ou não entregou nada
- D: Fez uma tentativa honesta de fazer todos os itens da fase 1, mas tem erros
- C: Fase 1 completa
- B: Fase 2, faltando apenas um desafio
- A: Fase 2 completa