# ETL

En este archivo se presenta el proceso ETL que se almacenaría en una Cloud Function y se deja deja listo para correr en local con el fin de ver datos nuevos en la BD,

In [1]:
import logging
import os

from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
import requests

In [2]:
# Crear un directorio de registros si no existe
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)

# Ajustar la configuración de registro
log_file = os.path.join(log_dir, "application_etl.log")
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(lineno)s - %(message)s"
)
file_handler = logging.FileHandler(log_file, "w")
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(
    logging.Formatter("%(asctime)s - %(levelname)s - %(lineno)s - %(message)s")
)
logging.getLogger().addHandler(file_handler)

logging.debug("Prueba de mensaje de depuración")
logging.info("Prueba de mensaje informativo")
logging.warning("Prueba de mensaje de advertencia")
logging.error("Prueba de mensaje de error")

2023-05-22 01:20:29,989 - INFO - 18 - Prueba de mensaje informativo
2023-05-22 01:20:29,995 - ERROR - 20 - Prueba de mensaje de error


Se carga el archivo original para simular como llegaria la data de la fuente original como una API u otra.

In [3]:
# Define la ruta del archivo
ruta_archivo = "datasets/flights.csv"

# Lee el archivo CSV en un DataFrame de pandas, por velocidad se hace con una muestra de 400.000 filas
# df = pd.read_csv(ruta_archivo, delimiter="|", nrows = 400000)
df = pd.read_csv(ruta_archivo, delimiter="|")

# Muestra las primeras filas del DataFrame
df.head()

Unnamed: 0,TRANSACTIONID,FLIGHTDATE,AIRLINECODE,AIRLINENAME,TAILNUM,FLIGHTNUM,ORIGINAIRPORTCODE,ORIGAIRPORTNAME,ORIGINCITYNAME,ORIGINSTATE,...,WHEELSON,TAXIIN,CRSARRTIME,ARRTIME,ARRDELAY,CRSELAPSEDTIME,ACTUALELAPSEDTIME,CANCELLED,DIVERTED,DISTANCE
0,54548800,20020101,WN,Southwest Airlines Co.: WN,N103@@,1425,ABQ,AlbuquerqueNM: Albuquerque International Sunport,Albuquerque,NM,...,1648.0,4.0,1655,1652.0,-3.0,90.0,87.0,F,False,580 miles
1,55872300,20020101,CO,Continental Air Lines Inc.: CO,N83872,150,ABQ,AlbuquerqueNM: Albuquerque International Sunport,Albuquerque,NM,...,1419.0,16.0,1426,1435.0,9.0,116.0,119.0,False,F,744 miles
2,54388800,20020101,WN,Southwest Airlines Co.: WN,N334@@,249,ABQ,AlbuquerqueNM: Albuquerque International Sunport,Albuquerque,NM,...,1618.0,2.0,1500,1620.0,80.0,105.0,102.0,F,False,718 miles
3,54486500,20020101,WN,Southwest Airlines Co.: WN,N699@@,902,ABQ,AlbuquerqueNM: Albuquerque International Sunport,Albuquerque,NM,...,1947.0,1.0,1950,1948.0,-2.0,85.0,83.0,0,0,487 miles
4,55878700,20020103,CO,Continental Air Lines Inc.: CO,N58606,234,ABQ,AlbuquerqueNM: Albuquerque International Sunport,Albuquerque,NM,...,1742.0,5.0,1750,1747.0,-3.0,115.0,114.0,F,False,744 miles


