# <span style="color:blue">MBA em Ciência de Dados</span>
# <span style="color:blue">Análise de Dados com Base em Processamento Massivo em Paralelo</span>

## <span style="color:blue">Aula 06: Processamento Paralelo e Distribuído</span>
## <span style="color:blue">Apache Spark RDD</span>

**Material Produzido por:**<br>
>**Profa. Dra. Cristina Dutra de Aguiar Ciferri**<br>
>**André Marcos Perez**<br> 

**CEMEAI - ICMC/USP São Carlos**

## **1 Introdução**

O objetivo deste notebook é apresentar conceitos relacionados ao Apache Spark RDD. Antes de apresentar esses conceitos, a seguir são definidas as principais características do *framework* Apache Spark.

O *framework* Apache Spark:

- Executa sobre o sistema de arquivos distribuídos **HDFS** (*Hadoop Distributed File System*). 

- Incorpora e estende os conceitos relacionados ao modelo de programação funcional **MapReduce**. Em especial, o *framework* Apache Spark introduz diversas operações que podem ser executadas sobre os RDDs, além da possibilidade de criação de dois tipos de variáveis compartilhadas. 

- É baseado no uso de **conjuntos de dados distribuídos e resilientes, RDDs** (*Resilient and Distributed Datasets*). RDDs são abstrações que representam blocos de dados que podem ser reconstruídos em caso de falhas. Eles possibilitam o armazenamento dos resultados intermediários em memória primária sempre que possível. Como resultado, o uso de RDDs diminui o número de acessos a disco.

- Possibilita o **agendamento de tarefas na forma de grafos acíclicos e direcionados, DAGs**. O processamento das tarefas consiste de vários estágios, os quais podem ser executados em paralelo caso não haja dependências entre os estágios. Como resultado, o uso de DAGs melhora o desempenho computacional das aplicações. 




## **2 Apache Spark Cluster**

### 2.1 Arquitetura

Um *cluster* Spark é uma arquitetura de sistema distribuído para processamento paralelo composto por três elementos: 

> **Nó mestre**: O nó mestre (*master node*) é o componente responsável por coordenar a execução das tarefas. Um programa Spark possui apenas um nó mestre. Neste nó, roda-se o *driver*, um *software* que distribui as tarefas (*tasks*) entre os nós do *cluster* e obtém os resultados dessas tarefas. Para criar o *driver*, é necessário criar um objeto chamado *SparkContext*, no qual são definidos os recursos computacionais desejados.

> **Nó de trabalho**: O nó de trabalho (*worker node*) é o componente responsável por realizar as tarefas. Um programa Spark possui um ou mais nós de trabalho. Nesses nós, roda-se um ou mais *executors*. Um *executor* é um *software* que recebe as tarefas (*tasks*) do *software* *driver*, armazena os resultados intermediários e retorna os dados gerados quando requisitado.

> **Gerenciador do cluster**: O gerenciador do *cluster* (*cluster manager*) é o componente responsável por disponibilizar os recursos computacionais (por exemplo: memória e núcleos de processamento) por meio dos *executors* para o *driver*. O Spark é agnóstico quanto ao gerenciador e oferece suporte para os seguintes gerenciadores em sua versão atual (versão 3.0.1): Spark Standalone (gerenciador próprio do Spark), Apache YARN (mais utilizado), Apache Mesos e Kubernetes (muito utilizado em ambientes de computação em nuvem). 

