<a href="https://colab.research.google.com/github/Mondin0/data-eng/blob/main/CEL_Extracci%C3%B3n_y_almacenamiento_en_Delta_APIs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install requests
!pip install deltalake
!pip install pyarrow

Collecting deltalake
  Downloading deltalake-0.21.0-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.0 kB)
Downloading deltalake-0.21.0-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (36.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m36.3/36.3 MB[0m [31m26.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: deltalake
Successfully installed deltalake-0.21.0


In [None]:
from datetime import datetime, timedelta, timezone

import pandas as pd
import pyarrow as pa
import requests
from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import TableNotFoundError

In [None]:
def get_data(base_url, endpoint, data_field=None, params=None, headers=None, timeout=30):
    """
    Send a GET request to an API endpoint and return the decoded JSON payload.

    Parameters:
    base_url (str): Base URL of the API.
    endpoint (str): Endpoint path; the request goes to "{base_url}/{endpoint}".
    data_field (str): Optional key of the JSON response that holds the wanted
    data. When given, only the value stored under that key is returned.
    params (dict): Query parameters sent with the request.
    headers (dict): Headers sent with the request.
    timeout (float or tuple): Seconds to wait for the server, forwarded to
    requests.get so a stalled connection cannot block forever. Defaults to 30.

    Returns:
    The decoded JSON (or the value under data_field). None is returned, after
    printing a message, when the request fails or when the response is not
    JSON / does not contain data_field.
    """
    try:
        endpoint_url = f"{base_url}/{endpoint}"
        response = requests.get(
            endpoint_url, params=params, headers=headers, timeout=timeout
        )
        response.raise_for_status()  # Raises for HTTP error status codes.

        # Decode the body and optionally pick the requested field.
        try:
            data = response.json()
            if data_field:
                data = data[data_field]
        except (ValueError, KeyError, TypeError):
            # ValueError: body is not valid JSON.
            # KeyError / TypeError: data_field is missing or the payload is
            # not subscriptable by it.
            # Deliberately not a bare except, so KeyboardInterrupt and
            # SystemExit still propagate.
            print("El formato de respuesta no es el esperado")
            return None
        return data

    except requests.exceptions.RequestException as e:
        # Any request-level failure: connection errors, timeouts, HTTP errors.
        print(f"La petición ha fallado. Código de error : {e}")
        return None

def build_table(json_data, record_path=None):
    """
    Build a pandas DataFrame from JSON-like data using pd.json_normalize.

    Parameters:
    json_data (dict or list): JSON data obtained from an API.
    record_path (str or list): Optional path, forwarded to pd.json_normalize,
    of the list of records inside json_data. When None, json_data itself is
    normalized.

    Returns:
    DataFrame: The normalized data, or None (after printing a message) when
    the data cannot be normalized.
    """
    try:
        return pd.json_normalize(json_data, record_path)
    except Exception:
        # Only regular errors are treated as "unexpected format"; unlike a bare
        # except, KeyboardInterrupt and SystemExit are allowed to propagate.
        print("Los datos no están en el formato esperado")
        return None

def save_data_as_delta(df, path, mode="overwrite", partition_cols=None):
    """
    Write a dataframe to the given path in Delta Lake format.

    The table can optionally be partitioned by one or more columns. The save
    mode defaults to "overwrite".

    Args:
      df (pd.DataFrame): The dataframe to store.
      path (str): Location where the Delta Lake table is written.
      mode (str): Save mode, one of the modes supported by the deltalake
      library: "overwrite", "append", "error", "ignore".
      partition_cols (list or str): Column(s) to partition the table by.
      When omitted, the table is not partitioned.
    """
    # Collect the writer options first, then hand everything to deltalake.
    write_options = {"mode": mode, "partition_by": partition_cols}
    write_deltalake(path, df, **write_options)

def save_new_data_as_delta(new_data, data_path, predicate, partition_cols=None):
    """
    Store only new rows in a Delta Lake table by means of a MERGE operation.

    Incoming rows are compared against the rows already stored using
    `predicate`; only rows without a match are inserted, so re-running with the
    same data does not create duplicates.

    Args:
      new_data (pd.DataFrame): The rows to store.
      data_path (str): Location of the Delta Lake table.
      predicate (str): MERGE condition. The stored table is aliased as
      "target" and the incoming rows as "source".
      partition_cols (list or str): Column(s) to partition by. Only used when
      the table does not exist yet and is written for the first time.
    """
    try:
        target_table = DeltaTable(data_path)
        source_table = pa.Table.from_pandas(new_data)
        merge_builder = target_table.merge(
            source=source_table,
            source_alias="source",
            target_alias="target",
            predicate=predicate,
        )
        # Insert into target the source rows that have no match in target.
        merge_builder.when_not_matched_insert_all().execute()
    except TableNotFoundError:
        # No Delta table at data_path yet: write the data as a new table.
        save_data_as_delta(new_data, data_path, partition_cols=partition_cols)

def upsert_data_as_delta(data, data_path, predicate, partition_cols=None):
    """
    Upsert data into a Delta Lake table using a MERGE operation.

    Rows of `data` that match a stored row (according to `predicate`) update
    that row; rows without a match are inserted as new rows.

    Args:
      data (pd.DataFrame): The rows to store.
      data_path (str): Location of the Delta Lake table.
      predicate (str): MERGE condition. The stored table is aliased as
      "target" and the incoming rows as "source".
      partition_cols (list or str): Column(s) to partition by. Only used when
      the table does not exist yet and is written for the first time. Defaults
      to None (no partitioning), which is the previous behaviour.
    """
    try:
        dt = DeltaTable(data_path)
        data_pa = pa.Table.from_pandas(data)
        (
            dt.merge(
                source=data_pa,
                source_alias="source",
                target_alias="target",
                predicate=predicate,
            )
            .when_matched_update_all()
            .when_not_matched_insert_all()
            .execute()
        )
    except TableNotFoundError:
        # No Delta table at data_path yet: write the data as a new table,
        # honouring the requested partitioning like save_new_data_as_delta does.
        save_data_as_delta(data, data_path, partition_cols=partition_cols)

In [None]:
# Base URL of the API; get_data appends "/<endpoint>" to it for every request.
base_url = "https://api.luchtmeetnet.nl/open_api"

### Obtener datos de todas las estaciones

In [None]:
# Fetch the station list (requested with organisation_id=2)
endpoint = "stations"
params = {"organisation_id": 2}

# data_field="data": keep only the value stored under the "data" key of the
# JSON response. `stations` is None if the request or the parsing failed.
stations = get_data(base_url, endpoint, data_field="data", params=params)
stations

[{'number': 'NL10248', 'location': 'Nistelrode-Gagelstraat'},
 {'number': 'NL10320', 'location': 'Burgh-Haamstede'},
 {'number': 'NL10404', 'location': 'Den Haag-Rebecquestraat'},
 {'number': 'NL10445', 'location': 'Den Haag-Amsterdamse Veerkade'},
 {'number': 'NL10246', 'location': 'Fijnaart-Zwingelspaansedijk'},
 {'number': 'NL10301', 'location': 'Zierikzee-Lange Slikweg'},
 {'number': 'NL10938', 'location': 'Groningen-Nijensteinheerd'},
 {'number': 'NL10107', 'location': 'Posterholt-Vlodropperweg'},
 {'number': 'NL10722', 'location': 'Eibergen-Lintveldseweg'},
 {'number': 'NL10929', 'location': 'Valthermond-Noorderdiep'},
 {'number': 'NL10617', 'location': 'Biddinghuizen-Kuilweg'},
 {'number': 'NL10538', 'location': 'Wieringerwerf-Medemblikkerweg'},
 {'number': 'NL10741', 'location': 'Nijmegen-Graafseweg'},
 {'number': 'NL10738', 'location': 'Wekerom-Riemterdijk'},
 {'number': 'NL10138', 'location': 'Heerlen-Jamboreepad'},
 {'number': 'NL10133', 'location': 'Wijnandsrade-Opfergeltst

### Obtener detalles de cada estación

In [None]:
# Fetch the detail record of every station, one request per station
all_stations = []

for station in stations:
    endpoint = f"stations/{station['number']}"
    station_details = get_data(base_url, endpoint, "data")

    # get_data returns None on failure; skip stations without a usable payload
    if not station_details:
        continue

    # Carry over the identifying fields from the station list and drop the
    # "province" field (if present) before collecting the record
    station_details["number"] = station["number"]
    station_details["location"] = station["location"]
    station_details.pop("province", None)
    all_stations.append(station_details)

In [None]:
# Flatten the collected station dicts into a DataFrame
# (build_table returns None if the data cannot be normalized)
df_stations = build_table(all_stations)

In [None]:
# Preview the first rows of the stations table
df_stations.head()

Unnamed: 0,type,components,municipality,url,organisation,location,year_start,number,geometry.type,geometry.coordinates,description.NL,description.EN
0,Regional,"[NO2, NO, PM10, NH3, LKI, PM25]",Bernheze,,RIVM,Nistelrode-Gagelstraat,2022.0,NL10248,point,"[5.5433281, 51.69818779]",Nistelrode-Gagelstraat,Nistelrode
1,Regional,"[NH3, LKI]",Schouwen-Duiveland,,RIVM,Burgh-Haamstede,2023.0,NL10320,point,"[3.7145, 51.70644]",Burgh-Haamstede,nieuw station
2,Municipal,"[NO, NO2, O3, PM10, PM25]",'s-Gravenhage,,RIVM,Den Haag-Rebecquestraat,,NL10404,point,"[4.289185, 52.077148]",Den Haag-Rebecquestraat,Den Haag-Rebecquestraat
3,Traffic,"[NO, PM10, NO2]",'s-Gravenhage,,RIVM,Den Haag-Amsterdamse Veerkade,,NL10445,point,"[4.315872, 52.075071]",Den Haag-Amsterdamse Veerkade,Den Haag-Amsterdamse Veerkade
4,Regional,"[NO, PM10, NO2]",Moerdijk,,RIVM,Fijnaart-Zwingelspaansedijk,,NL10246,point,"[4.515271, 51.653729]",Fijnaart-Zwingelspaansedijk,Fijnaart-Zwingelspaansedijk


#### Guardar, opción 1

In [None]:
# Relative paths used for storage: the folder for this API's data and, inside
# it, the location of the stations Delta table written by save option 1
bronze_dir = "datalake/bronze/luchtmeetnet_api"
stations_raw_dir = f"{bronze_dir}/stations_opc1"

In [None]:
# Write the stations in "append" mode
save_data_as_delta(df_stations, stations_raw_dir, mode="append")

In [None]:
# As a check, read the saved data back and count the number of rows
dt = DeltaTable(stations_raw_dir)
print(f"Cant de filas: {dt.to_pandas().shape[0]}")

Cant de filas: 25


In [None]:
# What happens if we save again by repeating the same code?
# (the recorded output shows the row count doubling: append adds the rows again)
save_data_as_delta(df_stations, stations_raw_dir, mode="append")
dt = DeltaTable(stations_raw_dir)
print(f"Cant de filas: {dt.to_pandas().shape[0]}")

Cant de filas: 50


In [None]:
# Try the "overwrite" mode
save_data_as_delta(df_stations, stations_raw_dir, mode="overwrite")
dt = DeltaTable(stations_raw_dir)
print(f"Cant de filas: {dt.to_pandas().shape[0]}")

Cant de filas: 25


In [None]:
# Running again in overwrite mode keeps the same number of rows,
# but a new internal file is created
save_data_as_delta(df_stations, stations_raw_dir, mode="overwrite")
dt = DeltaTable(stations_raw_dir)
print(f"Cant de filas: {dt.to_pandas().shape[0]}")

Cant de filas: 25


#### Guardar, opción 2

In [None]:
# Save option 2: write to a separate table path using a MERGE.
# Only stations whose "number" is not already stored are inserted; if the
# table does not exist yet, save_new_data_as_delta creates it.
stations_raw_dir = f"{bronze_dir}/stations"
save_new_data_as_delta(
    df_stations,
    stations_raw_dir,
    "target.number = source.number"
    )

### Obtener mediciones de la última hora de las diferentes estaciones

In [None]:
# Fetch the measurements of one full clock hour

endpoint = "measurements"

# Pick the clock hour that contains "now minus 5 hours", in UTC.
# datetime.now(timezone.utc) replaces datetime.utcnow(), which is deprecated
# since Python 3.12; the formatted strings are the same.
# NOTE(review): the 5-hour offset is presumably there so that the requested
# hour is already published by the API — confirm.
target_hour = datetime.now(timezone.utc) - timedelta(hours=5)

# Window boundaries: first and last second of that hour
start_date = target_hour.strftime("%Y-%m-%dT%H:00:00Z")
end_date = target_hour.strftime("%Y-%m-%dT%H:59:59Z")

params = {
    "start": start_date,
    "end": end_date
    }


# The records are expected under the "data" key of the response, which
# build_table uses as record path.
measurements = get_data(base_url, endpoint, params=params)
df_measurements = build_table(measurements, "data")

In [None]:
# Display the measurements ordered by measurement timestamp (ascending);
# the sorted frame is only shown, df_measurements itself is not modified
df_measurements.sort_values("timestamp_measured", ascending=True)

Unnamed: 0,station_number,value,timestamp_measured,formula
0,NL49002,79.30,2024-11-05T22:00:00+00:00,NO
322,NL10450,51.79,2024-11-05T22:00:00+00:00,PM10
321,NL10450,0.14,2024-11-05T22:00:00+00:00,O3
320,NL10450,41.22,2024-11-05T22:00:00+00:00,NO2
319,NL10450,46.49,2024-11-05T22:00:00+00:00,NO
...,...,...,...,...
152,NL10404,53.07,2024-11-05T22:00:00+00:00,PM10
151,NL10404,-0.01,2024-11-05T22:00:00+00:00,O3
150,NL10404,41.16,2024-11-05T22:00:00+00:00,NO2
162,NL10437,10.83,2024-11-05T22:00:00+00:00,PM10


#### Opción 1

In [None]:
# Parse the timestamp column, then derive the date and hour columns that are
# used as partition keys
df_measurements["timestamp_measured"] = pd.to_datetime(df_measurements["timestamp_measured"])
measured_at = df_measurements["timestamp_measured"].dt
df_measurements["fecha"] = measured_at.date
df_measurements["hora"] = measured_at.hour

# Write with the default mode of save_data_as_delta ("overwrite"),
# partitioned by date and hour
measurements_raw_dir = f"{bronze_dir}/measurements"
save_data_as_delta(
    df_measurements,
    measurements_raw_dir,
    partition_cols=["fecha", "hora"],
)

#### Opción 2

In [None]:
# Parse the timestamp column, then derive the date and hour columns that are
# used as partition keys
df_measurements["timestamp_measured"] = pd.to_datetime(df_measurements["timestamp_measured"])
measured_at = df_measurements["timestamp_measured"].dt
df_measurements["fecha"] = measured_at.date
df_measurements["hora"] = measured_at.hour

# A stored row and an incoming row are the same measurement when timestamp,
# station and formula all coincide
merge_predicate = """target.timestamp_measured = source.timestamp_measured
    AND target.station_number = source.station_number
    AND target.formula = source.formula"""

# Insert only the measurements that are not stored yet; the partition columns
# are applied if the table has to be created
measurements_raw_dir = f"{bronze_dir}/measurements_opc2"
save_new_data_as_delta(
    df_measurements,
    measurements_raw_dir,
    merge_predicate,
    partition_cols=["fecha", "hora"],
)