# Big Data para Cientista de Dados

In [0]:
# Path of the data file that the next cell reads
arquivo = "/FileStore/tables/bronze/2015_summary.json"

In [0]:
# Read the JSON data file into a DataFrame.
# The JSON reader infers column types from the data itself and JSON has no header
# row, so the CSV-oriented "inferSchema"/"header" options are not passed here.
# The chain is wrapped in parentheses because a backslash line continuation
# cannot be followed by an inline comment (it is a SyntaxError).
flightData2015 = (
    spark.read
    .format("json")
    .load(arquivo)
)

In [0]:
# Print each column's data type
# and whether the column accepts nulls
flightData2015.printSchema()

In [0]:
# Show the Python type of the flightData2015 variable
type(flightData2015)

In [0]:
# Return the first 5 rows of the DataFrame as a Python list of Row objects.
flightData2015.take(5)

In [0]:
# Render the first 3 rows with the notebook's display() command.
# DataFrame.show() prints to stdout and returns None, so display(df.show(3))
# would hand None to display(); limit(3) keeps a DataFrame it can render.
display(flightData2015.limit(3))

In [0]:
# Count the rows in the DataFrame.
flightData2015.count()

In [0]:
# Re-read the file with type inference effectively switched off, so every
# primitive value comes back as a string.
# "inferSchema" is a CSV option and has no effect on the JSON reader; the JSON
# equivalent is "primitivesAsString". JSON also has no header row to declare.
flightData2015 = (
    spark.read
    .option("primitivesAsString", "true")
    .json(arquivo)
)

In [0]:
# Load every JSON file matching the bronze glob into a single DataFrame.
df = (
    spark.read
    .option("inferSchema", "True")
    .option("header", "True")
    .json("/FileStore/tables/bronze/*.json")
)

In [0]:
# Print the first 10 rows as a text table
df.show(10)

In [0]:
# Count the rows in the DataFrame
df.count()

In [0]:
# Plot options: show the first 10 rows through display()
display(df.head(10))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


# Trabalhando com SQL

In [0]:
%sql
-- Drop the table if it already exists so the CREATE TABLE in the next cell can be re-run
DROP TABLE IF EXISTS all_files;

In [0]:
%sql
-- Create a table backed by every JSON file matching the bronze glob
CREATE TABLE all_files
USING json
OPTIONS (path "/FileStore/tables/bronze/*.json", header "true")

In [0]:
%sql
-- Query the data with SQL: return every row and column of the table
SELECT * FROM all_files;

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [0]:
%sql
-- Count the rows of the table
SELECT count(*) FROM all_files;

count(1)
1502


In [0]:
%sql
-- Average of the `count` column per destination country, ordered by country name.
-- NOTE(review): the alias Quantidade_Paises ("number of countries") does not match
-- what is computed (an average of `count`); consider renaming it.
SELECT DEST_COUNTRY_NAME
       ,avg(count) AS Quantidade_Paises
FROM all_files
GROUP BY DEST_COUNTRY_NAME
ORDER BY DEST_COUNTRY_NAME;

DEST_COUNTRY_NAME,Quantidade_Paises
Afghanistan,8.0
Algeria,5.0
Angola,13.166666666666666
Anguilla,26.33333333333333
Antigua and Barbuda,129.66666666666666
Argentina,187.66666666666663
Aruba,350.6666666666667
Australia,294.0
Austria,41.333333333333336
Azerbaijan,8.0


In [0]:
# Register df as a temporary view so it can be queried from SQL cells.
df.createOrReplaceTempView("2015_summary_json")

In [0]:
%sql
-- Return every row of the temporary view
select * from 2015_summary_json

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [0]:
%sql
-- Query the 2015_summary_json view, multiplying the count column by 10.
SELECT DEST_COUNTRY_NAME 
      ,ORIGIN_COUNTRY_NAME
      ,count * 10 as count_multiplicado_por_dez
FROM 2015_summary_json

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count_multiplicado_por_dez
United States,Romania,150
United States,Croatia,10
United States,Ireland,3440
Egypt,United States,150
United States,India,620
United States,Singapore,10
United States,Grenada,620
Costa Rica,United States,5880
Senegal,United States,400
Moldova,United States,10


In [0]:
# Largest value of the "count" column.
# The functions module is imported under the alias F instead of importing `max`
# by name, which would shadow Python's builtin max() for the rest of the notebook.
from pyspark.sql import functions as F
df.select(F.max("count")).take(1)

In [0]:
# Filter DataFrame rows with filter(); show 2 rows whose count is below 2
df.filter("count < 2").show(2)

In [0]:
# Same filter using where() (an alias of filter())
df.where("count < 2").show(2)

In [0]:
%sql
-- Filter rows with SQL: 2 rows whose count is below 2
SELECT * 
FROM 2015_summary_json
WHERE count < 2
LIMIT 2

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Croatia,1
United States,Singapore,1


In [0]:
# Count the distinct (origin, destination) combinations across the two columns
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

### Manipulando Dataframes

In [0]:
# sort() orders from smallest to largest by default
df.sort("count").show(10)

In [0]:
from pyspark.sql.functions import desc, asc, expr
# Sort by "count" in ascending order.
# asc("count") is an explicit sort expression. expr("count asc") is parsed as the
# column `count` with the alias `asc`, so the direction word is silently ignored;
# the result only looked right because ascending is the default.
df.orderBy(asc("count")).show(10)

