# Exercicios com Spark

Caso precise, abaixo estão os comandos para iniciar o container:

Para macOS e linux, utilize:

```bash
docker run \
    -it \
    --rm \
    -p 8888:8888 \
    -p 4040:4040 \
    -v "`pwd`":/home/jovyan/work \
    jupyter/pyspark-notebook


```

Se estiver no Windows estes comandos, utilize:

- No Powershell: `docker run -it --rm -p 8888:8888 -p 4040:4040 -v ${PWD}:/home/jovyan/work jupyter/pyspark-notebook`

- No Prompt de comando: `docker run -it --rm -p 8888:8888 -p 4040:4040 -v %cd%:/home/jovyan/work jupyter/pyspark-notebook`

Agora abra esse notebook lá no container!


## Iniciando o Spark

In [1]:
import pyspark

conf = pyspark.SparkConf()
conf.setAppName("Minha aplicação")
conf.setMaster("local[*]")

sc = pyspark.SparkContext(conf=conf)

In [2]:
sc

## Iniciando a biblioteca de correção

In [3]:
!pip install git+https://github.com/macielcalebe/insperautograding.git

Collecting git+https://github.com/macielcalebe/insperautograding.git
  Cloning https://github.com/macielcalebe/insperautograding.git to /tmp/pip-req-build-99fiyyiw
  Running command git clone --filter=blob:none --quiet https://github.com/macielcalebe/insperautograding.git /tmp/pip-req-build-99fiyyiw
  Resolved https://github.com/macielcalebe/insperautograding.git to commit acdda51152774d9e1a979b426e41daa7a8a7793c
  Preparing metadata (setup.py) ... [?25ldone


In [4]:
import insperautograder.jupyter as ia
from dotenv import load_dotenv
load_dotenv()

True

In [6]:
ia.tasks()

|    | Atividade            | De                        | Até                       |
|---:|:---------------------|:--------------------------|:--------------------------|
|  0 | newborn              | 2024-02-01 03:00:00+00:00 | 2024-05-30 03:00:00+00:00 |
|  1 | select01             | 2024-02-07 03:00:00+00:00 | 2024-02-19 02:59:59+00:00 |
|  2 | ddl                  | 2024-02-26 03:00:00+00:00 | 2024-03-03 02:59:59+00:00 |
|  3 | dml                  | 2024-02-28 03:00:00+00:00 | 2024-03-05 02:59:59+00:00 |
|  4 | agg_join             | 2024-03-04 03:00:00+00:00 | 2024-03-09 02:59:59+00:00 |
|  5 | group_having         | 2024-03-06 03:00:00+00:00 | 2024-03-11 02:59:59+00:00 |
|  6 | views                | 2024-03-11 03:00:00+00:00 | 2024-03-20 02:59:59+00:00 |
|  7 | sql_review1          | 2024-03-13 03:00:00+00:00 | 2024-03-20 02:59:59+00:00 |
|  8 | permissions          | 2024-03-20 03:00:00+00:00 | 2024-03-26 02:59:59+00:00 |
|  9 | ai_md_23_2           | 2024-03-25 03:00:00+00:00 | 2024-04-01 15:00:00+00:00 |
| 10 | ai_md_23_1           | 2024-03-25 03:00:00+00:00 | 2024-04-01 15:00:00+00:00 |
| 11 | ai_md_24_1           | 2024-04-01 03:00:00+00:00 | 2024-04-01 18:35:00+00:00 |
| 12 | desafio_normalizacao | 2024-04-08 03:00:00+00:00 | 2024-05-02 02:59:59+00:00 |
| 13 | triggers             | 2024-04-22 03:00:00+00:00 | 2024-04-29 02:59:59+00:00 |
| 14 | functional           | 2024-05-06 03:00:00+00:00 | 2024-05-18 02:59:59+00:00 |
| 15 | spark                | 2024-05-08 03:00:00+00:00 | 2024-05-21 02:59:59+00:00 |
| 16 | exercicios_spark     | 2024-05-13 03:00:00+00:00 | 2024-05-25 02:59:59+00:00 |

In [7]:
ia.grades(task="exercicios_spark")

|    | Atividade        | Exercício   |   Peso |   Nota |
|---:|:-----------------|:------------|-------:|-------:|
|  0 | exercicios_spark | ex01        |      1 |     10 |
|  1 | exercicios_spark | ex02        |      1 |     10 |
|  2 | exercicios_spark | ex03        |      1 |     10 |
|  3 | exercicios_spark | ex04        |      1 |     10 |
|  4 | exercicios_spark | ex05        |      1 |     10 |
|  5 | exercicios_spark | ex06        |      1 |      0 |

In [7]:
ia.grades(by="task")

|    | Tarefa               |   Nota |
|---:|:---------------------|-------:|
|  0 | agg_join             |   10   |
|  1 | ai_md_23_1           |   10   |
|  2 | ai_md_23_2           |   10   |
|  3 | ai_md_24_1           |    7.5 |
|  4 | ddl                  |   10   |
|  5 | desafio_normalizacao |   10   |
|  6 | dml                  |   10   |
|  7 | exercicios_spark     |    0   |
|  8 | functional           |   10   |
|  9 | group_having         |   10   |
| 10 | newborn              |   10   |
| 11 | permissions          |   10   |
| 12 | select01             |   10   |
| 13 | spark                |   10   |
| 14 | sql_review1          |   10   |
| 15 | triggers             |    0   |
| 16 | views                |   10   |

## Trabalhando com Spark

Para este exercicio vamos trabalhar com o dataset de reviews da Amazon visto em https://www.kaggle.com/datasets/kritanjalijain/amazon-reviews. Baixe o arquivo "train.csv"

Vamos ler o arquivo "train.csv" em um RDD.

In [10]:
rdd = sc.textFile("train.csv")

In [11]:
rdd.take(1)

['"2","Stuning even for the non-gamer","This sound track was beautiful! It paints the senery in your mind so well I would recomend it even to people who hate vid. game music! I have played the game Chrono Cross but out of all of the games I have ever played it has the best music! It backs away from crude keyboarding and takes a fresher step with grate guitars and soulful orchestras. It would impress anyone who cares to listen! ^_^"']

De acordo com a documentação deste arquivo vista no Kaggle, cada linha contem 2 elementos: o sentimento do review (1 - negativo, 2 - positivo), o título e o corpo do review. A linha contem esses elementos em um formato "comma-separated value" (CSV), onde cada um dos campos está delimitado por aspas duplas. Se o texto em si (titulo ou corpo) contem aspas, elas aparecem como um par de aspas duplas. Vamos usar o `.filter()` para achar um exemplo desses.

In [12]:
example_line = rdd.filter(lambda x: '""' in x).take(1)
example_line = example_line[0]

example_line

'"2","Amazing!","This soundtrack is my favorite music of all time, hands down. The intense sadness of ""Prisoners of Fate"" (which means all the more if you\'ve played the game) and the hope in ""A Distant Promise"" and ""Girl who Stole the Star"" have been an important inspiration to me personally throughout my teen years. The higher energy tracks like ""Chrono Cross ~ Time\'s Scar~"", ""Time of the Dreamwatch"", and ""Chronomantique"" (indefinably remeniscent of Chrono Trigger) are all absolutely superb as well.This soundtrack is amazing music, probably the best of this composer\'s work (I haven\'t heard the Xenogears soundtrack, so I can\'t say for sure), and even if you\'ve never played the game, it would be worth twice the price to buy it.I wish I could give it 6 stars."'

Levando isso em consideração, vamos fazer uma função simples para separar os campos:

In [13]:
def parse_line(line):
    parts = line[1:-1].split('","')
    sentiment = int(parts[0])
    title = parts[1].replace('""', '"')
    body = parts[2].replace('""', '"')
    return (sentiment, title, body)

In [14]:
parse_line(example_line)

(2,
 'Amazing!',
 'This soundtrack is my favorite music of all time, hands down. The intense sadness of "Prisoners of Fate" (which means all the more if you\'ve played the game) and the hope in "A Distant Promise" and "Girl who Stole the Star" have been an important inspiration to me personally throughout my teen years. The higher energy tracks like "Chrono Cross ~ Time\'s Scar~", "Time of the Dreamwatch", and "Chronomantique" (indefinably remeniscent of Chrono Trigger) are all absolutely superb as well.This soundtrack is amazing music, probably the best of this composer\'s work (I haven\'t heard the Xenogears soundtrack, so I can\'t say for sure), and even if you\'ve never played the game, it would be worth twice the price to buy it.I wish I could give it 6 stars.')

Podemos agora utilizar nossa função para separar os campos de cada linha do dataset. 

In [15]:
rdd_split = rdd.map(parse_line)

Como de costume, nada realmente acontece até que uma "action" seja invocada. O `.map()` é uma "transformation". Vamos usar uma action simples para "materializar" o novo RDD.

In [14]:
rdd_split.count()

1089682

Vamos explorar os resultados para ver se deu certo

In [15]:
rdd_split.take(1)

[(2,
  'Stuning even for the non-gamer',
  'This sound track was beautiful! It paints the senery in your mind so well I would recomend it even to people who hate vid. game music! I have played the game Chrono Cross but out of all of the games I have ever played it has the best music! It backs away from crude keyboarding and takes a fresher step with grate guitars and soulful orchestras. It would impress anyone who cares to listen! ^_^')]

**Atividade**: Implemente uma função que recebe o rdd processado e conte quantos sentimentos diferentes existem, e quantas vezes aparecem, para confirmar que só tem os sentimentos 1 e 2. Sua função deve retornar o resultado em tuplas, onde o primeiro elemento é o sentimento e o segundo é a contagem de vezes que aparece.

In [20]:
def ex01(rdd_split):
    # Mapeia cada registro para extrair o sentimento (assumindo que o sentimento é o primeiro item da tupla)
    sentiment_count = rdd_split.map(lambda x: (x[0], 1))
    
    # Agrega as contagens por chave (sentimento)
    sentiment_count = sentiment_count.reduceByKey(lambda a, b: a + b)
    
    # Coleta os resultados usando collect para todos os elementos
    result = sentiment_count.take(1)
    
    return result

ex01(rdd_split)

[(1, 539050)]

In [21]:
ia.sender(answer="ex01", task="exercicios_spark", question="ex01", answer_type="pycode")

interactive(children=(Button(description='Enviar ex01', style=ButtonStyle()), Output()), _dom_classes=('widget…

**Atividade**: Implemente uma função que recebe o rdd processado e retorna quantos reviews não tem titulo.

In [25]:
def ex02(rdd_split):
    # Filtra os registros onde o título é vazio ou contém apenas espaços
    no_title_count = rdd_split.filter(lambda x: not x[1].strip()).count()
    
    return no_title_count

ex02(rdd_split)

19

In [23]:
ia.sender(answer="ex02", task="exercicios_spark", question="ex02", answer_type="pycode")

interactive(children=(Button(description='Enviar ex02', style=ButtonStyle()), Output()), _dom_classes=('widget…

**Atividade**: Implemente uma função que recebe o rdd processado e retorna quantos reviews não tem corpo.

In [26]:
def ex03(rdd_split):
    # Filtra os registros onde o corpo é vazio ou contém apenas espaços
    no_body_count = rdd_split.filter(lambda x: not x[2].strip()).count()
    
    return no_body_count

ex03(rdd_split)

0

In [27]:
ia.sender(answer="ex03", task="exercicios_spark", question="ex03", answer_type="pycode")

interactive(children=(Button(description='Enviar ex03', style=ButtonStyle()), Output()), _dom_classes=('widget…

**Atividade**: Implemente uma função que recebe o rdd processado e retorna qual o comprimento máximo de um título e de um corpo. O resultado deve ser uma tupla com os dois valores.

In [28]:
def ex04(rdd_split):
    # Mapeia o RDD para extrair os comprimentos dos títulos e corpos
    lengths = rdd_split.map(lambda x: (len(x[1].strip()), len(x[2].strip())))
    
    # Encontra o máximo para títulos e corpos
    max_title_length = lengths.map(lambda x: x[0]).max()
    max_body_length = lengths.map(lambda x: x[1]).max()
    
    # Retorna uma tupla com os comprimentos máximos
    return (max_title_length, max_body_length)
    
ex04(rdd_split)

(128, 1010)

In [29]:
ia.sender(answer="ex04", task="exercicios_spark", question="ex04", answer_type="pycode")

interactive(children=(Button(description='Enviar ex04', style=ButtonStyle()), Output()), _dom_classes=('widget…

**Atividade**: Implemente uma função que recebe o rdd processado e retorna qual a maior palavra palíndroma sem pontuações do dataset (no titulo ou corpo) e seu tamanho. Para este exercício, está permitido o uso de list comprehensions.

In [31]:
import string
string.punctuation

'!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'

In [35]:
def ex05(rdd_split):
    import re  # Importa re dentro da função para uso local

    def is_palindrome(word):
        """ Verifica se a palavra é uma palíndroma. """
        return word.lower() == word.lower()[::-1]

    def find_largest_palindrome(text):
        # Cria uma tabela de tradução que mapeia cada caracter de pontuação para None
        translator = str.maketrans('', '', string.punctuation)
        # Remove pontuações usando translate e divide em palavras
        words = text.translate(translator).split()
        # Filtra palavras que são palíndromos e não são vazias
        palindromes = [word for word in words if is_palindrome(word) and word]
        # Encontra a maior palavra palíndroma, se existir
        if palindromes:
            return max(palindromes, key=len)
        return ""

    # Extrai as maiores palavras palíndromas de títulos e corpos
    largest_palindromes = rdd_split.flatMap(lambda x: [find_largest_palindrome(x[1]), find_largest_palindrome(x[2])])

    # Filtra para remover strings vazias
    largest_palindromes = largest_palindromes.filter(lambda x: x)

    # Encontra a maior palavra palíndroma em todo o RDD
    largest_palindrome = largest_palindromes.max(key=len)

    # Retorna a maior palavra palíndroma e seu comprimento
    return (largest_palindrome, len(largest_palindrome))

ex05(rdd_split)

('Zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz', 60)

In [36]:
ia.sender(answer="ex05", task="exercicios_spark", question="ex05", answer_type="pycode")

interactive(children=(Button(description='Enviar ex05', style=ButtonStyle()), Output()), _dom_classes=('widget…

**Atividade**: Implemente uma função que recebe o rdd processado e retorna as 20 palavras mais populares do titulo com sua frequência absoluta. Teste no subconjunto apresentado abaixo.

In [16]:
def ex06(rdd_split):
    import re  # Importa re dentro da função para uso local
    import string

    def clean_and_split(text):
        # Cria uma tabela de tradução que mapeia cada caracter de pontuação para None
        # translator = str.maketrans('', '', string.punctuation)
        # Remove pontuações e converte texto para minúsculas
        # cleaned_text = text.translate(translator)  # Assegura que tudo seja minúsculo
        # Divide o texto em palavras
        return text.split()

    # Mapeia cada título para palavras, limpa e conta as palavras
    word_counts = rdd_split.flatMap(lambda x: clean_and_split(x[1])) \
                           .map(lambda word: (word, 1)) \
                           .reduceByKey(lambda a, b: a + b)

    # Ordena as palavras pela frequência (descendente) e pega as 20 mais frequentes
    top_20_words = word_counts.takeOrdered(20, key=lambda x: -x[1])

    return top_20_words

In [17]:
rdd_redux = rdd_split.sample(False, 0.05, 7)
ex06(rdd_split)

[('the', 110241),
 ('of', 78235),
 ('a', 77274),
 ('for', 61538),
 ('and', 58181),
 ('A', 56654),
 ('to', 53885),
 ('Great', 48070),
 ('is', 45771),
 ('I', 45705),
 ('The', 40553),
 ('Not', 39973),
 ('this', 37847),
 ('not', 37167),
 ('it', 32156),
 ('book', 29488),
 ('but', 28517),
 ('in', 26507),
 ('good', 26288),
 ('Good', 24830)]

In [18]:
ia.sender(answer="ex06", task="exercicios_spark", question="ex06", answer_type="pycode")

interactive(children=(Button(description='Enviar ex06', style=ButtonStyle()), Output()), _dom_classes=('widget…

## Conferir notas

In [19]:
ia.tasks()

|    | Atividade            | De                        | Até                       |
|---:|:---------------------|:--------------------------|:--------------------------|
|  0 | newborn              | 2024-02-01 03:00:00+00:00 | 2024-05-30 03:00:00+00:00 |
|  1 | select01             | 2024-02-07 03:00:00+00:00 | 2024-02-19 02:59:59+00:00 |
|  2 | ddl                  | 2024-02-26 03:00:00+00:00 | 2024-03-03 02:59:59+00:00 |
|  3 | dml                  | 2024-02-28 03:00:00+00:00 | 2024-03-05 02:59:59+00:00 |
|  4 | agg_join             | 2024-03-04 03:00:00+00:00 | 2024-03-09 02:59:59+00:00 |
|  5 | group_having         | 2024-03-06 03:00:00+00:00 | 2024-03-11 02:59:59+00:00 |
|  6 | views                | 2024-03-11 03:00:00+00:00 | 2024-03-20 02:59:59+00:00 |
|  7 | sql_review1          | 2024-03-13 03:00:00+00:00 | 2024-03-20 02:59:59+00:00 |
|  8 | permissions          | 2024-03-20 03:00:00+00:00 | 2024-03-26 02:59:59+00:00 |
|  9 | ai_md_23_2           | 2024-03-25 03:00:00+00:00 | 2024-04-01 15:00:00+00:00 |
| 10 | ai_md_23_1           | 2024-03-25 03:00:00+00:00 | 2024-04-01 15:00:00+00:00 |
| 11 | ai_md_24_1           | 2024-04-01 03:00:00+00:00 | 2024-04-01 18:35:00+00:00 |
| 12 | desafio_normalizacao | 2024-04-08 03:00:00+00:00 | 2024-05-02 02:59:59+00:00 |
| 13 | triggers             | 2024-04-22 03:00:00+00:00 | 2024-04-29 02:59:59+00:00 |
| 14 | functional           | 2024-05-06 03:00:00+00:00 | 2024-05-18 02:59:59+00:00 |
| 15 | spark                | 2024-05-08 03:00:00+00:00 | 2024-05-21 02:59:59+00:00 |
| 16 | exercicios_spark     | 2024-05-13 03:00:00+00:00 | 2024-05-25 02:59:59+00:00 |

In [20]:
ia.grades(task="exercicios_spark")

|    | Atividade        | Exercício   |   Peso |   Nota |
|---:|:-----------------|:------------|-------:|-------:|
|  0 | exercicios_spark | ex01        |      1 |     10 |
|  1 | exercicios_spark | ex02        |      1 |     10 |
|  2 | exercicios_spark | ex03        |      1 |     10 |
|  3 | exercicios_spark | ex04        |      1 |     10 |
|  4 | exercicios_spark | ex05        |      1 |     10 |
|  5 | exercicios_spark | ex06        |      1 |     10 |