# Execution Plan

In this notebook we try to understand Spark execution plans. We will use the weather example and analyse all the steps in order to get a better understanding.

## Exeuction Model of Spark

In contrast to many other (mainly non-distributed) frameworks, Spark does not execute any transformation immediately, but only records the step and builds a so called execution plan. This plan is the basis for Sparks resilience against failure of individual nodes (since the result can be reconstructed from the execution plan), but also allows Spark to perform optimizations which span all transformation steps.

Specifically with Spark DataFrames (as opposed to the more low level RDD interface), Spark uses an advanced optimizer. The general steps of query processing in response to an action (like a "show" or "save" action)" are always as follows:
1. Parse logical execution plan
2. Analyze logical execution plan and resolve all symbols (tables, columns, functions)
3. Optimize logical execution plan
4. Create physical execution plan by mapping all steps to RDD operations

## Relation to RDDs
Note that RDDs are only used in the very last step, although the general conception is that DataFrames sit on top of RDDs. But the point is, that a DataFrame first collects all transformations on a higher level of abstraction and RDDs only come into play in this very last step.

Actually you can access an RDD of any DataFrame. BUT: This access will actually create the physical execution plan for this specific RDD. Before accessing this RDD it even didn't exist. This also means that using a DataFrames RDD actually is an optimization barrier.

## Weather Example

In the following steps, we will try to understand how Spark executes a simplified version of the weather analysis including aggregations and joins.

In [None]:
spark.conf.set("spark.sql.adaptive.enabled", False)

# 1. Load Data

First we load the weather data, which consists of the measurement data and some station metadata.

In [1]:
storageLocation = "s3://dimajix-training/data/weather"

## 1.1 Load Measurements

Measurements are stored in multiple directories (one per year)

In [2]:
from pyspark.sql.functions import *
from functools import reduce

# Read in all years, store them in an Python array
raw_weather_per_year = [spark.read.text(storageLocation + "/" + str(i)).withColumn("year", lit(i)) for i in range(2003,2006)]

# Union all years together
raw_weather = reduce(lambda l,r: l.union(r), raw_weather_per_year)                        

# Display first 10 records
raw_weather.limit(10).toPandas()

Unnamed: 0,value,year
0,0494703160256242003010100003+55200-162717SY-MT...,2003
1,0228703160256242003010100174+55200-162730FM-16...,2003
2,044070316025624200301010053C+55200-162717FM-15...,2003
3,0071703160256242003010101009+55200-162717NSRDB...,2003
4,042770316025624200301010153C+55200-162717FM-15...,2003
5,0071703160256242003010102009+55200-162717NSRDB...,2003
6,046870316025624200301010253C+55200-162717FM-15...,2003
7,0071703160256242003010103009+55200-162717NSRDB...,2003
8,041570316025624200301010353C+55200-162717FM-15...,2003
9,0054703160256242003010104009+55200-162717NSRDB...,2003


### Extract Measurements

Measurements were stored in a proprietary text based format, with some values at fixed positions. We need to extract these values with a simple `SELECT` statement.

In [3]:
weather = raw_weather.select(
    col("year"),
    substring(col("value"),5,6).alias("usaf"),
    substring(col("value"),11,5).alias("wban"),
    substring(col("value"),16,8).alias("date"),
    substring(col("value"),24,4).alias("time"),
    substring(col("value"),42,5).alias("report_type"),
    substring(col("value"),61,3).alias("wind_direction"),
    substring(col("value"),64,1).alias("wind_direction_qual"),
    substring(col("value"),65,1).alias("wind_observation"),
    (substring(col("value"),66,4).cast("float") / lit(10.0)).alias("wind_speed"),
    substring(col("value"),70,1).alias("wind_speed_qual"),
    (substring(col("value"),88,5).cast("float") / lit(10.0)).alias("air_temperature"),
    substring(col("value"),93,1).alias("air_temperature_qual")
)
    
weather.limit(10).toPandas()

