# Apache Spark

## Instalando o ambiente

### Opção 1 - Docker

O jeito mais simples de começar a trabalhar com Spark é instalar um container com tudo pronto! No site https://hub.docker.com/r/jupyter/pyspark-notebook vemos uma imagem Docker que já vem com `pyspark` e `jupyter lab`. Instale a imagem com o comando:

```bash
docker pull jupyter/pyspark-notebook
```


Vamos iniciar o ambiente de trabalho com o comando `docker run`. Para isso precisamos tomar alguns cuidados:

1) Temos que mapear nosso diretorio local de trabalho para um diretório interno do container, de modo que alterações feitas dentro do container (nesta pasta escolhida) sejam gravadas no nosso diretorio local. No container temos um usuário padrão com *username* `jovyan`. No *homedir* desse usuario temos uma pasta vazia `work`, que vai servir como local de mapeamento do nosso diretorio local de trabalho. Podemos então fazer esse mapeamendo com a opção `-v` do comando `docker run` da seguinte forma:

```bash
-v <diretorio>:/home/jovyan/work
```

onde `<diretorio>` representa seu diretorio local de trabalho.

2) Para acessar o `jupyter notebook` e o *dashboard* do Spark a partir do nosso *browser* favorito temos que abrir algumas portas do container com a opção `-p`. As portas são `8888` (para o próprio `jupyter notebook`) e `4040` (para o *dashboard* do Spark). Ou seja, adicionaremos às opções do `docker run`o seguinte:

```bash
-p 8888:8888 -p 4040:4040
```

Desta forma, ao acessar `localhost:8888` na nossa máquina, estaremos acessando o servidor Jupyter na porta 8888 interna do container.

3) Vamos iniciar o container no modo interativo, e vamos especificar que o container deve ser encerrado ao fechar o servidor Jupyter. Faremos isso com as opções `-it` e `-rm`

Antes de executar, garanta que as portas 4040 e 8888 estão livres (sem jupyter já executando) ou altere o comando. Ainda, esteja na pasta da aula ao executar, assim apenas ela será exposta ao container.

Portanto, o comando completo que eu uso na minha máquina Linux para iniciar o container é:

```bash
docker run \
    -it \
    --rm \
    -p 8888:8888 \
    -p 4040:4040 \
    -v "`pwd`":/home/jovyan/work \
    jupyter/pyspark-notebook

```

Se estiver no Windows estes comandos, utilize:

- No Powershell: `docker run -it --rm -p 8888:8888 -p 4040:4040 -v ${PWD}:/home/jovyan/work jupyter/pyspark-notebook`

- No Prompt de comando: `docker run -it --rm -p 8888:8888 -p 4040:4040 -v %cd%:/home/jovyan/work jupyter/pyspark-notebook`

Para facilitar a vida eu coloco esse comando em um arquivo `inicia.sh`. Engenheiros, façam do jeito que preferirem!

Agora abra esse notebook lá no container utilizando o link **com o token** que é exibido ao executar o comando!

**Importante:** Se você já estiver visualizando esse notebook via Jupyter, pode ser necessário encerrar o processo para que o comando acima funcione.

Vamos agora configurar o insperautograder para que funcione dentro do container do Docker. Primeiro, instale e importe a biblioteca:

In [None]:
# !pip install git+https://github.com/macielcalebe/insperautograding.git

# import insperautograder.jupyter as ia
# from dotenv import load_dotenv

Copie o arquivo .env para dentro da pasta que está vinculada ao container e veja se está funcionando:

In [None]:
# load_dotenv()
# ia.tasks()

In [None]:
# ia.grades(task="spark")

### Opção 2 - Instalar `pyspark` diretamente

Caso já tenha as dependências em sua máquina, faça:

In [None]:
# !pip install pyspark

## Iniciando o Spark

Vamos iniciar o ambiente Spark. Para isso vamos:

1) Criar um objeto de configuração do ambiente Spark. Nossa configuração será simples: vamos especificar que o nome da nossa aplicação Spark é "Minha aplicação", e que o *master node* é a máquina local, usando todos os *cores* disponíveis. Aplicações reais de Spark são configuradas de modo ligeiramente diferente: ao especificar o *master node* passamos uma URL real, com o endereço do nó gerente do *cluster* Spark.

2) Vamos criar um objeto do tipo `SparkContext` com essa configuração

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MinhaAplicacao").getOrCreate()
sc = spark.sparkContext

O `SparkContext` é a nossa porta de entrada para o cluster Spark, ele será a raiz de todas as nossas operações com o Spark.

In [None]:
sc

O link acima para a Spark UI provavelmente não funcionará porque ele se refere à porta 4040 interna do container (portanto a URL está com endereço interno). Porém fizemos o mapeamento da porta 4040 interna para a porta 4040 externa, logo você pode acessar o *dashboard* do Spark para monitorar seus *jobs* no endereço http://localhost:4040

