# Spark Exercises

If needed, the commands below start the container.

On macOS and Linux, use:

```bash
docker run \
    -it \
    --rm \
    -p 8888:8888 \
    -p 4040:4040 \
    -v "$(pwd)":/home/jovyan/work \
    jupyter/pyspark-notebook
```

On Windows, use one of these commands:

- In PowerShell: `docker run -it --rm -p 8888:8888 -p 4040:4040 -v ${PWD}:/home/jovyan/work jupyter/pyspark-notebook`

- In Command Prompt: `docker run -it --rm -p 8888:8888 -p 4040:4040 -v %cd%:/home/jovyan/work jupyter/pyspark-notebook`

Now open this notebook inside the container!


## Starting Spark

In [1]:
import pyspark

conf = pyspark.SparkConf()
conf.setAppName('Minha aplicação')
conf.setMaster('local[*]')

sc = pyspark.SparkContext(conf=conf)

23/11/13 08:59:16 WARN Utils: Your hostname, MacBook-Pro-de-Antonio.local resolves to a loopback address: 127.0.0.1; using 10.102.5.64 instead (on interface en0)
23/11/13 08:59:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/13 08:59:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
sc

## Starting the autograder library

In [3]:
import insperautograder.jupyter as ia
from dotenv import load_dotenv
load_dotenv()

True

In [4]:
ia.tasks()

|    | Atividade            | De                        | Até                       |
|---:|:---------------------|:--------------------------|:--------------------------|
|  0 | select01             | 2023-08-08 03:00:00+00:00 | 2023-08-21 02:59:59+00:00 |
|  1 | newborn              | 2023-08-08 03:00:00+00:00 | 2023-08-16 02:59:59+00:00 |
|  2 | ddl                  | 2023-08-27 03:00:00+00:00 | 2023-09-02 02:59:59+00:00 |
|  3 | dml                  | 2023-08-29 03:00:00+00:00 | 2023-09-04 02:59:59+00:00 |
|  4 | agg_join             | 2023-09-03 03:00:00+00:00 | 2023-09-09 02:59:59+00:00 |
|  5 | group_having         | 2023-09-03 03:00:00+00:00 | 2023-09-17 02:59:59+00:00 |
|  6 | views                | 2023-09-11 03:00:00+00:00 | 2023-09-18 02:59:59+00:00 |
|  7 | sql_review1          | 2023-09-13 03:00:00+00:00 | 2023-09-20 02:59:59+00:00 |
|  8 | permissions          | 2023-09-20 03:00:00+00:00 | 2023-09-27 03:00:00+00:00 |
|  9 | desafio_normalizacao | 2023-09-25 03:00:00+00:00 | 2023-10-02 03:00:00+00:00 |
| 10 | ai_md_23_2           | 2023-10-09 03:00:00+00:00 | 2023-10-10 03:00:00+00:00 |
| 11 | triggers             | 2023-10-19 03:00:00+00:00 | 2023-10-28 03:00:00+00:00 |
| 12 | spark                | 2023-10-29 03:00:00+00:00 | 2023-11-08 15:00:00+00:00 |
| 13 | functional           | 2023-10-29 03:00:00+00:00 | 2023-11-08 15:00:00+00:00 |
| 14 | exercicios_spark     | 2023-11-02 03:00:00+00:00 | 2023-11-14 03:00:00+00:00 |

In [5]:
ia.grades(task="exercicios_spark")

|    | Atividade        | Exercício   |   Peso |   Nota |
|---:|:-----------------|:------------|-------:|-------:|
|  0 | exercicios_spark | ex01        |      1 |     10 |
|  1 | exercicios_spark | ex02        |      1 |     10 |
|  2 | exercicios_spark | ex03        |      1 |     10 |
|  3 | exercicios_spark | ex04        |      1 |     10 |
|  4 | exercicios_spark | ex05        |      1 |     10 |
|  5 | exercicios_spark | ex06        |      1 |     10 |

## Working with Spark

For this exercise we will work with the Amazon reviews dataset available at https://www.kaggle.com/datasets/kritanjalijain/amazon-reviews. Download the file "train.csv".

Let's read "train.csv" into an RDD.

In [6]:
rdd = sc.textFile('train.csv')

In [7]:
rdd.take(1)

                                                                                

['"2","Stuning even for the non-gamer","This sound track was beautiful! It paints the senery in your mind so well I would recomend it even to people who hate vid. game music! I have played the game Chrono Cross but out of all of the games I have ever played it has the best music! It backs away from crude keyboarding and takes a fresher step with grate guitars and soulful orchestras. It would impress anyone who cares to listen! ^_^"']

According to this file's documentation on Kaggle, each line contains three elements: the review sentiment (1 = negative, 2 = positive), the title, and the body of the review. The elements are in comma-separated values (CSV) format, with each field delimited by double quotes. If the text itself (title or body) contains a double quote, it appears as a pair of double quotes. Let's use `.filter()` to find an example of this.

In [8]:
example_line = rdd.filter(lambda x: '""' in x).take(1)
example_line = example_line[0]

example_line

'"2","Amazing!","This soundtrack is my favorite music of all time, hands down. The intense sadness of ""Prisoners of Fate"" (which means all the more if you\'ve played the game) and the hope in ""A Distant Promise"" and ""Girl who Stole the Star"" have been an important inspiration to me personally throughout my teen years. The higher energy tracks like ""Chrono Cross ~ Time\'s Scar~"", ""Time of the Dreamwatch"", and ""Chronomantique"" (indefinably remeniscent of Chrono Trigger) are all absolutely superb as well.This soundtrack is amazing music, probably the best of this composer\'s work (I haven\'t heard the Xenogears soundtrack, so I can\'t say for sure), and even if you\'ve never played the game, it would be worth twice the price to buy it.I wish I could give it 6 stars."'

With that in mind, let's write a simple function to separate the fields:

In [9]:
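def parse_line(line):
    # Drop the outer quotes, then split on the "," sequence between fields.
    # Assumes that sequence never occurs inside a title or body.
    parts = line[1:-1].split('","')
    sentiment = int(parts[0])
    # Inside a field, a literal double quote is encoded as a pair of double quotes.
    title = parts[1].replace('""', '"')
    body = parts[2].replace('""', '"')
    return (sentiment, title, body)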

In [10]:
parse_line(example_line)

(2,
 'Amazing!',
 'This soundtrack is my favorite music of all time, hands down. The intense sadness of "Prisoners of Fate" (which means all the more if you\'ve played the game) and the hope in "A Distant Promise" and "Girl who Stole the Star" have been an important inspiration to me personally throughout my teen years. The higher energy tracks like "Chrono Cross ~ Time\'s Scar~", "Time of the Dreamwatch", and "Chronomantique" (indefinably remeniscent of Chrono Trigger) are all absolutely superb as well.This soundtrack is amazing music, probably the best of this composer\'s work (I haven\'t heard the Xenogears soundtrack, so I can\'t say for sure), and even if you\'ve never played the game, it would be worth twice the price to buy it.I wish I could give it 6 stars.')
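
As a cross-check on the quoting rules, Python's standard `csv` module understands the same convention (fields wrapped in double quotes, embedded quotes doubled). The sketch below is an alternative under stated assumptions, not part of the exercise: `parse_line_csv` is a hypothetical helper, the sample record is made up, and every record is assumed to fit on a single line, which is how `sc.textFile` delivers them.

```python
import csv

def parse_line_csv(line):
    # csv.reader accepts any iterable of strings; one input line yields one row.
    sentiment, title, body = next(csv.reader([line]))
    return (int(sentiment), title, body)

# Made-up record whose title contains quotes with a comma between them.
sample = '"1","He said ""no"",""yes""","Body text"'
parsed = parse_line_csv(sample)
print(parsed)
```

Unlike splitting on `","`, this approach also copes with a field that happens to contain that exact sequence. It should be usable with `rdd.map()` in the same way as `parse_line`.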

We can now use our function to separate the fields of every line in the dataset.

In [11]:
rdd_split = rdd.map(parse_line)

As usual, nothing actually happens until an action is invoked; `.map()` is a transformation. Let's use a simple action to materialize the new RDD.

In [12]:
rdd_split.count()

                                                                                

3600000
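
Python's built-in `map` is also lazy, which gives a small local analogy for the split between transformations and actions. This is an analogy only, since Spark's execution model is distributed and more involved. `traced_upper` is a made-up function that records each call so the deferred execution becomes visible.

```python
calls = []

def traced_upper(text):
    # Record every invocation so we can see when the work really happens.
    calls.append(text)
    return text.upper()

# Analogous to a transformation: builds a lazy iterator, runs nothing yet.
lazy = map(traced_upper, ["a", "b", "c"])
calls_before = len(calls)

# Analogous to an action: consuming the iterator forces the computation.
result = list(lazy)
calls_after = len(calls)

print(calls_before, calls_after, result)
```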

Let's inspect the results to check that it worked.

In [13]:
rdd_split.take(1)

[(2,
  'Stuning even for the non-gamer',
  'This sound track was beautiful! It paints the senery in your mind so well I would recomend it even to people who hate vid. game music! I have played the game Chrono Cross but out of all of the games I have ever played it has the best music! It backs away from crude keyboarding and takes a fresher step with grate guitars and soulful orchestras. It would impress anyone who cares to listen! ^_^')]

**Activity**: Implement a function that receives the processed RDD and counts how many distinct sentiments exist and how many times each appears, to confirm that only sentiments 1 and 2 are present. Your function must return the result as tuples, where the first element is the sentiment and the second is the number of occurrences.

In [14]:
def ex01(rdd_split):

    # Keep only the sentiment, pair it with 1, and sum the ones per sentiment.
    contagem = rdd_split.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y)

    return contagem.collect()
    
ex01(rdd_split)

23/11/13 08:59:29 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

[(1, 1800000), (2, 1800000)]
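
To make the `reduceByKey` step concrete, here is a plain-Python sketch of its semantics on a handful of made-up values. `reduce_by_key` is a hypothetical helper, not a Spark API. Spark additionally merges values within each partition and then across partitions, which is why the function passed in should be associative and commutative.

```python
def reduce_by_key(pairs, func):
    merged = {}
    for key, value in pairs:
        # The first value for a key is stored as-is; later ones are merged with func.
        merged[key] = func(merged[key], value) if key in merged else value
    return sorted(merged.items())

# Made-up sentiments standing in for the first field of each parsed review.
sentiments = [2, 1, 2, 2, 1]
pairs = [(s, 1) for s in sentiments]
counts = reduce_by_key(pairs, lambda x, y: x + y)
print(counts)
```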

In [15]:
ia.sender(answer="ex01", task="exercicios_spark", question="ex01", answer_type="pycode")


**Activity**: Implement a function that receives the processed RDD and returns how many reviews have no title.

In [16]:
def ex02(rdd_split):

    # count() also returns 0 for an empty result, where reduce() would raise.
    return rdd_split.filter(lambda x: x[1] == "").count()

ex02(rdd_split)

                                                                                

48

In [17]:
ia.sender(answer="ex02", task="exercicios_spark", question="ex02", answer_type="pycode")


**Activity**: Implement a function that receives the processed RDD and returns how many reviews have no body.

In [18]:
def ex03(rdd_split):
    # count() returns 0 when no review matches, so a single pass is enough.
    return rdd_split.filter(lambda x: x[2] == "").count()

ex03(rdd_split)

                                                                                

0
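
A result of 0 touches a common pitfall: reducing an empty collection without an initial value has no defined result. The sketch below shows this with `functools.reduce` as a plain-Python stand-in (PySpark's `reduce` similarly raises on an empty RDD); the helper names are hypothetical.

```python
from functools import reduce

def count_with_reduce(items):
    # Maps every item to 1 and sums the ones; fails when there are no items.
    return reduce(lambda x, y: x + y, (1 for _ in items))

def count_safely(items):
    # sum() starts from 0, so an empty input simply yields 0.
    return sum(1 for _ in items)

try:
    count_with_reduce([])
    empty_reduce_failed = False
except TypeError:
    empty_reduce_failed = True

print(empty_reduce_failed, count_safely([]), count_with_reduce(["a", "b"]))
```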

In [19]:
ia.sender(answer="ex03", task="exercicios_spark", question="ex03", answer_type="pycode")


**Activity**: Implement a function that receives the processed RDD and returns the maximum length of a title and of a body. The result must be a tuple with the two values.

In [20]:
def ex04(rdd_split):

    rdd_split = rdd_split.map(lambda x: (len(x[1]), len(x[2])))

    tupla = rdd_split.reduce(lambda x, y: (x[0] if x[0] > y[0] else y[0], x[1] if x[1] > y[1] else y[1]))

    return tupla
    
ex04(rdd_split)

                                                                                

(139, 1010)
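
The pairwise maximum used in the reduce step can be tried locally with `functools.reduce`, a plain-Python stand-in for `RDD.reduce`. The records below are made up and `pairwise_max` is a hypothetical name. Because taking a maximum is associative and commutative, the result should not depend on the order in which elements are combined.

```python
from functools import reduce

# Made-up (sentiment, title, body) records.
records = [
    (2, "Short", "A body"),
    (1, "A longer title", "Tiny"),
    (2, "Mid", "The longest body here"),
]
lengths = [(len(title), len(body)) for _, title, body in records]

def pairwise_max(a, b):
    # Keep the larger title length and the larger body length independently.
    return (max(a[0], b[0]), max(a[1], b[1]))

forward = reduce(pairwise_max, lengths)
backward = reduce(pairwise_max, reversed(lengths))
print(forward, backward)
```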

In [21]:
ia.sender(answer="ex04", task="exercicios_spark", question="ex04", answer_type="pycode")


**Activity**: Implement a function that receives the processed RDD and returns the longest palindromic word without punctuation in the dataset (in the title or body) and its length. For this exercise, list comprehensions are allowed.

In [22]:
import string
string.punctuation

'!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'
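
Before running over millions of reviews, the word-level logic can be checked on a few made-up strings in plain Python. This is a sketch under one reading of the task, namely that words containing any punctuation character are discarded rather than cleaned; all helper names are hypothetical.

```python
import string

PUNCT = set(string.punctuation)

def clean_words(text):
    # Keep only whitespace-separated words that contain no punctuation at all.
    return [w for w in text.split() if not any(c in PUNCT for c in w)]

def is_palindrome(word):
    return word == word[::-1]

def longest_palindrome(texts):
    candidates = [w for t in texts for w in clean_words(t) if is_palindrome(w)]
    best = max(candidates, key=len)
    return best, len(best)

sample = ["A level road", "noon, racecar and kayak"]
answer = longest_palindrome(sample)
print(answer)
```

Note that single letters and repeated-character strings also count as palindromes under this definition.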

In [27]:
def ex05(rdd_split):

    # Set membership is cheaper than scanning the punctuation string per character.
    pontuacao = set(string.punctuation)

    def palavras_sem_pontuacao(linha):
        # Split title and body into words; drop any word containing punctuation.
        palavras = linha[1].split() + linha[2].split()
        return [word for word in palavras if not any(char in pontuacao for char in word)]

    def eh_palindromo(palavra):
        return palavra == palavra[::-1]

    # flatMap yields one element per word instead of one list per review.
    palindromos = rdd_split.flatMap(palavras_sem_pontuacao).filter(eh_palindromo)

    maior_palindromo = palindromos.reduce(lambda x, y: x if len(x) > len(y) else y)

    return maior_palindromo, len(maior_palindromo)


ex05(rdd_split)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
ia.sender(answer="ex05", task="exercicios_spark", question="ex05", answer_type="pycode")


**Activity**: Implement a function that receives the processed RDD and returns the 20 most popular words in the titles with their absolute frequency. Test it on the subset shown below.

In [None]:
def ex06(rdd_split):

    rdd_formatao = rdd_split.map(lambda x: x[1].split())

    rdd_flat = rdd_formatao.flatMap(lambda l: l)

    rdd_numerado = rdd_flat.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    return rdd_numerado.takeOrdered(20, lambda x: -x[-1])

ex06(rdd_split)

In [26]:
rdd_redux = rdd_split.sample(False, 0.05, 7)
ex06(rdd_split)

                                                                                

[('the', 348807),
 ('a', 249841),
 ('of', 241846),
 ('for', 220429),
 ('and', 190973),
 ('to', 177502),
 ('A', 173889),
 ('Great', 168288),
 ('I', 145120),
 ('is', 143982),
 ('Not', 140413),
 ('not', 128331),
 ('this', 121247),
 ('The', 119549),
 ('it', 107117),
 ('but', 95869),
 ('book', 95629),
 ('good', 87873),
 ('Good', 86357),
 ('in', 84964)]
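
`takeOrdered(n, key)` returns the `n` elements with the smallest key, so negating the count puts the most frequent words first. A plain-Python sketch of the same idea on made-up titles, using `collections.Counter` and `heapq.nsmallest` as stand-ins for the RDD operations:

```python
import heapq
from collections import Counter

# Made-up titles; words are split on whitespace and counted case-sensitively.
titles = ["Great book", "Great read", "Great book again"]
counts = Counter(word for title in titles for word in title.split())

# The smallest negated counts correspond to the largest counts.
top2 = heapq.nsmallest(2, counts.items(), key=lambda kv: -kv[1])
print(top2)
```

Counting is case-sensitive here, as in the output above, where 'the' and 'The' appear as separate entries.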

In [None]:
ia.sender(answer="ex06", task="exercicios_spark", question="ex06", answer_type="pycode")
