# **Procesamiento y Almacenamiento de Archivos Parquet del Data Lake en un Datawarehouse**

### **Importación de Librerías y Carga de los Archivos Parquet en Dataframes**

In [1]:
#Importo todas las Librerias que utilizaré
import pandas as pd
from sqlalchemy import text
from ydata_profiling import ProfileReport
from utils_db import *
from utils_parquet import *
from utils_processing import *

In [2]:
# Bronze-layer folder holding the Historical Market Depth parquet file(s)
file_path = "datalake/bronze/finage_api/Crypto_Historical_Market_Depth"

# Read the parquet file(s) found under that path into a DataFrame with utils_parquet.read_parquet()
df_incremental = read_parquet(file_path)

# Control display
df_incremental

Unnamed: 0,symbol,p,q,t
0,btcusd,7226.29,0.106986,2020-01-01 06:02:23
1,btcusd,7226.39,0.001527,2020-01-01 06:02:23
2,btcusd,7226.39,0.013473,2020-01-01 06:02:25
3,ethusd,130.35,1.0,2020-01-01 06:02:23
4,ethusd,130.35,1.868,2020-01-01 06:02:25
5,ethusd,130.35,0.0901,2020-01-01 06:02:26
6,adausd,0.03309,1453.0,2020-01-01 06:03:19
7,adausd,0.0331,342.6,2020-01-01 06:03:19
8,adausd,0.03309,998.0,2020-01-01 06:03:20
9,dogeusd,0.0020143,5000.0,2020-01-01 06:21:09


In [3]:
# Bronze-layer folder holding the Crypto Aggregates parquet file(s)
file_path = "datalake/bronze/finage_api/Crypto_Aggregates"

# Read the parquet file(s) found under that path into a DataFrame with utils_parquet.read_parquet()
df_full = read_parquet(file_path)

# Control display
df_full

Unnamed: 0,symbol,o,c,h,l,v,t
0,BTCUSD,7197.32,7361.28,7420.0,7150.0,76059.15,2020-04-08
1,BTCUSD,7360.26,7283.54,7371.92,7108.08,61094.87,2020-04-09
2,BTCUSD,7283.54,6858.92,7295.75,6739.98,104674.6,2020-04-10
3,ETHUSD,164.62,173.11,174.48,163.59,1023576.0,2020-04-08
4,ETHUSD,173.15,169.52,173.38,165.19,719837.2,2020-04-09
5,ETHUSD,169.56,157.81,170.1,152.11,1314841.0,2020-04-10
6,ADAUSD,0.03528,0.03639,0.03683,0.03522,220316000.0,2020-04-08
7,ADAUSD,0.03638,0.03607,0.0366,0.03518,158350200.0,2020-04-09
8,ADAUSD,0.03609,0.03321,0.0362,0.03215,297588600.0,2020-04-10
9,DOGEUSD,0.001997,0.002004,0.002043,0.001967,90384910.0,2020-04-08


### **Tareas de procesamiento y transformación de datos sobre los Dataframes con Pandas**
- Eliminación de duplicados
- Eliminación o reemplazo de nulos
- Conversión de tipos de datos de columnas
- Renombrar columnas
- Formatear columnas de tipo fecha.
- Crear nuevas columnas a partir de alguna lógica (Por ejemplo, una columna booleana que indique si una temperatura está por arriba de un límite)
- Cruzar dataframes usando JOINS
- Aplicar agregaciones por medio de GROUP BY y funciones como MAX, MIN, AVG, etc.

#### **Inspección de Dataframes**

In [None]:
# Inspect the raw df_incremental before any processing: the ydata-profiling
# report summarizes null counts, duplicate rows, distributions, etc.
profile_incremental = ProfileReport(df_incremental)

# Leaving the report as the cell's last expression renders it in the notebook
profile_incremental

In [None]:
# Same profiling report, now for the raw df_full
profile_full = ProfileReport(df_full)

profile_full

#### **1) Renombramiento de Columnas**

In [5]:
# Give df_incremental's single-letter columns descriptive names.
# 'symbol' is already descriptive, so it is simply left out of the mapping.
df_incremental = df_incremental.rename(
    columns={
        'p': 'price',
        'q': 'quantity',
        't': 'time',
    }
)

