# APEX GLOBAL MOBILITY - NOTEBOOK EXPLORATORIO

Este notebook muestra cómo el flujo de trabajo ETL de producción procesa los datos paso a paso.
<br><br>La lógica principal se encuentra en `src/etl.py`.

## CONFIGURACION DE LIBRERIAS E INSTANCIAS

In [1]:
import os
from pathlib import Path


def find_project_root(start, markers=("conf", "src")):
    """Return the nearest directory at or above ``start`` holding all ``markers``.

    The rest of the notebook uses paths relative to the project root
    (``conf/base/parameters.yaml``, the ``src`` package), so those two
    directories are used to recognise it.

    Args:
        start: Directory where the search begins.
        markers: Names of sub-directories that must all exist in the root.

    Returns:
        The first matching directory, searching ``start`` itself and then each
        parent in turn. If nothing matches, the parent of ``start`` is returned,
        which is what the previous unconditional ``os.chdir("..")`` did.
    """
    start = Path(start).resolve()
    for candidate in (start, *start.parents):
        if all((candidate / marker).is_dir() for marker in markers):
            return candidate
    return start.parent


# Move to the project root (CBC/cbc-data-pipeline). Unlike a bare
# os.chdir(".."), this is safe to re-run: once the working directory is the
# root, the search finds it immediately and the directory does not change.
os.chdir(find_project_root(Path.cwd()))
print(os.getcwd())

/Users/glopez/Downloads/CBC/cbc-data-pipeline


In [2]:
from omegaconf import OmegaConf
from src.etl import ETLEngineer

In [3]:
# Layer 1: base parameters
base_conf = OmegaConf.load("conf/base/parameters.yaml")
# Layer 2: on-premise environment settings
env_conf = OmegaConf.load("conf/onpremise/env.yaml")

# Combine both layers into a single config object
# (in OmegaConf.merge, later arguments take precedence on conflicting keys).
conf = OmegaConf.merge(base_conf, env_conf)

# Dump the merged result as YAML so the effective settings can be inspected
conf_yaml = OmegaConf.to_yaml(conf)
print(conf_yaml)

pipeline:
  country: GT
  start_date: '20250101'
  end_date: '20251231'
  env: onpremise
  input_path: data/input/global_mobility_data_entrega_productos.csv
  output_path: data/processed/
  format: parquet
  write_mode: overwrite
business_rules:
  units_conversion:
    CS: 20
    ST: 1
  delivery_types:
    rutina:
    - ZPRE
    - ZVE1
    bonificacion:
    - Z04
    - Z05
spark:
  shuffle_partitions: 1



