In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType
from pyspark.sql import functions as F
from pyspark.sql.functions import mean
from pyspark.sql.functions import to_timestamp
import datetime

spark = (SparkSession.builder
        .appName("MySparkApp")
        .enableHiveSupport()
        .getOrCreate())

In [13]:
csvVuelosSchema = StructType([
    StructField("fecha", StringType(), True),
    StructField("horaUTC", StringType(), False),
    StructField("clase_de_vuelo", StringType(), True),
    StructField("clasificacion_de_vuelo", StringType(), True),
    StructField("tipo_de_movimiento", StringType(), True),
    StructField("aeropuerto", StringType(), True),
    StructField("origen_destino", StringType(), True),
    StructField("aerolinea_nombre", StringType(), True),
    StructField("aeronave", StringType(), True),
    StructField("pasajeros", IntegerType(), True)])

csvAeropuertosSchema = StructType([
    StructField("local", StringType(), True),
    StructField("oac", StringType(), True),
    StructField("iata", StringType(), True),
    StructField("tipo", StringType(), True),
    StructField("denominacion", StringType(), True),
    StructField("coordenadas", StringType(), True),
    StructField("latitud", StringType(), True),
    StructField("longitud", StringType(), True),
    StructField("elev", FloatType(), True),
    StructField("uom_elev", StringType(), True),
    StructField("ref", StringType(), True),
    StructField("distancia_ref", FloatType(), True),
    StructField("direccion_ref", StringType(), True),
    StructField("condicion", StringType(), True),
    StructField("control", StringType(), True),
    StructField("region", StringType(), True),
    StructField("uso", StringType(), True),
    StructField("trafico", StringType(), True),
    StructField("sna", StringType(), True),
    StructField("concesionado", StringType(), True),
    StructField("provincia", StringType(), True)])

# Load the datasets

In [12]:
vuelos_2021 = (spark.read.option('header', 'true')
                         .option('delimiter', ';')
                         .csv('hdfs://172.17.0.2:9000/ingest/vuelos/2021-informe-ministerio.csv'))
vuelos_2022 = (spark.read.option('header', 'true')
                         .option('delimiter', ';')
                         .csv('hdfs://172.17.0.2:9000/ingest/vuelos/202206-informe-ministerio.csv'))
aeropuertos_detalle = (spark.read.option('header', 'true')
                            .option('delimiter', ';')
                            .csv('hdfs://172.17.0.2:9000/ingest/vuelos/aeropuertos_detalle.csv'))

vuelos_2021 = vuelos_2021.withColumn('fecha', F.to_date(F.col('fecha'), 'dd/MM/yyyy'))
vuelos_2022 = vuelos_2022.withColumn('fecha', F.to_date(F.col('fecha'), 'dd/MM/yyyy'))


# Union the two tables
vuelos = vuelos_2022.unionByName(vuelos_2021)



## Rename the columns

In [18]:
vuelos = (vuelos.withColumnRenamed('Hora UTC', 'horaUTC')
                .withColumnRenamed('Clase de Vuelo (todos los vuelos)', 'clase_de_vuelo')
                .withColumnRenamed('Clasificación Vuelo', 'clasificacion_de_vuelo')
                .withColumnRenamed('Tipo de Movimiento', 'tipo_de_movimiento')
                .withColumnRenamed('Aeropuerto', 'aeropuerto')
                .withColumnRenamed('Origen / Destino', 'origen_destino')
                .withColumnRenamed('Aerolinea Nombre', 'aerolinea_nombre')
                .withColumnRenamed('Aeronave', 'aeronave')
                .withColumnRenamed('Calidad dato', 'calidad_dato'))

## Cast some columns to the correct types

In [19]:
vuelos = (vuelos.withColumn("fecha", F.to_date(F.col("fecha"), "dd/MM/yyyy"))
                .withColumn('pasajeros', F.col("pasajeros").cast(IntegerType())))

aeropuertos_detalle = (aeropuertos_detalle.withColumn('elev', F.col('elev').cast(FloatType()))
                                          .withColumn('distancia_ref', F.col('distancia_ref')
                                                      .cast(FloatType())))


In [6]:
aeropuertos_detalle.printSchema()

