### Data Wrangling

In [1]:
#Se importan las librerias correspondientes
import os
import pyspark
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import year
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import  IntegerType, StringType, DateType
import matplotlib
import matplotlib.pyplot as plt
matplotlib.style.use('ggplot')
import pandas as pd


In [2]:
#Se confirma que los datos se encuentren en el espacio de trabajo, si no se ecnuentran se descargan y preprocesan los datos
if  not (os.path.isdir("raw") and os.path.isfile(os.path.join("trusted", 'trafico_aereo_1992_2022.csv'))):
    import preprocess
    preprocess

In [3]:
# creamos el contexto y la sesión
sc = pyspark.SparkContext()
spark = SparkSession.builder.getOrCreate()
spark

In [4]:

#Se define el esquema de la tabla
schema = StructType([
    StructField('Fecha', DateType()),
    StructField('Sigla Empresa', StringType()),
    StructField('Origen', StringType()),
    StructField('Destino', StringType()),
    StructField('Pasajeros', IntegerType()),
    StructField('Trafico', StringType()),
    StructField('TipoVuelo', StringType()),
    StructField('Ciudad Origen', StringType()),
    StructField('Ciudad Destino', StringType()),
    StructField('Pais Origen', StringType()),
    StructField('Pais Destino', StringType()),
    StructField('Nombre Empresa', StringType()),
    StructField('Apto_Origen', StringType()),
    StructField('Apto_Destino', StringType()),
])

In [5]:
#Se lee el archivo de trusted
filepath = os.path.join("trusted", 'trafico_aereo_1992_2022.csv')
df = spark.read.csv(filepath, header=True, schema=schema)
df.show(n=3)

+----------+-------------+------+-------+---------+-------+---------+-------------+--------------+--------------+------------+--------------+--------------------+--------------------+
|     Fecha|Sigla Empresa|Origen|Destino|Pasajeros|Trafico|TipoVuelo|Ciudad Origen|Ciudad Destino|   Pais Origen|Pais Destino|Nombre Empresa|         Apto_Origen|        Apto_Destino|
+----------+-------------+------+-------+---------+-------+---------+-------------+--------------+--------------+------------+--------------+--------------------+--------------------+
|2019-04-01|          AAL|   ABQ|    BOG|       37|      I|        R| NUEVO MEXICO|        BOGOTA|ESTADOS UNIDOS|    COLOMBIA|      AMERICAN|ALBUQUERQUE INTL ...|   BOGOTA - ELDORADO|
|2019-04-01|          AAL|   ABZ|    CLO|        1|      I|        R|     ABENDEEN|          CALI|    INGLATERRA|    COLOMBIA|      AMERICAN|                CYDE|CALI - ALFONSO BO...|
|2019-04-01|          AAL|   AMS|    CTG|        2|      I|        R|    AMSTERD

In [6]:
#Tamaño de los datos
print((df.count(), len(df.columns)))

(1820258, 14)


In [7]:
#Cantidad de valores nulos
for c in df.columns:
    nuls = df.filter(col(c).isNull()).count()
    print(f'{c} - {nuls}')

Fecha - 15
Sigla Empresa - 12
Origen - 24
Destino - 15
Pasajeros - 201
Trafico - 24
TipoVuelo - 24
Ciudad Origen - 8351
Ciudad Destino - 10998
Pais Origen - 8351
Pais Destino - 10992
Nombre Empresa - 15
Apto_Origen - 18
Apto_Destino - 30


In [8]:
#Se eliminan los valores nulos en 
df = df.na.drop(subset=["Pasajeros", "Nombre Empresa"])


In [9]:
#Se toma la decision de unificar los nombres de las aerolineas a aquellas que han cambiado de nombre
# debido a unificaciones o compras, las transformaciones se pueden observar en el diccionario:
dict_aerolineas={ "LATAM":["AIRES" "LAN", "TAM", ],
                 "FAST COLOMBIA":"VIVA AIR"
    
}

