In [84]:
!pip install boto3



Importamos las librerías necesarias

In [85]:
import calendar
import os
import re

import boto3
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sqlalchemy

In [86]:
# Connection string for the Postgres target. Set FLIGHTS_DB_URI in the environment to
# supply real credentials instead of editing (and committing) them here; the literal
# below is only a fallback.
connection_uri = os.environ.get(
    "FLIGHTS_DB_URI",
    "postgresql://postgres:private@db-test-nequi-flights.private.us-east-2.rds.amazonaws.com:5432/flight_db",
)
db_engine = sqlalchemy.create_engine(connection_uri)

# Function to extract table to a pandas DataFrame
def extract_table_to_pandas(tablename, db_engine,columnnames='*'):
    query = 'SELECT {} FROM "{}"'.format(columnnames, tablename)
    return pd.read_sql(query, db_engine)

In [87]:
# Connect to S3 to fetch the source files.
# No keys are hardcoded: boto3 resolves credentials through its default chain
# (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY env vars, shared credentials file,
# instance role, ...). Never commit secrets in the notebook.
session = boto3.Session()
s3_s = session.client('s3')

# States:

In [88]:
# Read the raw state list from S3 without a header row, naming the columns explicitly.
cols = ['code', 'description']
states_obj = s3_s.get_object(Key='data/states.txt', Bucket='test-nequi-data-engineer-amga')
states_df = pd.read_csv(states_obj['Body'], names=cols, header=None)

Declaramos una función para obtener el código y el nombre del estado

In [89]:
def regex_to_states(text, colname):
    '''Extract the state code or the state name from a raw state string.

    Args:
        text: raw text, presumably shaped like "Alabama (AL)" -- TODO confirm against states.txt.
        colname: 'code' returns the text inside the first pair of parentheses;
            any other value returns the leading run of word characters/whitespace.

    Returns:
        The matched fragment with surrounding whitespace stripped.

    Raises:
        AttributeError: if the pattern does not match (re.search returns None).
    '''
    # Raw strings: '\(' and '\w' are invalid escape sequences in ordinary literals
    # (DeprecationWarning, SyntaxWarning since Python 3.12). The regex itself is unchanged.
    if colname == 'code':
        pattern, group_index = r'\((.+?)\)', 1
    else:
        pattern, group_index = r'^(\w|\s)+', 0
    return re.search(pattern, text).group(group_index).strip()

In [90]:
# Split each raw entry (held in the 'code' column) into its state code and its name.
codes = states_df.apply(lambda state_row: regex_to_states(state_row['code'], 'code'), axis=1)
names = states_df.apply(lambda state_row: regex_to_states(state_row['code'], 'description'), axis=1)

In [91]:
# Side-by-side frame of the extracted codes and names, labelled with `cols`.
states_df_process = pd.concat([codes, names], axis=1, keys=cols)

Eliminamos posibles duplicados

In [92]:
states_df_process = states_df_process.drop_duplicates()

## Insertar en PostgreSQL

In [93]:
# Surrogate ids 1..n; assign() returns a new frame, so states_df_process is untouched.
states_ids = np.arange(1, len(states_df_process) + 1)
states_df_to_insert = states_df_process.assign(id=states_ids)

In [94]:
states_df_to_insert.to_sql(
    "States",
    db_engine,
    schema="public",
    index=False,
    if_exists='append',
    method='multi',
)

# Airlines:

In [95]:
# Airline list from S3, read with pandas' default header inference.
airlines_obj = s3_s.get_object(Key='data/airlines.csv', Bucket='test-nequi-data-engineer-amga')
airlines_df = pd.read_csv(airlines_obj['Body'])


## Insertar en PostgreSQL

In [96]:
# Surrogate ids 1..n; source columns renamed to the DB column names.
airlines_ids = np.arange(1, len(airlines_df) + 1)
airlines_df_to_insert = (
    airlines_df
    .set_axis(['iataCode', 'description'], axis=1)
    .assign(id=airlines_ids)
)

In [97]:
airlines_df_to_insert.to_sql(
    "Airlines",
    db_engine,
    schema="public",
    index=False,
    if_exists='append',
    method='multi',
)

# Airports: organizar archivo de aeropuertos