root
 |-- local: string (nullable = true)
 |-- oaci: string (nullable = true)
 |-- iata: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- denominacion: string (nullable = true)
 |-- coordenadas: string (nullable = true)
 |-- latitud: string (nullable = true)
 |-- longitud: string (nullable = true)
 |-- elev: float (nullable = true)
 |-- uom_elev: string (nullable = true)
 |-- ref: string (nullable = true)
 |-- distancia_ref: float (nullable = true)
 |-- direccion_ref: string (nullable = true)
 |-- condicion: string (nullable = true)
 |-- control: string (nullable = true)
 |-- region: string (nullable = true)
 |-- fir: string (nullable = true)
 |-- uso: string (nullable = true)
 |-- trafico: string (nullable = true)
 |-- sna: string (nullable = true)
 |-- concesionado: string (nullable = true)
 |-- provincia: string (nullable = true)
 |-- inhab: string (nullable = true)



# Problema 4
- Elminamos las columnas **inhab** y **fir**
- Eliminamos la columna **fir**
- Eliminamos la columna **calidad_dato**
- Filtramos los vuelos nacionales
- En el campo pasajeros: Null -> 0
- En el campo distancia: Null -> 0

In [7]:
aeropuertos_detalle = aeropuertos_detalle.drop('inhab', 'fir')
aeropuertos_detalle = aeropuertos_detalle.fillna(0, subset=['distancia_ref'])
vuelos = vuelos.drop('calidad_dato')
vuelos = vuelos.filter(vuelos.clasificacion_de_vuelo == 'Domestico')
vuelos = vuelos.drop('clasificacion_de_vuelo')
vuelos = vuelos.fillna(0, subset=['pasajeros'])

# Problema 5

## Check that the vuelos table has the correct schema

In [20]:
print ("Schema de la tabla de vuelos")
vuelos.printSchema()

Schema de la tabla de vuelos
root
 |-- fecha: date (nullable = true)
 |-- horaUTC: string (nullable = true)
 |-- clase_de_vuelo: string (nullable = true)
 |-- clasificacion_de_vuelo: string (nullable = true)
 |-- tipo_de_movimiento: string (nullable = true)
 |-- aeropuerto: string (nullable = true)
 |-- origen_destino: string (nullable = true)
 |-- aerolinea_nombre: string (nullable = true)
 |-- aeronave: string (nullable = true)
 |-- pasajeros: integer (nullable = true)
 |-- calidad_dato: string (nullable = true)



## Check that the aeropuerto_detalle has the correct schema

In [8]:
print ("Schema de la tabla aeropuertos_detalles")
aeropuertos_detalle.printSchema()

Schema de la tabla aeropuertos_detalles
root
 |-- local: string (nullable = true)
 |-- oaci: string (nullable = true)
 |-- iata: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- denominacion: string (nullable = true)
 |-- coordenadas: string (nullable = true)
 |-- latitud: string (nullable = true)
 |-- longitud: string (nullable = true)
 |-- elev: float (nullable = true)
 |-- uom_elev: string (nullable = true)
 |-- ref: string (nullable = true)
 |-- distancia_ref: float (nullable = false)
 |-- direccion_ref: string (nullable = true)
 |-- condicion: string (nullable = true)
 |-- control: string (nullable = true)
 |-- region: string (nullable = true)
 |-- uso: string (nullable = true)
 |-- trafico: string (nullable = true)
 |-- sna: string (nullable = true)
 |-- concesionado: string (nullable = true)
 |-- provincia: string (nullable = true)



## Query 6
Determinar la cantidad de vuelos entre las fechas 01/12/2021 y 31/01/2022. Mostrar
consulta y Resultado de la query

In [21]:
numero_vuelos = vuelos.filter((vuelos.fecha >= '2021-01-01') & (vuelos.fecha <= '2022-01-31')).count()
print ("Número de vuelos: ", numero_vuelos)



Número de vuelos:  358210


                                                                                

## Query 7
Cantidad de pasajeros que viajaron en Aerolíneas Argentinas entre el 01/01/2021 y
30/06/2022. Mostrar consulta y Resultado de la query.

In [22]:
pasajeros = (vuelos.filter((vuelos.fecha >= '2021-01-01') & 
                           (vuelos.fecha <= '2022-01-30') &
                           (vuelos.aerolinea_nombre == 'AEROLINEAS ARGENTINAS SA'))).count()
             

print ("Número de pasajeros: ", pasajeros)



Número de pasajeros:  92614


                                                                                

## Query 8

In [81]:
aeropuerto_salida = (aeropuertos_detalle.select('local', 'ref')
                                        .withColumnRenamed('local', 'aeropuerto')
                                        .withColumnRenamed('ref', 'ciudad_salida'))