In [4]:
# Instantiate the ETL class (defined in src/etl.py) with the merged configuration.
# NOTE(review): the Spark startup log printed by this cell suggests the
# constructor creates the Spark session — confirm in src/etl.py.
etl = ETLEngineer(conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/11 22:24:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## CARGA DE DATOS (EXTRACT)

In [5]:
# Extract step: read the input through the ETL object and preview five rows
df_read = etl.read_data()
df_read.show(n=5)

+----+-------------+----------+------+------------+--------+-------+--------+------+
|pais|fecha_proceso|transporte|  ruta|tipo_entrega|material| precio|cantidad|unidad|
+----+-------------+----------+------+------------+--------+-------+--------+------+
|  GT|     20250513|  67053596|919885|        ZPRE|AA004003|3195.54|   100.0|    CS|
|  GT|     20250513|  67053596|919885|        ZPRE|BA018426| 529.99|    20.0|    CS|
|  GT|     20250513|  67053610|919885|         Z04|BA018426|7799.74|   103.0|    CS|
|  GT|     20250513|  67053610|919885|         Z04|BA018426|  25.25|     2.0|    ST|
|  GT|     20250513|  67053610|919885|         Z04|AA004005| 1537.5|    15.0|    CS|
+----+-------------+----------+------+------------+--------+-------+--------+------+
only showing top 5 rows



In [6]:
# Column names and types of the raw data
df_read.printSchema()

# Total number of raw rows
raw_row_count = df_read.count()
print(raw_row_count)

# Countries present in the raw data
raw_countries = df_read.select("pais").distinct()
raw_countries.show()

root
 |-- pais: string (nullable = true)
 |-- fecha_proceso: integer (nullable = true)
 |-- transporte: integer (nullable = true)
 |-- ruta: integer (nullable = true)
 |-- tipo_entrega: string (nullable = true)
 |-- material: string (nullable = true)
 |-- precio: double (nullable = true)
 |-- cantidad: double (nullable = true)
 |-- unidad: string (nullable = true)

379
+----+
|pais|
+----+
|  GT|
|  PE|
|  EC|
|  SV|
|  HN|
|  JM|
+----+



## TRANSFORMACION

In [7]:
# Transform step: run the ETL transformation and preview five rows
df_transform = etl.transform_data()
df_transform.show(n=5)

+----+-------------+----------+------+------------+--------+-------+--------+------+-----------+--------------------+--------------------+------------+-------------------------+-----------------+-----------------------+
|pais|fecha_proceso|transporte|  ruta|tipo_entrega|material| precio|cantidad|unidad|desc_fuente| desc_nombre_archivo|     dtm_fecha_carga|num_unidades|cantidad_unidades_totales|es_entrega_rutina|es_entrega_bonificacion|
+----+-------------+----------+------+------------+--------+-------+--------+------+-----------+--------------------+--------------------+------------+-------------------------+-----------------+-----------------------+
|  GT|   2025-05-13|  67053596|919885|        ZPRE|AA004003|3195.54|   100.0|    CS|        csv|global_mobility_d...|2026-01-11 22:24:...|          20|                   2000.0|                1|                      0|
|  GT|   2025-05-13|  67053596|919885|        ZPRE|BA018426| 529.99|    20.0|    CS|        csv|global_mobility_d...|202

In [8]:
# Column names and types after the transformation
df_transform.printSchema()

# Number of rows after the transformation
transformed_row_count = df_transform.count()
print(transformed_row_count)

# Materials present in the transformed data
transformed_materials = df_transform.select("material").distinct()
transformed_materials.show()

root
 |-- pais: string (nullable = true)
 |-- fecha_proceso: date (nullable = true)
 |-- transporte: integer (nullable = true)
 |-- ruta: integer (nullable = true)
 |-- tipo_entrega: string (nullable = true)
 |-- material: string (nullable = true)
 |-- precio: double (nullable = true)
 |-- cantidad: double (nullable = true)
 |-- unidad: string (nullable = true)
 |-- desc_fuente: string (nullable = false)
 |-- desc_nombre_archivo: string (nullable = false)
 |-- dtm_fecha_carga: timestamp (nullable = true)
 |-- num_unidades: integer (nullable = true)
 |-- cantidad_unidades_totales: double (nullable = true)
 |-- es_entrega_rutina: integer (nullable = false)
 |-- es_entrega_bonificacion: integer (nullable = false)

12
+--------+
|material|
+--------+
|AA004003|
|BA018426|
|AA004005|
+--------+



## CARGA (LOAD)

In [9]:
# Load step: call the ETL writer and preview five rows of the frame it returns.
# NOTE(review): presumably this persists the data using the pipeline.output_path /
# format / write_mode settings — confirm in src/etl.py.
df_write = etl.write_data()
df_write.show(n=5)

+----+-------------+----------+------+------------+--------+-------+--------+------+-----------+--------------------+--------------------+------------+-------------------------+-----------------+-----------------------+
|pais|fecha_proceso|transporte|  ruta|tipo_entrega|material| precio|cantidad|unidad|desc_fuente| desc_nombre_archivo|     dtm_fecha_carga|num_unidades|cantidad_unidades_totales|es_entrega_rutina|es_entrega_bonificacion|
+----+-------------+----------+------+------------+--------+-------+--------+------+-----------+--------------------+--------------------+------------+-------------------------+-----------------+-----------------------+
|  GT|   2025-05-13|  67053596|919885|        ZPRE|AA004003|3195.54|   100.0|    CS|        csv|global_mobility_d...|2026-01-11 22:24:...|          20|                   2000.0|                1|                      0|
|  GT|   2025-05-13|  67053596|919885|        ZPRE|BA018426| 529.99|    20.0|    CS|        csv|global_mobility_d...|202

                                                                                

In [10]:
# Column names and types of the frame returned by the load step
df_write.printSchema()

# Number of rows in the frame returned by the load step
written_row_count = df_write.count()
print(written_row_count)

# Delivery types present in that frame
written_delivery_types = df_write.select("tipo_entrega").distinct()
written_delivery_types.show()

root
 |-- pais: string (nullable = true)
 |-- fecha_proceso: date (nullable = true)
 |-- transporte: integer (nullable = true)
 |-- ruta: integer (nullable = true)
 |-- tipo_entrega: string (nullable = true)
 |-- material: string (nullable = true)
 |-- precio: double (nullable = true)
 |-- cantidad: double (nullable = true)
 |-- unidad: string (nullable = true)
 |-- desc_fuente: string (nullable = false)
 |-- desc_nombre_archivo: string (nullable = false)
 |-- dtm_fecha_carga: timestamp (nullable = true)
 |-- num_unidades: integer (nullable = true)
 |-- cantidad_unidades_totales: double (nullable = true)
 |-- es_entrega_rutina: integer (nullable = false)
 |-- es_entrega_bonificacion: integer (nullable = false)

12
+------------+
|tipo_entrega|
+------------+
|        ZPRE|
|         Z04|
+------------+

