# Challenge: Data Engineer

En este archivo puedes escribir lo que estimes conveniente. Te recomendamos detallar tu solución y todas las suposiciones que estás considerando. Aquí puedes ejecutar las funciones que definiste en los otros archivos de la carpeta src, medir el tiempo, memoria, etc.

**Autor:** Leonardo Burbano
<br/>
**Empresa:** TW
<br/>
**Email:** leonardo.burbano@[dominioTW]
<br/>
**Mes/Año:** 05/2023

## 1. Suposiciones

- La descarga/carga del archivo de input no se realizará de forma recurrente. Se realizará una sola vez para efectos de solucionar el presente desafío.
- La ejecución de la descarga/descompresión/carga del archivo se realizará bajo demanda (y manualmente de ser el caso). Se considerará como variable el nombre del archivo, y como ubicación fija un bucket (repositorio) en Cloud Storage (GCP - Google Cloud Platform).
- La prioridad es obtener la respuesta a las preguntas planteadas. No surgirán otras preguntas en el corto plazo.
- La respuesta proviene de una consulta a la API de Twitter que está almacenada en un archivo comprimido en Google Drive.
- El desarrollo no incluye versionamiento de la infraestructura creada o utilización de infraestructura como código.
- No se incluyen configuraciones de permisos y otras tareas de disposición de la infraestructura en GCP.
- El archivo .zip solo contiene un solo archivo. Este archivo será descargado bajo demanda y su url será colocada directamente en este Notebook para efector de la ejecución.
- El proceso de ingesta de la información no será calendarizado o ejecutado de forma recurrente. Al final de este documento se detallan posibles mejoras para implementar considerar ese posible escenario.

## 2. Configuración del ambiente local

- Python 3.9.6 en una MacBook Pro (M1 Pro, 16 Gb RAM, Sonoma)
- Se utilizó Git, Gihub y Git flow, para facilitar el versionamiento y simulación de trabajo colaborativo.
- Se realizó una exploración inicial en la que se fue
- Se utilizó la nube de GCP, se requieré únicamente un archivo de cuenta de servicio con los permisos adecuados para ejecutar el proyecto. Este archivo deberá ser colocado en la carpeta **db**
- Se debe configurar las variables de entorno conforme lo indica el archivo **env.example.txt**. Estas incluyen el nombre del bucket en Cloud Storage y el nombre del dataset de BigQuery.


## 2. Estrategia de solución

### 2.0 Diagrama
![alt text](images/solution_diagram.png "Diagrama de solución")

### 2.1 Flujo general
a. Descarga del archivo comprimido desde Google Drive. Se descarga en la carpeta **src/tmp/**
<br/>
b. Descompresión del archivo comprimido en la carpeta **src/input/**
<br/>
c. Transformación del archivo en un JSON válido, considerando únicamente los campos a ser utilizados por las queries. El archivo es depositado en la carpeta **src/output/**
<br/>
d. El archivo se carga en el bucket "CS_BUCKET_NAME" en Cloud Storage. El archivo mantiene su nombre, ya que ese nombre se utilizará de Input.
<br/>
e. Las funciones **q1_time.py**, **q1_memory.py**, etc. Reciben como argumento el nombre del archivo, estas funciones leen el archivo lo cargan en bigquery en una tabla con el mismo nombre y ejecutan las consultas que están diseñadas en **pipelines/queries.py**
<br/>
f. Bajo este diseño las funciones están optimizadas para tiempo, como para memoria, ya que se almacenerán en disco y no consumiran los archivos se almacenarán en disco, considerando que un archivo puede tener un gran tamaño, y luego serán llevados en su mejor forma a un bucket en la nube para facilitar su paso a una base de datos distribuida y optimizada como lo es BigQuery.

### 2.2 Mapa de carpetas y archivos
- **/**
  - *initial_exploration*
    - ```00_test.ipynb```: Archivo con la exploración previa de la solución.
  - *src*
    - ```challenge.ipynb``` : Notebook que contiene la descrición de la resolución del desafío.
    - ```q1_memory.py``` : Script de Python que contesta la pregunta 1 con optimización de memoria.
    - ```q2_memory.py``` : Script de Python que contesta la pregunta 2 con optimización de memoria.
    - ```q3_memory.py``` : Script de Python que contesta la pregunta 3 con optimización de memoria.
    - ```q1_time.py``` : Script de Python que contesta la pregunta 1 con optimización de tiempo.
    - ```q2_time.py``` : Script de Python que contesta la pregunta 2 con optimización de tiempo.
    - ```q3_time.py``` : Script de Python que contesta la pregunta 3 con optimización de tiempo.
    - *db* : Carpeta que contiene clases que permiten conectarse y ejecutar operaciones sobre las diferentes fuentes de datos (Google Drive, Cloud Storage, BigQuery)
    - *images* : Carpeta que contiene el diagrama en imagen utilizado en este Notebook.
    - *input* : Carpeta que contendrá el archivo descomprimido.
    - *logs* : Carpeta que contendrá los logs de funcionamiento del sistema.
    - *pipelines* : Carpeta que contiene los scripts de Python utilizados como procesos para llevar los datos de un lugar a otro (ejemplo. Cargar un archivo de Cloud Storage a BigQuery).
    - *tmo* : Carpeta contendrá el archivo .zip descargado.
    - *utils* : Carpeta que contiene utilitarios para logging, transformación y descompresión de archivos.
    - *tests* : Carpeta que contiene tests básicos a utilizarse como referencia para futuros desarrollos.

  - requirements.txt : Archivo con las librerías de python necesarias para la instalación.
  - README.md : Archivo markdown con la descripción del desafío.
  - env.example.txt: Archivo de referencia de configuración de las variables de entorno.
  - .gitignore: Archivo de configuración de git para evitar versionar directorios y archivos que no deberían versionarse.