In [0]:
# Show descriptive statistics for the columns
df.describe().show()

In [0]:
# Use collect() with care: it brings every row to the driver, so a very large
# DataFrame can exhaust the machine's memory.
# Returns all rows of the DataFrame.
df.collect()

In [0]:
# collect() hands the rows back as a Python list
type(df.collect())

In [0]:
# Walk through every collected row and print its first two fields followed by
# the third field doubled.
for linha in df.collect():
    primeiro, segundo, terceiro = linha[0], linha[1], linha[2]
    print(primeiro, segundo, terceiro * 2)

In [0]:
from pyspark.sql.functions import lower, upper, col
# Show the destination country as stored, lower-cased, and upper-cased.
df.select(
    col("DEST_COUNTRY_NAME"),
    lower(col("DEST_COUNTRY_NAME")),
    upper(lower(col("DEST_COUNTRY_NAME"))),
).show(10)

In [0]:
%sql
-- Same lower/upper-case conversion written in SQL
SELECT DEST_COUNTRY_NAME
      ,lower(DEST_COUNTRY_NAME)
      ,Upper(DEST_COUNTRY_NAME)
FROM 2015_summary_json

DEST_COUNTRY_NAME,lower(DEST_COUNTRY_NAME),upper(DEST_COUNTRY_NAME)
United States,united states,UNITED STATES
United States,united states,UNITED STATES
United States,united states,UNITED STATES
Egypt,egypt,EGYPT
United States,united states,UNITED STATES
United States,united states,UNITED STATES
United States,united states,UNITED STATES
Costa Rica,costa rica,COSTA RICA
Senegal,senegal,SENEGAL
Moldova,moldova,MOLDOVA


In [0]:
# Strip leading (left-side) whitespace
from pyspark.sql.functions import ltrim
df.select(ltrim(col("DEST_COUNTRY_NAME"))).show(2)

In [0]:
# Strip trailing (right-side) whitespace
from pyspark.sql.functions import rtrim
df.select(rtrim(col("DEST_COUNTRY_NAME"))).show(2)

In [0]:
# All trimming/padding operations side by side.
# lit() turns a Python literal into a constant column of the selected result.
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim
colunas_demo = [
    ltrim(lit(" HELLO ")).alias("ltrim"),       # drops the leading space
    rtrim(lit(" HELLO ")).alias("rtrim"),       # drops the trailing space
    trim(lit(" HELLO ")).alias("trim"),         # drops both leading and trailing spaces
    lpad(lit("HELLO"), 3, " ").alias("lp"),     # target length 3 < 5, so only the first 3 characters remain
    rpad(lit("HELLO"), 10, " ").alias("rp"),    # target length 10: "HELLO" followed by 5 spaces
]
df.select(*colunas_demo).show(2)

In [0]:
# Sort first by count and then by DEST_COUNTRY_NAME
df.orderBy("count", "DEST_COUNTRY_NAME").show(10)

In [0]:
from pyspark.sql.functions import asc, col, desc
# Sort by "count" in descending order.
# desc("count") builds a real descending sort expression. expr("count desc") is
# parsed as the column `count` aliased as `desc`, which sorts ASCENDING.
df.orderBy(desc("count")).show(10)
# count descending, ties broken by DEST_COUNTRY_NAME ascending
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(20)