In [98]:
# Airport records from S3 (JSON).
airports_obj = s3_s.get_object(Key='data/airports.json', Bucket='test-nequi-data-engineer-amga')
airports_df = pd.read_json(airports_obj['Body'])

In [99]:
# NOTE(review): every missing value becomes 0, including any missing LATITUDE/LONGITUDE
# (0 is a real coordinate) -- confirm this is intended.
airports_df = airports_df.fillna(0)

Verificamos que no haya duplicados (iata_code)

## Insertar en PostgreSQL

### Ciudades
Se obtienen a partir de los datos de aeropuertos

In [100]:
# Unique (city, state) pairs taken from the airports data.
cities_df = airports_df.loc[:, ['CITY', 'STATE']].drop_duplicates()

In [101]:
# Reload States from the DB to get the ids stored with each state code.
states_sql_df = extract_table_to_pandas('States', db_engine)

In [102]:

# Attach each city to its state's id. Inner merge: cities whose STATE code is not in
# the States table are dropped. rename() returns a new frame, so adding 'id' below
# does not write into a column slice (no SettingWithCopyWarning).
cities_df_to_insert = (
    states_sql_df
    .merge(cities_df, left_on='code', right_on='STATE')
    .loc[:, ['id', 'CITY']]
    .rename(columns={'id': 'stateId', 'CITY': 'description'})
)
ids = np.arange(1, len(cities_df_to_insert) + 1)
cities_df_to_insert['id'] = ids

In [103]:
cities_df_to_insert.to_sql(
    "Cities",
    db_engine,
    schema="public",
    index=False,
    if_exists='append',
    method='multi',
)


In [104]:
# Sanity check: the table's row count equals what was just inserted.
sql = extract_table_to_pandas('Cities', db_engine)
len(cities_df_to_insert) == len(sql)

True

### Aeropuertos

In [105]:
# Postgres folds unquoted aliases to lower case, so the columns come back as
# cityid / city / state.
query = 'select c.id as cityId, c.description as city, S.code as state from "Cities" as c inner join "States" as S ON c."stateId" = S.id'
cities_sql_df = pd.read_sql(query, db_engine)
# Right merge keeps every airport; airports with no matching (city, state) get NaNs.
airports_df_to_insert = cities_sql_df.merge(
    airports_df, left_on=['city', 'state'], right_on=['CITY', 'STATE'], how='right'
)
# Rows with at least one missing value. axis must be a keyword: pandas >= 2 rejects
# the positional form any(1).
data_incomplete = airports_df_to_insert[airports_df_to_insert.isnull().any(axis=1)]

In [106]:
# Drop rows with any missing value (presumably airports whose city was not matched
# by the right merge) and renumber rows from 0.
airports_df_to_insert = airports_df_to_insert.dropna().reset_index(drop=True)
airports_df_to_insert['cityid'] = airports_df_to_insert['cityid'].astype(int)
# Select and rename to the DB column names. rename() returns a new frame, so adding
# 'id' below does not write into a column slice (no SettingWithCopyWarning).
airports_df_to_insert = airports_df_to_insert[
    ['IATA_CODE', 'AIRPORT', 'cityid', 'LATITUDE', 'LONGITUDE']
].rename(columns={
    'IATA_CODE': 'iataCode',
    'AIRPORT': 'description',
    'cityid': 'cityId',
    'LATITUDE': 'latitude',
    'LONGITUDE': 'longitude',
})
ids = np.arange(1, len(airports_df_to_insert) + 1)
airports_df_to_insert['id'] = ids


In [107]:
airports_df_to_insert.to_sql(
    "Airports",
    db_engine,
    schema="public",
    index=False,
    if_exists='append',
    method='multi',
)

In [108]:
# Sanity check: the table's row count equals what was just inserted.
sql = extract_table_to_pandas('Airports', db_engine)
len(airports_df_to_insert) == len(sql)

True

# Flights: Organizar archivo de vuelos

Definimos tres funciones para manipular los datos de las columnas de tiempo

In [109]:
def convert_min_to_time(minutes):
    '''Convert a minute count (number or numeric string) to an 'HH:MM' string.

    Hours are not wrapped at 24, e.g. 1500 -> '25:00'.

    Returns:
        'HH:MM' with both parts zero-padded to at least two digits, or None when
        ``minutes`` cannot be converted by int() (None, NaN, inf, non-numeric text).
    '''
    try:
        hour, minute = divmod(int(minutes), 60)
    except (TypeError, ValueError, OverflowError):
        # Best-effort: unconvertible values map to None instead of aborting the load.
        # Only int() failures are caught, so e.g. KeyboardInterrupt still propagates.
        return None
    return str(hour).zfill(2) + ':' + str(minute).zfill(2)

