# Exercicios com Spark

Caso precise, abaixo estão os comandos para iniciar o container:

Para macOS e linux, utilize:

```bash
docker run \
    -it \
    --rm \
    -p 8888:8888 \
    -p 4040:4040 \
    -v "`pwd`":/home/jovyan/work \
    jupyter/pyspark-notebook


```

Se estiver no Windows estes comandos, utilize:

- No Powershell: `docker run -it --rm -p 8888:8888 -p 4040:4040 -v ${PWD}:/home/jovyan/work jupyter/pyspark-notebook`

- No Prompt de comando: `docker run -it --rm -p 8888:8888 -p 4040:4040 -v %cd%:/home/jovyan/work jupyter/pyspark-notebook`

Agora abra esse notebook lá no container!


## Iniciando o Spark

In [1]:
import pyspark

conf = pyspark.SparkConf()
conf.setAppName("Minha aplicação")
conf.setMaster("local[*]")

sc = pyspark.SparkContext(conf=conf)

In [2]:
sc

## Iniciando a biblioteca de correção

In [3]:
!pip install git+https://github.com/macielcalebe/insperautograding.git

import insperautograder.jupyter as ia
from dotenv import load_dotenv
load_dotenv()

Collecting git+https://github.com/macielcalebe/insperautograding.git
  Cloning https://github.com/macielcalebe/insperautograding.git to /tmp/pip-req-build-lnbxzye6
  Running command git clone --filter=blob:none --quiet https://github.com/macielcalebe/insperautograding.git /tmp/pip-req-build-lnbxzye6
  Resolved https://github.com/macielcalebe/insperautograding.git to commit acdda51152774d9e1a979b426e41daa7a8a7793c
  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting python-dotenv (from insperautograder==0.2.0)
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Downloading python_dotenv-1.0.1-py3-none-any.whl (19 kB)
Building wheels for collected packages: insperautograder
  Building wheel for insperautograder (setup.py) ... [?25ldone
[?25h  Created wheel for insperautograder: filename=insperautograder-0.2.0-py3-none-any.whl size=4418 sha256=60bcf2fed9e7d43acc9bf06205bc2566d7cc6c2c09f5c7adc7a31ad2b98d6f1e
  Stored in directory: /tmp/pip-ephem-wheel-cache-luu

True

In [4]:
ia.tasks()

|    | Atividade            | De                        | Até                       |
|---:|:---------------------|:--------------------------|:--------------------------|
|  0 | newborn              | 2024-02-01 03:00:00+00:00 | 2024-05-30 03:00:00+00:00 |
|  1 | select01             | 2024-02-08 03:00:00+00:00 | 2024-02-19 02:59:59+00:00 |
|  2 | ddl                  | 2024-02-22 03:00:00+00:00 | 2024-02-27 02:59:59+00:00 |
|  3 | dml                  | 2024-02-26 03:00:00+00:00 | 2024-03-03 02:59:59+00:00 |
|  4 | group_having         | 2024-02-29 03:00:00+00:00 | 2024-03-12 02:59:59+00:00 |
|  5 | views                | 2024-02-29 03:00:00+00:00 | 2024-03-20 02:59:59+00:00 |
|  6 | agg_join             | 2024-02-29 03:00:00+00:00 | 2024-03-05 02:59:59+00:00 |
|  7 | sql_review1          | 2024-03-11 03:00:00+00:00 | 2024-03-20 02:59:59+00:00 |
|  8 | permissions          | 2024-03-18 03:00:00+00:00 | 2024-03-26 02:59:59+00:00 |
|  9 | desafio_normalizacao | 2024-03-21 03:00:00+00:00 | 2024-04-15 02:59:59+00:00 |
| 10 | ai_md_23_1           | 2024-03-25 03:00:00+00:00 | 2024-04-01 15:00:00+00:00 |
| 11 | ai_md_23_2           | 2024-03-25 03:00:00+00:00 | 2024-04-01 15:00:00+00:00 |
| 12 | ai_md_24_1           | 2024-04-01 03:00:00+00:00 | 2024-04-01 18:35:00+00:00 |
| 13 | triggers             | 2024-04-18 03:00:00+00:00 | 2024-04-27 02:59:59+00:00 |
| 14 | functional           | 2024-04-25 03:00:00+00:00 | 2024-05-13 02:59:59+00:00 |
| 15 | spark                | 2024-05-02 03:00:00+00:00 | 2024-05-16 02:59:59+00:00 |
| 16 | exercicios_spark     | 2024-05-06 03:00:00+00:00 | 2024-05-20 02:59:59+00:00 |

In [5]:
ia.grades(task="exercicios_spark")

|    | Atividade        | Exercício   |   Peso |   Nota |
|---:|:-----------------|:------------|-------:|-------:|
|  0 | exercicios_spark | ex01        |      1 |      0 |
|  1 | exercicios_spark | ex02        |      1 |      0 |
|  2 | exercicios_spark | ex03        |      1 |      0 |
|  3 | exercicios_spark | ex04        |      1 |      0 |
|  4 | exercicios_spark | ex05        |      1 |      0 |
|  5 | exercicios_spark | ex06        |      1 |      0 |

## Trabalhando com Spark

Para este exercicio vamos trabalhar com o dataset de reviews da Amazon visto em https://www.kaggle.com/datasets/kritanjalijain/amazon-reviews. Baixe o arquivo "train.csv"

Vamos ler o arquivo "train.csv" em um RDD.

In [6]:
rdd = sc.textFile("train.csv")

In [7]:
rdd.take(1)

['"2","Stuning even for the non-gamer","This sound track was beautiful! It paints the senery in your mind so well I would recomend it even to people who hate vid. game music! I have played the game Chrono Cross but out of all of the games I have ever played it has the best music! It backs away from crude keyboarding and takes a fresher step with grate guitars and soulful orchestras. It would impress anyone who cares to listen! ^_^"']

De acordo com a documentação deste arquivo vista no Kaggle, cada linha contem 2 elementos: o sentimento do review (1 - negativo, 2 - positivo), o título e o corpo do review. A linha contem esses elementos em um formato "comma-separated value" (CSV), onde cada um dos campos está delimitado por aspas duplas. Se o texto em si (titulo ou corpo) contem aspas, elas aparecem como um par de aspas duplas. Vamos usar o `.filter()` para achar um exemplo desses.

In [8]:
example_line = rdd.filter(lambda x: '""' in x).take(1)
example_line = example_line[0]

example_line

'"2","Amazing!","This soundtrack is my favorite music of all time, hands down. The intense sadness of ""Prisoners of Fate"" (which means all the more if you\'ve played the game) and the hope in ""A Distant Promise"" and ""Girl who Stole the Star"" have been an important inspiration to me personally throughout my teen years. The higher energy tracks like ""Chrono Cross ~ Time\'s Scar~"", ""Time of the Dreamwatch"", and ""Chronomantique"" (indefinably remeniscent of Chrono Trigger) are all absolutely superb as well.This soundtrack is amazing music, probably the best of this composer\'s work (I haven\'t heard the Xenogears soundtrack, so I can\'t say for sure), and even if you\'ve never played the game, it would be worth twice the price to buy it.I wish I could give it 6 stars."'

Levando isso em consideração, vamos fazer uma função simples para separar os campos:

In [9]:
def parse_line(line):
    parts = line[1:-1].split('","')
    sentiment = int(parts[0])
    title = parts[1].replace('""', '"')
    body = parts[2].replace('""', '"')
    return (sentiment, title, body)

In [10]:
parse_line(example_line)

(2,
 'Amazing!',
 'This soundtrack is my favorite music of all time, hands down. The intense sadness of "Prisoners of Fate" (which means all the more if you\'ve played the game) and the hope in "A Distant Promise" and "Girl who Stole the Star" have been an important inspiration to me personally throughout my teen years. The higher energy tracks like "Chrono Cross ~ Time\'s Scar~", "Time of the Dreamwatch", and "Chronomantique" (indefinably remeniscent of Chrono Trigger) are all absolutely superb as well.This soundtrack is amazing music, probably the best of this composer\'s work (I haven\'t heard the Xenogears soundtrack, so I can\'t say for sure), and even if you\'ve never played the game, it would be worth twice the price to buy it.I wish I could give it 6 stars.')

Podemos agora utilizar nossa função para separar os campos de cada linha do dataset. 

In [None]:
rdd_split = rdd.map(parse_line)

Como de costume, nada realmente acontece até que uma "action" seja invocada. O `.map()` é uma "transformation". Vamos usar uma action simples para "materializar" o novo RDD.

In [None]:
rdd_split.count()

Vamos explorar os resultados para ver se deu certo

In [None]:
rdd_split.take(1)

**Atividade**: Implemente uma função que recebe o rdd processado e conte quantos sentimentos diferentes existem, e quantas vezes aparecem, para confirmar que só tem os sentimentos 1 e 2. Sua função deve retornar o resultado em tuplas, onde o primeiro elemento é o sentimento e o segundo é a contagem de vezes que aparece.

In [None]:
def ex01(rdd_split):
    # Seu código AQUI
    return rdd_split # Pode alterar o retorno, se necessário!
    
ex01(rdd_split)

In [None]:
ia.sender(answer="ex01", task="exercicios_spark", question="ex01", answer_type="pycode")

**Atividade**: Implemente uma função que recebe o rdd processado e retorna quantos reviews não tem titulo.

In [None]:
def ex02(rdd_split):
    # Seu código AQUI
    return rdd_split # Pode alterar o retorno, se necessário!

ex02(rdd_split)

In [None]:
ia.sender(answer="ex02", task="exercicios_spark", question="ex02", answer_type="pycode")

**Atividade**: Implemente uma função que recebe o rdd processado e retorna quantos reviews não tem corpo.

In [None]:
def ex03(rdd_split):
    # Seu código AQUI
    return rdd_split # Pode alterar o retorno, se necessário!

ex03(rdd_split)

In [None]:
ia.sender(answer="ex03", task="exercicios_spark", question="ex03", answer_type="pycode")

**Atividade**: Implemente uma função que recebe o rdd processado e retorna qual o comprimento máximo de um título e de um corpo. O resultado deve ser uma tupla com os dois valores.

In [None]:
def ex04(rdd_split):
    # Seu código AQUI
    return rdd_split # Pode alterar o retorno, se necessário!
    
ex04(rdd_split)

In [None]:
ia.sender(answer="ex04", task="exercicios_spark", question="ex04", answer_type="pycode")

**Atividade**: Implemente uma função que recebe o rdd processado e retorna qual a maior palavra palíndroma sem pontuações do dataset (no titulo ou corpo) e seu tamanho. Para este exercício, está permitido o uso de list comprehensions.

In [None]:
import string
string.punctuation

In [None]:
def ex05(rdd_split):
    # Seu código AQUI
    return rdd_split # Pode alterar o retorno, se necessário!

ex05(rdd_split)

In [None]:
ia.sender(answer="ex05", task="exercicios_spark", question="ex05", answer_type="pycode")

**Atividade**: Implemente uma função que recebe o rdd processado e retorna as 20 palavras mais populares do titulo com sua frequência absoluta. Teste no subconjunto apresentado abaixo.

In [None]:
def ex06(rdd_split):
    # Seu código AQUI
    return rdd_split # Pode alterar o retorno, se necessário!

ex06(rdd_split)

In [None]:
rdd_redux = rdd_split.sample(False, 0.05, 7)
ex06(rdd_split)

In [None]:
ia.sender(answer="ex06", task="exercicios_spark", question="ex06", answer_type="pycode")

## Conferir notas

In [None]:
ia.tasks()

In [None]:
ia.grades(task="exercicios_spark")