# PREPARACIÓN DEL ENTORNO (LIBRERIAS)




**Instalación de librerias**

In [None]:
!pip install requests
!pip install deltalake
!pip install pyarrow
!pip install deltalake

**Importación de librerias**

In [2]:
import requests
import urllib3
import certifi
import pandas as pd
import pyarrow as pa
from pprint import pprint
from datetime import datetime, timedelta
from deltalake import write_deltalake, DeltaTable
from deltalake.exceptions import TableNotFoundError
from configparser import ConfigParser

# FUNCIONES

**Función para realizar una solicitud GET a una API**

In [3]:
# FUNCIÓN PARA REALIZAR GET A ALGUNA API
def get_data(base_url, endpoint, data_field, params=None, headers=None):

    """
    REALIZA UNA SOLICITUD GET A UNA API PARA OBTENER DATOS.

    PARÁMETROS:
    base_url (str): LA URL DE LA BASE API.
    endpoint (str): EL ENDPOINT DE LA API AL QUE SE REALIZARÁ LA SOLICITUD.
    params (dict): PARÁMETROS DE CONSULTA PARA ENVIAR CON LA SOLICITUD.
    data_field (str): EL NOMBRE DEL CAMPO EN EL JSON QUE CONTIENE LOS DATOS.
    headers (dict): ENCABEZADOS PARA ENVIAR CON LA SOLICITUD.

    RETORNA:
    dict: LOS DATOS OBTENIDOS DE LA API EN FORMATO JSON.
    """
    try:
        # UNIMOS LA URL CON EL ENDPOINT.
        endpoint_url = f"{base_url}/{endpoint}"

        # EJECUTA LA CONSULTA Y LO ALMACENA EN response.

        # SI LA base_url ES DISTINTA DE LA URL DE LA API DEL BCRA
        if base_url != "https://api.bcra.gob.ar":
          # ASIGNACIÓN NORMAL DE response
          response = requests.get(endpoint_url, params=params, headers=headers)
        else:
          # ASIGNACIÓN PERSONALIZADA PARA BCRA EVITAR ERRORES DE SSL

          # DESACTIVAR LA ADVERTENCIA DE SSL (NO ES RECOMENDABLE)
          urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

          # OPCIÓN 1: DESACTIVAR EL verify (NO ES RECOMENDABLE)
          response = requests.get(endpoint_url, params=params, headers=headers, verify=False)

          # OPCIÓN 2: REALIZAR LA SOLICITUD Y PASAR EL ARCHIVO DE CERTIFICADOS DE 'certifi' PARA LA VERIFICACIÓN SSL
          #response = requests.get(endpoint_url, params=params, headers=headers, verify=certifi.where())

        # LEVANTA UNA EXCEPCIÓN SI HAY UN ERROR EN LA RESPUESTA HTTP.
        response.raise_for_status()

        # VERIFICA SI LOS DATOS ESTAN EN FORMATO JSON.
        try:
            # ACCEDEMOS AL JSON.
            data = response.json()
            # ACCEDEMOS AL ARREGLO DENTRO DEL JSON SI data_field FUE ASIGNADO.
            if data_field != None:
              # FUE ASIGNADO.
              data = data[data_field]
        except:
            # NO ESTA EN FORMATO JSON.
            print("El formato de respuesta no es el esperado")
            # NO RETORNA NADA.
            return None

        # RETORNA EL JSON.
        return data

    except requests.exceptions.RequestException as e:
        # CAPTURA CUALQUIER ERROR DE SOLICITUD (COMO ERRORES HTTP).
        print(f"La petición ha fallado. Código de error : {e}")
        # NO RETORNA NADA.
        return None

**Función para construir un DATAFRAME**

