# Operações entre DataFrames e Armazenamento

## Instalação do PySpark e Instâncias

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession


In [2]:
sc = SparkContext.getOrCreate()


In [4]:
spark = SparkSession.builder.appName('PySpark Dataframe').getOrCreate()


# Transformation

Operações específicas de transformação de Dados

## map()

O que é um RDD?
Antes de começar, é importante saber o que é um RDD. RDD significa "Resilient Distributed Dataset". Em palavras mais simples, é como uma grande lista de coisas (dados) que está espalhada por vários computadores. O Spark usa RDDs para guardar e processar dados de forma rápida.

Vamos entender o código passo a passo:
Lista de números (data = [1, 2, 3, 4, 5]):

Imagine que você tem uma caixa com 5 bolinhas. Cada bolinha tem um número de 1 a 5. Esta é a sua lista de números chamada data.
Criando o RDD (myRDD = sc.parallelize(data)):

Agora, você vai espalhar essas bolinhas por várias caixas (que podem estar em diferentes lugares). Isso é o que o sc.parallelize(data) faz. Ele pega a sua lista e transforma em um RDD (uma grande lista que o Spark pode usar).
myRDD é o nome que você deu para essa lista espalhada.
Multiplicando cada número por 2 (newRDD = myRDD.map(lambda x: x * 2)):

Aqui, você vai multiplicar o número de cada bolinha por 2.
O map é como um mágico que faz alguma coisa com cada bolinha. Neste caso, o mágico está pegando cada bolinha, olhando o número que ela tem e criando uma nova bolinha com o número dobrado.
Então, se você tinha a bolinha 1, ela vira 2; a bolinha 2 vira 4; e assim por diante.
newRDD é o nome da nova lista de bolinhas que tem os números dobrados.
Reunindo todas as bolinhas (print(newRDD.collect())):

Finalmente, você quer ver todas as novas bolinhas juntas.
O collect() pega todas as bolinhas que foram espalhadas e junta em uma lista que você pode ver.
print(newRDD.collect()) mostra essa nova lista na tela.
O que acontece no final?
No final, o código pega a lista [1, 2, 3, 4, 5], dobra cada número, e te mostra a nova lista [2, 4, 6, 8, 10].



Pense que você tem uma caixa com 5 bolinhas numeradas de 1 a 5. Você espalha essas bolinhas em várias caixas e depois dá um "poder mágico" para cada uma, fazendo com que o número de cada bolinha seja dobrado. No final, você junta todas as bolinhas e olha para os novos números: 2, 4, 6, 8, e 10!

In [5]:
# Exemplo:

data = [1,2,3,4,5]
myRDD = sc.parallelize(data)
newRDD = myRDD.map(lambda x: x*2)
print(newRDD.collect())


[2, 4, 6, 8, 10]


## filter()

Se os números forem divisíveis por 2 e o resto for igual a zero.

Exemplo: 

Filtrando os números pares (newRDD = myRDD.filter(lambda x: x % 2 == 0)):

Aqui vem a parte legal! Você vai olhar para cada bolinha e ver se o número nela é par.
O filter é como um filtro mágico que só deixa passar as bolinhas com números pares.
O que o lambda x: x % 2 == 0 faz é simples: ele olha o número de cada bolinha e verifica se, quando você divide esse número por 2, o resto é zero. Isso significa que o número é par.
Se for par, a bolinha passa pelo filtro e vai para a nova lista (newRDD). Se não for, ela fica de fora.

In [7]:
data = [1,2,3,4,5,6,7,8,9,10]
myRDD = sc.parallelize(data)
newRDD = myRDD.filter(lambda x: x%2 == 0)
print(newRDD.collect())


[2, 4, 6, 8, 10]


## distinct()

In [8]:
data = [1,1,1,2,2,2,3,3,3,3]
myRDD = sc.parallelize(data)
newRDD = myRDD.distinct()
print(newRDD.collect())


[1, 2, 3]


In [10]:
#  Se eu mudar para 'count' no lugar de 'collect' trará o resultado da quantidade de itens distintos

data = [1,1,1,2,2,2,3,3,3,3]
myRDD = sc.parallelize(data)
newRDD = myRDD.distinct()
print(newRDD.count())


3


## groupByKey()

In [12]:
# Significa rodar o groupby baseado em uma chave

myRDD = sc.parallelize([('a',1),('a',2),('a',3),('b',1),])
# print result as list
resultList = myRDD.groupByKey().mapValues(list)
resultList.collect()


[('a', [1, 2, 3]), ('b', [1])]

## reduceByKey()

É um tipo de execução que utilizamos para reduzir os dados.  

Resumindo:
reduceByKey() é uma função poderosa para agregar valores em um RDD baseado em suas chaves. Ela faz isso de forma eficiente e distribuída, ideal para grandes conjuntos de dados onde você precisa agrupar e combinar informações associadas a cada chave.