Unnamed: 0,year,usaf,wban,date,time,report_type,wind_direction,wind_direction_qual,wind_observation,wind_speed,wind_speed_qual,air_temperature,air_temperature_qual
0,2003,703160,25624,20030101,0,SY-MT,10,5,N,5.2,5,-0.6,5
1,2003,703160,25624,20030101,17,FM-16,20,1,N,4.6,1,-2.0,1
2,2003,703160,25624,20030101,53,FM-15,10,5,N,5.2,5,-2.8,5
3,2003,703160,25624,20030101,100,NSRDB,999,9,9,999.9,9,999.9,9
4,2003,703160,25624,20030101,153,FM-15,10,5,N,6.2,5,-2.2,5
5,2003,703160,25624,20030101,200,NSRDB,999,9,9,999.9,9,999.9,9
6,2003,703160,25624,20030101,253,FM-15,10,5,N,7.2,5,-3.3,5
7,2003,703160,25624,20030101,300,NSRDB,999,9,9,999.9,9,999.9,9
8,2003,703160,25624,20030101,353,FM-15,20,5,N,6.2,5,-1.1,5
9,2003,703160,25624,20030101,400,NSRDB,999,9,9,999.9,9,999.9,9


## 1.2 Load Station Metadata

We also need to load the weather station meta data containing information about the geo location, country etc of individual weather stations.

In [4]:
stations = spark.read \
    .option("header", True) \
    .csv(storageLocation + "/isd-history")

# Display first 10 records    
stations.limit(10).toPandas()

Unnamed: 0,USAF,WBAN,STATION NAME,CTRY,STATE,ICAO,LAT,LON,ELEV(M),BEGIN,END
0,7005,99999,CWOS 07005,,,,,,,20120127,20120127
1,7011,99999,CWOS 07011,,,,,,,20111025,20121129
2,7018,99999,WXPOD 7018,,,,0.0,0.0,7018.0,20110309,20130730
3,7025,99999,CWOS 07025,,,,,,,20120127,20120127
4,7026,99999,WXPOD 7026,AF,,,0.0,0.0,7026.0,20120713,20141120
5,7034,99999,CWOS 07034,,,,,,,20121024,20121106
6,7037,99999,CWOS 07037,,,,,,,20111202,20121125
7,7044,99999,CWOS 07044,,,,,,,20120127,20120127
8,7047,99999,CWOS 07047,,,,,,,20120613,20120717
9,7052,99999,CWOS 07052,,,,,,,20121129,20121130


## 1.3 Perform Analysis

Now for completeness sake, let's reperform the analysis (minimum and maximum temperature per year and country) using `JOIN` and `GROUP BY` operations.

In [7]:
df = weather.join(stations, (weather.usaf == stations.USAF) & (weather.wban == stations.WBAN))
result = df.groupBy(df.CTRY, df.year).agg(
        min(when(df.air_temperature_qual == lit(1), df.air_temperature)).alias('min_temp'),
        max(when(df.air_temperature_qual == lit(1), df.air_temperature)).alias('max_temp'),
        min(when(df.wind_speed_qual == lit(1), df.wind_speed)).alias('min_wind'),
        max(when(df.wind_speed_qual == lit(1), df.wind_speed)).alias('max_wind')
    )

pdf = result.toPandas()    
pdf

Unnamed: 0,CTRY,year,min_temp,max_temp,min_wind,max_wind
0,CA,2006,-43.0,35.9,0.0,31.4
1,SC,2006,20.0,32.0,0.0,20.6
2,IC,2013,-11.0,19.0,0.0,25.0
3,AM,2011,-16.0,42.0,0.0,14.0
4,UK,2014,-6.0,30.4,0.0,20.6
5,SF,2012,2.2,36.0,0.0,10.3
6,GM,2005,-14.0,31.0,0.0,14.4
7,NO,2007,-35.0,29.0,0.0,26.0
8,PO,2010,-1.6,38.0,0.0,21.6
9,FR,2010,-13.3,36.1,0.0,17.5


# 2 Investigate Execution Plans

Now that we have redone the whole analysis, let's try to understand how Spark actually executes these steps. In order to understand the whole aggregation, we start simple and add one step after the other and look how execution plans change.

## 2.1 Reading Data

