In [1]:
from pyspark import SparkContext

In [None]:
# download our dataset tiny-shakespeare.txt

In [6]:
!mkdir datasets
!wget -P datasets/ https://raw.githubusercontent.com/jcjohnson/torch-rnn/master/data/tiny-shakespeare.txt

mkdir: cannot create directory ‘datasets’: File exists
--2022-12-14 13:09:58--  https://raw.githubusercontent.com/jcjohnson/torch-rnn/master/data/tiny-shakespeare.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.108.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘datasets/tiny-shakespeare.txt’


2022-12-14 13:10:00 (1.10 MB/s) - ‘datasets/tiny-shakespeare.txt’ saved [1115394/1115394]



# PySpark

Como se discutió en las clases, cada aplicación Spark tiene un controlador(driver) Spark. Es el programa que declara las transformaciones y acciones en los RDD de datos y envía dichas solicitudes al administrador del clúster. En realidad, el controlador es el programa que crea SparkContext, conectándose a un administrador de clúster determinado, como Spark Master, YARN u otros. Los ejecutores ejecutan código de usuario, ejecutan cálculos y pueden almacenar datos en caché para su aplicación. SparkContext creará un trabajo que se dividirá en etapas. Las etapas se dividen en tareas programadas por SparkContext en un ejecutor.

Al iniciar PySpark con el comando pyspark o usar un cuaderno bien configurado (como este), SparkContext se crea automáticamente en la variable sc.

In [2]:
sc = SparkContext()

In [3]:
sc

In [5]:
input_file = sc.textFile("datasets/tiny-shakespeare.txt")
num_lines = input_file.count()
print(f"there are {num_lines} lines in the file")

NameError: name 'sc' is not defined

## Working with the Apache Spark Web UI

La interfaz de usuario web de Apache Spark se puede utilizar para diseccionar la vida de la ejecución de un trabajo.

* Verá información sobre las máquinas de trabajo de su clúster (cantidad de núcleos, memoria disponible, ...) y una lista de aplicaciones. Verá una aplicación en ejecución, correspondiente al shell pyspark conectado a su Jupyter Notebook. Haga clic en su ID de aplicación para inspeccionar todos los trabajos que ha ejecutado esta aplicación.
* Llegará a una página general con un resumen de su aplicación de shell pyspark.
Siga la interfaz de usuario de detalles de la aplicación.

Si es la primera vez que ejecuta la celda Notebook con el trabajo simple de "recuento de líneas", verá un solo elemento en la lista de trabajos. Preste atención al nombre del trabajo: corresponde al nombre de la Acción que disparó el trabajo, es decir, la acción de conteo. En este nivel, solo ve un resumen de trabajo aproximado: su tiempo de envío, su duración, etc.

* Haga clic en la descripción del trabajo. Llegará a una página con una gran cantidad de detalles sobre su trabajo, comenzando con sus etapas (recuerde, los trabajos están hechos de etapas, que a su vez están hechas de tareas).
* Expanda las secciones sobre la línea de tiempo del evento y la visualización de DAG.
* A continuación, haga clic en la etapa del trabajo con el mismo nombre que el recuento de acciones que especificamos en nuestro código

### Preguntas
Usando el Web UI de Spark, responda las siguientes preguntas:

* ¿Cuántas tareas (tasks) se ejecutaron en esta etapa (stage)?
* ¿Cuántos recursos se utilizaron para ejecutar esta etapa?
* ¿Cuánto tiempo tomó esta etapa?
* ¿Cuánto tiempo tomó la acción de conteo?

## Ejercicios

### Ejercicio 1: Contar palabras


En el siguiente ejemplo, estamos interesados en las 10 palabras principales en términos de frecuencia de aparición. Para hacerlo, usamos un pequeño archivo de texto como entrada y deseamos trazar la frecuencia de términos de esas 10 palabras principales usando Matplotlib.

Primero, usando el método textFile de SparkContext sc, creamos un RDD de strings. Cada string en el RDD es representativa de una línea en el archivo de texto. De forma vaga, podemos pensar que el primer RDD es un RDD de líneas de texto.

Debido a que trabajamos en el alcance de las palabras, tenemos que transformar una línea del RDD actual en múltiples palabras, cada palabra es un objeto del nuevo RDD. Esto se hace usando la función flatMap.

Luego, una función de mapa transforma cada palabra en el RDD en una sola tupla con 2 componentes: la palabra en sí y la cuenta de 1. Como habrás adivinado, este es un PairRDD, donde cada objeto es un par key-value.

Podemos aprovechar la función **reduceByKey** para sumar todas las frecuencias de la misma palabra. Ahora, cada elemento en el RDD tiene la forma de: (palabra, frecuencia_total). Para ordenar las palabras por frecuencia de ocurrencia, podemos usar muchos enfoques. Uno de las maneras más simples es intercambiar cada tupla de modo que la frecuencia se convierta en el key y usar la función sortByKey.

