# Exercicios com Spark

## Instalando o ambiente

O jeito mais simples de começar a trabalhar com Spark é instalar um container com tudo pronto! No site https://hub.docker.com/r/jupyter/pyspark-notebook vemos uma imagem Docker que já vem com `pyspark` e `jupyter lab`. Instale a imagem com o comando:

```bash
docker pull jupyter/pyspark-notebook
```

Vamos iniciar o ambiente de trabalho com o comando `docker run`. Para isso precisamos tomar alguns cuidados:

1) Temos que mapear nosso diretorio local de trabalho para um diretório interno do container, de modo que alterações feitas dentro do container (nesta pasta escolhida) sejam gravadas no nosso diretorio local. No container temos um usuário padrão com *username* `jovyan`. No *homedir* desse usuario temos uma pasta vazia `work`, que vai servir como local de mapeamento do nosso diretorio local de trabalho. Podemos então fazer esse mapeamendo com a opção `-v` do comando `docker run` da seguinte forma:

```bash
-v <diretorio>:/home/jovyan/work
```

onde `<diretorio>` representa seu diretorio local de trabalho.

2) Para acessar o `jupyter notebook` e o *dashboard* do Spark a partir do nosso *browser* favorito temos que abrir algumas portas do container com a opção `-p`. As portas são `8888` (para o próprio `jupyter notebook`) e `4040` (para o *dashboard* do Spark). Ou seja, adicionaremos às opções do `docker run`o seguinte:

```bash
-p 8888:8888 -p 4040:4040
```

Desta forma, ao acessar `localhost:8888` na nossa máquina, estaremos acessando o servidor Jupyter na porta 8888 interna do container.

3) Vamos iniciar o container no modo interativo, e vamos especificar que o container deve ser encerrado ao fechar o servidor Jupyter. Faremos isso com as opções `-it` e `-rm`

Portanto, o comando completo que eu uso na minha máquina Linux para iniciar o container é:

```bash
docker run \
    -it \
    --rm \
    -p 8888:8888 \
    -p 4040:4040 \
    -v `pwd`:/home/jovyan/work \
    jupyter/pyspark-notebook


```

Para facilitar a vida eu coloco esse comando em um arquivo `inicia.sh`. Engenheiros, façam do jeito que preferirem!

Agora abra esse notebook lá no container!


## Iniciando o Spark

Vamos iniciar o ambiente Spark. Para isso vamos:

1) Criar um objeto de configuração do ambiente Spark. Nossa configuração será simples: vamos especificar que o nome da nossa aplicação Spark é "Minha aplicação", e que o *master node* é a máquina local, usando todos os *cores* disponíveis. Aplicações reais de Spark são configuradas de modo ligeiramente diferente: ao especificar o *master node* passamos uma URL real, com o endereço do nó gerente do *cluster* Spark.

2) Vamos criar um objeto do tipo `SparkContext` com essa configuração

In [None]:
import pyspark

conf = pyspark.SparkConf()
conf.setAppName('Minha aplicação')
conf.setMaster('local[*]')

sc = pyspark.SparkContext(conf=conf)

O `SparkContext` é a nossa porta de entrada para o cluster Spark, ele será a raiz de todas as nossas operações com o Spark.

In [None]:
sc

O link acima provavelmente não funcionará porque ele se refere à porta 4040 interna do container (portanto a URL está com endereço interno). Porém fizemos o mapeamento da porta 4040 interna para a porta 4040 externa, logo você pode acessar o *dashboard* do Spark no endereço http://localhost:4040

<center><img src="./spark_dashboard.png" width=800/></center>

## Trabalhando com Spark

Para este exercicio vamos trabalhar com o dataset de reviews da Amazon visto em https://www.kaggle.com/datasets/kritanjalijain/amazon-reviews. Baixe o arquivo "train.csv"

Vamos ler o arquivo "train.csv" em um RDD.

In [None]:
rdd = sc.textFile('train.csv')

In [None]:
rdd.take(1)

De acordo com a documentação deste arquivo vista no Kaggle, cada linha contem 2 elementos: o sentimento do review (1 - negativo, 2 - positivo), o título e o corpo do review. A linha contem esses elementos em um formato "comma-separated value" (CSV), onde cada um dos campos está delimitado por aspas duplas. Se o texto em si (titulo ou corpo) contem aspas, elas aparecem como um par de aspas duplas. Vamos usar o `.filter()` para achar um exemplo desses.

In [None]:
example_line = rdd.filter(lambda x: '""' in x).take(1)
example_line = example_line[0]

example_line

Levando isso em consideração, vamos fazer uma função simples para separar os campos:

In [None]:
def parse_line(line):
    parts = line[1:-1].split('","')
    sentiment = int(parts[0])
    title = parts[1].replace('""', '"')
    body = parts[2].replace('""', '"')
    return (sentiment, title, body)

In [None]:
parse_line(example_line)

Podemos agora utilizar nossa função para separar os campos de cada linha do dataset. 

In [None]:
rdd_split = rdd.map(parse_line)

Como de costume, nada realmente acontece até que uma "action" seja invocada. O `.map()` é uma "transformation". Vamos usar uma action simples para "materializar" o novo RDD.

In [None]:
rdd_split.count()

Vamos explorar os resultados para ver se deu certo

In [None]:
rdd_split.take(1)

**Atividade**: Conte quantos sentimentos diferentes existem, e quantas vezes aparecem, para confirmar que só tem os sentimentos 1 e 2:

**Atividade**: Quantos reviews não tem titulo?

**Atividade**: Quantos reviews não tem corpo?

**Atividade**: Qual o comprimento máximo de um título e de um corpo?

**Atividade**: Qual a maior palavra palíndroma no titulo ou corpo?

**Atividade**: Quais as 20 palavras mais populares do titulo? 