In [1]:
import pyspark

try: 
    type(sc)
except NameError:
    sc = pyspark.SparkContext('local[*]')

In [2]:
type(sc)

pyspark.context.SparkContext

# Spark SQL

## Introducción

Spark SQL es un modulo del Framework de Apache Spark que permite procesar datos estructurados. A diferencia de la api de RDD, las interfaces provistas por Spark SQL brinda mas informacion sobre la estructura de datos y los computos que se van a realizar sobre esas estructuras.

A partir de esto Spark SQL usa esta informacion extra para realizar optimizaciones extra, a partir del **Catalyst Optimizer**. Existen distintos tipos de API incluyendo SQL y Dataset API.


Para realizar los procesamientos el mismo motor de ejecucion es utilizado, independientemente del API/languaje que estes utilizando para expresar tu computation, eso significa que es relativamente facil pasar de una implementacion a otra.

### SQL

Una de las posibilidades es la de ejecutar SQL queries sobre distintos tipo de fuentes (desde motores relacionales a traves de JDBC/ODBC, Instalaciones de Apache, Hive hasta fuentes de datos construidas como Dataframes/Datasets desde multiples formatos (json, parquet, orc, csv, tsv, etc)). 

Los resultados de la ejecucion de una query es un DataSet/DataFrame.


### DataSets y DataFrames

Un Dataset es una collecion distribuida de datos. Dataset es una interfaz que brinda los beneficios de los RDDs (tipado fuerte y la habilidad de utilizar potentes funciones lambda) con los beneficios de utilizar el optimizado engine de Spark SQL. 

#### DataSet

Un Dataset puede ser construido desde objetos de la JVM y luego manipulado por transformaciones funcionales (map, flatMap, filter, etc) estando solo disponible su API en Scala y Java. Para mas informacion de como realizar su construccion ver el siguiente link: https://spark.apache.org/docs/latest/sql-programming-guide.html#creating-datasets


#### DataFrame

Un DataFrame es un Dataset organizado en filas (Dataset of Rows) con nombre.
Podemos pensar en el conceptualmente como equivalente a una tabla relacional en una base de datos relacional o un data frame en R o Python.

Estos pueden construirse de una variedad diversa de fuentes:

- Archivos de informacion estructurada (csv, tsv, json, parquet, orc).
- Tablas en Hive
- Bases de Datos Relacionales Externas
- RDDs existentes

In [3]:
# el punto de inicio de la funcionalidad es la clase SparkSession
# podemos inicializar usando el siguiente builder
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

In [4]:
type(spark)

pyspark.sql.session.SparkSession

In [5]:
type(sc)

pyspark.context.SparkContext

## Creando DataFrames a partir de CSV

Para comenzar a trabajar en algunos ejemplos usaremos el siguiente set de datos:

https://www.kaggle.com/usdot/flight-delays

El set de datos contiene informacion sobre retrasos de vuelos y cancelaciones durante el año 2015.

En los siguientes ejemplos veremos distintas variables de como cargar estos ```DataFrames``` indicando inferencia de tipos.

In [6]:
# carga de CSV con load indicando el formato
df_f = spark.read.load("../data/flight-delays/flights.csv",
                    format="csv", sep=",", inferSchema="true", header="true")

In [7]:
# variante con metodo CSV
df_a = spark.read.csv("../data/flight-delays/airlines.csv",
                    sep=",", inferSchema="true", header="true")

# existen otros metodos especificos para distintos formatos
# por ejemplo: json, csv, tsv indicando el separador, parquet, orc, etc.

In [8]:
df_f.rdd.map(lambda x: type(x)).take(1)

[pyspark.sql.types.Row]

In [9]:
df_a.columns

['IATA_CODE', 'AIRLINE']

In [10]:
df_a.describe()

DataFrame[summary: string, IATA_CODE: string, AIRLINE: string]

In [11]:
df_a.printSchema()

root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRLINE: string (nullable = true)



In [12]:
df_f.columns

['YEAR',
 'MONTH',
 'DAY',
 'DAY_OF_WEEK',
 'AIRLINE',
 'FLIGHT_NUMBER',
 'TAIL_NUMBER',
 'ORIGIN_AIRPORT',
 'DESTINATION_AIRPORT',
 'SCHEDULED_DEPARTURE',
 'DEPARTURE_TIME',
 'DEPARTURE_DELAY',
 'TAXI_OUT',
 'WHEELS_OFF',
 'SCHEDULED_TIME',
 'ELAPSED_TIME',
 'AIR_TIME',
 'DISTANCE',
 'WHEELS_ON',
 'TAXI_IN',
 'SCHEDULED_ARRIVAL',
 'ARRIVAL_TIME',
 'ARRIVAL_DELAY',
 'DIVERTED',
 'CANCELLED',
 'CANCELLATION_REASON',
 'AIR_SYSTEM_DELAY',
 'SECURITY_DELAY',
 'AIRLINE_DELAY',
 'LATE_AIRCRAFT_DELAY',
 'WEATHER_DELAY']

