# **Desafío #3 – Proceso batch hacia un Data Lake en Azure**

## **3.1 Introducción** 

Este notebook documenta el desarrollo del Desafío #3 del componente de Ingeniería de Datos + Cloud.

En los desafíos anteriores se construyó un modelo sintético de una organización (departamentos, puestos de trabajo y empleados) y se persistieron los datos resultantes en formato CSV y Parquet dentro del proyecto local, en la ruta `01_data_cloud/data/raw`.

El objetivo de este tercer desafío es diseñar e implementar un proceso batch que migre esos datos hacia un entorno de almacenamiento en la nube, utilizando un servicio tipo Data Lake en Azure. El enunciado de la prueba permite utilizar una base de datos SQL/NoSQL, un Data Warehouse o un bucket analítico dentro de un Datalake. En esta solución se selecciona como destino un **Azure Storage Account con capacidades de Data Lake (Blob Storage / ADLS Gen2)**, aprovechando la capa gratuita disponible en la suscripción académica.

El proceso batch se implementa como un script de Python idempotente que:

- lee los archivos CSV y Parquet ubicados en `data/raw`,  
- organiza los datos en una estructura de carpetas propia de una capa *raw* de Data Lake,  
- y los carga a un contenedor de Azure Storage preparado para ser consumido por procesos analíticos posteriores.  

Este enfoque cumple con el requerimiento del Desafío #3 y se alinea con prácticas habituales de ingeniería de datos en arquitecturas modernas basadas en Data Lake.


## **3.2 Configuración de rutas y carga del `.env`**

In [1]:
import sys
from pathlib import Path
from dotenv import load_dotenv

# Directorio del notebook: 01_data_cloud/docs
notebook_dir = Path.cwd()
project_root = notebook_dir.parent

src_path = project_root / "src"
if str(src_path) not in sys.path:
    sys.path.append(str(src_path))

# Cargar variables del archivo .env ubicado en 01_data_cloud
env_path = project_root / ".env"
load_dotenv(env_path)

print("Notebook directory :", notebook_dir)
print("Project root       :", project_root)
print("Src path           :", src_path)
print(".env path          :", env_path)


Notebook directory : /Users/capeto/Library/CloudStorage/OneDrive-Personal/GitHub/mvm-technical-challenge/01_data_cloud/docs
Project root       : /Users/capeto/Library/CloudStorage/OneDrive-Personal/GitHub/mvm-technical-challenge/01_data_cloud
Src path           : /Users/capeto/Library/CloudStorage/OneDrive-Personal/GitHub/mvm-technical-challenge/01_data_cloud/src
.env path          : /Users/capeto/Library/CloudStorage/OneDrive-Personal/GitHub/mvm-technical-challenge/01_data_cloud/.env


## **3.3 Arquitectura del proceso batch**

El proceso batch diseñado para este desafío sigue la siguiente arquitectura lógica:

1. **Fuente local:** Los archivos generados en los Desafíos #1 y #2 se encuentran en:

   ```text
   01_data_cloud/data/raw
       departments.csv / departments.parquet
       job_positions.csv / job_positions.parquet
       employees.csv / employees.parquet
   ```

2. **Proceso batch (Python):** Un script ubicado en `src/batch_etl/org_batch_etl.py` implementa la lógica de:

   - descubrimiento de archivos en `data/raw`,  
   - construcción de rutas destino en el Data Lake,  
   - y carga de cada archivo a un contenedor de Azure Blob Storage.  

   El proceso es **idempotente**: la re-ejecución con los mismos datos sobrescribe los archivos existentes, evitando duplicados.

3. **Destino en la nube: Data Lake en Azure:** El destino es un contenedor de Azure Storage, por ejemplo `org-raw`, organizado con una estructura de prefijos que refleja una capa raw y una versión lógica del modelo:

   ```text
   container: org-raw
     └── org_data/
         └── v1/
             ├── departments/
             │   ├── departments.csv
             │   └── departments.parquet
             ├── job_positions/
             │   ├── job_positions.csv
             │   └── job_positions.parquet
             └── employees/
                 ├── employees.csv
                 └── employees.parquet
   ```

Esta estructura facilita el consumo posterior desde motores de análisis (Spark, Synapse, Databricks, etc.) y mantiene claramente identificada la versión del modelo sintético (`v1`).


## **3.4 Configuración de acceso a Azure Storage**

