#Evaluacion Final Módulo 8

#Proyecto: Análisis de Datos de Vuelos con PySpark

En este ejercicio, usarás el conjunto de datos de vuelos de la Administración Federal de Aviación de los Estados Unidos (FAA) para realizar un análisis básico de los vuelos.

Asegúrate de tener instalado PySpark.

Obtén los Datos

Descarga un conjunto de datos de vuelos de la FAA. Aquí está un enlace a los datos:
http://stat-computing.org/dataexpo/2009/the-data.html

Para este ejercicio, puedes descargar el conjunto de datos de 2008.

In [34]:
!pip install -U pyspark

import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import when
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, BooleanType, IntegerType, DayTimeIntervalType

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [35]:
## define función busqueda
def buscar_mayor(array):
  """
  función busca el mayor dentro de una array
  input: array
  output: valor_mayor, índice
  """
  mayor, indice = 0 , 0
  for inx in range(len(array)):
    if array[inx][2] > mayor:
      mayor = array[inx][2]
      indice = inx
  return mayor, indice

In [36]:
spark = SparkSession.builder.appName(
    "VuelosClassification"
).getOrCreate()

Lee los datos con PySpark.

In [37]:
# Read the data
df = spark.read.option(
    "delimiter", ","
).csv('2008.csv', 
      header=True)

#Explorar los Datos

Explora los datos para familiarizarte con ellos. 

Descrición de los datos:

1,Year,1987-2008

2,Month,12-Jan

3,DayofMonth,31-Jan

4,DayOfWeek,1 (Monday) - 7 (Sunday)

5,DepTime,"actual departure time (local, hhmm)" hora de salida real

6,CRSDepTime,"scheduled departure time (local, hhmm)" hora de salida programada

7,ArrTime,"actual arrival time (local, hhmm)" hora de llegada real

8,CRSArrTime,"scheduled arrival time (local, hhmm)" hora de llegada programada

9,UniqueCarrier,unique carrier code (código de operador único)

10,FlightNum,flight number

11,TailNum,plane tail number (número de cola del avión)

12,ActualElapsedTime,in minutes (Tiempo transcurrido real)

13,CRSElapsedTime,in minutes

14,AirTime,in minutes (the time from the moment an aircraft leaves the surface until it comes into contact with the surface at the next point of landing)

15,ArrDelay,"arrival delay, in minutes" (retraso de llegada)

16,DepDelay,"departure delay, in minutes" (retraso de salida)

17,Origin,origin IATA airport code

18,Dest,destination IATA airport code

19,Distance,in miles

20,TaxiIn,"taxi in time, in minutes" (taxi a tiempo)

21,TaxiOut,taxi out time in minutes (tiempo de salida del taxi)

22,Cancelled,was the flight cancelled?

23,CancellationCode,"reason for cancellation (A = carrier, B = 
weather, C = NAS, D = security)"

24,Diverted,"1 = yes, 0 = no" (Desviado)

25,CarrierDelay,in minutes

26,WeatherDelay,in minutes (tiempo de retraso)

27,NASDelay,in minutes

28,SecurityDelay,in minutes (Retraso de seguridad)

29,LateAircraftDelay,in minutes


#Aquí hay algunas cosas que podrías hacer:

Mostrar las primeras filas del objeto con los datos.

In [38]:
df.show(3)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    1|         3|        4|   1343|      1325|   1451|      1435|           WN|      588

Ver la cantidad de filas y columnas del objeto con los datos.

In [39]:
print(f"filas: {df.count()}, columnas: {len(df.columns)}")

filas: 2389217, columnas: 29


Mostrar el esquema del objeto con los datos.

In [40]:
df.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: string (nullable = true)
 |-- CarrierDelay:

Qué tipo de objeto es?

In [41]:
#Es un tipo de dato estructura de la clase StructType, y que anida una colección de StructField que define 
# el nombre de la columna (String), el tipo de columna (DataType), la columna anulable (Boolean) y los metadatos (MetaData)