In [4]:
# FUNCIÓN PARA CONTRUIR UN DATAFRAME
def build_table(json_data):
    """
    CONSTRUYE UN DATAFRAME EN PANDAS A PARTIR DE DATOS EN FORMATO JSON.

    PARÁMETROS:
    json_data (dict): LOS DATOS EN FORMATO JSON OBTENIDOS DE UNA API.

    RETORNA:
    DATAFRAME: UN DATAFRAME DE PANADS QUE OBTIENE LOS DATOS.
    """
    try:
        # TRANSFORMA EL JSON OBTENIDO EN UN DATAFRAME
        df = pd.json_normalize(json_data)
        # RETORNA EL DATAFRAME
        return df
    except:
        # ERROR
        print("Los datos no están en el formato esperado")
        # NO RETORNA NADA
        return None

**Función para mostrar de forma más personalizada el DATAFRAME**

In [5]:
def improved_dataframe(json_data):
    """
    CONVIERTE UNA LISTA DE JSON EN UN DATAFRAME ESTRUCTURADO.

    PARÁMETROS:
    json_data (list): UNA LISTA DE DICCIONARIOS DONDE CADA DICCIONARIO CONTIENE 'fecha' Y 'detalle'.

    RETORNA:
    pd.DataFrame: UN DATAFRAME DE PANDAS CON LAS COLUMNAS ESTRUCTURADAS COMO 'Fecha', 'CodigoMoneda', 'Descripcion', 'TipoCotizacion', y 'TipoPase'.
    """

    # INICIALIZA UNA LISTA VACÍA PARA ALMACENAR LOS DATOS ESTRUCTURADOS
    data = []

    # ITERA A TRAVÉS DE LOS ELEMENTOS DEL JSON
    for entry in json_data:
        # EXTRAE LA FECHA (FECHA)
        fecha = entry['fecha']

        # ITERA SOBRE 'detalle' Y DESESTRUCTURA EL DICIONARIO ANIDADO
        for detalle in entry['detalle']:
            # CREA UN DICCIONARIO CON LA FECHA Y LOS VALORES DEL DETALLE
            data.append({
                'fecha': fecha,
                'codigoMoneda': detalle['codigoMoneda'],
                'descripcion': detalle['descripcion'],
                'tipoCotizacion': detalle['tipoCotizacion'],
                'tipoPase': detalle['tipoPase']
            })

    # CONVIERTE LA LISTA DE DICCIONARIOS EN UN DATAFRAME
    df = pd.DataFrame(data)

    # RETORNA EL DATAFRAME ESTRUCTURADO
    return df



**Función para guardar todos los datos en formato DELTA LAKE**

In [6]:
# CREA UNA TABLA DELTA LAKE NUEVA
# SIEMPRE AGREGA
def save_data_as_delta(df, path, mode="overwrite", partition_cols=None):
    """
    Guarda un dataframe en formato Delta Lake en la ruta especificada.
    A su vez, es capaz de particionar el dataframe por una o varias columnas.
    Por defecto, el modo de guardado es "overwrite".

    Args:
      df (pd.DataFrame): El dataframe a guardar.
      path (str): La ruta donde se guardará el dataframe en formato Delta Lake.
      mode (str): El modo de guardado. Son los modos que soporta la libreria
      deltalake: "overwrite", "append", "error", "ignore".
      partition_cols (list or str): La/s columna/s por las que se particionará el
      dataframe. Si no se especifica, no se particionará.
    """
    write_deltalake(
        path, df, mode=mode, partition_by=partition_cols
    )

**Función para guardar solo datos nuevos en formato DELTA LAKE usando MERGE**

