In [None]:
try:
    !pip install pyspark=="2.4.5"  --quiet
except:
 print("Running throw py file.")

In [2]:
from pyspark import SparkContext as sc
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark import SparkFiles
from pyspark.sql.types import StringType
import pyspark

In [3]:
# Obtain the SparkSession used throughout this study; getOrCreate() reuses an
# already-active session instead of building a second one.
spark = (
    SparkSession.builder
    .appName("Estudo Spark - Query Plans - Fabio Kfouri")
    .getOrCreate()
)

## Leitura de dados usando uma fonte pública

Fonte de Dados:

https://data.humdata.org/dataset/faostat-prices-for-brazil

In [4]:
# Register the public producer-prices CSV with the SparkContext; the read step
# resolves it afterwards through SparkFiles.get("producer-prices_bra.csv").
spark.sparkContext.addFile(
    'https://data.humdata.org/dataset/bdf7bcca-28ae-47c5-8993-c87a7c5c04c0/resource/c16c15f5-efaf-4950-b848-94935987d312/download/producer-prices_bra.csv'
)

In [22]:
# Load the file registered through addFile, taking column names from the header
# line. No schema is supplied, so every column is read as a string, and the
# '#country+code' tag line of the file shows up as the first data row.
df = spark.read.csv(
    SparkFiles.get("producer-prices_bra.csv"),
    sep=",",
    header=True,
)

In [23]:
# Preview the first five rows without truncating long cell values.
df.show(n=5, truncate=False)

+-------------+-----------+----------+---------+-------------+---------------+---------------+------------+--------------------------+---------+----------+-----------+------------+---------------+--------------------+----+
|Iso3         |StartDate  |EndDate   |Area Code|Area         |Item Code      |Item           |Element Code|Element                   |Year Code|Year      |Months Code|Months      |Unit           |Value               |Flag|
+-------------+-----------+----------+---------+-------------+---------------+---------------+------------+--------------------------+---------+----------+-----------+------------+---------------+--------------------+----+
|#country+code|#date+start|#date+end |null     |#country+name|#indicator+code|#indicator+name|null        |null                      |null     |#date+year|null       |null        |#indicator+type|#indicator+value+num|null|
|BRA          |1991-01-01 |1991-12-31|21       |Brazil       |515            |Apples         |5530        |P

Criando uma Temp Table usando <b>registerTempTable</b>, que faz parte da API 1.x e foi marcada como <i>deprecated</i> no Spark 2.0, sendo substituída por <b>createOrReplaceTempView</b> e <b>createTempView</b>. Ela tem a mesma função que o createOrReplaceTempView.

In [24]:
# Expose the DataFrame to Spark SQL under the name "df" (deliberately using the
# legacy registerTempTable call that this section discusses).
df.registerTempTable(name="df")
# List the catalog's tables to confirm the temporary table was registered.
spark.catalog.listTables()

[Table(name='df', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

## Running an Explain query

In [25]:
# Run EXPLAIN through Spark SQL; the result is a DataFrame with a single
# 'plan' column holding the plan text.
temp = spark.sql(sqlQuery='EXPLAIN SELECT * FROM df')

In [26]:
# Display the EXPLAIN result with show()'s defaults spelled out; truncation is
# why the plan text appears cut off in the rendered table.
temp.show(n=20, truncate=True)

+--------------------+
|                plan|
+--------------------+
|== Physical Plan ...|
+--------------------+



In [27]:
# Fetch the first Row so the complete plan string is visible, untruncated.
temp.head()

Row(plan='== Physical Plan ==\n*(1) FileScan csv [Iso3#268,StartDate#269,EndDate#270,Area Code#271,Area#272,Item Code#273,Item#274,Element Code#275,Element#276,Year Code#277,Year#278,Months Code#279,Months#280,Unit#281,Value#282,Flag#283] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/fkfouri/AppData/Local/Temp/spark-e3609bcd-046b-4ed7-80a9-6ac6bed..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Iso3:string,StartDate:string,EndDate:string,Area Code:string,Area:string,Item Code:string,...')

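Uma forma fácil de obter o explain é chamar o método explain() diretamente no DataFrame. Note que temp.explain() descreve o próprio comando EXPLAIN, enquanto df.explain() mostra o plano da leitura do CSV.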

In [28]:
# Print the physical plan of `temp` itself: this describes the EXPLAIN command,
# not the SELECT it wraps.
temp.explain(extended=False)

== Physical Plan ==
Execute ExplainCommand
   +- ExplainCommand 'Project [*], false, false, false


In [29]:
# Print only the physical plan of the CSV-backed DataFrame.
df.explain(extended=False)

== Physical Plan ==
*(1) FileScan csv [Iso3#268,StartDate#269,EndDate#270,Area Code#271,Area#272,Item Code#273,Item#274,Element Code#275,Element#276,Year Code#277,Year#278,Months Code#279,Months#280,Unit#281,Value#282,Flag#283] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/fkfouri/AppData/Local/Temp/spark-e3609bcd-046b-4ed7-80a9-6ac6bed..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Iso3:string,StartDate:string,EndDate:string,Area Code:string,Area:string,Item Code:string,...


#### Explain em um DataFrame em cache

In [32]:
# Mark the DataFrame for caching, then print the parsed, analyzed, optimized
# and physical plans. cache() returns the same DataFrame, so the calls chain.
df.cache().explain(extended=True)

== Parsed Logical Plan ==
Relation[Iso3#268,StartDate#269,EndDate#270,Area Code#271,Area#272,Item Code#273,Item#274,Element Code#275,Element#276,Year Code#277,Year#278,Months Code#279,Months#280,Unit#281,Value#282,Flag#283] csv

== Analyzed Logical Plan ==
Iso3: string, StartDate: string, EndDate: string, Area Code: string, Area: string, Item Code: string, Item: string, Element Code: string, Element: string, Year Code: string, Year: string, Months Code: string, Months: string, Unit: string, Value: string, Flag: string
Relation[Iso3#268,StartDate#269,EndDate#270,Area Code#271,Area#272,Item Code#273,Item#274,Element Code#275,Element#276,Year Code#277,Year#278,Months Code#279,Months#280,Unit#281,Value#282,Flag#283] csv

== Optimized Logical Plan ==
Relation[Iso3#268,StartDate#269,EndDate#270,Area Code#271,Area#272,Item Code#273,Item#274,Element Code#275,Element#276,Year Code#277,Year#278,Months Code#279,Months#280,Unit#281,Value#282,Flag#283] csv

== Physical Plan ==
*(1) FileScan csv [Is

In [40]:
# Count rows per Item, order by that count descending, and print all plan
# stages for the DataFrame-API version of the query.
(
    df.groupBy('Item')
    .count()
    .orderBy(F.col('count').desc())
    .explain(extended=True)
)

== Parsed Logical Plan ==
'Sort ['count DESC NULLS LAST], true
+- Aggregate [Item#274], [Item#274, count(1) AS count#741L]
   +- Relation[Iso3#268,StartDate#269,EndDate#270,Area Code#271,Area#272,Item Code#273,Item#274,Element Code#275,Element#276,Year Code#277,Year#278,Months Code#279,Months#280,Unit#281,Value#282,Flag#283] csv

== Analyzed Logical Plan ==
Item: string, count: bigint
Sort [count#741L DESC NULLS LAST], true
+- Aggregate [Item#274], [Item#274, count(1) AS count#741L]
   +- Relation[Iso3#268,StartDate#269,EndDate#270,Area Code#271,Area#272,Item Code#273,Item#274,Element Code#275,Element#276,Year Code#277,Year#278,Months Code#279,Months#280,Unit#281,Value#282,Flag#283] csv

== Optimized Logical Plan ==
Sort [count#741L DESC NULLS LAST], true
+- Aggregate [Item#274], [Item#274, count(1) AS count#741L]
   +- Project [Item#274]
      +- InMemoryRelation [Iso3#268, StartDate#269, EndDate#270, Area Code#271, Area#272, Item Code#273, Item#274, Element Code#275, Element#276, Yea

In [41]:
# Same aggregation expressed in Spark SQL against the "df" temp table, so its
# plan stages can be compared with the DataFrame-API version.
spark.sql(
    ''' 
SELECT Item, count(*) as count 
FROM df
group by Item
Order by count desc
'''
).explain(extended=True)

== Parsed Logical Plan ==
'Sort ['count DESC NULLS LAST], true
+- 'Aggregate ['Item], ['Item, 'count(1) AS count#906]
   +- 'UnresolvedRelation `df`

== Analyzed Logical Plan ==
Item: string, count: bigint
Sort [count#906L DESC NULLS LAST], true
+- Aggregate [Item#274], [Item#274, count(1) AS count#906L]
   +- SubqueryAlias `df`
      +- Relation[Iso3#268,StartDate#269,EndDate#270,Area Code#271,Area#272,Item Code#273,Item#274,Element Code#275,Element#276,Year Code#277,Year#278,Months Code#279,Months#280,Unit#281,Value#282,Flag#283] csv

== Optimized Logical Plan ==
Sort [count#906L DESC NULLS LAST], true
+- Aggregate [Item#274], [Item#274, count(1) AS count#906L]
   +- Project [Item#274]
      +- InMemoryRelation [Iso3#268, StartDate#269, EndDate#270, Area Code#271, Area#272, Item Code#273, Item#274, Element Code#275, Element#276, Year Code#277, Year#278, Months Code#279, Months#280, Unit#281, Value#282, Flag#283], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1