#### 3. Descarga del archivo, descompresión y cargar en Cloud Storage


A continuación, se implementa la descargar del archivo desde la URL compartida. El archivo pasa por carpetas en disco y luego se carga hacia Cloud Storage, para ser utilizado como fuente de consulta de las preguntas.

In [1]:
from pipelines.gdrive_and_cstorage import gdrive_to_cstorage # Module to download from gdrive, unzip the file, transform the file and load to Cloud Storage
import time # Used to measure the execution time

# Google Drive file URL (make sure it's the direct download link)
url = 'https://drive.google.com/uc?id=1ig2ngoXFTxP5Pa8muXo02mDTFexZzsis'

# Record the start time of the execution
start_time = time.time()

# Call the gdrive_to_cstorage function to transfer the downloaded file from local storage to a cloud storage
# Provide the source path of the downloaded file and the destination path in the cloud storage
gdrive_to_cstorage(url, "./tmp/downloaded_file.zip", "./input", "./output")

# Record the end time of the execution
end_time = time.time()

# Calculate the total execution time by subtracting start time from end time
execution_time = end_time - start_time

# Print the execution time with two decimal places
print(f"Execution time: {execution_time:.2f} seconds")

Downloading...
From (original): https://drive.google.com/uc?id=1ig2ngoXFTxP5Pa8muXo02mDTFexZzsis
From (redirected): https://drive.google.com/uc?id=1ig2ngoXFTxP5Pa8muXo02mDTFexZzsis&confirm=t&uuid=919ecb74-4627-4870-9ce9-847f7f817382
To: /Users/leonardoburbano/LatamAir/challenge_de_leonardo_burbano/src/tmp/downloaded_file.zip
100%|██████████| 60.4M/60.4M [00:06<00:00, 8.78MB/s]


Downloaded file saved to: ./tmp/downloaded_file.zip
Processed data has been written to './output/farmers-protest-tweets-2021-2-4.json'
File uploaded to GCS. URI: gs://de_challenge_leonardo_burbano/farmers-protest-tweets-2021-2-4.json
Execution time: 19.66 seconds


**Resultado:**
El tiempo de ejecución es de 19.66 segundos, esto es alcanzable gracias a que se está cargando únicamente los campos necesarios para contestar las preguntas. Lo que reduce el tamaño del archivo, mejora el tiempo de ejecución y consumo de memoria.

