# Capitulo 3: Apache Spark's Structure APIs

Iremos leyendo y realizando los ejemplos del capitulo 3 del libro, complementandolo con unos actividades sobre los mismos ejemplos. 

Como comentamos en el anterior Script sobre los capítulos 1 y 2:

##### Siempre debemos iniciar una instancia SparkSession al principio. 

Por lo que antes de comenzar crearemos la SparkSession correspondiente:

In [22]:
import pyspark
from pyspark.sql import SparkSession

# Creamos SparkSession (IMPORTANTE cambiar el nombre si estamos en otros scripts)
spark = SparkSession\
    .builder\
    .appName("LibroSpark_cap3")\
    .getOrCreate()


### Schemas and Creating DataFrames

Podemos crear los DF manualmente o Importarlos en base a diferentes formatos. Un dataframe esta organizado en columnas y filas, como en una tabla, y tiene un schema predefinido con los tipos de datos que formarán las columnas (int, double, string, etc). 

En cuanto al schema, a la hora de crear el DF se puede definir de dos maneras:
- Importar/Crear el DF "Infiriendo el schema". De esta manera, Spark determinará, evaluando algunas fijas de nuestro DF, cuales son los tipos de datos que lo forman.

- Definir el schema por nuestra cuenta, lo cual tiene algunas ventajas: Liberamos a Spark de la carga de inferlirlo, evitamos que se cree un job separado sólo para leer una porccion de los datos del archivo para acertar el schema, y podemos detectar algunos errores en la definición de los datos de manera anticipada. Podemos definirlo de dos formas:
 1. Programáticamente
 2. Empleando Lenduajes de Definición de Datos (DDL)

In [2]:
# Importamos archivo flights-jan-apr-2018.csv en un DF IINFIRIENDO SU SCHEMA
flightsDF = (spark.read
             .option("header", "true")
             .option("inferSchema", "true")
             .csv("./Datasets/flights-jan-apr-2018.csv") # pon aquí la ruta en tu bucket
            )
# Vemos el schema inferido
flightsDF.printSchema()

