# Formação Spark com Pyspark: o Curso Completo

Curso da Udemy.

In [70]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as Func
from pyspark.sql.types import *
import os
import sys

os.environ['JAVA_HOME'] = "D:\Java\jre1.8.0_361"
os.environ['PYSPARK_PYTHON'] = sys.executable

## RDDs

* Transformações: criam um novo RDD / Dataframe cujas alterações não são executadas até que haja uma ação
* Ações: calcula todas as transformações de uma só vez

* RDD - Resilient Distributed Datasets (RDD)
    * Estrutura básica de baixo nível
    * Dados "imutáveis", distribuídos pelo cluster
    * Em memória
    * Pode ser persistido em disco
    * Tolerante a falha
    * Operações sobre um RDD criam um novo RDD
    * Complexo e verboso
    * Otimização difícil pelo Spark

In [45]:
sc = SparkContext.getOrCreate()

### Código 1 - Números

In [46]:
numeros = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

In [47]:
# obter 5 numeros
numeros.take(5)

[1, 2, 3, 4, 5]

In [5]:
# obter os 5 maiores elementos
numeros.top(5)

[10, 9, 8, 7, 6]

In [6]:
# traz todos dados
numeros.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [15]:
# numero de elementos
print(f"Quantidade de elementos: {numeros.count()}")

# média
print(f"Média: {numeros.mean()}")

# soma
print(f"Soma: {numeros.sum()}")

# máximo
print(f"Máximo: {numeros.max()}")

# mínimo
print(f"Mínimo: {numeros.min()}")

# desvio padrão
print(f"Desvio Padrão: {numeros.stdev()}")

Quantidade de elementos: 10
Média: 5.5
Soma: 55
Máximo: 10
Mínimo: 1
Desvio Padrão: 2.8722813232690143


In [16]:
# filtro - transformação
filtro = numeros.filter(lambda filtro: filtro > 2)

# collect - ação
filtro.collect()

[3, 4, 5, 6, 7, 8, 9, 10]

In [17]:
# amostra com reposição - transformação
amostra = numeros.sample(True, 0.5, 1)

# collect - ação
amostra.collect()

[2, 3, 4, 5, 9, 10]

In [18]:
# multiplicar todos elementos por 2 - transformação
mapa = numeros.map(lambda mapa: mapa * 2)

# collect - ação
mapa.collect()

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

In [19]:
# unir dois RDDs - transformação
numeros2 = sc.parallelize([6, 7, 8, 9, 10])
uniao = numeros.union(numeros2)

# collect - ação
uniao.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 6, 7, 8, 9, 10]

In [20]:
# intersecção dos RDDs - transformação
interseccao = numeros.intersection(numeros2)

# collect - ação
interseccao.collect()

[6, 8, 10, 7, 9]

In [21]:
# diferença entre RDDs - transformação
subtrai = numeros.subtract(numeros2)

# collect - ação
subtrai.collect()

[2, 4, 1, 3, 5]

In [30]:
# produto cartesiano - transformação
cartesiano = numeros.cartesian(numeros2)

# collect - ação
print(cartesiano.collect())

[(1, 6), (1, 7), (1, 8), (1, 9), (1, 10), (2, 6), (2, 7), (2, 8), (2, 9), (2, 10), (3, 6), (3, 7), (3, 8), (3, 9), (3, 10), (4, 6), (4, 7), (4, 8), (4, 9), (4, 10), (5, 6), (5, 7), (5, 8), (5, 9), (5, 10), (6, 6), (6, 7), (6, 8), (6, 9), (6, 10), (7, 6), (7, 7), (7, 8), (7, 9), (7, 10), (8, 6), (8, 7), (8, 8), (8, 9), (8, 10), (9, 6), (9, 7), (9, 8), (9, 9), (9, 10), (10, 6), (10, 7), (10, 8), (10, 9), (10, 10)]


In [29]:
# contar quantas vezes cada elemento aparece - ação
print(cartesiano.countByValue())

defaultdict(<class 'int'>, {(1, 6): 1, (1, 7): 1, (1, 8): 1, (1, 9): 1, (1, 10): 1, (2, 6): 1, (2, 7): 1, (2, 8): 1, (2, 9): 1, (2, 10): 1, (3, 6): 1, (3, 7): 1, (3, 8): 1, (3, 9): 1, (3, 10): 1, (4, 6): 1, (4, 7): 1, (4, 8): 1, (4, 9): 1, (4, 10): 1, (5, 6): 1, (5, 7): 1, (5, 8): 1, (5, 9): 1, (5, 10): 1, (6, 6): 1, (6, 7): 1, (6, 8): 1, (6, 9): 1, (6, 10): 1, (7, 6): 1, (7, 7): 1, (7, 8): 1, (7, 9): 1, (7, 10): 1, (8, 6): 1, (8, 7): 1, (8, 8): 1, (8, 9): 1, (8, 10): 1, (9, 6): 1, (9, 7): 1, (9, 8): 1, (9, 9): 1, (9, 10): 1, (10, 6): 1, (10, 7): 1, (10, 8): 1, (10, 9): 1, (10, 10): 1})