aeropuerto_llegada = (aeropuertos_detalle.select('local', 'ref')
                                        .withColumnRenamed('local', 'origen_destino')
                                        .withColumnRenamed('ref', 'ciudad_llegada'))


vuelos_salida = vuelos.join(aeropuerto_salida, on='aeropuerto')
vuelos_salida = vuelos_salida.filter(vuelos_salida.tipo_de_movimiento == 'Despegue')
vuelos_salida = vuelos_salida.join(aeropuerto_llegada, on='origen_destino')

origen_destino = ((vuelos_salida.filter((vuelos_salida.fecha >= '2022-01-01') &
                                        (vuelos_salida.fecha <= '2022-06-30'))
                                .select('fecha', 
                                        'horaUTC', 
                                        'aeropuerto', 
                                        'ciudad_salida', 
                                        'origen_destino', 
                                        'ciudad_llegada', 
                                        'pasajeros'))
                                 .orderBy('pasajeros', ascending=False))   
                          
origen_destino.show()

+----------+-------+----------+--------------------+--------------+--------------------+---------+
|     fecha|horaUTC|aeropuerto|       ciudad_salida|origen_destino|      ciudad_llegada|pasajeros|
+----------+-------+----------+--------------------+--------------+--------------------+---------+
|2022-01-29|  14:11|       BAR|San Carlos de Bar...|           EZE|     Capital Federal|      141|
|2022-02-19|  22:19|       BAR|San Carlos de Bar...|           EZE|     Capital Federal|      141|
|2022-02-15|  14:06|       BAR|San Carlos de Bar...|           EZE|     Capital Federal|      141|
|2022-03-01|  13:52|       BAR|San Carlos de Bar...|           EZE|     Capital Federal|      140|
|2022-02-16|  14:56|       BAR|San Carlos de Bar...|           EZE|     Capital Federal|      140|
|2022-01-31|  14:24|       BAR|San Carlos de Bar...|           EZE|     Capital Federal|      139|
|2022-03-02|  14:57|       BAR|San Carlos de Bar...|           EZE|     Capital Federal|      139|
|2022-02-2

                                                                                

## Query 9

In [26]:
(vuelos.filter((vuelos.fecha >= '2021-01-01') & 
               (vuelos.fecha <= '2022-06-30') & 
               (vuelos.aerolinea_nombre != ''))
               .groupBy('aerolinea_nombre')
               .sum('pasajeros')
               .orderBy('sum(pasajeros)', ascending=False)
               .show(10, truncate=False))



+----------------------------------------+--------------+
|aerolinea_nombre                        |sum(pasajeros)|
+----------------------------------------+--------------+
|AEROLINEAS ARGENTINAS SA                |8890118       |
|JETSMART AIRLINES S.A.                  |1570127       |
|FB LÍNEAS AÉREAS - FLYBONDI             |1553112       |
|AMERICAN AIRLINES INC.                  |526136        |
|IBERIA - LINEAS AÉREAS DE ESPAÑA        |328747        |
|COMPAÑIA PANAMEÑA DE AVIACION           |310636        |
|LAN AIRLINES S.A.                       |220076        |
|TAM - TRANSPORTES AEREOS MERIDIONAIS S/A|187647        |
|SKY AIRLINE S.A.                        |180326        |
|AVIANCA AEROVIAS NACIONALES COLOMBIA    |172783        |
+----------------------------------------+--------------+
only showing top 10 rows



                                                                                

## Query 10

In [78]:
aeropuerto_salida = (aeropuertos_detalle.filter((aeropuertos_detalle.provincia == 'BUENOS AIRES') |
                                                (aeropuertos_detalle.provincia == 'CIUDAD AUTÓNOMA DE BUENOS AIRES'))
                                        .select('local', 'provincia')
                                        .withColumnRenamed('local', 'aeropuerto'))
aeronaves = vuelos.join(aeropuerto_salida, on='aeropuerto')
aeronaves = (aeronaves.filter((aeronaves.fecha >= '2021-01-01') &
                             (aeronaves.fecha <= '2022-06-30') &
                             (aeronaves.tipo_de_movimiento == 'Despegue') &
                             (aeronaves.aeronave != '0'))
                       .groupBy('aeronave')
                       .count()
                       .orderBy('count', ascending=False))
aeronaves.show(10)



