# RDD: resilient distributed dataset

* Resilient: os dados perdidos na memória podem ser recriados
* Distributed: dados separados atráves do cluster
* Data Set: entrada pode ser qualquer tipo de origem

RDDs são a unidade fundamental do Spark, são imutáveis. Podem ser criados de três formas:

* De um arquivo ou conjunto de arquivos;
* De dados na memória;
* De outro RDD;

In [1]:
#import findspark
#findspark.init()

#Deixei comentado as 2 linhas acima porque na forma que eu instalei não precisa

import pyspark
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

Existem três formas de criar um RDD:

    1. De um arquivo ou conjunto de arquivos;
    2. De dados na memória
    3. De outro rdd
    
Utilizamos o método **.textFile()** para carregar arquivos em uma aplicação spark. Seu retorno é um RDD

In [None]:
rdd = sc.textFile('sobreRDD.txt')

O método **.collect()** é responsável por passar os dados do RDD ao drive e apresenta-los para o usuário, seu uso é *custoso*.

In [None]:
rdd.collect()

O método **.filter** que aprendemos em python também é utilizado em RDD's, para isso usamos a função **lambda**

In [None]:
# Utilizando funcao .filter(), mesmo conceito que usamos em python, aplicando a funcao lambda.
rddsparklinhas = rdd.filter(lambda linha: 'spark' in linha)

In [None]:
# .collect() para exibir o resultado
rddsparklinhas.collect()

O método **first()** pode nos auxiliar exibindo a primeira linha. Observação: não é possível selecionar quantas linhas apresentar.

In [None]:
rdd.first()

E para exibir a quantidade de linhas dentro do RDD, utilizamos o método **.count()**

In [None]:
rdd.count()

In [None]:
# RDD com 2 "colunas"
sc.parallelize([(1,2),(3,4)]).collect()

# Transformação e Ação

Transformações básicas em um RDD. Contendo: {1, 2, 3, 3}

Nome da função | Propósito | Exemplo | Resultado
---- | ----
```.map()``` | Aplica a função para cada elemento no RDD, seu retorno é outro RDD. | rdd.map(lambda x: x + 1) | {2, 3, 4, 4}
```.flatMap()``` | Aplica a função para cada elemento no RDD, seu retorno é outro RDD. | rdd.flatMap(lambda x: range(x, 4)) | {1, 2, 3, 2, 3, 3, 3}
```.filter()``` | Retorna um novo RDD somente com o resultado do filtro realizado. | rdd.filter(lambda x: x != 1) | {2, 3, 3}
```.distinct()``` | Remove duplicados. | rdd.distinct() | {1, 2, 3}

In [None]:
# map()
rdd = sc.parallelize([1,2,3,3])
rddResult = rdd.map(lambda x: x * x)
for valor in rddResult.collect():
    print(valor)

In [None]:
# flatMap()
rdd = sc.parallelize([1,2,3,3])
rdd.flatMap(lambda x: range(x,4)).collect()

In [None]:
# Contador de palavras
rddGrafen = sc.parallelize(['Eu estou aprendendo Spark com a Grafen', 
                            'eu estou gostando das aulas da Grafen', 
                            'Grafen e top'])

In [None]:
rddQuebraLinha = rddGrafen.flatMap(lambda line: line.split(" "))

In [None]:
rddQuebraLinha.collect()

In [None]:
rddPalavraEnumero = rddQuebraLinha.map(lambda palavra: (palavra, 1))

In [None]:
rddPalavraEnumero.collect() # poderia ter feito rddPalavraEnumero.take(4) para ver somente 4 palavras

In [None]:
contador = rddPalavraEnumero.reduceByKey(lambda a,b: a + b)

In [None]:
contador.collect()

Transformações básicas que utilizam dois RDD's. Contendo {1, 2, 3} e {3, 4, 5}

Nome da função | Propósito | Exemplo | Resultado
---- | ----
```.union()``` | Produzir um RDD contendo elementos de ambos os RDDs. | rdd.union(other) | {1, 2, 3, 3, 4, 5}
```.intersection()``` | RDD contendo apenas elementos encontrados em ambos os RDDs. | rdd.intersection(other) | {3}
```.subtract()``` | Remover o conteúdo de um RDD (por exemplo, remover dados de treinamento). | rdd.subtract(other) | {1, 2}

In [None]:
# union(), trazendo rdd somente com os dados entre os dois.
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize([3,4,5])
rdd1.union(rdd2).collect()