In [0]:
# The aggregation written in SQL: number of rows per destination country
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM 2015_summary_json
GROUP BY DEST_COUNTRY_NAME
""")

In [0]:
# The same aggregation written with the DataFrame API
dataFrameWay = df.groupBy("DEST_COUNTRY_NAME").count()

In [0]:
# Print the execution plan Spark built for the SQL version of the query
sqlWay.explain()

In [0]:
# Print the execution plan Spark built for the DataFrame version of the query
dataFrameWay.explain()

In [0]:
# Retail data: load the CSV with a header row and inferred column types.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/FileStore/tables/bronze/2010_12_01.csv")
)


In [0]:
# Show the first 10 rows
display(df.head(10))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01 08:26:00,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01 08:26:00,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01 08:26:00,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01 08:26:00,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,2010-12-01 08:28:00,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,2010-12-01 08:28:00,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01 08:34:00,1.69,13047.0,United Kingdom


In [0]:
# Boolean types: keep the invoices other than 536365 and show only the
# "InvoiceNo" and "Description" columns.
from pyspark.sql.functions import col
(
    df.where(col("InvoiceNo") != 536365)
    .select("InvoiceNo", "Description")
    .show(5)
)

In [0]:
# Register df as the temporary view dfTable
df.createOrReplaceTempView("dfTable")

In [0]:
# Show the first 10 rows
display(df.head(10))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01 08:26:00,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01 08:26:00,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01 08:26:00,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01 08:26:00,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,2010-12-01 08:28:00,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,2010-12-01 08:28:00,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01 08:34:00,1.69,13047.0,United Kingdom


In [0]:
# Boolean predicate written as a SQL expression string: <> means "different from" 536365
df.where("InvoiceNo <> 536365").show(10)

In [0]:
# Boolean predicate written as a SQL expression string: equality
df.where("InvoiceNo = 536365").show(5)

In [0]:
# Understanding how boolean operators combine: build the individual filter expressions first
from pyspark.sql.functions import instr
priceFilter = col("UnitPrice") > 600  # boolean Column: true where UnitPrice is greater than 600
descripFilter = instr(df.Description, "POSTAGE") >= 1  # boolean Column: instr gives the 1-based position of "POSTAGE" inside Description (0 when absent), so >= 1 means Description contains it

In [0]:
# Apply the expressions as filters:
# StockCode must be "DOT" AND (priceFilter OR descripFilter) must hold
df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).show()

In [0]:
%sql
-- The same filter written in SQL
SELECT * 
FROM dfTable 
WHERE StockCode in ("DOT")
AND(UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536544,DOT,DOTCOM POSTAGE,1,2010-12-01 14:32:00,569.77,,United Kingdom
536592,DOT,DOTCOM POSTAGE,1,2010-12-01 17:06:00,607.49,,United Kingdom


In [0]:
# Combining filters with boolean operators: define each condition as a Column expression
from pyspark.sql.functions import instr
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1


In [0]:
# Combine the filters with boolean operators.
# Adds the boolean column isExpensive, keeps only the rows where it is true and
# shows the unit price next to the flag.
# The chain is wrapped in parentheses: a backslash continuation followed by
# trailing whitespace is a SyntaxError.
(
    df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))
    .where("isExpensive")
    .select("unitPrice", "isExpensive")
    .show(5)
)

In [0]:
%sql
-- The same idea in SQL: compute the isExpensive flag and filter on the same condition
SELECT UnitPrice, (StockCode = 'DOT' AND
(UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)) as isExpensive
FROM dfTable
WHERE (StockCode = 'DOT' AND
(UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1))

UnitPrice,isExpensive
569.77,True
607.49,True


# Trabalhando com tipos diferentes de arquivos

### Modos de leitura
- **permissive**: *Define todos os campos para NULL quando encontra registros corrompidos e coloca todos registros corrompidos em uma coluna chamada _corrupt_record.* (default)

- **dropMalformed**: *Apaga uma linha corrompida ou que este não consiga ler.*

- **failFast**: *Falha imediatamente quando encontra uma linha que não consiga ler.*

In [0]:
# Reading CSV files: general shape of a read call.
# This is a template, not runnable code: the path and the schema are placeholders,
# so it is kept as a comment to let the notebook run top to bottom. The next cell
# shows a concrete read.
#
#   spark.read.format("csv")
#       .option("mode", "permissive")
#       .option("inferSchema", "true")
#       .option("path", "path/to/file(s)")
#       .schema(someSchema)
#       .load()

In [0]:
# Read the file; try swapping the read mode (failfast, permissive, dropmalformed).
df = (
    spark.read.format("csv")
    .option("mode", "failfast")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/FileStore/tables/bronze/2010_12_01.csv")
)

# No corrupted rows were found while reading this dataset

In [0]:
# Show the first 10 rows of the DataFrame
display(df.head(10))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01 08:26:00,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01 08:26:00,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01 08:26:00,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01 08:26:00,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01 08:26:00,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,2010-12-01 08:28:00,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,2010-12-01 08:28:00,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01 08:34:00,1.69,13047.0,United Kingdom


#### Criando um schema
- A opção **inferSchema** nem sempre vai definir o melhor datatype.
- Melhora a performance na leitura de grandes bases.
- Permite uma customização dos tipos das colunas.
- É importante saber para reescrita de aplicações. (Códigos pandas)

In [0]:
# Print the schema of the DataFrame (read with inferSchema enabled)
df.printSchema()

In [0]:
# Define the schema by hand, for cases where Spark would infer the wrong types.
# A schema is a StructType holding one StructField (name, type) per column.
# NOTE(review): StockCode is declared IntegerType here, but the sample data shown
# earlier in this notebook has codes such as "85123A"; consider StringType.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType, TimestampType
schema_df = StructType([
    StructField("InvoiceNo", IntegerType()),
    StructField("StockCode", IntegerType()),
    StructField("Description", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("InvoiceDate", TimestampType()),
    StructField("UnitPrice", DoubleType()),
    StructField("CustomerID", DoubleType()),
    StructField("Country", StringType())
])

In [0]:
# Check the type of the schema_df variable
type(schema_df)

In [0]:
# Read the CSV applying the hand-written schema defined above.
# Two fixes relative to the earlier version of this cell:
#  * the chain is parenthesised, since a backslash continuation cannot be
#    followed by an inline comment (SyntaxError);
#  * the timestamp pattern now matches the data ("2010-12-01 08:26:00"):
#    yyyy-MM-dd HH:mm:ss. In Spark patterns "DD" is day-of-year and "hh" is the
#    12-hour clock, and the stray "/" matched nothing in the file.
df = (
    spark.read.format("csv")
    .option("header", "True")
    .schema(schema_df)
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")  # date-time format of InvoiceDate
    .load("/FileStore/tables/bronze/2010_12_01.csv")
)

In [0]:
# Print the schema of the DataFrame.
df.printSchema()

In [0]:
# Show the first 10 rows of the DataFrame.
# limit(10) keeps the work in Spark; collect() would pull every row to the driver
# even though only a preview is wanted.
display(df.limit(10))
# StockCode shows nulls because the schema declares it as an integer while some
# codes mix digits and letters; Spark cannot parse those and stores NULL.
# The column should be declared as StringType in the StructType.

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365.0,,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,2.55,17850.0,United Kingdom
536365.0,71053.0,WHITE METAL LANTERN,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365.0,,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01T08:26:00.000+0000,2.75,17850.0,United Kingdom
536365.0,,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365.0,,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365.0,22752.0,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01T08:26:00.000+0000,7.65,17850.0,United Kingdom
536365.0,21730.0,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,4.25,17850.0,United Kingdom
536366.0,22633.0,HAND WARMER UNION JACK,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536366.0,22632.0,HAND WARMER RED POLKA DOT,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536367.0,84879.0,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01T08:34:00.000+0000,1.69,13047.0,United Kingdom


### Arquivos JSON

In [0]:
# FAILFAST aborts the read when a row cannot be parsed. The failure only shows up
# once the DataFrame is actually printed/evaluated.
df_json = (
    spark.read.format("json")
    .option("mode", "FAILFAST")
    .option("inferSchema", "true")
    .load("/FileStore/tables/bronze/2010_summary.json")
)

In [0]:
# Print the schema of the JSON DataFrame
df_json.printSchema()

In [0]:
# Show the first 10 rows of the JSON DataFrame
display(df_json.head(10))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
United States,Ireland,264
United States,India,69
Egypt,United States,24
Equatorial Guinea,United States,1
United States,Singapore,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


### Escrevendo arquivos
- **append** : Adiciona os arquivos de saída à lista de arquivos que já existem na localização.
- **overwrite** : Sobrescreve os arquivos no destino.
- **errorIfExists** : Emite um erro e para se já existirem arquivos no destino.
- **ignore** : Se já existirem dados no destino, não faz nada.

In [0]:
# Write the DataFrame as CSV, replacing any previous output.
# The header row is written explicitly: the CSV writer omits it by default, and
# the cell that reads this output back uses header=True, which would otherwise
# consume the first data row as column names.
(
    df.write.format("csv")
    .mode("overwrite")
    .option("sep", ",")
    .option("header", "true")
    .save("/FileStore/tables/bronze/saida_2010_12_01.csv")
)

In [0]:
# Read back the output produced by the write cell above.
# Spark writes a *directory* of part files whose names contain a run-specific id,
# so the directory itself is loaded instead of one hard-coded part-file name
# (which stops existing as soon as the write is re-run).
# No timestampFormat is set: the writer used Spark's default timestamp format, and
# the reader's default matches it. The earlier pattern 'yyyy-/MM/DD hh:mm:ss' was
# malformed ("DD" = day-of-year, "hh" = 12-hour clock).
file = "/FileStore/tables/bronze/saida_2010_12_01.csv"
df = (
    spark.read.format("csv")
    .option("header", "True")
    .option("inferSchema", "True")
    .load(file)
)

In [0]:
# Print the first 10 rows of the DataFrame 
df.show(10)

#### Escrevendo dados em paralelo

In [0]:
# Split the data into 5 partitions so the write is spread across the cluster;
# look at the directory this creates (one part file per partition).
# The output goes to its own directory: df was loaded from
# saida_2010_12_01.csv, and overwriting the location a DataFrame is read from
# fails (or destroys the input before it is read, since reads are lazy).
# The header row is written so the files can be read back with header=True.
(
    df.repartition(5)
    .write.format("csv")
    .mode("overwrite")
    .option("sep", ",")
    .option("header", "true")
    .save("/FileStore/tables/bronze/saida_2010_12_01_particionado.csv")
)

### Arquivos Parquet

##### **Convertendo .csv para .parquet**
- Dataset .csv usado https://www.kaggle.com/nhs/general-practice-prescribing-data

In [0]:
# Remove a single file from DBFS
dbutils.fs.rm("/FileStore/tables/bronze/2010_summary-1.json")

In [0]:
# Remove a folder from DBFS (second argument True = recursive)
dbutils.fs.rm("/FileStore/tables/curso", True)

In [0]:
# Load every .csv file in the bigdata directory (more than 4 GB in total).
df = (
    spark.read.format("csv")
    .option("header", "True")
    .option("inferSchema", "True")
    .load("/FileStore/tables/bigdata/*.csv")
)

In [0]:
# Show the first 10 rows
display(df.head(10))

In [0]:
# Print the inferred schema
df.printSchema()

In [0]:
# Count the number of rows
df.count()

*Atente para NÃO escrever e ler arquivos parquet em versoes diferentes*

In [0]:
# Persist the DataFrame in Parquet format, replacing any previous output.
(
    df.write.format("parquet")
    .mode("overwrite")
    .save("/FileStore/tables/bronze/df-parquet-file.parquet")
)

In [0]:
%fs
ls /FileStore/tables/bronze/df-parquet-file.parquet

In [0]:
# Load the Parquet output back into a DataFrame.
# Pay attention to how fast this read is.
df_parquet = (
    spark.read.format("parquet")
    .load("/FileStore/tables/bronze/df-parquet-file.parquet")
)

In [0]:
# Count the rows of the DataFrame
df_parquet.count()

In [0]:
# Look at the first 10 rows of the DataFrame
display(df_parquet.head(10))

In [0]:
# List the Parquet output files together with their sizes
display(dbutils.fs.ls("/FileStore/tables/bronze/df-parquet-file.parquet"))

In [0]:
%scala
// Helper for computing the output size in gigabytes:
// list the files under `path`, turn the listing into a DataFrame and expose it
// as the temporary view adlsSize so the next SQL cell can sum the sizes.
val path="/FileStore/tables/bronze/df-parquet-file.parquet"
val filelist=dbutils.fs.ls(path)
val df_temp = filelist.toDF()
df_temp.createOrReplaceTempView("adlsSize")

In [0]:
%sql
-- Query the adlsSize view: total of the size column divided by 1024^3, rounded to 3 decimals.
select round(sum(size)/(1024*1024*1024),3) as sizeInGB from adlsSize

### Spark + PostgreSQL
- Consultar e escrever em um banco de dados relacional.

In [0]:
# Equivalent to running: select * from pg_catalog.pg_tables
# URL shape: jdbc:postgresql://<server>:5432/<database>?sslmode=require
# Credentials are fetched from a Databricks secret scope instead of being written
# in the notebook (the password used to appear in plain text in both the URL and
# the options). The scope/key names below are examples: create them or adjust to
# your workspace. The "user" secret must hold the login exactly as the server
# expects it (the original URL used the form stack_user@pgserver-1).
pg_user = dbutils.secrets.get(scope="postgres", key="user")
pg_password = dbutils.secrets.get(scope="postgres", key="password")
pg_url = "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?sslmode=require"
pgDF = (
    spark.read.format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", pg_url)
    .option("dbtable", "pg_catalog.pg_tables")
    .option("user", pg_user)
    .option("password", pg_password)
    .load()
)

In [0]:
# Show the rows of the DataFrame.
# display() renders a DataFrame directly; wrapping it in collect() first would
# pull every row to the driver for no benefit.
display(pgDF)

In [0]:
# List the distinct values of the schemaname column
pgDF.select("schemaname").distinct().show()

In [0]:
# Pass a query directly instead of a table name.
# Useful to avoid "select * from ..." and fetch only the needed columns.
# Credentials come from a Databricks secret scope rather than plain text in the
# notebook. Scope/key names are examples; the "user" secret must hold the login
# exactly as the server expects it (the original URL used stack_user@pgserver-1).
pg_user = dbutils.secrets.get(scope="postgres", key="user")
pg_password = dbutils.secrets.get(scope="postgres", key="password")
pg_url = "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?sslmode=require"
pgDF = (
    spark.read.format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", pg_url)
    .option("query", "select schemaname,tablename from pg_catalog.pg_tables")
    .option("user", pg_user)
    .option("password", pg_password)
    .load()
)

In [0]:
# Show the rows of the DataFrame.
# display() renders a DataFrame directly; collect() would first pull every row
# to the driver.
display(pgDF)

In [0]:
# Print 5 rows of the DataFrame df.
# Remember to re-create this DataFrame first (df has been reassigned by earlier cells).
df.show(5)

In [0]:
# Create the "produtos" table in PostgreSQL from the data of the DataFrame df.
# The previous version wrote pgDF (the pg_catalog.pg_tables listing) instead of
# df, which contradicted this cell's purpose and the preceding cell's reminder.
# Make sure df has been re-created with the intended data before running.
# Credentials come from a Databricks secret scope rather than plain text in the
# notebook. Scope/key names are examples; the "user" secret must hold the login
# exactly as the server expects it (the original URL used stack_user@pgserver-1).
pg_user = dbutils.secrets.get(scope="postgres", key="user")
pg_password = dbutils.secrets.get(scope="postgres", key="password")
pg_url = "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?sslmode=require"
(
    df.write.mode("overwrite")
    .format("jdbc")
    .option("url", pg_url)
    .option("dbtable", "produtos")
    .option("user", pg_user)
    .option("password", pg_password)
    .save()
)

In [0]:
# Build the DataFrame df_produtos from the "produtos" table.
# Credentials come from a Databricks secret scope rather than plain text in the
# notebook. Scope/key names are examples; the "user" secret must hold the login
# exactly as the server expects it (the original URL used stack_user@pgserver-1).
pg_user = dbutils.secrets.get(scope="postgres", key="user")
pg_password = dbutils.secrets.get(scope="postgres", key="password")
pg_url = "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?sslmode=require"
df_produtos = (
    spark.read.format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", pg_url)
    .option("dbtable", "produtos")
    .option("user", pg_user)
    .option("password", pg_password)
    .load()
)

In [0]:
# Show the rows of the DataFrame.
# display() renders a DataFrame directly; collect() would first pull every row
# to the driver.
display(df_produtos)

#### Avançando com Pyspark

- **mean()** - Retorna o valor médio de cada grupo.

- **max()** - Retorna o valor máximo de cada grupo.

- **min()** - Retorna o valor mínimo de cada grupo.

- **sum()** - Retorna a soma de todos os valores do grupo.

- **avg()** - Retorna o valor médio de cada grupo.

In [0]:
# Print the first 10 rows of the DataFrame
df.show(10)

In [0]:
# Sum of unit prices per country
df.groupBy("Country").sum("UnitPrice").show()

In [0]:
# Number of rows per country (one output row per distinct country).
df.groupBy("Country").count().show()

In [0]:
# Minimum unit price per group
df.groupBy("Country").min("UnitPrice").show()

In [0]:
# Maximum unit price per group
df.groupBy("Country").max("UnitPrice").show()

In [0]:
# Average unit price per group, using avg()
df.groupBy("Country").avg("UnitPrice").show()

In [0]:
# Average unit price per group, using mean()
df.groupBy("Country").mean("UnitPrice").show()

In [0]:
# Group by several columns: sum of unit prices per (Country, CustomerID)
df.groupBy("Country","CustomerID") \
    .sum("UnitPrice") \
    .show()

#### Trabalhando com datas
- Existem diversas funçoes em Pyspark para manipular datas e timestamp.
- Evite escrever suas próprias funçoes para isso.
- Algumas funcoes mais usadas:
    - current_date():
    - date_format(dateExpr,format):
    - to_date():
    - to_date(column, fmt):
    - add_months(Column, numMonths):
    - date_add(column, days):
    - date_sub(column, days):
    - datediff(end, start)
    - current_timestamp():
    - hour(column):

In [0]:
# Print the DataFrame
df.show()

In [0]:
# Print the schema to check the column types before using the date functions
df.printSchema()

In [0]:
# Import by name every function the date/time cells of this section use.
# A wildcard import (`from pyspark.sql.functions import *`) would silently replace
# Python builtins such as max, min, sum and round with Spark column functions.
from pyspark.sql.functions import (
    add_months,
    col,
    current_date,
    current_timestamp,
    date_add,
    date_format,
    date_sub,
    datediff,
    dayofmonth,
    dayofweek,
    dayofyear,
    hour,
    minute,
    month,
    months_between,
    next_day,
    second,
    weekofyear,
    year,
)
# current_date(): today's date as a column
df.select(current_date().alias("current_date")).show(1)

In [0]:
# date_format(): render InvoiceDate as text in day-month-year order.
# "HH" is the 24-hour clock. The earlier pattern used "hh" (12-hour clock) with no
# am/pm marker, which prints afternoon times ambiguously (14:32 as 02:32).
df.select(
    col("InvoiceDate"),
    date_format(col("InvoiceDate"), "dd-MM-yyyy HH:mm:ss").alias("date_format"),
).show()

In [0]:
# datediff(end, start): number of days from InvoiceDate to today
df.select(col("InvoiceDate"),
    datediff(current_date(),col("InvoiceDate")).alias("datediff")  
  ).show()

In [0]:
# months_between(): number of months between today and InvoiceDate
df.select(col("InvoiceDate"), 
    months_between(current_date(),col("InvoiceDate")).alias("months_between")  
  ).show()

In [0]:
# Add and subtract months and days: +3 / -3 months, +4 / -4 days
df.select(col("InvoiceDate"), 
    add_months(col("InvoiceDate"),3).alias("add_months"), 
    add_months(col("InvoiceDate"),-3).alias("sub_months"), 
    date_add(col("InvoiceDate"),4).alias("date_add"), 
    date_sub(col("InvoiceDate"),4).alias("date_sub") 
  ).show()

In [0]:
# Extract the year, the month, the next Sunday after the date, and the week of the year.
df.select(col("InvoiceDate"), 
     year(col("InvoiceDate")).alias("year"), 
     month(col("InvoiceDate")).alias("month"), 
     next_day(col("InvoiceDate"),"Sunday").alias("next_day"), 
     weekofyear(col("InvoiceDate")).alias("weekofyear") 
  ).show()

In [0]:
# Day of the week, day of the month and day of the year
df.select(col("InvoiceDate"),  
     dayofweek(col("InvoiceDate")).alias("dayofweek"), 
     dayofmonth(col("InvoiceDate")).alias("dayofmonth"), 
     dayofyear(col("InvoiceDate")).alias("dayofyear"), 
  ).show()

In [0]:
# Print the current timestamp (truncate=False shows the full value)
df.select(current_timestamp().alias("current_timestamp")
  ).show(1,truncate=False)

In [0]:
# Extract the hour, minute and second of InvoiceDate
df.select(col("InvoiceDate"), 
    hour(col("InvoiceDate")).alias("hour"), 
    minute(col("InvoiceDate")).alias("minute"),
    second(col("InvoiceDate")).alias("second") 
  ).show()

#### Missing Values com Pyspark

In [0]:
# Browse the sample datasets that ship with Databricks
display(dbutils.fs.ls("/databricks-datasets"))

path,name,size
dbfs:/databricks-datasets/,databricks-datasets/,0
dbfs:/databricks-datasets/COVID/,COVID/,0
dbfs:/databricks-datasets/README.md,README.md,976
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359
dbfs:/databricks-datasets/adult/,adult/,0
dbfs:/databricks-datasets/airlines/,airlines/,0
dbfs:/databricks-datasets/amazon/,amazon/,0
dbfs:/databricks-datasets/asa/,asa/,0
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0


In [0]:
# Load the flights sample data as CSV, with a header row and inferred column types.
# NOTE(review): the path is a directory, so every file under it is parsed as CSV;
# confirm it holds only CSV data.
arquivo = "dbfs:/databricks-datasets/flights/"

df = (
    spark.read.format("csv")
    .option("inferSchema", "True")
    .option("header", "True")
    .csv(arquivo)
)

In [0]:
# Print the first rows of the flights DataFrame
df.show()

In [0]:
# Rows whose delay is missing, using a SQL expression string
df.filter("delay is NULL").show()

In [0]:
# Filter the missing values using the Column API
df.filter(df.delay.isNull()).show(10)

In [0]:
# Fill missing values with 0 (the result is only shown; df itself is not changed)
df.na.fill(value=0).show()

In [0]:
# Fill missing values with 0 in the delay column only
df.na.fill(value=0, subset=['delay']).show()

In [0]:
# Fill missing values with an empty string
df.na.fill("").show(100)

In [0]:
# Check again for rows whose delay is NULL (the fills above did not modify df)
df.filter("delay is NULL").show()

In [0]:
# Drop every row that has a null in any column
df.na.drop().show()

#### Tarefas básicas em dataframes

In [0]:
# Add a column to the DataFrame: 'Nova Coluna' = delay + 2
df = df.withColumn('Nova Coluna', df['delay']+2)
df.show(10)

In [0]:
# Remove the column again
df = df.drop('Nova Coluna')
df.show(10)

In [0]:
# Rename a column of the DataFrame.
# 'Nova Coluna' was dropped in the previous cell, and withColumnRenamed() silently
# does nothing when the source column does not exist, so the rename is shown on
# the existing 'delay' column instead. df itself is not reassigned.
df.withColumnRenamed('delay', 'Delay_2').show()

#### Trabalhando com UDFs
- Integração de código entre as APIs
- É preciso cuidado com performance dos códigos usando UDFs

In [0]:
from pyspark.sql.types import LongType
# Function that is later registered as a Spark UDF.
def quadrado(s):
  """Return ``s`` multiplied by itself.

  Returns None when ``s`` is None: Spark passes SQL NULLs to Python UDFs as
  None, and multiplying None would raise a TypeError and fail the whole query.
  """
  if s is None:
    return None
  return s * s

In [0]:
# Register the Python function with Spark so SQL can call it as Func_Py_Quadrado.
# The return type is passed explicitly (LongType); when omitted it defaults to StringType.
from pyspark.sql.types import LongType
spark.udf.register("Func_Py_Quadrado", quadrado, LongType())

In [0]:
# Generate a single-column DataFrame of sequential ids from 1 to 19 (the end of the range is exclusive)
spark.range(1, 20).show()

In [0]:
# Create the temporary view View_temp from the same range
spark.range(1, 20).createOrReplaceTempView("View_temp")

In [0]:
%sql
-- Call the Python function registered as Func_Py_Quadrado from SQL
select id, Func_Py_Quadrado(id) as id_ao_quadrado
from View_temp

id,id_ao_quadrado
1,1
2,4
3,9
4,16
5,25
6,36
7,49
8,64
9,81
10,100


##### UDFs com Dataframes

In [0]:
# Wrap the Python function as a UDF usable from the DataFrame API (returns LongType)
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
Func_Py_Quadrado = udf(quadrado, LongType())

In [0]:
# Load the temporary view View_temp as a DataFrame (this reassigns df)
df = spark.table("View_temp")

In [0]:
# Print the rows of the DataFrame
df.show()

In [0]:
# Apply the UDF to the id column and show id next to its square
display(df.select("id", Func_Py_Quadrado("id").alias("id_quadrado")))

id,id_quadrado
1,1
2,4
3,9
4,16
5,25
6,36
7,49
8,64
9,81
10,100


#### Koalas
- Koalas é um projeto de código aberto que fornece um substituto imediato para os pandas. 
- O pandas é comumente usado por ser um pacote que fornece estruturas de dados e ferramentas de análise de dados fáceis de usar para a linguagem de programação Python.
- O Koalas preenche essa lacuna fornecendo APIs equivalentes ao pandas que funcionam no Apache Spark. 
- Koalas é útil não apenas para usuários de pandas, mas também para usuários de PySpark.
  - Koalas suporta muitas tarefas que são difíceis de fazer com PySpark, por exemplo, plotar dados diretamente de um PySpark DataFrame.
- Koalas suporta SQL diretamente em seus dataframes.

In [0]:
import numpy as np
import pandas as pd
import databricks.koalas as ks

In [0]:
# Create a pandas DataFrame with two columns of 5 random numbers each
# NOTE(review): no random seed is set, so the values change on every run
pdf = pd.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})

In [0]:
# Show the type of the pandas DataFrame object
type(pdf)

In [0]:
# Create a Koalas DataFrame with two columns of 5 random numbers each
kdf = ks.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})

In [0]:
# Show the type of the Koalas DataFrame object
type(kdf)

In [0]:
# Create a Koalas DataFrame from a pandas DataFrame
kdf = ks.DataFrame(pdf)
type(kdf)

In [0]:
# Another way to convert a pandas DataFrame to Koalas
kdf = ks.from_pandas(pdf)
type(kdf)

In [0]:
# Familiar pandas methods: head() on the pandas DataFrame
pdf.head()

Unnamed: 0,A,B
0,0.599634,0.706762
1,0.534563,0.055601
2,0.704126,0.03555
3,0.161522,0.768715
4,0.304619,0.921843


In [0]:
# The same familiar method on the Koalas DataFrame
kdf.head()

Unnamed: 0,A,B
0,0.599634,0.706762
1,0.534563,0.055601
2,0.704126,0.03555
3,0.161522,0.768715
4,0.304619,0.921843


In [0]:
# describe(): summary statistics for each column
kdf.describe()

Unnamed: 0,A,B
count,5.0,5.0
mean,0.460893,0.497694
std,0.22242,0.420145
min,0.161522,0.03555
25%,0.304619,0.055601
50%,0.534563,0.706762
75%,0.599634,0.768715
max,0.704126,0.921843


In [0]:
# Sort the DataFrame by column B
kdf.sort_values(by='B')

Unnamed: 0,A,B
2,0.704126,0.03555
1,0.534563,0.055601
0,0.599634,0.706762
3,0.161522,0.768715
4,0.304619,0.921843


In [0]:
# Read the Koalas option 'compute.max_rows' and then set it to 2000.
# NOTE(review): set_option/get_option are imported but the calls below go through
# ks.*; the value read by get_option is not displayed because it is not the
# last expression of the cell.
from databricks.koalas.config import set_option, get_option
ks.get_option('compute.max_rows')
ks.set_option('compute.max_rows', 2000)

In [0]:
# Select a subset of columns by name
kdf[['A', 'B']]

Unnamed: 0,A,B
0,0.599634,0.706762
1,0.534563,0.055601
2,0.704126,0.03555
3,0.161522,0.768715
4,0.304619,0.921843


In [0]:
# loc: label-based row selection (labels 1 through 2, both included in the output above)
kdf.loc[1:2]

Unnamed: 0,A,B
1,0.534563,0.055601
2,0.704126,0.03555


In [0]:
# iloc: position-based selection (first 3 rows, column at position 1)
kdf.iloc[:3, 1:2]

Unnamed: 0,B
0,0.706762
1,0.055601
2,0.03555


**Usando funções python com dataframe koalas**

In [0]:
# Python function applied element-wise to a Koalas column further below.
def quadrado(x):
    """Return ``x`` raised to the power of 2."""
    return pow(x, 2)

In [0]:
# Turn on the Koalas option that allows operations combining different DataFrames/Series.
from databricks.koalas.config import set_option, reset_option
set_option("compute.ops_on_diff_frames", True)

In [0]:
# Create column C by applying quadrado to every value of column A
kdf['C'] = kdf.A.apply(quadrado)

In [0]:
# Look at the DataFrame with the new column
kdf.head()

Unnamed: 0,A,B,C
0,0.599634,0.706762,0.359561
1,0.534563,0.055601,0.285758
2,0.704126,0.03555,0.495794
3,0.161522,0.768715,0.026089
4,0.304619,0.921843,0.092793


In [0]:
# Group by column A and sum the remaining columns
kdf.groupby('A').sum()

Unnamed: 0_level_0,B,C
A,Unnamed: 1_level_1,Unnamed: 2_level_1
0.599634,0.706762,0.359561
0.534563,0.055601,0.285758
0.704126,0.03555,0.495794
0.161522,0.768715,0.026089
0.304619,0.921843,0.092793


In [0]:
# Group by more than one column
kdf.groupby(['A', 'B']).sum()

Unnamed: 0_level_0,Unnamed: 1_level_0,C
A,B,Unnamed: 2_level_1
0.599634,0.706762,0.359561
0.534563,0.055601,0.285758
0.704126,0.03555,0.495794
0.161522,0.768715,0.026089
0.304619,0.921843,0.092793


In [0]:
# This is needed for visualizing plot on notebook
%matplotlib inline

In [0]:
# Example data: one speed and one lifespan value per animal
speed = [0.1, 17.5, 40, 48, 52, 69, 88]
lifespan = [2, 8, 70, 1.5, 25, 12, 28]

# Animal names used as the row index
index = ['snail', 'pig', 'elephant',
         'rabbit', 'giraffe', 'coyote', 'horse']

# Build the Koalas DataFrame and draw a bar chart straight from it
kdf = ks.DataFrame({'speed': speed,
                   'lifespan': lifespan}, index=index)
kdf.plot.bar()

**Usando SQL no Koalas**

In [0]:
# Create a Koalas DataFrame
kdf = ks.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'pig': [20, 18, 489, 675, 1776],
                    'horse': [4, 25, 281, 600, 1900]})

In [0]:
# Query the Koalas DataFrame with SQL; {kdf} refers to the Python variable kdf
ks.sql("SELECT * FROM {kdf} WHERE pig > 100")

Unnamed: 0,year,pig,horse
0,2003,489,281
1,2009,675,600
2,2014,1776,1900


In [0]:
# Create a pandas DataFrame
pdf = pd.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'sheep': [22, 50, 121, 445, 791],
                    'chicken': [250, 326, 589, 1241, 2118]})

In [0]:
# SQL inner join between the Koalas DataFrame (kdf) and the pandas DataFrame (pdf) on year
ks.sql('''
    SELECT ks.pig, pd.chicken
    FROM {kdf} ks INNER JOIN {pdf} pd
    ON ks.year = pd.year
    ORDER BY ks.pig, pd.chicken''')

Unnamed: 0,pig,chicken
0,18,326
1,20,250
2,489,589
3,675,1241
4,1776,2118


In [0]:
# Convert a Koalas DataFrame to a PySpark DataFrame
kdf = ks.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]})
pydf = kdf.to_spark()

In [0]:
# Show the type of the converted object
type(pydf)