### Instalação das bibliotecas

In [1]:
%%sh
pip install spark
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting spark
  Downloading spark-0.2.1.tar.gz (41 kB)
Building wheels for collected packages: spark
  Building wheel for spark (setup.py): started
  Building wheel for spark (setup.py): finished with status 'done'
  Created wheel for spark: filename=spark-0.2.1-py3-none-any.whl size=58762 sha256=3da78601cb71fd675853194bd343ed8c53025ca0d36024fec8dca4a883407d26
  Stored in directory: /root/.cache/pip/wheels/4e/0e/f1/164619f9920fb447d294afaae11a7715bd442ded7225953d72
Successfully built spark
Installing collected packages: spark
Successfully installed spark-0.2.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup

### Importação das bibliotecas

In [5]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext

### Criar / Iniciar Sessão PySpark

In [6]:
spark = (
    SparkSession\
    .builder\
    .master('local')\
    .appName('pyspark_rdd')\
    .getOrCreate()
)

# RDD - Resilient Distributed Datasets
- RDDs são a unidade fundamental de dados em Spark. São imutáveis.
- Resilient: se dados na memória são perdidos, podem ser recriados.
- Distributed: armazenados na memória por todo o cluster.
- Datasets: dados iniciais podem vir de um arquivo ou ser criado programaticamente.
- A maioria dos programas em Spark consistem em manipular RDDs.
- RDDs são criados por meio de arquivos, de dados na memória ou de outras RDDs
- 2 tipos de operações: 
    - Transformação
        - map(function) -> cria um novo RDD processando a função em cada registro do RD
        - filter(function) -> cria um novo RDD incluindo ou excluindo cada elemento de acordo com um função 
booleana.
        - outros: distinct, sample, union, intersection, subtract, cartesian, combineByKey, groupByKey, join, etc.
    - Ação
        - count() -> retorna o número de elementos.
        - take(n) -> retorna um array com os primeiros n elementos.
        - collect() -> retorna um arraycom todos os elementos.
        - saveAsTextFile(file) -> salva o RDD no arquivo.
- Lazy Evaluation: Nada é processado até uma operação de ação.

In [9]:
sc = SparkContext.getOrCreate()

### parallelize()
- Permite que o Spark distribua os dados por vários nós, em vez de depender de um único nó para processar os dados

In [10]:
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
numeros = sc.parallelize(data)

### take()
- Exibe x primeiros itens do RDD

In [11]:
numeros.take(5)

[1, 2, 3, 4, 5]

### top()
- Exibe os maiores itens do RDDlista

In [12]:
numeros.top(5)

[10, 9, 8, 7, 6]

### collect()
- Retorne uma lista que contenha todos os elementos deste RDD.
- **Obs**: Este método só deve ser usado se a matriz resultante for pequena


In [13]:
numeros.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

### count()

In [14]:
numeros.count()

10

### mean()

In [15]:
numeros.mean()

5.5

### sum()

In [16]:
numeros.sum()

55

### max()

In [19]:
numeros.max()

10

### min()

In [20]:
numeros.min()

1

### stdev()
- desvio padrão

In [21]:
numeros.stdev()

2.8722813232690143

### filter()
- Retorna um novo RDD contendo apenas os elementos que satisfazem um predicado.

In [22]:
# retorna os números maiores que 5
filtro = numeros.filter(lambda filtro: filtro > 5)
filtro.collect()

[6, 7, 8, 9, 10]

### sample()
- Retorne um subconjunto de amostra deste RDD.

In [33]:
amostra = numeros.sample(False, 0.5, 1)
amostra.collect()

[1, 3, 7, 10]

### map()
- aplica uma função lambda para todos os elementos do RDD

In [35]:
# multiplica os elementos do rdd por 2
mapp = numeros.map(lambda mapp: mapp * 2)
mapp.collect()

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

### Novo RDD

In [36]:
data2 = [6, 7, 8, 9, 10]
numeros2 = sc.parallelize(data2)
numeros2.collect()

[6, 7, 8, 9, 10]

### union()
- une os rdd

In [37]:
uniao = numeros.union(numeros2)
uniao.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 6, 7, 8, 9, 10]

### intersection
- A saída não conterá nenhum elemento duplicado, mesmo que os RDDs de entrada contenham.

In [38]:
# numeros [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# numeros2 [6, 7, 8, 9, 10]

intersecao = numeros.intersection(numeros2)
intersecao.collect()

[6, 8, 10, 7, 9]

### subtract()
- Retorna os elementos que tem em um RDD mas que não está em outro RDD