def convert_str_to_min(str_time):
    '''Convert an 'HHMM' string to minutes since midnight.

    Returns:
        int minutes for a 4-character value (e.g. '0530' -> 330);
        0 for a value of any other length (e.g. '530');
        None when the value has no len() (None, NaN) or its parts are not integers ('ab12').
    '''
    try:
        if len(str_time) != 4:
            return 0
        return int(str_time[:2]) * 60 + int(str_time[2:])
    except (TypeError, ValueError):
        # Best-effort: bad values map to None; other exceptions (e.g. KeyboardInterrupt)
        # are no longer swallowed by a bare except.
        return None

def calc_diff_time(min_one, min_two, wrap_threshold=1380):
    '''Return ``min_one - min_two`` in minutes, correcting for a midnight crossing.

    If the raw difference exceeds ``wrap_threshold`` minutes in absolute value
    (default 1380 min = 23 h), the two clock times are taken to be on different days,
    and 1440 min (24 h) is subtracted from a positive difference or added to a
    negative one.

    NOTE(review): with the 23 h default, 01:00 vs a scheduled 23:00 gives -1320
    rather than +120; a 12 h threshold (720) may be what was intended -- confirm.

    Returns:
        The corrected difference, or 0 when the operands cannot be
        subtracted/compared (e.g. None).
    '''
    minutes_per_day = 1440
    try:
        diff = min_one - min_two
        if abs(diff) > wrap_threshold:
            diff = diff - minutes_per_day if diff > 0 else diff + minutes_per_day
        return diff
    except (TypeError, ValueError):
        # Best-effort: unusable operands yield 0 rather than aborting the load.
        return 0

In [110]:
# Local alternative: './data/flights.csv'; only the Drive path below takes effect.
# NOTE(review): pathflights is not referenced by the flights loader in this section,
# which streams the file from S3.
pathflights = '/content/drive/MyDrive/Nequi/data/flights.csv'

In [111]:
# Flights are streamed from S3 in chunks, cleaned, their codes swapped for the ids
# already stored in Postgres, and appended to the "Flights" table.
flight_chunksize = 5000
# Time columns are read as strings so HHMM values keep their leading zeros.
dtype_flights = {"DEPARTURE_TIME": str, "WHEELS_OFF": str, "WHEELS_ON": str, "ARRIVAL_TIME": str, "SCHEDULED_ARRIVAL": str, "SCHEDULED_DEPARTURE": str}
cols_to_use = ["YEAR", "MONTH", "DAY", "DAY_OF_WEEK", "AIRLINE", "TAIL_NUMBER", "ORIGIN_AIRPORT", "DESTINATION_AIRPORT", "SCHEDULED_DEPARTURE", "DEPARTURE_TIME", "WHEELS_OFF", "DISTANCE", "WHEELS_ON", "SCHEDULED_ARRIVAL", "ARRIVAL_TIME", "DIVERTED", "CANCELLED", "CANCELLATION_REASON", "AIR_SYSTEM_DELAY", "SECURITY_DELAY", "AIRLINE_DELAY", "LATE_AIRCRAFT_DELAY", "WEATHER_DELAY"]

# Lookup tables loaded earlier into the DB; used below to replace codes with their ids.
# Postgres folds the unquoted alias airportId to lower case, hence 'airportid' below.
airpots_sql_df = extract_table_to_pandas('Airports', db_engine, '"id" as airportId, "iataCode" ')
airlines_sql_df = extract_table_to_pandas('Airlines', db_engine, '"id" as airline_id, "iataCode" ')
cancel_reasons_sql_df = extract_table_to_pandas('CancellationReasons', db_engine, '"id" as cancel_id, "code" as cancel_code')