+----------------+-----+
|        aeronave|count|
+----------------+-----+
|EMB-ERJ190100IGW|13408|
|        CE-150-L| 8123|
|          CE-152| 7987|
|        CE-150-M| 6082|
|    AIB-A320-232| 5422|
|      BO-737-800| 5047|
|           LJ-60| 3540|
|     BO-B737-800| 3051|
|        CE-150-J| 3012|
|        CE-150-G| 2873|
+----------------+-----+
only showing top 10 rows



                                                                                

# Pregunta 11
Agregaría en este dataset:
- Si el aeropuerto se encuentra en un lugar turístico, y en tal caso cuáles son las épocas de temporada alta y temporada baja.
- El tiempo de llegada de la aeronave. Esto permitiría estimar el tiempo promedio del vuelo. 

In [68]:
vuelos_salida = vuelos.join(aeropuerto_salida, on='aeropuerto')
vuelos_salida = vuelos_salida.filter(vuelos_salida.tipo_de_movimiento == 'Despegue')
vuelos_salida = vuelos_salida.join(aeropuerto_llegada, on='origen_destino')

origen_destino = ((vuelos_salida.filter((vuelos_salida.fecha >= '2021-01-01') &
                                        (vuelos_salida.fecha <= '2022-06-30'))
                                .select('fecha', 
                                        'horaUTC', 
                                        'aeropuerto', 
                                        'ciudad_salida', 
                                        'origen_destino', 
                                        'ciudad_llegada', 
                                        'pasajeros'))
                                 .orderBy('pasajeros', ascending=False)) 

origen_destino = (origen_destino.withColumn('Mes', F.date_format('fecha', 'MMMM'))
                                .withColumn('Año', F.date_format('fecha')))
origen_destino = (origen_destino.filter(((origen_destino.aeropuerto == 'EZE') &
                                        (origen_destino.origen_destino == 'BAR')) |
                                        ((origen_destino.aeropuerto == 'AER') &
                                        (origen_destino.origen_destino == 'BAR'))))
#origen_destino = origen_destino.groupBy('Año', 'Mes').sum('pasajeros')
#origen_destino = (origen_destino.withColumnRenamed('year(fecha)', 'Año')
#                                .withColumnRenamed('month(fecha)', 'Mes')
 #                               .withColumnRenamed('sum(pasajeros)', 'pasajeros'))
origen_destino = (origen_destino.select('Año','Mes', 'pasajeros')
                                .groupBy('Año', 'Mes')
                                .sum('pasajeros'))
origen_destino = origen_destino.withColumnRenamed('sum(pasajeros)', 'pasajeros')                                
origen_destino.orderBy('pasajeros', ascending=False).show()

+----+---+---------+
| Año|Mes|pasajeros|
+----+---+---------+
|2022|  1|    36256|
|2021| 12|    34145|
|2022|  2|    31844|
|2021| 11|    31705|
|2022|  3|    28200|
|2021| 10|    27019|
|2021|  8|    24653|
|2021|  9|    24512|
|2021|  1|    22510|
|2021|  7|    20849|
|2021|  2|    19292|
|2021|  3|    16215|
|2022|  6|    15648|
|2022|  4|    11915|
|2022|  5|    11757|
|2021|  4|    11388|
|2021|  6|     6247|
|2021|  5|     4296|
+----+---+---------+



                                                                                

Agrupando los resultados de la Query 8 se pueden ver los aeropuertos con más flujo de pasajeros:

In [92]:
(origen_destino.groupBy('aeropuerto', 
                        'origen_destino', 
                        'ciudad_salida', 
                        'ciudad_llegada')
               .sum('pasajeros')
               .orderBy('sum(pasajeros)', ascending=False).show(truncate=False))
                      



+----------+--------------+-----------------------+-----------------------+--------------+
|aeropuerto|origen_destino|ciudad_salida          |ciudad_llegada         |sum(pasajeros)|
+----------+--------------+-----------------------+-----------------------+--------------+
|AER       |BAR           |Ciudad de Buenos Aires |San Carlos de Bariloche|106681        |
|BAR       |AER           |San Carlos de Bariloche|Ciudad de Buenos Aires |96315         |
|DOZ       |AER           |Mendoza                |Ciudad de Buenos Aires |84033         |
|AER       |DOZ           |Ciudad de Buenos Aires |Mendoza                |82724         |
|AER       |CBA           |Ciudad de Buenos Aires |Córdoba                |78724         |
|CBA       |AER           |Córdoba                |Ciudad de Buenos Aires |77857         |
|AER       |IGU           |Ciudad de Buenos Aires |Cataratas del Iguazú   |67942         |
|SAL       |AER           |Salta                  |Ciudad de Buenos Aires |66243         |

                                                                                