In [12]:
words = sc.textFile("datasets/tiny-shakespeare.txt").repartition(8)\
            .flatMap(lambda line: line.split(" "))\
            .map(lambda word: (word, 1))\
            .reduceByKey(lambda a, b: a + b)\
            .map(lambda x: (x[1], x[0]))\
            .sortByKey(False)

#### Preguntas

* ¿Cuál es el tamaño del archivo de entrada?
* ¿En cuántos bloques se divide el archivo de entrada? ¿Cuál es el tamaño del bloque?
* ¿Cuántas tareas(tasks) se ejecutan en paralelo?
* ¿Cuántos trabajos (jobs) se iniciaron tras la ejecución del código anterior? ¿Por qué (tenga en cuenta que no hay acciones!)?
* ¿Qué significa una etapa "skipped"?
* ¿Cuál es el número de bytes shuffled? ¿Cómo se compara esto con el número de bytes de entrada?
* ¿Cree que Spark está haciendo un buen trabajo al equilibrar la carga entre los trabajadores? ¿Qué puede salir mal con el equilibrio de carga?

Ahora tomamos las 10 palabras principales. Para hacerlo, usamos la función take. Esta función es una acción, por lo que inicia un trabajo. El trabajo se ejecuta en paralelo en el clúster. La función take devuelve una lista de tuplas (frecuencia, palabra) en el driver. El driver es la máquina que ejecuta el código Python. En nuestro caso, el driver es el nodo maestro del clúster.

In [13]:
top10 = words.take(10)
print(top10)

[(7241, ''), (5437, 'the'), (4403, 'I'), (3923, 'to'), (3678, 'and'), (3275, 'of'), (2677, 'my'), (2610, 'a'), (2130, 'you'), (2073, 'in')]


**ATENCIÓN**: La accion de colectar puede ser problemática. De hecho, un RDD puede tener un tamaño muy grande (¡es por eso que se distribuyen en varias máquinas en primer lugar!) y, por lo tanto, ¡podría agotar la RAM disponible en la máquina que ejecuta el controlador!. Es por eso que es mejor usar la función take, que devuelve una lista de tamaño fijo.

### Ejercicio 2: Flights

Tenemos un archivo CSV con datos de vuelos. El archivo contiene 29 columnas, que son:

* **Year**: 1987-2008
* **Month**: 1-12
* **DayofMonth**: 1-31
* **DayOfWeek**: 1 (Monday) - 7 (Sunday)
* **DepTime**: actual departure time (local, hhmm)
* **CRSDepTime**: scheduled departure time (local, hhmm)
* **ArrTime**: actual arrival time (local, hhmm)
* **CRSArrTime**: scheduled arrival time (local, hhmm)
* **UniqueCarrier**: unique carrier code
* **FlightNum**: flight number
* **TailNum**: plane tail number
* **ActualElapsedTime**: in minutes
* **CRSElapsedTime**: in minutes
* **AirTime**: in minutes
* **ArrDelay**: arrival delay, in minutes
* **DepDelay**: departure delay, in minutes
* **Origin**: origin IATA airport code
* **Dest**: destination IATA airport code
* **Distance**: in miles
* **TaxiIn**: taxi in time, in minutes
* **TaxiOut**: taxi out time in minutes
* **Cancelled**: was the flight cancelled? (1 = yes)
* **CancellationCode**: reason for cancellation (A = carrier, B = weather, C = NAS, D = security)
* **Diverted**: 1 = yes, 0 = no
* **CarrierDelay**: in minutes
* **WeatherDelay**: in minutes
* **NASDelay**: in minutes
* **SecurityDelay**: in minutes
* **LateAircraftDelay**: in minutes


#### Ejercicio 2.1

En este ejercicio, estamos interesados solo en las siguientes columnas:

* **CRSDepTime**: scheduled departure time (local, hhmm)
* **UniqueCarrier**: unique carrier code

Asuma que un night flight es un vuelo que sale después de las 19:00.

Responda las siguientes preguntas:

* ¿Cuántos vuelos nocturnos hay en total?
* ¿Cuántos vuelos nocturnos hay por cada aerolínea? Muestre los 5 primeros resultados en terminos de volumen de vuelos.

In [None]:
""" YOUR CODE HERE """

In [36]:
# how many night flights are there in total?
data = sc.textFile("datasets/airline/1987.csv")
header = data.first()
data = data.filter(lambda line: line != header)
total_flights = data.count()
print("Total number of flights: {}".format(total_flights))

Total number of flights: 1311826