In [10]:
#Se crea una nueva columna a partir de la limpieza de la columna 'Nombre empresa'
df = df.select("*",
            when(df["Nombre Empresa"].contains("21 AIR LLC."), "21 AIR").
            when(df["Nombre Empresa"].contains("REGAIR"), "REGAIR").
            when(df["Nombre Empresa"].contains("ABX AIR INC"), "ABX AIR INC").
            when(df["Nombre Empresa"].contains("AER CARIBE"), "AER CARIBE").
            when(df["Nombre Empresa"].contains("DAMOJH"), "DAMOJH").
            when(df["Nombre Empresa"].contains("AEROUNIËN"), "AEROUNIÓN").
            when(df["Nombre Empresa"].contains("ALBATROS"), "ALBATROS AIRLINES").
            when(df["Nombre Empresa"].contains("AMERICA┤S AIR SAS"), "AMERICA´S AIR SAS").
            when(df["Nombre Empresa"].contains("AVIANLINE CHARTE┤S S.A.S."), "AVIANLINE CHARTE´S S.A.S.").
            when(df["Nombre Empresa"].contains("CARGO THREE INC."), "CARGO THREE").
            when(df["Nombre Empresa"].contains("FLEXAIR S A S"), "FLEXAIR S.A.S").
            when(df["Nombre Empresa"].contains("HORIZONTAL DE AVIACION S.A.S"), "HORIZONTAL S A S").
            when(df["Nombre Empresa"].contains("ICARO S.A."), "ICARO").
            when(df["Nombre Empresa"].contains("LATAM AIRLINES GROUP S.A SUCURSAL COLOMBIA"), "LATAM").
            when(df["Nombre Empresa"].contains("CARGUERA DE COLOMBIA S.A"), "LÍNEA AÉREA CARGUERA DE COLOMBIA S.A.").
            when(df["Nombre Empresa"].contains("NO FIGURA"), "NO REGISTRA").
            when(df["Nombre Empresa"].contains("NO REGISTRA SIGLA"), "NO REGISTRA").
            when(df["Nombre Empresa"].contains("SERVICIO AEREO A TERRITORIOS NACIONALES SATENA"), "SATENA").
            when(df["Nombre Empresa"].contains("SKY LEASE I. INC  (GREENSBORO.NC)"), "SKY LEASE").
            when(df["Nombre Empresa"].contains("SKY LEASE I. INC (GREENSBORO.NC)"), "SKY LEASE").
            when(df["Nombre Empresa"].contains("SOUTHERN AIR TRANSPORT INC."), "SOUTHERN AIR INC.").
            when(df["Nombre Empresa"].contains("TAG"), "TAG AVIATION").
            when(df["Nombre Empresa"].contains("TAMPA CARGO"), "TAMPA CARGO").
            when(df["Nombre Empresa"].contains("VIANA S.A.S"), "VIANA S.A.S").
            when(df["Nombre Empresa"].contains("TAXCO S.A.S"), "TAXCO").
            when(df["Nombre Empresa"].contains("VUELA COMPAÑÍA DE AVIACION S.A.P.I DE"), "VUELA COMPAÑÍA DE AVIACION").
            when(df["Nombre Empresa"].contains("AERO LLANOS DEL ORIENTE"), "AERO LLANOS DEL ORIENTE").
            when(df["Nombre Empresa"].contains("ASES LTDA."), "ASES LTDA.").
            when(df["Nombre Empresa"].contains("AERO TAXI GUAYMARAL ATG S.A.S"), "AERO TAXI GUAYMARAL").
            when(df["Nombre Empresa"].contains("MITCHELL AAERO, INC."), "MITCHELL AAERO, INC.").
            when(df["Nombre Empresa"].contains("ROYAL AIR MAROC-COMPAGNIE"), "ROYAL AIR MAROC-COMPAGNIE").
            when(df["Nombre Empresa"].contains("AIR TRANSPORT ASSOCIATION OF AMERCA"), "AIR TRANSPORT ASSOCIATION OF AMERCA").
            when(df["Nombre Empresa"].contains("OMEGA AIR HOLDING"), "OMEGA AIR HOLDING").
            when(df["Nombre Empresa"].contains("KALITTA FLYING SERVICE"), "KALITTA FLYING SERVICE").
            when(df["Nombre Empresa"].contains("LINEA AEREA BOLIVARIANA"), "LINEA AEREA BOLIVARIANA").
            when(df["Nombre Empresa"].contains("TRANS AMERICAN AIR LINES"), "TRANS AMERICAN AIR LINES").
            when(df["Nombre Empresa"].contains("UNITED PARCEL SERVICE"), "UNITED PARCEL SERVICE").
            when(df["Nombre Empresa"].contains("AMERIJET INTERNATIOANL"), "AMERIJET INTERNATIONAL").
            when(df["Nombre Empresa"].contains("AEROTRANSPORTE DE CARGA UNION"), "AEROTRANSPORTE DE CARGA UNION").
            when(df["Nombre Empresa"].contains("DUTCH ANTILLES EXPRESS"), "DUTCH ANTILLES EXPRESS").
            when(df["Nombre Empresa"].contains("AEROGALAN"), "STARBLUE AIRLINES"). #Aerogalan cambia nombre a starblue
            when(df["Nombre Empresa"].contains("VRG LINHAS AEREAS"), "VRG LINHAS AEREAS").
            when(df["Nombre Empresa"].contains("DYNAMIC AIRWAYS"), "DYNAMIC AIRWAYS").
            when(df["Nombre Empresa"].contains("AVIANCA"), "AVIANCA").
            when(df["Nombre Empresa"].contains("JETBLUE AIRWAYS CORPORATION"), "JETBLUE AIRWAYS").
            when(df["Nombre Empresa"].contains("AIR CLASS"), "AIR CLASS").
            when(df["Nombre Empresa"].contains("VERTICAL DE AVIACION"), "VERTICAL DE AVIACION").
            when(df["Nombre Empresa"].contains("REGIONAL EXPRESS AMERICAS"), "REGIONAL EXPRESS AMERICAS").
            when(df["Nombre Empresa"].contains("AEROMEXICO"), "AEROMEXICO").
            when(df["Nombre Empresa"].contains("AEROLINEA DE ANTIOQUIA"), "ADA").
            when(df["Nombre Empresa"].contains("LATAM"), "LATAM").
            when(df["Nombre Empresa"].contains("FAST COLOMBIA"), "VIVA AIR").
            when(df["Nombre Empresa"].contains("AIRES"), "LATAM").
            when(df["Nombre Empresa"].contains("LAN"), "LATAM").
            when(df["Nombre Empresa"].contains("TAM"), "LATAM").
            when(df["Nombre Empresa"].contains("AEROEXPRESO DEL PACIFICO S.A."), "AEXPA").
            	
            otherwise(df["Nombre Empresa"]).alias("Nombre_Empresa_clean")
           )




In [13]:
# Convertimos PySpark DataFrame a Pandas DataFrame
pandas_df = df.toPandas()

# Exportamos Pandas DataFrame a CSV 
pandas_df.to_csv(os.path.join("trusted","trafico_aereo_process.csv"), index=False)