root
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- FlightDate: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- OriginCity: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- DestCity: string (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- Cancelled: double (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- CarrierDelay: double (nullable = true)
 |-- WeatherDelay: double (nullable = true)
 |-- NASDelay: double (nullable = true)
 |-- SecurityDelay: double (nullable = true)
 |-- LateAircraftDelay: double (nullable = true)



In [3]:
from pyspark.sql.types import *

# 1 Definimos el schema "Programáticamente". Utilizamos la funcion Structype y dentro StructField para cada columna
schema_1 = StructType([StructField("author", StringType(), False),
                     StructField("title", StringType(), False),
                     StructField("pages", IntegerType(), False)])

# 2 Definimos el schema usando DDL
schema_2 = "author STRING, title STRING, pages INT"

Es importante destacar que podemos elegir cualesquiera de las dos formas de definición del schema. Incluso las 2 a la vez.

In [4]:
## EJ 3.6
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Definimos schema forma programatica
schema_3_6 = StructType([
   StructField("Id", IntegerType(), False),
   StructField("First", StringType(), False),
   StructField("Last", StringType(), False),
   StructField("Url", StringType(), False),
   StructField("Published", StringType(), False),
   StructField("Hits", IntegerType(), False),
   StructField("Campaigns", ArrayType(StringType()), False)])

# Definimos el schema usando DDL
schema = "`Id` INT, `First` STRING, `Last` STRING, `Url` STRING, `Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>"

# Creamos los datos 
data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
       [2, "Brooke","Wenig","https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
       [3, "Denny", "Lee", "https://tinyurl.3","6/7/2019",7659, ["web", "twitter", "FB", "LinkedIn"]],
       [4, "Tathagata", "Das","https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
       [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
       [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]]
      ]

# Creamos el DF (eligiendo cualquiera de los dos schemas)
blogs_df = spark.createDataFrame(data, schema)
    
# Inspeccionamos
blogs_df.show(5)

# Vemos el schema definido en DDL
print(blogs_df.printSchema())

# Puedes ver el schema creado en DDL de manera programatica invocando el shcema
blogs_df.schema

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
+---+---------+-------+-----------------+---------+-----+--------------------+
only showing top 5 rows

root
 |-- Id: integer (nullable = true)
 |-- First: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- Published: string (nullable = true)
 |-- Hits: integer (nullable = true)
 |-- Campaigns: array (nullable = true

StructType(List(StructField(Id,IntegerType,true),StructField(First,StringType,true),StructField(Last,StringType,true),StructField(Url,StringType,true),StructField(Published,StringType,true),StructField(Hits,IntegerType,true),StructField(Campaigns,ArrayType(StringType,true),true)))

### Columns and Expressions

Las columnas en DF son conceptualmente similares a como se crean en Pandas, R o una tabla RDBMS, describen el tipo de dato de un campo. 

Puedes usar expresiones lógicas o matemáticas en las columnas con funciones como expr() o col(). Vemos algunos ejemplos de que podemos hacer sobre las columnas

In [5]:
# Multiplicamos columna Hits por 2 en un select con cualquiera de las dos funciones mencionadas
blogs_df.select(expr("Hits * 2")).show(2)
blogs_df.select(col("Hits") * 2).show(2)

# Añadimos una nueva columna 'Big Hitters' basada en una expresión condicional (valores de Hits mayores que 10000)
# utilizamos la funcion expr() para meter a condicion
blogs_df.withColumn("Big Hitters", (expr("Hits > 10000"))).show()

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows

+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|Big Hitters|
+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|      false|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|      false|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|      false|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|       true|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|       true|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter

In [6]:
# Concatenar tres columnas, crear una nueva columna y mostrar la nueva columna concatenada
blogs_df.withColumn("AuthorsId", (concat(expr("First"), expr("Last"), expr("Id")))).select(col("AuthorsId")).show(4)

# Estas sentencias devuelven el mismo valor, mostrando que expr es lo mismo que una llamada al método col
blogs_df.select(expr("Hits")).show(2)
blogs_df.select(col("Hits")).show(2)
blogs_df.select("Hits").show(2)

# Ordenamos con Sort by la columna "Id" en orden descendente
 # blogs_df.sort(col("Id").desc).show()
 # blogs_df.sort($"Id".desc).show()

# Destacamos aqui que podemos referirnos a las columnas como col("Id") o $"Id"

+-------------+
|    AuthorsId|
+-------------+
|  JulesDamji1|
| BrookeWenig2|
|    DennyLee3|
|TathagataDas4|
+-------------+
only showing top 4 rows

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows



### Rows (Filas)

Row es un objeto generico de sparkq, conteniendo una o mas columnas en forma de colección ordenada de campos. Cada columna puede ser del mismo data type (integer, string, map, array, etc.). o no.

Se puede crear una fila Row desde cero con la funcion Row() y acceder a ella usando índices (index)

In [7]:
from pyspark.sql import Row

#Creamos Row desde cero.
blog_row = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015", ["twitter", "LinkedIn"])

# Acceso mediante índice para elementos individuales
blog_row[1]
'Reynold'

'Reynold'

Tambien puedes usar los objetos Row paara crear Dataframes.

In [8]:
from pyspark.sql import Row

# Creamos objeto row
rows = [Row("Matei Zaharia", "CA"), Row("Reynold Xin", "CA")]

# Creamos Df con createDataFrame
authors_df = spark.createDataFrame(rows, ["Authors", "State"]).show()

+-------------+-----+
|      Authors|State|
+-------------+-----+
|Matei Zaharia|   CA|
|  Reynold Xin|   CA|
+-------------+-----+



En la mayoría de casos, dado que los archivos serán muy frantes, será mejor práctica crear los DF definiendo su schema y cargarlos será mas rápido y eficiente.

### Common DataFrame Operations

Vamos a usar DataFrameReader de spark para cargar un DF con el cual podemos cargar los datos en diferentes formatos JSON, CSV, Parquet, Text, Avro, ORC, etc. 

Para escribir un DataFrame y guardarlo en una fuente de datos en un formato particular, Spark utiliza DataFrameWriter.

#### Using DataFrameReader and DataFrameWriter
Escribir y cargar un DF con Spark es simple con Spark por que podemos cargarlo de diferentes fuentes, ya sean RDBMS, NOSQL, Streaming, etc

Para comenzar cargaremos un CSV que contiene los datos de las llamadas del departamento de bomberos de San Francisco. 
como hemos comentado previamente, es más eficiente definir el schema de este archivo usando la clase DataFrameReader y sus metodos. El archivo contiene 28 columnas y mas de 4 millones de registros. 

In [9]:
from pyspark.sql.types import *

# Definimos el schema programaticamente
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
                          StructField('UnitID', StringType(), True),
                          StructField('IncidentNumber', IntegerType(), True),
                          StructField('CallType', StringType(), True),
                          StructField('CallDate', StringType(), True),
                          StructField('WatchDate', StringType(), True),
                          StructField('CallFinalDisposition', StringType(), True),
                          StructField('AvailableDtTm', StringType(), True),
                          StructField('Address', StringType(), True),
                          StructField('City', StringType(), True),
                          StructField('Zipcode', IntegerType(), True),
                          StructField('Battalion', StringType(), True),
                          StructField('StationArea', StringType(), True),
                          StructField('Box', StringType(), True),
                          StructField('OriginalPriority', StringType(), True),
                          StructField('Priority', StringType(), True),
                          StructField('FinalPriority', IntegerType(), True),
                          StructField('ALSUnit', BooleanType(), True),
                          StructField('CallTypeGroup', StringType(), True),
                          StructField('NumAlarms', IntegerType(), True),
                          StructField('UnitType', StringType(), True),
                          StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                          StructField('FirePreventionDistrict', StringType(), True),
                          StructField('SupervisorDistrict', StringType(), True),
                          StructField('Neighborhood', StringType(), True),
                          StructField('Location', StringType(), True),
                          StructField('RowID', StringType(), True),
                          StructField('Delay', FloatType(), True)])

# Leemos el CSV con DataFrameReader y su funcion spark.read.csv y indicando el schema)
sf_fire_file = "./Datasets/sf-fire-calls.csv"
# Creamos DF
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

print(fire_df.printSchema())

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

Como hemos contentado usamos DataframeWriter para guardarlo en diferentes formatos. 

Parquet es un formato que guarda el schema como parte de los metadatos de parquet, por lo que si quieres volver a leerlo en formato parquet, no será necesario indicar un schema manualmente.

### Saving a DataFrame as a Parquet file or SQL table

Es comun guardar el DF en formato parquet, como vista tempora, pudiendo ejecutar consultas de SQLpuro, o como tabla SQL. 

In [10]:
# Guardar DF como parquet con modo overwrite para sobreescribir el archivo
fire_df.write.mode('overwrite').parquet("./Datasets/DFs_saved/fire_df_parquet")

# Crea un archivo que termina en snappy.parquet pero está vacio y sigue dando el error
# Snappy es el algoritmo de compresión de parquet por defecto

fire_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [11]:
# Creamos vista temporal a través del DF para poder usar consultas de SQLpuro
fire_df.createOrReplaceTempView("fire_df_tempview")

results_fire_df = spark.sql("SELECT * FROM fire_df_tempview LIMIT 5").show()

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+------------

In [12]:
# Tambien podemos guardar nuestro DF en formato parquet Particionando por una o varias columnas
 # De esta manera, se crea en el directorio la carpeta "AlSUnit"
    # y subdividida en las categorias de la misma columna los datos particionados

fire_df.write.partitionBy("ALSUnit").mode("overwrite").parquet("./Datasets/DFs_saved/fire_df_parquet_partition")

### Transformations and actions

Ahora que tenemos nuestro DF distribuido en memoria "fire_df", lo primero que haremos será examinarlo para ver cuantas filas tiene y verificar si las columnas se ven bien, si los datos están correctos, si se pueden convertir en otros tipos, si tiene valores nulos, etc. Todo esto a través de transformaciones, acciones y querys.

Usamos metodos como en una query dentro select para examinar algunos aspectos especificos del dataset. A continuacion se pueden ver varios ejemplos de querys para obtener o examnidar aspectos o infformación especifica del dataset

In [13]:
# Select 3 columnas cuando CallType es Medical incident con where

few_fire_df = (fire_df.select("IncidentNumber", "AvailableDtTm", "CallType")
               .where(col("CallType") != "Medical Incident"))

few_fire_df.show(5, truncate=False)


+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



In [14]:
from pyspark.sql.functions import *
# ¿Cuántos CallTypes distintos se registraron como causas de las llamadas de incendio? usando countDistinct dentro de agg
(fire_df
.select("CallType")
.where(col("CallType").isNotNull())
.agg(countDistinct("CallType").alias("DistinctCallTypes"))
.show())

+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+



In [15]:
# Filtramos sólo los CallTypes distintos no nulos de todas las filas
(fire_df
.select("CallType")
.where(col("CallType").isNotNull())
.distinct()
.show(10, False))

+-----------------------------------+
|CallType                           |
+-----------------------------------+
|Elevator / Escalator Rescue        |
|Marine Fire                        |
|Aircraft Emergency                 |
|Confined Space / Structure Collapse|
|Administrative                     |
|Alarms                             |
|Odor (Strange / Unknown)           |
|Citizen Assist / Service Call      |
|HazMat                             |
|Watercraft in Distress             |
+-----------------------------------+
only showing top 10 rows



### Renaming, adding, and dropping columns

Mostramos algunos ejemplos de como renombrar (withColumnRenamed), añadir (withColumn) o eliminar (drop) columnas.

In [16]:
# Creamos nuevo DF renombrando la columna "Delay" por "ResponseDelayedinMins" y 
# Hacemos un select que muestre aquellas llamadas con un delay de más de 5 minutos
new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")
(new_fire_df
.select("ResponseDelayedinMins")
.where(col("ResponseDelayedinMins") > 5)
.show(5, False))

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
+---------------------+
only showing top 5 rows



In [17]:
# Añadimos columnas "IncidentDate", "OnWatchDate", "AvailableDtTS" borrando sus columnas que no necesitamos despues 
 # añadiendo sus datatypes, en el caso de datos timestamp añadiendo el formato de la fecha
    
fire_ts_df = (new_fire_df
.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
.drop("CallDate")
.withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
.drop("WatchDate")
.withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"),
"MM/dd/yyyy hh:mm:ss a"))
.drop("AvailableDtTm"))