In [7]:
# FUNCIÓN PARA GUARDAR SOLO NUEVOS DATOS EN FORMATO DELTA LAKE USANDO MERGE
# SI HAY COINCIDENCIAS -> NADA
# SI NO HAY COINCIDENCIAS -> REGISTRA
def save_new_data_as_delta(new_data, data_path, predicate, partition_cols=None):
    """
    Guarda solo nuevos datos en formato Delta Lake usando la operación MERGE,
    comparando los datos ya cargados con los datos que se desean almacenar
    asegurando que no se guarden registros duplicados.

    Args:
      new_data (pd.DataFrame): Los datos que se desean guardar.
      data_path (str): La ruta donde se guardará el dataframe en formato Delta Lake.
      predicate (str): La condición de predicado para la operación MERGE.
    """

    try:
      dt = DeltaTable(data_path)
      new_data_pa = pa.Table.from_pandas(new_data)
      # SE INSERTAN EN TARGET, DATOS DE SOURCE QUE NO EXISTEN EN TARGET
      dt.merge(
          source=new_data_pa,
          source_alias="source",
          target_alias="target",
          predicate=predicate
      ) \
      .when_not_matched_insert_all() \
      .execute()# NO HAY COINCIDENCIAS, LO INSERTA

    # SI NO EXISTE LA TABLA DELTA LAKE, SE GUARDA COMO NUEVA
    except TableNotFoundError:
      save_data_as_delta(new_data, data_path, partition_cols=partition_cols)

**Función para actualizar los datos ya registrados y agregar los nuevos en formato DELTA LAKE usando MERGE completo**

In [8]:
# FUNCIÓN QUE HACE EL MARGE COMPLETO
# SI HAY COINCIDENCIAS -> ACTUALIZA
# SI NO HAY COINCIDENCIAS -> REGISTRA
def upsert_data_as_delta(data, data_path, predicate):
    """
    Guardar datos en formato Delta Lake usando la operacion MERGE.
    Cuando no haya registros coincidentes, se insertarán nuevos registros.
    Cuando haya registros coincidentes, se actualizarán los campos.

    Args:
      data (pd.DataFrame): Los datos que se desean guardar.
      data_path (str): La ruta donde se guardará el dataframe en formato Delta Lake.
      predicate (str): La condición de predicado para la operación MERGE.
    """
    try:
        dt = DeltaTable(data_path)
        data_pa = pa.Table.from_pandas(data)
        dt.merge(
            source=data_pa,
            source_alias="source",
            target_alias="target",
            predicate=predicate
        ) \
        .when_matched_update_all() \
        .when_not_matched_insert_all() \
        .execute()
    # SI NO EXISTE LA TABLA DELTA LAKE, SE GUARDA COMO NUEVA
    except TableNotFoundError:
        save_data_as_delta(data, data_path)

# PREPARACIÓN DEL ENTORNO (KEY/TOKEN)

**IMPORTANTE: Recuerda crear un archivo denominado 'pipeline.conf' que dentro debe tener tu key/token para las consultas GET.**

**EJEMPLO:**

Tu archivo **'pipeline.conf'** debe verse así:

```
[bcra_api]
access_token = TU_TOKEN_API
```

Puedes **obtener tu key/token** en: https://estadisticasbcra.com/api/registracion


# EXTRACCIÓN FULL

1. **Asignación de URL, ENDPOINT, NOMBRE DEL CAMPO DEL JSON. y ENCABEZADOS**
2. **Llamado a la función get_data.**
3. **Almacenar el JSON obtenido en json_data**
4. **Asignar el DATAFRAME en df_currency_masters**



In [9]:
# ASIGNAMOS LA URL.
base_url = "https://api.bcra.gob.ar"

# ASIGNAMOS EL ENDPOINT.
endpoint = "estadisticascambiarias/v1.0/Maestros/Divisas"

# ASIGNAMOS LA KEY/TOKEN (DEPENDIENDO DE LA API).
# IMSTANCIAMOS UN CONFIGPARSER, PARA LEER EL ARCHIVO .conf
parser = ConfigParser()

# LEEMOS EL ARCHIVO
parser.read("pipeline.conf")

# ACCEDER A LA SECCIÓN QUE TENGA LAS CREDENCIALES NECESARIAS Y GUARDARLAS EN UNA VARIABLE.
api_credentials = parser["bcra_api"]

# GUARDAMOS EL TOKEN EN LA VARIABLE access_token
access_token = api_credentials["access_token"]

# ASIGNAMOS FILTRO POR FECHA (OPCIONAL).
#start_date = datetime.utcnow()

# ASIGNAMOS EL/LOS ENCABEZADOS.
headers = {
    "Authorization": f"Bearer {access_token}",
    "accept": "application/json"
}