# Todos los campos son tipo string
df.dtypes

[('Year', 'string'),
 ('Month', 'string'),
 ('DayofMonth', 'string'),
 ('DayOfWeek', 'string'),
 ('DepTime', 'string'),
 ('CRSDepTime', 'string'),
 ('ArrTime', 'string'),
 ('CRSArrTime', 'string'),
 ('UniqueCarrier', 'string'),
 ('FlightNum', 'string'),
 ('TailNum', 'string'),
 ('ActualElapsedTime', 'string'),
 ('CRSElapsedTime', 'string'),
 ('AirTime', 'string'),
 ('ArrDelay', 'string'),
 ('DepDelay', 'string'),
 ('Origin', 'string'),
 ('Dest', 'string'),
 ('Distance', 'string'),
 ('TaxiIn', 'string'),
 ('TaxiOut', 'string'),
 ('Cancelled', 'string'),
 ('CancellationCode', 'string'),
 ('Diverted', 'string'),
 ('CarrierDelay', 'string'),
 ('WeatherDelay', 'string'),
 ('NASDelay', 'string'),
 ('SecurityDelay', 'string'),
 ('LateAircraftDelay', 'string')]

Limpieza de Datos

Asegúrate de que los datos estén limpios. Puedes necesitar hacer cosas como:

Eliminar filas con datos faltantes.

In [42]:
# En una primera iteración, se observa que el campo "CancellationCode" contien valores Null por lo que al eliminar los registros
# con valores Null sólo quedaban el 1% de los registros por lo que opta por reemplazar todos los valores Null en dicho campo por 
# un valor igual a "N" (No Aplica), así luego de eliminar los registros repetidos y con valores Null se logra conservar el 98% de 
# los datos.

In [43]:
df=df.na.fill({'CancellationCode': 'N'})

In [44]:
print("antes de la eliminación: ", df.count())
df = df.dropDuplicates()
print("después de la eliminación: ", df.count())

antes de la eliminación:  2389217
después de la eliminación:  2389213


In [45]:
print("antes de la eliminación: ", df.count())
df=df.na.drop(how="any")
print("después de la eliminación: ", df.count())

antes de la eliminación:  2389213
después de la eliminación:  2346761


Convertir columnas a los tipos de datos correctos.

In [46]:
df.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = false)
 |-- Diverted: string (nullable = true)
 |-- CarrierDelay

In [47]:
columnas=['Year','Month','DayofMonth','DayOfWeek','DepTime','CRSDepTime','ArrTime','CRSArrTime','UniqueCarrier','FlightNum',
'TailNum','ActualElapsedTime','CRSElapsedTime','AirTime','ArrDelay','DepDelay','Origin','Dest','Distance','TaxiIn','TaxiOut',
'Cancelled','CancellationCode','Diverted','CarrierDelay','WeatherDelay','NASDelay','SecurityDelay','LateAircraftDelay']

In [48]:
for c in range(0,4):
  df = df.withColumn(columnas[c], df[columnas[c]].cast(IntegerType()))

df = df.withColumn(columnas[9], df[columnas[9]].cast(IntegerType()))

for c in range(11,16):
  df = df.withColumn(columnas[c], df[columnas[c]].cast(IntegerType()))

for c in range(18,21):
  df = df.withColumn(columnas[c], df[columnas[c]].cast(IntegerType()))

df = df.withColumn(columnas[22], df[columnas[22]].cast(BooleanType()))
df = df.withColumn(columnas[24], df[columnas[24]].cast(BooleanType()))

for c in range(24,29):
  df = df.withColumn(columnas[c], df[columnas[c]].cast(IntegerType()))

In [49]:
# las variables o columnas "DepTime","CRSDepTime","ArrTime","CRSArrTime" aunque almacenan hora y minutos no se les cambiará el
# tipo de dato a "Date" ya que dicho cambio generar valores Null durante el proceso entonces, se mantiene su formato string y se 
# deberá proceder de otra forma para para utilizar dicha información.