cols_to_get = ['YEAR', 'MONTH', 'DAY', 'DAY_OF_WEEK', 'AIRLINE', 'TAIL_NUMBER', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT',
               'SCHEDULED_DEPARTURE', 'DEPARTURE_TIME', 'DEPARTURE_DELAY', 'TAXI_OUT', 'WHEELS_OFF', 'AIR_TIME', 'DISTANCE',
               'WHEELS_ON', 'TAXI_IN', 'SCHEDULED_ARRIVAL', 'ARRIVAL_TIME', 'ARRIVAL_DELAY', 'DIVERTED', 'CANCELLED', 'CANCELLATION_REASON',
               'AIR_SYSTEM_DELAY', 'SECURITY_DELAY', 'AIRLINE_DELAY', 'LATE_AIRCRAFT_DELAY', 'WEATHER_DELAY', 'DEPARTURE_TIME_MIN',
               'WHEELS_OFF_MIN', 'WHEELS_ON_MIN', 'ARRIVAL_TIME_MIN', 'SCHEDULED_ARRIVAL_MIN', 'SCHEDULED_DEPARTURE_MIN']

# DB column names, positionally aligned with cols_to_get.
cols_name_real = ['year', 'month', 'day', 'dayOfWeek', 'airlineId', 'tailNumber', 'originAirportId', 'destinationAirportId', 'scheduleDeparture', 'departureTime', 'departureDelay', 'taxiOut', 'wheelsOff', 'airTime', 'distance', 'wheelsOn', 'taxiIn', 'scheduledArrival', 'arrivalTime', 'arrivalDelay', 'diverted', 'cancelled', 'cancellationReasonId', 'airSystemDelay', 'securityDelay', 'airlineDelay', 'lateAircraftDelay', 'weatherDelay', 'departureTimeMinute', 'wheelsOffMinute', 'wheelsOnMinute', 'arrivalTimeMinute', 'scheduleArrivalMinute', 'scheduleDepartureMinute']
# Derived columns holding each time as minutes since midnight.
new_columns_minutes = ['DEPARTURE_TIME_MIN', 'WHEELS_OFF_MIN', 'WHEELS_ON_MIN', 'ARRIVAL_TIME_MIN', 'SCHEDULED_ARRIVAL_MIN', 'SCHEDULED_DEPARTURE_MIN']

# HHMM source column -> derived minute column (same order as new_columns_minutes).
time_to_minute_cols = {
    'DEPARTURE_TIME': 'DEPARTURE_TIME_MIN',
    'WHEELS_OFF': 'WHEELS_OFF_MIN',
    'WHEELS_ON': 'WHEELS_ON_MIN',
    'ARRIVAL_TIME': 'ARRIVAL_TIME_MIN',
    'SCHEDULED_ARRIVAL': 'SCHEDULED_ARRIVAL_MIN',
    'SCHEDULED_DEPARTURE': 'SCHEDULED_DEPARTURE_MIN',
}
# Derived duration/delay column -> (later minute column, earlier minute column).
derived_diff_cols = {
    'DEPARTURE_DELAY': ('DEPARTURE_TIME_MIN', 'SCHEDULED_DEPARTURE_MIN'),
    'TAXI_OUT': ('WHEELS_OFF_MIN', 'DEPARTURE_TIME_MIN'),
    'AIR_TIME': ('WHEELS_ON_MIN', 'WHEELS_OFF_MIN'),
    'TAXI_IN': ('ARRIVAL_TIME_MIN', 'WHEELS_ON_MIN'),
    'ARRIVAL_DELAY': ('ARRIVAL_TIME_MIN', 'SCHEDULED_ARRIVAL_MIN'),
}

# SQL types for to_sql: BOOLEAN for the two flags, SMALLINT for the listed columns.
dtypes_to_insert_flights = {"diverted": sqlalchemy.types.Boolean(), "cancelled": sqlalchemy.types.Boolean()}
dtype_int_lst = ['airSystemDelay', 'securityDelay', 'airlineDelay', 'lateAircraftDelay', 'weatherDelay', 'departureTimeMinute', 'wheelsOffMinute', 'wheelsOnMinute', 'arrivalDelay', 'taxiIn', 'departureDelay', 'taxiOut', 'airTime']
dtype_int = {name: sqlalchemy.types.SmallInteger() for name in dtype_int_lst}
dtypes_to_insert_flights.update(dtype_int)