The first step is to read in data. In order to start simple, we only load a single year into a DataFrame called `raw_weather_2003`. We can inspect the execution plan that would create the records of that DataFrame with the `explain()` method.

In [12]:
raw_weather_2003 = spark.read.text(storageLocation + "/2003")
raw_weather_2003.explain()

== Physical Plan ==
*(1) FileScan text [value#321] Batched: false, Format: Text, Location: InMemoryFileIndex[s3://dimajix-training/data/weather/2003], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>


As we can see, the execution plan actually contains a single operation - reading data from disk. Note two things:
* The phyiscal execution plan has been created specifically for the `explain()` command. It is not stored in the DataFrame, the DataFrame only contains the basis for a *parsed logical plan*
* The plan is not executed, only printed to the console

We can also inspect a more detailed execition plan, if we pass `True` to the `explain()` method as follows:

In [24]:
raw_weather_2003.explain(True)

== Parsed Logical Plan ==
Project [value#323, 2003 AS year#325]
+- AnalysisBarrier
      +- Relation[value#323] text

== Analyzed Logical Plan ==
value: string, year: int
Project [value#323, 2003 AS year#325]
+- Relation[value#323] text

== Optimized Logical Plan ==
Project [value#323, 2003 AS year#325]
+- Relation[value#323] text

== Physical Plan ==
*(1) Project [value#323, 2003 AS year#325]
+- *(1) FileScan text [value#323] Batched: false, Format: Text, Location: InMemoryFileIndex[s3://dimajix-training/data/weather/2003], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>


As you can see, the explanation now contains all four steps:
* Parsed logical execution plan. This directly corresponds to the operations as specified.
* Analyzed logical plan. This resolves all relations and columns and data types.
* Optimized logical plan. This plan is already optimized (we'll see some optimizations later)
* Physical execution plan. This maps all operations and transformations to RDD operations.

## 2.2 Adding Columns

Let's see how the execution plan changes if we add a new column.

In [25]:
raw_weather_2003 = spark.read.text(storageLocation + "/2003").withColumn("year", lit(2003))
raw_weather_2003.explain(True)

== Parsed Logical Plan ==
Project [value#664, 2003 AS year#666]
+- AnalysisBarrier
      +- Relation[value#664] text

== Analyzed Logical Plan ==
value: string, year: int
Project [value#664, 2003 AS year#666]
+- Relation[value#664] text

== Optimized Logical Plan ==
Project [value#664, 2003 AS year#666]
+- Relation[value#664] text

== Physical Plan ==
*(1) Project [value#664, 2003 AS year#666]
+- *(1) FileScan text [value#664] Batched: false, Format: Text, Location: InMemoryFileIndex[s3://dimajix-training/data/weather/2003], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>


### Remarks
We see that a `Project` operation was inserted to all execution plans which is responsible for adding the `year` column.

## 2.3 SELECT Operation

Now let's perform an additional `SELECT` operation after adding the year. We do not add all columns yet in order to keep the output small and more readable. We will add more columns later when we really require them.

In [34]:
weather_2003 = raw_weather_2003.select(
    col("year"),
    substring(col("value"),5,6).alias("usaf"),
    substring(col("value"),11,5).alias("wban")
)
weather_2003.explain(True)

== Parsed Logical Plan ==
'Project [unresolvedalias('year, None), substring('value, 5, 6) AS usaf#949, substring('value, 11, 5) AS wban#950]
+- AnalysisBarrier
      +- Project [value#664, 2003 AS year#666]
         +- Relation[value#664] text

== Analyzed Logical Plan ==
year: int, usaf: string, wban: string
Project [year#666, substring(value#664, 5, 6) AS usaf#949, substring(value#664, 11, 5) AS wban#950]
+- Project [value#664, 2003 AS year#666]
   +- Relation[value#664] text

== Optimized Logical Plan ==
Project [2003 AS year#666, substring(value#664, 5, 6) AS usaf#949, substring(value#664, 11, 5) AS wban#950]
+- Relation[value#664] text

== Physical Plan ==
*(1) Project [2003 AS year#666, substring(value#664, 5, 6) AS usaf#949, substring(value#664, 11, 5) AS wban#950]
+- *(1) FileScan text [value#664] Batched: false, Format: Text, Location: InMemoryFileIndex[s3://dimajix-training/data/weather/2003], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>


### Remarks
Here we see that the original parsed plan and analyzed plan actually contains two `Project` operations. Each of them corresponds to a single transformation (`withColumn` and `select`). But the optimizer merged these operations into a single one, thus simplifying execution.

## 2.4 UNION Operation

Just for completeness, let's see what a `UNION` operation does. We required it after loading all years into individual DataFrames.

In [35]:
# Read in all years, store them in an Python array
raw_weather_per_year = [spark.read.text(storageLocation + "/" + str(i)).withColumn("year", lit(i)) for i in range(2003,2015)]

# Union all years together
raw_weather = reduce(lambda l,r: l.union(r), raw_weather_per_year)                        

# Print execution plan
raw_weather.explain()

== Physical Plan ==
Union
:- *(1) Project [value#2, 2003 AS year#4]
:  +- *(1) FileScan text [value#2] Batched: false, Format: Text, Location: InMemoryFileIndex[s3://dimajix-training/data/weather/2003], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>
:- *(2) Project [value#7, 2004 AS year#9]
:  +- *(2) FileScan text [value#7] Batched: false, Format: Text, Location: InMemoryFileIndex[s3://dimajix-training/data/weather/2004], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>
:- *(3) Project [value#12, 2005 AS year#14]
:  +- *(3) FileScan text [value#12] Batched: false, Format: Text, Location: InMemoryFileIndex[s3://dimajix-training/data/weather/2005], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>
:- *(4) Project [value#17, 2006 AS year#19]
:  +- *(4) FileScan text [value#17] Batched: false, Format: Text, Location: InMemoryFileIndex[s3://dimajix-training/data/weather/2006], PartitionFilters: [], PushedFilters: [

## 2.5 JOIN Operation

The next operation we had to perform was a `JOIN` between the measurements and the station metadata. We will use only a single year instead of the unioned data to keep output small and thereby increase readability of the execution plans.

In [36]:
df = weather_2003.join(stations, (weather_2003.usaf == stations.USAF) & (weather_2003.wban == stations.WBAN))
df.explain(True)

== Parsed Logical Plan ==
Join Inner, ((usaf#949 = USAF#145) && (wban#950 = WBAN#146))
:- Project [year#666, substring(value#664, 5, 6) AS usaf#949, substring(value#664, 11, 5) AS wban#950]
:  +- Project [value#664, 2003 AS year#666]
:     +- Relation[value#664] text
+- Relation[USAF#145,WBAN#146,STATION NAME#147,CTRY#148,STATE#149,ICAO#150,LAT#151,LON#152,ELEV(M)#153,BEGIN#154,END#155] csv

== Analyzed Logical Plan ==
year: int, usaf: string, wban: string, USAF: string, WBAN: string, STATION NAME: string, CTRY: string, STATE: string, ICAO: string, LAT: string, LON: string, ELEV(M): string, BEGIN: string, END: string
Join Inner, ((usaf#949 = USAF#145) && (wban#950 = WBAN#146))
:- Project [year#666, substring(value#664, 5, 6) AS usaf#949, substring(value#664, 11, 5) AS wban#950]
:  +- Project [value#664, 2003 AS year#666]
:     +- Relation[value#664] text
+- Relation[USAF#145,WBAN#146,STATION NAME#147,CTRY#148,STATE#149,ICAO#150,LAT#151,LON#152,ELEV(M)#153,BEGIN#154,END#155] csv

== Opt

### Remarks
Now a `JOIN` results in an interesting execution plan:
* Spark filters columns, since an inner JOIN require non-null values
* Filtering is actually pushed down before the projection. This reduces amount of data as soon as possible
* JOIN operation is performed in two steps:
  * Load data and broadcast it to all nodes (`BroadcastExchange`)
  * Perform the join (`BroadcastHashJoin`)

In addition to the *broadcast join* Spark also supports a different join implementation - more on that later.

### Implicit Filtering

Actually let's have a look at what happens with a left outer join. This should not filter away `NULL` values on the left side:

In [None]:
df = weather_2003.join(stations, (weather_2003.usaf == stations.USAF) & (weather_2003.wban == stations.WBAN), how="left")
df.explain(True)

## 2.6 Aggregation

Finally we want to perform an aggregation on the joined data. We need to restart from measurement extraction, since we did not extract all required columns so far. So we will perform the following steps
* Reuse `raw_weather_2003` which already contains the `year` column
* Extract all requirement measurements
* Join with stations metadata
* Perform grouped aggregation
Again we will only analyze the temperature, just to keep execution plans a little bit smaller. This means that some columns are missing, but the basic operations are all the same.

### Extract Measurements

In [39]:
weather_2003 = raw_weather_2003.select(
    col("year"),
    substring(col("value"),5,6).alias("usaf"),
    substring(col("value"),11,5).alias("wban"),
    substring(col("value"),16,8).alias("date"),
    substring(col("value"),24,4).alias("time"),
    substring(col("value"),42,5).alias("report_type"),
    substring(col("value"),61,3).alias("wind_direction"),
    substring(col("value"),64,1).alias("wind_direction_qual"),
    substring(col("value"),65,1).alias("wind_observation"),
    (substring(col("value"),66,4).cast("float") / lit(10.0)).alias("wind_speed"),
    substring(col("value"),70,1).alias("wind_speed_qual"),
    (substring(col("value"),88,5).cast("float") / lit(10.0)).alias("air_temperature"),
    substring(col("value"),93,1).alias("air_temperature_qual")
)
weather_2003.explain(False)

== Physical Plan ==
*(1) Project [2003 AS year#666, substring(value#664, 5, 6) AS usaf#1093, substring(value#664, 11, 5) AS wban#1094, substring(value#664, 16, 8) AS date#1095, substring(value#664, 24, 4) AS time#1096, substring(value#664, 42, 5) AS report_type#1097, substring(value#664, 61, 3) AS wind_direction#1098, substring(value#664, 64, 1) AS wind_direction_qual#1099, substring(value#664, 65, 1) AS wind_observation#1100, (cast(cast(substring(value#664, 66, 4) as float) as double) / 10.0) AS wind_speed#1101, substring(value#664, 70, 1) AS wind_speed_qual#1102, (cast(cast(substring(value#664, 88, 5) as float) as double) / 10.0) AS air_temperature#1103, substring(value#664, 93, 1) AS air_temperature_qual#1104]
+- *(1) FileScan text [value#664] Batched: false, Format: Text, Location: InMemoryFileIndex[s3://dimajix-training/data/weather/2003], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>


### Join with Stations Metadata

In [40]:
df = weather_2003.join(stations, (weather_2003.usaf == stations.USAF) & (weather_2003.wban == stations.WBAN))
df.explain(False)

== Physical Plan ==
*(2) BroadcastHashJoin [usaf#1093, wban#1094], [USAF#145, WBAN#146], Inner, BuildRight
:- *(2) Project [2003 AS year#666, substring(value#664, 5, 6) AS usaf#1093, substring(value#664, 11, 5) AS wban#1094, substring(value#664, 16, 8) AS date#1095, substring(value#664, 24, 4) AS time#1096, substring(value#664, 42, 5) AS report_type#1097, substring(value#664, 61, 3) AS wind_direction#1098, substring(value#664, 64, 1) AS wind_direction_qual#1099, substring(value#664, 65, 1) AS wind_observation#1100, (cast(cast(substring(value#664, 66, 4) as float) as double) / 10.0) AS wind_speed#1101, substring(value#664, 70, 1) AS wind_speed_qual#1102, (cast(cast(substring(value#664, 88, 5) as float) as double) / 10.0) AS air_temperature#1103, substring(value#664, 93, 1) AS air_temperature_qual#1104]
:  +- *(2) Filter (isnotnull(substring(value#664, 11, 5)) && isnotnull(substring(value#664, 5, 6)))
:     +- *(2) FileScan text [value#664] Batched: false, Format: Text, Location: InMemor

### Perform Grouped Aggregation

In [41]:
result = df.groupBy(df.CTRY, df.year).agg(
        min(when(df.air_temperature_qual == lit(1), df.air_temperature)).alias('min_temp'),
        max(when(df.air_temperature_qual == lit(1), df.air_temperature)).alias('max_temp')
    )
result.explain(True)

== Parsed Logical Plan ==
'Aggregate [CTRY#148, year#666], [CTRY#148, year#666, min(CASE WHEN (air_temperature_qual#1104 = 1) THEN air_temperature#1103 END) AS min_temp#1215, max(CASE WHEN (air_temperature_qual#1104 = 1) THEN air_temperature#1103 END) AS max_temp#1217, min(CASE WHEN (wind_speed_qual#1102 = 1) THEN wind_speed#1101 END) AS min_wind#1219, max(CASE WHEN (wind_speed_qual#1102 = 1) THEN wind_speed#1101 END) AS max_wind#1221]
+- AnalysisBarrier
      +- Join Inner, ((usaf#1093 = USAF#145) && (wban#1094 = WBAN#146))
         :- Project [year#666, substring(value#664, 5, 6) AS usaf#1093, substring(value#664, 11, 5) AS wban#1094, substring(value#664, 16, 8) AS date#1095, substring(value#664, 24, 4) AS time#1096, substring(value#664, 42, 5) AS report_type#1097, substring(value#664, 61, 3) AS wind_direction#1098, substring(value#664, 64, 1) AS wind_direction_qual#1099, substring(value#664, 65, 1) AS wind_observation#1100, (cast(cast(substring(value#664, 66, 4) as float) as double)

### Remarks

Again we can see that Spark performs some simple but clever optiomizations:
* Projections only contains the columns required, not all available columns of df. The required columns are recursively *pushed up* the transformation chain from the last operation (grouped aggregation) to the first transformations
* The aggregation is performed in three steps: 
  * Partial aggregation (`HashAggregate` with `partial_...` functions)
  * Shuffle (`Exchange hashpartitioning`)
  * Final aggregation of partial results (`HashAggregate`)

## 2.7 Sorting

The last operation we like to analyze is sorting. To keep execution plans simple, we just sort the `stations` DataFrame by the stations IDs.

In [6]:
result = stations.sort(stations["usaf"], stations["wban"])
result.explain(True)

== Parsed Logical Plan ==
Sort [usaf#143 ASC NULLS FIRST, wban#144 ASC NULLS FIRST], true
+- AnalysisBarrier
      +- Relation[USAF#143,WBAN#144,STATION NAME#145,CTRY#146,STATE#147,ICAO#148,LAT#149,LON#150,ELEV(M)#151,BEGIN#152,END#153] csv

== Analyzed Logical Plan ==
USAF: string, WBAN: string, STATION NAME: string, CTRY: string, STATE: string, ICAO: string, LAT: string, LON: string, ELEV(M): string, BEGIN: string, END: string
Sort [usaf#143 ASC NULLS FIRST, wban#144 ASC NULLS FIRST], true
+- Relation[USAF#143,WBAN#144,STATION NAME#145,CTRY#146,STATE#147,ICAO#148,LAT#149,LON#150,ELEV(M)#151,BEGIN#152,END#153] csv

== Optimized Logical Plan ==
Sort [usaf#143 ASC NULLS FIRST, wban#144 ASC NULLS FIRST], true
+- Relation[USAF#143,WBAN#144,STATION NAME#145,CTRY#146,STATE#147,ICAO#148,LAT#149,LON#150,ELEV(M)#151,BEGIN#152,END#153] csv

== Physical Plan ==
*(2) Sort [usaf#143 ASC NULLS FIRST, wban#144 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(usaf#143 ASC NULLS FIRST, wban#144

### Remarks

In order to have a globally sorted result, it is not enough to sort within each Spark partition. This implies that some kind of shuffle operation has to be executed. In contrast to all our previous examples, this time Spark uses a `rangepartitioning` by which it simply splits up all data according to the range of the sorting key. After that is done, records will be sorted independently within each partition. Since the ranges were non-overlapping this is enough for a global ordering covering all partitions.