# Control display
df_incremental

Unnamed: 0,symbol,price,quantity,time
0,btcusd,7226.29,0.106986,2020-01-01 06:02:23
1,btcusd,7226.39,0.001527,2020-01-01 06:02:23
2,btcusd,7226.39,0.013473,2020-01-01 06:02:25
3,ethusd,130.35,1.0,2020-01-01 06:02:23
4,ethusd,130.35,1.868,2020-01-01 06:02:25
5,ethusd,130.35,0.0901,2020-01-01 06:02:26
6,adausd,0.03309,1453.0,2020-01-01 06:03:19
7,adausd,0.0331,342.6,2020-01-01 06:03:19
8,adausd,0.03309,998.0,2020-01-01 06:03:20
9,dogeusd,0.0020143,5000.0,2020-01-01 06:21:09


In [6]:
# Expand df_full's single-letter OHLCV column names ('symbol' keeps its name)
ohlcv_names = {
    'o': 'open_price',
    'c': 'close_price',
    'h': 'high_price',
    'l': 'low_price',
    'v': 'volume',
    't': 'date',
}
df_full = df_full.rename(columns=ohlcv_names)

# Control display
df_full

Unnamed: 0,symbol,open_price,close_price,high_price,low_price,volume,date
0,BTCUSD,7197.32,7361.28,7420.0,7150.0,76059.15,2020-04-08
1,BTCUSD,7360.26,7283.54,7371.92,7108.08,61094.87,2020-04-09
2,BTCUSD,7283.54,6858.92,7295.75,6739.98,104674.6,2020-04-10
3,ETHUSD,164.62,173.11,174.48,163.59,1023576.0,2020-04-08
4,ETHUSD,173.15,169.52,173.38,165.19,719837.2,2020-04-09
5,ETHUSD,169.56,157.81,170.1,152.11,1314841.0,2020-04-10
6,ADAUSD,0.03528,0.03639,0.03683,0.03522,220316000.0,2020-04-08
7,ADAUSD,0.03638,0.03607,0.0366,0.03518,158350200.0,2020-04-09
8,ADAUSD,0.03609,0.03321,0.0362,0.03215,297588600.0,2020-04-10
9,DOGEUSD,0.001997,0.002004,0.002043,0.001967,90384910.0,2020-04-08


#### **2) Reemplazo y Eliminación de Nulos**

In [7]:
# Replace nulls in 'price' and 'quantity' of df_incremental with 0.0 using
# utils_processing.fill_null_values(). Author's rationale: a missing quantity most
# likely means no trade took place, and a missing price a one-way transfer of crypto
# with no monetary counterpart.
null_fill_values = {"price": 0.0, "quantity": 0.0}

for column, fill_value in null_fill_values.items():
    df_incremental = fill_null_values(df_incremental, column, fill_value)

In [8]:
# Replace nulls in the price columns and 'volume' of df_full with 0.0 using
# utils_processing.fill_null_values(). Author's rationale: a missing value most
# likely means there were no trades.
for column in ['open_price', 'close_price', 'high_price', 'low_price', 'volume']:
    df_full = fill_null_values(df_full, column, 0.0)

In [9]:
# Drop df_incremental rows with a null 'symbol' or 'time' using
# utils_processing.delete_null_records(): such rows cannot be placed in a historical series.
for required_column in ('symbol', 'time'):
    df_incremental = delete_null_records(df_incremental, required_column)

In [10]:
# Drop df_full rows with a null 'symbol' or 'date' using
# utils_processing.delete_null_records(): both fields are required for a record to be usable.
for required_column in ('symbol', 'date'):
    df_full = delete_null_records(df_full, required_column)

#### **3) Creación de Nuevas Columnas**

A partir de `'time'` (de tipo string) de **`'df_incremental'`**, con formato 'YYYY-mm-dd HH:MM:SS', creo otra columna llamada `'historical_market_depth_id'` (de tipo entero) con el formato **'YYmmddHHMMSSI'**, donde **'YY'** son las últimas dos cifras del año e **'I'** es el índice del registro en el dataframe.