In [4]:
# SOLUTION
data = sc.textFile("datasets/airline/1987.csv")

def extract_CRSDepTime_Carier(line):
    cols = line.split(",")
    return (int(cols[5]), cols[8])


header = data.first()
data_without_header = data.filter(lambda line: line != header)

# Pregunta 1: ¿Cuántos vuelos nocturnos hay en total?
mapped_data = data_without_header.map(extract_CRSDepTime_Carier).cache()

night_flights = mapped_data.filter(lambda x: 1900 < x[0] < 2359)


print("Total number of night flights: {}".format(night_flights.count()))

# Pregunta 2: ¿Cuántos vuelos nocturnos hay por cada aerolínea? Muestre los 5 primeros resultados en terminos de volumen de vuelos.
night_flights_per_carrier = night_flights.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)
print(f"Top 5 carriers with the most night flights: {night_flights_per_carrier.takeOrdered(5, key=lambda x: -x[1])}")

Total number of night flights: 208331
Top 5 carriers with the most night flights: [('DL', 32972), ('AA', 25676), ('UA', 21954), ('EA', 20403), ('PI', 19200)]


In [15]:
# TEST
# show plane tail numbers
data = sc.textFile("datasets/airline/2008.csv")
def extract_TailNum(line):
    cols = line.split(",")
    return cols[10]
header = data.first()
data_without_header = data.filter(lambda line: line != header)
data_without_header.map(extract_TailNum).distinct().take(20)

['N240WN',
 'N215WN',
 'N792SW',
 '',
 'N631SW',
 'N341SW',
 'N689SW',
 'N295WN',
 'N272WN',
 'N461WN',
 'N690SW',
 'N476WN',
 'N707SA',
 'N346SW',
 'N399WN',
 'N387SW',
 'N685SW',
 'N663SW',
 'N238WN',
 'N309SW']

#### Ejercicio 2.2

En este ejercicio, considerando las siguientes columnas:

* **UniqueCarrier**: unique carrier code
* **DepDelay**: departure delay, in minutes

Nombre de la aerolinea ordenar ascendente, mostrar 5

Se les pide responder las siguientes preguntas:
* Calcular el retraso promedio por aerolínea.
* Calcular el retraso promedio por aerolínea y por mes.
* Calcular el retraso promedio por aerolínea y por día de la semana.

In [None]:
""" YOUR CODE HERE """

In [41]:
# SOLUTION
data = sc.textFile("datasets/airline/1987.csv")

def extract_Carrier_Delay(line):
    cols = line.split(",")
    if cols[15] == "NA":
        return (cols[8], 0)
    else:
        return (cols[8], int(cols[15]))

header = data.first()
data_without_header = data.filter(lambda line: line != header)

# Pregunta 1: Calcular el retraso promedio por aerolínea.
carrier_delay = data_without_header.map(extract_Carrier_Delay).cache()

# filter out the flights with no delay
carrier_delay = carrier_delay.filter(lambda x: x[1] > 0)
carrier_delay_avg = carrier_delay.mapValues(lambda x: (x, 1)).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])).mapValues(lambda x: x[0] / x[1])
print(f"Average delay per carrier: {carrier_delay_avg.take(5)}")

# Pregunta 2: Calcular el retraso promedio por aerolínea y por mes.
def extract_Carrier_Month_Delay(line):
    cols = line.split(",")
    if cols[15] == "NA":
        return ((cols[8], int(cols[1])), 0)
    else:
        return ((cols[8], int(cols[1])), int(cols[15]))

carrier_month_delay = data_without_header.map(extract_Carrier_Month_Delay).cache()

# filter out the flights with no delay // order by Nombre Aerolinea y take 20
carrier_month_delay = carrier_month_delay.filter(lambda x: x[1] > 0)
carrier_month_delay_avg = carrier_month_delay.mapValues(lambda x: (x, 1)).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])).mapValues(lambda x: x[0] / x[1]).sortByKey(ascending=True)
print(f"Average delay per carrier and month: {carrier_month_delay_avg.take(20)}")

Average delay per carrier: [('UA', 21.0218020806202), ('PA (1)', 23.875240980446158), ('AS', 24.995272186951237), ('EA', 35.032679013714834), ('PI', 12.83028809963731)]
Average delay per carrier and month: [(('UA', 10), 15.720145762544936), (('PA (1)', 10), 20.55041518386714), (('AS', 10), 25.47847533632287), (('PS', 11), 19.033396430502975), (('NW', 11), 18.80029207740051), (('DL', 11), 16.757431883314084), (('TW', 12), 22.16971279373368), (('WN', 12), 24.30888464562507), (('HP', 12), 20.068478829064297), (('CO', 12), 46.382716049382715), (('AA', 12), 15.6775821942505), (('EA', 10), 29.092371192712783), (('PI', 10), 8.272955808414924), (('US', 10), 10.594040164111423), (('TW', 11), 16.13349434625096), (('WN', 11), 21.299598809369744), (('HP', 11), 16.095202815336172), (('CO', 11), 29.26831385642738), (('AA', 11), 10.898827148625264), (('PS', 12), 27.18508565670138)]