**Posibles mejoras:** 
- Eventualmente se podría utilizar un DAG en Airflow (GCP Cloud Composer), un Pipeline de Dataflow o un Notebook en Databricks que facilite la parametrización y calendarización de esta carga. De igual manera, una Cloud Function + Cloud Scheduler de GCP podría ser útil.
- Como esta implementación tiene un trabajo de descarga/transformación y carga sería ideal registrar estadísticas generales como el número de lineas leídas, el numéro de líneas transformadas y el número de líneas u objetos cargados.

**Aprendizajes:**
- La librería gdown, resolbió el problema de descarga del archivo de manera sencilla, sin necesidad de utilizar una implementación más compleja de la API.

**Escalabilidad:**
- Si el tamaño del archivo crece, se podría procesar el archivo en chunks o pedazos para evitar la saturación a nivel de memoria. Y si deseamos una optimización temporal se podría utilizar carga en paralelo para reducir el tiempo de ejecución. Esto sería lograble utilizando Spark con Dataproc, utilizando Spark en Databricks o realizando cargas paralelas en Airflow con Composer.
- El seguimiento de este flujo puede promover oportunidades de mejora, por lo que se podría implementar un sistema de monitoreo como Cloud Monitoring, New Relic, entre otros.

#### 4. Implemetación

In [1]:
# Input: The filename used as input in the next functions, I considered the filename dynamic
file_path = "farmers-protest-tweets-2021-2-4.json"

In [2]:
import cProfile # Used to profile the time of execution
import pstats # Used to organize better the time stats

# Import the functions to answer the questions
from q1_memory import q1_memory
from q1_time import q1_time
from q2_memory import q2_memory
from q2_time import q2_time
from q3_memory import q3_memory
from q3_time import q3_time

## Preguntas:

### q1: Las top 10 fechas donde hay más tweets. Mencionar el usuario (username) que más publicaciones tiene por cada uno de esos días

In [3]:
# Simple execution
print(q1_time(file_path))

[(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 16), 'jot__b'), (datetime.date(2021, 2, 14), 'rebelpacifist'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 15), 'jot__b'), (datetime.date(2021, 2, 20), 'MangalJ23056160'), (datetime.date(2021, 2, 23), 'Surrypuria'), (datetime.date(2021, 2, 19), 'Preetm91')]


In [4]:
# Memory evaluation
print(q1_memory(file_path))