In [13]:
df_f.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (null

## Ejecutando Queries de SQL de forma Programática

La funcion ```sql``` en ```SparkSession``` le permite a nuestras aplicaciones correr queries de SQL de forma programatica y obtener el resultado como un ```DataFrame```.

Para poder referenciar desde queries de SQL es necesario crear una vista temporal asociada al ```DataFrame``` conocidad como **Temporary View** para poder referenciarla en este scope

In [14]:
# registramos el DataFrame airlines como una vista SQL temporal
df_a.createOrReplaceTempView("airlines")

In [15]:
# registramos el DataFrame flights como una vista SQL temporal
df_f.createOrReplaceTempView("flights")

Algo a tener en cuenta es que estas vistas temporales que estamos registrando solamente estaran disponibles en esa sesion y desaparecera si la sesion a la que pertenece termina. Si queremos generar vistas globales a todas las sesiones hasta la finalizacion del programa de Spark debemos registrarlas como una **Global Temporary View** con el metodo ```createGlobalTempView```

Una vez hecho esto podemos ejecutar una query usando el metodo ```sql``` de ```SparkSession```

Por ejemplo podemos realizar un ejemplo simple en el cual realizamos una proyeccion de los nombres de aerolineas de la vista ```airlines```

In [16]:
df_airline_names = spark.sql('SELECT AIRLINE from airlines');

In [17]:
type(df_airline_names)

pyspark.sql.dataframe.DataFrame

In [18]:
df_airline_names.show()

+--------------------+
|             AIRLINE|
+--------------------+
|United Air Lines ...|
|American Airlines...|
|     US Airways Inc.|
|Frontier Airlines...|
|     JetBlue Airways|
|Skywest Airlines ...|
|Alaska Airlines Inc.|
|    Spirit Air Lines|
|Southwest Airline...|
|Delta Air Lines Inc.|
|Atlantic Southeas...|
|Hawaiian Airlines...|
|American Eagle Ai...|
|      Virgin America|
+--------------------+



In [19]:
df_take_5 = spark.sql('SELECT YEAR,MONTH,DAY from flights LIMIT 5');

In [20]:
df_take_5.show()

+----+-----+---+
|YEAR|MONTH|DAY|
+----+-----+---+
|2015|    1|  1|
|2015|    1|  1|
|2015|    1|  1|
|2015|    1|  1|
|2015|    1|  1|
+----+-----+---+



## Entendiendo la motivacion de SparkSQL

Al consultar documentacion actualmente podemos preguntarnos por que el API de SparkSQL esta siendo promovida como el API default para desarrollar aplicaciones.

Podemos partir entendiendo que desarrollar utilizando RDDs plantea:

- **Paradigma imperativo, es decir que tenemos que indicar cada paso que queremos realizar para obtener los datos que queremos.**

La principal problematica de esto es que el API de **RDDs es un API de Bajo Nivel** y esto implica:

- Tenemos que indicar cada paso que queremos realizar para obtener los datos que queremos, **con optimizaciones**.
- Estas **optimizaciones** tenemos que **considerarlas tambien hasta en los casos simples, una y otra vez.**

- **Spark no sabe nada sobre nuestros datos** (por ejemplo que datos vamos a usar solamente, o sus tipos) de tal forma que **para realizar optimizaciones lo mejor que podemos hacer es que nuestras funciones (lambdas) apliquen esquemas a nuestros datos**.
- **Spark no sabe nada sobre los calculos que vamos a realizar**, por lo cual **la optimizacion dependera de la implementacion de nuestros lambdas**.

La idea de SparkSQL es justamente resolver esto utilizando un paradigma popular y declarativo como es SQL. 

Al ser **declarativo**, nosotros no realizamos especificaciones paso a paso para obtener el resultado sino que para lograrlo **vamos aplicando limitaciones a la estructura y los datos hasta obtener el resultado deseado**.

Por otro lado al tener un **conjunto de operaciones especificas y acotadas definidas en SQL** y donde **es necesario indicar la estructura de los datos que tenemos y que queremos obtener en un resultado**, esto **nos ayuda a poder tener versiones optimizadas de esas operaciones en el framework**. Este Trabajo es realizado por Catalyst Optimizer. Para aquellos interesados en un deep dive mas profundo les recomendamos el siguiente Articulo, mas alla de la breve introduccion que realizaremos: https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html

Nota: A su vez reduce el overhead de serializacion de pyspark, dado que todas las operaciones estan implementadas en Scala, python solo provee wrapper de alto nivel.

Antes de avanzar entendiendo como funciona el optimizador de queries en SparkSQL, vamos a hacer una breve introduccion a SQL, en particular a los statements de su lenguaje de consultas que vamos a utilizar.

## Muy Breve Introduccion a SQL

SQL (Structured Query Language) es un lenguaje universal de programacion diseñado para manejar informacion en bases de datos. Al tener una sintaxis standard popular por su simplicidad, es soportado por todos los motores de bases de datos relacionales asi como en otro tipo de motores, como SparkSQL.

Usualmente con SQL podemos:

- Definir Estructuras de Datos (DDL)
- Manipular datos (DML)
- Consultar Datos (DQL).

Esencialmente nos vamos a centrar en entender la sintaxis para realizar consultas que es la usualmente utilizada en motores de Big Data como SparkSQL.

Para ello entenderemos los siguientes statements:

- **SELECT (Proyeccion para obtener datos de una tabla especifica)**
    - Todas las columnas
    - Especificar columnas por nombre y indicarles alias.
    - ```DISTINCT``` (Valores unicos para una o mas columnas)
    - Uso de funciones predefinidas: ```COUNT()```

- **JOIN (Nos permite combinar multiples tablas para cruzar informacion)**
    - ```ON``` (para indicar la condicion de join).
    - Tipos: ```INNER JOIN, LEFT JOIN, RIGHT JOIN, OUTER JOIN```.

- **Filtrado y Ordenamiento**
    - WHERE
        - Operadores de Comparacion: ```=``` (igual a) , ```<>``` or ```!=``` (no igual a), y otros que se explican solos: ```>, <, >=, <=```.
        - Operadores Logicos: ```AND, OR, NOT, IN, BETWEEN, LIKE```.
    - ```ORDER BY (ASC, DESC)```


- **Agregacion**
    - ```GROUP BY``` (Separacion de datos en grupos)
        - Uso de funciones predefinidas: ```COUNT()```

In [21]:
df_airline_names.columns
query = 'SELECT airlines.AIRLINE, COUNT(flights.FLIGHT_NUMBER) as total_cancelled\
                        FROM flights INNER JOIN airlines\
                        ON flights.AIRLINE = airlines.IATA_CODE\
                        WHERE CANCELLED = 1\
                        GROUP BY airlines.AIRLINE\
                        ORDER BY total_cancelled DESC'

df_sql = spark.sql(query);

spark.sql(query).show()

+--------------------+---------------+
|             AIRLINE|total_cancelled|
+--------------------+---------------+
|Southwest Airline...|          16043|
|Atlantic Southeas...|          15231|
|American Eagle Ai...|          15025|
|American Airlines...|          10919|
|Skywest Airlines ...|           9960|
|United Air Lines ...|           6573|
|     JetBlue Airways|           4276|
|     US Airways Inc.|           4067|
|Delta Air Lines Inc.|           3824|
|    Spirit Air Lines|           2004|
|Alaska Airlines Inc.|            669|
|Frontier Airlines...|            588|
|      Virgin America|            534|
|Hawaiian Airlines...|            171|
+--------------------+---------------+



In [22]:
spark.sql(query).explain(True)

== Parsed Logical Plan ==
'Sort ['total_cancelled DESC NULLS LAST], true
+- 'Aggregate ['airlines.AIRLINE], ['airlines.AIRLINE, 'COUNT('flights.FLIGHT_NUMBER) AS total_cancelled#285]
   +- 'Filter ('CANCELLED = 1)
      +- 'Join Inner, ('flights.AIRLINE = 'airlines.IATA_CODE)
         :- 'UnresolvedRelation `flights`
         +- 'UnresolvedRelation `airlines`

== Analyzed Logical Plan ==
AIRLINE: string, total_cancelled: bigint
Sort [total_cancelled#285L DESC NULLS LAST], true
+- Aggregate [AIRLINE#83], [AIRLINE#83, count(FLIGHT_NUMBER#15) AS total_cancelled#285L]
   +- Filter (CANCELLED#34 = 1)
      +- Join Inner, (AIRLINE#14 = IATA_CODE#82)
         :- SubqueryAlias `flights`
         :  +- Relation[YEAR#10,MONTH#11,DAY#12,DAY_OF_WEEK#13,AIRLINE#14,FLIGHT_NUMBER#15,TAIL_NUMBER#16,ORIGIN_AIRPORT#17,DESTINATION_AIRPORT#18,SCHEDULED_DEPARTURE#19,DEPARTURE_TIME#20,DEPARTURE_DELAY#21,TAXI_OUT#22,WHEELS_OFF#23,SCHEDULED_TIME#24,ELAPSED_TIME#25,AIR_TIME#26,DISTANCE#27,WHEELS_ON#28,TAXI_IN#

In [23]:
df_sql.rdd.map(lambda a: type(a)).take(1)

[pyspark.sql.types.Row]

## Introducción a Catalyst Optimizer

En esta sección vamos a intentar profundizar como funciona el Catalyst Optimizer. Este componente del framework de ejecución se encarga de interpretar y optimizar, la query planteada, para despues llevarla a codigo en ejecución en el API de RDD.

El pipeline de ejecución de **Catalyst Optimizer** se puede ver en el siguiente grafico.

![title](img/catalyst-pipeline.png)

De cada etapa se obtienen los siguientes resultados:

- Parsed/Unresolved Logical Plan, sin haber resuelto las tablas y atributos de la query.
- Analyzed Logical Plan, luego de analizarlo o resolver las referencias de catalogo.
- Optimized Logical Plan, luego del proceso de optimizacion logica a partir de un set de reglas.
- Physical Plan, obteniendo el physical plan seleccionado luego de haber evaluado multiples a traves de un modelo de costos.

Por ultimo, a partir del physical plan seleccionado se genera el codigo de ejecucion sobre el api de RDDs.

## Entendiendo Catalyst Optimizer a partir del query plan

Como un primer paso para poder entender como funciona, y como recomendacion general en todos los casos, es importante antes de ejecutar queries analizar el query plan a ejecutarse, esto se puede lograr mediante el metodo ```explain```.

In [24]:
# comencemos analizando con una query simple de seleccion 
query = 'SELECT AIRLINE, FLIGHT_NUMBER from flights\
                        WHERE CANCELLED = 1'
spark.sql(query).explain(True)

== Parsed Logical Plan ==
'Project ['AIRLINE, 'FLIGHT_NUMBER]
+- 'Filter ('CANCELLED = 1)
   +- 'UnresolvedRelation `flights`

== Analyzed Logical Plan ==
AIRLINE: string, FLIGHT_NUMBER: int
Project [AIRLINE#14, FLIGHT_NUMBER#15]
+- Filter (CANCELLED#34 = 1)
   +- SubqueryAlias `flights`
      +- Relation[YEAR#10,MONTH#11,DAY#12,DAY_OF_WEEK#13,AIRLINE#14,FLIGHT_NUMBER#15,TAIL_NUMBER#16,ORIGIN_AIRPORT#17,DESTINATION_AIRPORT#18,SCHEDULED_DEPARTURE#19,DEPARTURE_TIME#20,DEPARTURE_DELAY#21,TAXI_OUT#22,WHEELS_OFF#23,SCHEDULED_TIME#24,ELAPSED_TIME#25,AIR_TIME#26,DISTANCE#27,WHEELS_ON#28,TAXI_IN#29,SCHEDULED_ARRIVAL#30,ARRIVAL_TIME#31,ARRIVAL_DELAY#32,DIVERTED#33,... 7 more fields] csv

== Optimized Logical Plan ==
Project [AIRLINE#14, FLIGHT_NUMBER#15]
+- Filter (isnotnull(CANCELLED#34) && (CANCELLED#34 = 1))
   +- Relation[YEAR#10,MONTH#11,DAY#12,DAY_OF_WEEK#13,AIRLINE#14,FLIGHT_NUMBER#15,TAIL_NUMBER#16,ORIGIN_AIRPORT#17,DESTINATION_AIRPORT#18,SCHEDULED_DEPARTURE#19,DEPARTURE_TIME#20,DEPARTU

### Obteniendo el Unresolved Logical Plan

```
== Parsed Logical Plan ==
'Project ['AIRLINE, 'FLIGHT_NUMBER]
+- 'Filter ('CANCELLED = 1)
   +- 'UnresolvedRelation `flights`
```

El pipeline comienza con una query SQL o una operatoria utilizando la API de DataFrame de Spark. La query es parseada y resuelta en Parsed Logical Plan. Este como podemos ver **tiene relaciones no resueltas aun dado que el framework todavia no sabe nada sobre los atributos y tablas en la query.**

### Obteniendo el Analyzed Logical Plan

```
== Analyzed Logical Plan ==
AIRLINE: string, FLIGHT_NUMBER: int
Project [AIRLINE#3744, FLIGHT_NUMBER#3745]
+- Filter (CANCELLED#3764 = 1)
   +- SubqueryAlias flights
      +- Relation[YEAR#3740,MONTH#3741,DAY#3742,DAY_OF_WEEK#3743,AIRLINE#3744,FLIGHT_NUMBER#3745,TAIL_NUMBER#3746,ORIGIN_AIRPORT#3747,DESTINATION_AIRPORT#3748,SCHEDULED_DEPARTURE#3749,DEPARTURE_TIME#3750,DEPARTURE_DELAY#3751,TAXI_OUT#3752,WHEELS_OFF#3753,SCHEDULED_TIME#3754,ELAPSED_TIME#3755,AIR_TIME#3756,DISTANCE#3757,WHEELS_ON#3758,TAXI_IN#3759,SCHEDULED_ARRIVAL#3760,ARRIVAL_TIME#3761,ARRIVAL_DELAY#3762,DIVERTED#3763,... 7 more fields] csv
```

Para obtener el Analyzed Logical Plan Catalyst utiliza el catalogo correspondiente a partir de los origenes de donde se ha generado el dataframe en cuestion. Por ejemplo en nuestro caso estara utilizando el **metastore de Apache Derby local**, pero tambien podria utilizar:

- Si trabajaramos con **Apache Hive**, su metastore.
- Un metastore de HDFS
- Informacion de DBMS si estuvieramos trabajando con conexiones a base de datos relacionales. 

### Obteniendo el Optimized Logical Plan

```
== Optimized Logical Plan ==
Project [AIRLINE#3744, FLIGHT_NUMBER#3745]
+- Filter (isnotnull(CANCELLED#3764) && (CANCELLED#3764 = 1))
   +- Relation[YEAR#3740,MONTH#3741,DAY#3742,DAY_OF_WEEK#3743,AIRLINE#3744,FLIGHT_NUMBER#3745,TAIL_NUMBER#3746,ORIGIN_AIRPORT#3747,DESTINATION_AIRPORT#3748,SCHEDULED_DEPARTURE#3749,DEPARTURE_TIME#3750,DEPARTURE_DELAY#3751,TAXI_OUT#3752,WHEELS_OFF#3753,SCHEDULED_TIME#3754,ELAPSED_TIME#3755,AIR_TIME#3756,DISTANCE#3757,WHEELS_ON#3758,TAXI_IN#3759,SCHEDULED_ARRIVAL#3760,ARRIVAL_TIME#3761,ARRIVAL_DELAY#3762,DIVERTED#3763,... 7 more fields] csv
```

Una vez que el plan esta resuelto Catalyst comienza a optimizar este plan, generando un Optimized Logical Plan.

Vamos a introducir una simplificacion para entender como los planes son transformados:

En general para poder hacer las optimizaciones, **Catalyst utiliza una representacion para representar la query como un arbol inmutable.** Es por eso que **la mayoria de las optimizaciones se realizan analizando la posibilidad de poder aplicar transformaciones funcionales al arbol para poder lograr los mismos resultados.**

![title](img/tree-optimize.png)

En el grafico que tenemos arriba se ve una caso de una posible optimizacion que se podria realizar conocida como **Filter pushdown**. Esencialmente en este caso la optimizacion consiste aplicar el filtro antes para luego tener que realizar un join con menor cantidad de datos. Veremos este ejemplo en la siguiente query a analizar.

Algunos otros ejemplos son:

- Constant Folding
- Predicate Pushdown (filter es un ejemplo)
- Projection Pruning
- Null Propagation
- Boolean Expression Simplification

## Obteniendo el Physical Plan a ejecutarse

```
== Physical Plan ==
*(1) Project [AIRLINE#3744, FLIGHT_NUMBER#3745]
+- *(1) Filter (isnotnull(CANCELLED#3764) && (CANCELLED#3764 = 1))
   +- *(1) FileScan csv [AIRLINE#3744,FLIGHT_NUMBER#3745,CANCELLED#3764] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/ak/code/datos/datos-spark-lesson/data/flight-delays/flights.csv], PartitionFilters: [], PushedFilters: [IsNotNull(CANCELLED), EqualTo(CANCELLED,1)], ReadSchema: struct<AIRLINE:string,FLIGHT_NUMBER:int,CANCELLED:int>
```

Una vez obtenido especificamente el Optimized Logical Plan, Catalyst va a generar un conjunto de Physical Plan los cuales son evaluados con un modelo de costos, de ser necesario. Finalmente el mas performante segun el modelo de costo, sera el plan usado para generar el codigo executable via RDDs.

## Analizando los distintos tipos de JOINS

Tomando la siguiente query con un INNER JOIN podemos analizar su query plan en los distintos stages

In [25]:
query = 'SELECT airlines.AIRLINE, flights.FLIGHT_NUMBER\
                FROM flights INNER JOIN airlines\
                ON flights.AIRLINE = airlines.IATA_CODE\
                WHERE flights.CANCELLED = 0'

spark.sql(query).explain(True)

== Parsed Logical Plan ==
'Project ['airlines.AIRLINE, 'flights.FLIGHT_NUMBER]
+- 'Filter ('flights.CANCELLED = 0)
   +- 'Join Inner, ('flights.AIRLINE = 'airlines.IATA_CODE)
      :- 'UnresolvedRelation `flights`
      +- 'UnresolvedRelation `airlines`

== Analyzed Logical Plan ==
AIRLINE: string, FLIGHT_NUMBER: int
Project [AIRLINE#83, FLIGHT_NUMBER#15]
+- Filter (CANCELLED#34 = 0)
   +- Join Inner, (AIRLINE#14 = IATA_CODE#82)
      :- SubqueryAlias `flights`
      :  +- Relation[YEAR#10,MONTH#11,DAY#12,DAY_OF_WEEK#13,AIRLINE#14,FLIGHT_NUMBER#15,TAIL_NUMBER#16,ORIGIN_AIRPORT#17,DESTINATION_AIRPORT#18,SCHEDULED_DEPARTURE#19,DEPARTURE_TIME#20,DEPARTURE_DELAY#21,TAXI_OUT#22,WHEELS_OFF#23,SCHEDULED_TIME#24,ELAPSED_TIME#25,AIR_TIME#26,DISTANCE#27,WHEELS_ON#28,TAXI_IN#29,SCHEDULED_ARRIVAL#30,ARRIVAL_TIME#31,ARRIVAL_DELAY#32,DIVERTED#33,... 7 more fields] csv
      +- SubqueryAlias `airlines`
         +- Relation[IATA_CODE#82,AIRLINE#83] csv

== Optimized Logical Plan ==
Project [AIRLINE#83

```
== Parsed Logical Plan ==
'Project ['airlines.AIRLINE, 'flights.FLIGHT_NUMBER]
+- 'Filter ('flights.CANCELLED = 0)
   +- 'Join Inner, ('flights.AIRLINE = 'airlines.IATA_CODE)
      :- 'UnresolvedRelation `flights`
      +- 'UnresolvedRelation `airlines`

== Analyzed Logical Plan ==
AIRLINE: string, FLIGHT_NUMBER: int
Project [AIRLINE#83, FLIGHT_NUMBER#15]
+- Filter (CANCELLED#34 = 0)
   +- Join Inner, (AIRLINE#14 = IATA_CODE#82)
      :- SubqueryAlias `flights`
      :  +- Relation[YEAR#10,MONTH#11,DAY#12,DAY_OF_WEEK#13,AIRLINE#14,FLIGHT_NUMBER#15,TAIL_NUMBER#16,ORIGIN_AIRPORT#17,DESTINATION_AIRPORT#18,SCHEDULED_DEPARTURE#19,DEPARTURE_TIME#20,DEPARTURE_DELAY#21,TAXI_OUT#22,WHEELS_OFF#23,SCHEDULED_TIME#24,ELAPSED_TIME#25,AIR_TIME#26,DISTANCE#27,WHEELS_ON#28,TAXI_IN#29,SCHEDULED_ARRIVAL#30,ARRIVAL_TIME#31,ARRIVAL_DELAY#32,DIVERTED#33,... 7 more fields] csv
      +- SubqueryAlias `airlines`
         +- Relation[IATA_CODE#82,AIRLINE#83] csv

== Optimized Logical Plan ==
Project [AIRLINE#83, FLIGHT_NUMBER#15]
+- Join Inner, (AIRLINE#14 = IATA_CODE#82)
   :- Project [AIRLINE#14, FLIGHT_NUMBER#15]
   :  +- Filter ((isnotnull(CANCELLED#34) && (CANCELLED#34 = 0)) && isnotnull(AIRLINE#14))
   :     +- Relation[YEAR#10,MONTH#11,DAY#12,DAY_OF_WEEK#13,AIRLINE#14,FLIGHT_NUMBER#15,TAIL_NUMBER#16,ORIGIN_AIRPORT#17,DESTINATION_AIRPORT#18,SCHEDULED_DEPARTURE#19,DEPARTURE_TIME#20,DEPARTURE_DELAY#21,TAXI_OUT#22,WHEELS_OFF#23,SCHEDULED_TIME#24,ELAPSED_TIME#25,AIR_TIME#26,DISTANCE#27,WHEELS_ON#28,TAXI_IN#29,SCHEDULED_ARRIVAL#30,ARRIVAL_TIME#31,ARRIVAL_DELAY#32,DIVERTED#33,... 7 more fields] csv
   +- Filter isnotnull(IATA_CODE#82)
      +- Relation[IATA_CODE#82,AIRLINE#83] csv

== Physical Plan ==
*(2) Project [AIRLINE#83, FLIGHT_NUMBER#15]
+- *(2) BroadcastHashJoin [AIRLINE#14], [IATA_CODE#82], Inner, BuildRight
   :- *(2) Project [AIRLINE#14, FLIGHT_NUMBER#15]
   :  +- *(2) Filter ((isnotnull(CANCELLED#34) && (CANCELLED#34 = 0)) && isnotnull(AIRLINE#14))
   :     +- *(2) FileScan csv [AIRLINE#14,FLIGHT_NUMBER#15,CANCELLED#34] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/ak/code/datos/datos-spark-lesson/data/flight-delays/flights.csv], PartitionFilters: [], PushedFilters: [IsNotNull(CANCELLED), EqualTo(CANCELLED,0), IsNotNull(AIRLINE)], ReadSchema: struct<AIRLINE:string,FLIGHT_NUMBER:int,CANCELLED:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *(1) Project [IATA_CODE#82, AIRLINE#83]
         +- *(1) Filter isnotnull(IATA_CODE#82)
            +- *(1) FileScan csv [IATA_CODE#82,AIRLINE#83] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/ak/code/datos/datos-spark-lesson/data/flight-delays/airlines.csv], PartitionFilters: [], PushedFilters: [IsNotNull(IATA_CODE)], ReadSchema: struct<IATA_CODE:string,AIRLINE:string>
```            

In [26]:
query = 'SELECT airlines.AIRLINE, flights.FLIGHT_NUMBER\
                FROM airlines INNER JOIN flights\
                ON airlines.IATA_CODE = flights.AIRLINE\
                WHERE flights.CANCELLED = 0'

spark.sql(query).explain(True)

== Parsed Logical Plan ==
'Project ['airlines.AIRLINE, 'flights.FLIGHT_NUMBER]
+- 'Filter ('flights.CANCELLED = 0)
   +- 'Join Inner, ('airlines.IATA_CODE = 'flights.AIRLINE)
      :- 'UnresolvedRelation `airlines`
      +- 'UnresolvedRelation `flights`

== Analyzed Logical Plan ==
AIRLINE: string, FLIGHT_NUMBER: int
Project [AIRLINE#83, FLIGHT_NUMBER#15]
+- Filter (CANCELLED#34 = 0)
   +- Join Inner, (IATA_CODE#82 = AIRLINE#14)
      :- SubqueryAlias `airlines`
      :  +- Relation[IATA_CODE#82,AIRLINE#83] csv
      +- SubqueryAlias `flights`
         +- Relation[YEAR#10,MONTH#11,DAY#12,DAY_OF_WEEK#13,AIRLINE#14,FLIGHT_NUMBER#15,TAIL_NUMBER#16,ORIGIN_AIRPORT#17,DESTINATION_AIRPORT#18,SCHEDULED_DEPARTURE#19,DEPARTURE_TIME#20,DEPARTURE_DELAY#21,TAXI_OUT#22,WHEELS_OFF#23,SCHEDULED_TIME#24,ELAPSED_TIME#25,AIR_TIME#26,DISTANCE#27,WHEELS_ON#28,TAXI_IN#29,SCHEDULED_ARRIVAL#30,ARRIVAL_TIME#31,ARRIVAL_DELAY#32,DIVERTED#33,... 7 more fields] csv

== Optimized Logical Plan ==
Project [AIRLINE#83

```
== Parsed Logical Plan ==
'Project ['airlines.AIRLINE, 'flights.FLIGHT_NUMBER]
+- 'Filter ('flights.CANCELLED = 0)
   +- 'Join Inner, ('airlines.IATA_CODE = 'flights.AIRLINE)
      :- 'UnresolvedRelation `airlines`
      +- 'UnresolvedRelation `flights`

== Analyzed Logical Plan ==
AIRLINE: string, FLIGHT_NUMBER: int
Project [AIRLINE#83, FLIGHT_NUMBER#15]
+- Filter (CANCELLED#34 = 0)
   +- Join Inner, (IATA_CODE#82 = AIRLINE#14)
      :- SubqueryAlias `airlines`
      :  +- Relation[IATA_CODE#82,AIRLINE#83] csv
      +- SubqueryAlias `flights`
         +- Relation[YEAR#10,MONTH#11,DAY#12,DAY_OF_WEEK#13,AIRLINE#14,FLIGHT_NUMBER#15,TAIL_NUMBER#16,ORIGIN_AIRPORT#17,DESTINATION_AIRPORT#18,SCHEDULED_DEPARTURE#19,DEPARTURE_TIME#20,DEPARTURE_DELAY#21,TAXI_OUT#22,WHEELS_OFF#23,SCHEDULED_TIME#24,ELAPSED_TIME#25,AIR_TIME#26,DISTANCE#27,WHEELS_ON#28,TAXI_IN#29,SCHEDULED_ARRIVAL#30,ARRIVAL_TIME#31,ARRIVAL_DELAY#32,DIVERTED#33,... 7 more fields] csv

== Optimized Logical Plan ==
Project [AIRLINE#83, FLIGHT_NUMBER#15]
+- Join Inner, (IATA_CODE#82 = AIRLINE#14)
   :- Filter isnotnull(IATA_CODE#82)
   :  +- Relation[IATA_CODE#82,AIRLINE#83] csv
   +- Project [AIRLINE#14, FLIGHT_NUMBER#15]
      +- Filter ((isnotnull(CANCELLED#34) && (CANCELLED#34 = 0)) && isnotnull(AIRLINE#14))
         +- Relation[YEAR#10,MONTH#11,DAY#12,DAY_OF_WEEK#13,AIRLINE#14,FLIGHT_NUMBER#15,TAIL_NUMBER#16,ORIGIN_AIRPORT#17,DESTINATION_AIRPORT#18,SCHEDULED_DEPARTURE#19,DEPARTURE_TIME#20,DEPARTURE_DELAY#21,TAXI_OUT#22,WHEELS_OFF#23,SCHEDULED_TIME#24,ELAPSED_TIME#25,AIR_TIME#26,DISTANCE#27,WHEELS_ON#28,TAXI_IN#29,SCHEDULED_ARRIVAL#30,ARRIVAL_TIME#31,ARRIVAL_DELAY#32,DIVERTED#33,... 7 more fields] csv

== Physical Plan ==
*(2) Project [AIRLINE#83, FLIGHT_NUMBER#15]
+- *(2) BroadcastHashJoin [IATA_CODE#82], [AIRLINE#14], Inner, BuildLeft
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   :  +- *(1) Project [IATA_CODE#82, AIRLINE#83]
   :     +- *(1) Filter isnotnull(IATA_CODE#82)
   :        +- *(1) FileScan csv [IATA_CODE#82,AIRLINE#83] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/ak/code/datos/datos-spark-lesson/data/flight-delays/airlines.csv], PartitionFilters: [], PushedFilters: [IsNotNull(IATA_CODE)], ReadSchema: struct<IATA_CODE:string,AIRLINE:string>
   +- *(2) Project [AIRLINE#14, FLIGHT_NUMBER#15]
      +- *(2) Filter ((isnotnull(CANCELLED#34) && (CANCELLED#34 = 0)) && isnotnull(AIRLINE#14))
         +- *(2) FileScan csv [AIRLINE#14,FLIGHT_NUMBER#15,CANCELLED#34] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/ak/code/datos/datos-spark-lesson/data/flight-delays/flights.csv], PartitionFilters: [], PushedFilters: [IsNotNull(CANCELLED), EqualTo(CANCELLED,0), IsNotNull(AIRLINE)], ReadSchema: struct<AIRLINE:string,FLIGHT_NUMBER:int,CANCELLED:int>
```                               

Algunos puntos interesantes a considerar:

- Ver como se produce un **filter pushdown** en el optimized logical plan.
- Ver como se produce un **proyeccion de las columnas necesarias** antes del join en el optimized logical plan para reducir la cantidad de datos a utilizar.
- Ver el tipo de JOIN a realizarse: **BroadcastHashJoin**.

## Como pueden implementarse los Joins en forma general

Existen las siguientes estrategias para implementar joins:

### Nested Loop Join

- Metodo Naive para implementar un join con dos loops anidados. 
- ```O(n*m)``` = ***MUY LENTO***.

### Hash Join
- Crear una Hashmap con la tabla mas chica del join
- ```O(n+m)``` dado que necesita crear un Hashmap, usando memoria extra.
- Solamente soporta comparacion a claves iguales para el join (equijoin)

### Sort Merge Join
- ```O(n (log(n)) + m (log(m)))```, un poco mas lento que Hash Join.
- No hace uso de una meta estructura y puede ser usado con tablas muy grandes.
- Las keys deben poder ordenarse.

### Joins en Spark y como se efectuan en el Physical Plan

Existen cuatro tipos de physical plans para efectuar joins en Spark.

### Sort-Merge Join

- Implementacion Default de Join en Spark.
- Keys deben poder ordenarse.
- Produce Shuffle.

### Shuffle Hash Join

- Utilizado cuando las keys no pueden ordenarse o el sort merge join esta deshabilitado.
- Se usa cuando usualmente un "lado" es mas chico que el otro. (3 veces)
- Spark debe ser capaz de poder crear un hashmap local con los recursos que tiene disponibles. (lo estima)
- Podemos forzarlo si creemos que nos ayuda como mejora de performance.
- Produce Shuffle

### Broadcast Join

- No produce Shuffle, dado que realiza broadcast completo de un lado del join a todos los executors.
- Spark debe evaluar si puede realizar el broadcast.
- Hace broadcast del dataframe mas chico.
- Hace el join del lado del map.


### Broadcast Nested Loop Join

- Caso PATOLOGICO que queremos evitar.

Otro aspecto importante a considerar al trabajar con joins Grandes el nivel de ```paralellism``` es decir la cantidad de particiones que utilizamos dependiendo de como estan distribuidas nuestras keys. (skewed data)

## Analizando el query plan de Agregacion (Group By)

Por ultimo veamos un ejemplo mas para ver algunos tipos mas de operaciones y como se optimizan (agregacion).

In [27]:
query = 'SELECT airlines.AIRLINE,\
                count(flights.FLIGHT_NUMBER) as number_of_fligths\
                FROM flights JOIN airlines\
                WHERE flights.AIRLINE = airlines.IATA_CODE\
                AND flights.CANCELLED = 0\
                GROUP BY airlines.AIRLINE'

spark.sql(query).explain(True)

== Parsed Logical Plan ==
'Aggregate ['airlines.AIRLINE], ['airlines.AIRLINE, 'count('flights.FLIGHT_NUMBER) AS number_of_fligths#305]
+- 'Filter (('flights.AIRLINE = 'airlines.IATA_CODE) && ('flights.CANCELLED = 0))
   +- 'Join Inner
      :- 'UnresolvedRelation `flights`
      +- 'UnresolvedRelation `airlines`

== Analyzed Logical Plan ==
AIRLINE: string, number_of_fligths: bigint
Aggregate [AIRLINE#83], [AIRLINE#83, count(FLIGHT_NUMBER#15) AS number_of_fligths#305L]
+- Filter ((AIRLINE#14 = IATA_CODE#82) && (CANCELLED#34 = 0))
   +- Join Inner
      :- SubqueryAlias `flights`
      :  +- Relation[YEAR#10,MONTH#11,DAY#12,DAY_OF_WEEK#13,AIRLINE#14,FLIGHT_NUMBER#15,TAIL_NUMBER#16,ORIGIN_AIRPORT#17,DESTINATION_AIRPORT#18,SCHEDULED_DEPARTURE#19,DEPARTURE_TIME#20,DEPARTURE_DELAY#21,TAXI_OUT#22,WHEELS_OFF#23,SCHEDULED_TIME#24,ELAPSED_TIME#25,AIR_TIME#26,DISTANCE#27,WHEELS_ON#28,TAXI_IN#29,SCHEDULED_ARRIVAL#30,ARRIVAL_TIME#31,ARRIVAL_DELAY#32,DIVERTED#33,... 7 more fields] csv
      +- Subq

```
== Parsed Logical Plan ==
'Aggregate ['airlines.AIRLINE], ['airlines.AIRLINE, 'count('flights.FLIGHT_NUMBER) AS number_of_fligths#305]
+- 'Filter (('flights.AIRLINE = 'airlines.IATA_CODE) && ('flights.CANCELLED = 0))
   +- 'Join Inner
      :- 'UnresolvedRelation `flights`
      +- 'UnresolvedRelation `airlines`

== Analyzed Logical Plan ==
AIRLINE: string, number_of_fligths: bigint
Aggregate [AIRLINE#83], [AIRLINE#83, count(FLIGHT_NUMBER#15) AS number_of_fligths#305L]
+- Filter ((AIRLINE#14 = IATA_CODE#82) && (CANCELLED#34 = 0))
   +- Join Inner
      :- SubqueryAlias `flights`
      :  +- Relation[YEAR#10,MONTH#11,DAY#12,DAY_OF_WEEK#13,AIRLINE#14,FLIGHT_NUMBER#15,TAIL_NUMBER#16,ORIGIN_AIRPORT#17,DESTINATION_AIRPORT#18,SCHEDULED_DEPARTURE#19,DEPARTURE_TIME#20,DEPARTURE_DELAY#21,TAXI_OUT#22,WHEELS_OFF#23,SCHEDULED_TIME#24,ELAPSED_TIME#25,AIR_TIME#26,DISTANCE#27,WHEELS_ON#28,TAXI_IN#29,SCHEDULED_ARRIVAL#30,ARRIVAL_TIME#31,ARRIVAL_DELAY#32,DIVERTED#33,... 7 more fields] csv
      +- SubqueryAlias `airlines`
         +- Relation[IATA_CODE#82,AIRLINE#83] csv

== Optimized Logical Plan ==
Aggregate [AIRLINE#83], [AIRLINE#83, count(FLIGHT_NUMBER#15) AS number_of_fligths#305L]
+- Project [FLIGHT_NUMBER#15, AIRLINE#83]
   +- Join Inner, (AIRLINE#14 = IATA_CODE#82)
      :- Project [AIRLINE#14, FLIGHT_NUMBER#15]
      :  +- Filter ((isnotnull(CANCELLED#34) && (CANCELLED#34 = 0)) && isnotnull(AIRLINE#14))
      :     +- Relation[YEAR#10,MONTH#11,DAY#12,DAY_OF_WEEK#13,AIRLINE#14,FLIGHT_NUMBER#15,TAIL_NUMBER#16,ORIGIN_AIRPORT#17,DESTINATION_AIRPORT#18,SCHEDULED_DEPARTURE#19,DEPARTURE_TIME#20,DEPARTURE_DELAY#21,TAXI_OUT#22,WHEELS_OFF#23,SCHEDULED_TIME#24,ELAPSED_TIME#25,AIR_TIME#26,DISTANCE#27,WHEELS_ON#28,TAXI_IN#29,SCHEDULED_ARRIVAL#30,ARRIVAL_TIME#31,ARRIVAL_DELAY#32,DIVERTED#33,... 7 more fields] csv
      +- Filter isnotnull(IATA_CODE#82)
         +- Relation[IATA_CODE#82,AIRLINE#83] csv

== Physical Plan ==
*(3) HashAggregate(keys=[AIRLINE#83], functions=[count(FLIGHT_NUMBER#15)], output=[AIRLINE#83, number_of_fligths#305L])
+- Exchange hashpartitioning(AIRLINE#83, 200)
   +- *(2) HashAggregate(keys=[AIRLINE#83], functions=[partial_count(FLIGHT_NUMBER#15)], output=[AIRLINE#83, count#310L])
      +- *(2) Project [FLIGHT_NUMBER#15, AIRLINE#83]
         +- *(2) BroadcastHashJoin [AIRLINE#14], [IATA_CODE#82], Inner, BuildRight
            :- *(2) Project [AIRLINE#14, FLIGHT_NUMBER#15]
            :  +- *(2) Filter ((isnotnull(CANCELLED#34) && (CANCELLED#34 = 0)) && isnotnull(AIRLINE#14))
            :     +- *(2) FileScan csv [AIRLINE#14,FLIGHT_NUMBER#15,CANCELLED#34] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/ak/code/datos/datos-spark-lesson/data/flight-delays/flights.csv], PartitionFilters: [], PushedFilters: [IsNotNull(CANCELLED), EqualTo(CANCELLED,0), IsNotNull(AIRLINE)], ReadSchema: struct<AIRLINE:string,FLIGHT_NUMBER:int,CANCELLED:int>
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
               +- *(1) Project [IATA_CODE#82, AIRLINE#83]
                  +- *(1) Filter isnotnull(IATA_CODE#82)
                     +- *(1) FileScan csv [IATA_CODE#82,AIRLINE#83] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/ak/code/datos/datos-spark-lesson/data/flight-delays/airlines.csv], PartitionFilters: [], PushedFilters: [IsNotNull(IATA_CODE)], ReadSchema: struct<IATA_CODE:string,AIRLINE:string>
```

Algunos puntos interesantes a considerar:

- Ver como se produce un **filter pushdown** en el optimized logical plan.
- Ver como se produce un **proyeccion de las columnas necesarias** antes del join en el optimized logical plan para reducir la cantidad de datos a utilizar.
- Ver el tipo de JOIN a realizarse: **BroadcastHashJoin**.

## Aspectos importantes al Utilizar SparkSQL:

- **SIEMPRE** verificar el **plan de ejecucion** producido por el **explain** method.
- Verificar si los filtros bajan en el arbol (filter push down), de tal forma que se reduzcan los set de datos a operar.
- Verificar COMO se va a realizar el JOIN en el Physical Plan.
- Verificar si columnas innecesarias fueron eliminas en las etapas de la relacion, de tal forma que reduzca los sets de datos a operar.

## Definir operaciones sin necesidad de SQL usando el API

Existe la posibilidad de trabajar con dataframes de una forma bastante similar a la con la que trabajamos en pandas. A continuacion algunos ejemplos.

In [28]:
df_a.select("AIRLINE").show()

+--------------------+
|             AIRLINE|
+--------------------+
|United Air Lines ...|
|American Airlines...|
|     US Airways Inc.|
|Frontier Airlines...|
|     JetBlue Airways|
|Skywest Airlines ...|
|Alaska Airlines Inc.|
|    Spirit Air Lines|
|Southwest Airline...|
|Delta Air Lines Inc.|
|Atlantic Southeas...|
|Hawaiian Airlines...|
|American Eagle Ai...|
|      Virgin America|
+--------------------+



In [29]:
df_f.select('DAY_OF_WEEK').distinct().orderBy('DAY_OF_WEEK').show()

+-----------+
|DAY_OF_WEEK|
+-----------+
|          1|
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
+-----------+



In [30]:
df_f.groupBy("YEAR").count().show()

+----+-------+
|YEAR|  count|
+----+-------+
|2015|5819079|
+----+-------+



In [31]:
df_f.groupBy("YEAR","MONTH").count().orderBy("YEAR","MONTH").show()

+----+-----+------+
|YEAR|MONTH| count|
+----+-----+------+
|2015|    1|469968|
|2015|    2|429191|
|2015|    3|504312|
|2015|    4|485151|
|2015|    5|496993|
|2015|    6|503897|
|2015|    7|520718|
|2015|    8|510536|
|2015|    9|464946|
|2015|   10|486165|
|2015|   11|467972|
|2015|   12|479230|
+----+-----+------+



## Algunos puntos interesantes a desarrollar e investigar

- Interoperabilidad con RDDs (usando reflection o especificando el esquema).
- Uso y Optimizacion de UDF 
- Manejo de DataSources de distintos tipos (JSON, ORC, Parquet).
- Save Modes
- Interoperabilidad con Apache Hive.
- Interoperabilidad con Pandas.
- Interoperabilidad con fuentes JDBC.
- Distributed SQL Engine (Hola Presto)