# ASIGNAMOS EL DATAFIELD (SI NO SE ENVÍA ASIGNAR "None").
data_field = "results"

# LLAMAMOS A LA FUNCIÓN Y ALMACENAMOS EL json EN LA VARIABLE json_data
json_data = get_data(base_url, endpoint, data_field=data_field, headers=headers)

# LLAMAMOS A LA FUNCIÓN Y ASIGNAMOS EL DATAFRAME A LA VARIABLE df_currency_masters
df_currency_masters = build_table(json_data)

**Mostrar el JSON obtenido**

In [None]:
# MUESTRA EL JSON
pprint(json_data)

**Mostrar el DATAFRAME con sus respectivas cabeceras**

In [None]:
# MUESTRA EL DATAFRAME
df_currency_masters.head()

# ALMACENAMIENTO DE EXTRACCIÓN FULL (BRONZE)

1. **Creación del directorio donde se almacenarán los DELTALAKE**
2. **Llamamos a la función y declaramos el mode en APPEND**

El modo de guardado se realizara en **append**, ya que almacenaremos **datos de divisas**, y es muy poco probable que se quite o agrege una nueva.



In [None]:
# CREA EL DIRECTORIO (1 POR CADA ENTIDAD)
bronze_dir = "datalake/bronze/bcra_api"

# CREA UNA SUB CARPETA DENTRO DEL DIRECTORIO
currency_masters_raw_dir = f"{bronze_dir}/maestros_divisas"

# LLAMAMOS A LA FUNCIÓN PARA ALMACENAR EL DATAFRAME EN FORMATO DELTALAKE
save_data_as_delta(df_currency_masters, currency_masters_raw_dir, mode="append")

# A MODO DE CHEQUEO, LEEMOS LOS DATOS GUARDADOS Y CONTAMOS LA CANTIDAD DE FILAS
dt = DeltaTable(currency_masters_raw_dir)
print(f"Cantidad de filas: {dt.to_pandas().shape[0]}")

# TRANSFORMACIÓN DE DATOS FULL

**Antes de comenzar a transformar los datos, consultamos cuanto espacio en memoria ocupa el DATAFRAME (memory usage). También nos sirve para obtener un vistazo sobre el DATAFRAME para detectar la existencia de valores nulos, tipo de datos, etc.**

In [None]:
# MEMORIA QUE OCUPA -> "memory_usage"
df_currency_masters.info(memory_usage='deep')

**Verificación de valores nulos.**
1. **Si hay valores nulos en el campo 'codigo', los eliminara.**
2. **Si no hay valores nulos, no hara nada.**



In [None]:
# VERIFICA SI HAY VALORES NULOS EN EL CAMPO 'codigo'
if df_currency_masters["codigo"].isnull().sum() > 0:
    # MUESTRO LA CANTIDAD DE VALORES NULOS
    print(f"Hay {df_currency_masters['codigo'].isnull().sum()} valores nulos en el campo 'codigo'.")
    # ELIMINA LAS FILAS CON VALORES NULOS EN EL CAMPO 'codigo
    # HAY VALORES NULOS
    # ELIMINA LAS FILAS CON VALORES NULOS EN EL CAMPO 'codigo'
    df_currency_masters = df_currency_masters.dropna(subset=["codigo"])
else:
    # NO HAY VALORES NULOS
    print("No hay valores nulos en el campo 'codigo'.")

**Imputación con valores por defecto (en el caso de que sean datos nulos)**
1. **Si hay valores nulos en el campo 'denominacion', los imputara.**
2. **Si no hay valores nulos, no hara nada.**


In [16]:
# DECLARAMOS EL MAPEO DE IMPUTACION
imputation_maping = {
    # SI EL CAMPO DENOMINACIÓN ESTA VACIO, ASIGNAMOS 'Not found'
    "denominacion": "Not found"
}

# UTILIZAMOS fillna PARA AFECTAR SOLO LOS VALORES NULOS
df_currency_masters = df_currency_masters.fillna(imputation_maping)

**Conversión de tipos de datos**