In [50]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: boolean (nullable = true)
 |-- Diverted: string (nullable = true)
 |--

Luego de cambiar el formato de variables o campos tipo string a integer, si contenian un cero (0) fueron interpretados como valor Null por lo que es necesario volver a cambiar dicho Null por cero (0)

In [51]:
# restablece valores de Null a númerico
df=df.na.fill({'CarrierDelay': 0})
df=df.na.fill({'WeatherDelay': 0})
df=df.na.fill({'NASDelay': 0})
df=df.na.fill({'SecurityDelay': 0})
df=df.na.fill({'LateAircraftDelay': 0})

In [52]:
# revisa los datos
df.show(3)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    2|        20|        3|   1220|      1205|   1421|      1426|           9E|     3735

Análisis de Datos

Realiza un análisis básico de los datos.

In [53]:
# total vuelos cancelados
vuelos_cancelados=df.where("Cancelled = 1").count()
print("Total vuelos cancelados: ",vuelos_cancelados)

Total vuelos cancelados:  21993


¿Cuál es el vuelo más largo? 

In [54]:
# Esto es, tiempo total transcurrido desde que el avión comienza a moverse con el propósito de despegar, hasta que se detiene 
# completamente al finalizar el vuelo, entonces, la variable o variables que podrían contiene esta información son:

# "DepTime", hora de salida real y "ArrTime", hora de llegada real, hhmm.
# "AirTime", que el tiempo en minutos desde que despega hasta que atteriza, en hhmm
# "ActualElapsedTime", que es el tiempo transcurrido real en minutos 
 
# Pero estas variables no coinciden en el tiempo que registran
# La idea sería restar ambos tiempos para determinar el tiempo que demoró el vuelo.

In [55]:
# Empleando las variables 
# "DepTime", hora de salida real y "ArrTime", hora de llegada real.

In [56]:
# creación base de cálculo
df.createOrReplaceTempView("vuelos")
vuelos= spark.sql("select UniqueCarrier, FlightNum, DepTime, ArrTime, AirTime from vuelos").collect()

In [57]:
# rutina para calcular tiempo en minutos
# registra el tiempo mayor
mayor_deptime_arrtime=0
mayor_airtime=0
# registra el índice del vuelo
indice_deptime_arrtime=0
indice_airtime=0
# registra inconsistencia en el valor de los datos
inconsistentes=0

horas_a_minutos_salida=0
for inx in range(len(vuelos)):
  try:
  ## tiempo de salida
    salida=vuelos[inx][2]
  ## tiempo de llegada
    llegada=vuelos[inx][3]
  
  ## salida
    minutos_salida=int(salida[-2:])
    horas_salida=salida[0:-2]
    if len(horas_salida) > 0:
      horas_a_minutos_salida=int(horas_salida)
    else:
      horas_a_minutos=0
    salida_minutos = minutos_salida + horas_a_minutos_salida * 60
 
  ## llegada
    minutos_llegada=int(llegada[-2:])
    horas_llegada=llegada[0:-2]
    if len(horas_llegada) > 0:
      horas_a_minutos_llegada=int(horas_llegada)
      # check si el tiempo de llegada se produjo al día siguiente
      if (horas_a_minutos_llegada - horas_a_minutos_salida) < 0:
          horas_a_minutos_llegada = horas_a_minutos_llegada + 24
    else:
      horas_a_minutos_llegada = 0

    llegada_minutos = minutos_llegada + horas_a_minutos_llegada * 60

    ######## calcula tiempo de vuelo
    tiempo_vuelo = llegada_minutos-salida_minutos
    # registra tiempo mayor y ubiación del vuelo en la lista
    if tiempo_vuelo > mayor_deptime_arrtime:
      mayor_deptime_arrtime = tiempo_vuelo
      indice_deptime_arrtime = inx
    if int(vuelos[inx][4]) > mayor_airtime:
      mayor_airtime = int(vuelos[inx][4])
      indice_airtime = inx    
  
  except ValueError:
    # contabiliza vuelos con fechas inconsistentes
    inconsistentes=inconsistentes+1