In [11]:
# Build 'historical_market_depth_id' as 'YYmmddHHMMSS' followed by the row index,
# e.g. time 2020-01-01 06:02:23 at index 1 -> '2001010602231' (cast to int64 later on).
# The whole 'time' column is parsed in one vectorized pd.to_datetime call (as the date-split
# cell below also does) instead of calling pd.to_datetime once per row through apply().
timestamp_prefix = pd.to_datetime(df_incremental['time']).dt.strftime('%y%m%d%H%M%S')

# NOTE(review): the row index restarts at 0 on every load, so this id is only unique within a
# single batch. The warehouse load inserts only ids not already in the fact table, so a new
# record that happens to reuse an id from an earlier batch would be silently skipped.
df_incremental['historical_market_depth_id'] = timestamp_prefix + df_incremental.index.astype(str)

# Put the new id first; reindex keeps exactly these columns in this order
columns = ['historical_market_depth_id', 'symbol', 'price', 'quantity', 'time']
df_incremental = df_incremental.reindex(columns=columns)

# Control display
df_incremental.head(12)

Unnamed: 0,historical_market_depth_id,symbol,price,quantity,time
0,2001010602230,btcusd,7226.29,0.106986,2020-01-01 06:02:23
1,2001010602231,btcusd,7226.39,0.001527,2020-01-01 06:02:23
2,2001010602252,btcusd,7226.39,0.013473,2020-01-01 06:02:25
3,2001010602233,ethusd,130.35,1.0,2020-01-01 06:02:23
4,2001010602254,ethusd,130.35,1.868,2020-01-01 06:02:25
5,2001010602265,ethusd,130.35,0.0901,2020-01-01 06:02:26
6,2001010603196,adausd,0.03309,1453.0,2020-01-01 06:03:19
7,2001010603197,adausd,0.0331,342.6,2020-01-01 06:03:19
8,2001010603208,adausd,0.03309,998.0,2020-01-01 06:03:20
9,2001010621099,dogeusd,0.0020143,5000.0,2020-01-01 06:21:09


A partir de `'date'` (de tipo string) de **`'df_full'`**, con formato 'YYYY-mm-dd', creo otra columna llamada `'aggregates_id'` (de tipo entero) con el formato **'YYmmddI'**, donde **'YY'** son las últimas dos cifras del año e **'I'** es el índice del registro en el dataframe.

In [12]:
# Build 'aggregates_id' as 'YYmmdd' followed by the row index,
# e.g. date 2020-04-08 at index 0 -> '2004080' (cast to int64 later on).
# Parse the whole 'date' column at once instead of calling pd.to_datetime per row via apply().
date_prefix = pd.to_datetime(df_full['date']).dt.strftime('%y%m%d')

# NOTE(review): the index depends on row order within this extraction, so the same
# (symbol, date) can get a different id in another run. Because the warehouse load skips
# ids that already exist, that can duplicate or drop fact rows. Consider a key derived
# from (symbol, date) instead.
df_full['aggregates_id'] = date_prefix + df_full.index.astype(str)

# Control display
df_full.head()

Unnamed: 0,symbol,open_price,close_price,high_price,low_price,volume,date,aggregates_id
0,BTCUSD,7197.32,7361.28,7420.0,7150.0,76059.15,2020-04-08,2004080
1,BTCUSD,7360.26,7283.54,7371.92,7108.08,61094.87,2020-04-09,2004091
2,BTCUSD,7283.54,6858.92,7295.75,6739.98,104674.6,2020-04-10,2004102
3,ETHUSD,164.62,173.11,174.48,163.59,1023576.0,2020-04-08,2004083
4,ETHUSD,173.15,169.52,173.38,165.19,719837.2,2020-04-09,2004094


Creo dos nuevas columnas `'open_close_difference'` y `'max_daily_difference'` a partir de `'open_price'` y `'close_price'` para el primer caso, y `'high_price'` y `'low_price'` para el segundo caso, en la tabla **`'df_full'`**, en donde los valores de `'open_close_difference'` resulten de la diferencia entre el valor de `'close_price'` y `'open_price'` del mismo registro, y los valores de `'max_daily_difference'` resulten de la diferencia entre el valor de `'high_price'` y `'low_price'` del mismo registro.