También podemos contar el número de vuelos

In [91]:
(origen_destino.filter(origen_destino.pasajeros != 0)
               .groupBy('aeropuerto', 
                        'origen_destino', 
                        'ciudad_salida', 
                        'ciudad_llegada')
               .count()
               .orderBy('count', ascending=False).show(10, truncate=False))
        



+----------+--------------+-----------------------+-----------------------+-----+
|aeropuerto|origen_destino|ciudad_salida          |ciudad_llegada         |count|
+----------+--------------+-----------------------+-----------------------+-----+
|AER       |BAR           |Ciudad de Buenos Aires |San Carlos de Bariloche|1372 |
|BAR       |AER           |San Carlos de Bariloche|Ciudad de Buenos Aires |1191 |
|AER       |DOZ           |Ciudad de Buenos Aires |Mendoza                |1165 |
|DOZ       |AER           |Mendoza                |Ciudad de Buenos Aires |1164 |
|AER       |CBA           |Ciudad de Buenos Aires |Córdoba                |1096 |
|CBA       |AER           |Córdoba                |Ciudad de Buenos Aires |1071 |
|AER       |IGU           |Ciudad de Buenos Aires |Cataratas del Iguazú   |895  |
|AER       |SAL           |Ciudad de Buenos Aires |Salta                  |882  |
|SAL       |AER           |Salta                  |Ciudad de Buenos Aires |874  |
|IGU       |AER 

                                                                                

In [89]:
aeropuertos_detalle.filter(aeropuertos_detalle.local == 'MOR').select('*').show()

+-----+----+----+---------+------------+--------------------+------------+------------+----+--------+-----+-------------+-------------+---------+-------+------+----+----+--------+---+------------+------------+-----+
|local|oaci|iata|     tipo|denominacion|         coordenadas|     latitud|    longitud|elev|uom_elev|  ref|distancia_ref|direccion_ref|condicion|control|region| fir| uso| trafico|sna|concesionado|   provincia|inhab|
+-----+----+----+---------+------------+--------------------+------------+------------+----+--------+-----+-------------+-------------+---------+-------+------+----+----+--------+---+------------+------------+-----+
|  MOR|SADM|null|Aeródromo|       MORÓN|"34°40'45""S  58°...|-58.64361111|-34.67916667|29.0|  Metros|Morón|          3.0|           SO|  PUBLICO|CONTROL|  RACE|SAEF|null|Nacional| NO|          NO|BUENOS AIRES|   NO|
+-----+----+----+---------+------------+--------------------+------------+------------+----+--------+-----+-------------+-------------+-

In [90]:
vuelos.filter(vuelos.aeropuerto == vuelos.origen_destino).show()

+----------+-------+--------------------+----------------------+------------------+----------+--------------+----------------+----------------+---------+------------+
|     fecha|horaUTC|      clase_de_vuelo|clasificacion_de_vuelo|tipo_de_movimiento|aeropuerto|origen_destino|aerolinea_nombre|        aeronave|pasajeros|calidad_dato|
+----------+-------+--------------------+----------------------+------------------+----------+--------------+----------------+----------------+---------+------------+
|2022-01-01|  23:44|Vuelo Privado con...|             Doméstico|          Despegue|       FDO|           FDO|               0|     EMB-EMB-500|        0|  DEFINITIVO|
|2022-01-02|  09:05|Vuelo Privado con...|             Doméstico|          Despegue|       IGU|           IGU|               0|    AIR-AT-502-B|        0|  DEFINITIVO|
|2022-01-02|  09:47|          No Regular|             Doméstico|        Aterrizaje|       IGU|           IGU|               0|    AIR-AT-502-B|        0|  DEFINITIVO

# Problema 12
El proyecto es interesante porque estudia cuáles aeronaves son las que más volaron, cuántos pasajeros volaron, ciudades de partidas y aterrizajes entre fechas determinadas entre otras cuestiones.

In [29]:
vuelos.select(F.sum(vuelos.pasajeros)).show()

+--------------+
|sum(pasajeros)|
+--------------+
|      15780468|
+--------------+



                                                                                