# Vemos las columnas añadidas
(fire_ts_df
.select("IncidentDate", "OnWatchDate", "AvailableDtTS")
.show(5, False))

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows



Ahora que tenemos los datos tipo fecha modificados, podemos usar funciones de consulta como like month(), year(), and day() para explorar aún mñas los datos de tipo fecha. Algunos ejemplos de operacoines comunes:

In [18]:
# Seleccinamos los años de la columna de datos IncidentDate con la guncion year, ordenandolos de manera ascendente
(fire_ts_df
 .select(year('IncidentDate'))
 .distinct()
 .orderBy(year('IncidentDate'))
 .show(5))

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
+------------------+
only showing top 5 rows



#### Operaciones de agregado
Una última operación común es agrupar datos por valores en una columna y agregar los datos de alguna manera, como simplemente contarlos o filtrarlos.

Para ello, utilizamos transformaciones o acciones en DF como groupBy(), orderBy(), and count() que ofrecen la posibilidad de agregar por columna y contrar a traves de ella.

Algunos ejemplos:

In [19]:
# Contamos el numero de filas no nulas de la columna CallType orenandolas por la columna count en orden ascendente
(fire_ts_df
.select("CallType")
.where(col("CallType").isNotNull())
.groupBy("CallType")
.count()
.orderBy("count", ascending=False)
.show(n=10, truncate=False))