In [13]:
# Derived measures per row: net move over the period (close - open)
# and the period's full range (high - low)
df_full = df_full.assign(
    open_close_difference=df_full['close_price'] - df_full['open_price'],
    max_daily_difference=df_full['high_price'] - df_full['low_price'],
)

# Control display
df_full.head(12)

Unnamed: 0,symbol,open_price,close_price,high_price,low_price,volume,date,aggregates_id,open_close_difference,max_daily_difference
0,BTCUSD,7197.32,7361.28,7420.0,7150.0,76059.15,2020-04-08,2004080,163.96,270.0
1,BTCUSD,7360.26,7283.54,7371.92,7108.08,61094.87,2020-04-09,2004091,-76.72,263.84
2,BTCUSD,7283.54,6858.92,7295.75,6739.98,104674.6,2020-04-10,2004102,-424.62,555.77
3,ETHUSD,164.62,173.11,174.48,163.59,1023576.0,2020-04-08,2004083,8.49,10.89
4,ETHUSD,173.15,169.52,173.38,165.19,719837.2,2020-04-09,2004094,-3.63,8.19
5,ETHUSD,169.56,157.81,170.1,152.11,1314841.0,2020-04-10,2004105,-11.75,17.99
6,ADAUSD,0.03528,0.03639,0.03683,0.03522,220316000.0,2020-04-08,2004086,0.00111,0.00161
7,ADAUSD,0.03638,0.03607,0.0366,0.03518,158350200.0,2020-04-09,2004097,-0.00031,0.00142
8,ADAUSD,0.03609,0.03321,0.0362,0.03215,297588600.0,2020-04-10,2004108,-0.00288,0.00405
9,DOGEUSD,0.001997,0.002004,0.002043,0.001967,90384910.0,2020-04-08,2004089,7e-06,7.6e-05


In [14]:
# Final column order for df_full: id first, each derived measure next to its inputs, date last
ordered_columns = [
    'aggregates_id', 'symbol',
    'open_price', 'close_price', 'open_close_difference',
    'high_price', 'low_price', 'max_daily_difference',
    'volume', 'date',
]
df_full = df_full.reindex(columns=ordered_columns)

# Control display
df_full.head(12)

Unnamed: 0,aggregates_id,symbol,open_price,close_price,open_close_difference,high_price,low_price,max_daily_difference,volume,date
0,2004080,BTCUSD,7197.32,7361.28,163.96,7420.0,7150.0,270.0,76059.15,2020-04-08
1,2004091,BTCUSD,7360.26,7283.54,-76.72,7371.92,7108.08,263.84,61094.87,2020-04-09
2,2004102,BTCUSD,7283.54,6858.92,-424.62,7295.75,6739.98,555.77,104674.6,2020-04-10
3,2004083,ETHUSD,164.62,173.11,8.49,174.48,163.59,10.89,1023576.0,2020-04-08
4,2004094,ETHUSD,173.15,169.52,-3.63,173.38,165.19,8.19,719837.2,2020-04-09
5,2004105,ETHUSD,169.56,157.81,-11.75,170.1,152.11,17.99,1314841.0,2020-04-10
6,2004086,ADAUSD,0.03528,0.03639,0.00111,0.03683,0.03522,0.00161,220316000.0,2020-04-08
7,2004097,ADAUSD,0.03638,0.03607,-0.00031,0.0366,0.03518,0.00142,158350200.0,2020-04-09
8,2004108,ADAUSD,0.03609,0.03321,-0.00288,0.0362,0.03215,0.00405,297588600.0,2020-04-10
9,2004089,DOGEUSD,0.001997,0.002004,7e-06,0.002043,0.001967,7.6e-05,90384910.0,2020-04-08


#### **4) Formateo de Columnas de Tipo Fecha de `'df_incremental'`**
Separo la columna `'time'` de **`'df_incremental'`** con el formato **'YYYY-mm-dd HH:MM:SS'** en dos columnas `'date'` y `'hour_minute_second'` con los formatos **'YYYY-mm-dd'** y **'HH:MM:SS'** respectivamente.

