# Big Data para Cientista de Dados

In [0]:
# Lendo o arquivo de dados
arquivo = "/FileStore/tables/2015_summary-2.json"

In [0]:
# lendo o arquivo de dados
# inferSchema = True (Infer data types)
# header = True (header in first line)

flightData2015 = spark\
.read.format("json")\
.option("inferSchema", "True")\
.option("header", "True")\
.json(arquivo)

In [0]:
# imprime os datatypes das colunas do dataframe
flightData2015.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [0]:
# imprime o tipo da variável flightData2015
# dataframe spark permite performance devido ao sistema distribuido e computacao paralelizada != pandas.DataFrame
type(flightData2015)

Out[20]: pyspark.sql.dataframe.DataFrame

In [0]:
# retorna as primeiras 5 linhas do dataframe em formato de array.
flightData2015.take(5)

Out[21]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)]

In [0]:
# Usando o comando display
display(flightData2015.show(3))

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+
only showing top 3 rows



In [0]:
# imprime a quantidade de linhas no dataframe.
flightData2015.count()

Out[23]: 256

In [0]:
# lendo o arquivo previamente com a opção inferSchema desligada
flightData2015 = spark\
.read\
.option("inferSchema", "False")\
.option("header", "True")\
.json(arquivo)

In [0]:
df = spark\
.read\
.option("inferSchema", "True")\
.option("header", "True")\
.json("/FileStore/tables/bigdatafords/*.json")

In [0]:
df.show(10)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 10 rows



In [0]:
# imprime a quantidade de linhas do datafrme
df.count()

Out[5]: 1502

In [0]:
# Opções de Plots
display(df.head(10))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


#Trabalhando com SQL

In [0]:
%sql
DROP TABLE IF EXISTS all_files;

In [0]:
%sql
CREATE TABLE all_files
USING json
OPTIONS (path "/FileStore/tables/bigdatafords/*.json", header "true")

In [0]:
%sql
-- Consutando dados usando a linguagem SQL
SELECT * FROM all_files;

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [0]:
%sql
-- Consutando dados usando a linguagem SQL
SELECT COUNT(*) FROM all_files;

count(1)
1502


In [0]:
%sql
-- Consutando dados usando a linguagem SQL
SELECT DEST_COUNTRY_NAME
       , AVG(count) AS Quantidade_Paises
FROM all_files
GROUP BY DEST_COUNTRY_NAME
ORDER BY DEST_COUNTRY_NAME;

DEST_COUNTRY_NAME,Quantidade_Paises
Afghanistan,8.0
Algeria,5.0
Angola,13.166666666666666
Anguilla,26.33333333333333
Antigua and Barbuda,129.66666666666666
Argentina,187.66666666666663
Aruba,350.6666666666667
Australia,294.0
Austria,41.333333333333336
Azerbaijan,8.0


In [0]:
# Create a view or table temporária.
df.createOrReplaceTempView("2015_summary_view")

In [0]:
%sql
select * from 2015_summary_view

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [0]:
%sql
-- Query na view 2015_summary_csv com multiplicação.
SELECT DEST_COUNTRY_NAME 
      ,ORIGIN_COUNTRY_NAME
      ,COUNT * 10 as count_multiplicado_por_dez
FROM 2015_summary_view

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count_multiplicado_por_dez
United States,Romania,150
United States,Croatia,10
United States,Ireland,3440
Egypt,United States,150
United States,India,620
United States,Singapore,10
United States,Grenada,620
Costa Rica,United States,5880
Senegal,United States,400
Moldova,United States,10


In [0]:
from pyspark.sql.functions import max
df.select(max("count")).take(1)

Out[10]: [Row(max(count)=370002)]

In [0]:
# Filtrando linhas de um dataframe usando filter
df.filter("count < 2").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [0]:
# Usando where (um alias para o metodo filter)
df.where("count < 2").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [0]:
%sql
-- filtrando linhas com sql
SELECT * FROM 2015_summary_view WHERE count < 2 LIMIT 2;

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Croatia,1
United States,Singapore,1


In [0]:
# obtendo linhas únicas
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct()

Out[13]: DataFrame[ORIGIN_COUNTRY_NAME: string, DEST_COUNTRY_NAME: string]

In [0]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

Out[14]: 320

### Manipulando Dataframes

In [0]:
df.sort("count").show(5)

+-----------------+--------------------+-----+
|DEST_COUNTRY_NAME| ORIGIN_COUNTRY_NAME|count|
+-----------------+--------------------+-----+
|         Slovakia|       United States|    1|
|          Liberia|       United States|    1|
|    United States|              Cyprus|    1|
|Equatorial Guinea|       United States|    1|
|    United States|Bosnia and Herzeg...|    1|
+-----------------+--------------------+-----+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import desc, asc, expr
# ordenando por ordem crescente
df.orderBy(expr("count desc")).show(10)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Algeria|    1|
|          Georgia|      United States|    1|
|    United States|             Brunei|    1|
|    United States|           Malaysia|    1|
|            Malta|      United States|    1|
|            Libya|      United States|    1|
|          Belarus|      United States|    1|
|       The Gambia|      United States|    1|
|    United States|          Lithuania|    1|
|    United States|          Gibraltar|    1|
+-----------------+-------------------+-----+
only showing top 10 rows



In [0]:
df.groupBy("DEST_COUNTRY_NAME").count().orderBy("count", ascending=False).show(10)

+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|    United States|  736|
|             Fiji|    6|
|           Turkey|    6|
|         Dominica|    6|
|           Jordan|    6|
|           Russia|    6|
|          Germany|    6|
|          Senegal|    6|
|         Kiribati|    6|
|            Palau|    6|
+-----------------+-----+
only showing top 10 rows



In [0]:
# visualizando estatísticas descritivas
df.describe().show()

+-------+-----------------+-------------------+------------------+
|summary|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|             count|
+-------+-----------------+-------------------+------------------+
|  count|             1502|               1502|              1502|
|   mean|             null|               null|1718.3189081225032|
| stddev|             null|               null|22300.368619668894|
|    min|      Afghanistan|        Afghanistan|                 1|
|    max|         Zimbabwe|           Zimbabwe|            370002|
+-------+-----------------+-------------------+------------------+



In [0]:
type(df.collect())

Out[21]: list

In [0]:
# iterando sobre todas as linhas do dataframe
for i in df.collect(): # retorna as linhas do dataframe
  print (f"item {i}\n")
  print(i[0], i[1], i[2] * 2)
  print("--------------------------")

item Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

United States Romania 30
--------------------------
item Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1)

United States Croatia 2
--------------------------
item Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)

United States Ireland 688
--------------------------
item Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15)

Egypt United States 30
--------------------------
item Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)

United States India 124
--------------------------
item Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)

United States Singapore 2
--------------------------
item Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62)