# De esta consulta podemos sacar la conclusion de que tipo de llamada mas comun es por un Incidente Medico

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



#### Other common DataFrame operations.

La API DF tambien tiene algunas funciones estadisticas como min(), max(), sum() y avg(). Algunos ejemplos:

In [20]:
# Calculamos el total de alarmas y los valores minimos, maximos y medios de DelayResponse
import pyspark.sql.functions as F # importamos las funciones como F
(fire_ts_df
.select(F.sum("NumAlarms"), 
        F.avg("ResponseDelayedinMins"),
        F.min("ResponseDelayedinMins"), 
        F.max("ResponseDelayedinMins"))
.show())

+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|        176170|         3.892364154521585|               0.016666668|                   1844.55|
+--------------+--------------------------+--------------------------+--------------------------+



Como puede ver, es fácil componer y encadenar consultas expresivas con la API de alto nivel y los operadores DSL y DSL. No podemos imaginar la opacidad y la ilegibilidad comparativa si hiciesemos lo mismo con los RDDs

## Dataset API
Un Dataset es una colección fuertemente tipificada de objetos específicos del dominio que pueden ser transformados en paralelo
utilizando operaciones funcionales o relacionales. 

En los lenguajes soportados por Spark, los Datasets sólo tienen sentido en Java y Scala, mientras que en Python y R sólo tienen sentido los DataFrames. Esto se debe a que Python y R no son compilados; los tipos se infieren o asignan dinámicamente durante la ejecución, no durante el tiempo de compilación.

