# Ejercicio Flights

Importaremos y trataremos los datos de vuelos de 2015 obtenidos de un dataset de Kaggle:
https://www.kaggle.com/datasets/usdot/flight-delays?select=flights.csv

Pondremos en práctica lo estudiado sobre Apache Spark.


In [1]:
# findspark only locates the Spark installation and puts pyspark on sys.path
# when init() is called; importing the module on its own does nothing.
import findspark
findspark.init()

In [2]:
# Create (or reuse) the Spark session for this notebook.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark Flight App").getOrCreate()

# Load the CSV. The chain is wrapped in parentheses so each option can carry
# its own comment. Text or a comment after a backslash line continuation is a
# SyntaxError, which made the original cell fail before anything ran.
df_flight = (
    spark.read
    # Let Spark infer each column's data type (costs an extra pass over the file).
    .option("inferSchema", "true")
    # Treat the first row as the column header.
    .option("header", "true")
    .csv("flights.csv")
)

In [3]:
# Read flights.csv using the first row as header and an inferred schema.
df_flight = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("flights.csv")
)

In [4]:
# Displaying the DataFrame shows its column names and types, not its rows.
df_flight

DataFrame[YEAR: int, MONTH: int, DAY: int, DAY_OF_WEEK: int, AIRLINE: string, FLIGHT_NUMBER: int, TAIL_NUMBER: string, ORIGIN_AIRPORT: string, DESTINATION_AIRPORT: string, SCHEDULED_DEPARTURE: int, DEPARTURE_TIME: int, DEPARTURE_DELAY: int, TAXI_OUT: int, WHEELS_OFF: int, SCHEDULED_TIME: int, ELAPSED_TIME: int, AIR_TIME: int, DISTANCE: int, WHEELS_ON: int, TAXI_IN: int, SCHEDULED_ARRIVAL: int, ARRIVAL_TIME: int, ARRIVAL_DELAY: int, DIVERTED: int, CANCELLED: int, CANCELLATION_REASON: string, AIR_SYSTEM_DELAY: int, SECURITY_DELAY: int, AIRLINE_DELAY: int, LATE_AIRCRAFT_DELAY: int, WEATHER_DELAY: int]

In [5]:
# Get the number of rows (count() is an action, so it runs a Spark job).
df_flight.count()

5819079

In [6]:
# Check how many partitions Spark split the loaded data into.
df_flight.rdd.getNumPartitions()

8

In [7]:
# Inspect the schema as a StructType (name, type and nullability of each column).
df_flight.schema

