# Entrega Final Data Engineering - UTN
### El siguiente notebook desarrolla una solución para extraer y almacenar datos desde la API [Aviationstack](https://aviationstack.com/), la cual ofrece información sobre vuelos, aeropuertos, y temáticas relacionadas.

In [1]:
#instalación de las librerías a utilizar: requests para peticiones a la API,
#deltalake para el almacenamiento en formato open table
#pyarrow para permitir funcionalidades merge
#faker para genera datos falsos de prueba
!pip install requests deltalake pyarrow faker dotenv

Collecting deltalake
  Downloading deltalake-1.3.2-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.4 kB)
Collecting faker
  Downloading faker-40.1.2-py3-none-any.whl.metadata (16 kB)
Collecting dotenv
  Downloading dotenv-0.9.9-py2.py3-none-any.whl.metadata (279 bytes)
Collecting arro3-core>=0.5.0 (from deltalake)
  Downloading arro3_core-0.6.5-cp311-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (363 bytes)
Collecting deprecated>=1.2.18 (from deltalake)
  Downloading deprecated-1.3.1-py2.py3-none-any.whl.metadata (5.9 kB)
Downloading deltalake-1.3.2-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (37.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m37.7/37.7 MB[0m [31m24.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading faker-40.1.2-py3-none-any.whl (2.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m22.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dotenv-0.9.9-py2.py3-none-an

In [4]:
import requests
import pandas as pd
import pyarrow as pa
from faker import Faker
import random
from deltalake import write_deltalake, DeltaTable
from deltalake.exceptions import TableNotFoundError
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv
import os

### Funciones de lectura/escritura en datalake

In [3]:
def get_data(base_url, endpoint, data_field=None, params=None, headers=None):
    """
    Realiza una solicitud GET a una API para obtener datos.

    Parámetros:
    base_url (str): La URL base de la API.
    endpoint (str): El endpoint de la API al que se realizará la solicitud.
    data_field (str): Atribudo del json de respuesta donde estará la lista
    de objetos con los datos que requerimos
    params (dict): Parámetros de consulta para enviar con la solicitud.
    headers (dict): Encabezados para enviar con la solicitud.

    Retorna:
    dict: Los datos obtenidos de la API en formato JSON.
    """
    try:
        endpoint_url = f"{base_url}/{endpoint}"
        response = requests.get(endpoint_url, params=params, headers=headers)
        response.raise_for_status()  # Levanta una excepción si hay un error en la respuesta HTTP.

        # Verificar si los datos están en formato JSON.
        try:
            data = response.json()
            if data_field:
              data = data[data_field]
        except:
            print("El formato de respuesta no es el esperado")
            return None
        return data

    except requests.exceptions.RequestException as e:
        # Capturar cualquier error de solicitud, como errores HTTP.
        print(f"La petición ha fallado. Código de error : {e}")
        return None

def build_table(json_data, record_path=None):
    """
    Construye un DataFrame de pandas a partir de datos en formato JSON.

    Parámetros:
    json_data (dict): Los datos en formato JSON obtenidos de una API.

    Retorna:
    DataFrame: Un DataFrame de pandas que contiene los datos.
    """
    try:
        df = pd.json_normalize(
            json_data,
            record_path)
        return df
    except:
        print("Los datos no están en el formato esperado")
        return None

def save_data_as_delta(df, path, storage_options, mode="overwrite", partition_cols=None, schema_mode="merge"):
    """
    Guarda un dataframe en formato Delta Lake en la ruta especificada.
    A su vez, es capaz de particionar el dataframe por una o varias columnas.
    Por defecto, el modo de guardado es "overwrite".

    Args:
      df (pd.DataFrame): El dataframe a guardar.
      path (str): La ruta donde se guardará el dataframe en formato Delta Lake.
      mode (str): El modo de guardado. Son los modos que soporta la libreria
      deltalake: "overwrite", "append", "error", "ignore".
      partition_cols (list or str): La/s columna/s por las que se particionará el
      dataframe. Si no se especifica, no se particionará.
      schema_mode: Especifica de qué manera se manejarán las diferencias de
      esquema entre cargas. Si no se especifica, se usará "merge".
    """
    write_deltalake(
        path, df, mode=mode, storage_options=storage_options, partition_by=partition_cols, schema_mode=schema_mode
    )

def save_new_data_as_delta(new_data, data_path, predicate, storage_options, partition_cols=None):
    """
    Guarda solo nuevos datos en formato Delta Lake usando la operación MERGE,
    comparando los datos ya cargados con los datos que se desean almacenar
    asegurando que no se guarden registros duplicados.

    Args:
      new_data (pd.DataFrame): Los datos que se desean guardar.
      data_path (str): La ruta donde se guardará el dataframe en formato Delta Lake.
      predicate (str): La condición de predicado para la operación MERGE.
    """

    try:
      dt = DeltaTable(data_path, storage_options=storage_options)
      new_data_pa = pa.Table.from_pandas(new_data)
      # Se insertan en target, datos de source que no existen en target
      dt.merge(
          source=new_data_pa,
          source_alias="source",
          target_alias="target",
          predicate=predicate
      ) \
      .when_not_matched_insert_all() \
      .execute()

    # Si no existe la tabla Delta Lake, se guarda como nueva
    except TableNotFoundError:
      save_data_as_delta(new_data, data_path, storage_options=storage_options, partition_cols=partition_cols)

def upsert_data_as_delta(data, data_path, predicate, storage_options, partition_cols=None):
    """
    Guardar datos en formato Delta Lake usando la operacion MERGE.
    Cuando no haya registros coincidentes, se insertarán nuevos registros.
    Cuando haya registros coincidentes, se actualizarán los campos.

    Args:
      data (pd.DataFrame): Los datos que se desean guardar.
      data_path (str): La ruta donde se guardará el dataframe en formato Delta Lake.
      predicate (str): La condición de predicado para la operación MERGE.
    """
    try:
        dt = DeltaTable(data_path, storage_options=storage_options)
        data_pa = pa.Table.from_pandas(data)
        dt.merge(
            source=data_pa,
            source_alias="source",
            target_alias="target",
            predicate=predicate
        ) \
        .when_matched_update_all() \
        .when_not_matched_insert_all() \
        .execute()
    except TableNotFoundError:
        save_data_as_delta(data, data_path, storage_options=storage_options, partition_cols=partition_cols)

def read_most_recent_partition(data_path, storage_options):
    """
    Lee la particion mas reciente de una tabla Delta Lake, teniendo en cuenta
    la fecha y hora actual.
    Supone que la tabla delta lake esta particionada por fecha y hora

    Args:
      data_path (str): La ruta donde se encuentra la tabla Delta Lake.

    Returns:
      pd.DataFrame: Los datos de la particion mas reciente.
    """
    try:
      requested_date = datetime.now(timezone.utc) - timedelta(hours=1)
      dt = DeltaTable(data_path, storage_options=storage_options)
      df_recent = dt.to_pandas(
        partitions=[
        ("fecha", "=", requested_date.strftime("%Y-%m-%d")),
        ("hora", "=", requested_date.strftime("%H"))
        ]
        )
      return df_recent
    except:
      raise Exception(f"No se pudo procesar la tabla Delta Lake")
      return None

### Parámetros de conexión con MinIO, API key y declaración de rutas

In [5]:
load_dotenv()
API_KEY = os.getenv("API_KEY")
# Configuraciones para MinIO (Alternativa opensource para almacenamiento en
# nube tipo AWS)
storage_options = {
    'AWS_ENDPOINT_URL': 'http://31.97.241.212:9002',
    'AWS_ACCESS_KEY_ID': 'rodrigogomez', # username
    'AWS_SECRET_ACCESS_KEY': 'rodrigogomez', # contraseña
    'AWS_ALLOW_HTTP': 'true',
    'aws_conditional_put': 'etag',
    'AWS_S3_ALLOW_UNSAFE_RENAME': 'true'
}

bkt_name = "rodrigogomez-bucket"
base_url="https://api.aviationstack.com/v1"
api_key=API_KEY

In [None]:
#direcciones de almacenamiento
bronze_dir = f"s3://{bkt_name}/datalake/bronze/aviationstack_api"
silver_dir = f"s3://{bkt_name}/datalake/silver/aviationstack_api"
gold_dir = f"s3://{bkt_name}/datalake/gold/aviationstack_api"
airports_raw_dir = f"{bronze_dir}/airports"
airports_clean_dir = f"{silver_dir}/airports"
airports_gold_dir = f"{gold_dir}/airports"
flights_raw_dir = f"{bronze_dir}/flights"
flights_clean_dir = f"{silver_dir}/flights"
flights_gold_dir = f"{gold_dir}/flights"

# Ingesta Full
  Se produce la ingesta de datos a partir de una petición get al endpoint '/airports' de la API. Se produce un filtrado únicamente a partir del país deseado.

In [None]:
#obtener aeropuetos por país, en este caso Argentina:

country="argentina"
endpoint="airports"
params={"access_key":api_key, "country_name":country}
airports=get_data(base_url, endpoint, data_field="data", params=params)
df_airports_raw=build_table(airports)
print(f"Cantidad de filas: {df_airports_raw.shape[0]}")
print(f"Cantidad de columnas: {df_airports_raw.shape[1]}")
df_airports_raw.head()

Cantidad de filas: 85
Cantidad de columnas: 14


Unnamed: 0,id,gmt,airport_id,iata_code,city_iata_code,icao_code,country_iso2,geoname_id,latitude,longitude,airport_name,country_name,phone_number,timezone
0,5761281,-3,107,AEP,BUE,SABE,AR,6301847,-34.55622,-58.41667,Aeroparque Jorge Newbery,Argentina,,America/Argentina/Buenos_Aires
1,5761288,-3,114,AFA,AFA,SAMR,AR,6300529,-34.58917,-68.40056,San Rafael,Argentina,,America/Argentina/Mendoza
2,5761484,-3,310,AOL,AOL,SARL,AR,6300550,-29.683332,-57.15,Paso De Los Libres,Argentina,,America/Argentina/Cordoba
3,5761515,-3,341,APZ,APZ,SAHZ,AR,7730148,-38.916668,-70.083336,Zapala,Argentina,,America/Argentina/Salta
4,5761541,-3,367,ARR,ARR,SAVR,AR,7730773,-45.033333,-70.833336,Alto Rio Senguerr,Argentina,,America/Argentina/Catamarca


### Almacenado de los datos "en crudo"
En capa bronze. Al ser datos que no varían en el tiempo, se realiza en modo overwrite.




In [None]:
save_data_as_delta(df_airports_raw, airports_raw_dir, storage_options=storage_options)

### Lectura de los datos desde capa bronze

In [None]:
dt_airports = DeltaTable(airports_raw_dir, storage_options=storage_options)
df_airports = dt_airports.to_pandas()

## Procesamiento de datos
---
- Observamos la tabla en búsqueda de posibles optimizaciones:




In [None]:
df_airports.head(10)

Unnamed: 0,id,gmt,airport_id,iata_code,city_iata_code,icao_code,country_iso2,geoname_id,latitude,longitude,airport_name,country_name,phone_number,timezone
0,5720129,-3,107,AEP,BUE,SABE,AR,6301847,-34.55622,-58.41667,Aeroparque Jorge Newbery,Argentina,,America/Argentina/Buenos_Aires
1,5720136,-3,114,AFA,AFA,SAMR,AR,6300529,-34.58917,-68.40056,San Rafael,Argentina,,America/Argentina/Mendoza
2,5720332,-3,310,AOL,AOL,SARL,AR,6300550,-29.683332,-57.15,Paso De Los Libres,Argentina,,America/Argentina/Cordoba
3,5720363,-3,341,APZ,APZ,SAHZ,AR,7730148,-38.916668,-70.083336,Zapala,Argentina,,America/Argentina/Salta
4,5720389,-3,367,ARR,ARR,SAVR,AR,7730773,-45.033333,-70.833336,Alto Rio Senguerr,Argentina,,America/Argentina/Catamarca
5,5720720,-3,698,BHI,BHI,SAZB,AR,6300580,-38.730556,-62.150555,Comandante Espora,Argentina,,America/Argentina/Buenos_Aires
6,5720942,-3,920,BRC,BRC,SAZS,AR,6300590,-41.145966,-71.16109,San Carlos de Bariloche Airport,Argentina,,America/Argentina/Salta
7,5721458,-3,1436,CNQ,CNQ,SARC,AR,6300546,-27.449722,-58.762222,Camba Punta,Argentina,,America/Argentina/Cordoba
8,5721470,-3,1448,COC,COC,SAAC,AR,6300509,-31.297222,-57.996387,Concordia,Argentina,,America/Argentina/Cordoba
9,5721485,-3,1463,COR,COR,SACO,AR,6300518,-31.31548,-64.21377,Pajas Blancas,Argentina,,America/Argentina/Cordoba


In [None]:
df_airports.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 85 entries, 0 to 84
Data columns (total 14 columns):
 #   Column          Non-Null Count  Dtype 
---  ------          --------------  ----- 
 0   id              85 non-null     object
 1   gmt             85 non-null     object
 2   airport_id      85 non-null     object
 3   iata_code       85 non-null     object
 4   city_iata_code  85 non-null     object
 5   icao_code       85 non-null     object
 6   country_iso2    85 non-null     object
 7   geoname_id      85 non-null     object
 8   latitude        85 non-null     object
 9   longitude       85 non-null     object
 10  airport_name    85 non-null     object
 11  country_name    85 non-null     object
 12  phone_number    1 non-null      object
 13  timezone        85 non-null     object
dtypes: object(14)
memory usage: 9.4+ KB


- Se eliminan columnas nulas:

In [None]:
#Para todos los aeropuertos de Argentina, había solo un número de teléfono
#y estaba incompleto
df_airports = df_airports.drop(columns="phone_number")

Unnamed: 0,id,gmt,airport_id,iata_code,city_iata_code,icao_code,country_iso2,geoname_id,latitude,longitude,airport_name,country_name,timezone
0,5720129,-3,107,AEP,BUE,SABE,AR,6301847,-34.55622,-58.41667,Aeroparque Jorge Newbery,Argentina,America/Argentina/Buenos_Aires
1,5720136,-3,114,AFA,AFA,SAMR,AR,6300529,-34.58917,-68.40056,San Rafael,Argentina,America/Argentina/Mendoza
2,5720332,-3,310,AOL,AOL,SARL,AR,6300550,-29.683332,-57.15,Paso De Los Libres,Argentina,America/Argentina/Cordoba
3,5720363,-3,341,APZ,APZ,SAHZ,AR,7730148,-38.916668,-70.083336,Zapala,Argentina,America/Argentina/Salta
4,5720389,-3,367,ARR,ARR,SAVR,AR,7730773,-45.033333,-70.833336,Alto Rio Senguerr,Argentina,America/Argentina/Catamarca
...,...,...,...,...,...,...,...,...,...,...,...,...,...
80,5728254,-3,8232,UZU,UZU,SATU,AR,6300562,-29.778889,-58.095554,Curuzu Cuatia,Argentina,America/Argentina/Cordoba
81,5728298,-3,8276,VDM,VDM,SAVV,AR,6300568,-40.85,-63.016666,Viedma,Argentina,America/Argentina/Salta
82,5728300,-3,8278,VDR,VDR,SAOD,AR,6300541,-31.94111,-65.14222,Villa Dolores,Argentina,America/Argentina/Cordoba
83,5728357,-3,8335,VLG,VLG,SAZV,AR,6300592,-38.016666,-57.583332,Villa Gesell,Argentina,America/Argentina/Buenos_Aires


- Se agrega columna que combina latitud y longitud

In [None]:
df_airports["coordinates"] = list(zip(df_airports["latitude"], df_airports["longitude"]))

- Se divide la información de la zona horaria en continente, país y provincia, creando una columna para cada caso. Al realizar pruebas con otros países, me di cuenta que la información podía venir en distintos formatos, _"continente/país/provincia"_, _"continente/provincia"_ o nulo. Por esto mismo se toman pasos adicionales.

In [None]:
splitted_data = df_airports["timezone"].str.split("/", n=2, expand=True)

if splitted_data.shape[1]>1:
  df_airports["timezone_continental"] = splitted_data[0]
  if splitted_data.shape[1] == 3:
    df_airports["timezone_country"] = splitted_data[1]
    df_airports["timezone_state"] = splitted_data[2].str.replace("_", " ")
  else:
    df_airports["timezone_country"] = "N/A"
    df_airports["timezone_state"] = splitted_data[1].str.replace("_", " ")
else:
  df_airports["timezone_continental"] = "N/A"
  df_airports["timezone_country"] = "N/A"
  df_airports["timezone_state"] = "N/A"

- Se renombran columnas por simplificación

In [None]:
df_airports = df_airports.rename(columns={"iata_code" : "iata", "city_iata_code" : "city_iata", "icao_code" : "icao"})

- Se castea a tipos de datos adecuados. _coordinates_ se deja en tipo **object** por considerarse el más adecuado

In [None]:
df_airports["id"] = df_airports["id"].astype("int32")
df_airports["gmt"] = df_airports["gmt"].astype("int32")
df_airports["airport_id"] = df_airports["airport_id"].astype("int16")
df_airports["latitude"] = df_airports["latitude"].astype("float")
df_airports["longitude"] = df_airports["longitude"].astype("float")
df_airports["icao"] = df_airports["icao"].astype("string")
df_airports["iata"] = df_airports["iata"].astype("string")
df_airports["city_iata"] = df_airports["city_iata"].astype("string")
df_airports["country_iso2"] = df_airports["country_iso2"].astype("string")
df_airports["geoname_id"] = df_airports["geoname_id"].astype("int32")
df_airports["airport_name"] = df_airports["airport_name"].astype("string")
df_airports["country_name"] = df_airports["country_name"].astype("string")
df_airports["timezone"] = df_airports["timezone"].astype("string")
df_airports["timezone_continental"] = df_airports["timezone_continental"].astype("string")
df_airports["timezone_country"] = df_airports["timezone_country"].astype("string")
df_airports["timezone_state"] = df_airports["timezone_state"].astype("string")

df_airports.dtypes

Unnamed: 0,0
id,int32
gmt,int32
airport_id,int16
iata,string[python]
city_iata,string[python]
icao,string[python]
country_iso2,string[python]
geoname_id,int32
latitude,float64
longitude,float64


### Almacenamiento en capa silver

In [None]:
save_data_as_delta(df_airports, airports_clean_dir, storage_options=storage_options)

In [None]:
dt_airports_clean=DeltaTable(airports_clean_dir, storage_options=storage_options)
df_airports_clean=dt_airports_clean.to_pandas()
print(f"Cantidad de filas: {df_airports_clean.shape[0]}")
print(f"Cantidad de columnas: {df_airports_clean.shape[1]}")
df_airports_clean.head()

Cantidad de filas: 85
Cantidad de columnas: 18


Unnamed: 0,id,gmt,airport_id,iata,city_iata,icao,country_iso2,geoname_id,latitude,longitude,airport_name,country_name,phone_number,timezone,coordinates,timezone_continental,timezone_country,timezone_state
0,5709841,-3,107,AEP,BUE,SABE,AR,6301847,-34.55622,-58.41667,Aeroparque Jorge Newbery,Argentina,,America/Argentina/Buenos_Aires,"[-34.55622, -58.41667]",America,Argentina,Buenos Aires
1,5709848,-3,114,AFA,AFA,SAMR,AR,6300529,-34.58917,-68.40056,San Rafael,Argentina,,America/Argentina/Mendoza,"[-34.58917, -68.40056]",America,Argentina,Mendoza
2,5710044,-3,310,AOL,AOL,SARL,AR,6300550,-29.683332,-57.15,Paso De Los Libres,Argentina,,America/Argentina/Cordoba,"[-29.683332, -57.15]",America,Argentina,Cordoba
3,5710075,-3,341,APZ,APZ,SAHZ,AR,7730148,-38.916668,-70.083336,Zapala,Argentina,,America/Argentina/Salta,"[-38.916668, -70.083336]",America,Argentina,Salta
4,5710101,-3,367,ARR,ARR,SAVR,AR,7730773,-45.033333,-70.833336,Alto Rio Senguerr,Argentina,,America/Argentina/Catamarca,"[-45.033333, -70.833336]",America,Argentina,Catamarca


# Ingesta Delta
  Se produce la ingesta de datos a partir de una petición get al endpoint '/flights' de la API. Se produce un filtrado únicamente a partir del código IATA del aeropuerto deseado.

In [None]:
#extracción incremental, intentaremos obtener los vuelos cuyo
#arrivo al aerouerto de interés esté programado en la última hora
#en este caso lo haremos con el aeropuerto EZE
endpoint = "flights"
selected_iata = "EZE"
flight_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
#la idea era filtrar por fecha de hoy, pero por alguna razón la API
#restringía el acceso, devolviendo 403

#sin pasar el parámetro limit, se obtienen 100 registros por default
params={"access_key":api_key,"arr_iata":selected_iata}
flights=get_data(base_url, endpoint,  params=params)
df_flights_raw=build_table(flights, record_path="data")

## Filtrado
  Al no poder filtrar por hora directamente en la petición a la API para realizar la ingesta incremental, se deben realizar algunos pasos previos al almacenamiento en crudo. Además de seleccionar los datos de interés anidados, se crea una llave _id_ a la que se le asigna el número de vuelo.

In [None]:
#filtrado de la información en crudo:

#obtención de la información de interés:

filtered_data=[]
for flight in flights['data']:
  if flight['flight_date'] == flight_date:
    filtered_data.append({
        'id':flight['flight']['number'],
        'arrival_iata':flight['arrival']['iata'],
        'arrival_airport':flight['arrival']['airport'],
        'departure_iata':flight['departure']['iata'],
        'departure_airport':flight['departure']['airport'],
        'flight_status':flight['flight_status'],
        'scheduled':flight['arrival']['scheduled'],
        'estimated':flight['arrival']['estimated'],
        'actual':flight['arrival']['actual'],
        # Los valores en tiempo real se fuerzan a 0 en caso de ser nulos
        # ya que producía errores en la carga
        'latitude':flight['live']['latitude'] if flight['live'] else 0,
        'longitude':flight['live']['longitude'] if flight['live'] else 0,
        'altitude':flight['live']['altitude'] if flight['live'] else 0,
        # al ser un valor temporal se setea en nulo para ser manejado luego
        'updated':flight['live']['updated'] if flight['live'] else pd.NA
    })
df_filtered_data=build_table(filtered_data)
print(f"Cantidad de filas: {df_filtered_data.shape[0]}")
print(f"Cantidad de columnas: {df_filtered_data.shape[1]}")
df_filtered_data.head(10)

Cantidad de filas: 100
Cantidad de columnas: 13


Unnamed: 0,id,arrival_iata,arrival_airport,departure_iata,departure_airport,flight_status,scheduled,estimated,actual,latitude,longitude,altitude,updated
0,3048,EZE,Ezeiza Ministro Pistarini,BRC,San Carlos de Bariloche Airport,scheduled,2025-12-09T01:15:00+00:00,,,0.0,0.0,0.0,
1,1679,EZE,Ezeiza Ministro Pistarini,BRC,San Carlos de Bariloche Airport,scheduled,2025-12-09T00:15:00+00:00,,,0.0,0.0,0.0,
2,8308,EZE,Ezeiza Ministro Pistarini,BRC,San Carlos de Bariloche Airport,scheduled,2025-12-09T00:15:00+00:00,,,0.0,0.0,0.0,
3,3176,EZE,Ezeiza Ministro Pistarini,BRC,San Carlos de Bariloche Airport,scheduled,2025-12-09T00:15:00+00:00,,,0.0,0.0,0.0,
4,5955,EZE,Ezeiza Ministro Pistarini,BRC,San Carlos de Bariloche Airport,scheduled,2025-12-09T00:15:00+00:00,,,0.0,0.0,0.0,
5,7457,EZE,Ezeiza Ministro Pistarini,BRC,San Carlos de Bariloche Airport,scheduled,2025-12-09T00:15:00+00:00,,,0.0,0.0,0.0,
6,1759,EZE,Ezeiza Ministro Pistarini,FMA,El Pucu,scheduled,2025-12-08T22:40:00+00:00,,,0.0,0.0,0.0,
7,101,EZE,Ezeiza Ministro Pistarini,ATL,Hartsfield-jackson Atlanta International,scheduled,2025-12-09T09:00:00+00:00,,,0.0,0.0,0.0,
8,2375,EZE,Ezeiza Ministro Pistarini,LIM,Jorge Chavez International,scheduled,2025-12-09T02:50:00+00:00,,,0.0,0.0,0.0,
9,3019,EZE,Ezeiza Ministro Pistarini,SLA,Gen Belgrano,scheduled,2025-12-09T01:08:00+00:00,,,0.0,0.0,0.0,


## Selección de franja horaria
  Una vez filtrados los campos de interés, se producen a filtrar aquellos vuelos con una fecha y hora de llegada programada dentro de la franja deseada.

In [None]:
#configuración de la franja horaria para la carga delta
offset_hours=1

time_start = (datetime.now(timezone.utc)).strftime('%Y-%m-%d %H:00:00')
time_end = (datetime.now(timezone.utc) + timedelta(hours=offset_hours)).strftime('%Y-%m-%d %H:59:59')

#filtrado de los vuelos con arrivo programado dentro de la última hora

range_data=[]
for flight in filtered_data:
  if(flight['scheduled'] is not None):
    scheduled_date=datetime.fromisoformat(flight['scheduled']).strftime('%Y-%m-%d %H:%M:%S')
    if(scheduled_date > time_start and scheduled_date < time_end):
      range_data.append(flight)

df_range_data=build_table(range_data)
df_range_data.sort_values("scheduled", ascending=True)
print(f"Cantidad de filas: {df_range_data.shape[0]}")
print(f"Cantidad de columnas: {df_range_data.shape[1]}")
df_range_data.head(10)

Cantidad de filas: 23
Cantidad de columnas: 13


Unnamed: 0,id,arrival_iata,arrival_airport,departure_iata,departure_airport,flight_status,scheduled,estimated,actual,latitude,longitude,altitude,updated
0,1421,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08T20:59:00+00:00,,,0,0,0,
1,5959,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08T20:59:00+00:00,,,0,0,0,
2,9248,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08T20:59:00+00:00,,,0,0,0,
3,3128,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08T20:59:00+00:00,,,0,0,0,
4,7608,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08T20:59:00+00:00,,,0,0,0,
5,6343,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08T20:59:00+00:00,,,0,0,0,
6,7532,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08T20:59:00+00:00,,,0,0,0,
7,4214,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08T20:59:00+00:00,,,0,0,0,
8,628,EZE,Ezeiza Ministro Pistarini,SCL,Arturo Merino Benitez,scheduled,2025-12-08T20:45:00+00:00,,,0,0,0,
9,1789,EZE,Ezeiza Ministro Pistarini,IGR,Cataratas,scheduled,2025-12-08T20:35:00+00:00,,,0,0,0,


Se castea la columna _"scheduled"_ a datetime para poder crear las columnas _"hora"_ y _"fecha"_ que serán usadas para particionar el almacenamiento. Las columnas _"actual"_ y _"estimated"_ son casteadas para evitar problemas al momento del almacenamiento.

In [None]:
df_range_data['scheduled'] = pd.to_datetime(df_range_data['scheduled'])
df_range_data['estimated'] = pd.to_datetime(df_range_data['estimated'])
df_range_data['actual'] = pd.to_datetime(df_range_data['actual'])
df_range_data['updated'] = pd.to_datetime(df_range_data['updated'])
df_range_data["hora"] = df_range_data["scheduled"].dt.hour
df_range_data["fecha"] = df_range_data["scheduled"].dt.date


In [None]:
df_range_data.head()

Unnamed: 0,id,arrival_iata,arrival_airport,departure_iata,departure_airport,flight_status,scheduled,estimated,actual,latitude,longitude,altitude,updated,hora,fecha
0,1421,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,NaT,NaT,0,0,0,NaT,20,2025-12-08
1,5959,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,NaT,NaT,0,0,0,NaT,20,2025-12-08
2,9248,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,NaT,NaT,0,0,0,NaT,20,2025-12-08
3,3128,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,NaT,NaT,0,0,0,NaT,20,2025-12-08
4,7608,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,NaT,NaT,0,0,0,NaT,20,2025-12-08


## Almacenamiento en modo upsert
  Ahora podemos almacenar en la capa bronze.

In [None]:
upsert_data_as_delta(
    df_range_data,
    flights_raw_dir,
    "target.scheduled = source.scheduled",
    storage_options=storage_options,
    partition_cols=["fecha", "hora"]
)

### Lectura de los datos desde capa bronze, para la hora y fecha determinada
- Se declara una función que nos permitirá obtener los intervalos en los que debemos leer los datos

In [None]:
def leer_rango_tiempo(offset, prev=0):

  """
  Función que retorna una lista compuesta por otras dos listas, una con las
  horas (separadas por día) consecutivas en orden y otra con las fechas
  consecutivas en orden que componen el rango temporal entre el momento en el
  que se llama la función y el desfasaje en horas deseado.
  Se pensó en devolver también fechas porque en los casos en que se requiera
  trabajar en un rango de 2hs o más, puede darse que se produzca un cambio
  de fecha (por ejemplo, desde las 23:00 hasta las 0:59 del dia siguiente).
  Acualmente funciona dentro de un rango de hasta 23hs

  Args:
    offset (int): cantidad de horas que se desean tener en cuenta en el rango.
    prev (int): cantidad de horas que se desean tener en cuenta antes de la hora actual.
  Returns:
    (list) Lista de la forma: [[rango_fechas], [[horas_dia1],[horas_dia2]]
  """
  if offset < 0 or offset > 23:
    raise ValueError("El argumento 'offset' debe estar dentro del rango de 0 a 23.")

  fecha_inicio = (datetime.now(timezone.utc) - timedelta(hours=prev)).strftime('%Y-%m-%d')
  fecha_fin = (datetime.now(timezone.utc) + timedelta(hours=offset)).strftime('%Y-%m-%d')
  hora_inicio = (datetime.now(timezone.utc) - timedelta(hours=prev)).hour
  hora_fin = (datetime.now(timezone.utc) + timedelta(hours=offset)).hour

  rango_fechas = []

  for fecha in pd.date_range(fecha_inicio, fecha_fin, freq='D'):
    rango_fechas.append(fecha.strftime('%Y-%m-%d'))

  rango_horas = [[], []]
  if hora_fin < hora_inicio:
    h = hora_inicio
    while h < 24:
      rango_horas[0].append(h)
      h += 1
    h = 0
    while h < hora_fin + 1:
      rango_horas[1].append(h)
      h += 1
  else:
    for h in range(hora_inicio, hora_fin + 1):
      rango_horas[0].append(h)

  return [rango_fechas, rango_horas]

In [None]:
leer_rango_tiempo(0, 1)

[['2025-12-08'], [[20, 21], []]]

- Se procede a realizar la lectura de la o las particiones correspondientes, como alternativa a la función mostrada por el docente _read_ __most_ __recent_ __partitions_ se crea un código que permita cargar varias particiones a la vez, en caso de querer trabajar con un rango más amplio de horas.

In [None]:
# Primero se debe realizar una lectura en el directorio base, no podemos
#acceder directamente a las carpetas de partición.
dt_flights_raw = DeltaTable(flights_raw_dir, storage_options=storage_options)

# Lista que contendrá los DataFrame que se obtengan
dfs = []

rango_lectura = leer_rango_tiempo(offset=0, prev=1)
rango_horas = rango_lectura[1]
rango_fechas = rango_lectura[0]

# Se itera para leer las particiones en el rango de fechas y horas requerido:
if len(rango_fechas) > 1:
  i = 0
  for fecha in rango_fechas:
    print(f"Cargando partición: fecha='{fecha}'")
    for hora in rango_horas[i]:
      print(f"Cargando partición: fecha='{fecha}', hora='{hora}'")
      try:
        # Lectura y agregado a lista de las particiones dentro del rango:
        df_hour_partition = dt_flights_raw.to_pandas(partitions=[("fecha", "=", fecha), ("hora", "=", str(hora))])
        if not df_hour_partition.empty:
            dfs.append(df_hour_partition)
        else:
            print(f"No data found for fecha='{fecha}', hora='{hora}'")
      except Exception as e:
        print(f"Could not load partition for fecha='{fecha}', hora='{hora}': {e}")
    i += 1
elif len(rango_fechas) == 1:
    for hora in rango_horas[0]:
        print(f"Cargando partición: fecha='{rango_fechas[0]}', hora='{hora}'")
        try:
            # Lectura y agregado a lista de las particiones dentro del rango:
            df_hour_partition = dt_flights_raw.to_pandas(partitions=[("fecha", "=", rango_fechas[0]), ("hora", "=", str(hora))])
            if not df_hour_partition.empty:
                dfs.append(df_hour_partition)
            else:
                print(f"No se encontraron datos para: fecha='{rango_fechas[0]}', hora='{hora}'")
        except Exception as e:
            print(f"No se pudo cargar la partición para: fecha='{rango_fechas[0]}', hora='{hora}': {e}")

# Se concatenan todos los DataFrames obtenidos
if dfs:
    df_flights = pd.concat(dfs, ignore_index=True)
    print(f"Cantidad de filas: {df_flights.shape[0]}")
    print(f"Cantidad de columnas: {df_flights.shape[1]}")
    df_flights.head()
else:
    print("No se han obtenido datos de las particiones solicitadas.")

Cargando partición: fecha='2025-12-08', hora='20'
Cargando partición: fecha='2025-12-08', hora='21'
No se encontraron datos para: fecha='2025-12-08', hora='21'
Cantidad de filas: 20
Cantidad de columnas: 15


In [None]:
df_flights.head()

Unnamed: 0,id,arrival_iata,arrival_airport,departure_iata,departure_airport,flight_status,scheduled,estimated,actual,latitude,longitude,altitude,updated,hora,fecha
0,1421,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,NaT,NaT,0.0,0.0,0,NaT,20,2025-12-08
1,5959,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,NaT,NaT,0.0,0.0,0,NaT,20,2025-12-08
2,9248,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,NaT,NaT,0.0,0.0,0,NaT,20,2025-12-08
3,3128,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,NaT,NaT,0.0,0.0,0,NaT,20,2025-12-08
4,7608,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,NaT,NaT,0.0,0.0,0,NaT,20,2025-12-08


## Procesamiento de datos
---
Se observan los datos en búsqueda de optimizaciones:

In [None]:
df_flights.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20 entries, 0 to 19
Data columns (total 15 columns):
 #   Column             Non-Null Count  Dtype              
---  ------             --------------  -----              
 0   id                 20 non-null     object             
 1   arrival_iata       20 non-null     object             
 2   arrival_airport    20 non-null     object             
 3   departure_iata     20 non-null     object             
 4   departure_airport  20 non-null     object             
 5   flight_status      20 non-null     object             
 6   scheduled          20 non-null     datetime64[us, UTC]
 7   estimated          0 non-null      datetime64[us, UTC]
 8   actual             0 non-null      datetime64[us]     
 9   latitude           20 non-null     float64            
 10  longitude          20 non-null     float64            
 11  altitude           20 non-null     int64              
 12  updated            0 non-null      datetime64[us, UT

- Se cambia el nombre de la columna _flight_status_ a _status_ para simplificar

In [None]:
df_flights = df_flights.rename(columns={"flight_status" : "status"})

- Se castean las columnas a tipos adecuados:

In [None]:
df_flights["id"] = df_flights["id"].astype("int16")
df_flights["arrival_iata"] = df_flights["arrival_iata"].astype("string")
df_flights["arrival_airport"] = df_flights["arrival_airport"].astype("string")
df_flights["departure_iata"] = df_flights["departure_iata"].astype("string")
df_flights["departure_airport"] = df_flights["departure_airport"].astype("string")
df_flights["status"] = df_flights["status"].astype("string")
df_flights["latitude"] = df_flights["latitude"].astype("float")
df_flights["longitude"] = df_flights["longitude"].astype("float")
df_flights["altitude"] = df_flights["altitude"].astype("float")
df_flights["hora"] = df_flights["hora"].astype("int8")
df_flights["fecha"] = df_flights["fecha"].astype("string")

df_flights.dtypes

Unnamed: 0,0
id,int16
arrival_iata,string[python]
arrival_airport,string[python]
departure_iata,string[python]
departure_airport,string[python]
status,string[python]
scheduled,"datetime64[us, UTC]"
estimated,"datetime64[us, UTC]"
actual,"datetime64[ns, UTC]"
latitude,float64


- Reemplazo de valores nulos, utilizando valores extremos o strings:

In [None]:
df_flights["updated"] = df_flights["updated"].fillna("Sin Datos")
df_flights["estimated"] = df_flights["estimated"].fillna(pd.to_datetime('1900-01-01T00:00:00+00:00'))
df_flights["actual"] = df_flights["actual"].fillna(pd.to_datetime('1900-01-01T00:00:00+00:00'))

  df_flights["actual"] = df_flights["actual"].fillna(pd.to_datetime('1900-01-01T00:00:00+00:00'))


- Se crea una columna que indica si el vuelo está a tiempo o se retrasó:

In [None]:
df_flights["is_late"] = df_flights["estimated"] > df_flights["scheduled"]

df_flights.head()

Unnamed: 0,id,arrival_iata,arrival_airport,departure_iata,departure_airport,status,scheduled,estimated,actual,latitude,longitude,altitude,updated,hora,fecha,is_late
0,1421,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,1900-01-01 00:00:00+00:00,1900-01-01 00:00:00+00:00,0.0,0.0,0.0,Sin Datos,20,2025-12-08,False
1,5959,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,1900-01-01 00:00:00+00:00,1900-01-01 00:00:00+00:00,0.0,0.0,0.0,Sin Datos,20,2025-12-08,False
2,9248,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,1900-01-01 00:00:00+00:00,1900-01-01 00:00:00+00:00,0.0,0.0,0.0,Sin Datos,20,2025-12-08,False
3,3128,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,1900-01-01 00:00:00+00:00,1900-01-01 00:00:00+00:00,0.0,0.0,0.0,Sin Datos,20,2025-12-08,False
4,7608,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,1900-01-01 00:00:00+00:00,1900-01-01 00:00:00+00:00,0.0,0.0,0.0,Sin Datos,20,2025-12-08,False


### Almacenamiento de datos limpios en capa silver:

In [None]:
upsert_data_as_delta(
    df_flights,
    flights_clean_dir,
    "target.scheduled = source.scheduled",
    storage_options=storage_options,
    partition_cols=["fecha", "hora"]
)

### Lectura desde capa silver:

In [None]:
df_silver_flights = read_most_recent_partition(flights_clean_dir, storage_options=storage_options)
df_silver_flights.head()

Unnamed: 0,id,arrival_iata,arrival_airport,departure_iata,departure_airport,status,scheduled,estimated,actual,latitude,longitude,altitude,updated,hora,fecha,is_late
0,1421,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,1900-01-01 00:00:00+00:00,1900-01-01 00:00:00+00:00,0.0,0.0,0.0,Sin Datos,20,2025-12-08,False
1,5959,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,1900-01-01 00:00:00+00:00,1900-01-01 00:00:00+00:00,0.0,0.0,0.0,Sin Datos,20,2025-12-08,False
2,9248,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,1900-01-01 00:00:00+00:00,1900-01-01 00:00:00+00:00,0.0,0.0,0.0,Sin Datos,20,2025-12-08,False
3,3128,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,1900-01-01 00:00:00+00:00,1900-01-01 00:00:00+00:00,0.0,0.0,0.0,Sin Datos,20,2025-12-08,False
4,7608,EZE,Ezeiza Ministro Pistarini,MDZ,El Plumerillo,scheduled,2025-12-08 20:59:00+00:00,1900-01-01 00:00:00+00:00,1900-01-01 00:00:00+00:00,0.0,0.0,0.0,Sin Datos,20,2025-12-08,False


### Agregaciones:

In [None]:
df_sumarized = pd.pivot_table(
    df_silver_flights,
    values="altitude", # Columna donde se aplican las agregaciones
    index=["arrival_iata", "departure_iata"], # GROUP BY
    aggfunc=["mean", "min", "max", "std"] # Tipos de agregaciones a aplicar sobre value
    )
df_sumarized.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,mean,min,max,std
Unnamed: 0_level_1,Unnamed: 1_level_1,altitude,altitude,altitude,altitude
arrival_iata,departure_iata,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2
EZE,COR,0.0,0.0,0.0,0.0
EZE,IGR,0.0,0.0,0.0,0.0
EZE,MDZ,0.0,0.0,0.0,0.0
EZE,SCL,0.0,0.0,0.0,


### Almacenamiento en capa gold

In [None]:
summarized_dir = f"{gold_dir}/summarized"
save_data_as_delta(df_sumarized, summarized_dir, storage_options=storage_options)

---
## Herramientas adicionales utilizadas durante el desarrollo del TP
---

### Generador de datos falsos con Faker:
  Se crea un generador de datos falsos para economizar los pedidos a la API, ya que la versión gratuita permite 100 solicitudes mensuales.

In [None]:
fake = Faker()

FAKE_DEPARTURE_DATA = [
    {"departure_airport": "Miami International Airport", "departure_iata": "MIA"},
    {"departure_airport": "Aeroporto Internacional Guarulhos", "departure_iata": "GRU"},
    {"departure_airport": "Charles De Gaulle", "departure_iata": "CDG"},
    {"departure_airport": "Barajas", "departure_iata": "MAD"},
    {"departure_airport": "Cataratas", "departure_iata": "IGR"},
    {"departure_airport": "El Plumerillo", "departure_iata": "MDZ"},
    {"departure_airport": "El Nuevo Dorado International", "departure_iata": "BOG"}
]

FAKE_STATUS = ["scheduled", "active", "canceled", "landed"]

def generate_fake_data(n_registers, date_str, arrival_airport="Aeroparque Jorge Newbery", arrival_iata= "EZE", seed=0):
  fake.seed_instance(seed)
  flights = []
  start_dt = datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S+00:00')
  end_scheduled_dt = start_dt + timedelta(hours=1) - timedelta(seconds=1)
  end_dt = start_dt + timedelta(hours=3) - timedelta(seconds=1)

  for _ in range(n_registers):
    departure = random.choice(FAKE_DEPARTURE_DATA)
    sch_time = fake.date_time_between_dates(datetime_start=start_dt, datetime_end=end_scheduled_dt).strftime('%Y-%m-%dT%H:%M:00+00:00')
    est_time = fake.date_time_between_dates(datetime_start=start_dt, datetime_end=end_dt).strftime('%Y-%m-%dT%H:%M:00+00:00')
    act_time = fake.date_time_between_dates(datetime_start=datetime.strptime(est_time, '%Y-%m-%dT%H:%M:00+00:00'), datetime_end=(datetime.strptime(est_time, '%Y-%m-%dT%H:%M:00+00:00') +timedelta(hours=1))).strftime('%Y-%m-%dT%H:%M:00+00:00')
    upd_time = fake.date_time_between_dates(datetime_start=start_dt, datetime_end=end_scheduled_dt).strftime('%Y-%m-%dT%H:%M:00+00:00')
    data = {
      "ID": fake.random_int(min=1000, max=9999),
      "arrival_iata": arrival_iata,
      "arrival_airport": arrival_airport,
      "departure_iata": departure["departure_iata"],
      "departure_airport": departure["departure_airport"],
      "flight_status": random.choice(FAKE_STATUS),
      "scheduled": sch_time,
      "estimated": est_time,
      "actual": act_time,
      "latitude": fake.latitude(),
      "longitude": fake.longitude(),
      "altitude": fake.random_int(min=0, max=1000),
      "updated": upd_time
    }

    flights.append(data)
  return pd.DataFrame(flights)

In [None]:
actual_date = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S+00:00')
df_fake_flights = generate_fake_data(30, actual_date)
df_fake_flights["scheduled"] = pd.to_datetime(df_fake_flights["scheduled"])
df_fake_flights["hora"] = df_fake_flights["scheduled"].dt.hour
df_fake_flights["fecha"] = df_fake_flights["scheduled"].dt.date
df_fake_flights.head()


Unnamed: 0,ID,arrival_iata,arrival_airport,departure_iata,departure_airport,flight_status,scheduled,estimated,actual,latitude,longitude,altitude,updated,hora,fecha
0,9376,EZE,Aeroparque Jorge Newbery,MAD,Barajas,scheduled,2025-12-08 21:31:00+00:00,2025-12-08T22:57:00+00:00,2025-12-08T23:22:00+00:00,40.440222,37.397359,940,2025-12-08T20:56:00+00:00,21,2025-12-08
1,4578,EZE,Aeroparque Jorge Newbery,BOG,El Nuevo Dorado International,landed,2025-12-08 21:28:00+00:00,2025-12-08T21:35:00+00:00,2025-12-08T22:03:00+00:00,45.475847,-105.227997,288,2025-12-08T21:16:00+00:00,21,2025-12-08
2,9725,EZE,Aeroparque Jorge Newbery,BOG,El Nuevo Dorado International,canceled,2025-12-08 20:49:00+00:00,2025-12-08T20:58:00+00:00,2025-12-08T21:45:00+00:00,71.571856,-101.100699,317,2025-12-08T21:40:00+00:00,20,2025-12-08
3,2649,EZE,Aeroparque Jorge Newbery,GRU,Aeroporto Internacional Guarulhos,scheduled,2025-12-08 20:47:00+00:00,2025-12-08T20:54:00+00:00,2025-12-08T21:45:00+00:00,4.970136,53.094228,323,2025-12-08T21:01:00+00:00,20,2025-12-08
4,9541,EZE,Aeroparque Jorge Newbery,GRU,Aeroporto Internacional Guarulhos,landed,2025-12-08 21:17:00+00:00,2025-12-08T23:25:00+00:00,2025-12-09T00:22:00+00:00,-20.074628,-146.563904,824,2025-12-08T21:09:00+00:00,21,2025-12-08


In [None]:
df_fake_flights["id"] = df_flights["id"].astype("int16")
df_fake_flights["arrival_iata"] = df_flights["arrival_iata"].astype("string")
df_fake_flights["arrival_airport"] = df_flights["arrival_airport"].astype("string")
df_fake_flights["departure_iata"] = df_flights["departure_iata"].astype("string")
df_fake_flights["departure_airport"] = df_flights["departure_airport"].astype("string")
df_fake_flights["flight_status"] = df_flights["flight_status"].astype("string")
df_fake_flights["latitude"] = df_flights["latitude"].astype("float")
df_fake_flights["longitude"] = df_flights["longitude"].astype("float")
df_fake_flights["altitude"] = df_flights["altitude"].astype("float")
df_fake_flights["hora"] = df_flights["hora"].astype("int8")
df_fake_flights["fecha"] = pd.to_datetime(df_flights["fecha"])

df_fake_flights.dtypes

Unnamed: 0,0
ID,int64
arrival_iata,string[python]
arrival_airport,string[python]
departure_iata,string[python]
departure_airport,string[python]
flight_status,string[python]
scheduled,"datetime64[ns, UTC]"
estimated,object
actual,object
latitude,float64


## Reporte de perfilado de datos YData

In [None]:
!pip install ydata-profiling

Collecting ydata-profiling
  Downloading ydata_profiling-4.18.0-py2.py3-none-any.whl.metadata (22 kB)
Collecting visions<0.8.2,>=0.7.5 (from visions[type_image_path]<0.8.2,>=0.7.5->ydata-profiling)
  Downloading visions-0.8.1-py3-none-any.whl.metadata (11 kB)
Collecting minify-html>=0.15.0 (from ydata-profiling)
  Downloading minify_html-0.18.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (18 kB)
Collecting filetype>=1.0.0 (from ydata-profiling)
  Downloading filetype-1.2.0-py2.py3-none-any.whl.metadata (6.5 kB)
Collecting phik<0.13,>=0.12.5 (from ydata-profiling)
  Downloading phik-0.12.5-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata (5.6 kB)
Collecting multimethod<2,>=1.4 (from ydata-profiling)
  Downloading multimethod-1.12-py3-none-any.whl.metadata (9.6 kB)
Collecting imagehash==4.3.2 (from ydata-profiling)
  Downloading ImageHash-4.3.2-py2.py3-none-any.whl.metadata (8.4 kB)
Collecting dacite<2,>=1.9 (from ydata-profiling)
  Downloading

In [None]:
from ydata_profiling import ProfileReport
profile = ProfileReport(df_flights, title="Pandas Profiling Report")
profile