In [15]:
# Parse 'time' and split it into a 'date' text column ('YYYY-mm-dd')
# and an 'hour_minute_second' text column ('HH:MM:SS')
parsed_time = pd.to_datetime(df_incremental['time'])
df_incremental['date'] = parsed_time.dt.strftime('%Y-%m-%d')
df_incremental['hour_minute_second'] = parsed_time.dt.strftime('%H:%M:%S')

# The original 'time' column is no longer needed
df_incremental = df_incremental.drop(columns=['time'])

# Control display
df_incremental.head(12)

Unnamed: 0,historical_market_depth_id,symbol,price,quantity,date,hour_minute_second
0,2001010602230,btcusd,7226.29,0.106986,2020-01-01,06:02:23
1,2001010602231,btcusd,7226.39,0.001527,2020-01-01,06:02:23
2,2001010602252,btcusd,7226.39,0.013473,2020-01-01,06:02:25
3,2001010602233,ethusd,130.35,1.0,2020-01-01,06:02:23
4,2001010602254,ethusd,130.35,1.868,2020-01-01,06:02:25
5,2001010602265,ethusd,130.35,0.0901,2020-01-01,06:02:26
6,2001010603196,adausd,0.03309,1453.0,2020-01-01,06:03:19
7,2001010603197,adausd,0.0331,342.6,2020-01-01,06:03:19
8,2001010603208,adausd,0.03309,998.0,2020-01-01,06:03:20
9,2001010621099,dogeusd,0.0020143,5000.0,2020-01-01,06:21:09


#### **5) Casteo de Columnas de los Dataframes**

In [16]:
# Show the column dtypes of df_incremental, then of df_full, separated by a blank-ish line
print(df_incremental.dtypes, df_full.dtypes, sep='\n \n')

historical_market_depth_id    object
symbol                        object
price                         object
quantity                      object
date                          object
hour_minute_second            object
dtype: object
 
aggregates_id             object
symbol                    object
open_price               float64
close_price              float64
open_close_difference    float64
high_price               float64
low_price                float64
max_daily_difference     float64
volume                   float64
date                      object
dtype: object


In [17]:
# Cast df_incremental's columns to match the column types of the
# 'stage.bautistalavielle_historical_market_depth' table
df_incremental = df_incremental.astype({
    'historical_market_depth_id': 'int64',
    'symbol': 'object',
    'price': 'float64',
    'quantity': 'float64',
    'date': 'datetime64[ns]',
    'hour_minute_second': 'object',
})

In [18]:
# Cast df_full's columns to match the column types of the
# 'stage.bautistalavielle_aggregates' table
df_full = df_full.astype({
    'aggregates_id': 'int64',
    'symbol': 'object',
    'open_price': 'float64',
    'close_price': 'float64',
    'open_close_difference': 'float64',
    'high_price': 'float64',
    'low_price': 'float64',
    'max_daily_difference': 'float64',
    'volume': 'float64',
    'date': 'datetime64[ns]',
})

In [19]:
# Confirm the casts: dtypes of df_incremental, then of df_full
print(df_incremental.dtypes, df_full.dtypes, sep='\n \n')

historical_market_depth_id             int64
symbol                                object
price                                float64
quantity                             float64
date                          datetime64[ns]
hour_minute_second                    object
dtype: object
 
aggregates_id                     int64
symbol                           object
open_price                      float64
close_price                     float64
open_close_difference           float64
high_price                      float64
low_price                       float64
max_daily_difference            float64
volume                          float64
date                     datetime64[ns]
dtype: object