Desde Spark2.0 se unifico la API de DF y Dataset como una API estrucutrada con interfaces similares para que desarolladores no tuvieran que aprender una de las dos APIS. El Dataset API tiene una serie de ventajas como la detección de errores en tiempo de compilación entre otros que SOLO pueden ser aprovechadas por Scala, por lo que como estamos utilizando Python, seguiremos usando DF.

Tenemos que tener en cuenta que en scala/java un DataFRame = Dataset<Row> (es decir un Dataset cuyo tipo de dato es generico, el Row).

Recapitulando, las operaciones que podemos realizar en Datasets en Scala/Java como -filter(), map(), groupBy(), select(), take(), etc. - son similares a las de los DataFrames. En cierto modo, los Datasets son similares a los RDD en el sentido de que proporcionan una interfaz similar a sus métodos mencionados y la seguridad en tiempo de compilación, pero con una lectura mucho más fácil y una de programación orientada a objetos.


## SparkSQL y su Motor Subyacente

A nivel de programacion, SparkSQL permite a los desarolladores hacer querys en datos estructurados con un schema, DFs (compratibles con ANSI SQL:2003). Aparte de esto, el motor de Spark permite:
- Unificar los componentes de Spark y permitir abstraccion en todos los leguajes que soporta sprark, simplificando el trabajo con datasets estructurados
- Conectar con el Hive Metastore y tablas
- Leer y escribir datos estructurados con u esquema especifico en diferentes formatos (JSON, CSV, Text, Avro, Parquet, ORC, etc.) y convertirlos en vistas o tablas temporales
- Hacer de puente para herramientas externas via conectors de bbdd JDBC/OBDC
- Crear querys con un plan optimicao por la JVM para su ejecución

<center><img src="./images/SparkSQL_stack.PNG"></center>

En el core del motor de SparkSQL, se encuentran el Catalyst Optimizer y el Project Tungsten. Juntos crean las querys en DF o DF. Nos centraremos en este script en el Catalyst Optimizer para ver como optimiza las querys convirtiendolas en un plan de ejecución.

##### Catalyst Optimizer
Convierte las querys en un plan de ejecución en base a 4 fases:
1. Analisis
2. Optimización Lógica
3. Plan Fisico
4. Generacion del codigo

Descacamos que independientemente del lenguaje que usemos estemos usando (SQLpuro a través de una vista temporal o DFs de Spark ya sea con R,Python,Java o Scala) spark utiliza este componente para crear de manera lógica un plan antes de ejecutar el codigo, por lo que el rendimiento es el mismo. Podemos ver un ejemplo gráfico de como el Catalyst Optimizer optimiza y transforma una query enl la siguiente iustración.

<center><img src="./images/catalyst_optimizer.png"></center>

Vamos a ver un ejemplo utilizando una query realizada con el DF de MnM del anterior capitulo.

In [21]:
from pyspark.sql.functions import *