StructType([StructField('YEAR', IntegerType(), True), StructField('MONTH', IntegerType(), True), StructField('DAY', IntegerType(), True), StructField('DAY_OF_WEEK', IntegerType(), True), StructField('AIRLINE', StringType(), True), StructField('FLIGHT_NUMBER', IntegerType(), True), StructField('TAIL_NUMBER', StringType(), True), StructField('ORIGIN_AIRPORT', StringType(), True), StructField('DESTINATION_AIRPORT', StringType(), True), StructField('SCHEDULED_DEPARTURE', IntegerType(), True), StructField('DEPARTURE_TIME', IntegerType(), True), StructField('DEPARTURE_DELAY', IntegerType(), True), StructField('TAXI_OUT', IntegerType(), True), StructField('WHEELS_OFF', IntegerType(), True), StructField('SCHEDULED_TIME', IntegerType(), True), StructField('ELAPSED_TIME', IntegerType(), True), StructField('AIR_TIME', IntegerType(), True), StructField('DISTANCE', IntegerType(), True), StructField('WHEELS_ON', IntegerType(), True), StructField('TAXI_IN', IntegerType(), True), StructField('SCHEDULE

In [8]:
# Fetch the first 3 rows to the driver as a list of Row objects.
df_flight.take(3)

[Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE='AS', FLIGHT_NUMBER=98, TAIL_NUMBER='N407AS', ORIGIN_AIRPORT='ANC', DESTINATION_AIRPORT='SEA', SCHEDULED_DEPARTURE=5, DEPARTURE_TIME=2354, DEPARTURE_DELAY=-11, TAXI_OUT=21, WHEELS_OFF=15, SCHEDULED_TIME=205, ELAPSED_TIME=194, AIR_TIME=169, DISTANCE=1448, WHEELS_ON=404, TAXI_IN=4, SCHEDULED_ARRIVAL=430, ARRIVAL_TIME=408, ARRIVAL_DELAY=-22, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=None, AIR_SYSTEM_DELAY=None, SECURITY_DELAY=None, AIRLINE_DELAY=None, LATE_AIRCRAFT_DELAY=None, WEATHER_DELAY=None),
 Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE='AA', FLIGHT_NUMBER=2336, TAIL_NUMBER='N3KUAA', ORIGIN_AIRPORT='LAX', DESTINATION_AIRPORT='PBI', SCHEDULED_DEPARTURE=10, DEPARTURE_TIME=2, DEPARTURE_DELAY=-8, TAXI_OUT=12, WHEELS_OFF=14, SCHEDULED_TIME=280, ELAPSED_TIME=279, AIR_TIME=263, DISTANCE=2330, WHEELS_ON=737, TAXI_IN=4, SCHEDULED_ARRIVAL=750, ARRIVAL_TIME=741, ARRIVAL_DELAY=-9, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=Non

In [9]:
# Sort the rows alphabetically by airline code and fetch the first 3.
# orderBy is an alias of sort.
df_flight.orderBy("AIRLINE").take(3)

[Row(YEAR=2015, MONTH=11, DAY=17, DAY_OF_WEEK=2, AIRLINE='AA', FLIGHT_NUMBER=315, TAIL_NUMBER='N5DVAA', ORIGIN_AIRPORT='DFW', DESTINATION_AIRPORT='SAT', SCHEDULED_DEPARTURE=1225, DEPARTURE_TIME=1230, DEPARTURE_DELAY=5, TAXI_OUT=19, WHEELS_OFF=1249, SCHEDULED_TIME=74, ELAPSED_TIME=71, AIR_TIME=46, DISTANCE=247, WHEELS_ON=1335, TAXI_IN=6, SCHEDULED_ARRIVAL=1339, ARRIVAL_TIME=1341, ARRIVAL_DELAY=2, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=None, AIR_SYSTEM_DELAY=None, SECURITY_DELAY=None, AIRLINE_DELAY=None, LATE_AIRCRAFT_DELAY=None, WEATHER_DELAY=None),
 Row(YEAR=2015, MONTH=7, DAY=4, DAY_OF_WEEK=6, AIRLINE='AA', FLIGHT_NUMBER=1238, TAIL_NUMBER='N3LXAA', ORIGIN_AIRPORT='MIA', DESTINATION_AIRPORT='LAX', SCHEDULED_DEPARTURE=1950, DEPARTURE_TIME=1958, DEPARTURE_DELAY=8, TAXI_OUT=31, WHEELS_OFF=2029, SCHEDULED_TIME=327, ELAPSED_TIME=335, AIR_TIME=297, DISTANCE=2342, WHEELS_ON=2226, TAXI_IN=7, SCHEDULED_ARRIVAL=2217, ARRIVAL_TIME=2233, ARRIVAL_DELAY=16, DIVERTED=0, CANCELLED=0, CANCELLATIO

In [10]:
# Check how many partitions the sorted result uses (a global sort shuffles the data).
df_flight.sort("AIRLINE").rdd.getNumPartitions()

9

In [11]:
# Lower the number of partitions Spark uses after a shuffle (sorts, aggregations, joins).
# Fewer, larger partitions reduce per-task overhead; presumably this notebook runs on a
# small local setup where the default is too high.
spark.conf.set("spark.sql.shuffle.partitions","4")

In [12]:
# Re-check the partition count of the sorted result after changing the setting.
df_flight.sort("AIRLINE").rdd.getNumPartitions()

4

In [13]:
# Maximum time any flight spent in the air (AIR_TIME column).
# Spark's max is imported under an alias so it does not shadow Python's
# built-in max() for the rest of the notebook (same convention as `sum as _sum`).
from pyspark.sql.functions import max as _max
df_flight.select(_max("AIR_TIME")).collect()

[Row(max(AIR_TIME)=690)]

In [14]:
# Top 5 destination airports by number of cancelled flights.
# CANCELLED appears to be a 0/1 flag, so summing it counts cancellations.
from pyspark.sql.functions import desc, sum as _sum
(
    df_flight.groupby("DESTINATION_AIRPORT")
    # Name the aggregate directly instead of renaming Spark's auto-generated
    # "sum(CANCELLED)" column afterwards.
    .agg(_sum("CANCELLED").alias("Cancelado"))
    .sort(desc("Cancelado"))
    .limit(5)
    .show()
)

+-------------------+---------+
|DESTINATION_AIRPORT|Cancelado|
+-------------------+---------+
|                ORD|     9273|
|                DFW|     6749|
|                LGA|     4418|
|                EWR|     3350|
|                ATL|     2715|
+-------------------+---------+



In [15]:
# Top 5 origin airports by number of cancelled flights.
# Imports are repeated here so the cell also runs on its own.
from pyspark.sql.functions import desc, sum as _sum
(
    df_flight.groupby("ORIGIN_AIRPORT")
    # Name the aggregate directly rather than relying on the auto-generated
    # "sum(CANCELLED)" column name.
    .agg(_sum("CANCELLED").alias("Cancelado"))
    .sort(desc("Cancelado"))
    .limit(5)
    .show()
)

+--------------+---------+
|ORIGIN_AIRPORT|Cancelado|
+--------------+---------+
|           ORD|     8548|
|           DFW|     6254|
|           LGA|     4531|
|           EWR|     3110|
|           BOS|     2654|
+--------------+---------+



In [16]:
# Select a single column; show() displays the first 20 rows by default.
df_flight.select("AIRLINE").show()

+-------+
|AIRLINE|
+-------+
|     AS|
|     AA|
|     US|
|     AA|
|     AS|
|     DL|
|     NK|
|     US|
|     AA|
|     DL|
|     DL|
|     AA|
|     DL|
|     DL|
|     DL|
|     AS|
|     DL|
|     UA|
|     AS|
|     DL|
+-------+
only showing top 20 rows



In [17]:
# Select two columns by name.
df_flight.select("AIRLINE","ORIGIN_AIRPORT").show()

+-------+--------------+
|AIRLINE|ORIGIN_AIRPORT|
+-------+--------------+
|     AS|           ANC|
|     AA|           LAX|
|     US|           SFO|
|     AA|           LAX|
|     AS|           SEA|
|     DL|           SFO|
|     NK|           LAS|
|     US|           LAX|
|     AA|           SFO|
|     DL|           LAS|
|     DL|           DEN|
|     AA|           LAS|
|     DL|           LAX|
|     DL|           SLC|
|     DL|           SEA|
|     AS|           ANC|
|     DL|           ANC|
|     UA|           SFO|
|     AS|           ANC|
|     DL|           PDX|
+-------+--------------+
only showing top 20 rows



In [18]:
# Select two columns using Column objects (df_flight[...]) instead of name strings.
df_flight.select(df_flight["AIRLINE"], df_flight["ORIGIN_AIRPORT"]).show()

+-------+--------------+
|AIRLINE|ORIGIN_AIRPORT|
+-------+--------------+
|     AS|           ANC|
|     AA|           LAX|
|     US|           SFO|
|     AA|           LAX|
|     AS|           SEA|
|     DL|           SFO|
|     NK|           LAS|
|     US|           LAX|
|     AA|           SFO|
|     DL|           LAS|
|     DL|           DEN|
|     AA|           LAS|
|     DL|           LAX|
|     DL|           SLC|
|     DL|           SEA|
|     AS|           ANC|
|     DL|           ANC|
|     UA|           SFO|
|     AS|           ANC|
|     DL|           PDX|
+-------+--------------+
only showing top 20 rows



In [19]:
# Select every column by unpacking the list of column names.
df_flight.select(*df_flight.columns).show()

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

In [20]:
# Select every column with the "*" wildcard.
df_flight.select("*").show()

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

In [21]:
# Inspect the schema again before experimenting with column transformations.
df_flight.schema

StructType([StructField('YEAR', IntegerType(), True), StructField('MONTH', IntegerType(), True), StructField('DAY', IntegerType(), True), StructField('DAY_OF_WEEK', IntegerType(), True), StructField('AIRLINE', StringType(), True), StructField('FLIGHT_NUMBER', IntegerType(), True), StructField('TAIL_NUMBER', StringType(), True), StructField('ORIGIN_AIRPORT', StringType(), True), StructField('DESTINATION_AIRPORT', StringType(), True), StructField('SCHEDULED_DEPARTURE', IntegerType(), True), StructField('DEPARTURE_TIME', IntegerType(), True), StructField('DEPARTURE_DELAY', IntegerType(), True), StructField('TAXI_OUT', IntegerType(), True), StructField('WHEELS_OFF', IntegerType(), True), StructField('SCHEDULED_TIME', IntegerType(), True), StructField('ELAPSED_TIME', IntegerType(), True), StructField('AIR_TIME', IntegerType(), True), StructField('DISTANCE', IntegerType(), True), StructField('WHEELS_ON', IntegerType(), True), StructField('TAXI_IN', IntegerType(), True), StructField('SCHEDULE

In [22]:
# NOTE: this binds a second name to the same DataFrame; it is not a copy.
# That is harmless here because DataFrame transformations return a new
# DataFrame and never modify the one they are called on.
df_flight_prueba=df_flight

In [23]:
# Cast 'YEAR' from IntegerType to string. The result is only displayed, not
# stored, so df_flight_prueba keeps its original schema.
from pyspark.sql.functions import col
year_as_text = col("YEAR").cast("String")
df_flight_prueba.withColumn("YEAR", year_as_text).show()

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

In [24]:
# Add 1 to the year. withColumn returns a new DataFrame and does not modify
# df_flight_prueba, so the change is only visible in this output.
df_flight_prueba.withColumn("YEAR",col("YEAR") + 1).show()

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

In [25]:
# Confirm the previous changes were not stored (YEAR is still IntegerType).
df_flight_prueba.schema

StructType([StructField('YEAR', IntegerType(), True), StructField('MONTH', IntegerType(), True), StructField('DAY', IntegerType(), True), StructField('DAY_OF_WEEK', IntegerType(), True), StructField('AIRLINE', StringType(), True), StructField('FLIGHT_NUMBER', IntegerType(), True), StructField('TAIL_NUMBER', StringType(), True), StructField('ORIGIN_AIRPORT', StringType(), True), StructField('DESTINATION_AIRPORT', StringType(), True), StructField('SCHEDULED_DEPARTURE', IntegerType(), True), StructField('DEPARTURE_TIME', IntegerType(), True), StructField('DEPARTURE_DELAY', IntegerType(), True), StructField('TAXI_OUT', IntegerType(), True), StructField('WHEELS_OFF', IntegerType(), True), StructField('SCHEDULED_TIME', IntegerType(), True), StructField('ELAPSED_TIME', IntegerType(), True), StructField('AIR_TIME', IntegerType(), True), StructField('DISTANCE', IntegerType(), True), StructField('WHEELS_ON', IntegerType(), True), StructField('TAXI_IN', IntegerType(), True), StructField('SCHEDULE

In [30]:
# Replace the existing YEAR column with MONTH + 1 (withColumn with an existing
# column name overwrites that column).
# NOTE(review): this writes MONTH + 1 into YEAR. If the intent was to shift the
# month, it should probably be withColumn("MONTH", col("MONTH") + 1). Confirm.
df_flight_prueba.withColumn("YEAR",col("MONTH") + 1).show()

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

In [31]:
# Add a new column holding YEAR + 1 (withColumn with a name that does not exist yet).
df_flight_prueba.withColumn("NuevoAño", df_flight_prueba["YEAR"] + 1).show()

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+--------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|NuevoAño|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+-----

In [32]:
# Rename an existing column (YEAR -> Año); the result is only displayed.
df_flight_prueba.withColumnRenamed("YEAR", "Año").show()

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
| Año|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

In [33]:
# Drop the "YEAR" column. Only the resulting DataFrame's schema is displayed;
# df_flight_prueba itself is unchanged.
df_flight_prueba.drop("YEAR")

DataFrame[MONTH: int, DAY: int, DAY_OF_WEEK: int, AIRLINE: string, FLIGHT_NUMBER: int, TAIL_NUMBER: string, ORIGIN_AIRPORT: string, DESTINATION_AIRPORT: string, SCHEDULED_DEPARTURE: int, DEPARTURE_TIME: int, DEPARTURE_DELAY: int, TAXI_OUT: int, WHEELS_OFF: int, SCHEDULED_TIME: int, ELAPSED_TIME: int, AIR_TIME: int, DISTANCE: int, WHEELS_ON: int, TAXI_IN: int, SCHEDULED_ARRIVAL: int, ARRIVAL_TIME: int, ARRIVAL_DELAY: int, DIVERTED: int, CANCELLED: int, CANCELLATION_REASON: string, AIR_SYSTEM_DELAY: int, SECURITY_DELAY: int, AIRLINE_DELAY: int, LATE_AIRCRAFT_DELAY: int, WEATHER_DELAY: int]

In [34]:
# Confirm none of the previous transformations were stored in df_flight_prueba.
df_flight_prueba.show()

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

In [35]:
# Filtering can be written in several equivalent ways; here with attribute-style column access.
df_flight_prueba.filter(df_flight_prueba.ORIGIN_AIRPORT == "LAX").take(5)

[Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE='AA', FLIGHT_NUMBER=2336, TAIL_NUMBER='N3KUAA', ORIGIN_AIRPORT='LAX', DESTINATION_AIRPORT='PBI', SCHEDULED_DEPARTURE=10, DEPARTURE_TIME=2, DEPARTURE_DELAY=-8, TAXI_OUT=12, WHEELS_OFF=14, SCHEDULED_TIME=280, ELAPSED_TIME=279, AIR_TIME=263, DISTANCE=2330, WHEELS_ON=737, TAXI_IN=4, SCHEDULED_ARRIVAL=750, ARRIVAL_TIME=741, ARRIVAL_DELAY=-9, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=None, AIR_SYSTEM_DELAY=None, SECURITY_DELAY=None, AIRLINE_DELAY=None, LATE_AIRCRAFT_DELAY=None, WEATHER_DELAY=None),
 Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE='AA', FLIGHT_NUMBER=258, TAIL_NUMBER='N3HYAA', ORIGIN_AIRPORT='LAX', DESTINATION_AIRPORT='MIA', SCHEDULED_DEPARTURE=20, DEPARTURE_TIME=15, DEPARTURE_DELAY=-5, TAXI_OUT=15, WHEELS_OFF=30, SCHEDULED_TIME=285, ELAPSED_TIME=281, AIR_TIME=258, DISTANCE=2342, WHEELS_ON=748, TAXI_IN=8, SCHEDULED_ARRIVAL=805, ARRIVAL_TIME=756, ARRIVAL_DELAY=-9, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=None,

In [36]:
# Same filter using an unbound column reference built with col().
from pyspark.sql.functions import col
df_flight_prueba.filter(col("ORIGIN_AIRPORT")=="LAX").take(2)

[Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE='AA', FLIGHT_NUMBER=2336, TAIL_NUMBER='N3KUAA', ORIGIN_AIRPORT='LAX', DESTINATION_AIRPORT='PBI', SCHEDULED_DEPARTURE=10, DEPARTURE_TIME=2, DEPARTURE_DELAY=-8, TAXI_OUT=12, WHEELS_OFF=14, SCHEDULED_TIME=280, ELAPSED_TIME=279, AIR_TIME=263, DISTANCE=2330, WHEELS_ON=737, TAXI_IN=4, SCHEDULED_ARRIVAL=750, ARRIVAL_TIME=741, ARRIVAL_DELAY=-9, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=None, AIR_SYSTEM_DELAY=None, SECURITY_DELAY=None, AIRLINE_DELAY=None, LATE_AIRCRAFT_DELAY=None, WEATHER_DELAY=None),
 Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE='AA', FLIGHT_NUMBER=258, TAIL_NUMBER='N3HYAA', ORIGIN_AIRPORT='LAX', DESTINATION_AIRPORT='MIA', SCHEDULED_DEPARTURE=20, DEPARTURE_TIME=15, DEPARTURE_DELAY=-5, TAXI_OUT=15, WHEELS_OFF=30, SCHEDULED_TIME=285, ELAPSED_TIME=281, AIR_TIME=258, DISTANCE=2342, WHEELS_ON=748, TAXI_IN=8, SCHEDULED_ARRIVAL=805, ARRIVAL_TIME=756, ARRIVAL_DELAY=-9, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=None,

In [37]:
# Same filter written as a SQL expression string.
df_flight_prueba.filter("ORIGIN_AIRPORT =='LAX'").take(2)

[Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE='AA', FLIGHT_NUMBER=2336, TAIL_NUMBER='N3KUAA', ORIGIN_AIRPORT='LAX', DESTINATION_AIRPORT='PBI', SCHEDULED_DEPARTURE=10, DEPARTURE_TIME=2, DEPARTURE_DELAY=-8, TAXI_OUT=12, WHEELS_OFF=14, SCHEDULED_TIME=280, ELAPSED_TIME=279, AIR_TIME=263, DISTANCE=2330, WHEELS_ON=737, TAXI_IN=4, SCHEDULED_ARRIVAL=750, ARRIVAL_TIME=741, ARRIVAL_DELAY=-9, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=None, AIR_SYSTEM_DELAY=None, SECURITY_DELAY=None, AIRLINE_DELAY=None, LATE_AIRCRAFT_DELAY=None, WEATHER_DELAY=None),
 Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE='AA', FLIGHT_NUMBER=258, TAIL_NUMBER='N3HYAA', ORIGIN_AIRPORT='LAX', DESTINATION_AIRPORT='MIA', SCHEDULED_DEPARTURE=20, DEPARTURE_TIME=15, DEPARTURE_DELAY=-5, TAXI_OUT=15, WHEELS_OFF=30, SCHEDULED_TIME=285, ELAPSED_TIME=281, AIR_TIME=258, DISTANCE=2342, WHEELS_ON=748, TAXI_IN=8, SCHEDULED_ARRIVAL=805, ARRIVAL_TIME=756, ARRIVAL_DELAY=-9, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=None,

In [39]:
# Filter with two conditions combined by logical AND (&); each condition is
# built separately so the parenthesization is explicit.
from_lax = df_flight_prueba.ORIGIN_AIRPORT == "LAX"
to_mia = df_flight_prueba.DESTINATION_AIRPORT == "MIA"
df_flight_prueba.filter(from_lax & to_mia).take(5)

[Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE='AA', FLIGHT_NUMBER=258, TAIL_NUMBER='N3HYAA', ORIGIN_AIRPORT='LAX', DESTINATION_AIRPORT='MIA', SCHEDULED_DEPARTURE=20, DEPARTURE_TIME=15, DEPARTURE_DELAY=-5, TAXI_OUT=15, WHEELS_OFF=30, SCHEDULED_TIME=285, ELAPSED_TIME=281, AIR_TIME=258, DISTANCE=2342, WHEELS_ON=748, TAXI_IN=8, SCHEDULED_ARRIVAL=805, ARRIVAL_TIME=756, ARRIVAL_DELAY=-9, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=None, AIR_SYSTEM_DELAY=None, SECURITY_DELAY=None, AIRLINE_DELAY=None, LATE_AIRCRAFT_DELAY=None, WEATHER_DELAY=None),
 Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE='AA', FLIGHT_NUMBER=115, TAIL_NUMBER='N3CTAA', ORIGIN_AIRPORT='LAX', DESTINATION_AIRPORT='MIA', SCHEDULED_DEPARTURE=105, DEPARTURE_TIME=103, DEPARTURE_DELAY=-2, TAXI_OUT=14, WHEELS_OFF=117, SCHEDULED_TIME=286, ELAPSED_TIME=276, AIR_TIME=255, DISTANCE=2342, WHEELS_ON=832, TAXI_IN=7, SCHEDULED_ARRIVAL=851, ARRIVAL_TIME=839, ARRIVAL_DELAY=-12, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=N

In [40]:
# Number of flights departing from Los Angeles (LAX).
df_flight_prueba.filter(df_flight_prueba.ORIGIN_AIRPORT == "LAX").count()

194673

In [42]:
# Keep only rows whose origin airport is in a list of values, and count them.
lista=["LAX","MIA","BOS"]
df_flight_prueba.filter(df_flight_prueba.ORIGIN_AIRPORT.isin(lista)).count()

371861

In [44]:
# Exclude those airports instead: negate the isin() condition with ~, the
# idiomatic Column negation, rather than comparing the condition to False.
# Rows with a null ORIGIN_AIRPORT are dropped either way (the condition is null).
lista = ["LAX", "MIA", "BOS"]
df_flight_prueba.filter(~df_flight_prueba.ORIGIN_AIRPORT.isin(lista)).count()

5447218

In [45]:
# Count flights whose origin airport code starts with "N".
origin_code = df_flight_prueba["ORIGIN_AIRPORT"]
df_flight_prueba.filter(origin_code.startswith("N")).count()

0

In [46]:
# Keep rows whose origin airport code contains "A" anywhere (SQL LIKE pattern, % = any text).
df_flight_prueba.filter(df_flight_prueba.ORIGIN_AIRPORT.like ("%A%")).take(2)

[Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE='AS', FLIGHT_NUMBER=98, TAIL_NUMBER='N407AS', ORIGIN_AIRPORT='ANC', DESTINATION_AIRPORT='SEA', SCHEDULED_DEPARTURE=5, DEPARTURE_TIME=2354, DEPARTURE_DELAY=-11, TAXI_OUT=21, WHEELS_OFF=15, SCHEDULED_TIME=205, ELAPSED_TIME=194, AIR_TIME=169, DISTANCE=1448, WHEELS_ON=404, TAXI_IN=4, SCHEDULED_ARRIVAL=430, ARRIVAL_TIME=408, ARRIVAL_DELAY=-22, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=None, AIR_SYSTEM_DELAY=None, SECURITY_DELAY=None, AIRLINE_DELAY=None, LATE_AIRCRAFT_DELAY=None, WEATHER_DELAY=None),
 Row(YEAR=2015, MONTH=1, DAY=1, DAY_OF_WEEK=4, AIRLINE='AA', FLIGHT_NUMBER=2336, TAIL_NUMBER='N3KUAA', ORIGIN_AIRPORT='LAX', DESTINATION_AIRPORT='PBI', SCHEDULED_DEPARTURE=10, DEPARTURE_TIME=2, DEPARTURE_DELAY=-8, TAXI_OUT=12, WHEELS_OFF=14, SCHEDULED_TIME=280, ELAPSED_TIME=279, AIR_TIME=263, DISTANCE=2330, WHEELS_ON=737, TAXI_IN=4, SCHEDULED_ARRIVAL=750, ARRIVAL_TIME=741, ARRIVAL_DELAY=-9, DIVERTED=0, CANCELLED=0, CANCELLATION_REASON=Non

In [47]:
# Sort by DISTANCE, longest first, and show only that column.
df_flight_prueba.sort(df_flight_prueba.DISTANCE.desc()).select("DISTANCE").show()

+--------+
|DISTANCE|
+--------+
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
+--------+
only showing top 20 rows



In [48]:
# Same ordering written with orderBy, an alias of sort.
df_flight_prueba.orderBy(df_flight_prueba.DISTANCE.desc()).select("DISTANCE").show()

+--------+
|DISTANCE|
+--------+
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
|    4983|
+--------+
only showing top 20 rows



In [50]:
# Sort by DISTANCE descending, breaking ties by AIRLINE in descending order.
df_flight_prueba.orderBy(df_flight_prueba.DISTANCE.desc(),df_flight_prueba.AIRLINE.desc() ).select("DISTANCE","AIRLINE").show()

+--------+-------+
|DISTANCE|AIRLINE|
+--------+-------+
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
|    4983|     HA|
+--------+-------+
only showing top 20 rows



In [51]:
# Sort by DISTANCE descending, breaking ties by AIRLINE in ascending order.
df_flight_prueba.orderBy(df_flight_prueba.DISTANCE.desc(),df_flight_prueba.AIRLINE.asc() ).select("DISTANCE","AIRLINE").show()

+--------+-------+
|DISTANCE|AIRLINE|
+--------+-------+
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
|    4983|     DL|
+--------+-------+
only showing top 20 rows



In [52]:
# Number of cancelled flights per airline (sum of the CANCELLED column).
df_flight_prueba.groupBy("AIRLINE").sum("CANCELLED").show()

+-------+--------------+
|AIRLINE|sum(CANCELLED)|
+-------+--------------+
|     US|          4067|
|     EV|         15231|
|     WN|         16043|
|     VX|           534|
|     DL|          3824|
|     UA|          6573|
|     OO|          9960|
|     MQ|         15025|
|     B6|          4276|
|     F9|           588|
|     AS|           669|
|     AA|         10919|
|     NK|          2004|
|     HA|           171|
+-------+--------------+



In [53]:
# Number of flights (rows) per airline.
df_flight_prueba.groupBy("AIRLINE").count().show()

+-------+-------+
|AIRLINE|  count|
+-------+-------+
|     US| 198715|
|     EV| 571977|
|     WN|1261855|
|     VX|  61903|
|     DL| 875881|
|     UA| 515723|
|     OO| 588353|
|     MQ| 294632|
|     B6| 267048|
|     F9|  90836|
|     AS| 172521|
|     AA| 725984|
|     NK| 117379|
|     HA|  76272|
+-------+-------+



In [54]:
# Shortest flight distance for each airline.
df_flight_prueba.groupBy("AIRLINE").min("DISTANCE").show()

+-------+-------------+
|AIRLINE|min(DISTANCE)|
+-------+-------------+
|     US|           83|
|     EV|           52|
|     WN|          148|
|     VX|          189|
|     DL|           74|
|     UA|           36|
|     OO|           49|
|     MQ|           89|
|     B6|           68|
|     F9|          168|
|     AS|           31|
|     AA|           21|
|     NK|          177|
|     HA|           84|
+-------+-------------+



In [55]:
# Total distance flown per airline (sum of DISTANCE, in miles).
# sum is aliased to _sum so Python's built-in sum() is not shadowed.
from pyspark.sql.functions import sum as _sum
by_airline = df_flight_prueba.groupBy("AIRLINE")
by_airline.agg(_sum("DISTANCE")).show()

+-------+-------------+
|AIRLINE|sum(DISTANCE)|
+-------+-------------+
|     US|    181129490|
|     EV|    264397363|
|     WN|    934670301|
|     VX|     87034957|
|     DL|    747671138|
|     UA|    655765355|
|     OO|    292277393|
|     MQ|    124427633|
|     B6|    283651757|
|     F9|     87857929|
|     AS|    206579765|
|     AA|    755995614|
|     NK|    115649669|
|     HA|     48249045|
+-------+-------------+



In [57]:
# Airlines whose total distance flown exceeds a threshold. The aggregate is
# aliased so the filter can refer to it by name.
DISTANCE_THRESHOLD = 700_000_000
(
    df_flight_prueba.groupBy("AIRLINE")
    .agg(_sum("DISTANCE").alias("Distancia"))
    .where(col("Distancia") > DISTANCE_THRESHOLD)
    .show()
)

+-------+---------+
|AIRLINE|Distancia|
+-------+---------+
|     WN|934670301|
|     DL|747671138|
|     AA|755995614|
+-------+---------+