### **Almacenamiento de dataframes procesados en una Base de Datos OLAP**
En este caso almacenaré los dataframes procesados anteriormente en un datawarehouse implementado en una base de datos PostgreSQL alojada en Aiven. Este datawarehouse tendrá un área de staging, que contendrá una tabla `bautistalavielle_historical_market_depth` y otra `bautistalavielle_aggregates`.
En el [modelo dimensional](https://miro.com/app/board/uXjVKaplXV0=/?share_link_id=415454751190), tendremos:
- Tablas de hechos: `bautistalavielle_historical_market_depth_fact` y `bautistalavielle_aggregates_fact`
- Tablas de dimensiones: `bautistalavielle_cryptos_dim`, `bautistalavielle_dates_dim` y `bautistalavielle_times_dim`

In [20]:
# Build the SQLAlchemy engine from the connection settings and credentials stored in the
# 'postgres' section of the 'pipeline.conf' config file, using utils_db.connect_to_db().
# No standing connection is opened here: load_data() receives the engine itself, and each
# SQL cell below opens (and closes) its own connection in a `with` block. A bare
# engine.connect() kept open for the whole session would hold a pooled connection that is never closed.
engine = connect_to_db(
    "pipeline.conf",
    "postgres"
    )

#### **Carga de los Dataframes a la Base de Datos**
Se cargará **`df_incremental`** en la tabla **`bautistalavielle_historical_market_depth`** del esquema **`stage`** y **`df_full`** en la tabla **`bautistalavielle_aggregates`** del esquema **`stage`**.

In [21]:
# Load df_incremental into stage.bautistalavielle_historical_market_depth
# (mode='replace': presumably overwrites the table's previous contents — see utils_db.load_data)
load_data(
    df_incremental,
    'bautistalavielle_historical_market_depth',
    'stage',
    engine,
    mode='replace',
)

Datos cargados exitosamente en la tabla bautistalavielle_historical_market_depth


In [22]:
# Load df_full into stage.bautistalavielle_aggregates
# (mode='replace': presumably overwrites the table's previous contents — see utils_db.load_data)
load_data(
    df_full,
    'bautistalavielle_aggregates',
    'stage',
    engine,
    mode='replace',
)

Datos cargados exitosamente en la tabla bautistalavielle_aggregates


#### **Carga de la Tabla `'bautistalavielle_historical_market_depth_fact'` de `'datawarehouse'` y Actualización de `'bautistalavielle_cryptos_dim'` Utilizando Estrategia SCD de Tipo 0 (No se capturan los cambios)**

In [23]:
# Warehouse load for Historical Market Depth (SCD type 0: existing rows are never updated).
#  1. Add to cryptos_dim every stage symbol (upper-cased) not already present, matching
#     case-insensitively. SELECT DISTINCT inserts each new symbol once even when it appears in
#     many stage rows. Without it, duplicates are avoided only if cryptos_dim has a unique
#     constraint for ON CONFLICT to hit, and duplicated dimension rows would multiply fact rows
#     through the join in step 2.
#  2. Insert into historical_market_depth_fact the stage rows whose historical_market_depth_id is
#     not there yet. symbol_id is resolved from cryptos_dim, and integer date (YYMMDD) and
#     time (HHMMSS) keys are derived.
insert_query = text(
        """
        -- Insertar nuevo registro si el símbolo no existe
        INSERT INTO datawarehouse.bautistalavielle_cryptos_dim (symbol)
        SELECT DISTINCT
            UPPER(stage.symbol)
        FROM
            stage.bautistalavielle_historical_market_depth AS stage
        WHERE
            NOT EXISTS (
                SELECT 1
                FROM datawarehouse.bautistalavielle_cryptos_dim AS crypto
                WHERE LOWER(crypto.symbol) = LOWER(stage.symbol)
            )
        ON CONFLICT DO NOTHING;

        -- Actualizar la tabla datawarehouse.bautistalavielle_historical_market_depth_fact utilizando SCD Tipo 0
        INSERT INTO datawarehouse.bautistalavielle_historical_market_depth_fact (
            historical_market_depth_id,
            symbol_id,
            price,
            quantity,
            historical_market_depth_date_id,
            historical_market_depth_time_id
        )
        SELECT
            stage.historical_market_depth_id AS historical_market_depth_id,
            crypto.crypto_id AS symbol_id,
            stage.price AS price,
            stage.quantity AS quantity,
            to_number(to_char(stage.date, 'YYMMDD'), '999999') AS historical_market_depth_date_id,
            to_number(replace(stage.hour_minute_second, ':', ''), '999999') AS historical_market_depth_time_id
        FROM
            stage.bautistalavielle_historical_market_depth AS stage
        LEFT JOIN
            datawarehouse.bautistalavielle_cryptos_dim AS crypto
        ON
            LOWER(crypto.symbol) = LOWER(stage.symbol)
        LEFT JOIN
            datawarehouse.bautistalavielle_historical_market_depth_fact AS datawarehouse
        ON 
            stage.historical_market_depth_id = datawarehouse.historical_market_depth_id
        WHERE
            datawarehouse.historical_market_depth_id IS NULL;
        """
        )

# engine.begin() runs both statements in one transaction. It commits when the block exits
# normally and rolls back if it raises, so the SQL text needs no BEGIN/COMMIT of its own
# (those would conflict with the transaction SQLAlchemy already manages on the connection).
with engine.begin() as conn:
    conn.execute(insert_query)

#### **Carga de la Tabla `'bautistalavielle_aggregates_fact'` de `'datawarehouse'` y Actualización de `'bautistalavielle_cryptos_dim'` Utilizando Estrategia SCD de Tipo 0 (No se capturan los cambios)**

In [24]:
# Warehouse load for Aggregates (SCD type 0: existing rows are never updated).
#  1. Add to cryptos_dim every stage symbol (upper-cased) not already present, matching
#     case-insensitively. SELECT DISTINCT inserts each new symbol once even when it appears in
#     many stage rows. Without it, duplicates are avoided only if cryptos_dim has a unique
#     constraint for ON CONFLICT to hit, and duplicated dimension rows would multiply fact rows
#     through the join in step 2.
#  2. Insert into aggregates_fact the stage rows whose aggregates_id is not there yet.
#     symbol_id is resolved from cryptos_dim and an integer date key (YYMMDD) is derived.
insert_query = text(
        """
        -- Insertar nuevo registro si el símbolo no existe
        INSERT INTO datawarehouse.bautistalavielle_cryptos_dim (symbol)
        SELECT DISTINCT
            UPPER(stage.symbol)
        FROM
            stage.bautistalavielle_aggregates AS stage
        WHERE
            NOT EXISTS (
                SELECT 1
                FROM datawarehouse.bautistalavielle_cryptos_dim AS crypto
                WHERE LOWER(crypto.symbol) = LOWER(stage.symbol)
            )
        ON CONFLICT DO NOTHING;

        -- Actualizar la tabla datawarehouse.bautistalavielle_aggregates_fact utilizando SCD Tipo 0
        INSERT INTO datawarehouse.bautistalavielle_aggregates_fact (
            aggregates_id,
            symbol_id,
            open_price,
            close_price,
            open_close_difference,
            high_price,
            low_price,
            max_daily_difference,
            volume,
            aggregates_date_id
        )
        SELECT
            stage.aggregates_id AS aggregates_id,
            cryptos.crypto_id AS symbol_id,
            stage.open_price AS open_price,
            stage.close_price AS close_price,
            stage.open_close_difference AS open_close_difference,
            stage.high_price AS high_price,
            stage.low_price AS low_price,
            stage.max_daily_difference AS max_daily_difference,
            stage.volume AS volume,
            to_number(to_char(stage.date, 'YYMMDD'), '999999') AS aggregates_date_id
        FROM
            stage.bautistalavielle_aggregates AS stage
        LEFT JOIN
            datawarehouse.bautistalavielle_cryptos_dim AS cryptos
        ON
            LOWER(cryptos.symbol) = LOWER(stage.symbol)
        LEFT JOIN
            datawarehouse.bautistalavielle_aggregates_fact AS datawarehouse
        ON 
            stage.aggregates_id = datawarehouse.aggregates_id
        WHERE
            datawarehouse.aggregates_id IS NULL;
        """
        )

# engine.begin() runs both statements in one transaction. It commits when the block exits
# normally and rolls back if it raises, so the SQL text needs no BEGIN/COMMIT of its own
# (those would conflict with the transaction SQLAlchemy already manages on the connection).
with engine.begin() as conn:
    conn.execute(insert_query)