### Código 2 - Compras

In [31]:
# (chave cliente, valor)
compras = sc.parallelize([(1, 200), (2, 300), (3, 120), (4, 250), (5, 78)])

In [32]:
# obter apenas chaves - transformação
chaves = compras.keys()

# collect - ação
chaves.collect()

[1, 2, 3, 4, 5]

In [34]:
# obter apenas valores - transformação
valores = compras.values()

# collect - ação
valores.collect()

[200, 300, 120, 250, 78]

In [37]:
# contar quantas vezes cada chave aparece - ação
compras.countByKey()

defaultdict(int, {1: 1, 2: 1, 3: 1, 4: 1, 5: 1})

In [39]:
# somar 1 a todos valores - transformação
soma = compras.mapValues(lambda soma: soma + 1)

# collect - ação
soma.collect()

[(1, 201), (2, 301), (3, 121), (4, 251), (5, 79)]

In [40]:
# clientes com debito - RDD
debitos = sc.parallelize([(1, 20), (2, 300)])

In [41]:
# inner join compras e debitos - transformaçào
resultado = compras.join(debitos)

# collect - ação
resultado.collect()

[(2, (300, 300)), (1, (200, 20))]

In [43]:
# obter apenas clientes sem debito - transformação
semdebito = compras.subtractByKey(debitos)

# collect - ação
semdebito.collect()

[(4, 250), (3, 120), (5, 78)]

## DataFrames

* Dataset e DataFrame
    * Semelhante a uma tabela de banco de dados
    * Compatível com objetos dataframe do R e do Python
    * Dataset: Disponíveis em Java e Scala e Não disponíveis em R e Python
    * Vamos estudar RDD, porém prioridade será o DataFrame
    * Tabelas com linhas e colunas
    * Imutáveis
    * Com schema conhecido
    * Linhagem preservada
    * Colunas podem ter tipos diferentes
    * Existem análises comuns: Agrupar, ordenar, filtrar
    * Spark pode otimizar estas análises através de planos de execução
    
* Lazy evaluation
    * O processamento de transformação de fato só ocrre quando há uma ação

* Schema
    * Você pode deixar para o Spark inferir a partir de parte dos dados ou
    * Você pode definir o schema
    * Definir tem vantagens:
        * Tipo correto
        * Sem overhead (tira a necessidade do Spark inferir esses dados)
        
### Código 1 - Exemplos Simples

In [48]:
# instanciando SparkSession
spark = SparkSession.builder.appName('PySpark Course').getOrCreate()

In [49]:
# criando dataframe
df1 = spark.createDataFrame([("Pedro", 10), ("Maria", 20), ("José", 40)])
df1.show()

+-----+---+
|   _1| _2|
+-----+---+
|Pedro| 10|
|Maria| 20|
| José| 40|
+-----+---+



In [50]:
# criando dataframe a partir de um schema e dados
schema = "Id INT, Nome STRING"
dados = [[1, "Pedro"], [2, "Maria"]]
df2 = spark.createDataFrame(dados, schema)
df2.show()

+---+-----+
| Id| Nome|
+---+-----+
|  1|Pedro|
|  2|Maria|
+---+-----+



In [52]:
# outro exemplo de schema - vendas
schema2 = "Produtos STRING, Vendas INT"
vendas = [["Caneta", 10], ["Lápis", 20], ["Caneta", 40]]
df3 = spark.createDataFrame(vendas, schema2)
df3.show()

+--------+------+
|Produtos|Vendas|
+--------+------+
|  Caneta|    10|
|   Lápis|    20|
|  Caneta|    40|
+--------+------+



In [71]:
# agregar total de vendas por produto
df3.groupBy("Produtos").agg(Func.sum("Vendas")).show()

+--------+-----------+
|Produtos|sum(Vendas)|
+--------+-----------+
|  Caneta|         50|
|   Lápis|         20|
+--------+-----------+



In [56]:
# observar apenas coluna de Produtos
df3.select("Produtos").show()

+--------+
|Produtos|
+--------+
|  Caneta|
|   Lápis|
|  Caneta|
+--------+



In [72]:
# calcular percentual x% de Vendas
percentual = 0.2
df3.select("Produtos", "Vendas", Func.expr(f"Vendas * {percentual}")).show()

+--------+------+--------------+
|Produtos|Vendas|(Vendas * 0.2)|
+--------+------+--------------+
|  Caneta|    10|           2.0|
|   Lápis|    20|           4.0|
|  Caneta|    40|           8.0|
+--------+------+--------------+