In [None]:
# intersection(), trazendo rdd somente com os dados entre os dois.
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize([3,4,5])
rdd1.intersection(rdd2).collect()

In [None]:
# subtract(), trazendo rdd somente com os dados entre os dois.
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize([3,4,5])
rdd1.subtract(rdd2).collect()

Nome da função | Propósito | Exemplo | Resultado
---- | ----
```.collect()``` | Retorna todos os elementos de um RDD. | rdd.collect() | {1, 2, 3, 3}
```.count()``` | Retorna o número de elementos em um RDD. | rdd.intersection(other) | 4
```.countByValue()``` | Retorna número de elementos pela chave. | rdd.countByValue() | {(1, 1), (2, 1), (3, 2)}
```.take()``` | Retorna os elementos(n) do RDD. | rdd.subtract(other) | {1, 2}
```.top()``` | Retorna os elementos top(n) do RDD | rdd.top(2) | {3, 3}
```.reduce()``` | Combina os elementos do RDD juntos em paralelo (por exemplo, soma). | rdd.reduce(lambda x, y: x + y) | 9
```.foreach()``` | Aplica a função fornecida a cada elemento do RDD | rdd.foreach(func) | Não se aplica

In [None]:
# Utilizando .reduce() para trazer a SOMA
rdd = sc.parallelize([2,5])
rdd.reduce(lambda x,y: x+y)

## Exercícios: 
#### *Praticar é a arte do aprender*

1. Faça um código que exista a entrada de dois números inteiros e exiba a multiplicação deles.

In [2]:
rdd = sc.parallelize([2,5])
rdd.reduce(lambda x,y: x*y)

10

2. Faça um código que exiba os valores de um RDD que são maiores que 10.

In [3]:
rdd = sc.parallelize([2,5,10,15,20,25,30,40])
rdd.filter(lambda x: x > 10).collect()

[15, 20, 25, 30, 40]

3. Faça um código que exiba os valores de um RDD que são somente impares

In [4]:
rdd = sc.parallelize([2,3,5,6,7,10,13,15,20,25,30,40])
rdd.filter(lambda x: x % 2 == 1).collect()

[3, 5, 7, 13, 15, 25]

4. Faça um código que exiba a soma dos valores dentro de um RDD

In [5]:
rdd = sc.parallelize([25,55,1,2,3,4,5,67,100])
rdd.reduce(lambda x,y: x+y)

262

5. Faça um código que exiba cada elemento do RDD multiplicado por 100.

In [6]:
rdd = sc.parallelize([1, 5, 7, 10, 13, 15, 20, 25, 30, 22, 11, 44, 55, 60])
rdd.map(lambda x: x*100).collect()

[100,
 500,
 700,
 1000,
 1300,
 1500,
 2000,
 2500,
 3000,
 2200,
 1100,
 4400,
 5500,
 6000]

6. Faça um código que leia o arquivo "sobreRDD.txt" que foi disponibilizado em aula e conte quantas vezes aparece a palavra "rdd"

In [7]:
rdd = sc.textFile('sobreRDD.txt')

In [8]:
rddsparklinhas = rdd.filter(lambda linha: 'rdd' in linha)

In [9]:
rddsparklinhas.collect()

['Um rdd no spark e simplesmente uma colecao distribuida imutavel de objetos. ',
 'Cada rdd e dividido em varias particoes, que podem ser calculadas em diferentes nos do cluster. ',
 'Os rdd podem conter qualquer tipo de objetos Python, Java ou Scala, incluindo classes definidas pelo usuario, o spark utiliza rdd para trabalhar com essas estruturas de dados']

In [10]:
rddQuebraLinha = rddsparklinhas.flatMap(lambda line: line.split(" "))

In [11]:
rddQuebraLinha\
 .filter(lambda x: x == "rdd")\
 .map(lambda x: (x, 1))\
 .reduceByKey(lambda a,b: a + b).collect()

[('rdd', 4)]

7. Faça um código que contenha um RDD com as palavras "rdd" e "spark". 

    Leia o arquivo "sobreRDD.txt" e identifique se o arquivo possui as duas palavras do rdd anterior
    
    OBS: Quebre cada frase em diversas linhas de palavras

In [14]:
rdd1 = sc.parallelize(['rdd','spark'])

In [15]:
rdd2 = sc.textFile('sobreRDD.txt')

In [16]:
rdd2 = rdd2.flatMap(lambda line: line.split(" "))

In [18]:
rdd.intersection(rdd2).collect()

['rdd', 'spark']