In [17]:
# DECLARAMOS EL MAPEO DE CONVERSION
conversion_mapping = {
    # CAMPO codigo SERA STRING
    "codigo": "string",
    # CAMPO denominacion SERA STRING
    "denominacion": "string"
}

# UTILIZAMOS astype PARA AFECTAR EL TIPO DE DATO DE LAS COLUMNAS
df_currency_masters = df_currency_masters.astype(conversion_mapping)

**Eliminación de duplicados**
1. **Primero ordenaremos el DATAFRAME por código (alfabético ascendente)**
2. **Comparamos los campos 'codigo' y 'denominacion', si hay coincidencias, se eliminara el duplicado.**



In [18]:
# ORDENAR EL DATAFRAME POR LA COLUMNA 'codigo' EN ORDEN ALFABÉTICO ASCENDENTE
df_currency_masters = df_currency_masters.sort_values(by='codigo', ascending=True)
# ELIMINAMOS LAS FILAS DUPLICADAS, CONSERVAMOS EL PRIMER REGISTRO
df_currency_masters = df_currency_masters.drop_duplicates(subset=['codigo', 'denominacion'], keep="first")

**Volvemos a consultar el espacio en memoria que ocupa el DATAFRAME**

In [None]:
# MEMORIA QUE OCUPA -> "memory_usage"
df_currency_masters.info(memory_usage='deep')

# ALMACENAMIENTO DE TRANSFORMACIÓN FULL (SILVER)

1. **Creación del directorio donde se almacenarán los DELTALAKE**
2. **Llamamos a la función y declaramos el mode en APPEND**

El modo de guardado se realizara en **append**, ya que almacenaremos **datos de divisas**, y es muy poco probable que se quite o agrege una nueva.

In [None]:
# CREA EL DIRECTORIO (1 POR CADA ENTIDAD)
silver_dir = "datalake/silver/bcra_api"

# CREA UNA SUB CARPETA DENTRO DEL DIRECTORIO
currency_masters_raw_dir = f"{silver_dir}/maestros_divisas"

# LLAMAMOS A LA FUNCIÓN PARA ALMACENAR EL DATAFRAME EN FORMATO DELTALAKE
save_data_as_delta(df_currency_masters, currency_masters_raw_dir, mode="append")

# A MODO DE CHEQUEO, LEEMOS LOS DATOS GUARDADOS Y CONTAMOS LA CANTIDAD DE FILAS
dt = DeltaTable(currency_masters_raw_dir)
print(f"Cantidad de filas: {dt.to_pandas().shape[0]}")

# EXTRACCIÓN INCREMENTAL


1.   **Asignación de URL, ENDPOINT, PARAMETROS, NOMBRE DEL CAMPO DEL JSON. y ENCABEZADOS**
2.   **Llamado a la función get_data.**
3. **Almacenar el JSON obtenido en json_data**
4. **Asignar el DATAFRAME en df_quotations**




In [21]:
# ASIGNAMOS LA URL.
base_url = "https://api.bcra.gob.ar"

# ASIGNAMOS EL ENDPOINT.
endpoint = "estadisticascambiarias/v1.0/Cotizaciones/EUR"

# ASIGNAMOS LA KEY/TOKEN (DEPENDIENDO DE LA API).
# IMSTANCIAMOS UN CONFIGPARSER, PARA LEER EL ARCHIVO .conf
parser = ConfigParser()
# LEEMOS EL ARCHIVO
parser.read("pipeline.conf")
# ACCEDER A LA SECCIÓN QUE TENGA LAS CREDENCIALES NECESARIAS Y GUARDARLAS EN UNA VARIABLE.
api_credentials = parser["bcra_api"]
# GUARDAMOS EL TOKEN EN LA VARIABLE access_token
access_token = api_credentials["access_token"]

# ASIGNAMOS FILTRO POR FECHA (OPCIONAL).
#start_date = datetime.utcnow()

# ASIGNAMOS EL/LOS ENCABEZADOS.
headers = {
    "Authorization": f"Bearer {access_token}",
    "accept": "application/json"
}

