In [38]:
import math

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, explode, desc, split
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, ArrayType
from time import time


# PySpark - DataFrames

En este laboratorio utilizaremos Spark SQL. Spark SQL es un modulo de Spark que nos permite trabajar con una abstraccion de datos llamada DataFrame. Un DataFrame es una estructura de datos distribuida en filas, con nombre de columnas y tipos de datos. Los DataFrames son similares a las tablas en una base de datos relacional o las tablas en un archivo Excel, con la diferencia de que los DataFrames pueden ser distribuidos en multiples nodos de un cluster.

In [2]:
ss = SparkSession.builder.getOrCreate()

In [3]:
ss

## RDD vs DataFrame
En nuestro notebook anterior, hemos trabajado con RDD (Conjunto de datos distribuido resistente), la abstracción básica en Spark.

En esta sesión de laboratorio, estudiamos (o revisamos) la API de DataFrame, una colección de datos distribuidos e inmutables. Los DataFrames permiten a los desarrolladores imponer una estructura en una colección distribuida de datos, lo que permite una abstracción de mayor nivel; Además, DataFrames proporciona una API de lenguaje específico de dominio (DSL) para manipular datos estructurados y distribuidos. En última instancia, el objetivo es hacer que Spark sea accesible a un público más amplio, más allá de los investigadores y los ingenieros de datos especializados.

In [9]:
sc = ss.sparkContext

In [10]:
time_start = time()
input_file = sc.textFile("datasets/tiny-shakespeare.txt")
num_lines = input_file.count()
time_end = time()
print("Number of lines: %i" % (num_lines))
print("Elapsed time: %f" % (time_end - time_start))

Number of lines: 40000
Elapsed time: 1.207739


In [11]:
time_start = time()
input_file = ss.read.text("datasets/tiny-shakespeare.txt")
num_lines = input_file.count()
time_end = time()
print("Number of lines: %i" % (num_lines))
print("Elapsed time: %f" % (time_end - time_start))

Number of lines: 40000
Elapsed time: 0.235776


### Preguntas
Usando el Web UI de Spark, responda las siguientes preguntas:

1. ¿Cuántos trabajos se ejecutaron para contar el número de líneas en el archivo tiny-shakespeare.txt?
2. ¿Por qué el tiempo de ejecución es diferente para RDD y DataFrame? Explique.

## Ejercicio 1: Contar palabras

En este ejercicio, obtendremos la frecuencia de palabras en el archivo tiny-shakespeare.txt. Mostraremos las 10 palabras más comunes en el archivo.

In [12]:
time_start = time()
words = sc.textFile("datasets/tiny-shakespeare.txt").repartition(8)\
            .flatMap(lambda line: line.split(" "))\
            .map(lambda word: (word, 1))\
            .reduceByKey(lambda a, b: a + b)\
            .map(lambda x: (x[1], x[0]))\
            .sortByKey(False)

top_words = words.take(10)
time_end = time()

print("Top words: %s" % (top_words))
print("Elapsed time: %f" % (time_end - time_start))

Top words: [(7241, ''), (5437, 'the'), (4403, 'I'), (3923, 'to'), (3678, 'and'), (3275, 'of'), (2677, 'my'), (2610, 'a'), (2130, 'you'), (2073, 'in')]
Elapsed time: 2.729319


In [19]:
time_start = time()
words = ss.read.text("datasets/tiny-shakespeare.txt").repartition(8)\
            .selectExpr("split(value, ' ') as words")\
            .select(explode(col("words")).alias("word"))\
            .groupBy("word")\
            .count()\
            .orderBy(col("count").desc())

top_words = words.take(10)
time_end = time()

print("Top words: %s" % (top_words))
print("Elapsed time: %f" % (time_end - time_start))

Top words: [Row(word='', count=7241), Row(word='the', count=5437), Row(word='I', count=4403), Row(word='to', count=3923), Row(word='and', count=3678), Row(word='of', count=3275), Row(word='my', count=2677), Row(word='a', count=2610), Row(word='you', count=2130), Row(word='in', count=2073)]
Elapsed time: 1.268705


In [20]:
time_start = time()
words = ss.read.text("datasets/tiny-shakespeare.txt").repartition(8)\
            .selectExpr("split(value, ' ') as words")\
            .select(explode(col("words")).alias("word"))\
            .groupBy("word")\
            .count()\
            .orderBy(col("count").desc())
top_words = words.take(10)
time_end = time()

print("Top words: %s" % (top_words))
print("Elapsed time: %f" % (time_end - time_start))