Filename: /Users/leonardoburbano/LatamAir/challenge_de_leonardo_burbano/src/q1_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    13     81.2 MiB     81.2 MiB           1   @profile
    14                                         def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    15                                             """
    16                                             Executes a pipeline to load data from Cloud Storage to BigQuery and perform a query.
    17                                         
    18                                             Args:
    19                                                 file_path (str): The path of the file in Cloud Storage.
    20                                         
    21                                             Returns:
    22                                                 List[Tuple[datetime.date, str]]: A list of tuples containing date and string values.
    23                  

In [5]:
# Time evaluation
cProfile.run('q1_time(file_path)', 'tmp_time_func_stats')
p = pstats.Stats("tmp_time_func_stats")
p.sort_stats("cumulative").print_stats(10)

Fri May  3 01:38:37 2024    tmp_time_func_stats

         77346 function calls (77218 primitive calls) in 10.635 seconds

   Ordered by: cumulative time
   List reduced from 750 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000   10.635   10.635 {built-in method builtins.exec}
        1    0.000    0.000   10.635   10.635 <string>:1(<module>)
        1    0.000    0.000   10.635   10.635 /Users/leonardoburbano/LatamAir/challenge_de_leonardo_burbano/src/q1_time.py:12(q1_time)
     10/9    0.000    0.000   10.630    1.181 /Users/leonardoburbano/LatamAir/challenge_de_leonardo_burbano/venv_dev/lib/python3.9/site-packages/google/api_core/retry/retry_unary.py:286(retry_wrapped_func)
     10/9    0.000    0.000   10.630    1.181 /Users/leonardoburbano/LatamAir/challenge_de_leonardo_burbano/venv_dev/lib/python3.9/site-packages/google/api_core/retry/retry_unary.py:85(retry_target)
        1    0.000    0.000    8.6

<pstats.Stats at 0x10965b5b0>

**Resultado:**
El tiempo de ejecución es de 10.635 segundos. Es bastante rápido, dado que incluye la carga de Cloud Storage a BigQuery y la ejecución de la consulta.
El consumo de memoria es de 81.2 MiB, en su mayoría centrado en el profiler. No hay un consumo de memoria local, por lo que es bastante óptimo para el orquestador. La carga de memoria se la envía a BigQuery que tendrá un consumo de slots para responder esta query.

**Posibles mejoras:** 
- El consumo de tiempo ha sido optimizado utilizando CTEs (Common Table Expressions). Esto evita las operaciones de disco I/O que pueden ser más lentas que la lectura en memoria.
- El consumo de memoria ha sido optimizado liberando al ambiente local de tareas de consumo de memoria y utilizando recursos en la nube para facilitar la geestión y escalabilidad.
- Se podría probar utilizando tablas temporales en lugar de CTEs y evaluar la mejora en el consumo de memoria, ya que no consumirá espacio la CTE.

**Aprendizajes:**
- BigQuery posee la capacidad de generar jobs que facilitan la carga de la información. Estos jobs generalmente logran inferir el esquema, pero en casos donde el esquema es un problema, es preferible definir el esquema manualmente o con apoyo de un LLM que pueda garantizar la correcta carga de la información.
- Cargar únicamente los campos utilizados para responder a las preguntas puede ser útil para este efecto pero eventualmente, lo ideal sería definir un esquema y un contrato de datos, para evitar sorpresas en el esquema y garantizar la carga de todos los datos posibles.

**Escalabilidad:**
- Si la base de datos crece, se podría almacenar la información particionada en BigQuery, esto facilitará los agrupamientos y el análisis de fechas específicas.

### q2: Los top 10 emojis más usados con su respectivo conteo. Debe incluir las siguientes funciones:

In [6]:
# Simple execution
print(q2_time(file_path))

[('✊', 2402), ('❤️', 1382), ('❤', 397), ('☮️', 316), ('♂️', 179), ('✌️', 168), ('♀️', 148), ('✌', 106), ('‼️', 74), ('♥️', 73)]


In [7]:
# Memory evaluation
print(q2_memory(file_path))

Filename: /Users/leonardoburbano/LatamAir/challenge_de_leonardo_burbano/src/q2_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    13     77.9 MiB     77.9 MiB           1   @profile
    14                                         def q2_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    15                                             """
    16                                             Executes a pipeline to load data from Cloud Storage to BigQuery and perform a query.
    17                                         
    18                                             Args:
    19                                                 file_path (str): The path of the file in Cloud Storage.
    20                                         
    21                                             Returns:
    22                                                 List[Tuple[datetime.date, str]]: A list of tuples containing date and string values.
    23                  

In [8]:
# Time evaluation
cProfile.run('q2_time(file_path)', './logs/tmp_time_func_stats')
p = pstats.Stats("./logs/tmp_time_func_stats")
p.sort_stats("cumulative").print_stats(10)

Fri May  3 01:55:00 2024    ./logs/tmp_time_func_stats

         83105 function calls (82974 primitive calls) in 10.440 seconds

   Ordered by: cumulative time
   List reduced from 736 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000   10.440   10.440 {built-in method builtins.exec}
        1    0.000    0.000   10.440   10.440 <string>:1(<module>)
        1    0.000    0.000   10.440   10.440 /Users/leonardoburbano/LatamAir/challenge_de_leonardo_burbano/src/q2_time.py:12(q2_time)
     10/9    0.000    0.000   10.434    1.159 /Users/leonardoburbano/LatamAir/challenge_de_leonardo_burbano/venv_dev/lib/python3.9/site-packages/google/api_core/retry/retry_unary.py:286(retry_wrapped_func)
     10/9    0.000    0.000   10.434    1.159 /Users/leonardoburbano/LatamAir/challenge_de_leonardo_burbano/venv_dev/lib/python3.9/site-packages/google/api_core/retry/retry_unary.py:85(retry_target)
        1    0.000    0.000

<pstats.Stats at 0x109695550>

**Resultado:**
El tiempo de ejecución es de 10.440 segundos. Es bastante rápido, dado que incluye la carga de Cloud Storage a BigQuery y la ejecución de la consulta.
El consumo de memoria es de 77.9 MiB, en su mayoría centrado en el profiler. No hay un consumo de memoria local, por lo que es bastante óptimo para el orquestador. La carga de memoria se la envía a BigQuery que tendrá un consumo de slots para responder esta query.