<center><img src="./img/spark_dashboard.png" width=800/></center>

Pronto, assim você vai conseguir testar seus programas com facilidade!

## Trabalhando com RDDs

Esse é o jeito "Apache raiz": Resilient Distributed Datasets (RDDs). Este é o principal objeto de processamento do Spark.

Um RDD é criado à partir do objeto `SparkContext`. Por exemplo:

In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 5])

Vamos recuperar dois elementos:

In [None]:
rdd.take(2)

Um RDD também pode ser criado a partir de um dataset (claro!):

In [None]:
rdd = sc.textFile("data/memorias.txt")

Veja que o rdd pode ser particionado

In [None]:
print("Quantidade de partições: ", rdd.getNumPartitions())

Podemos imaginar um RDD como uma coleção de itens, similar a uma grande lista. O que vem em cada item depende do arquivo original de dados. Para arquivos de texto, cada linha é um item.

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.html e sobre os RDDs em https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html

### Açoes e Transformações

O objeto `rdd` é repleto de métodos para definir pipelines computacionais. Estes métodos se dividem em *actions* e *transformations*.

*Transformations* são métodos que atuam no RDD e devolvem um "novo" RDD que está conectado ao RDD antigo. Por exemplo, vamos usar a *transformation* `map` para inverter a sequência de letras em cada linha: 

In [None]:
def inverte_linha(linha):
    return linha[::-1]

rdd2 = rdd.map(inverte_linha)
rdd2

Se você consultar a Web UI do Spark (http://localhost:4040) vai ver que nada ainda aconteceu. Isto é assim porque o Spark é *lazy*: a computação só acontece quando um resultado de uma sequência de *transformations* é demandado por uma *action*.

As *actions* são métodos que retornam dados para o seu programa. Por exemplo, a *action* `count` retorna o número de itens no RDD, e a *action* `take` permite coletar alguns itens no inicio do RDD, bem útil para debugar:

In [None]:
rdd.count()

Confira a Web UI e veja que agora temos um novo job completo!

Vamos espiar as primeiras 20 linhas do documento original:

In [None]:
rdd.take(20)

E agora as 20 primeiras linhas após a inversão de linha:

In [None]:
rdd2.take(20)

Vamos discutir algumas açoes e transformações uteis.

#### `map`

A transformação `map` recebe uma função e aplica esta função a cada item do RDD. A função deve receber um item e retornar um unico item. Como exemplo temos o uso de `map` acima para inverter a ordem das letras de cada linha.

Note que no `map`, o RDD resultante tem o mesmo numero de elementos do RDD original.

**Exercicio 1**: Crie uma função chamada conta_letras que recebe um rdd e retorna o rdd transformado onde cada linha tenha a contagem de letras da linha recebida. Por exemplo, a linha "abacaxi" deve ser transformada em 7.

In [12]:
import insperautograder.jupyter as ia
from dotenv import load_dotenv

In [13]:
# resultado_esperado: [72, 0, 74, 67, 74, 67, 76, 79, 0, 38, 0, 24, 0, 41, 0, 20, 0, 0, 77, 0 ...

def conta_letras(rdd):
    rdd = rdd.map(lambda x: len(x))
    return rdd # Pode alterar o retorno!

conta_letras(rdd).take(20)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (10.102.18.13 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:701)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:745)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:698)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:663)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:639)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:585)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:543)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:181)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:701)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:745)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:698)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:663)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:639)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:585)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:543)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 17 more


In [14]:
ia.sender(answer="conta_letras", task="spark", question="ex01", answer_type="pycode")