#Alternativa 1 deptime-arrtime

In [58]:
horas_de_vuelo=mayor_deptime_arrtime // 60
minutos_de_vuelo = mayor_deptime_arrtime - (horas_de_vuelo * 60)

In [59]:
print(f"Existieron varios vuelos El vuelo más largo en tiempo de duración en base variables DepTime y ArrTime :")
print(f"Aerolinea: {vuelos[indice_deptime_arrtime][0]}, Vuelo N° {vuelos[indice_deptime_arrtime][1]}, duración {mayor_deptime_arrtime} minutos")
print(f"lo que equivale en el formato tiempo de {horas_de_vuelo}:{minutos_de_vuelo} horas:minutos")

Existieron varios vuelos El vuelo más largo en tiempo de duración en base variables DepTime y ArrTime :
Aerolinea: WN, Vuelo N° 774, duración 1439 minutos
lo que equivale en el formato tiempo de 23:59 horas:minutos


# Alternativa 2 AirTime

In [60]:
# utilizando la variables
# "AirTime", que el tiempo en minutos desde que despega hasta que atteriza

In [61]:
horas_de_vuelo=mayor_airtime // 60
minutos_de_vuelo = mayor_airtime - (horas_de_vuelo * 60)

In [62]:
print(f"El vuelo más largo en tiempo de duración es en base variable AirTime :")
print(f"Aerolinea: {vuelos[indice_airtime][0]}, Vuelo N° {vuelos[indice_airtime][1]}, duración {mayor_airtime} minutos")
print(f"lo que equivale en el formato tiempo de {horas_de_vuelo}:{minutos_de_vuelo} horas:minutos")

El vuelo más largo en tiempo de duración es en base variable AirTime :
Aerolinea: HA, Vuelo N° 22, duración 886 minutos
lo que equivale en el formato tiempo de 14:46 horas:minutos


# Alternativa 3 ActualElapsedTime

In [63]:
# utilizando la variables
# "ActualElapsedTime", que el tiempo transcurrido real en minutos
# busca vuelo con más minutos registrados
minutos_del_vuelo_de_mayor_duracion = df.select(F.max("ActualElapsedTime")).first()[0]
horas_de_vuelo=minutos_del_vuelo_de_mayor_duracion // 60

In [82]:
minutos_de_vuelo = minutos_del_vuelo_de_mayor_duracion - (horas_de_vuelo * 60)
print(f"El tiempo máximo de vuelo registrado fue: {minutos_del_vuelo_de_mayor_duracion} minutos")
print(f"lo que equivale en el formato tiempo de {horas_de_vuelo}:{minutos_de_vuelo} horas:minutos")

El tiempo máximo de vuelo registrado fue: 905 minutos
lo que equivale en el formato tiempo de 14:65 horas:minutos


Inconsistencias halladas.

In [66]:
print("Vuelos con tiempos inconsistentes: ",inconsistentes)

Vuelos con tiempos inconsistentes:  27646


Que unidad de tiempo es más apropiada? De qué depende? Seleccione una.

In [67]:
# Como observación se señala que existe un problema con los tiempos de viaje registrados ya que no aportan fecha de evento lo que 
# dificulta la comprobación #e la consistencia de los datos registrados, así se da el caso que hay tiempo de llegada
# inferiores en comparación con el salida, esto cuando la llegada es al siguiente día y también que el vuelo duró demasiado tiempo. 

In [68]:
# En este caso, como el tiempo esta expresado en horasminutos, la mínima unidad de tiempo más indicada es timestamp porque
# se registraría la fecha, hora y segundos, así luego se puede validar si la fecha fue registrada correctamente para luego
# extraer las horas, minutos o segundos según se requiera.


