# Engenharia de Dados com Pyspark


- **Task**
  - Converta os arquivos em csv para parquet e os envie para processing zone.

- **Dataset**
  - Usaremos esse dataset https://www.kaggle.com/nhs/general-practice-prescribing-data

### Modos de leitura
- **permissive**: *Define todos os campos para NULL quando encontra registros corrompidos e coloca todos registros corrompidos em uma coluna chamada _corrupt_record.* (default)

- **dropMalformed**: *Descarta qualquer linha corrompida ou que o Spark não consiga ler.*

- **failFast**: *Falha imediatamente quando encontra uma linha que não consiga ler.*

In [None]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import *

from pyspark.sql.types import *

In [None]:
# Create (or reuse) the SparkSession used throughout this notebook,
# running Spark with the 'local' master under the app name 'Scripts_Pyspark_1'.
spark = SparkSession.builder \
    .master('local') \
    .appName('Scripts_Pyspark_1') \
    .getOrCreate()

In [None]:
# Read every .csv file in the bigdata directory (>4GB) into one DataFrame,
# using the first line as the header and letting Spark infer column types.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('sample_data/tables/bigdata/*.csv')
)
# Preview the first 5 rows.
df.show(5)


+--------+--------+--------+-----+----+--------+--------+
|practice|bnf_code|bnf_name|items| nic|act_cost|quantity|
+--------+--------+--------+-----+----+--------+--------+
|    1596|   13320|   22721|    1|3.64|    3.38|     112|
|    1596|   20494|     263|    1|1.25|    1.27|      42|
|    1596|   19136|   17446|    1|0.56|    0.63|      14|
|    1596|   17512|   16983|    3|1.95|    2.04|      49|
|    1596|    2653|   12675|    2|2.28|    2.34|      84|
+--------+--------+--------+-----+----+--------+--------+
only showing top 5 rows



In [None]:
# Print the column names and the types Spark inferred for them
df.printSchema()

root
 |-- practice: integer (nullable = true)
 |-- bnf_code: integer (nullable = true)
 |-- bnf_name: integer (nullable = true)
 |-- items: integer (nullable = true)
 |-- nic: double (nullable = true)
 |-- act_cost: double (nullable = true)
 |-- quantity: integer (nullable = true)



In [None]:
# Show the first 10 rows of the DataFrame (head() returns them as Row objects)
display(df.head(10))

[Row(practice=1596, bnf_code=13320, bnf_name=22721, items=1, nic=3.64, act_cost=3.38, quantity=112),
 Row(practice=1596, bnf_code=20494, bnf_name=263, items=1, nic=1.25, act_cost=1.27, quantity=42),
 Row(practice=1596, bnf_code=19136, bnf_name=17446, items=1, nic=0.56, act_cost=0.63, quantity=14),
 Row(practice=1596, bnf_code=17512, bnf_name=16983, items=3, nic=1.95, act_cost=2.04, quantity=49),
 Row(practice=1596, bnf_code=2653, bnf_name=12675, items=2, nic=2.28, act_cost=2.34, quantity=84),
 Row(practice=1596, bnf_code=12551, bnf_name=1282, items=1, nic=0.71, act_cost=0.77, quantity=42),
 Row(practice=1596, bnf_code=27926, bnf_name=17643, items=1, nic=39.67, act_cost=36.84, quantity=14),
 Row(practice=1596, bnf_code=12518, bnf_name=12939, items=1, nic=211.12, act_cost=195.56, quantity=56),
 Row(practice=1596, bnf_code=1674, bnf_name=23525, items=2, nic=3.52, act_cost=3.28, quantity=56),
 Row(practice=1596, bnf_code=8979, bnf_name=18703, items=1, nic=13.06, act_cost=12.1, quantity=150

In [None]:
# Count the number of rows
df.count()

24692437

#### Leva os dados convertidos para a Processing Zone

- *Atente para NÃO escrever e ler arquivos parquet em versões diferentes*

In [None]:
# Write the DataFrame in Parquet format to the processing zone,
# replacing any output that already exists at that path.
(
    df.write
    .format("parquet")
    .mode("overwrite")
    .save("sample_data/tables/processing/df-parquet-file.parquet")
)

In [None]:
# Load the Parquet output back from the processing zone.
# Pay attention to how fast this read is.
df_parquet = (
    spark.read
    .format("parquet")
    .load("sample_data/tables/processing/df-parquet-file.parquet")
)

In [None]:
# Count the number of rows in the Parquet-backed DataFrame
df_parquet.count()

24692437

In [None]:
# Show the first 10 rows read back from the Parquet files
display(df_parquet.head(10))

[Row(practice=1596, bnf_code=17512, bnf_name=16983, items=2, nic=1.56, act_cost=1.57, quantity=42),
 Row(practice=1596, bnf_code=5413, bnf_name=7316, items=2, nic=0.77, act_cost=0.94, quantity=21),
 Row(practice=1596, bnf_code=22566, bnf_name=22100, items=1, nic=13.93, act_cost=13.02, quantity=42),
 Row(practice=1596, bnf_code=25587, bnf_name=16124, items=2, nic=0.61, act_cost=0.79, quantity=24),
 Row(practice=1596, bnf_code=2653, bnf_name=12675, items=1, nic=0.11, act_cost=0.21, quantity=4),
 Row(practice=1596, bnf_code=12551, bnf_name=1282, items=2, nic=1.92, act_cost=2.0, quantity=112),
 Row(practice=1596, bnf_code=18938, bnf_name=10575, items=2, nic=1.48, act_cost=1.6, quantity=56),
 Row(practice=1596, bnf_code=4452, bnf_name=18603, items=1, nic=52.78, act_cost=49.04, quantity=28),
 Row(practice=1596, bnf_code=2827, bnf_name=5592, items=1, nic=1.46, act_cost=1.47, quantity=21),
 Row(practice=1596, bnf_code=8979, bnf_name=18703, items=1, nic=13.01, act_cost=12.07, quantity=150)]

In [None]:
# Aggregate the prescribing data per BNF code using Spark SQL.
# The DataFrame is first registered as a temporary view so that it can be
# referenced by name inside the SQL statement.
df_parquet.createOrReplaceTempView("view_df_parquet")

# Media_Act_cost must use AVG: with SUM it merely duplicated Soma_Act_cost.
spark.sql("""
    SELECT BNF_CODE      AS Bnf_code
          ,SUM(ACT_COST) AS Soma_Act_cost
          ,SUM(QUANTITY) AS Soma_Quantity
          ,SUM(ITEMS)    AS Soma_items
          ,AVG(ACT_COST) AS Media_Act_cost
    FROM view_df_parquet
    GROUP BY bnf_code
""").show()

+--------+------------------+-------------+----------+------------------+
|Bnf_code|     Soma_Act_cost|Soma_Quantity|Soma_items|    Media_Act_cost|
+--------+------------------+-------------+----------+------------------+
|    7253| 589656.8299999997|      6973782|    137361| 589656.8299999997|
|   18498|1580820.3399999859|        20448|     20414|1580820.3399999859|
|    8389| 599873.0799999982|      4553570|    357061| 599873.0799999982|
|     463|175470.41000000224|       995524|     16698|175470.41000000224|
|   28088|3891.4799999999987|          315|       288|3891.4799999999987|
|   23364|246824.15000000093|       137479|      2195|246824.15000000093|
|   23271|28598.859999999935|       117018|      3716|28598.859999999935|
|    9376|219164.35000000015|       736315|     14307|219164.35000000015|
|   16861|43350.130000000485|      1266050|     27988|43350.130000000485|
|   18866| 2282.600000000001|         7801|       145| 2282.600000000001|
|   10206| 46688.25999999992|      272

# Avançando com Pyspark


#### Criando um schema
- A opção **inferSchema** nem sempre vai definir o melhor datatype.
- Melhora a performance na leitura de grandes bases.
- Permite uma customização dos tipos das colunas.
- É importante saber para reescrita de aplicações. (Códigos pandas)

In [None]:
# Location of the flights data that the CSV readers below load
arquivo = "sample_data/flights/"

In [None]:
# First pass over the flights data: take column names from the header row
# and let Spark infer each column's type (inferSchema switched on).
df = (
    spark.read
    .option("inferSchema", "True")
    .option("header", "True")
    .csv(arquivo)
)

In [None]:
# Print the DataFrame schema (as produced with inferSchema enabled)
df.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)
 |-- ARRIVAL_DELAY: string (nullable = tru

In [None]:
# Display the DataFrame object itself (its column names and types)
display(df)

DataFrame[YEAR: int, MONTH: int, DAY: int, DAY_OF_WEEK: int, AIRLINE: string, FLIGHT_NUMBER: int, TAIL_NUMBER: string, ORIGIN_AIRPORT: string, DESTINATION_AIRPORT: string, SCHEDULED_DEPARTURE: int, DEPARTURE_TIME: string, DEPARTURE_DELAY: string, TAXI_OUT: string, WHEELS_OFF: string, SCHEDULED_TIME: int, ELAPSED_TIME: string, AIR_TIME: string, DISTANCE: int, WHEELS_ON: string, TAXI_IN: string, SCHEDULED_ARRIVAL: int, ARRIVAL_TIME: string, ARRIVAL_DELAY: string, DIVERTED: int, CANCELLED: int, CANCELLATION_REASON: string, AIR_SYSTEM_DELAY: string, SECURITY_DELAY: string, AIRLINE_DELAY: string, LATE_AIRCRAFT_DELAY: string, WEATHER_DELAY: string]

In [None]:
# Define an explicit schema with a StructType object instead of inferring it
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType, TimestampType

# Five fields: date, origin and destination as strings; delay and distance as integers.
schema_df = StructType([
    StructField("date", StringType()),
    StructField("delay", IntegerType()),
    StructField("distance", IntegerType()),
    StructField("origin", StringType()),
    StructField("destination", StringType())
])

In [None]:
# Check the type of the schema_df variable
type(schema_df)

In [None]:
# Read the CSV again, this time passing the explicit schema via schema()
# instead of inferring types (the first line is still treated as a header).
# NOTE(review): the inferred schema printed earlier for this same path starts
# with YEAR, MONTH, DAY, DAY_OF_WEEK, AIRLINE, ..., and the recorded output
# shows date=2015, delay=1, distance=1, origin=4, destination=AS. The five
# names in schema_df therefore appear to be applied by position onto
# unrelated columns; confirm that schema_df matches this file.
df = (
    spark.read
    .format("csv")
    .option("header", "True")
    .schema(schema_df)
    .load(arquivo)
)

In [None]:
# Print the DataFrame schema (now the explicitly supplied one)
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



In [None]:
# Print the first 10 rows of the DataFrame
df.show(10)

+----+-----+--------+------+-----------+
|date|delay|distance|origin|destination|
+----+-----+--------+------+-----------+
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         US|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         NK|
|2015|    1|       1|     4|         US|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         DL|
+----+-----+--------+------+-----------+
only showing top 10 rows



In [None]:
# Show the type of the df variable
type(df)

In [None]:
# Return the first 10 rows of the DataFrame as a list of Row objects
df.take(10)

[Row(date='2015', delay=1, distance=1, origin='4', destination='AS'),
 Row(date='2015', delay=1, distance=1, origin='4', destination='AA'),
 Row(date='2015', delay=1, distance=1, origin='4', destination='US'),
 Row(date='2015', delay=1, distance=1, origin='4', destination='AA'),
 Row(date='2015', delay=1, distance=1, origin='4', destination='AS'),
 Row(date='2015', delay=1, distance=1, origin='4', destination='DL'),
 Row(date='2015', delay=1, distance=1, origin='4', destination='NK'),
 Row(date='2015', delay=1, distance=1, origin='4', destination='US'),
 Row(date='2015', delay=1, distance=1, origin='4', destination='AA'),
 Row(date='2015', delay=1, distance=1, origin='4', destination='DL')]

In [None]:
# Return the number of rows in the DataFrame
df.count()

1000

In [None]:
# Get the maximum value of the delay column.
# Note: this import rebinds the name `max` to Spark's aggregate function in the notebook namespace.
from pyspark.sql.functions import max
df.select(max("delay")).take(1)

[Row(max(delay)=1)]

In [None]:
# Filter DataFrame rows with filter(), using a SQL-style condition string; show 2 of them
df.filter("delay < 2").show(2)

+----+-----+--------+------+-----------+
|date|delay|distance|origin|destination|
+----+-----+--------+------+-----------+
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         AA|
+----+-----+--------+------+-----------+
only showing top 2 rows



In [None]:
# Same filter using where() (an alias for the filter method)
df.where("delay < 2").show(2)

+----+-----+--------+------+-----------+
|date|delay|distance|origin|destination|
+----+-----+--------+------+-----------+
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         AA|
+----+-----+--------+------+-----------+
only showing top 2 rows



In [None]:
# Sort the DataFrame by the delay column and show the first 5 rows
df.sort("delay").show(5)

+----+-----+--------+------+-----------+
|date|delay|distance|origin|destination|
+----+-----+--------+------+-----------+
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         US|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         AS|
+----+-----+--------+------+-----------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import desc, asc, expr
# Sort by delay in descending order and show the first 10 rows.
# desc("delay") is used rather than expr("delay desc"): inside expr() the
# trailing word is parsed as a column alias, not as a sort direction, so the
# rows would come back in the default ascending order.
df.orderBy(desc("delay")).show(10)

+----+-----+--------+------+-----------+
|date|delay|distance|origin|destination|
+----+-----+--------+------+-----------+
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         US|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         NK|
|2015|    1|       1|     4|         US|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         DL|
+----+-----+--------+------+-----------+
only showing top 10 rows



In [None]:
# Show descriptive statistics (count, mean, stddev, min, max) for the columns
df.describe().show()

+-------+------+-----+--------+------+-----------+
|summary|  date|delay|distance|origin|destination|
+-------+------+-----+--------+------+-----------+
|  count|  1000| 1000|    1000|  1000|       1000|
|   mean|2015.0|  1.0|     1.0|   4.0|       NULL|
| stddev|   0.0|  0.0|     0.0|   0.0|       NULL|
|    min|  2015|    1|       1|     4|         AA|
|    max|  2015|    1|       1|     4|         WN|
+-------+------+-----+--------+------+-----------+



In [None]:
# Iterate over every row of the DataFrame: collect() brings all rows to the
# driver, then each row's first two fields are printed as-is and the third
# field is printed doubled.
for row in df.collect():
    print(row[0], row[1], row[2] * 2)

2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2015 1 2
2

In [None]:
# Add a column to the DataFrame: 'Nova Coluna' holds delay + 2
df = df.withColumn('Nova Coluna',df['delay']+2)
df.show(10)

+----+-----+--------+------+-----------+-----------+
|date|delay|distance|origin|destination|Nova Coluna|
+----+-----+--------+------+-----------+-----------+
|2015|    1|       1|     4|         AS|          3|
|2015|    1|       1|     4|         AA|          3|
|2015|    1|       1|     4|         US|          3|
|2015|    1|       1|     4|         AA|          3|
|2015|    1|       1|     4|         AS|          3|
|2015|    1|       1|     4|         DL|          3|
|2015|    1|       1|     4|         NK|          3|
|2015|    1|       1|     4|         US|          3|
|2015|    1|       1|     4|         AA|          3|
|2015|    1|       1|     4|         DL|          3|
+----+-----+--------+------+-----------+-----------+
only showing top 10 rows



In [None]:
# Remove the 'Nova Coluna' column again and show the result
df = df.drop('Nova Coluna')
df.show(10)

+----+-----+--------+------+-----------+
|date|delay|distance|origin|destination|
+----+-----+--------+------+-----------+
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         US|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         NK|
|2015|    1|       1|     4|         US|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         DL|
+----+-----+--------+------+-----------+
only showing top 10 rows



In [None]:
# Rename a column of the DataFrame.
# NOTE(review): 'Nova Coluna' was dropped from df in the previous cell, so there
# is nothing to rename here and the recorded output still shows the original
# five columns. Run this before the drop (or pick an existing column) to see
# the rename take effect.
df.withColumnRenamed('Nova Coluna','New Column').show()

+----+-----+--------+------+-----------+
|date|delay|distance|origin|destination|
+----+-----+--------+------+-----------+
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         US|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         NK|
|2015|    1|       1|     4|         US|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         UA|
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         DL|
+----+-----+--------+------+-----------+
only showing top

#### Trabalhando com missing values
- Tratamento de dados e limpeza de dados

In [None]:
# Check for NULL values in the delay column
df.filter("delay is NULL").show()

+----+-----+--------+------+-----------+
|date|delay|distance|origin|destination|
+----+-----+--------+------+-----------+
+----+-----+--------+------+-----------+



In [None]:
# Count the NULL values in each column and report them one per line.
# Each entry pairs the label shown in the message with the column name
# used in the filter, replacing five copy-pasted statements with one loop.
null_checks = [
    ("Delay", "delay"),
    ("Date", "date"),
    ("Distance", "distance"),
    ("Origin", "origin"),
    ("Destination", "destination"),
]
for label, column in null_checks:
    null_count = df.filter(f"{column} is NULL").count()
    print("Valores nulos coluna {0}: {1}".format(label, null_count))

Valores nulos coluna Delay: 0
Valores nulos coluna Date: 0
Valores nulos coluna Distance: 0
Valores nulos coluna Origin: 0
Valores nulos coluna Destination: 0


In [None]:
# Fill missing values with 0.
# The result is assigned back to df (and show() is omitted) so the fill persists.
# NOTE(review): with a numeric fill value, string columns are presumably left untouched; confirm against the na.fill docs.
df = df.na.fill(value=0)

In [None]:
# Check again for NULL values in the delay column
df.filter("delay is NULL").show()

+----+-----+--------+------+-----------+
|date|delay|distance|origin|destination|
+----+-----+--------+------+-----------+
+----+-----+--------+------+-----------+



In [None]:
# Fill missing values with 0 in the delay column only (result is shown, df is not reassigned)
df.na.fill(value=0, subset=['delay']).show()

+----+-----+--------+------+-----------+
|date|delay|distance|origin|destination|
+----+-----+--------+------+-----------+
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         US|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         NK|
|2015|    1|       1|     4|         US|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         UA|
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         DL|
+----+-----+--------+------+-----------+
only showing top

In [None]:
# Print the DataFrame
df.show()

+----+-----+--------+------+-----------+
|date|delay|distance|origin|destination|
+----+-----+--------+------+-----------+
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         US|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         NK|
|2015|    1|       1|     4|         US|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         UA|
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         DL|
+----+-----+--------+------+-----------+
only showing top

In [None]:
# Fill missing values with an empty string (result is shown, df is not reassigned)
df.na.fill("").show()

+----+-----+--------+------+-----------+
|date|delay|distance|origin|destination|
+----+-----+--------+------+-----------+
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         US|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         NK|
|2015|    1|       1|     4|         US|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         AA|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         DL|
|2015|    1|       1|     4|         UA|
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         DL|
+----+-----+--------+------+-----------+
only showing top

In [None]:
# Drop every row that has a null in any column, keeping the result in df
df = df.na.drop()

In [None]:
# Get the maximum value of the delay column
from pyspark.sql.functions import max
df.select(max("delay")).take(1)

[Row(max(delay)=1)]

In [None]:
# Filter DataFrame rows with filter() and show 2 of the matches
df.filter("delay < 2").show(2)

+----+-----+--------+------+-----------+
|date|delay|distance|origin|destination|
+----+-----+--------+------+-----------+
|2015|    1|       1|     4|         AS|
|2015|    1|       1|     4|         AA|
+----+-----+--------+------+-----------+
only showing top 2 rows



#### Manipulando Strings

In [None]:
# Read the CSV files used by the string-manipulation examples, taking column
# names from the header row and inferring the column types.
# NOTE(review): the original comment describes flight summary files
# (2010_summary.csv ... 2015_summary.csv), but this glob is the same bigdata
# directory read at the top of the notebook; confirm the intended path.
df = (
    spark.read
    .option("inferSchema", "True")
    .option("header", "True")
    .csv("sample_data/tables/bigdata/*.csv")
)

In [None]:
# Print 10 rows of the DataFrame
df.show(10)

+--------+--------+--------+-----+------+--------+--------+
|practice|bnf_code|bnf_name|items|   nic|act_cost|quantity|
+--------+--------+--------+-----+------+--------+--------+
|    1596|   13320|   22721|    1|  3.64|    3.38|     112|
|    1596|   20494|     263|    1|  1.25|    1.27|      42|
|    1596|   19136|   17446|    1|  0.56|    0.63|      14|
|    1596|   17512|   16983|    3|  1.95|    2.04|      49|
|    1596|    2653|   12675|    2|  2.28|    2.34|      84|
|    1596|   12551|    1282|    1|  0.71|    0.77|      42|
|    1596|   27926|   17643|    1| 39.67|   36.84|      14|
|    1596|   12518|   12939|    1|211.12|  195.56|      56|
|    1596|    1674|   23525|    2|  3.52|    3.28|      56|
|    1596|    8979|   18703|    1| 13.06|    12.1|     150|
+--------+--------+--------+-----+------+--------+--------+
only showing top 10 rows



In [None]:
# Return the number of records in the DataFrame
df.count()

1496

In [None]:
from pyspark.sql.functions import lower, upper, col
# Show DEST_COUNTRY_NAME as-is, lower-cased, and upper-cased (after lower-casing).
# NOTE(review): the recorded output is an AnalysisException because
# DEST_COUNTRY_NAME cannot be resolved; the suggested column names look like
# JSON fragments, so the input was presumably JSON parsed as CSV. Confirm the
# source path/format of df before running this cell.
df.select(
    col("DEST_COUNTRY_NAME"),
    lower(col("DEST_COUNTRY_NAME")),
    upper(lower(col("DEST_COUNTRY_NAME"))),
).show(10)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `DEST_COUNTRY_NAME` cannot be resolved. Did you mean one of the following? [`"count":15}`, `"DEST_COUNTRY_NAME":"United States"`, `{"ORIGIN_COUNTRY_NAME":"Romania"`].;
'Project ['DEST_COUNTRY_NAME, unresolvedalias(lower('DEST_COUNTRY_NAME), Some(org.apache.spark.sql.Column$$Lambda$3070/0x0000000841312040@729e0181)), unresolvedalias(upper(lower('DEST_COUNTRY_NAME)), Some(org.apache.spark.sql.Column$$Lambda$3070/0x0000000841312040@729e0181))]
+- Relation [{"ORIGIN_COUNTRY_NAME":"Romania"#1178,"DEST_COUNTRY_NAME":"United States"#1179,"count":15}#1180] csv


In [None]:
# Strip leading (left-side) whitespace
from pyspark.sql.functions import ltrim
df.select(ltrim(col("DEST_COUNTRY_NAME"))).show(2)

In [None]:
# Strip trailing (right-side) whitespace
from pyspark.sql.functions import rtrim
df.select(rtrim(col("DEST_COUNTRY_NAME"))).show(2)

In [None]:
# All of the trimming and padding helpers side by side.
# lit() turns a Python literal into a column expression, so every helper here
# operates on a constant string rather than on a column of df.
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim

df.select(
    ltrim(lit(" HELLO ")).alias("ltrim"),
    rtrim(lit(" HELLO ")).alias("rtrim"),
    trim(lit(" HELLO ")).alias("trim"),
    # target length 3 is shorter than the input, so the value is cut to "HEL"
    lpad(lit("HELLO"), 3, " ").alias("lp"),
    # target length 10 is longer than the input, so spaces are appended
    rpad(lit("HELLO"), 10, " ").alias("rp"),
).show(2)

+------+------+-----+---+----------+
| ltrim| rtrim| trim| lp|        rp|
+------+------+-----+---+----------+
|HELLO | HELLO|HELLO|HEL|HELLO     |
|HELLO | HELLO|HELLO|HEL|HELLO     |
+------+------+-----+---+----------+
only showing top 2 rows



Estatística descritiva básica:
- **mean()** - Retorna o valor médio de cada grupo.
- **max()** - Retorna o valor máximo de cada grupo.
- **min()** - Retorna o valor mínimo de cada grupo.
- **sum()** - Retorna a soma de todos os valores do grupo.
- **avg()** - Retorna o valor médio de cada grupo.

In [None]:
# Load the retail-data CSV for 2010-12-01, taking column names from the
# header row and letting Spark infer the column types.
df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("sample_data/tables/retail/retail_2010_12_01.csv")
)

In [None]:
# Print the first 10 rows of the DataFrame
df.show(10)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS S

In [None]:
# Sum the unit prices per country
df.groupBy("Country").sum("UnitPrice").show()

+--------------+------------------+
|       Country|    sum(UnitPrice)|
+--------------+------------------+
|       Germany| 93.82000000000002|
|        France|             55.29|
|          EIRE|133.64000000000001|
|        Norway|            102.67|
|     Australia|              73.9|
|United Kingdom|12428.080000000024|
|   Netherlands|             16.85|
+--------------+------------------+



In [None]:
# Count the number of rows for each country (one output row per distinct country)
df.groupBy("Country").count().show()

+--------------+-----+
|       Country|count|
+--------------+-----+
|       Germany|   29|
|        France|   20|
|          EIRE|   21|
|        Norway|   73|
|     Australia|   14|
|United Kingdom| 2949|
|   Netherlands|    2|
+--------------+-----+



In [None]:
# Return the minimum UnitPrice per group (country)
df.groupBy("Country").min("UnitPrice").show()

+--------------+--------------+
|       Country|min(UnitPrice)|
+--------------+--------------+
|       Germany|          0.42|
|        France|          0.42|
|          EIRE|          0.65|
|        Norway|          0.29|
|     Australia|          0.85|
|United Kingdom|           0.0|
|   Netherlands|          1.85|
+--------------+--------------+



In [None]:
# Return the maximum UnitPrice per group (country)
df.groupBy("Country").max("UnitPrice").show()

+--------------+--------------+
|       Country|max(UnitPrice)|
+--------------+--------------+
|       Germany|          18.0|
|        France|          18.0|
|          EIRE|          50.0|
|        Norway|          7.95|
|     Australia|           8.5|
|United Kingdom|        607.49|
|   Netherlands|          15.0|
+--------------+--------------+



In [None]:
# Return the average UnitPrice per group (country) using avg()
df.groupBy("Country").avg("UnitPrice").show()

+--------------+------------------+
|       Country|    avg(UnitPrice)|
+--------------+------------------+
|       Germany| 3.235172413793104|
|        France|            2.7645|
|          EIRE|6.3638095238095245|
|        Norway|1.4064383561643836|
|     Australia| 5.278571428571429|
|United Kingdom|4.2143370634113335|
|   Netherlands|             8.425|
+--------------+------------------+



In [None]:
# Return the average UnitPrice per group (country) using mean(); the output column is avg(UnitPrice), same as avg()
df.groupBy("Country").mean("UnitPrice").show()

+--------------+------------------+
|       Country|    avg(UnitPrice)|
+--------------+------------------+
|       Germany| 3.235172413793104|
|        France|            2.7645|
|          EIRE|6.3638095238095245|
|        Norway|1.4064383561643836|
|     Australia| 5.278571428571429|
|United Kingdom|4.2143370634113335|
|   Netherlands|             8.425|
+--------------+------------------+



In [None]:
# Group by several columns: sum UnitPrice for each (Country, CustomerID) pair.
(
    df.groupBy("Country", "CustomerID")
    .sum("UnitPrice")
    .show()
)

+--------------+----------+------------------+
|       Country|CustomerID|    sum(UnitPrice)|
+--------------+----------+------------------+
|United Kingdom|   17420.0| 38.99999999999999|
|United Kingdom|   15922.0|              48.5|
|United Kingdom|   16250.0|             47.27|
|United Kingdom|   13065.0| 73.11000000000001|
|United Kingdom|   18074.0|62.150000000000006|
|United Kingdom|   16048.0|12.969999999999999|
|       Germany|   12472.0|             49.45|
|United Kingdom|   18085.0|              34.6|
|United Kingdom|   17905.0|109.90000000000003|
|United Kingdom|   17841.0|254.63999999999982|
|United Kingdom|   15291.0|               6.0|
|United Kingdom|   17951.0|22.000000000000004|
|United Kingdom|   13255.0|27.299999999999997|
|United Kingdom|   17690.0|              34.8|
|United Kingdom|   18229.0|             48.65|
|United Kingdom|   15605.0| 58.20000000000002|
|United Kingdom|   18011.0| 66.10999999999999|
|United Kingdom|   17809.0|              1.45|
|United Kingd

#### Trabalhando com datas
- Existem diversas funções em Pyspark para manipular datas e timestamps.
- Evite escrever suas próprias funções para isso.
- Algumas funções mais usadas:
    - current_date():
    - date_format(dateExpr,format):
    - to_date():
    - to_date(column, fmt):
    - add_months(Column, numMonths):
    - date_add(column, days):
    - date_sub(column, days):
    - datediff(end, start)
    - current_timestamp():
    - hour(column):

In [None]:
# Print the DataFrame
df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS S

In [None]:
# Print the schema
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [None]:
from pyspark.sql.functions import *
# current_date() yields the current date as a column; show it for a single row.
df.select(current_date().alias("current_date")).show(1)

+------------+
|current_date|
+------------+
|  2025-02-06|
+------------+
only showing top 1 row



In [None]:
# Format date values using the Brazilian day/month/year layout.
# "HH" is the 24-hour clock; the previous "hh" is the 12-hour clock and,
# without an AM/PM marker, would render 14:32 as the ambiguous 02:32.
df.select(
    col("InvoiceDate"),
    date_format(col("InvoiceDate"), "dd/MM/yyyy HH:mm:ss").alias("Formato Brasil"),
).show()

+-------------------+-------------------+
|        InvoiceDate|     Formato Brasil|
+-------------------+-------------------+
|2010-12-01 08:26:00|01/12/2010 08:26:00|
|2010-12-01 08:26:00|01/12/2010 08:26:00|
|2010-12-01 08:26:00|01/12/2010 08:26:00|
|2010-12-01 08:26:00|01/12/2010 08:26:00|
|2010-12-01 08:26:00|01/12/2010 08:26:00|
|2010-12-01 08:26:00|01/12/2010 08:26:00|
|2010-12-01 08:26:00|01/12/2010 08:26:00|
|2010-12-01 08:28:00|01/12/2010 08:28:00|
|2010-12-01 08:28:00|01/12/2010 08:28:00|
|2010-12-01 08:34:00|01/12/2010 08:34:00|
|2010-12-01 08:34:00|01/12/2010 08:34:00|
|2010-12-01 08:34:00|01/12/2010 08:34:00|
|2010-12-01 08:34:00|01/12/2010 08:34:00|
|2010-12-01 08:34:00|01/12/2010 08:34:00|
|2010-12-01 08:34:00|01/12/2010 08:34:00|
|2010-12-01 08:34:00|01/12/2010 08:34:00|
|2010-12-01 08:34:00|01/12/2010 08:34:00|
|2010-12-01 08:34:00|01/12/2010 08:34:00|
|2010-12-01 08:34:00|01/12/2010 08:34:00|
|2010-12-01 08:34:00|01/12/2010 08:34:00|
+-------------------+-------------

In [None]:
# imprime a diferença entre duas datas
df.select(col("InvoiceDate"),
    datediff(current_date(),col("InvoiceDate")).alias("datediff")
  ).show()

+-------------------+--------+
|        InvoiceDate|datediff|
+-------------------+--------+
|2010-12-01 08:26:00|    5181|
|2010-12-01 08:26:00|    5181|
|2010-12-01 08:26:00|    5181|
|2010-12-01 08:26:00|    5181|
|2010-12-01 08:26:00|    5181|
|2010-12-01 08:26:00|    5181|
|2010-12-01 08:26:00|    5181|
|2010-12-01 08:28:00|    5181|
|2010-12-01 08:28:00|    5181|
|2010-12-01 08:34:00|    5181|
|2010-12-01 08:34:00|    5181|
|2010-12-01 08:34:00|    5181|
|2010-12-01 08:34:00|    5181|
|2010-12-01 08:34:00|    5181|
|2010-12-01 08:34:00|    5181|
|2010-12-01 08:34:00|    5181|
|2010-12-01 08:34:00|    5181|
|2010-12-01 08:34:00|    5181|
|2010-12-01 08:34:00|    5181|
|2010-12-01 08:34:00|    5181|
+-------------------+--------+
only showing top 20 rows



In [None]:
# meses entre datas
df.select(col("InvoiceDate"),
    months_between(current_date(),col("InvoiceDate")).alias("months_between")
  ).show()

+-------------------+--------------+
|        InvoiceDate|months_between|
+-------------------+--------------+
|2010-12-01 08:26:00|   170.1499552|
|2010-12-01 08:26:00|   170.1499552|
|2010-12-01 08:26:00|   170.1499552|
|2010-12-01 08:26:00|   170.1499552|
|2010-12-01 08:26:00|   170.1499552|
|2010-12-01 08:26:00|   170.1499552|
|2010-12-01 08:26:00|   170.1499552|
|2010-12-01 08:28:00|  170.14991039|
|2010-12-01 08:28:00|  170.14991039|
|2010-12-01 08:34:00|  170.14977599|
|2010-12-01 08:34:00|  170.14977599|
|2010-12-01 08:34:00|  170.14977599|
|2010-12-01 08:34:00|  170.14977599|
|2010-12-01 08:34:00|  170.14977599|
|2010-12-01 08:34:00|  170.14977599|
|2010-12-01 08:34:00|  170.14977599|
|2010-12-01 08:34:00|  170.14977599|
|2010-12-01 08:34:00|  170.14977599|
|2010-12-01 08:34:00|  170.14977599|
|2010-12-01 08:34:00|  170.14977599|
+-------------------+--------------+
only showing top 20 rows



In [None]:
# Extracts the year, the month, the next Sunday after the date, and the week of the year.
df.select(col("InvoiceDate"),
     year(col("InvoiceDate")).alias("year"),
     month(col("InvoiceDate")).alias("month"),
     next_day(col("InvoiceDate"),"Sunday").alias("next_day"),
     weekofyear(col("InvoiceDate")).alias("weekofyear")
  ).show()

+-------------------+----+-----+----------+----------+
|        InvoiceDate|year|month|  next_day|weekofyear|
+-------------------+----+-----+----------+----------+
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:28:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:28:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-0

In [None]:
# Day of the week, day of the month, and day of the year.
# dayofweek numbers days starting at 1 = Sunday (2010-12-01, a Wednesday, yields 4).
df.select(col("InvoiceDate"),
     dayofweek(col("InvoiceDate")).alias("dayofweek"),
     dayofmonth(col("InvoiceDate")).alias("dayofmonth"),
     dayofyear(col("InvoiceDate")).alias("dayofyear"),
  ).show()

+-------------------+---------+----------+---------+
|        InvoiceDate|dayofweek|dayofmonth|dayofyear|
+-------------------+---------+----------+---------+
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:28:00|        4|         1|      335|
|2010-12-01 08:28:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|    

In [None]:
# imprime o timestamp atual
df.select(current_timestamp().alias("current_timestamp")
  ).show(1,truncate=False)

+--------------------------+
|current_timestamp         |
+--------------------------+
|2025-02-06 23:24:24.384719|
+--------------------------+
only showing top 1 row



In [None]:
# retorna hora, minuto e segundo
df.select(col("InvoiceDate"),
    hour(col("InvoiceDate")).alias("hour"),
    minute(col("InvoiceDate")).alias("minute"),
    second(col("InvoiceDate")).alias("second")
  ).show()

+-------------------+----+------+------+
|        InvoiceDate|hour|minute|second|
+-------------------+----+------+------+
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:28:00|   8|    28|     0|
|2010-12-01 08:28:00|   8|    28|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
+-------------------+----+------+------+
only showing top

#### Condições com operadores booleanos

In [None]:
# Retorna linhas das colunas 'InvoiceNo' e 'Description' onde 'InvoiceNo' é diferente de 536365
from pyspark.sql.functions import col
df.where(col("InvoiceNo") != 536365)\
.select("InvoiceNo", "Description")\
.show(10)

+---------+--------------------+
|InvoiceNo|         Description|
+---------+--------------------+
|   536366|HAND WARMER UNION...|
|   536366|HAND WARMER RED P...|
|   536367|ASSORTED COLOUR B...|
|   536367|POPPY'S PLAYHOUSE...|
|   536367|POPPY'S PLAYHOUSE...|
|   536367|FELTCRAFT PRINCES...|
|   536367|IVORY KNITTED MUG...|
|   536367|BOX OF 6 ASSORTED...|
|   536367|BOX OF VINTAGE JI...|
|   536367|BOX OF VINTAGE AL...|
+---------+--------------------+
only showing top 10 rows



In [None]:
# Using a boolean operator inside a SQL expression string (predicate): "not equal".
df.where("InvoiceNo <> 536365").show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536366|    22633|HAND WARMER UNION...|       6|2010-12-01 08:28:00|     1.85|   17850.0|United Kingdom|
|   536366|    22632|HAND WARMER RED P...|       6|2010-12-01 08:28:00|     1.85|   17850.0|United Kingdom|
|   536367|    84879|ASSORTED COLOUR B...|      32|2010-12-01 08:34:00|     1.69|   13047.0|United Kingdom|
|   536367|    22745|POPPY'S PLAYHOUSE...|       6|2010-12-01 08:34:00|      2.1|   13047.0|United Kingdom|
|   536367|    22748|POPPY'S PLAYHOUSE...|       6|2010-12-01 08:34:00|      2.1|   13047.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [None]:
# Using a boolean operator inside a SQL expression string (predicate): "equal".
df.where("InvoiceNo == 536365").show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [None]:
# Entendendo a ordem dos operadores boleanos
from pyspark.sql.functions import instr
priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1

In [None]:
# aplicando os operadores como filtros
df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).show()

+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      NULL|United Kingdom|
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      NULL|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+



In [None]:
# Create a view ou tabela temporária.
df.createOrReplaceTempView("dfTable")

In [None]:
# Same filter as the DataFrame version, expressed in SQL against the dfTable temp view.
# `%sql` is a Databricks-only cell magic (it raises SyntaxError elsewhere);
# spark.sql() runs the query in any PySpark session.
spark.sql("""
    SELECT *
    FROM dfTable
    WHERE StockCode IN ('DOT')
      AND (UnitPrice > 600 OR instr(Description, 'POSTAGE') >= 1)
""").show()

SyntaxError: invalid syntax (<ipython-input-105-4fb6fd3899ad>, line 2)

In [None]:
# Combinando filtros e operadores boleanos
from pyspark.sql.functions import instr
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1

In [None]:
# Combinando filtros e operadores boleanos
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
.where("isExpensive")\
.select("unitPrice", "isExpensive").show(5)

+---------+-----------+
|unitPrice|isExpensive|
+---------+-----------+
|   569.77|       true|
|   607.49|       true|
+---------+-----------+



In [None]:
# Same idea in SQL: compute the isExpensive flag and keep only the rows where it is true.
# `%sql` is a Databricks-only cell magic (it raises SyntaxError elsewhere);
# spark.sql() runs the query in any PySpark session.
spark.sql("""
    SELECT UnitPrice,
           (StockCode = 'DOT' AND
            (UnitPrice > 600 OR instr(Description, 'POSTAGE') >= 1)) AS isExpensive
    FROM dfTable
    WHERE (StockCode = 'DOT' AND
           (UnitPrice > 600 OR instr(Description, 'POSTAGE') >= 1))
""").show()

SyntaxError: invalid syntax (<ipython-input-108-daa7202491db>, line 2)

#### Comparando a performance de SQL vs Python Apis

In [None]:
## utilizando SQL
sqlWay = spark.sql("""
SELECT StockCode, count(*)
FROM dfTable
GROUP BY StockCode
""")

In [None]:
# Utilizando Python
dataFrameWay = df.groupBy("StockCode").count()

In [None]:
# imprime o plano de execução do código
sqlWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[StockCode#1326], functions=[count(1)])
   +- Exchange hashpartitioning(StockCode#1326, 200), ENSURE_REQUIREMENTS, [plan_id=1367]
      +- HashAggregate(keys=[StockCode#1326], functions=[partial_count(1)])
         +- FileScan csv [StockCode#1326] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/sample_data/tables/retail/retail_2010_12_01.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<StockCode:string>




In [None]:
# imprime o plano de execução do código
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[StockCode#1326], functions=[count(1)])
   +- Exchange hashpartitioning(StockCode#1326, 200), ENSURE_REQUIREMENTS, [plan_id=1380]
      +- HashAggregate(keys=[StockCode#1326], functions=[partial_count(1)])
         +- FileScan csv [StockCode#1326] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/sample_data/tables/retail/retail_2010_12_01.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<StockCode:string>




### Trabalhando com Joins

In [None]:
# Cria dataframes
pessoa = spark.createDataFrame([
(0, "João de Maria", 0, [100]),
(1, "Norma Maria", 1, [500, 250, 100]),
(2, "João de Deus", 1, [250, 100]),
(3, "Ana Maria Silva", 4, [250, 100])])\
.toDF("id", "name", "graduate_program", "spark_status")

programa_graduacao = spark.createDataFrame([
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley")])\
.toDF("id", "degree", "department", "school")

status = spark.createDataFrame([
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor")])\
.toDF("id", "status")

In [None]:
# cria tabelas para os dataframes criados acima
pessoa.createOrReplaceTempView("pessoa")
programa_graduacao.createOrReplaceTempView("programa_graduacao")
status.createOrReplaceTempView("status")

In [None]:
# imprime os dataframes criados
pessoa.show()
programa_graduacao.show()
status.show()

+---+---------------+----------------+---------------+
| id|           name|graduate_program|   spark_status|
+---+---------------+----------------+---------------+
|  0|  João de Maria|               0|          [100]|
|  1|    Norma Maria|               1|[500, 250, 100]|
|  2|   João de Deus|               1|     [250, 100]|
|  3|Ana Maria Silva|               4|     [250, 100]|
+---+---------------+----------------+---------------+

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  2|Masters|                EECS|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+

+---+--------------+
| id|        status|
+---+--------------+
|500|Vice President|
|250|    PMC Member|
|100|   Contributor|
+---+--------------+



In [None]:
# cria um objeto com as chaves para fazer join
keys_join = pessoa["graduate_program"] == programa_graduacao['id']

In [None]:
# imprime objeto
type(keys_join)

In [None]:
# dataframe com inner join entre pessoa e programa de graduação
pessoa.join(programa_graduacao, keys_join).show()

+---+-------------+----------------+---------------+---+-------+--------------------+-----------+
| id|         name|graduate_program|   spark_status| id| degree|          department|     school|
+---+-------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|João de Maria|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|  Norma Maria|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2| João de Deus|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------------+----------------+---------------+---+-------+--------------------+-----------+



In [None]:
# dataframe com inner join entre pessoa e programa de graduação
# sintaxe join(dataframealvo, condição-de-join, tipo-de-join)

pessoa.join(programa_graduacao, pessoa["graduate_program"] == programa_graduacao['id'], 'inner').show()

+---+-------------+----------------+---------------+---+-------+--------------------+-----------+
| id|         name|graduate_program|   spark_status| id| degree|          department|     school|
+---+-------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|João de Maria|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|  Norma Maria|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2| João de Deus|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------------+----------------+---------------+---+-------+--------------------+-----------+



In [None]:
# Inner join in SQL between the pessoa and programa_graduacao temp views.
# `%sql` is a Databricks-only cell magic (it raises SyntaxError elsewhere);
# spark.sql() runs the query in any PySpark session.
spark.sql("""
    SELECT *
    FROM pessoa INNER JOIN programa_graduacao
      ON pessoa.graduate_program = programa_graduacao.id
""").show()

SyntaxError: invalid syntax (<ipython-input-120-b5335dc5bb1f>, line 2)

In [None]:
# Outer joins: retorna null para linhas que não existam em um dos dataframes e retorna qualquer dado em qualquer dataframe caso exista a chave
join_type = "outer"
pessoa.join(programa_graduacao, keys_join, join_type).show()

+----+---------------+----------------+---------------+----+-------+--------------------+-----------+
|  id|           name|graduate_program|   spark_status|  id| degree|          department|     school|
+----+---------------+----------------+---------------+----+-------+--------------------+-----------+
|   0|  João de Maria|               0|          [100]|   0|Masters|School of Informa...|UC Berkeley|
|   1|    Norma Maria|               1|[500, 250, 100]|   1|  Ph.D.|                EECS|UC Berkeley|
|   2|   João de Deus|               1|     [250, 100]|   1|  Ph.D.|                EECS|UC Berkeley|
|NULL|           NULL|            NULL|           NULL|   2|Masters|                EECS|UC Berkeley|
|   3|Ana Maria Silva|               4|     [250, 100]|NULL|   NULL|                NULL|       NULL|
+----+---------------+----------------+---------------+----+-------+--------------------+-----------+



In [None]:
# Full outer join in SQL between the pessoa and programa_graduacao temp views.
# `%sql` is a Databricks-only cell magic (it raises SyntaxError elsewhere);
# spark.sql() runs the query in any PySpark session.
spark.sql("""
    SELECT *
    FROM pessoa FULL OUTER JOIN programa_graduacao
      ON pessoa.graduate_program = programa_graduacao.id
""").show()

SyntaxError: invalid syntax (<ipython-input-122-b08b17b4bf76>, line 2)

In [None]:
# Left joins: retorna null para linhas que não existam no dataframe da direita
join_type = "left_outer"
pessoa.join(programa_graduacao, keys_join, join_type).show()

+---+---------------+----------------+---------------+----+-------+--------------------+-----------+
| id|           name|graduate_program|   spark_status|  id| degree|          department|     school|
+---+---------------+----------------+---------------+----+-------+--------------------+-----------+
|  0|  João de Maria|               0|          [100]|   0|Masters|School of Informa...|UC Berkeley|
|  1|    Norma Maria|               1|[500, 250, 100]|   1|  Ph.D.|                EECS|UC Berkeley|
|  2|   João de Deus|               1|     [250, 100]|   1|  Ph.D.|                EECS|UC Berkeley|
|  3|Ana Maria Silva|               4|     [250, 100]|NULL|   NULL|                NULL|       NULL|
+---+---------------+----------------+---------------+----+-------+--------------------+-----------+



In [None]:
# Left outer join in SQL between the pessoa and programa_graduacao temp views.
# `%sql` is a Databricks-only cell magic (it raises SyntaxError elsewhere);
# spark.sql() runs the query in any PySpark session.
spark.sql("""
    SELECT *
    FROM pessoa LEFT OUTER JOIN programa_graduacao
      ON pessoa.graduate_program = programa_graduacao.id
""").show()

SyntaxError: invalid syntax (<ipython-input-124-460b6a136e48>, line 2)

In [None]:
# Right joins: retorna null para linhas que não existam no dataframe a esquerda
join_type = "right_outer"
pessoa.join(programa_graduacao, keys_join, join_type).show()

+----+-------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|         name|graduate_program|   spark_status| id| degree|          department|     school|
+----+-------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|João de Maria|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   2| João de Deus|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   1|  Norma Maria|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|NULL|         NULL|            NULL|           NULL|  2|Masters|                EECS|UC Berkeley|
+----+-------------+----------------+---------------+---+-------+--------------------+-----------+



In [None]:
# Right outer join in SQL between the pessoa and programa_graduacao temp views.
# `%sql` is a Databricks-only cell magic (it raises SyntaxError elsewhere);
# spark.sql() runs the query in any PySpark session.
spark.sql("""
    SELECT *
    FROM pessoa RIGHT OUTER JOIN programa_graduacao
      ON pessoa.graduate_program = programa_graduacao.id
""").show()

SyntaxError: invalid syntax (<ipython-input-126-dca921c9d1d4>, line 2)

#### Condições

In [None]:
# altera a condição de join
keys_join = ((pessoa["graduate_program"] == programa_graduacao["id"]) & (pessoa["graduate_program"] > 0))
join_type = "inner"
pessoa.join(programa_graduacao, keys_join, join_type).show()

+---+------------+----------------+---------------+---+------+----------+-----------+
| id|        name|graduate_program|   spark_status| id|degree|department|     school|
+---+------------+----------------+---------------+---+------+----------+-----------+
|  1| Norma Maria|               1|[500, 250, 100]|  1| Ph.D.|      EECS|UC Berkeley|
|  2|João de Deus|               1|     [250, 100]|  1| Ph.D.|      EECS|UC Berkeley|
+---+------------+----------------+---------------+---+------+----------+-----------+



In [None]:
# Inner join in SQL with an additional WHERE condition on graduate_program.
# `%sql` is a Databricks-only cell magic (it raises SyntaxError elsewhere);
# spark.sql() runs the query in any PySpark session.
spark.sql("""
    SELECT *
    FROM pessoa INNER JOIN programa_graduacao
      ON pessoa.graduate_program = programa_graduacao.id
    WHERE pessoa.graduate_program > 0
""").show()

SyntaxError: invalid syntax (<ipython-input-128-03b48f385202>, line 2)

In [None]:
# Condições mais complexas usando expressão

from pyspark.sql.functions import expr

pessoa.withColumnRenamed("id", "personId")\
.join(status, expr("array_contains(spark_status, id)")).show()

+--------+---------------+----------------+---------------+---+--------------+
|personId|           name|graduate_program|   spark_status| id|        status|
+--------+---------------+----------------+---------------+---+--------------+
|       0|  João de Maria|               0|          [100]|100|   Contributor|
|       1|    Norma Maria|               1|[500, 250, 100]|500|Vice President|
|       1|    Norma Maria|               1|[500, 250, 100]|250|    PMC Member|
|       1|    Norma Maria|               1|[500, 250, 100]|100|   Contributor|
|       2|   João de Deus|               1|     [250, 100]|250|    PMC Member|
|       2|   João de Deus|               1|     [250, 100]|100|   Contributor|
|       3|Ana Maria Silva|               4|     [250, 100]|250|    PMC Member|
|       3|Ana Maria Silva|               4|     [250, 100]|100|   Contributor|
+--------+---------------+----------------+---------------+---+--------------+



In [None]:
# More complex join condition in SQL: match each person to every status whose id
# appears in the person's spark_status array.
# `%sql` is a Databricks-only cell magic (it raises SyntaxError elsewhere);
# spark.sql() runs the query in any PySpark session.
# The subquery renames pessoa.id to personId, so the bare `id` below refers to status.id.
spark.sql("""
    SELECT *
    FROM
      (SELECT id AS personId
             ,name
             ,graduate_program
             ,spark_status
       FROM pessoa) AS p
      INNER JOIN status ON array_contains(spark_status, id)
""").show()

SyntaxError: invalid syntax (<ipython-input-130-2fe5cb21b9e5>, line 2)

#### Trabalhando com UDFs
- Integração de código entre as APIs
- É preciso cuidado com performance dos códigos usando UDFs

In [None]:
from pyspark.sql.types import LongType
# define a função
def quadrado(s):
    """Return the square of ``s``.

    This function is registered as a Spark UDF, and Spark passes SQL NULL
    values to Python UDFs as ``None``. ``None`` is therefore propagated
    instead of raising ``TypeError`` and failing the whole job.

    Args:
        s: A numeric value, or ``None``.

    Returns:
        ``s * s``, or ``None`` when ``s`` is ``None``.
    """
    if s is None:
        return None
    return s * s

In [None]:
# Registers `quadrado` as a SQL-callable UDF named "Func_Py_Quadrado".
# The return type is given explicitly as LongType; when omitted it defaults to StringType.
from pyspark.sql.types import LongType
spark.udf.register("Func_Py_Quadrado", quadrado, LongType())

In [None]:
# Generates sequential ids (not random values): 1 through 19, since the end bound is exclusive.
spark.range(1, 20).show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+



In [None]:
# cria a visão View_temp
spark.range(1, 20).createOrReplaceTempView("View_temp")

In [None]:
# Call the Python UDF registered as Func_Py_Quadrado from a SQL query.
# `%sql` is a Databricks-only cell magic (it raises SyntaxError elsewhere);
# spark.sql() runs the query in any PySpark session.
spark.sql("""
    SELECT id,
           Func_Py_Quadrado(id) AS id_ao_quadrado
    FROM View_temp
""").show()

SyntaxError: invalid syntax (<ipython-input-135-4f1ed196c83b>, line 2)

#### UDFs com Dataframes

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
# registra a Udf
Func_Py_Quadrado = udf(quadrado, LongType())

In [None]:
# cria um dataframe apartir da tabela temporária
df = spark.table("View_temp")

In [None]:
# imprime o dataframe
df.show(10)

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
+---+
only showing top 10 rows



In [None]:
# usando o dataframe juntamente com a Udf registrada
df.select("id", Func_Py_Quadrado("id").alias("id_quadrado")).show(20)

+---+-----------+
| id|id_quadrado|
+---+-----------+
|  1|          1|
|  2|          4|
|  3|          9|
|  4|         16|
|  5|         25|
|  6|         36|
|  7|         49|
|  8|         64|
|  9|         81|
| 10|        100|
| 11|        121|
| 12|        144|
| 13|        169|
| 14|        196|
| 15|        225|
| 16|        256|
| 17|        289|
| 18|        324|
| 19|        361|
+---+-----------+



#### Koalas
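- Koalas é um projeto de código aberto que fornece um substituto imediato (drop-in) para o pandas. A partir do Spark 3.2, ele foi incorporado ao PySpark como `pyspark.pandas` (pandas API on Spark).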
- O pandas é comumente usado por ser um pacote que fornece estruturas de dados e ferramentas de análise de dados fáceis de usar para a linguagem de programação Python.
- O Koalas preenche essa lacuna fornecendo APIs equivalentes ao pandas que funcionam no Apache Spark.
- Koalas é útil não apenas para usuários de pandas, mas também para usuários de PySpark.
  - Koalas suporta muitas tarefas que são difíceis de fazer com PySpark, por exemplo, plotar dados diretamente de um PySpark DataFrame.
- Koalas suporta SQL diretamente em seus dataframes.

In [None]:
import numpy as np
import pandas as pd
import databricks.koalas as ks

In [None]:
# cria um pandas DataFrame
pdf = pd.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})

In [None]:
# imprime um pandas dataframe
type(pdf)

In [None]:
# Cria um Koalas DataFrame
kdf = ks.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})

In [None]:
# imprime o tipo de dados
type(kdf)

In [None]:
# Cria um Koalas dataframe a partir de um pandas dataframe
kdf = ks.DataFrame(pdf)
type(kdf)

In [None]:
# métodos já conhecidos
pdf.head()

In [None]:
# métodos já conhecidos
kdf.head()

In [None]:
# método describe()
kdf.describe()

In [None]:
# ordenando um dataframe
kdf.sort_values(by='B')

In [None]:
# Reads the Koalas 'compute.max_rows' option and then sets it to 2000.
# NOTE(review): this is a compute option, not a cell-layout setting; per the Koalas
# options docs it caps the rows handled by the pandas shortcut path — confirm.
from databricks.koalas.config import set_option, get_option
ks.get_option('compute.max_rows')
ks.set_option('compute.max_rows', 2000)

In [None]:
# slice
kdf[['A', 'B']]

In [None]:
# slice
kdf[['A', 'B']]

In [None]:
# iloc
kdf.iloc[:3, 1:2]

#### Usando funções python com dataframe koalas

In [None]:
# cria função python
def quadrado(x):
    """Return ``x`` raised to the power of two."""
    squared = pow(x, 2)
    return squared

In [None]:
# habilita computação de dataframes e séries.
from databricks.koalas.config import set_option, reset_option
set_option("compute.ops_on_diff_frames", True)

In [None]:
# cria uma nova coluna a partir da função quadrado
kdf['C'] = kdf.A.apply(quadrado)

In [None]:
# visualizando o dataframe
kdf.head()

In [None]:
# agrupando dados
kdf.groupby('A').sum()

In [None]:
# agrupando mais de uma coluna
kdf.groupby(['A', 'B']).sum()

In [None]:
# para plotar gráfico diretamente na célula use o inline
%matplotlib inline

speed = [0.1, 17.5, 40, 48, 52, 69, 88]
lifespan = [2, 8, 70, 1.5, 25, 12, 28]

index = ['snail', 'pig', 'elephant',
         'rabbit', 'giraffe', 'coyote', 'horse']

kdf = ks.DataFrame({'speed': speed,
                   'lifespan': lifespan}, index=index)
kdf.plot.bar()

**Usando SQL no Koalas**

In [None]:
# cria um dataframe Koalas
kdf = ks.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'pig': [20, 18, 489, 675, 1776],
                    'horse': [4, 25, 281, 600, 1900]})

In [None]:
# Faz query no dataframe koalas
ks.sql("SELECT * FROM {kdf} WHERE pig > 100")

In [None]:
# cria um dataframe pandas
pdf = pd.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'sheep': [22, 50, 121, 445, 791],
                    'chicken': [250, 326, 589, 1241, 2118]})

In [None]:
# Query com inner join entre dataframe pandas e koalas
ks.sql('''
    SELECT ks.pig, pd.chicken
    FROM {kdf} ks INNER JOIN {pdf} pd
    ON ks.year = pd.year
    ORDER BY ks.pig, pd.chicken''')

In [None]:
# converte koalas dataframe para Pyspark
kdf = ks.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]})

In [None]:
pydf = kdf.to_spark()

In [None]:
type(pydf)