interactive(children=(Button(description='Enviar ex01', style=ButtonStyle()), Output()), _dom_classes=('widget…

**Exercício 2**: Crie uma função chamada transforma_minusculas que recebe um rdd e retorna o rdd transformado onde cada linha tenha todas as letras em minúsculas. Por exemplo, a linha "AbaCaxi HOJE" deve ser transformada em "abacaxi hoje".

In [15]:
def transforma_minusculas(rdd):
    rdd = rdd.map(lambda x: x.lower())
    return rdd # Pode alterar o retorno!
    
transforma_minusculas(rdd).take(20)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [16]:
ia.sender(answer="transforma_minusculas", task="spark", question="ex02", answer_type="pycode")

interactive(children=(Button(description='Enviar ex02', style=ButtonStyle()), Output()), _dom_classes=('widget…

#### `flatMap`

Esta transformação é similar ao `map`, porém a função passada para o `flatMap` pode retornar zero ou mais itens. Estes itens então são concatenados e o RDD resultante acaba tendo um número de itens diferente do RDD original.

Por exemplo: suponha que queremos gerar uma lista de palavras do documento. Podemos fazê-lo da seguinte forma:

In [None]:
def separa_palavras(linha):
    return linha.strip().split()

Observe o resultado da aplicação da função `separa_palavras` com `map`:

In [None]:
rdd_map = rdd.map(lambda x: x.lower()).map(separa_palavras)
rdd_map.take(30)

E o resultado com `flatMap`, que é o resultado desejado:

In [None]:
rdd_flatMap = rdd.map(lambda x: x.lower()).flatMap(separa_palavras)
rdd_flatMap.take(30)

Veja a contagem de palavras no `rdd_flatMap`:

In [None]:
rdd_flatMap.count()

Compare com o numero de itens no RDD original:

In [None]:
rdd.count()

**Exercício 3**: Crie uma função chamada gera_bigramas que recebe um rdd e retorna o rdd transformado onde cada linha contém uma **tupla** com um *bigrama*: sequências de duas palavras consecutivas.

Gere cada bigrama como uma **tupla** (p[i], p[i+1]). 

Sua função deve utilizar o comando `flatMap`. Para esse exercício é permitido utilizar um comando for dentro do flatmap. Por que isso não prejudica a performance?

Além disso, teremos um pequeno problema nos bigramas - que problema é esse?

In [17]:
def gera_bigramas(rdd):
    rdd = rdd.map(lambda x: x.lower())
    
    # Create subfunction to mount bigrams per line
    def create_bigrams_per_line(line):
        # Split line into words
        words = line.split()
        # Create bigrams
        bigrams = [(words[i], words[i+1]) for i in range(len(words) - 1)]
        return bigrams

    return rdd.flatMap(create_bigrams_per_line)

rdd_bigramas = gera_bigramas(rdd)
rdd_bigramas.take(20)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [18]:
ia.sender(answer="gera_bigramas", task="spark", question="ex03", answer_type="pycode")

interactive(children=(Button(description='Enviar ex03', style=ButtonStyle()), Output()), _dom_classes=('widget…

O problema é que bigramas que cruzam linhas não serão contabilizados.

#### `filter`

A transformação `filter` recebe uma função que recebe um item e retorna True se o item deve ser mantido, e False se deve ser ignorado. Por exemplo, suponha que temos uma lista de palavras proibidas e queremos manter apenas as palavras permitidas da nossa lista de palavras em `rdd_flatMap`:

In [None]:
rdd_flatMap.take(30)

In [None]:
def eh_palavra_permitida(palavra):
    palavras_proibidas = set(["Braz", "Assis"])
    return palavra not in palavras_proibidas

rdd_limpo = rdd_flatMap.filter(eh_palavra_permitida)
rdd_limpo.take(30)

#### `reduce`

A *action* `reduce` apresenta o mesmo comportamento da operação `reduce` da biblioteca `functools` do Python. Ela recebe uma função de dois argumentos que retorna apenas um valor. O comportamento de `reduce` é aplicar esta função aos vários elementos do RDD de modo sucessivo, visando "reduzir" o RDD inteiro a um único valor, que é então retornado para o programa principal.

Por exemplo, suponha que desejamos calcular a soma dos comprimentos de palavras. Podemos fazer o seguinte:

In [None]:
rdd_flatMap.map(lambda x: len(x)).reduce(lambda x, y: x + y)

Note que como `reduce` é uma *action*, temos um resultado concreto.

**Exercício 4**: Crie uma função chamada maior_palindromo que recebe um rdd com o texto e retorna a maior palavra palíndroma do texto.

In [19]:
def maior_palindromo(rdd):
    # Transform all characters to lowercase
    rdd = rdd.map(lambda x: x.lower())
    # Filter blank lines
    rdd = rdd.filter(lambda x: len(x) > 0)
    # Create subfunction to check if a word is a palindrome
    def is_palindrome(word):
        return word == word[::-1]
    # Filter palindromes
    rdd = rdd.filter(is_palindrome)
    # Filter the blank spaces between words
    rdd = rdd.filter(lambda x: len(x.split()) == 1)
    # Return the longest palindrome
    return rdd.reduce(lambda x, y: x if len(x) > len(y) else y)

maior_palindromo(rdd)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [20]:
ia.sender(answer="maior_palindromo", task="spark", question="ex04", answer_type="pycode")

interactive(children=(Button(description='Enviar ex04', style=ButtonStyle()), Output()), _dom_classes=('widget…

### Pares chave-valor

Sempre que o RDD consistir em itens que sejam tuplas de dois elementos, Spark vai considerar que temos um par chave-valor por item. Isso nos permite realizar agregações por chave, abrindo a possibilidade de coletar dados agregados mais interessantes. 

#### `reduceByKey`
Para essa tarefa, a transformação mais importante é a `reduceByKey`. Esta operação realiza uma agregação por chave (gerando uma lista de valores para aquela chave), e realiza uma redução da lista dos valores associados à chave.

Por exemplo, o *hello world* do Spark: o conta-palavras:

In [None]:
rdd_flatMap \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .takeOrdered(10, lambda x: -x[1])

**Exercício 5**: Crie uma função chamada conta_bigramas que recebe um rdd com as tuplas de bigramas e retorna pares chave e valor dos 10 bigramas mais populares.

**Atenção:** Apesar da resposta aparecer como se fossem listas, os elementos apresentados são tuplas em python.

In [21]:
def conta_bigramas(rdd_bigramas):
    # Count the number of occurrences of each bigram
    rdd = rdd_bigramas.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
    # Return a new RDD with the number of occurrences of each bigram
    return rdd.takeOrdered(10, lambda x: -x[1])

conta_bigramas(gera_bigramas(rdd))

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [23]:
ia.sender(answer="conta_bigramas", task="spark", question="ex05", answer_type="pycode")

interactive(children=(Button(description='Enviar ex05', style=ButtonStyle()), Output()), _dom_classes=('widget…

#### `join`

A transformação `join` realiza o *inner join* de dois RDDs. Por exemplo:

In [None]:
rdd_a = sc.parallelize([("a", 10), ("b", 20), ("b", 30)])
rdd_b = sc.parallelize([("a", "banana"), ("a", "abacate"), ("b", "chuchu"), ("b", "tomate")])
rdd_c = rdd_a.join(rdd_b)

rdd_c.collect()

**Desafio 1**: Crie uma função chamada desafio1 que recebe um rdd e o transforma de modo que para cada bigrama, temos a seguinte informação:

(bigrama, contagem_bigrama, contagem_palavra_1, contagem_palavra_2)

**Atenção:** Apesar da resposta aparecer como se fossem listas, os elementos apresentados são tuplas em python.

In [26]:
def desafio1(rdd):
     # Transform all characters to lowercase
    rdd = rdd.map(lambda x: x.lower())
    # Filter blank lines
    rdd = rdd.filter(lambda x: len(x) > 0)

    def separate_word(linha):
        # Split `linha` into words
        return linha.strip().split()
    
    # Separate words
    rdd_words = rdd.flatMap(separate_word)
    # Count the number of occurrences of each word
    rdd_words = rdd_words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
    # Create a dictionary with the number of occurrences of each word
    words = rdd_words.collectAsMap()

    # Create subfunction to generate bigrams in a line
    def generate_bigrams_line(line):
        # Split line into words
        words = line.split()
        # Create bigrams
        bigrams = [(words[i], words[i+1]) for i in range(len(words) - 1)]
        return bigrams
    
    rdd = rdd.flatMap(generate_bigrams_line)

    # Count the number of occurrences of each bigram
    rdd = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    # Get the number of occurrences of each bigram in `rdd_words`
    rdd = rdd.map(lambda x: (x[0], x[1], words[x[0][0]], words[x[0][1]]))
    return rdd 

desafio1(rdd)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [27]:
ia.sender(answer="desafio1", task="spark", question="desafio1", answer_type="pycode")

interactive(children=(Button(description='Enviar desafio1', style=ButtonStyle()), Output()), _dom_classes=('wi…

**Desafio 2**: Faça uma função chamada desafio2 que recebe um RDD no formato retornado pela função desafio1 e constrói um RDD com a seguinte informação:

`(bigrama, (contagem_bigrama) /((contagem_palavra_1 + K) * (contagem_palavra_2 + K)))`

para K = 10

Por fim, retorne os 15 bigramas com maior valor associado.

In [28]:
def desafio2(rdd_d1):
    K = 10
    rdd_d1 = rdd_d1.map(lambda x: (x[0], (x[1] /((x[2] + K) * (x[3] + K)))))
    return rdd_d1.takeOrdered(15, lambda x: -x[1])

desafio2(desafio1(rdd))

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "c:\Users\julia\AppData\Local\Programs\Python\Python310\lib\socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [29]:
ia.sender(answer="desafio2", task="spark", question="desafio2", answer_type="pycode")

interactive(children=(Button(description='Enviar desafio2', style=ButtonStyle()), Output()), _dom_classes=('wi…

## Gabarito

**<div id="gab_ex3">**Exercício 3**</div>**
<div class="alert alert-warning">

```python
def gera_bigramas(rdd):
    return rdd.map(lambda x: x.lower()) \
        .map(lambda x: x.split()) \
        .flatMap(lambda x: [(x[i], x[i+1]) for i in range(len(x)-1)])
```
    
</div>

**<div id="gab_ex4">**Exercício 5**</div>**
<div class="alert alert-warning">

```python
def conta_bigramas(rdd_bigramas):
    rdd_conta_bigramas = rdd_bigramas.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
    return rdd_conta_bigramas.takeOrdered(10, lambda x: -x[1])
```
    
</div>