In [60]:
# retornar schema do dataframe
df3.schema

StructType(List(StructField(Produtos,StringType,true),StructField(Vendas,IntegerType,true)))

In [61]:
# retornar colunas
df3.columns

['Produtos', 'Vendas']

### Código 2 - Importando Dados

In [64]:
# especificando schema e lendo csv
arqschema = "id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING"
despachantes = spark.read.csv("../data/despachantes.csv", header = False, schema = arqschema)
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [65]:
# teste - deixar o pyspark inferir os data types
desp_autoschema = spark.read.load("../data/despachantes.csv", header = False, format = "csv", sep = ",", inferSchema = True)
desp_autoschema.show()

+---+-------------------+-----+-------------+---+----------+
|_c0|                _c1|  _c2|          _c3|_c4|       _c5|
+---+-------------------+-----+-------------+---+----------+
|  1|   Carminda Pestana|Ativo|  Santa Maria| 23|2020-08-11|
|  2|    Deolinda Vilela|Ativo|Novo Hamburgo| 34|2020-03-05|
|  3|   Emídio Dornelles|Ativo| Porto Alegre| 34|2020-02-05|
|  4|Felisbela Dornelles|Ativo| Porto Alegre| 36|2020-02-05|
|  5|     Graça Ornellas|Ativo| Porto Alegre| 12|2020-02-05|
|  6|   Matilde Rebouças|Ativo| Porto Alegre| 22|2019-01-05|
|  7|    Noêmia   Orriça|Ativo|  Santa Maria| 45|2019-10-05|
|  8|      Roque Vásquez|Ativo| Porto Alegre| 65|2020-03-05|
|  9|      Uriel Queiroz|Ativo| Porto Alegre| 54|2018-05-05|
| 10|   Viviana Sequeira|Ativo| Porto Alegre|  0|2020-09-05|
+---+-------------------+-----+-------------+---+----------+



In [68]:
# comparação de schemas - manual e inferido
print(despachantes.schema)

print(desp_autoschema.schema)

StructType(List(StructField(id,IntegerType,true),StructField(nome,StringType,true),StructField(status,StringType,true),StructField(cidade,StringType,true),StructField(vendas,IntegerType,true),StructField(data,StringType,true)))
StructType(List(StructField(_c0,IntegerType,true),StructField(_c1,StringType,true),StructField(_c2,StringType,true),StructField(_c3,StringType,true),StructField(_c4,IntegerType,true),StructField(_c5,StringType,true)))


In [76]:
# filtros: filtrar dataframe despachantes cuja venda foi maior que 20
despachantes.select('id', 'nome', 'vendas').where(Func.col('vendas') > 20).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
|  7|    Noêmia   Orriça|    45|
|  8|      Roque Vásquez|    65|
|  9|      Uriel Queiroz|    54|
+---+-------------------+------+



In [78]:
# exemplo 2 filtros
cond1 = Func.col('vendas') > 20
cond2 = Func.col('vendas') < 40
despachantes.select('id', 'nome', 'vendas').where(cond1 & cond2).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
+---+-------------------+------+



In [80]:
# renomeando colunas - é necessário criar um novo dataframe
novodf = despachantes.withColumnRenamed("nome", "nomes")
novodf.columns

['id', 'nomes', 'status', 'cidade', 'vendas', 'data']

In [84]:
# mudar tipo da coluna data para datetime
despachantes2 = despachantes.withColumn("data2", Func.to_timestamp(Func.col("data"), "yyyy-MM-dd"))
despachantes2.schema

StructType(List(StructField(id,IntegerType,true),StructField(nome,StringType,true),StructField(status,StringType,true),StructField(cidade,StringType,true),StructField(vendas,IntegerType,true),StructField(data,StringType,true),StructField(data2,TimestampType,true)))

In [91]:
# funções de datas em colunas string
despachantes2.select("nome", Func.year("data")) \
             .orderBy("nome") \
             .show()

+-------------------+----------+
|               nome|year(data)|
+-------------------+----------+
|   Carminda Pestana|      2020|
|    Deolinda Vilela|      2020|
|   Emídio Dornelles|      2020|
|Felisbela Dornelles|      2020|
|     Graça Ornellas|      2020|
|   Matilde Rebouças|      2019|
|    Noêmia   Orriça|      2019|
|      Roque Vásquez|      2020|
|      Uriel Queiroz|      2018|
|   Viviana Sequeira|      2020|
+-------------------+----------+



In [94]:
# funções de datas em colunas string - groupby
despachantes2.select("data") \
             .groupBy(Func.year("data")) \
             .count() \
             .show()

+----------+-----+
|year(data)|count|
+----------+-----+
|      2018|    1|
|      2019|    2|
|      2020|    7|
+----------+-----+