**Posibles mejoras:** 
- El consumo de tiempo ha sido optimizado utilizando CTEs (Common Table Expressions). Esto evita las operaciones de disco I/O que pueden ser más lentas que la lectura en memoria.
- El consumo de memoria ha sido optimizado liberando al ambiente local de tareas de consumo de memoria y utilizando recursos en la nube para facilitar la geestión y escalabilidad.
- Se debería probar utilizando tablas temporales en lugar de CTEs y evaluar la mejora en el consumo de memoria, ya que no consumirá espacio la CTE. Sobre todo ya que en esta consulta se utiliza expresiones regulares, el consumo de memoria podría ser mucho menor.

**Aprendizajes:**
- Las expresiones regulares que mapean los emojis pueden ser bastante confusas de gestionar o entender. Se utilizó Gemini y ChatGPT para llegar a una versión adecuada que fuera compatible.
- El formateo de la query fue complejo, ya que las expresiones regulares consideraban llaves entre sus detalles. Para este efecto se extrajo la expresión regular, con lo cuál se hizo mucho más cómodo el formateo en este caso.

**Escalabilidad:**
- Ya que se aplica una transformación compleja al extraer los emojis. Debería manejarse una lógica de tabla temporal que facilite tener la data disponible con esa transformación implementada. De igual manera, para el conteo se podría utilizar Spark, con lo cuál sería mucho más cómodo escalar en chunks y de forma paralela.

### q3: El top 10 histórico de usuarios (username) más influyentes en función del conteo de las menciones (@) que registra cada uno de ellos.

In [9]:
print(q3_time(file_path))

[('narendramodi', 2265), ('Kisanektamorcha', 1840), ('RakeshTikaitBKU', 1644), ('PMOIndia', 1427), ('RahulGandhi', 1146), ('GretaThunberg', 1048), ('RaviSinghKA', 1019), ('rihanna', 986), ('UNHumanRights', 962), ('meenaharris', 926)]


In [8]:
print(q3_memory(file_path))