In [4]:
try:
    # Se convierte la fecha
    df["FLIGHTDATE"] = pd.to_datetime(df["FLIGHTDATE"], format="%Y%m%d")

    # Se ajusta la columna Distance
    df["DISTANCE"] = df["DISTANCE"].str.replace(" miles", "")
    df["DISTANCE"] = pd.to_numeric(df["DISTANCE"], errors="raise")

    df["DEPDELAY"] = pd.to_timedelta(
        df["DEPDELAY"], unit="min", errors="ignore")
    df["TAXIOUT"] = pd.to_timedelta(df["TAXIOUT"], unit="min", errors="ignore")
    df["TAXIIN"] = pd.to_timedelta(df["TAXIIN"], unit="min", errors="ignore")
    df["ARRDELAY"] = pd.to_timedelta(
        df["ARRDELAY"], unit="min", errors="ignore")
    df["CRSELAPSEDTIME"] = pd.to_timedelta(
        df["CRSELAPSEDTIME"], unit="min", errors="ignore"
    )
    df["ACTUALELAPSEDTIME"] = pd.to_timedelta(
        df["ACTUALELAPSEDTIME"], unit="min", errors="ignore"
    )

    # Se ajustan las columnas booleanas
    df["CANCELLED"] = df["CANCELLED"].map(
        {"0": False, "1": True, "True": True, "False": False, "T": True, "F": False}
    )
    df["DIVERTED"] = df["DIVERTED"].map(
        {"0": False, "1": True, "True": True, "False": False, "T": True, "F": False}
    )

except Exception as e:
    logging.exception(f"Se ha presentado una excepcion: {e}")

  base = data.astype(np.int64)
  data = (base * m + (frac * m).astype(np.int64)).view("timedelta64[ns]")


Como se dijo en el proceso de carga el último año de información no se cargó para simular que se haría en la ETL, por lo tanto se filtra este en este punto

In [5]:
ultimo_ano = df["FLIGHTDATE"].dt.year.max()
df = df[df["FLIGHTDATE"].dt.year == ultimo_ano]

In [6]:
# Convertir columnas a formato fecha hora
columnas_hora = [
    "CRSDEPTIME",
    "DEPTIME",
    "WHEELSOFF",
    "WHEELSON",
    "CRSARRTIME",
    "ARRTIME",
]
for columna in columnas_hora:
    if df[columna].isnull().sum() == 0:
        df[columna] = df[columna].astype(str).str.zfill(4)
        df[columna] = pd.to_datetime(
            df[columna], errors="coerce", format="%H%M"
        ).dt.time
        df[columna] = (
            df["FLIGHTDATE"].dt.strftime("%Y-%m-%d") + " " + df[columna].apply(str)
        )
        df[columna] = pd.to_datetime(df[columna], errors="coerce")

In [7]:
# Crear un diccionario de mapeo entre DESTAIRPORTCODE y DESTSTATE
codigos = df.set_index("DESTAIRPORTCODE")["DESTSTATE"].to_dict()

# Rellenar los valores faltantes en ORIGINSTATE usando el diccionario de mapeo
df["ORIGINSTATE"] = df["ORIGINSTATE"].fillna(df["ORIGINAIRPORTCODE"].map(codigos))

In [8]:
url = "https://airport-info.p.rapidapi.com/airport"


def airport_info(airport_code, url):
    url = url

    querystring = {"iata": airport_code}

    headers = {
        "X-RapidAPI-Key": "6f294ed6f0mshd9fbb45d9c15ffbp112336jsn9ed08e8c22a6",
        "X-RapidAPI-Host": "airport-info.p.rapidapi.com",
    }
    try:
        response = requests.get(url, headers=headers, params=querystring)

    except Exception as e:
        logging.exception(
            f"Se ha presentado una excepcion al consultar la API: {e}")
        return

    print(response.json())

    return response

In [9]:
# Filtrar el diccionario para obtener solo las claves con valor NaN
codigos_faltantes = {key: value for key, value in codigos.items() if pd.isna(value)}

codigos_completos = {}

for key in codigos_faltantes:
    codigos_completos[key] = (airport_info(key,url)).json()["state"]