¿Cuál es la aerolínea con más vuelos?

In [69]:
# filtrar vuelos que se efectuaron, sin cancelación para procesar vuelos concretados
vuelos_no_cancelados=df.where("Cancelled = 0")

In [70]:
# contabiliza vuelos por aerolinea
vuelos=sorted(vuelos_no_cancelados.groupBy(['UniqueCarrier', vuelos_no_cancelados.UniqueCarrier],).count().collect())

In [83]:
# busca la auerolinea que realizó más vuelos
_, indice = buscar_mayor(vuelos)
print(f"La Aerolinea con más vuelos efectuados (sin cancelar) es: {vuelos[indice][1]} con {vuelos[indice][2]} vuelos")

La Aerolinea con más vuelos efectuados (sin cancelar) es: WN con 394586 vuelos



¿Cuál es el destino más popular?

In [72]:
destinos=sorted(df.groupBy(['Dest', df.Dest]).count().collect())

In [73]:
# busca el destino más popular
_, indice = buscar_mayor(destinos)
print(f"El destino más popular es: {destinos[indice][1]} con {destinos[indice][2]} vuelos")

El destino más popular es: ATL con 136570 vuelos


¿Qué día de la semana tiene más vuelos?

In [74]:
dias_semana=['lunes','martes','miércoles','jueves','viernes','sabado','domingo']

In [75]:
vuelos_dia_semana=sorted(df.groupBy(['DayOfWeek', df.DayOfWeek]).count().collect())

In [76]:
vuelos_dia_semana

[Row(DayOfWeek=1, DayOfWeek=1, count=341679),
 Row(DayOfWeek=2, DayOfWeek=2, count=350504),
 Row(DayOfWeek=3, DayOfWeek=3, count=358231),
 Row(DayOfWeek=4, DayOfWeek=4, count=343873),
 Row(DayOfWeek=5, DayOfWeek=5, count=342974),
 Row(DayOfWeek=6, DayOfWeek=6, count=284835),
 Row(DayOfWeek=7, DayOfWeek=7, count=324665)]

In [77]:
# busca el día de la semana que tiene más vuelos
mayor, indice = buscar_mayor(vuelos_dia_semana)
# identifica el día de la semana
dia=dias_semana[vuelos_dia_semana[indice][0]-1]

In [78]:
# busca vuelos cancelados el día con mayor cantidad de vuelos programados
vuelos_concretados=df.where("DayOfWeek = "+str(vuelos_dia_semana[indice][1])+" and Cancelled = 0").agg({"DayOfWeek": "count"}).collect()

In [79]:
# busca vuelos efectivamente realizados el día con mayor cantidad de vuelos programados
vuelos_cancelados=df.where("DayOfWeek = "+str(vuelos_dia_semana[indice][1])+" and Cancelled = 1").agg({"Cancelled": "count"}).collect()

Aunque el ejercicio no define si se deberán considerar los vuelos programados sin cancelación o los ejectivamente realizados (sin cancelación), se realiza un proceso de confirmación sobre el día que presenta un mayor número de vuelos, verificando que siga siendolo una vez que se le rebajen los vuelos cancelados para ese mismo día y una vez se confirme que se mantiene dicha superioridad, se presentará el resultado obtenido.

In [80]:
print(f"El día de la semana con más vuelos es el día {dia}\n")
print(f" {vuelos_dia_semana[indice][2]} vuelos programados el día {dia}")
print(f"-  {vuelos_cancelados[0][0]} vuelos cancelados el día {dia}")
print("\n               RESULTADO              ")
print("============================================")
print(f"\n {vuelos_concretados[0][0]} vuelos concretados el día {dia}")
print("============================================")

El día de la semana con más vuelos es el día miércoles

 358231 vuelos programados el día miércoles
-  3043 vuelos cancelados el día miércoles

               RESULTADO              

 355188 vuelos concretados el día miércoles


In [81]:
spark.stop()