Filename: /Users/leonardoburbano/LatamAir/challenge_de_leonardo_burbano/src/q3_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    12    156.7 MiB    156.7 MiB           1   @profile
    13                                         def q3_memory(file_path: str) -> List[Tuple[str, int]]:
    14                                             """
    15                                             Executes a pipeline to load data from Cloud Storage to BigQuery and perform a query.
    16                                         
    17                                             Args:
    18                                                 file_path (str): The path of the file in Cloud Storage.
    19                                         
    20                                             Returns:
    21                                                 List[Tuple[datetime.date, str]]: A list of tuples containing date and string values.
    22                            

In [12]:
cProfile.run('q3_time(file_path)', 'tmp_time_func_stats')
p = pstats.Stats("tmp_time_func_stats")
p.sort_stats("cumulative").print_stats(10)

Thu May  2 23:05:33 2024    tmp_time_func_stats

         76934 function calls (76806 primitive calls) in 8.398 seconds

   Ordered by: cumulative time
   List reduced from 734 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    8.398    8.398 {built-in method builtins.exec}
        1    0.000    0.000    8.398    8.398 <string>:1(<module>)
        1    0.000    0.000    8.398    8.398 /Users/leonardoburbano/LatamAir/challenge_de_leonardo_burbano/src/q3_time.py:11(q3_time)
     10/9    0.000    0.000    8.395    0.933 /Users/leonardoburbano/LatamAir/challenge_de_leonardo_burbano/venv_dev/lib/python3.9/site-packages/google/api_core/retry/retry_unary.py:286(retry_wrapped_func)
     10/9    0.000    0.000    8.395    0.933 /Users/leonardoburbano/LatamAir/challenge_de_leonardo_burbano/venv_dev/lib/python3.9/site-packages/google/api_core/retry/retry_unary.py:85(retry_target)
        1    0.000    0.000    7.25

<pstats.Stats at 0x1304dc3a0>

**Resultado:**
El tiempo de ejecución es de 8.398 segundos. Es bastante rápido, dado que incluye la carga de Cloud Storage a BigQuery y la ejecución de la consulta.
El consumo de memoria es de 156.7 MiB, en su mayoría centrado en el profiler. No hay un consumo de memoria local, por lo que es bastante óptimo para el orquestador. La carga de memoria se la envía a BigQuery que tendrá un consumo de slots para responder esta query.

**Posibles mejoras:** 
- El consumo de tiempo ha sido optimizado utilizando CTEs (Common Table Expressions). Esto evita las operaciones de disco I/O que pueden ser más lentas que la lectura en memoria.
- El consumo de memoria ha sido optimizado liberando al ambiente local de tareas de consumo de memoria y utilizando recursos en la nube para facilitar la geestión y escalabilidad.
- La query está bastante optimizada, se podría pensar en particionamiento en caso de un crecimiento de los datos para facilitar la distibución del procesamiento.

**Aprendizajes:**
- UNNEST facilita el conteo y mapeo de los campos JSON, probablemente sea ideal realizar esa transformación previamente al conteo en caso de que crezca el número de datos o se quiera optimizar el consumo de memoria.

**Escalabilidad:**
- UNNEST consume bastante espacio en memoria y crea más filas de información. Se podría pensar en utilizar paralelización de la query o Spark para una adecuada optimización.

## Features implemementados y releases

Se utilizó el esquema de GitFlow, en ocasiones se solucionó desfases en las ramas utilizando nuevas versiones de los releases y rebase hacia commits pasados para resolver conflictos.
Se dividió el análisis en features que fueron desde la exploración inicial, la implementación de los conectores a las distintas fuentes de datos, implementación de los pipelines, perfilamiento de memoria y tiempo, y respuest a las preguntas.

La estrategia que se siguió, fue una estrategia Ágil, basada en agregar valor desde el primer release.
En el primer release, se liberó la funcionalidad, considerando una carga manual y transformación rápida utilizando un Notebook, esto liberó las respuestas a las preguntas a través del Notebook.
El siguiente release implementó el factor dinámico del nombre del archivo, aunque fuera cargado manualmente. Y así hasta llegar a facilitar la ejecución del flujo completo, desde la descarga del archivo.

Aprendizaje: Es crítico tener un concenso y bloqueos en el repositorio de Github, para evitar que hayan ajustes innecesarios y merges sobre ramas que no se deben, en general esta regulación podría estar acompañada sobre revisiones del formato de los scripts, vulnerabilidades en el código, testing automático y cualquier otra práctica que facilite la integración y despliegue continuos.

![alt text](images/github_network.png "Github Network")

## Tests básicos

Se implementaron test básicos utilizando **pytest**. Los tests están en la carpeta **src/tests**
Se considera que el archivo ya está cargado en CloudStorage.

Se recomienda desarrollar los tests con un enfoque de Data Engineering: verificar cantidad de registros cargados, validación de tipo de datos, etc.

Aquí los resultados:

![alt text](images/test_results.png "Tests Results")

## Mejoras y pasos futuros
- **Testing:** Implementar testing facilitar dar mantenimiento a las clases y a las functiones implementadas. Tanto los test unitarios como los test de integración harían que sea fácil detectar errores en las modificaciones futuras del código.
- **Monitoreo:** En general, el proceso depende de la calidad del archivo que se ingesta. A menos que este archivo venga directamente de la API de Twitter su integridad puede estar en duda, por lo que tener claridad en monitoreo puede ser bastante útil para detectar cambios de esquema y errores, además de programar un reprocesamiento automático, en caso de que se requiera o hayan actualizaciones sobre los tweets.
- **Plataforma:** Como se mencionó previamente, si se quiere tener un proceso recurrente, podría ser útil llevar una buena governanza de la ejecución de los JOBs en Databricks, Airflow (Composer) y programar alertas cuando el tiempo crezca, el archivo no sea el correcto, etc.
- **Producto:** Eventualmente, si apuntamos a productivizar este desarrollo. Puede ser interesante que las funciones y el archivo lleven un registro histórico de los resultados y de la calidad de los Pipelines, para garantizar mejoras continuas en el esquema, optimización de las queries, etc.
- **Infraestructura:** La infraestructura fue configurada manualmente, sería idea manejar las configuraciones como código para mantener la gobernanza de las configuraciones y facilitar despliegues futuros.