# ASIGNAMOS EL/LOS PARAMETROS.
params = {
    "fechadesde": "2023-01-01",
    "fechahasta": "2023-01-31"
}

# ASIGNAMOS EL DATAFIELD (SI NO SE ENVÍA ASIGNAR "None").
data_field = "results"

# LLAMAMOS A LA FUNCIÓN Y ALMACENAMOS EL json EN LA VARIABLE json_data
json_data = get_data(base_url, endpoint, data_field=data_field, params=params, headers=headers)

# LLAMAMOS A LA FUNCIÓN Y ASIGNAMOS EL DATAFRAME A LA VARIABLE df_quotations
df_quotations = build_table(json_data)

**Mostrar el JSON obtenido**

In [None]:
# MUESTRA EL JSON
pprint(json_data)

**Mostrar el DATAFRAME con sus respectivas cabeceras**

In [None]:
# MUESTRA EL DATAFRAME
df_quotations.head()

**Mostrar el DATAFRAME expandido, accediendo al array del campo "detalle" y desarmandolo, con cabeceras mas legibles**

In [None]:
# LLAMAMOS A LA FUNCIÓN Y ALMACENAMOS EL DATAFRAME EN LA VARIABLE df_better
df_quotations_expanded = improved_dataframe(json_data)

# MUESTRA EL DATAFRAME
# print(df_better)
df_quotations_expanded.head()

# ALMACENAMIENTO DE EXTRACCIÓN INCREMENTAL (BRONZE)

1. **Creación del directorio donde se almacenarán los DELTALAKE**
2. **Llamamos a la función que usa el operador MERGE para que solo registre datos si no hay coincidencias con los existentes**



In [25]:
# COMANDO PARA BORRAR ALGUN DIRECTORIO
# !rm -r ruta/del/directorio

# METODO PARA PARTICIONAR LA FECHA Y LA HORA
"""
df["timestamp_measured"] = pd.to_datetime(df_measurements.timestamp_measured)
df["fecha"] = df.timestamp_measured.dt.date
df["hora"] = df.timestamp_measured.dt.hour
"""

# CREA UNA SUB CARPETA DENTRO DEL DIRECTORIO
quotations_raw_dir = f"{bronze_dir}/Cotizaciones/EUR"

# LLAMAMOS A LA FUNCIÓN PARA ALMACENAR EL DATAFRAME EN FORMATO DELTALAKE
# save_data_as_delta(df, quotations_raw_dir, partition_cols=["fecha"])

# LLAMAMOS A LA FUNCIÓN QUE APLICA MERGE (INSERTA SOLO CUANDO NO HAY COINCIDENCIAS)
save_new_data_as_delta(
    # LLAMAMOS AL DATAFRAME
    df_quotations_expanded,
    # INDICAMOS EL DIRECTORIO
    quotations_raw_dir,
    # DECLARAMOS LOS PARAMETROS A COMPARAR
    """ target.fecha = source.fecha
    AND target.codigoMoneda = source.codigoMoneda
    AND target.descripcion = source.descripcion
    AND target.tipoCotizacion = source.tipoCotizacion
    AND target.tipoPase = source.tipoPase""",
    # DECLARAMOS LA PARTICIÓN QUE TENDRAN LOS REGISTROS
    partition_cols=["fecha"]
    )

# TRANSFORMACIÓN DE DATOS INCREMENTAL

**Antes de comenzar a transformar los datos, consultamos cuanto espacio en memoria ocupa el DATAFRAME (memory usage). También nos sirve para obtener un vistazo sobre el DATAFRAME para detectar la existencia de valores nulos, tipo de datos, etc.**

In [None]:
# MEMORIA QUE OCUPA -> "memory_usage"
df_quotations_expanded.info(memory_usage='deep')

**Verificación de valores nulos.**

1. **Si hay valores nulos en el campo 'codigoMoneda', los eliminara.**
2. **Si no hay valores nulos, no hara nada.**