#### Ejercicio 2.3

En este ejercicio, considerando las siguientes columnas:

* **UniqueCarrier**: unique carrier code, Columna 8
* **Cancelled**: was the flight cancelled? (1 = yes), Columna 21
* **CancelationCode**: reason for cancellation (A = carrier, B = weather, C = NAS, D = security), Columna 22

Se les pide responder las siguientes preguntas:

* ¿Cuantos vuelos fueron cancelados en 1987? y en 2008?

Responder las preguntas siguientes utilizando el año 2008:

* ¿Cuál es la razón más común de cancelación de vuelos?
* Por cada aerolínea, ¿cuál es la razón más común de cancelación de vuelos? Muestre los 5 primeros resultados en terminos de volumen de vuelos.
* ¿Cuantos vuelos fueron cancelados por cada aerolínea, por razones de mal tiempo? Muestre los 5 primeros resultados en terminos de volumen de vuelos.



In [None]:
""" YOUR CODE HERE """

In [62]:
# SOLUTION

# Pregunta 1: ¿Cuantos vuelos fueron cancelados en 1987? y en 2008?
data_1987 = sc.textFile("datasets/airline/1987.csv")
data_2008 = sc.textFile("datasets/airline/2008.csv")

def extract_Carrier_Cancelled_CancelCode(line):
    cols = line.split(",")
    return (cols[8], (int(cols[21]), cols[22]))

def clear_header(data):
    header = data.first()
    data_without_header = data.filter(lambda line: line != header)
    return data_without_header

def filter_cancelled(data):
    return data.filter(lambda x: x[1][0] == 1)

data_without_header = clear_header(data_1987)
cancelled_flights_1987 = filter_cancelled(data_without_header.map(extract_Carrier_Cancelled_CancelCode))
print(f"Total number of cancelled flights in 1987: {cancelled_flights_1987.count()}")

data_without_header = clear_header(data_2008)
cancelled_flights_2008 = filter_cancelled(data_without_header.map(extract_Carrier_Cancelled_CancelCode))
print(f"Total number of cancelled flights in 2008: {cancelled_flights_2008.count()}")

# Pregunta 2: ¿Cuál es la razón más común de cancelación de vuelos en 2008?
cancelled_flights_2008_reduced = cancelled_flights_2008.map(lambda x: (x[1][1], 1)).reduceByKey(lambda a, b: a + b)
print(f"Most common reason for cancellation in 2008: {cancelled_flights_2008_reduced.takeOrdered(1, key=lambda x: -x[1])}")

# Pregunta 3: Por cada aerolínea, ¿cuál es la razón más común de cancelación de vuelos? Muestre los 5 primeros resultados en terminos de volumen de vuelos.
cancelled_flights_2008_reduced = cancelled_flights_2008.map(lambda x: ((x[0], x[1][1]), 1)).reduceByKey(lambda a, b: a + b)
cancelled_flights_2008_reduced = cancelled_flights_2008_reduced.map(lambda x: (x[0][0], (x[0][1], x[1]))).groupByKey()
cancelled_flights_2008_reduced = cancelled_flights_2008_reduced.mapValues(lambda x: sorted(x, key=lambda y: -y[1])[0])
print(f"Most common reason for cancellation per carrier in 2008: {cancelled_flights_2008_reduced.take(5)}")

# Pregunta 4: ¿Cuantos vuelos fueron cancelados por cada aerolínea, por razones de mal tiempo? Muestre los 5 primeros resultados en terminos de volumen de vuelos.
cancelled_flights_2008_reduced = cancelled_flights_2008.filter(lambda x: x[1][1] == "B").map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)
print(f"Number of flights cancelled per carrier due to bad weather in 2008: {cancelled_flights_2008_reduced.takeOrdered(5, key=lambda x: -x[1])}")

Total number of cancelled flights in 1987: 19685
Total number of cancelled flights in 2008: 64442
Most common reason for cancellation in 2008: [('A', 26075)]
Most common reason for cancellation per carrier in 2008: [('YV', ('A', 2946)), ('NW', ('A', 503)), ('XE', ('B', 2271)), ('HA', ('A', 112)), ('AS', ('A', 575))]
Number of flights cancelled per carrier due to bad weather in 2008: [('MQ', 4490), ('AA', 3274), ('OO', 2397), ('XE', 2271), ('OH', 2020)]