Top words: [Row(word='', count=7241), Row(word='the', count=5437), Row(word='I', count=4403), Row(word='to', count=3923), Row(word='and', count=3678), Row(word='of', count=3275), Row(word='my', count=2677), Row(word='a', count=2610), Row(word='you', count=2130), Row(word='in', count=2073)]
Elapsed time: 1.228585


### Preguntas
Usando el Web UI de Spark, responda las siguientes preguntas:

1. ¿Por qué el tiempo de ejecución es diferente para RDD y DataFrame? Explique.
2. ¿Cual es el API de Spark que es mas eficiente (RDD o DataFrame)? Explique.

## Ejercicio 2: Flights

Un Dataframe es una colección de datos distribuidos e inmutables, organizados en columnas con nombres y tipos de datos. Los DataFrames pueden ser creados a partir de una variedad de fuentes, como archivos, tablas de Hive, tablas de HBase, RDDs, etc. En este ejercicio, trabajaremos con un dataset de informacion de vuelos. Usaremos el mismo dataset del laboratorio anterior.

Para construir el dataframe hay varias formas:

* Usando el metodo `read.csv` de SparkSession
* Usando el metodo `read.format("csv").load` de SparkSession
* Usando el metodo `read.load` de SparkSession

En este ejercicio, se pide leer el archivo `2008.csv` usando los tres metodos y comparar los tiempos de ejecucion. Usar la inferencia de tipos de datos para leer el archivo.



In [None]:
""" YOUR CODE HERE """

### Preguntas

1. ¿Cual es el metodo mas eficiente para leer el archivo? Explique.
2. ¿Por que la diferencia de tiempo de ejecucion? Explique.

Nuestra siguiente tarea es leer los archivos sin usar la inferencia de datos. Para esto deberemos crear el esquema manualmente.

El esquema es una estructura de datos que define los nombres y tipos de datos de las columnas de un dataframe. Para crear el esquema, usaremos la clase `StructType` y la clase `StructField`. La clase `StructType` representa un esquema de datos y la clase `StructField` representa una columna del esquema. Para crear un esquema, primero creamos una lista de objetos `StructField` y luego pasamos la lista al constructor de `StructType`.

El constructor de `StructField` toma tres argumentos: el nombre de la columna, el tipo de datos de la columna y un booleano que indica si la columna puede tener valores nulos. Los tipos de datos soportados por Spark son: `StringType`, `IntegerType`, `LongType`, `FloatType`, `DoubleType`, `BooleanType`, `DateType`, `TimestampType`, `BinaryType`, `DecimalType`, `ArrayType`, `MapType`, `StructType` y `NullType`. Para mas informacion, ver la documentacion de [Spark SQL Types](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types).

In [None]:
""" YOUR CODE HERE """

### Preguntas
1. ¿Por que la diferencia de tiempo de ejecucion entre los metodos que utilizan la inferencia de datos y los que no? Explique.
2. ¿Cual es el metodo mas eficiente para leer el archivo en el cual especificamos manualmente esquema? Explique.

### Queries - Simples

1. ¿Cuantos aeropuertos de destino diferentes hay? ¿Cuantos aeropuertos de origen diferentes hay? Comentar si hay algun aeropuerto que sea de origen y destino al mismo tiempo.
2. ¿Cuantas aerolineas diferentes hay?
3. ¿Cuantos vuelos existen que fueron agendados para salir despues de las 19:00?

In [None]:
""" YOUR CODE HERE """

### Queries - Estadisticas sobre el volumen de vuelos.

1. ¿Cuantos vuelos hay en cada mes del año?
2. ¿Hay alguna relacion entre el dia de la semana y el volumen de vuelos? Comenta y justifica tu respuesta.
3. ¿Cuantos vuelos salen en cada hora del dia? Considera la hora de salida agendada. Analiza los resultados, hay algun resultado que llame tu atencion? Comenta.
4. ¿Cuales son los 10 aeropuertos de origen con mayor volumen de vuelos? ( Esto depende del numero de vuelos que salen Y llegan a ese aeropuerto)
5. ¿Cuales son las 10 aerolineas con mayor volumen de vuelos?

In [None]:
""" YOUR CODE HERE """

### Queries - Estadisticas sobre los vuelos atrasados

1. ¿Cual es el porcentaje de vuelos retrasados (sobre el total de vuelos) hay por cada hora del dia? Considera la hora de salida agendada.
2. ¿Que horas del dia tienen son las mas propensas a tener los retrasos mas largos? Comenta.
3. ¿Cual es el porcentaje de vuelos retrasados que salen de uno de los 10 aeropuertos de origen con mayor volumen de vuelos (vuelos de entrada y salida)? Comenta.

In [None]:
""" YOUR CODE HERE """