In [13]:
from operator import add
myRDD = sc.parallelize([('a',1),('a',2),('a',3),('b',1),])
# add the values by keys
newRDD = myRDD.reduceByKey(add)
newRDD.collect()

#  Essa função agrupa os dados e já traz os valores somados / Adiciona 'add' os valores


[('a', 6), ('b', 1)]

## SortByKey()

Essa função no SQL é conhecida como o 'OrderBy'

In [14]:
myRDD = sc.parallelize([('c',1),('d',2),('a',3),('b',4),])
# sort by key
newRDD = myRDD.sortByKey()
newRDD.collect()


[('a', 3), ('b', 4), ('c', 1), ('d', 2)]

## UNION()

In [15]:
myRDD1 = sc.parallelize([1,2,3,4])
myRDD2 = sc.parallelize([3,4,5,6,7])
# union of my RDD1 an RDD2
newRDD = myRDD1.union(myRDD2)
newRDD.collect()


[1, 2, 3, 4, 3, 4, 5, 6, 7]

# Actions


## count()

In [16]:
# Contagem de dados dentro da Base de Dados / *sc = SparkContext --> Ele está declarado na seção de transformação (sc = SparkContext.getOrCreate())

# passando os dados para uma lista / poderia ser uma base por exemplo
data = ['Scala', 'Python','Java', 'R']
# ... e passando para um RDD
myRDD = sc.parallelize(data)
# ... retornando 4 como output
myRDD.count()


4

In [17]:
# Exemplo para quando temos valores repetidos / Ela não destingue os repetidos, conta efetivamente a contagem de itens

data = ['Scala', 'Python','Java', 'R', 'Scala', 'Scala', 'Java']
myRDD = sc.parallelize(data)
myRDD.count()


7

In [20]:
# E seu eu quiser contar os distintos

data = ['Scala', 'Python', 'Java', 'R', 'Scala', 'Scala', 'Java']
myRDD = sc.parallelize(data)

# Obter os elementos distintos
distinctRDD = myRDD.distinct()

# Contar quantos elementos distintos existem
distinctCount = distinctRDD.count()

print(distinctCount)


4


## reduce()

In [21]:
data = [1,2,3,4,5]
myRDD = sc.parallelize(data)
# return the product of all the elements
myRDD.reduce(lambda x, y: x * y)

# Aqui vai multiplica um pelo outro dos itens que temos na lista = 1*2*3*4*5 = 120


120

## foreach()

Basicamente executará a funcão  da lista e a colocará num RDD. 


In [23]:
def fun (x):
    print(x)
data = ['Scala', 'Python','Java', 'R']
myRDD = sc.parallelize(data)
# function applied to all the elements
myRDD.foreach(fun)


A função myRDD.foreach(fun) aplica a função fun a cada elemento do RDD myRDD. No exemplo dado:

fun(x): imprime cada elemento x do RDD.
myRDD.foreach(fun): executa fun para todos os elementos de myRDD, ou seja, imprime 'Scala', 'Python', 'Java' e 'R'.
Em resumo, foreach percorre todos os elementos do RDD e aplica a função especificada a cada um deles.
"x" poder ser uma condição, uma palavra, uma soma, etc...

## countByValue()

In [25]:
# contagem de valores baseado no conteúdo dele. 

data = ['Python', 'Java', 'R', 'Scala', 'Scala', 'Java', 'Python']
myRDD = sc.parallelize(data)
# itens() returns a list with all the dictionary keys and values returned by countByValue()
myRDD.countByValue().items()


dict_items([('Python', 2), ('Java', 2), ('R', 1), ('Scala', 2)])

## countbyKey()

In [26]:
data = [('a',1),('b',1),('b',1),('a',1)]
myRDD = sc.parallelize(data)
myRDD.countByKey().items()

# conta pela chave e não pelo valor / Na prática tem a mesma finalidade mas executa a função de maneira diferente


dict_items([('a', 2), ('b', 2)])

## take(n)

Tem a mesma função do Top N do SQL, porém não define ordem crescente ou decrescente. Simplesmente pega os primeiros da lista

In [28]:
# Temos a possibilidade de limitar a quantidade de output / Retorna sempre os primeiros itens da lista, independente da ordem de data, por exemplo.

data=[2,5,3,8,4]
myRDD = sc.parallelize(data)
# retornando os 3 primeiros elementos
myRDD.take(3)


[2, 5, 3]

## top(n)

In [29]:
# Neste caso já há a possibilidade de definir uma ordem / ex: do maior para o menor = OrderBy desc

data=[2,5,3,8,4]
myRDD = sc.parallelize(data)
# retornando os 3 primeiros elementos
myRDD.top(3)


[8, 5, 4]