El acceso al Storage Account se gestiona mediante variables de entorno, evitando exponer credenciales en el código fuente. El proceso batch utiliza las siguientes variables:

- **`AZURE_STORAGE_CONNECTION_STRING`:** Connection string completo del Storage Account (incluye `AccountName` y `AccountKey`).  

- **`AZURE_STORAGE_CONTAINER_RAW`:** Nombre del contenedor donde se almacenarán los datos raw. Valor por defecto utilizado en el script: `org-raw`.  

- **`AZURE_STORAGE_BASE_PREFIX`:** Prefijo base dentro del contenedor para la versión actual del modelo. Valor por defecto utilizado en el script: `org_data/v1`.  

Estas variables pueden definirse a nivel de sistema operativo o mediante un archivo `.env` que no se versiona en el repositorio. El script batch lee dichas variables en tiempo de ejecución y falla explícitamente si la cadena de conexión no se encuentra configurada.


In [2]:
from batch_etl.org_batch_etl import run_batch_upload

run_batch_upload()

Iniciando proceso batch de carga a Data Lake...
Run timestamp (UTC): 2025-12-06T15:36:07.788942
Directorio local de origen: /Users/capeto/Library/CloudStorage/OneDrive-Personal/GitHub/mvm-technical-challenge/01_data_cloud/data/raw
Contenedor destino: org-raw
Prefijo base en el contenedor: org_data/v1
------------------------------------------------------------
Subido: job_positions.parquet -> org_data/v1/job_positions/job_positions.parquet
Subido: job_positions.csv -> org_data/v1/job_positions/job_positions.csv
Subido: departments.csv -> org_data/v1/departments/departments.csv
Subido: departments.parquet -> org_data/v1/departments/departments.parquet
Subido: employees.parquet -> org_data/v1/employees/employees.parquet
Subido: employees.csv -> org_data/v1/employees/employees.csv
------------------------------------------------------------
Proceso batch completado.


## **3.5 Implementación del proceso batch en `src/batch_etl/org_batch_etl.py`**

La lógica del proceso batch se implementa en el módulo `src/batch_etl/org_batch_etl.py`. Este módulo:

- resuelve las rutas locales del proyecto (`data/raw`),  

- carga la configuración de Azure Storage desde el entorno,  

- identifica los archivos CSV y Parquet a cargar,  

- construye las rutas destino dentro del contenedor,  

- y realiza la carga de cada archivo utilizando el SDK oficial `azure-storage-blob`.

A continuación se muestra el contenido esencial del módulo para referencia técnica.

In [3]:
import os
from azure.storage.blob import BlobServiceClient

from dotenv import load_dotenv
load_dotenv(project_root / ".env")  # por si el kernel se reinició

conn_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
container_name = os.getenv("AZURE_STORAGE_CONTAINER_RAW", "org-raw")

blob_service = BlobServiceClient.from_connection_string(conn_str)
container_client = blob_service.get_container_client(container_name)

print(f"Blobs en el contenedor '{container_name}':")
for blob in container_client.list_blobs():
    print(" -", blob.name)


Blobs en el contenedor 'org-raw':
 - org_data
 - org_data/v1
 - org_data/v1/departments
 - org_data/v1/departments/departments.csv
 - org_data/v1/departments/departments.parquet
 - org_data/v1/employees
 - org_data/v1/employees/employees.csv
 - org_data/v1/employees/employees.parquet
 - org_data/v1/job_positions
 - org_data/v1/job_positions/job_positions.csv
 - org_data/v1/job_positions/job_positions.parquet


## **3.6 Conclusión del Desafío #3**

El proceso descrito y ejecutado en este notebook completa el Desafío #3 de la prueba técnica:

- Los datos sintéticos generados y persistidos localmente en formatos CSV y Parquet se migran a un **Data Lake en Azure** utilizando un Storage Account con capacidades de Blob Storage / ADLS Gen2.
- La carga se realiza mediante un proceso batch implementado en Python, configurado por variables de entorno, idempotente y alineado con una estructura de carpetas propia de una capa *raw* versionada (`org_data/v1`).
- La solución es coherente con arquitecturas de datos contemporáneas y prepara el terreno para la creación de vistas, consultas y APIs en los desafíos posteriores.

Con esta etapa, la cadena de valor de la solución de Ingeniería de Datos + Cloud se extiende desde la generación de datos sintéticos hasta su almacenamiento analítico en la nube.