United States Grenada 124
--------------------------
item Row(DEST_COUNTRY_NAME='Costa Rica

In [0]:
from pyspark.sql.functions import lower, upper, col
df.select(col("DEST_COUNTRY_NAME"), lower(col("DEST_COUNTRY_NAME")), upper(col("DEST_COUNTRY_NAME"))).show(10)

+-----------------+------------------------+------------------------+
|DEST_COUNTRY_NAME|lower(DEST_COUNTRY_NAME)|upper(DEST_COUNTRY_NAME)|
+-----------------+------------------------+------------------------+
|    United States|           united states|           UNITED STATES|
|    United States|           united states|           UNITED STATES|
|    United States|           united states|           UNITED STATES|
|            Egypt|                   egypt|                   EGYPT|
|    United States|           united states|           UNITED STATES|
|    United States|           united states|           UNITED STATES|
|    United States|           united states|           UNITED STATES|
|       Costa Rica|              costa rica|              COSTA RICA|
|          Senegal|                 senegal|                 SENEGAL|
|          Moldova|                 moldova|                 MOLDOVA|
+-----------------+------------------------+------------------------+
only showing top 10 

In [0]:
%sql
-- Usando SQL..
SELECT DEST_COUNTRY_NAME
      ,lower(DEST_COUNTRY_NAME)
      ,upper(DEST_COUNTRY_NAME)
FROM 2015_summary_view

DEST_COUNTRY_NAME,lower(DEST_COUNTRY_NAME),upper(DEST_COUNTRY_NAME)
United States,united states,UNITED STATES
United States,united states,UNITED STATES
United States,united states,UNITED STATES
Egypt,egypt,EGYPT
United States,united states,UNITED STATES
United States,united states,UNITED STATES
United States,united states,UNITED STATES
Costa Rica,costa rica,COSTA RICA
Senegal,senegal,SENEGAL
Moldova,moldova,MOLDOVA


In [0]:
# remove espaços em branco a esquerda
from pyspark.sql.functions import ltrim
df.select(ltrim(col("DEST_COUNTRY_NAME"))).show(2)

+------------------------+
|ltrim(DEST_COUNTRY_NAME)|
+------------------------+
|           United States|
|           United States|
+------------------------+
only showing top 2 rows



In [0]:
# remove espaços a direita
from pyspark.sql.functions import rtrim
df.select(rtrim(col("DEST_COUNTRY_NAME"))).show(2)

+------------------------+
|rtrim(DEST_COUNTRY_NAME)|
+------------------------+
|           United States|
|           United States|
+------------------------+
only showing top 2 rows



In [0]:
# todas as operações juntas..
# a função lit cria uma coluna na cópia do dataframe
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim
df.select(
ltrim(lit(" HELLO ")).alias("ltrim"),
rtrim(lit(" HELLO ")).alias("rtrim"),
trim(lit(" HELLO ")).alias("trim"),
lpad(lit("HELLO"), 3, " ").alias("lp"),
rpad(lit("HELLO"), 10, " ").alias("rp")).show(2)

+------+------+-----+---+----------+
| ltrim| rtrim| trim| lp|        rp|
+------+------+-----+---+----------+
|HELLO | HELLO|HELLO|HEL|HELLO     |
|HELLO | HELLO|HELLO|HEL|HELLO     |
+------+------+-----+---+----------+
only showing top 2 rows



In [0]:
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|       Azerbaijan|      United States|    1|
|          Belarus|      United States|    1|
|          Belarus|      United States|    1|
|           Brunei|      United States|    1|
|         Bulgaria|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import desc, asc
df.orderBy(expr("count desc")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|              Niger|    1|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|      United States|358354|
+-----------------+-------------------+------+
only showing top 2 rows



In [0]:
# utilizando SQL
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM 2015_summary_view
GROUP BY DEST_COUNTRY_NAME
""")

In [0]:
# Utilizando Python
dataFrameWay = df.groupBy("DEST_COUNTRY_NAME").count()

In [0]:
# imprime o plano de execução do código
sqlWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#87], functions=[finalmerge_count(merge count#929L) AS count(1)#917L])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#87, 200), ENSURE_REQUIREMENTS, [id=#1249]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#87], functions=[partial_count(1) AS count#929L])
         +- FileScan json [DEST_COUNTRY_NAME#87] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(6 paths)[dbfs:/FileStore/tables/bigdatafords/2010_summary.json, dbfs:/FileStore..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [0]:
# imprime o plano de execução do código
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#87], functions=[finalmerge_count(merge count#934L) AS count(1)#924L])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#87, 200), ENSURE_REQUIREMENTS, [id=#1295]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#87], functions=[partial_count(1) AS count#934L])
         +- FileScan json [DEST_COUNTRY_NAME#87] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(6 paths)[dbfs:/FileStore/tables/bigdatafords/2010_summary.json, dbfs:/FileStore..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [0]:
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/FileStore/tables/bigdatafords/2010_12_01.csv")


In [0]:
#imprime as  10 primeiras linhas
display(df.head(10))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01T08:26:00.000+0000,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01T08:26:00.000+0000,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01T08:34:00.000+0000,1.69,13047.0,United Kingdom


In [0]:
# Tipos Boleanos
from pyspark.sql.functions import col
df.where(col("InvoiceNo") != 536365)\
.select("InvoiceNo", "Description")\
.show(5)

+---------+--------------------+
|InvoiceNo|         Description|
+---------+--------------------+
|   536366|HAND WARMER UNION...|
|   536366|HAND WARMER RED P...|
|   536367|ASSORTED COLOUR B...|
|   536367|POPPY'S PLAYHOUSE...|
|   536367|POPPY'S PLAYHOUSE...|
+---------+--------------------+
only showing top 5 rows



In [0]:
# cria a tabela temporária dftrable
df.createOrReplaceTempView("dfTable")

In [0]:
# imprime 10 primeiras linhas
display(df.head(10))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01T08:26:00.000+0000,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01T08:26:00.000+0000,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01T08:34:00.000+0000,1.69,13047.0,United Kingdom


In [0]:
# usando o operador booleano com um predicado em uma expressão.
df.where("InvoiceNo <> 536365").show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536366|    22633|HAND WARMER UNION...|       6|2010-12-01 08:28:00|     1.85|   17850.0|United Kingdom|
|   536366|    22632|HAND WARMER RED P...|       6|2010-12-01 08:28:00|     1.85|   17850.0|United Kingdom|
|   536367|    84879|ASSORTED COLOUR B...|      32|2010-12-01 08:34:00|     1.69|   13047.0|United Kingdom|
|   536367|    22745|POPPY'S PLAYHOUSE...|       6|2010-12-01 08:34:00|      2.1|   13047.0|United Kingdom|
|   536367|    22748|POPPY'S PLAYHOUSE...|       6|2010-12-01 08:34:00|      2.1|   13047.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [0]:
# usando o operador boleando com um predicado em uma expressão.
display(df.where("InvoiceNo = 536365").show(5))

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [0]:
# Entendendo a ordem dos operadores boleanos
from pyspark.sql.functions import instr
priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1

In [0]:
# aplicando os operadores como filtros
df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).show()

+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      null|United Kingdom|
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      null|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+



In [0]:
%sql
-- Aplicando a mesmo código em SQL
SELECT * FROM dfTable WHERE StockCode in ("DOT") AND (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536544,DOT,DOTCOM POSTAGE,1,2010-12-01T14:32:00.000+0000,569.77,,United Kingdom
536592,DOT,DOTCOM POSTAGE,1,2010-12-01T17:06:00.000+0000,607.49,,United Kingdom


In [0]:
# Combinando filtros e operadores boleanos
from pyspark.sql.functions import instr
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1


In [0]:
# Combinando filtros e operadores boleanos
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
.where("isExpensive")\
.select("unitPrice", "isExpensive").show(5)

+---------+-----------+
|unitPrice|isExpensive|
+---------+-----------+
|   569.77|       true|
|   607.49|       true|
+---------+-----------+



In [0]:
%sql
-- Aplicando as mesmas ideias usando SQL
SELECT UnitPrice, (StockCode = 'DOT' AND
(UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)) as isExpensive
FROM dfTable
WHERE (StockCode = 'DOT' AND
(UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1))

UnitPrice,isExpensive
569.77,True
607.49,True


# Trabalhando com tipos diferentes de arquivos

### Modos de leitura
- **permissive**: *Define todos os campos para NULL quando encontra registros corrompidos e coloca todos registros corrompidos em uma coluna chamada _corrupt_record.* (default)

- **dropMalformed**: *Apaga uma linha corrompida ou que este não consiga ler.*

- **failFast**: *Falha imediatamente quando encontra uma linha que não consiga ler.*

In [0]:
# Lendo arquivos csv
# sintaxe 
spark.read.format("csv")
.option("mode", "permissive")
.option("inferSchema", "true")
.option("path", "path/to/file(s)")
.schema(someSchema)
.load()

In [0]:
# leia o arquivo alterando os modos de leitura (failfast, permissive, dropmalformed)
df = spark.read.format("csv")\
.option("mode", "failfast")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/FileStore/tables/bigdatafords/2010_12_01.csv")

In [0]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
# imprime as 10 primeiras linhas do dataframe
display(df.head(10))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01T08:26:00.000+0000,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01T08:26:00.000+0000,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01T08:34:00.000+0000,1.69,13047.0,United Kingdom


#### Criando um schema
- A opção **infer_schema** nem sempre vai definir o melhor datatype.
- Melhora a performance na leitura de grandes bases.
- Permite uma customização dos tipos das colunas.
- É importante saber para reescrita de aplicações. (Códigos pandas)

In [0]:
# imprime o schema do dataframe (infer_schema=True)
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
# definindo o schema com o objeto StructType
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType, TimestampType
schema_df = StructType([
    StructField("InvoiceNo", IntegerType()),
    StructField("StockCode", IntegerType()),
    StructField("Description", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("InvoiceDate", TimestampType()),
    StructField("UnitPrice", DoubleType()),
    StructField("CustomerID", DoubleType()),
    StructField("Country", StringType())
])

In [0]:
# verificando o tipo da variável schema_df
type(schema_df)

Out[75]: pyspark.sql.types.StructType

In [0]:
# usando o parâmetro schema()
df = spark.read.format("csv")\
.option("header", "True")\
.schema(schema_df)\
.option("mode", "failfast")\
.option("timestampFormat",'yyyy-/MM/DD hh:mm:ss')\
.load("/FileStore/tables/bigdatafords/2010_12_01.csv")

In [0]:
# imprime o schema do dataframe.
df.printSchema()

root
 |-- InvoiceNo: integer (nullable = true)
 |-- StockCode: integer (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
display(df.head(10))

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-497105139949038>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mdisplay[0m[0;34m([0m[0mdf[0m[0;34m.[0m[0mhead[0m[0;34m([0m[0;36m10[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/dataframe.py[0m in [0;36mhead[0;34m(self, n)[0m
[1;32m   1744[0m             [0mrs[0m [0;34m=[0m [0mself[0m[0;34m.[0m[0mhead[0m[0;34m([0m[0;36m1[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m   1745[0m             [0;32mreturn[0m [0mrs[0m[0;34m[[0m[0;36m0[0m[0;34m][0m [0;32mif[0m [0mrs[0m [0;32melse[0m [0;32mNone[0m[0;34m[0m[0;34m[0m[0m
[0;32m-> 1746[0;31m         [0;32mreturn[0m [0mself[0m[0;34m.[0m[0mtake[0m[0;34m([0m[0mn[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m   1747[0m

In [0]:
# imprime 10 primeiras linhas do dataframe.
display(df.collect())

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-4090805643558199>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;31m# imprime 10 primeiras linhas do dataframe.[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mdisplay[0m[0;34m([0m[0mdf[0m[0;34m.[0m[0mcollect[0m[0;34m([0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/dataframe.py[0m in [0;36mcollect[0;34m(self)[0m
[1;32m    713[0m         [0;31m# Default path used in OSS Spark / for non-DF-ACL clusters:[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[1;32m    714[0m         [0;32mwith[0m [0mSCCallSiteSync[0m[0;34m([0m[0mself[0m[0;34m.[0m[0m_sc[0m[0;34m)[0m [0;32mas[0m [0mcss[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m--> 715[0;31m             [0msock_info[0m [0;34m=[0m [0mself

### Arquivos JSON

In [0]:
df_json = spark.read.format("json")\
.option("mode", "failfast")\
.option("inferSchema", "true")\
.load("/FileStore/tables/bigdatafords/2010_summary.json")

In [0]:
df_json.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [0]:
display(df_json.head(10))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
United States,Ireland,264
United States,India,69
Egypt,United States,24
Equatorial Guinea,United States,1
United States,Singapore,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


### Escrevendo arquivos
- **append** : Adiciona arquivos de saída na lista de arquivos que já existem na localizaçao.
- **overwrite** : Sobreescreve os arquivos no destino.
- **erroIfExists** : Emite um erro e para se já existir arquivos no destino.
- **ignore** : Se existir o dado no destino náo faz nada.

In [0]:
# escrevendo arquivos csv
df.write.format("csv")\
.mode("overwrite")\
.option("sep", ",")\
.save("/FileStore/tables/bigdatafords/2010_summary/saida_2010_12_01.csv")

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-4090805643558205>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;31m# escrevendo arquivos csv[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mdf[0m[0;34m.[0m[0mwrite[0m[0;34m.[0m[0mformat[0m[0;34m([0m[0;34m"csv"[0m[0;34m)[0m[0;31m\[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      3[0m [0;34m.[0m[0mmode[0m[0;34m([0m[0;34m"overwrite"[0m[0;34m)[0m[0;31m\[0m[0;34m[0m[0;34m[0m[0m
[1;32m      4[0m [0;34m.[0m[0moption[0m[0;34m([0m[0;34m"sep"[0m[0;34m,[0m [0;34m","[0m[0;34m)[0m[0;31m\[0m[0;34m[0m[0;34m[0m[0m
[1;32m      5[0m [0;34m.[0m[0msave[0m[0;34m([0m[0;34m"/FileStore/tables/bigdatafords/2010_summary/saida_2010_12_01.csv"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/spark/python/pyspark/sql/read

In [0]:
# observe o arquivo gerado.
file = "/FileStore/tables/bigdatafords/saida_2010_12_01.csv/part-00000-tid-513137111285552141-fa5fcb38-55a1-4a12-ac99-df3fa327627c-83-1-c000.csv"
df = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema", "True")\
.option("timestampFormat",'yyyy-/MM/DD hh:mm:ss')\
.load(file)

In [0]:
# imprime as 10 primeiras linhas do dataframe 
df.show(10)

#### Escrevendo dados em paralelo

In [0]:
# reparticionando o arquivo csv
# observe o diretório criado
df.repartition(5).write.format("csv")\
.mode("overwrite") \
.option("sep", ",") \
.save("/FileStore/tables/bigdatafords/saida_2010_12_01.csv")

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-4090805643558209>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;31m# reparticionando o arquivo csv[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[1;32m      2[0m [0;31m# observe o diretório criado[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 3[0;31m [0mdf[0m[0;34m.[0m[0mrepartition[0m[0;34m([0m[0;36m5[0m[0;34m)[0m[0;34m.[0m[0mwrite[0m[0;34m.[0m[0mformat[0m[0;34m([0m[0;34m"csv"[0m[0;34m)[0m[0;31m\[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      4[0m [0;34m.[0m[0mmode[0m[0;34m([0m[0;34m"overwrite"[0m[0;34m)[0m[0;31m [0m[0;31m\[0m[0;34m[0m[0;34m[0m[0m
[1;32m      5[0m [0;34m.[0m[0moption[0m[0;34m([0m[0;34m"sep"[0m[0;34m,[0m [0;34m","[0m[0;34m)[0m[0;31m [0m[0;31m\[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/spark/p

### Arquivos Parquet

##### Convertendo .csv para .parquet
- Dataset .csv usado https://www.kaggle.com/nhs/general-practice-prescribing-data

In [0]:
# Lendo todos os arquivos .csv do diretório bigdata (>4GB)
df = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema","True")\
.load("/FileStore/tables/bigdata/*.csv")

In [0]:
display(df.head(10))

practice,bnf_code,bnf_name,items,nic,act_cost,quantity
5668,8092,592,2,44.1,40.84,189
1596,17512,16983,2,1.64,1.64,35
1596,25587,16124,1,1.26,1.28,42
1596,12551,1282,2,0.86,1.02,42
1596,18938,10575,1,1.85,1.82,56
1596,8777,21507,1,3.31,3.18,56
1596,9369,12008,1,63.15,58.56,56
1596,27926,17643,2,158.66,147.07,56
1596,26148,10230,1,0.35,0.44,14
1596,9148,3381,1,0.26,0.35,7


In [0]:
df.printSchema()

root
 |-- practice: integer (nullable = true)
 |-- bnf_code: integer (nullable = true)
 |-- bnf_name: integer (nullable = true)
 |-- items: integer (nullable = true)
 |-- nic: double (nullable = true)
 |-- act_cost: double (nullable = true)
 |-- quantity: integer (nullable = true)



In [0]:
# conta a quantidade de linhas
df.count()

Out[4]: 20336266

*Atente para NÃO escrever e ler arquivos parquet em versoes diferentes*

In [0]:
# escrevendo em formato parquet: armazena os dados orientados por coluna
df.write.format("parquet")\
.mode("overwrite")\
.save("/FileStore/tables/bigdatafords/df-parquet-file.parquet")

In [0]:
%fs
ls /FileStore/tables/bigdatafords/df-parquet-file.parquet

path,name,size,modificationTime
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/_SUCCESS,_SUCCESS,0,1660870078000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/_committed_4603035452224338177,_committed_4603035452224338177,816,1660870078000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/_started_4603035452224338177,_started_4603035452224338177,0,1660869994000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/part-00000-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-27-1-c000.snappy.parquet,part-00000-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-27-1-c000.snappy.parquet,27280062,1660870076000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/part-00001-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-28-1-c000.snappy.parquet,part-00001-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-28-1-c000.snappy.parquet,27242683,1660870076000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/part-00002-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-29-1-c000.snappy.parquet,part-00002-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-29-1-c000.snappy.parquet,27073684,1660870075000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/part-00003-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-30-1-c000.snappy.parquet,part-00003-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-30-1-c000.snappy.parquet,27305131,1660870072000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/part-00004-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-31-1-c000.snappy.parquet,part-00004-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-31-1-c000.snappy.parquet,27320339,1660870076000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/part-00005-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-32-1-c000.snappy.parquet,part-00005-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-32-1-c000.snappy.parquet,27188455,1660870074000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/part-00006-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-33-1-c000.snappy.parquet,part-00006-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-33-1-c000.snappy.parquet,26692930,1660870069000


In [0]:
# lendo arquivos parquet
# atente para a velocidade de leitura
df_parquet = spark.read.format("parquet")\
.load("/FileStore/tables/bigdatafords/df-parquet-file.parquet")

In [0]:
type(df_parquet)

Out[22]: pyspark.sql.dataframe.DataFrame

In [0]:
# conta a quantidade de linhas do dataframe
df_parquet.count()

Out[7]: 20336266

In [0]:
# visualizando o dataframe
display(df_parquet.head(10))

In [0]:
# visualizando o tamanho dos arquivos
display(dbutils.fs.ls("/FileStore/tables/bigdatafords/df-parquet-file.parquet"))

path,name,size,modificationTime
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/_SUCCESS,_SUCCESS,0,1660870078000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/_committed_4603035452224338177,_committed_4603035452224338177,816,1660870078000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/_started_4603035452224338177,_started_4603035452224338177,0,1660869994000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/part-00000-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-27-1-c000.snappy.parquet,part-00000-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-27-1-c000.snappy.parquet,27280062,1660870076000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/part-00001-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-28-1-c000.snappy.parquet,part-00001-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-28-1-c000.snappy.parquet,27242683,1660870076000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/part-00002-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-29-1-c000.snappy.parquet,part-00002-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-29-1-c000.snappy.parquet,27073684,1660870075000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/part-00003-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-30-1-c000.snappy.parquet,part-00003-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-30-1-c000.snappy.parquet,27305131,1660870072000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/part-00004-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-31-1-c000.snappy.parquet,part-00004-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-31-1-c000.snappy.parquet,27320339,1660870076000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/part-00005-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-32-1-c000.snappy.parquet,part-00005-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-32-1-c000.snappy.parquet,27188455,1660870074000
dbfs:/FileStore/tables/bigdatafords/df-parquet-file.parquet/part-00006-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-33-1-c000.snappy.parquet,part-00006-tid-4603035452224338177-cbcaa6f6-2e4e-409e-b483-0357632243dd-33-1-c000.snappy.parquet,26692930,1660870069000


In [0]:
%scala
// script para pegar tamanho em Gigabytes
val path="/FileStore/tables/bigdatafords/df-parquet-file.parquet"
val filelist=dbutils.fs.ls(path)
val df_temp = filelist.toDF()
df_temp.createOrReplaceTempView("adlsSize")

In [0]:
%sql
-- consulta a view criada.
select round(sum(size)/(1024*1024*1024),3) as sizeInGB from adlsSize

sizeInGB
0.2


### Spark + PostgreSQL
- Consultar e escrever em um banco de dados relacional.

In [0]:
# Isso é equivalente a executar uma query como: select * from pg_catalog.pg_tables
# jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/{your_database}?user=stack_user@pgserver-1&password={your_password}&sslmode=require
pgDF = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?user=stack_user@pgserver-1&password=Bigdata2021&sslmode=require")\
.option("dbtable", "pg_catalog.pg_tables")\
.option("user", "stack_user").option("password", "Bigdata2021").load()

In [0]:
# imprime todas as linhas do dataframe
display(pgDF.collect())

In [0]:
# consulta dados da coluna schemaname
pgDF.select("schemaname").distinct().show()

In [0]:
# Especifica uma query diretamente.
# Útil para evitar o "select * from."
pgDF = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?user=stack_user@pgserver-1&password=Bigdata2021&sslmode=require")\
.option("query", "select schemaname, tablename from pg_catalog.pg_tables")\
.option("user", "stack_user").option("password", "Bigdata2021").load()

In [0]:
# imprime todas as linhas do dataframe
display(pgDF.collect())

In [0]:
# imprime as 5 linhas do dataframe df
# não se esqueça de recriar esse dataframe.
df.show(5)

In [0]:
# cria a tabela "produtos" a apartir dos dados do dataframe df.
pgDF.write.mode("overwrite")\
.format("jdbc")\
.option("url", "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?user=stack_user@pgserver-1&password=Bigdata2021&sslmode=require")\
.option("dbtable", "produtos")\
.option("user", "stack_user")\
.option("password", "Bigdata2021")\
.save()

In [0]:
# cria o dataframe df_produtos a partir da tabela criada.
df_produtos = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?user=stack_user@pgserver-1&password=Bigdata2021&sslmode=require")\
.option("dbtable", "produtos")\
.option("user", "stack_user").option("password", "Bigdata2021").load()

In [0]:
# imprime as linhas do dataframe.
display(df_produtos.collect())

#### Avançando com Pyspark

In [0]:
# lendo arquivos parquet
df = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema", "True")\
.option("mode", "failfast")\
.option("timestampFormat",'yyyy-/MM/DD hh:mm:ss')\
.load("/FileStore/tables/bigdatafords/2010_12_01.csv")

- **mean()** - Retorna o valor médio de cada grupo.

- **max()** - Retorna o valor máximo de cada grupo.

- **min()** - Retorna o valor mínimo de cada grupo.

- **sum()** - Retorna a soma de todos os valores do grupo.

- **avg()** - Retorna o valor médio de cada grupo.

In [0]:
# imprime as 10 primeiras linhas do dataframe
df.show(10)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS S

In [0]:
# Soma preços unitários por país
df.groupBy("Country").sum("UnitPrice").show()

+--------------+------------------+
|       Country|    sum(UnitPrice)|
+--------------+------------------+
|       Germany| 93.82000000000002|
|        France|             55.29|
|          EIRE|133.64000000000001|
|        Norway|            102.67|
|     Australia|              73.9|
|United Kingdom|12428.080000000024|
|   Netherlands|             16.85|
+--------------+------------------+



In [0]:
# Conta a quantidade de países distintos.
df.groupBy("Country").count().show()

+--------------+-----+
|       Country|count|
+--------------+-----+
|       Germany|   29|
|        France|   20|
|          EIRE|   21|
|        Norway|   73|
|     Australia|   14|
|United Kingdom| 2949|
|   Netherlands|    2|
+--------------+-----+



In [0]:
# retorna o valor mínimo por grupo
df.groupBy("Country").min("UnitPrice").show()

+--------------+--------------+
|       Country|min(UnitPrice)|
+--------------+--------------+
|       Germany|          0.42|
|        France|          0.42|
|          EIRE|          0.65|
|        Norway|          0.29|
|     Australia|          0.85|
|United Kingdom|           0.0|
|   Netherlands|          1.85|
+--------------+--------------+



In [0]:
# retorna o valor mínimo por grupo
df.groupBy("Country").max("UnitPrice").show()

+--------------+--------------+
|       Country|max(UnitPrice)|
+--------------+--------------+
|       Germany|          18.0|
|        France|          18.0|
|          EIRE|          50.0|
|        Norway|          7.95|
|     Australia|           8.5|
|United Kingdom|        607.49|
|   Netherlands|          15.0|
+--------------+--------------+



In [0]:
# retorna o valor médio por grupo
df.groupBy("Country").avg("UnitPrice").show()

+--------------+------------------+
|       Country|    avg(UnitPrice)|
+--------------+------------------+
|       Germany| 3.235172413793104|
|        France|            2.7645|
|          EIRE|6.3638095238095245|
|        Norway|1.4064383561643836|
|     Australia| 5.278571428571429|
|United Kingdom|4.2143370634113335|
|   Netherlands|             8.425|
+--------------+------------------+



In [0]:
# retorna o valor médio por grupo
df.groupBy("Country").mean("UnitPrice").show()

+--------------+------------------+
|       Country|    avg(UnitPrice)|
+--------------+------------------+
|       Germany| 3.235172413793104|
|        France|            2.7645|
|          EIRE|6.3638095238095245|
|        Norway|1.4064383561643836|
|     Australia| 5.278571428571429|
|United Kingdom|4.2143370634113335|
|   Netherlands|             8.425|
+--------------+------------------+



In [0]:
# GroupBy várias colunas
df.groupBy("Country","CustomerID")\
    .sum("UnitPrice")\
    .show()

+--------------+----------+------------------+
|       Country|CustomerID|    sum(UnitPrice)|
+--------------+----------+------------------+
|United Kingdom|   17420.0| 38.99999999999999|
|United Kingdom|   15922.0|              48.5|
|United Kingdom|   16250.0|             47.27|
|United Kingdom|   13065.0| 73.11000000000001|
|United Kingdom|   18074.0|62.150000000000006|
|United Kingdom|   16048.0|12.969999999999999|
|       Germany|   12472.0|             49.45|
|United Kingdom|   18085.0|              34.6|
|United Kingdom|   17905.0|109.90000000000003|
|United Kingdom|   17841.0|254.63999999999982|
|United Kingdom|   15291.0|               6.0|
|United Kingdom|   17951.0|22.000000000000004|
|United Kingdom|   13255.0|27.299999999999997|
|United Kingdom|   17690.0|              34.8|
|United Kingdom|   18229.0|             48.65|
|United Kingdom|   15605.0| 58.20000000000002|
|United Kingdom|   18011.0| 66.10999999999999|
|United Kingdom|   17809.0|              1.45|
|United Kingd

#### Trabalhando com datas
- Existem diversas funçoes em Pyspark para manipular datas e timestamp.
- Evite escrever suas próprias funçoes para isso.
- Algumas funcoes mais usadas:
    - current_day():
    - date_format(dateExpr,format):
    - to_date():
    - to_date(column, fmt):
    - add_months(Column, numMonths):
    - date_add(column, days):
    - date_sub(column, days):
    - datediff(end, start)
    - current_timestamp():
    - hour(column):

In [0]:
# imprime o dataframe
df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS S

In [0]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
from pyspark.sql.functions import *
# current_date()
df.select(current_date().alias("current_date")).show(2)

+------------+
|current_date|
+------------+
|  2022-08-20|
|  2022-08-20|
+------------+
only showing top 2 rows



In [0]:
# date_format()
df.select(col("InvoiceDate"), \
          date_format(col("InvoiceDate"), "dd-MM-yyyy hh:mm:ss")\
          .alias("date_format")).show()

+-------------------+-------------------+
|        InvoiceDate|        date_format|
+-------------------+-------------------+
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:28:00|01-12-2010 08:28:00|
|2010-12-01 08:28:00|01-12-2010 08:28:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
+-------------------+-------------

In [0]:
# datediff
df.select(col("InvoiceDate"),
    datediff(current_date(),col("InvoiceDate")).alias("datediff")  
  ).show()

+-------------------+--------+
|        InvoiceDate|datediff|
+-------------------+--------+
|2010-12-01 08:26:00|    4280|
|2010-12-01 08:26:00|    4280|
|2010-12-01 08:26:00|    4280|
|2010-12-01 08:26:00|    4280|
|2010-12-01 08:26:00|    4280|
|2010-12-01 08:26:00|    4280|
|2010-12-01 08:26:00|    4280|
|2010-12-01 08:28:00|    4280|
|2010-12-01 08:28:00|    4280|
|2010-12-01 08:34:00|    4280|
|2010-12-01 08:34:00|    4280|
|2010-12-01 08:34:00|    4280|
|2010-12-01 08:34:00|    4280|
|2010-12-01 08:34:00|    4280|
|2010-12-01 08:34:00|    4280|
|2010-12-01 08:34:00|    4280|
|2010-12-01 08:34:00|    4280|
|2010-12-01 08:34:00|    4280|
|2010-12-01 08:34:00|    4280|
|2010-12-01 08:34:00|    4280|
+-------------------+--------+
only showing top 20 rows



In [0]:
# months_between()
df.select(col("InvoiceDate"), 
    months_between(current_date(),col("InvoiceDate")).alias("months_between")  
  ).show()

+-------------------+--------------+
|        InvoiceDate|months_between|
+-------------------+--------------+
|2010-12-01 08:26:00|   140.6015681|
|2010-12-01 08:26:00|   140.6015681|
|2010-12-01 08:26:00|   140.6015681|
|2010-12-01 08:26:00|   140.6015681|
|2010-12-01 08:26:00|   140.6015681|
|2010-12-01 08:26:00|   140.6015681|
|2010-12-01 08:26:00|   140.6015681|
|2010-12-01 08:28:00|   140.6015233|
|2010-12-01 08:28:00|   140.6015233|
|2010-12-01 08:34:00|  140.60138889|
|2010-12-01 08:34:00|  140.60138889|
|2010-12-01 08:34:00|  140.60138889|
|2010-12-01 08:34:00|  140.60138889|
|2010-12-01 08:34:00|  140.60138889|
|2010-12-01 08:34:00|  140.60138889|
|2010-12-01 08:34:00|  140.60138889|
|2010-12-01 08:34:00|  140.60138889|
|2010-12-01 08:34:00|  140.60138889|
|2010-12-01 08:34:00|  140.60138889|
|2010-12-01 08:34:00|  140.60138889|
+-------------------+--------------+
only showing top 20 rows



In [0]:
# utiliza as funçoes para adicionar e subtrair meses e dias
df.select(col("InvoiceDate"), 
    add_months(col("InvoiceDate"), 3).alias("add_months"), 
    add_months(col("InvoiceDate"), -3).alias("sub_months"), 
    date_add(col("InvoiceDate"), 4).alias("date_add"), 
    date_sub(col("InvoiceDate"), 4).alias("date_sub") 
  ).show()

+-------------------+----------+----------+----------+----------+
|        InvoiceDate|add_months|sub_months|  date_add|  date_sub|
+-------------------+----------+----------+----------+----------+
|2010-12-01 08:26:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:26:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:26:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:26:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:26:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:26:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:26:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:28:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:28:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:34:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:34:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:34:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-0

In [0]:
# Extrai ano, mës, próximo dia e dia da semana.
df.select(col("InvoiceDate"), 
     year(col("InvoiceDate")).alias("year"), 
     month(col("InvoiceDate")).alias("month"), 
     next_day(col("InvoiceDate"),"Sunday").alias("next_day"), 
     weekofyear(col("InvoiceDate")).alias("weekofyear") 
  ).show()

+-------------------+----+-----+----------+----------+
|        InvoiceDate|year|month|  next_day|weekofyear|
+-------------------+----+-----+----------+----------+
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:28:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:28:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-0

In [0]:
# Dia da semana, dia do mës, dias do ano
df.select(col("InvoiceDate"),  
     dayofweek(col("InvoiceDate")).alias("dayofweek"), 
     dayofmonth(col("InvoiceDate")).alias("dayofmonth"), 
     dayofyear(col("InvoiceDate")).alias("dayofyear"), 
  ).show()

+-------------------+---------+----------+---------+
|        InvoiceDate|dayofweek|dayofmonth|dayofyear|
+-------------------+---------+----------+---------+
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:26:00|        4|         1|      335|
|2010-12-01 08:28:00|        4|         1|      335|
|2010-12-01 08:28:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|      335|
|2010-12-01 08:34:00|        4|         1|    

In [0]:
# imprime o timestamp atual
df.select(current_timestamp().alias("current_timestamp")
  ).show(1, truncate=False)

+-----------------------+
|current_timestamp      |
+-----------------------+
|2022-08-20 01:03:22.443|
+-----------------------+
only showing top 1 row



In [0]:
# retorna hora, minuto e segundo
df.select(col("InvoiceDate"), 
    hour(col("InvoiceDate")).alias("hour"), 
    minute(col("InvoiceDate")).alias("minute"),
    second(col("InvoiceDate")).alias("second") 
  ).show()

+-------------------+----+------+------+
|        InvoiceDate|hour|minute|second|
+-------------------+----+------+------+
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:28:00|   8|    28|     0|
|2010-12-01 08:28:00|   8|    28|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
+-------------------+----+------+------+
only showing top

#### Missing Values com Pyspark

In [0]:
# visualizando datasets de exemplos da databricks
display(dbutils.fs.ls("/databricks-datasets"))

path,name,size,modificationTime
dbfs:/databricks-datasets/,databricks-datasets/,0,0
dbfs:/databricks-datasets/COVID/,COVID/,0,0
dbfs:/databricks-datasets/README.md,README.md,976,1532468253000
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0,0
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359,1455043490000
dbfs:/databricks-datasets/adult/,adult/,0,0
dbfs:/databricks-datasets/airlines/,airlines/,0,0
dbfs:/databricks-datasets/amazon/,amazon/,0,0
dbfs:/databricks-datasets/asa/,asa/,0,0
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0,0


In [0]:
# lendo o arquivo de dados
# inferSchema = True
# header = True

arquivo = "dbfs:/databricks-datasets/flights/"

df = spark \
.read.format("csv")\
.option("inferSchema", "True")\
.option("header", "True")\
.csv(arquivo)

In [0]:
df.show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [0]:
df.filter("delay is NULL").show()

+--------------------+-----+--------+------+-----------+
|                date|delay|distance|origin|destination|
+--------------------+-----+--------+------+-----------+
|Abbotsford\tBC\tC...| null|    null|  null|       null|
|Aberdeen\tSD\tUSA...| null|    null|  null|       null|
|Abilene\tTX\tUSA\...| null|    null|  null|       null|
| Akron\tOH\tUSA\tCAK| null|    null|  null|       null|
|Alamosa\tCO\tUSA\...| null|    null|  null|       null|
|Albany\tGA\tUSA\tABY| null|    null|  null|       null|
|Albany\tNY\tUSA\tALB| null|    null|  null|       null|
|Albuquerque\tNM\t...| null|    null|  null|       null|
|Alexandria\tLA\tU...| null|    null|  null|       null|
|Allentown\tPA\tUS...| null|    null|  null|       null|
|Alliance\tNE\tUSA...| null|    null|  null|       null|
|Alpena\tMI\tUSA\tAPN| null|    null|  null|       null|
|Altoona\tPA\tUSA\...| null|    null|  null|       null|
|Amarillo\tTX\tUSA...| null|    null|  null|       null|
|Anahim Lake\tBC\t...| null|   

In [0]:
# filtrando valores missing
df.filter(df.delay.isNull()).show(10)

+--------------------+-----+--------+------+-----------+
|                date|delay|distance|origin|destination|
+--------------------+-----+--------+------+-----------+
|Abbotsford\tBC\tC...| null|    null|  null|       null|
|Aberdeen\tSD\tUSA...| null|    null|  null|       null|
|Abilene\tTX\tUSA\...| null|    null|  null|       null|
| Akron\tOH\tUSA\tCAK| null|    null|  null|       null|
|Alamosa\tCO\tUSA\...| null|    null|  null|       null|
|Albany\tGA\tUSA\tABY| null|    null|  null|       null|
|Albany\tNY\tUSA\tALB| null|    null|  null|       null|
|Albuquerque\tNM\t...| null|    null|  null|       null|
|Alexandria\tLA\tU...| null|    null|  null|       null|
|Allentown\tPA\tUS...| null|    null|  null|       null|
+--------------------+-----+--------+------+-----------+
only showing top 10 rows



In [0]:
# preenche os dados missing com o valor 0
df.na.fill(value=0).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [0]:
# preenche valores missing com valor 0 apenas da coluna delay
df.na.fill(value=0, subset=['delay']).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [0]:
# preenche os dados com valores de string vazia
df.na.fill("").show(100)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [0]:
df.filter("delay is NULL").select(col("delay")).count()

Out[40]: 526

In [0]:
# remove qualquer linha nula de qualquer coluna
df.na.drop().show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

#### Tarefas básicas em dataframes

In [0]:
# Adicionando uma coluna ao dataframe
df = df.withColumn('Nova Coluna', df['delay'] + 2)
df.show(10)

+--------+-----+--------+------+-----------+-----------+
|    date|delay|distance|origin|destination|Nova Coluna|
+--------+-----+--------+------+-----------+-----------+
|01011245|    6|     602|   ABE|        ATL|        8.0|
|01020600|   -8|     369|   ABE|        DTW|       -6.0|
|01021245|   -2|     602|   ABE|        ATL|        0.0|
|01020605|   -4|     602|   ABE|        ATL|       -2.0|
|01031245|   -4|     602|   ABE|        ATL|       -2.0|
|01030605|    0|     602|   ABE|        ATL|        2.0|
|01041243|   10|     602|   ABE|        ATL|       12.0|
|01040605|   28|     602|   ABE|        ATL|       30.0|
|01051245|   88|     602|   ABE|        ATL|       90.0|
|01050605|    9|     602|   ABE|        ATL|       11.0|
+--------+-----+--------+------+-----------+-----------+
only showing top 10 rows



In [0]:
# Removendo coluna
df = df.drop('Nova Coluna')
df.show(10)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 10 rows



In [0]:
# Renomenando uma coluna no dataframe
df.withColumn('Nova Coluna', df['delay'] + 2).withColumnRenamed('Nova Coluna', 'Delay_2').show()

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination|Delay_2|
+--------+-----+--------+------+-----------+-------+
|01011245|    6|     602|   ABE|        ATL|    8.0|
|01020600|   -8|     369|   ABE|        DTW|   -6.0|
|01021245|   -2|     602|   ABE|        ATL|    0.0|
|01020605|   -4|     602|   ABE|        ATL|   -2.0|
|01031245|   -4|     602|   ABE|        ATL|   -2.0|
|01030605|    0|     602|   ABE|        ATL|    2.0|
|01041243|   10|     602|   ABE|        ATL|   12.0|
|01040605|   28|     602|   ABE|        ATL|   30.0|
|01051245|   88|     602|   ABE|        ATL|   90.0|
|01050605|    9|     602|   ABE|        ATL|   11.0|
|01061215|   -6|     602|   ABE|        ATL|   -4.0|
|01061725|   69|     602|   ABE|        ATL|   71.0|
|01061230|    0|     369|   ABE|        DTW|    2.0|
|01060625|   -3|     602|   ABE|        ATL|   -1.0|
|01070600|    0|     369|   ABE|        DTW|    2.0|
|01071725|    0|     602|   ABE|        ATL|  

#### Trabalhando com UDFs
- Integraçáo de código entre as APIs
- É preciso cuidado com performance dos códigos usando UDFs

In [0]:
# define a função
def quadrado(s):
  return s * s

In [0]:
# registra no banco de dados do spark e define o tipo de retorno por padrão é stringtype
from pyspark.sql.types import LongType
spark.udf.register("Func_Py_Quadrado", quadrado, LongType())

Out[4]: <function __main__.quadrado(s)>

In [0]:
# gera valores de 1 a 19
spark.range(1, 20).show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+



In [0]:
# cria a visão View_temp
spark.range(1, 20).createOrReplaceTempView("View_temp")

In [0]:
%sql
-- Usando a função criada em python juntamente com código SQL
select id, Func_Py_Quadrado(id) as id_ao_quadrado
from View_temp

id,id_ao_quadrado
1,1
2,4
3,9
4,16
5,25
6,36
7,49
8,64
9,81
10,100


##### UDFs com Dataframes

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
Func_Py_Quadrado = udf(quadrado, LongType())

In [0]:
type(Func_Py_Quadrado)

Out[10]: function

In [0]:
df = spark.table("View_temp")

In [0]:
df.show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+



In [0]:
display(df.select("id", Func_Py_Quadrado("id").alias("id_quadrado")))

id,id_quadrado
1,1
2,4
3,9
4,16
5,25
6,36
7,49
8,64
9,81
10,100


#### Koalas
- Koalas é um projeto de código aberto que fornece um substituto imediato para os pandas. 
- O pandas é comumente usado por ser um pacote que fornece estruturas de dados e ferramentas de análise de dados fáceis de usar para a linguagem de programação Python.
- O Koalas preenche essa lacuna fornecendo APIs equivalentes ao pandas que funcionam no Apache Spark. 
- Koalas é útil não apenas para usuários de pandas, mas também para usuários de PySpark.
  - Koalas suporta muitas tarefas que são difíceis de fazer com PySpark, por exemplo, plotar dados diretamente de um PySpark DataFrame.
- Koalas suporta SQL diretamente em seus dataframes.

In [0]:
import numpy as np
import pandas as pd
import pyspark.pandas as ks

In [0]:
# cria um pandas DataFrame
pdf = pd.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})

In [0]:
# imprime um pandas dataframe
type(pdf)

Out[14]: pandas.core.frame.DataFrame

In [0]:
# Cria um Koalas DataFrame
kdf = ks.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})

In [0]:
# imprime o tipo de dados
type(kdf)

Out[16]: pyspark.pandas.frame.DataFrame

In [0]:
# Cria um Koalas dataframe a partir de um pandas dataframe
kdf = ks.DataFrame(pdf)
type(kdf)

Out[17]: pyspark.pandas.frame.DataFrame

In [0]:
# outra forma de converter
kdf = ks.from_pandas(pdf)
type(kdf)

Out[18]: pyspark.pandas.frame.DataFrame

In [0]:
# métodos já conhecidos
pdf.head()

Unnamed: 0,A,B
0,0.510659,0.373997
1,0.701124,0.188805
2,0.523315,0.232915
3,0.050938,0.706159
4,0.038022,0.25022


In [0]:
# métodos já conhecidos
kdf.head()

Unnamed: 0,A,B
0,0.510659,0.373997
1,0.701124,0.188805
2,0.523315,0.232915
3,0.050938,0.706159
4,0.038022,0.25022


In [0]:
# método describe()
kdf.describe()

Unnamed: 0,A,B
count,5.0,5.0
mean,0.364812,0.350419
std,0.301997,0.2104
min,0.038022,0.188805
25%,0.050938,0.232915
50%,0.510659,0.25022
75%,0.523315,0.373997
max,0.701124,0.706159


In [0]:
# ordenando um dataframe
kdf.sort_values(by='B')

Unnamed: 0,A,B
1,0.701124,0.188805
2,0.523315,0.232915
4,0.038022,0.25022
0,0.510659,0.373997
3,0.050938,0.706159


In [0]:
# define configurações de layout de células
from databricks.koalas.config import set_option, get_option
ks.get_option('compute.max_rows')
ks.set_option('compute.max_rows', 2000)



In [0]:
# slice
kdf[['A', 'B']]

Unnamed: 0,A,B
0,0.510659,0.373997
1,0.701124,0.188805
2,0.523315,0.232915
3,0.050938,0.706159
4,0.038022,0.25022


In [0]:
# loc
kdf.loc[1:2]

Unnamed: 0,A,B
1,0.534563,0.055601
2,0.704126,0.03555


In [0]:
kdf.info()

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 5 entries, 0 to 4
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   A       5 non-null      float64
 1   B       5 non-null      float64
dtypes: float64(2)

In [0]:
kdf.loc[kdf["B"] >= 0.04].count()

Out[34]: A    5
B    5
dtype: int64

In [0]:
# iloc
kdf.iloc[:3, 1:2]

Unnamed: 0,B
0,0.373997
1,0.188805
2,0.232915


**Usando funções python com dataframe koalas**

In [0]:
# cria função python
def quadrado(x):
    return x ** 2

In [0]:
# habilita computação de dataframes e séries
# por padrão temos desabilitado por questões de perfomance
from databricks.koalas.config import set_option, reset_option
set_option("compute.ops_on_diff_frames", True)

In [0]:
# cria uma nova coluna a partir da função quadrado
kdf['C'] = kdf.A.apply(quadrado)

In [0]:
# visualizando o dataframe
kdf.head()

Unnamed: 0,A,B,C
0,0.510659,0.373997,0.260773
1,0.701124,0.188805,0.491575
2,0.523315,0.232915,0.273858
3,0.050938,0.706159,0.002595
4,0.038022,0.25022,0.001446


In [0]:
# agrupando dados
kdf.groupby('A').sum()

Unnamed: 0_level_0,B,C
A,Unnamed: 1_level_1,Unnamed: 2_level_1
0.599634,0.706762,0.359561
0.534563,0.055601,0.285758
0.704126,0.03555,0.495794
0.161522,0.768715,0.026089
0.304619,0.921843,0.092793


In [0]:
# agrupando mais de uma coluna
kdf.groupby(['A', 'B']).sum()

Unnamed: 0_level_0,Unnamed: 1_level_0,C
A,B,Unnamed: 2_level_1
0.599634,0.706762,0.359561
0.534563,0.055601,0.285758
0.704126,0.03555,0.495794
0.161522,0.768715,0.026089
0.304619,0.921843,0.092793


In [0]:
# This is needed for visualizing plot on notebook
%matplotlib inline

In [0]:
speed = [0.1, 17.5, 40, 48, 52, 69, 88]
lifespan = [2, 8, 70, 1.5, 25, 12, 28]

index = ['snail', 'pig', 'elephant',
         'rabbit', 'giraffe', 'coyote', 'horse']

kdf = ks.DataFrame({'speed': speed,
                   'lifespan': lifespan}, index=index)
kdf.plot.bar()

**Usando SQL no Koalas**

In [0]:
# cria um dataframe Koalas
kdf = ks.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'pig': [20, 18, 489, 675, 1776],
                    'horse': [4, 25, 281, 600, 1900]})

In [0]:
# Faz query no dataframe koalas
ks.sql("SELECT * FROM {kdf} WHERE pig > 100")

Unnamed: 0,year,pig,horse
0,2003,489,281
1,2009,675,600
2,2014,1776,1900


In [0]:
# cria um dataframe pandas
pdf = pd.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'sheep': [22, 50, 121, 445, 791],
                    'chicken': [250, 326, 589, 1241, 2118]})

In [0]:
# Query com inner join entre dataframe pandas e koalas
ks.sql('''
    SELECT ks.pig, pd.chicken
    FROM {kdf} ks 
    INNER JOIN {pdf} pd
    ON ks.year = pd.year
    ORDER BY ks.pig, pd.chicken''')

Unnamed: 0,pig,chicken
0,18,326
1,20,250
2,489,589
3,675,1241
4,1776,2118


In [0]:
# converte koalas dataframe para Pyspark
kdf = ks.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]})
pydf = kdf.to_spark()

In [0]:
type(pydf)