Detalhes adicionais sobre o Apache Spark Cluster podem ser obtidos na documentação oficial do Spark neste [link](https://spark.apache.org/docs/latest/cluster-overview.html#cluster-manager-types).



### 2.2 Criação de uma Aplicação Spark

A criação de uma aplicação Spark é feita da forma descrita a seguir, utilizando como base a Figura 1. 

1. Deve ser criado um objeto *SparkContext* definindo os recursos computacionais desejados. A criação desse objeto instancia o *driver* no **nó mestre**. 

2. O *driver* solicita ao **gerenciador do cluster** um conjunto de *executors* de acordo com os recursos computacionais selecionados.

3.  O **gerenciador do cluster** cria os _executors_ nos **nós de trabalho** e retorna a informação relacionada a essa criação ao *driver*;

4.  O *driver* submete as tarefas (*tasks*) para os *executors* e obtém os resultados retornados por esses *executors*.

<br>

<p align="center"><img src="https://raw.githubusercontent.com/cdaciferri/Figures/main/CreateSparkApp.png" width="600" height="300"></p>
<p align="center">Figura 1. Exemplo ilustrativo da criação de uma aplicação Spark.</p>

<br>

**Exemplo:** O exemplo a seguir cria uma aplicação Spark utilizando as seguintes configurações: (i) *cluster* gerenciado pelo Apache YARN; (ii) 1 *driver* de 1 núcleo de computação e 4 gigabytes de memória; e (iii) 4 *executors* de 2 núcleos de computação e 16 gigabytes de memória cada. O objeto criado é um objeto *SparkContext*. 

```
from pyspark import SparkConf, SparkContext

conf = SparkConf(). \
       setMaster("yarn"). \
       set("spark.driver.cores", "1"). \
       set("spark.driver.memory", "4g"). \
       set("spark.executor.cores", "2"). \
       set("spark.executor.memory", "16g"). \
       set("spark.executor.instances", "4"). \

spark = SparkContext(conf=conf)
```
Uma lista completa de configurações pode ser obtida na documentação oficial do Spark neste [link](https://spark.apache.org/docs/latest/configuration.html).



### 2.3 Instalação

Neste *notebook* é criado um *cluster* Spark composto apenas por um **nó mestre**. Ou seja, o *cluster* não possui um ou mais **nós de trabalho** e o **gerenciador de cluster**. Nessa configuração, as tarefas (*tasks*) são realizadas no próprio *driver* localizado no **nó mestre**.

Para que o cluster possa ser criado, primeiramente é instalado o Java Runtime Environment (JRE) versão 8. 

In [1]:
#instalando Java Runtime Environment (JRE) versão 8
%%capture
!apt-get remove openjdk*
!apt-get update --fix-missing
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Na sequência, é feito o *download* do Apache Spark versão 3.0.0.

In [2]:
#baixando Apache Spark versão 3.0.0
%%capture
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz && rm spark-3.0.0-bin-hadoop2.7.tgz

Na sequência, são configuradas as variáveis de ambiente JAVA_HOME e SPARK_HOME. Isto permite que tanto o Java quanto o Spark possam ser encontrados.

In [3]:
import os
#configurando a variável de ambiente JAVA_HOME
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
#configurando a variável de ambiente SPARK_HOME
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"

Por fim, são instalados dois pacotes da linguagem de programação Python, cujas funcionalidades são descritas a seguir.

> **Pacote findspark:** Usado para ler a variável de ambiente SPARK_HOME e armazenar seu valor na variável dinâmica de ambiente PYTHONPATH. Como resultado, Python pode encontrar a instalação do Spark. 

> **Pacote pyspark:** PySpark é a API do Python para Spark. Ela possibilita o uso de Python, considerando que o *framework* Apache Spark encontra-se desenvolvido na linguagem de programação Scala. 

In [4]:
%%capture
#instalando o pacote findspark
!pip install -q findspark==1.4.2
#instalando o pacote pyspark
!pip install -q pyspark==3.0.0

### 2.4 Conexão

PySpark não é adicionado ao *sys.path* por padrão. Isso significa que não é possível importá-lo, pois o interpretador da linguagem Python não sabe onde encontrá-lo. 

Para resolver esse aspecto, é necessário instalar o módulo `findspark`. Esse módulo mostra onde PySpark está localizado. Os comandos a seguir têm essa finalidade.


In [5]:
#importando o módulo findspark
import findspark
#carregando a variávels SPARK_HOME na variável dinâmica PYTHONPATH
findspark.init()

Depois de configurados os pacotes e módulos e inicializadas as variáveis de ambiente, é possível criar o objeto *SparkContext*. No comando de criação a seguir, é definido que é utilizado o próprio sistema operacional deste *notebook* como **nó mestre** por meio do parâmetro **local** do método **setMaster**. O complemento do parametro **[*]** indica que são alocados todos os núcleos de processamento disponíveis para o objeto *driver* criado.

In [6]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]")
spark = SparkContext(conf=conf)

## **3 API Apache Spark RDD**

Nesta seção, é detalhada a classe `pyspark.RDD`, que está relacionada ao uso de RDDs. Primeiramente, são descritos dois conjuntos de dados usados como base para os exemplos. Depois, os principais conceitos envolvidos na classe pyspark.RDD são introduzidos. 

O detalhamento da classe `pyspark.RDD` pode ser encontrada na documentação oficial do Spark neste [link](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD). Neste notebook são explicados os métodos mais utilizados e também os métodos que são necessários para o desenvolvimento da lista de exercícios.

### 3.1 Conjuntos de Dados

Para descrever a API Apache Spark RDD, são utilizadas dois conjuntos de dados, conforme descrito a seguir:

- `fib`: Uma lista Python contendo os 30 primeiros números da Sequência de Fibonacci. Na matemática, a Sucessão de Fibonacci (ou Sequência de Fibonacci), é uma sequência de números inteiros, começando normalmente por 0 e 1, na qual cada termo subsequente corresponde à soma dos dois anteriores.

- `logs.txt`: Um arquivo texto com 56.481 linhas de *log* de um servidor web. O arquivo fonte pode ser obtido neste ([link](https://github.com/logpai/loghub/tree/master/Apache)).



O comando a seguir cria a lista `fib`.

In [7]:
#criando a lista fib com os 30 primeiros números da Sequência de Fibonacci
def fibonacci(i: int) -> int:
  if i <= 0: raise Exception()
  elif i == 1: return 0
  elif i == 2: return 1
  else: return fibonacci(i-1) + fibonacci(i-2)

fib = [fibonacci(i) for i in range(1, 31)]

In [8]:
#exibindo a lista criada
fib

[0,
 1,
 1,
 2,
 3,
 5,
 8,
 13,
 21,
 34,
 55,
 89,
 144,
 233,
 377,
 610,
 987,
 1597,
 2584,
 4181,
 6765,
 10946,
 17711,
 28657,
 46368,
 75025,
 121393,
 196418,
 317811,
 514229]

O comando a seguir obtém os dados do arquivo texto `logs.txt`.

In [9]:
#obtendo os dados do arquivo texto logs.txt
%%capture
!wget -q https://zenodo.org/record/3227177/files/Apache.tar.gz?download=1 -O logs.tar.gz
!tar xf logs.tar.gz && rm -rf logs.tar.gz
!mv Apache.log logs.txt

In [10]:
#abrindo o arquivo logs.txt no modo leitura (mode="r") 
#exibindo os primeiros 10 registros do arquivo
with open(file="logs.txt", mode="r") as fp: 
  for _ in range(0, 10): print(fp.readline())

[Thu Jun 09 06:07:04 2005] [notice] LDAP: Built with OpenLDAP LDAP SDK

[Thu Jun 09 06:07:04 2005] [notice] LDAP: SSL support unavailable

[Thu Jun 09 06:07:04 2005] [notice] suEXEC mechanism enabled (wrapper: /usr/sbin/suexec)

[Thu Jun 09 06:07:05 2005] [notice] Digest: generating secret for digest authentication ...

[Thu Jun 09 06:07:05 2005] [notice] Digest: done

[Thu Jun 09 06:07:05 2005] [notice] LDAP: Built with OpenLDAP LDAP SDK

[Thu Jun 09 06:07:05 2005] [notice] LDAP: SSL support unavailable

[Thu Jun 09 06:07:05 2005] [error] env.createBean2(): Factory error creating channel.jni:jni ( channel.jni, jni)

[Thu Jun 09 06:07:05 2005] [error] config.update(): Can't create channel.jni:jni

[Thu Jun 09 06:07:05 2005] [error] env.createBean2(): Factory error creating vm: ( vm, )



### 3.2 Criação de RDDs

Existem duas formas de se criar RDDs, conforme descrito a seguir:

- Paralelizando uma coleção de dados já existente no *driver*.

- Referenciando um conjunto de dados armazenado em um sistema de armazenamento externo, como um sistema de arquivo compartilhado, HDFS, HBase, Cassandra, ou qualquer outra fonte de dados que ofereça suporte para o formato de entrada do Hadoop. 

#### Método parallelize()

``parallelize(nameCollection, nPartitions=None)``

Distribui uma coleção de dados em Python para formar um RDD. O parâmetro `nameCollection` indica o nome da coleção, enquanto que o parâmetro `nPartitions` indica o número de partições nas quais os dados da coleção são particionados. Usualmente, Spark seta o número de partições automaticamente, com base no *cluster* sendo utilizado. 


O comando a seguir utiliza o método `parellelize()` para armazenar no RDD chamado fib_rdd o conjunto de dados referente à Sequência de Fibonacci. Uma vez criado, fib_rdd pode ser manipulado em paralelo utilizando comandos que são apresentados ao longo do *notebook*.

In [11]:
fib_rdd = spark.parallelize(fib)

#### Método textFile()

``textFile(nameFile, nPartitions=None, use_unicode=True)``

Lê um arquivo do tipo texto chamado `nameFile`, o qual encontra-se armazenado em um sistema de armazenamento externo, e o retorna como um RDD de *strings*. O segundo parâmetro, `nPartitions`, indica o número de partições nas quais os registros do arquivo são particionados. O último parâmetro se refere ao formato do arquivo texto, cujo padrão é o formato UTF-8. 


O comando a seguir utiliza o método `textFile()` para armazenar no RDD chamado lines_rdd os registros do arquivo de texto `"logs.txt"`. Uma vez criado, lines_rdd pode ser manipulado em paralelo utilizando comandos que são apresentados ao longo do notebook.

In [12]:
lines_rdd = spark.textFile("logs.txt")

### 3.2 Transformações

Transformações são operações que transformam um RDD em outro RDD.  

No Spark, as **transformações** não são executadas imediatamente sobre os dados do RDD. Elas são anexadas ao grafo acíclico direcionado de transformações, o qual é executado apenas quando uma **ação** sobre o RDD em questão for solicitada. Essa característica é conhecida como *lazy-evaluation*. Mais informações sobre essa característica podem ser encontradas neste [link](https://databricks.com/blog/2015/06/22/understanding-your-spark-application-through-visualization.html).

#### Método map()

``map(func)``

Aplica a função `func` sobre todos os elementos do RDD.

Os comandos a seguir definem uma função que multiplica todos os elementos da Sequência de Fibonacci contidos em `fib_rdd` por 10.

In [13]:
#definindo a função separadamente e chamando a função
def multiply_by_ten(element): return element * 10 

fib_rdd. \
  map(multiply_by_ten). \
  collect()

[0,
 10,
 10,
 20,
 30,
 50,
 80,
 130,
 210,
 340,
 550,
 890,
 1440,
 2330,
 3770,
 6100,
 9870,
 15970,
 25840,
 41810,
 67650,
 109460,
 177110,
 286570,
 463680,
 750250,
 1213930,
 1964180,
 3178110,
 5142290]

In [14]:
#definindo a função lambda
fib_rdd. \
  map(lambda element: element * 10). \
  take(5)

[0, 10, 10, 20, 30]

#### Método flatMap()

`flatMap(func, preservesPartitioning=False)`

Aplica a função `func` sobre todos os elementos do RDD e nivela os resultados gerados.

No comando a seguir, aplica-se o método nativo do Python `split`, por meio da transformação do método `map`, sobre os elementos em `line_rdd` para separar as linhas do arquivo `logs.txt` em palavras. Contudo, o método `split` retorna uma lista Python. Portanto, os elementos do novo RDD não são palavras individuais, mas sim listas de palavras.

In [15]:
# aplicando a transformaçao map para transformar linhas em palavras
# o resultado não são palavras individuais, mas sim listas Python de palavras
lines_rdd. \
  map(lambda line: line.split(" ")). \
  take(2)

[['[Thu',
  'Jun',
  '09',
  '06:07:04',
  '2005]',
  '[notice]',
  'LDAP:',
  'Built',
  'with',
  'OpenLDAP',
  'LDAP',
  'SDK'],
 ['[Thu',
  'Jun',
  '09',
  '06:07:04',
  '2005]',
  '[notice]',
  'LDAP:',
  'SSL',
  'support',
  'unavailable']]

Para unir as coleções Python resultantes da transformação provida pelo método `map`, ou seja, para nivelar os elementos do novo RDD, utiliza-se a transformação `flatMap`.

In [16]:
# aplicando a transformaçao flatMap para transformar linhas em palavras
# as listas Python de palavras resultantes são niveladas, gerando um RDD em que cada elemente é uma palavra individual
lines_rdd. \
  flatMap(lambda line: line.split(" ")). \
  take(12)

['[Thu',
 'Jun',
 '09',
 '06:07:04',
 '2005]',
 '[notice]',
 'LDAP:',
 'Built',
 'with',
 'OpenLDAP',
 'LDAP',
 'SDK']

#### Método mapValues()

``mapValues(func)``

Considerando um RDD composto por pares chave-valor (C,V), aplica a função `func` apenas sobre os valores V do RDD.

Nos comandos a seguir, primeiro aplica-se a transformação `map` para criar um RDD composto por pares chave-valor, de forma que chave = valor para cada elemento do RDD. Na sequência, aplica-se a transformação `mapValues` usando uma função `lambda` que mapeia os valores da sequência de Fibonacci em par (*even*) ou ímpar (*odd*).

In [17]:
#produzindo pares na forma chave-valor 
fib_rdd. \
  map(lambda element: (element, element)). \
  mapValues(lambda element: "even" if element % 2 == 0 else "odd"). \
  collect()

[(0, 'even'),
 (1, 'odd'),
 (1, 'odd'),
 (2, 'even'),
 (3, 'odd'),
 (5, 'odd'),
 (8, 'even'),
 (13, 'odd'),
 (21, 'odd'),
 (34, 'even'),
 (55, 'odd'),
 (89, 'odd'),
 (144, 'even'),
 (233, 'odd'),
 (377, 'odd'),
 (610, 'even'),
 (987, 'odd'),
 (1597, 'odd'),
 (2584, 'even'),
 (4181, 'odd'),
 (6765, 'odd'),
 (10946, 'even'),
 (17711, 'odd'),
 (28657, 'odd'),
 (46368, 'even'),
 (75025, 'odd'),
 (121393, 'odd'),
 (196418, 'even'),
 (317811, 'odd'),
 (514229, 'odd')]

#### Método filter()

``filter(func)``

Aplica a função booleana `func` sobre todos os elementos do RDD e retorna apenas os elementos que são verdadeiros.

Os comandos a seguir definem uma função booleana que retorna verdadeiro caso a string `[error]` esteja presente na linha de `lines_rdd`.  Essa função é utilizada pela transformação `filter` para exibir as linhas de `lines_rdd`.

In [18]:
#aplicando o método filter para recuperar as linhas de interesse
def is_error_line(line) -> bool: return True if "[error]" in line else False

lines_rdd. \
  filter(is_error_line). \
  take(15)

['[Thu Jun 09 06:07:05 2005] [error] env.createBean2(): Factory error creating channel.jni:jni ( channel.jni, jni)',
 "[Thu Jun 09 06:07:05 2005] [error] config.update(): Can't create channel.jni:jni",
 '[Thu Jun 09 06:07:05 2005] [error] env.createBean2(): Factory error creating vm: ( vm, )',
 "[Thu Jun 09 06:07:05 2005] [error] config.update(): Can't create vm:",
 '[Thu Jun 09 06:07:05 2005] [error] env.createBean2(): Factory error creating worker.jni:onStartup ( worker.jni, onStartup)',
 "[Thu Jun 09 06:07:05 2005] [error] config.update(): Can't create worker.jni:onStartup",
 '[Thu Jun 09 06:07:05 2005] [error] env.createBean2(): Factory error creating worker.jni:onShutdown ( worker.jni, onShutdown)',
 "[Thu Jun 09 06:07:05 2005] [error] config.update(): Can't create worker.jni:onShutdown",
 '[Thu Jun 09 06:07:20 2005] [error] mod_jk child init 1 0',
 '[Thu Jun 09 07:11:21 2005] [error] [client 204.100.200.22] Directory index forbidden by rule: /var/www/html/',
 '[Thu Jun 09 12:08:5

#### Método join()

``join(RDD²)``

Quando aplicado a um RDD¹ composto por elementos na forma pares chave-valor (C¹,V¹) e, considerando o parâmetro RDD² na forma de pares chave-valor (C²,V²), junta os valores V¹,V² que possuem o mesmo valor de chave (C¹ = C²), gerando  (C¹, (V¹,V²)). O método `join()` combina os valores dos elementos dos RDDs que compartilham a mesma chave dois a dois. Esse método requer uma operação de `suffle`. 

Os comandos a seguir dividem o RDD `fib_rdd` em dois novos RDDs usando o método `filter` e o método `map`. O método `filter` é usado para filtrar os dados do RDD de acordo com os seus valores. No primeiro RDD são considerados valores menores ou iguais a 100, enquanto que no segundo RDD são considerados valores maiores do que 100. O método `map` é usado para mapear o RDD em um RDD do tipo chave-valor, com chave igual a par (even) ou ímpar (odd) de acordo com o valor. 

In [19]:
up_to_100_rdd = fib_rdd. \
                filter(lambda element: element <= 100). \
                map(lambda element: ("even", element) if element % 2 == 0 else ("odd", element))
up_to_100_rdd.take(3)

[('even', 0), ('odd', 1), ('odd', 1)]

In [20]:
greater_than_100_rdd = fib_rdd. \
                       filter(lambda element: element > 100). \
                       map(lambda element: ("even", element) if element % 2 == 0 else ("odd", element))
greater_than_100_rdd.take(4)

[('even', 144), ('odd', 233), ('odd', 377), ('even', 610)]

O comando a seguir aplica o método `join` para juntar os valores que possuem a mesma chave. O resultado produzido é referente a um `inner join`.

In [21]:
up_to_100_rdd. \
  join(greater_than_100_rdd). \
  take(3)

[('even', (0, 144)), ('even', (0, 610)), ('even', (0, 2584))]

#### Método reduceByKey()

``reduceByKey(func,nPartitions=None)``

Quando aplicado a um RDD composto por pares chave-valor (C,V), agrega os valores V, de forma que esses valores são computados dois a dois usando a função `func` e agrupados de acordo com a chave C. O segundo parâmetro, `nPartitions`, possibilita que o número de tarefas *reduce* seja configurado. Esse método requer uma operação de `suffle`. 

Os comandos a seguir analisam os elementos de `fib_rdd`, retornando a quantidade de elementos que possuem valores menores do que 10 e a quantidade de elementos que possuem valores maiores ou iguais a 10. 

Note que a entrada para o método `reduceByKey()` é um RDD composto por pares chave-valor. Portanto, primeiramente é aplicado o método `map()`, para depois ser aplicado o método `reduceByKey()`. A saída é formada por pares chave-valor.

In [22]:
fib_rdd. \
  map(lambda element: (True if element > 10 else False, 1)). \
  reduceByKey(lambda x, y: x + y). \
  collect()

[(False, 7), (True, 23)]

#### Método sortByKey()

``sortByKey(order=ascending, nPartitions=None)``

Quando aplicado a um RDD composto por pares chave-valor (C,V), ordena os elementos de acordo com os valores de C em ordem ascendente ou descentende. O primeiro parâmetro, `order`, indica ascendente ou descendente. O segundo parâmetro, `nPartitions`, permite configurar o número de partições. Este método requer uma operação de `shuffle`. 

Os comandos a seguir verificam se cada elemento de `fib_rdd` é par ou ímpar, retornando True para elementos pares e False para elementos ímpares. O resultado é ordenado em ordem ascentende, exibindo primeiro os elementos referentes às chaves False e depois os elementos referentes às chaves True.

Note que a entrada para o método `sortByKey()` é um RDD composto por pares chave-valor. Portanto, primeiramente é aplicado o método `map()`, para depois ser aplicado o método `sortByKey()`. A saída é formada por pares chave-valor.

In [23]:
fib_rdd. \
  map(lambda element: (True if element % 2 == 0 else False, element)). \
  sortByKey(). \
  collect()

[(False, 1),
 (False, 1),
 (False, 3),
 (False, 5),
 (False, 13),
 (False, 21),
 (False, 55),
 (False, 89),
 (False, 233),
 (False, 377),
 (False, 987),
 (False, 1597),
 (False, 4181),
 (False, 6765),
 (False, 17711),
 (False, 28657),
 (False, 75025),
 (False, 121393),
 (False, 317811),
 (False, 514229),
 (True, 0),
 (True, 2),
 (True, 8),
 (True, 34),
 (True, 144),
 (True, 610),
 (True, 2584),
 (True, 10946),
 (True, 46368),
 (True, 196418)]

#### Método sortBy()

sortBy(keyfunc, ascending=True, nPartitions=None)

Ordena os elementos do RDD usando como base o parâmetro `keyfunc`. O segundo parâmetro, `ascending=True`, indica a ordem ascendente é a ordem padrão. O terceiro parâmetro, `nPartitions`, permite configurar o número de partições.  

Os comandos a seguir são análogos aos comandos da transformação do método `sortByKey`. No método `sortBy`, a ordenação dos elementos do RDD é definida pela função `keyfunc` que aponta para o primeiro item do elemento, ou seja, a chave.

In [24]:
fib_rdd. \
  map(lambda element: (True if element % 2 == 0 else False, element)). \
  sortBy(lambda element: element[0]). \
  collect()

[(False, 1),
 (False, 1),
 (False, 3),
 (False, 5),
 (False, 13),
 (False, 21),
 (False, 55),
 (False, 89),
 (False, 233),
 (False, 377),
 (False, 987),
 (False, 1597),
 (False, 4181),
 (False, 6765),
 (False, 17711),
 (False, 28657),
 (False, 75025),
 (False, 121393),
 (False, 317811),
 (False, 514229),
 (True, 0),
 (True, 2),
 (True, 8),
 (True, 34),
 (True, 144),
 (True, 610),
 (True, 2584),
 (True, 10946),
 (True, 46368),
 (True, 196418)]

### 3.3 Ações

Ações são operações que retornam valores calculados sobre um RDD.  

No Spark, as ações disparam a execução das **transformações** que foram anexadas ao grafo acíclico direcionado dessas transformações.

#### Métodos collect(), take() e first()

Os métodos collect(), take() e first() têm como objetivo retornar os elementos de um RDD, conforme descrito a seguir.

- `collect()`: Retorna uma lista com todos os elementos do RDD.

- `take(num)`: Retorna uma lista com os primeiros `num` elementos do RDD.

- `first()`:  Retorna o primeiro elemento do RDD.

In [25]:
#listando todos os elementos da Sequência de Fibonacci armazenados em fib_rdd
fib_rdd.collect()

[0,
 1,
 1,
 2,
 3,
 5,
 8,
 13,
 21,
 34,
 55,
 89,
 144,
 233,
 377,
 610,
 987,
 1597,
 2584,
 4181,
 6765,
 10946,
 17711,
 28657,
 46368,
 75025,
 121393,
 196418,
 317811,
 514229]

In [26]:
#listando os 10 primeiros elementos da Sequência de Fibonacci armazenados em fib_rdd
fib_rdd.take(10)

[0, 1, 1, 2, 3, 5, 8, 13, 21, 34]

In [27]:
#listando o primeiro elemento da Sequência de Fibonacci armazenados em fib_rdd
fib_rdd.first()

0

#### Métodos  max(), min(), mean(), count()

Os métodos max(), min(), mean() e count() desempenham funcionalidades semelhantes às funções de agregação, conforme descrito a seguir.

- `max(key=None)`: Retorna o maior elemento do RDD. O parâmetro opcional `key` pode ser usado para gerar uma `key` para comparação.
-  `min(key=None)`: Retorna o menor elemento do RDD. O parâmetro opcional `key` pode ser usado para gerar uma `key` para comparação.
-  `mean()`: Calcula a média dos valores dos elementos do RDD. 
- `count()`: Retorna o número de elementos do RDD.

In [28]:
#listando o maior elemento armazenado em fib_rdd
fib_rdd.max()

514229

In [29]:
#listando o menor elemento armazenado em fib_rdd
fib_rdd.min()

0

In [30]:
#calculando a média dos valores dos elementos armazeandos em fib_rdd
fib_rdd.mean()

44875.6

In [31]:
#listando a quantidade de elementos armazenados em fib_rdd
fib_rdd.count()

30

#### Método reduce()

`reduce(func)`

Aplica a função `func` a todos os elementos do RDD, processando os elementos dois a dois até a geração de um resultado final agregado.

Os comandos a seguir calculam a soma dos elementos armazenados em fib_rdd.

In [32]:
#calculando a soma dos elementos armazenados em fib_rdd
#definindo a função separadamente e chamando a função
def sum(x: int, y: int) -> int: return x + y

fib_rdd.reduce(sum)

1346268

In [33]:
#calculando a soma dos elementos armazenados em fib_rdd
#definindo a função lambda
fib_rdd.reduce(lambda x, y: x + y)

1346268

## **4 Contador de Palavras**

O contador de palavras é um exemplo clássico de explicação da funcionalidade do modelo de programação funcional MapReduce. 

Neste notebook é apresentado um exemplo que tem como objetivo contar as palavras presentes no texto do arquivo `logs.txt`. Primeiramente, o exemplo é implementado passo a passo, visando fins didáticos. Depois, são mostrados todos os passos realizados de uma única vez. 

(1) Criação do RDD `lines_rdd` com as linhas do texto do arquivo. A criação deste RDD pode ser encontrada junto à descrição do método `textFile()`.

In [34]:
lines_rdd.take(15)

['[Thu Jun 09 06:07:04 2005] [notice] LDAP: Built with OpenLDAP LDAP SDK',
 '[Thu Jun 09 06:07:04 2005] [notice] LDAP: SSL support unavailable',
 '[Thu Jun 09 06:07:04 2005] [notice] suEXEC mechanism enabled (wrapper: /usr/sbin/suexec)',
 '[Thu Jun 09 06:07:05 2005] [notice] Digest: generating secret for digest authentication ...',
 '[Thu Jun 09 06:07:05 2005] [notice] Digest: done',
 '[Thu Jun 09 06:07:05 2005] [notice] LDAP: Built with OpenLDAP LDAP SDK',
 '[Thu Jun 09 06:07:05 2005] [notice] LDAP: SSL support unavailable',
 '[Thu Jun 09 06:07:05 2005] [error] env.createBean2(): Factory error creating channel.jni:jni ( channel.jni, jni)',
 "[Thu Jun 09 06:07:05 2005] [error] config.update(): Can't create channel.jni:jni",
 '[Thu Jun 09 06:07:05 2005] [error] env.createBean2(): Factory error creating vm: ( vm, )',
 "[Thu Jun 09 06:07:05 2005] [error] config.update(): Can't create vm:",
 '[Thu Jun 09 06:07:05 2005] [error] env.createBean2(): Factory error creating worker.jni:onStartup 

(2) Separação das linhas do arquivo de dados em palavras usando o método `flatMap`.

In [35]:
words_rdd = lines_rdd.flatMap(lambda line: line.split(" "))
words_rdd.take(7)

['[Thu', 'Jun', '09', '06:07:04', '2005]', '[notice]', 'LDAP:']

(3) Mapeamento de cada palavra presente no RDD words_rdd em um par chave-valor usando o método `map`. Cada chave corresponde a uma palavra e cada valor corresponde ao valor 1. 

In [36]:
words_tuple_rdd = words_rdd.map(lambda word: (word, 1))
words_tuple_rdd.take(7)

[('[Thu', 1),
 ('Jun', 1),
 ('09', 1),
 ('06:07:04', 1),
 ('2005]', 1),
 ('[notice]', 1),
 ('LDAP:', 1)]

(4) Agrupamento das palavras iguais usando o método `reduceByKey`. A função usada como parâmetro é soma. Ou seja, soma-se o número de vezes que cada palavra aparece.


In [37]:
words_counts_rdd = words_tuple_rdd.reduceByKey(lambda x, y: x + y)
words_counts_rdd.take(7)

[('09', 1793),
 ('06:07:04', 7),
 ('LDAP:', 106),
 ('SDK', 53),
 ('SSL', 53),
 ('support', 53),
 ('unavailable', 53)]

Pode ser interessante ordenar o resultado final usando o método `sortBy`, de forma que as palavras com maior ocorrência apareçam primeiro. 

In [38]:
words_counts_rdd_sorted = words_counts_rdd.sortBy(lambda word_count: word_count[1], ascending=False)
words_counts_rdd_sorted.take(7)

[('[error]', 38081),
 ('2005]', 32309),
 ('[client', 31115),
 ('not', 28808),
 ('File', 20861),
 ('does', 20861),
 ('exist:', 20861)]

Todos os passos anteriores podem ser agrupados, conforme descrito a seguir.



In [39]:
words_counts = spark.textFile("logs.txt"). \
               flatMap(lambda line: line.split(" ")). \
               map(lambda word: (word, 1)). \
               reduceByKey(lambda x, y: x + y). \
               sortBy(lambda word_count: word_count[1], ascending=False). \
               collect()

In [40]:
words_counts[0:7]

[('[error]', 38081),
 ('2005]', 32309),
 ('[client', 31115),
 ('not', 28808),
 ('File', 20861),
 ('does', 20861),
 ('exist:', 20861)]

# 5 Persistência dos RDDs

RDDs são de primordial importância no Spark, desde que possibilitam o armazenamento dos resultados intermediários em memória primária sempre que possível. Conforme discutido anteriormente, o uso de RDDs diminui o número de acessos a disco e, consequentemente, melhora o desempenho da aplicação.



Existem dois métodos para a persistência dos RDDs, conforme descrito a seguir.

- `cache()`: Persiste um RDD utilizando o nível de armazenamento padrão, que é MEMORY_ONLY. 

- `persist(storageLevel=StorageLevel(False, True, False, False, 1))`: Seta o nível de armazenamento que persiste um RDD na primeira vez que o RDD é calculado. Este comando somente pode ser utilizado para associar um novo nível de armazenamento caso o RDD não possua nenhum nível já associado. 

Os níveis de armazenamento são:

- MEMORY_ONLY (StorageLevel(False, True, False, False, 1)): Armazena o RDD em memória primária, ou seja, todas as partições do RDD são armazenadas em memória primária quando possível. Se o RDD não couber totalmente em memória primária, as partições que não couberem são calculadas em tempo de execução todas as vezes que for necessário usá-las. Este é o nível de armazenamento default.   

- MEMORY_AND_DISK (StorageLevel(True, True, False, False, 1)): Armazena o RDD em memória primária e em disco. As partições do RDD não que couberem em memória primária são armazenadas nessa memória, e as partições que não couberem são armazenadas em disco. As partições armazenadas em disco são lidas do disco todas as vezes que for necessário usá-las.

- DISK_ONLY (StorageLevel(True, False, False, False, 1)): Armazena o RDD em disco, ou seja, todas as partições do RDD são armazenadas em disco. Essas partições devem ser lidas do disco todas as vezes que for necessário usá-las.

- MEMORY_ONLY_2 (StorageLevel(False, True, False, False, 2)): Possui funcionalidade similar ao nível de armazenamento MEMORY_ONLY, porém replica cada partição em dois nós do *cluster*.

- MEMORY_AND_DISK_2 (StorageLevel(True, True, False, False, 2)): Possui funcionalidade similar ao nível de armazenamento MEMORY_AND_DISK, porém replica cada partição em dois nós do *cluster*.

- DISK_ONLY_2 (StorageLevel(True, False, False, False, 2)): Possui funcionalidade similar ao nível de armazenamento DISK_ONLY, porém replica cada partição em dois nós do *cluster*.  

- OFF_HEAP (StorageLevel(True, True, True, False, 1)): Armazena o RDD em memória off-heap.


Em Python, os objetos armazenados são serializáveis por meio da biblioteca Pickle, ou seja, não existe necessidade de se escolher o nível serializável. Em linguagens de programação com suporte diferente, pode-se usar os níveis de armazenamento MEMORY_ONLY_SER e MEMORY_AND_DISK_SER.

Spark automaticamente persiste alguns dados intermediários em operações *suffle* (por exemplo, `reduceByKey()`), mesmo quando isso não é definido de forma explícita. Isto é feito para se evitar a necessidade de recomputação de entrada inteira caso ocorra uma falha durante o *suffle*. 