In [None]:
# VERIFICA SI HAY VALORES NULOS EN EL CAMPO 'codigoMoneda'
if df_quotations_expanded["codigoMoneda"].isnull().sum() > 0:
    # MUESTRO LA CANTIDAD DE VALORES NULOS
    print(f"Hay {df_quotations_expanded['codigoMoneda'].isnull().sum()} valores nulos en el campo 'codigoMoneda'.")
    # HAY VALORES NULOS
    # ELIMINA LAS FILAS CON VALORES NULOS EN EL CAMPO 'codigoMoneda'
    df_quotations_expanded = df_quotations_expanded.dropna(subset=["codigoMoneda"])
else:
    # NO HAY VALORES NULOS
    print("No hay valores nulos en el campo 'codigoMoneda'.")

**Imputación con valores por defecto (en el caso de que sean datos nulos)**

1. **Si hay valores nulos en los campos declarados, los imputara.**
2. **Si no hay valores nulos, no hara nada.**

In [28]:
# DECLARAMOS EL MAPEO DE IMPUTACION
imputation_maping_2 = {
    # ASIGNAMOS LOS VALORES A IMPUTAR
    "fecha": "1900-01-01 00:00:00",
    "descripcion": "Not found",
    "tipoCotizacion": -1,
    "tipoPase": -1
}

# UTILIZAMOS fillna PARA AFECTAR SOLO LOS VALORES NULOS
df_quotations_expanded = df_quotations_expanded.fillna(imputation_maping_2)

**Conversión de tipos de datos**

In [29]:
# DECLARAMOS EL MAPEO DE CONVERSION
conversion_mapping_2 = {
    # CONVERSION DE TIPOS DE DATOS
    "fecha": "datetime64[ns]",
    #"fecha": "datetime64[D]",
    "codigoMoneda": "string",
    "descripcion": "string",
    #"tipoCotizacion": "float32",
    "tipoPase": "float32"
}

# UTILIZAMOS astype PARA AFECTAR EL TIPO DE DATO DE LAS COLUMNAS
df_quotations_expanded = df_quotations_expanded.astype(conversion_mapping_2)

**Eliminación de duplicados**

1. **Primero ordenaremos el DATAFRAME por fecha (ascendiente)**
2. **Comparamos los campos declarados, y si hay coincidencias, se eliminara el duplicado.**

In [30]:
# ORDENAR EL DATAFRAME POR LA COLUMNA 'fecha EN ORDEN ASCENDENTE
df_quotations_expanded = df_quotations_expanded.sort_values(by='fecha', ascending=True)
# ELIMINAMOS LAS FILAS DUPLICADAS, CONSERVAMOS EL ULTIMO REGISTRO
df_quotations_expanded = df_quotations_expanded.drop_duplicates(subset=['fecha', 'codigoMoneda'], keep="last")

**Volvemos a consultar el espacio en memoria que ocupa el DATAFRAME**

In [None]:
# MEMORIA QUE OCUPA -> "memory_usage"
df_quotations_expanded.info(memory_usage='deep')

# ALMACENAMIENTO DE TRANSFORMACIÓN INCREMENTAL (SILVER)

1. **Creación del directorio donde se almacenarán los DELTALAKE**
2. **Llamamos a la función que usa el operador MERGE para que solo registre datos si no hay coincidencias con los existentes**

In [32]:
# METODO PARA PARTICIONAR LA FECHA Y LA HORA
"""
df["timestamp_measured"] = pd.to_datetime(df_measurements.timestamp_measured)
df["fecha"] = df.timestamp_measured.dt.date
df["hora"] = df.timestamp_measured.dt.hour
"""

# CREA UNA SUB CARPETA DENTRO DEL DIRECTORIO
quotations_raw_dir = f"{silver_dir}/Cotizaciones/EUR"

# LLAMAMOS A LA FUNCIÓN PARA ALMACENAR EL DATAFRAME EN FORMATO DELTALAKE
# save_data_as_delta(df, quotations_raw_dir, partition_cols=["fecha"])