{'id': 5548, 'iata': 'OKC', 'icao': 'KOKC', 'name': 'Will Rogers World Airport', 'location': 'Oklahoma City, Oklahoma, United States', 'street_number': '7100', 'street': 'Terminal Drive', 'city': 'Oklahoma City', 'county': '', 'state': 'Oklahoma', 'country_iso': 'US', 'country': 'United States', 'postal_code': '73159', 'phone': '+1 405-316-3200', 'latitude': 35.393055, 'longitude': -97.600555, 'uct': -300, 'website': 'http://www.flyokc.com/'}
{'id': 7645, 'iata': 'TUL', 'icao': 'KTUL', 'name': 'Tulsa International Airport', 'location': 'Tulsa, Oklahoma, United States', 'street_number': '7777', 'street': 'East Apache Street', 'city': 'Tulsa', 'county': 'Tulsa County', 'state': 'Oklahoma', 'country_iso': 'US', 'country': 'United States', 'postal_code': '74115', 'phone': '+1 918-838-5000', 'latitude': 36.198776, 'longitude': -95.883865, 'uct': -300, 'website': 'http://www.tulsaairports.com/'}
{'id': 3125, 'iata': 'ICT', 'icao': 'KICT', 'name': 'Wichita Dwight D. Eisenhower National Airpor

In [10]:
df["ORIGINSTATENAME"] = df["ORIGINSTATENAME"].fillna(
    df["ORIGINAIRPORTCODE"].map(codigos_completos)
)
df["DESTSTATENAME"] = df["DESTSTATENAME"].fillna(
    df["DESTAIRPORTCODE"].map(codigos_completos)
)

In [11]:
# Se carga el dataset
df_estados = pd.read_csv("datasets/us_states.tsv", sep="\t")

# Se extraen los estados y su abreviatura
abreviaturas = df_estados.set_index("name")["state"].to_dict()

# Se elimina el dataset para liberar memoria
del df_estados

In [12]:
df["ORIGINSTATE"] = df["ORIGINSTATE"].fillna(df["ORIGINSTATENAME"].map(abreviaturas))
df["DESTSTATE"] = df["DESTSTATE"].fillna(df["DESTSTATENAME"].map(abreviaturas))

In [13]:
# Convertir columnas a formato fecha hora
columnas_hora = ["DEPTIME"]
for columna in columnas_hora:
    df[columna] = df[columna].astype("Int64").astype(str)
    df[columna] = df[columna].fillna(df["CRSDEPTIME"])
    df[columna] = df[columna].astype(str).str.zfill(4)
    df[columna] = pd.to_datetime(
        df[columna], errors="coerce", format="%H%M").dt.time
    df[columna] = (
        df["FLIGHTDATE"].dt.strftime("%Y-%m-%d") + " " + df[columna].apply(str)
    )
    df[columna] = pd.to_datetime(df[columna], errors="coerce")
    df.loc[(df[columna].isnull()) & (df["DEPDELAY"].isnull()), columna] = df[
        "CRSDEPTIME"
    ]
    df.loc[(df[columna].isnull()) & (df["DEPDELAY"].notnull()), columna] = (
        df["CRSDEPTIME"] + df["DEPDELAY"]
    )

columnas_hora = ["ARRTIME"]
for columna in columnas_hora:
    df[columna] = df[columna].astype("Int64").astype(str)
    df[columna] = df[columna].astype(str).str.zfill(4)
    df[columna] = pd.to_datetime(
        df[columna], errors="coerce", format="%H%M").dt.time
    df[columna] = (
        df["FLIGHTDATE"].dt.strftime("%Y-%m-%d") + " " + df[columna].apply(str)
    )
    df[columna] = pd.to_datetime(df[columna], errors="coerce")
    df.loc[(df[columna].isnull()) & (df["ARRDELAY"].isnull()), columna] = df[
        "CRSARRTIME"
    ]
    df.loc[(df[columna].isnull()) & (df["ARRDELAY"].notnull()), columna] = (
        df["CRSARRTIME"] + df["ARRDELAY"]
    )


columnas_hora = ["WHEELSOFF"]
for columna in columnas_hora:
    df[columna] = df[columna].astype("Int64").astype(str)
    df[columna] = df[columna].astype(str).str.zfill(4)
    df[columna] = pd.to_datetime(
        df[columna], errors="coerce", format="%H%M").dt.time
    df[columna] = (
        df["FLIGHTDATE"].dt.strftime("%Y-%m-%d") + " " + df[columna].apply(str)
    )
    df[columna] = pd.to_datetime(df[columna], errors="coerce")
    df.loc[df[columna].isnull(), columna] = df["DEPTIME"] + df["TAXIOUT"]

columnas_hora = ["WHEELSON"]
for columna in columnas_hora:
    df[columna] = df[columna].astype("Int64").astype(str)
    df[columna] = df[columna].astype(str).str.zfill(4)
    df[columna] = pd.to_datetime(
        df[columna], errors="coerce", format="%H%M").dt.time
    df[columna] = (
        df["FLIGHTDATE"].dt.strftime("%Y-%m-%d") + " " + df[columna].apply(str)
    )
    df[columna] = pd.to_datetime(df[columna], errors="coerce")
    df.loc[df[columna].isnull(), columna] = df["ARRTIME"] - df["TAXIIN"]

df["DEPDELAY"] = df["DEPDELAY"].dt.total_seconds() / 60
df["TAXIOUT"] = df["TAXIOUT"].dt.total_seconds() / 60
df["TAXIIN"] = df["TAXIIN"].dt.total_seconds() / 60
df["ARRDELAY"] = df["ARRDELAY"].dt.total_seconds() / 60
df["CRSELAPSEDTIME"] = df["CRSELAPSEDTIME"].dt.total_seconds() / 60
df["ACTUALELAPSEDTIME"] = df["ACTUALELAPSEDTIME"].dt.total_seconds() / 60

In [14]:
def arreglar_fecha(row):
    if row["WHEELSOFF"] < row["CRSDEPTIME"]:
        row["WHEELSOFF"] = row["WHEELSOFF"] + pd.to_timedelta(
            1, unit="day", errors="ignore"
        )
        row["WHEELSON"] = row["WHEELSON"] + pd.to_timedelta(
            1, unit="day", errors="ignore"
        )
        row["CRSARRTIME"] = row["CRSARRTIME"] + pd.to_timedelta(
            1, unit="day", errors="ignore"
        )
        row["ARRTIME"] = row["ARRTIME"] + pd.to_timedelta(
            1, unit="day", errors="ignore"
        )
        return row
    elif row["WHEELSON"] < row["WHEELSOFF"]:
        row["WHEELSON"] = row["WHEELSON"] + pd.to_timedelta(
            1, unit="day", errors="ignore"
        )
        row["CRSARRTIME"] = row["CRSARRTIME"] + pd.to_timedelta(
            1, unit="day", errors="ignore"
        )
        row["ARRTIME"] = row["ARRTIME"] + pd.to_timedelta(
            1, unit="day", errors="ignore"
        )
        return row
    elif row["ARRTIME"] < row["WHEELSON"]:
        row["CRSARRTIME"] = row["CRSARRTIME"] + pd.to_timedelta(
            1, unit="day", errors="ignore"
        )
        row["ARRTIME"] = row["ARRTIME"] + pd.to_timedelta(
            1, unit="day", errors="ignore"
        )
        return row
    elif row["ARRTIME"] < row["WHEELSOFF"]:
        row["ARRTIME"] = row["ARRTIME"] + pd.to_timedelta(
            1, unit="day", errors="ignore"
        )
        return row
    elif row["CRSARRTIME"] < row["WHEELSOFF"]:
        row["CRSARRTIME"] = row["CRSARRTIME"] + pd.to_timedelta(
            1, unit="day", errors="ignore"
        )
        return row
    elif row["ARRTIME"] < row["DEPTIME"]:
        row["ARRTIME"] = row["ARRTIME"] + pd.to_timedelta(
            1, unit="day", errors="ignore"
        )

    return row

In [15]:
df = df.apply(lambda row: arreglar_fecha(row), axis=1)

In [16]:
df.loc[df["CRSELAPSEDTIME"].isnull(), "CRSELAPSEDTIME"] = (
    pd.to_timedelta(
        df["CRSARRTIME"] - df["CRSDEPTIME"], unit="min", errors="ignore"
    ).dt.total_seconds()
    / 60
)
df.loc[df["ACTUALELAPSEDTIME"].isnull(), "ACTUALELAPSEDTIME"] = (
    pd.to_timedelta(
        df["ARRTIME"] - df["DEPTIME"], unit="min", errors="ignore"
    ).dt.total_seconds()
    / 60
)

In [17]:
df_aeropuertos_origen = df[["ORIGINAIRPORTCODE",
                            "ORIGAIRPORTNAME", "ORIGINCITYNAME"]]
df_aeropuertos_origen = df_aeropuertos_origen.rename(
    columns={
        "ORIGINAIRPORTCODE": "AIRPORTCODE",
        "ORIGAIRPORTNAME": "AIRPORTNAME",
        "ORIGINCITYNAME": "AIRPORTCITY",
    }
)
df_aeropuertos_origen = df_aeropuertos_origen.drop_duplicates()
df_aeropuertos_destino = df[["DESTAIRPORTCODE",
                             "DESTAIRPORTNAME", "DESTCITYNAME"]]
df_aeropuertos_destino = df_aeropuertos_origen.rename(
    columns={
        "DESTAIRPORTCODE": "AIRPORTCODE",
        "DESTAIRPORTNAME": "AIRPORTNAME",
        "DESTCITYNAME": "AIRPORTCITY",
    }
)
df_aeropuertos_destino = df_aeropuertos_destino.drop_duplicates()
df_aeropuertos = pd.concat([df_aeropuertos_origen, df_aeropuertos_destino])
df_aeropuertos = df_aeropuertos.drop_duplicates()

In [18]:
df_aerolineas = df[["AIRLINECODE", "AIRLINENAME"]]
df_aerolineas = df_aerolineas.drop_duplicates()

In [19]:
df_vuelos = df.drop(
    columns=[
        "ORIGAIRPORTNAME",
        "ORIGINCITYNAME",
        "DESTAIRPORTNAME",
        "DESTCITYNAME",
        "AIRLINENAME",
        "ORIGINSTATENAME",
        "DESTSTATENAME",
    ]
)
ultimo_ano = df_vuelos["FLIGHTDATE"].dt.year.max()
df_vuelos = df_vuelos[df_vuelos["FLIGHTDATE"].dt.year == ultimo_ano]
del df

In [20]:
json_gcp = "service_account.json"
credentials = service_account.Credentials.from_service_account_file(
    json_gcp, scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
project = "pruebatec-387103"


def upload_dataframe_to_bigquery(dataframe, project, table_name):
    # Initialize BigQuery client
    client = bigquery.Client(credentials=credentials, project=project)

    # Create BigQuery load job configuration
    job_config = bigquery.LoadJobConfig(
        autodetect=True, write_disposition="WRITE_APPEND",
    )

    # Start BigQuery load job
    job = client.load_table_from_dataframe(
        dataframe, table_name, job_config=job_config
    )  # Make an API request.
    job.result()  # Wait for the job to complete.

    table = client.get_table(table_name)  # Make an API request.
    print(
        "Loaded {} rows and {} columns to {}".format(
            table.num_rows, len(table.schema), table_name
        )
    )

Se define una función adicional para ejecutar querys desde Python en Bigquery por medio de la API de google.

In [21]:
project = "pruebatec-387103"


def query_bigquery(query, project):
    client = bigquery.Client(credentials=credentials, project=project)

    try:
        query_job = client.query(query)

        results = query_job.result()  # Waits for job to complete.

        print(results)

    except Exception as e:
        logging.exception(f"Se ha presentado una excepcion al hacer query a BQ: {e}")

Para evitar que se duplique data en la tabla de vuelos se procede a hacer una inserción delta, es decir, se eliminan datos cuya fecha coincida con la de la nueva data que se va a agregar.

In [23]:
query_eliminar_vuelos = f"""DELETE FROM `pruebatec-387103.vuelosnq.FACT_VUELOS` WHERE EXTRACT(YEAR FROM FLIGTHDATE)={ultimo_ano}"""

query_eliminar_vuelos = f"""CREATE OR REPLACE TABLE `pruebatec-387103.vuelosnq.FACT_VUELOS` AS (
SELECT * FROM `pruebatec-387103.vuelosnq.FACT_VUELOS` WHERE EXTRACT(YEAR FROM FLIGHTDATE)!={ultimo_ano})"""

query_bigquery(query_eliminar_vuelos, project)

<google.cloud.bigquery.table._EmptyRowIterator object at 0x0000029451604940>


Como la cuenta de GCP está en modo Sandbox no es posible realizar querys DML y la primera query no se puede ejecutar, por lo tanto se hace la siguiente query, menos eficiente y más costosa pero el resultado es el mismo.

In [24]:
upload_dataframe_to_bigquery(
    df_aerolineas, project, "pruebatec-387103.vuelosnq.DIM_AEROLINEAS"
)
upload_dataframe_to_bigquery(
    df_aeropuertos, project, "pruebatec-387103.vuelosnq.DIM_AEROPUERTOS"
)
upload_dataframe_to_bigquery(
    df_vuelos, project, "pruebatec-387103.vuelosnq.FACT_VUELOS"
)

Loaded 38 rows and 2 columns to pruebatec-387103.vuelosnq.DIM_AEROLINEAS
Loaded 660 rows and 3 columns to pruebatec-387103.vuelosnq.DIM_AEROPUERTOS
Loaded 1128454 rows and 24 columns to pruebatec-387103.vuelosnq.FACT_VUELOS


Después de subir los nuevos registros para aeropuertos y aerolineas se hace una limpieza para eliminar duplicados

In [25]:
query_eliminar_duplicados_aerolineas = """CREATE OR REPLACE TABLE `pruebatec-387103.vuelosnq.DIM_AEROLINEAS` AS (
SELECT DISTINCT * FROM `pruebatec-387103.vuelosnq.DIM_AEROLINEAS`)"""

query_bigquery(query_eliminar_duplicados_aerolineas, project)

query_eliminar_duplicados_aeropuertos = """CREATE OR REPLACE TABLE `pruebatec-387103.vuelosnq.DIM_AEROPUERTOS` AS (
SELECT DISTINCT * FROM `pruebatec-387103.vuelosnq.DIM_AEROPUERTOS`)"""

query_bigquery(query_eliminar_duplicados_aeropuertos, project)

<google.cloud.bigquery.table._EmptyRowIterator object at 0x00000294515B2590>
<google.cloud.bigquery.table._EmptyRowIterator object at 0x0000029451604910>


Se actualiza la tabla que se conectará directamente al dashboard

In [26]:
query_dashboard = f"""CREATE OR REPLACE TABLE `pruebatec-387103.vuelosnq.DASHBOARD` AS (
SELECT T1.*,T2.AIRLINENAME,T3.AIRPORTNAME AS ORIGINAIRPORTNAME,T3.AIRPORTCITY AS ORIGINAIRPOTCITY,T4.AIRPORTNAME AS DESTAIRPORTNAME,
T4.AIRPORTCITY AS DESTAIRPORTCITY,T5.STATENAME AS ORIGINSTATENAME, T6.STATENAME AS DESTSTATENAME
FROM `pruebatec-387103.vuelosnq.FACT_VUELOS` T1
LEFT JOIN `pruebatec-387103.vuelosnq.DIM_AEROLINEAS` T2 USING(AIRLINECODE)
LEFT JOIN `pruebatec-387103.vuelosnq.DIM_AEROPUERTOS` T3 ON T1.ORIGINAIRPORTCODE=T3.AIRPORTCODE
LEFT JOIN `pruebatec-387103.vuelosnq.DIM_AEROPUERTOS` T4 ON T1.DESTAIRPORTCODE=T4.AIRPORTCODE
LEFT JOIN `pruebatec-387103.vuelosnq.DIM_ESTADOS` T5 ON T1.ORIGINSTATE=T5.STATECODE
LEFT JOIN `pruebatec-387103.vuelosnq.DIM_ESTADOS` T6 ON T1.DESTSTATE=T6.STATECODE
)"""

query_bigquery(query_dashboard, project)

<google.cloud.bigquery.table._EmptyRowIterator object at 0x0000029441288D00>


Con esto finaliza el proceso ETL