def _swap_code_for_id(frame, lookup_df, frame_col, code_col, id_col):
    '''Replace the codes in frame[frame_col] with lookup_df[id_col], matching on lookup_df[code_col].

    The original row index is carried through the merge (reset_index / set_index('index'))
    so the caller can restore chunk order with sort_index(). The lookup's id/code
    columns are dropped from the result.

    NOTE(review): this is an inner merge, so rows whose code is missing from lookup_df
    are silently dropped -- confirm that is acceptable.
    '''
    merged = (
        frame.reset_index()
        .merge(lookup_df, left_on=[frame_col], right_on=[code_col])
        .set_index('index')
    )
    merged[frame_col] = merged[id_col]
    return merged.drop([id_col, code_col], axis=1)


# Continue numbering after the largest id already in the table. An empty table
# returns NULL, which may arrive as None or NaN; both mean "start at 1".
max_id = extract_table_to_pandas('Flights', db_engine, 'max(id) as max_id').iloc[0, 0]
max_id = 0 if pd.isna(max_id) else int(max_id)

flights_obj = s3_s.get_object(Bucket='test-nequi-data-engineer-amga', Key='data/flights.csv')

for flights_df in pd.read_csv(flights_obj['Body'], chunksize=flight_chunksize, usecols=cols_to_use, dtype=dtype_flights):
    # HHMM strings -> minutes since midnight, so the times can be subtracted.
    for time_col, minute_col in time_to_minute_cols.items():
        flights_df[minute_col] = flights_df[time_col].apply(convert_str_to_min)

    # Missing minute values get the chunk mean. A column with no values in the chunk
    # has a NaN mean, which cannot be cast to int, so it falls back to 0.
    df_mean = flights_df[new_columns_minutes].mean().fillna(0).astype(int)
    flights_df.fillna(df_mean, inplace=True)
    # Assign back: a chained `flights_df.TAIL_NUMBER.fillna(..., inplace=True)` may not
    # modify flights_df (it is a no-op under pandas Copy-on-Write).
    flights_df['TAIL_NUMBER'] = flights_df['TAIL_NUMBER'].fillna('Unknown')

    # Rebuild the time columns as 'HH:MM' from the now-complete minute columns.
    for time_col, minute_col in time_to_minute_cols.items():
        flights_df[time_col] = flights_df[minute_col].apply(convert_min_to_time)

    # Remaining NaNs (CANCELLATION_REASON and the *_DELAY columns) become 0.
    flights_df.fillna(0, inplace=True)

    # Derived durations/delays in minutes; calc_diff_time handles midnight crossings.
    for diff_col, (later_col, earlier_col) in derived_diff_cols.items():
        flights_df[diff_col] = [
            calc_diff_time(later, earlier)
            for later, earlier in zip(flights_df[later_col], flights_df[earlier_col])
        ]

    # .copy() so the column edits below act on an independent frame, not a slice.
    flights_df_to_insert = flights_df[cols_to_get].copy()
    flights_df_to_insert.columns = cols_name_real
    # Codes as strings so they merge against the text codes of the lookup tables.
    for code_col in ['originAirportId', 'destinationAirportId', 'airlineId']:
        flights_df_to_insert[code_col] = flights_df_to_insert[code_col].astype(str)

    flights_df_temp = _swap_code_for_id(flights_df_to_insert, airpots_sql_df, 'originAirportId', 'iataCode', 'airportid')
    flights_df_temp = _swap_code_for_id(flights_df_temp, airpots_sql_df, 'destinationAirportId', 'iataCode', 'airportid')
    flights_df_temp = _swap_code_for_id(flights_df_temp, airlines_sql_df, 'airlineId', 'iataCode', 'airline_id')
    # Missing cancellation reasons were filled with 0 above; map them to code "N".
    flights_df_temp['cancellationReasonId'] = flights_df_temp['cancellationReasonId'].replace({0: "N"})
    flights_df_temp = _swap_code_for_id(flights_df_temp, cancel_reasons_sql_df, 'cancellationReasonId', 'cancel_code', 'cancel_id')
    flights_df_temp = flights_df_temp.sort_index()

    # Sequential ids continuing from the running maximum.
    ids = np.arange(max_id + 1, len(flights_df_temp) + max_id + 1)
    flights_df_temp['id'] = ids
    max_id += len(flights_df_temp)

    flights_df_temp.to_sql("Flights", db_engine, schema="public", if_exists='append', method='multi', index=False, dtype=dtypes_to_insert_flights)