In [39]:
# numeros [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# numeros2 [6, 7, 8, 9, 10]
subtrai = numeros.subtract(numeros2)
subtrai.collect()

[2, 4, 1, 3, 5]

### cartesian()
- Devolva o produto cartesiano deste RDD e de outro, ou seja, o RDD de todos os pares de elementos (a, b)

In [40]:
cartesiano = numeros.cartesian(numeros2)
cartesiano.collect()

[(1, 6),
 (1, 7),
 (1, 8),
 (1, 9),
 (1, 10),
 (2, 6),
 (2, 7),
 (2, 8),
 (2, 9),
 (2, 10),
 (3, 6),
 (3, 7),
 (3, 8),
 (3, 9),
 (3, 10),
 (4, 6),
 (4, 7),
 (4, 8),
 (4, 9),
 (4, 10),
 (5, 6),
 (5, 7),
 (5, 8),
 (5, 9),
 (5, 10),
 (6, 6),
 (6, 7),
 (6, 8),
 (6, 9),
 (6, 10),
 (7, 6),
 (7, 7),
 (7, 8),
 (7, 9),
 (7, 10),
 (8, 6),
 (8, 7),
 (8, 8),
 (8, 9),
 (8, 10),
 (9, 6),
 (9, 7),
 (9, 8),
 (9, 9),
 (9, 10),
 (10, 6),
 (10, 7),
 (10, 8),
 (10, 9),
 (10, 10)]

### countByValue()
- Retorna a quantidade que cada valor aparece

In [41]:
numeros.countByValue()

defaultdict(int, {1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1, 10: 1})

### RDD - Compras

In [42]:
data3 = [(1, 200), (2, 300), (3, 120), (4, 250), (5, 78)]
compras = sc.parallelize(data3)
compras.collect()

[(1, 200), (2, 300), (3, 120), (4, 250), (5, 78)]

### Extraindo as keys

In [43]:
chaves = compras.keys()
chaves.collect()

[1, 2, 3, 4, 5]

### Extraindo os values

In [44]:
valores = compras.values()
valores.collect()

[200, 300, 120, 250, 78]

 ### countByKey()
 - Retorna a quantidade de vezes que a key aparece

In [47]:
compras.countByKey()

defaultdict(int, {1: 1, 2: 1, 3: 1, 4: 1, 5: 1})

 ### countByValue()
 - Retorna a quantidade de vezes que o value aparece

In [48]:
compras.countByValue()

defaultdict(int,
            {(1, 200): 1, (2, 300): 1, (3, 120): 1, (4, 250): 1, (5, 78): 1})

### mapValues()
- Para cadar valor do RDD faça alguma coisa

In [50]:
# para cada elemento do RDD, acrescente + 1
# [(1, 200), (2, 300), (3, 120), (4, 250), (5, 78)]
soma = compras.mapValues(lambda soma: soma + 1)
soma.collect()

[(1, 201), (2, 301), (3, 121), (4, 251), (5, 79)]

RDD - debitos

In [51]:
data4 = ([(1, 20), (2, 300)])
debitos = sc.parallelize(data4)

### inner join

In [53]:
# (compras, debitos)
inner_join = compras.join(debitos)
inner_join.collect()

[(2, (300, 300)), (1, (200, 20))]

### leftOuterJoin()
- Retorna o rdd da esquerda mesmo que não tenha nenhuma correspondencia com o rdd da direita

In [56]:
left_join = compras.leftOuterJoin(debitos)
left_join.collect()

[(2, (300, 300)),
 (4, (250, None)),
 (1, (200, 20)),
 (3, (120, None)),
 (5, (78, None))]

### rightOuterJoin()
- Retorna o rdd da direita mesmo que não tenha nenhuma correspondencia com o rdd da esquerda

In [57]:
rigth_join = compras.rightOuterJoin(debitos)
rigth_join.collect()

[(2, (300, 300)), (1, (200, 20))]

### subtractByKey()
- Retorna cada par (chave, valor) em self que não possui par com a chave correspondente em outro

In [54]:
semdebito = compras.subtractByKey(debitos)
semdebito.collect()

[(4, 250), (3, 120), (5, 78)]

### flatMap()
- Retorne um novo RDD aplicando primeiro uma função a todos os elementos desse RDD e, em seguida, nivelando os resultados.

In [71]:
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
flattMap = numeros.flatMap(lambda x: [(x, x * 10)])
flattMap.collect()

[(1, 10),
 (2, 20),
 (3, 30),
 (4, 40),
 (5, 50),
 (6, 60),
 (7, 70),
 (8, 80),
 (9, 90),
 (10, 100)]