# Data Rangers: Ingenieria de datos

## 1. Base de datos

- La base de datos se desplego localmente, en la ultima imagen de docker de ```postgresql```. Como se detalla en el ```README.md```. Si se cuenta con una base de datos inicial se pueden reemplazar los valores en el archivo ```.env```

- En este caso se asume que las tablas pueden o no estar creada dentro de la empresa, de ahi que se usan los objetos relacionales de sqlalchemy para mapear estas tablas con sus columnas y esquemas.

- Se propone un calendario que va a tener una relacion one-to-many, esto para los usuarios que requieran esta data para tableros de BI, tendrian un calendario que estrictamente va a contener las fechas cargadas en la base de datos.

## 2. Pipeline

Para este pipeline en particular simulando procesamiento cercano al tiempo real, es importante que los archivos no se almacenen en memoria. Para ello, se utilizan generadores, que al final terminan siendo objetos (mucho mas reducidos comparado al peso del archivo).  
Estos objetos van a arrojar un iterable que luego vamos a poder acceder siempre que se necesite. Es similar al lazy-evaluation de ``` apache spark```.  

En ese sentido, se utilizo la palabra clave ```yield``` y se accedio al archivo solo cuando se necesita.  Adicional, se aprovecho la funcionalidad de la API de Pandas para generar chunks, estos chunks tambien funcionan creando un objeto iterable que luego podemos acceder cuando se necesita y asi realizar la ingesta.  

#### Este pipeline tiene las siguientes ventajas:
- Carga informacion solo cuando se necesita.
- Procesa cada fila de forma individual.
- Toma el tiempo de insercion y procesamiento, esto es vital en estas bases de datos ya que esto facilita para el DBA la programacion sus jobs o procedimientos.
- Parcialmente utiliza el ORM de sqlalchemy lo que lo hace bastante sencillo luego de integrar con API`s basadas en ```Flask```,```Django``` o ```Fast API``` o en su defecto, utilizando el ORM propio de estos frameworks. 
- Crea una tabla calendario

#### A su vez, tiene estos puntos por mejorar:
- Configurar de tal manera que el motor de base de datos pueda ser determinado por el usuario que programa el pipeline.
- Insertar pruebas unitarias
- Utilizar frameworks mas avanzados en procesamiento NRT (near-real-time) como ```apache akafka```.

Las metricas del pipeline se pueden revisar en el archivo "pipeline.log".

### 2.1. Paquetes necesarios

In [1]:
import utils.dbconfig as dbc # Modulo de configuraciones en carpeta utils

from utils.functions   import list_datafile # Funcion para listar archivos

from utils.logs        import PipelineLogs  # Clase para los logs
from database.database import Database      # Clase para conexion a base de datos

from etl_pipeline      import pipeline

In [2]:
## Variables de entorno:
USER = dbc.DB_USER
PWD  = dbc.DB_PASSWORD
HOST = dbc.DB_HOST
DB   = dbc.DATABASE_NAME

CHUNKSIZE = 4

In [3]:
pathlist = list_datafile(path='./data')

plsqldb = Database(HOST, USER, PWD, DB)

logs = PipelineLogs("pipeline-prove", "pipeline.log").pipeline_logs()

In [4]:
logs.info("---- Inicio del Pipeline ----")
pipeline(pathlist, plsqldb, CHUNKSIZE, logs)

[LINE:1] #INFO     [2025-02-12 20:27:48,160] ---- Inicio del Pipeline ----
[LINE:29] #INFO     [2025-02-12 20:27:48,161] Leyendo archivo ./data/2012-1.csv
[LINE:44] #INFO     [2025-02-12 20:27:48,163] Transformando microbatch 1
[LINE:73] #INFO     [2025-02-12 20:27:48,245] 
                ----------- Pipeline Metrics -----------
                - Sales Table:
                    Total de filas: 1
                
                Columna 'price':
                Promedio: 50.00
                Maximo: 50
                Minimo: 50

                Tiempo de insercion fila actual: 
                Total: 5.03 ms
                Promedio: 5.03 ms

                
[LINE:73] #INFO     [2025-02-12 20:27:48,253] 
                ----------- Pipeline Metrics -----------
                - Sales Table:
                    Total de filas: 2
                
                Columna 'price':
                Promedio: 68.50
                Maximo: 87
                Minimo: 50

                Tie

### 2.2. Validacion de los datos

In [None]:
# Realizamos con el metodo SELECT un query hacia la base de datos, y al devolver una lista con una tupla, tomamos el primer valor
logs.info("Validando informacion")
valores = plsqldb.select("SELECT COUNT(1), MAX(price), MIN(price), AVG(price) FROM public.sales") [0]
logs.info(
f"""
    Total de registros: {valores[0]}
    Precio maximo: {valores[1]}
    Precio minimo: {valores[2]}
    Precio promedio: {valores[3]:.2f}
""")

[LINE:2] #INFO     [2025-02-12 20:34:45,490] Validando informacion
[LINE:4] #INFO     [2025-02-12 20:34:45,493] 
    Total de registros: 143
    Precio maximo: 100.0
    Precio minimo: 10.0
    Precio promedio: 57.884892086330936



### 2.3. Cargando archivo de validacion

In [11]:
logs.info("Cargando solo archivo de validacion")
val_file = list_datafile(path='./data', validation = True)
logs.info("---- Inicio del Pipeline ----")
pipeline(val_file, plsqldb, CHUNKSIZE, logs)

[LINE:1] #INFO     [2025-02-12 20:48:51,614] Cargando solo archivo de validacion
[LINE:3] #INFO     [2025-02-12 20:48:51,615] ---- Inicio del Pipeline ----
[LINE:29] #INFO     [2025-02-12 20:48:51,616] Leyendo archivo ./data/validation.csv
[LINE:44] #INFO     [2025-02-12 20:48:51,617] Transformando microbatch 1
>Table calendar already has this date
[LINE:73] #INFO     [2025-02-12 20:48:51,627] 
                ----------- Pipeline Metrics -----------
                - Sales Table:
                    Total de filas: 1
                
                Columna 'price':
                Promedio: 26.00
                Maximo: 26
                Minimo: 26

                Tiempo de insercion fila actual: 
                Total: 6.31 ms
                Promedio: 6.31 ms

                
>Table calendar already has this date
[LINE:73] #INFO     [2025-02-12 20:48:51,635] 
                ----------- Pipeline Metrics -----------
                - Sales Table:
                    Total de fila

In [10]:
logs.info("Validando informacion")
valores = plsqldb.select("SELECT COUNT(1), MAX(price), MIN(price), AVG(price) FROM public.sales") [0]
logs.info(
f"""
    Total de registros: {valores[0]}
    Precio maximo: {valores[1]}
    Precio minimo: {valores[2]}
    Precio promedio: {valores[3]:.2f}
""")

[LINE:1] #INFO     [2025-02-12 20:48:47,011] Validando informacion
[LINE:3] #INFO     [2025-02-12 20:48:47,013] 
    Total de registros: 151
    Precio maximo: 100.0
    Precio minimo: 10.0
    Precio promedio: 57.01