# Ejemplo pg78
# Cargamos DataFrame de M&M
mnm_df = spark.read.option("header", "true") \
          .option("inferSchema", "true") \
          .csv("./Datasets/mnm_dataset.csv")

# Con DFs, realizamos un select que muestre el total de colores agrupados por estado y color ordenados por Total en orden descendente
count_mnm_df = (mnm_df
.select("State", "Color", "Count")
.groupBy("State", "Color")
.agg(count("Count")
.alias("Total"))
.orderBy("Total", ascending=False))

# Utilizamos el metodo .explain() para ver el proceso interno que hace spark
count_mnm_df.explain(True) #importante añadir true para hacer la info mas legible

== Parsed Logical Plan ==
'Sort ['Total DESC NULLS LAST], true
+- Aggregate [State#208, Color#209], [State#208, Color#209, count(Count#210) AS Total#221L]
   +- Project [State#208, Color#209, Count#210]
      +- Relation[State#208,Color#209,Count#210] csv

== Analyzed Logical Plan ==
State: string, Color: string, Total: bigint
Sort [Total#221L DESC NULLS LAST], true
+- Aggregate [State#208, Color#209], [State#208, Color#209, count(Count#210) AS Total#221L]
   +- Project [State#208, Color#209, Count#210]
      +- Relation[State#208,Color#209,Count#210] csv

== Optimized Logical Plan ==
Sort [Total#221L DESC NULLS LAST], true
+- Aggregate [State#208, Color#209], [State#208, Color#209, count(Count#210) AS Total#221L]
   +- Relation[State#208,Color#209,Count#210] csv

== Physical Plan ==
*(3) Sort [Total#221L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(Total#221L DESC NULLS LAST, 200), true, [id=#131]
   +- *(2) HashAggregate(keys=[State#208, Color#209], functions=[count(Count#

**Fase 1: Análisis**

El motor Spark SQL comienza generando un árbol de sintaxis abstracta (AST) para la consulta SQL o DataFrame. En esta fase inicial, cualquier columna o nombre de tabla se resolverá consultando un Catálogo interno, una interfaz programática de Spark SQL que contiene una lista de nombres de columnas, tipos de datos, funciones, tablas, bases de datos, etc. Una vez que se han resueltos con éxito, la consulta pasa a la siguiente fase.

**Fase 2: Optimización lógica**

Como muestra la ilustración anterior, esta fase comprende dos etapas internas. 
 - Aplicando un enfoque de optimización **basado en reglas estándar y el optimizador de Catalyst construye primero un conjunto de planes múltiples y 
 - A continuación utiliza un **optimizador basado en costes (CBO)**, asignando costes a cada plan. 

Estos planes se presentan como árboles de operadores pueden incluir, por ejemplo, el proceso de plegado de constantes, pushdown de predicados, poda de proyecciones, simplificación de expresiones booleanas, etc. Este plan lógico es la entrada en el plan físico.


**Fase 3: Planificación física**

En esta fase, Spark SQL genera un plan físico óptimo para el plan lógico seleccionado seleccionado, utilizando operadores físicos que coinciden con los disponibles en el motor de ejecución de Spark motor de ejecución de Spark.

**Fase 4: Generación de código**

La fase final de la optimización de la consulta consiste en generar un bytecode Java eficiente para ejecutar en cada máquina. Dado que Spark SQL puede operar sobre conjuntos de datos cargados en memoria, Spark puede utilizar tecnología de compilación de última generación para la generación de código para acelerar ejecución. En otras palabras, actúa como un compilador. El proyecto Tungsten, que facilita la la generación de código en toda la etapa, desempeña un papel importante.

¿Qué es la generación de código en toda la fase? Es una fase de optimización de la consulta física que que reduce toda la consulta a una sola función, eliminando las llamadas a funciones virtuales y empleando los registros de la CPU para los datos intermedios. La segunda generación del motor Tungsten de segunda generación, introducido en Spark 2.0, utiliza este enfoque para generar un código RDD compacto para la ejecución final. Esta estrategia racionalizada mejora significativamente la eficiencia y el rendimiento de la CPU y el rendimiento.



In [21]:
spark.stop()