# LLAMAMOS A LA FUNCIÓN QUE APLICA MERGE (INSERTA SOLO CUANDO NO HAY COINCIDENCIAS)
save_new_data_as_delta(
    # LLAMAMOS AL DATAFRAME
    df_quotations_expanded,
    # INDICAMOS EL DIRECTORIO
    quotations_raw_dir,
    # DECLARAMOS LOS PARAMETROS A COMPARAR
    """ target.fecha = source.fecha
    AND target.codigoMoneda = source.codigoMoneda
    AND target.descripcion = source.descripcion
    AND target.tipoCotizacion = source.tipoCotizacion
    AND target.tipoPase = source.tipoPase""",
    # DECLARAMOS LA PARTICIÓN QUE TENDRAN LOS REGISTROS
    partition_cols=["fecha"]
    )

# SUMARIZACIÓN DE DATOS INCREMENTAL

1. **Realizamos una sumarización del promedio de los campos 'tipoCotizacion' y 'tipoPase'.**
2. **Contamos la cantidad de registros a través del campo 'codigoMoneda'.**
3. **Almacenamos las operaciones en el DATAFRAME df_quotations_sumatory**

In [33]:
# ASIGNAMOS LA FILTRACION EN EL DATAFRAME df_quotations_sumatory
# AGRUPAMOS POR 'codigoMoneda'
df_quotations_sumatory = df_quotations_expanded.groupby('codigoMoneda').agg(
    # CALCULAMOS LOS PROMEDIOS UTILIZANDO 'mean'
    promedio_tipoCotizacion=('tipoCotizacion', 'mean'),
    promedio_tipoPase=('tipoPase', 'mean'),
    # CONTAMOS LOS REGISTROS UTILIZANDO 'count'
    cantidad_registros=('codigoMoneda', 'count')
).reset_index() # ASEGURA QUE EL RESULTADO ESTE EN FORMATO ADECUADO(CON SUS COLUMNAS)

**Mostramos el resultado de la ejecución anterior**

In [None]:
df_quotations_sumatory.head()

# ALMACENAMIENTO DE SUMARIZACIÓN DE DATOS INCREMENTALES (GOLD)

1. **Creación del directorio donde se almacenarán los DELTALAKE**
2. **Llamamos a la función que usa el operador MERGE para que solo registre datos si no hay coincidencias con los existentes**

In [35]:
# METODO PARA PARTICIONAR LA FECHA Y LA HORA
"""
df["timestamp_measured"] = pd.to_datetime(df_measurements.timestamp_measured)
df["fecha"] = df.timestamp_measured.dt.date
df["hora"] = df.timestamp_measured.dt.hour
"""
# CREA EL DIRECTORIO (1 POR CADA ENTIDAD)
gold_dir = "datalake/gold/bcra_api"

# CREA UNA SUB CARPETA DENTRO DEL DIRECTORIO
quotationsSumatory_raw_dir = f"{gold_dir}/promedio_cotizaciones/EUR"

# LLAMAMOS A LA FUNCIÓN PARA ALMACENAR EL DATAFRAME EN FORMATO DELTALAKE
# save_data_as_delta(df, quotations_raw_dir, partition_cols=["fecha"])

# LLAMAMOS A LA FUNCIÓN QUE APLICA MERGE (INSERTA Y ACTUALIZA)
upsert_data_as_delta(
    # LLAMAMOS AL DATAFRAME
    df_quotations_sumatory,
    # INDICAMOS EL DIRECTORIO
    quotationsSumatory_raw_dir,
    # DECLARAMOS LOS PARAMETROS A COMPARAR
    """ target.codigoMoneda = source.codigoMoneda
    AND target.promedio_tipoCotizacion = source.promedio_tipoCotizacion
    AND target.promedio_tipoPase = source.promedio_tipoPase
    AND target.cantidad_registros = source.cantidad_registros""",
    )

# *DOCUMENTACIÓN DE LA API*

**Documentación de la API utilizada:** https://www.bcra.gob.ar/Catalogo/Content/files/pdf/estadisticascambiarias-v1.pdf