In [1]:
# Enable IPython's autoreload extension in mode 2 so that edits to imported
# project modules are picked up live, without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [9]:
import json
import logging
from pathlib import Path
from datetime import datetime

from pyspark.sql import SparkSession, DataFrame, functions as F

from src.innova.config.config import etl_config
from utils.trackers import setup_logging, save_metadata
from utils.spark_helpers import (
    load_parquet, save_parquet, load_metadata, save_metadata
)
from utils.transform_schema import apply_strict_schema
from utils.connections.sql_server_connector import SQLServerConnector


In [3]:
# Obtain the Spark session used by every load/write below. No builder options
# are set here, so an already-active session is reused if there is one.
spark = SparkSession.builder.getOrCreate()

In [4]:
# --- Run configuration for this step ---------------------------------------
BUSINESS: str = "innova"
# Timestamp of this run (naive local time from datetime.now()); stored as the
# "load_date" of every table processed below.
LOAD_TS: str = datetime.now().strftime("%Y%m%d%H%M%S")

# Values read from the project ETL configuration.
LAYER: str = etl_config["etl"]["layer_slv"]
STEP_NAME_1: str = etl_config["etl"]["steps"][0]["name"]  # step whose metadata is read
STEP_NAME_2: str = etl_config["etl"]["steps"][1]["name"]  # this step
BASE_PATH: str = etl_config["paths"]["base_path"]
LOGS_PATH: str = etl_config["paths"]["logs_path"]
METADATA_PATH: str = etl_config["paths"]["metadata_path"]

# Root folder for this business inside the configured layer. This is a plain
# string built with an f-string (not a pathlib.Path), and it is interpolated
# into other f-strings further down, so it is annotated as `str`.
BASE_SILVER: str = f"{BASE_PATH}/{LAYER}/{BUSINESS}"


logger = setup_logging(LOGS_PATH)
logger.info(f"starting {STEP_NAME_2}")

'/alegra/metadata/innova/'

In [11]:
# Metadata JSON produced by the previous step: <METADATA_PATH>/<STEP_NAME_1>.json
metadata_file_step_01 = Path(f"{METADATA_PATH}/{STEP_NAME_1}.json")
logger.info(f"Loading Bronze metadata from: {metadata_file_step_01}")

# Looked up per source table further down via `.get(source_name)`, where each
# entry's "path" is read to locate the Bronze parquet data.
bronze_metadata = load_metadata(metadata_path=metadata_file_step_01)

{'customers': {'path': '/alegra/data_lake/bronze/innova/customers/load_date=20250601055858',
  'load_date': '20250601055858',
  'record_count': 100},
 'products': {'path': '/alegra/data_lake/bronze/innova/products/load_date=20250601055858',
  'load_date': '20250601055858',
  'record_count': 50},
 'invoices': {'path': '/alegra/data_lake/bronze/innova/invoices/load_date=20250601055858',
  'load_date': '20250601055858',
  'record_count': 1000}}

In [None]:
def load_bronze_df(
    spark: SparkSession,
    bronze_path: str,
    logger: logging.Logger,
) -> DataFrame:
    """Read one Bronze table from parquet.

    Args:
        spark: Session forwarded to ``load_parquet``.
        bronze_path: Location of the Bronze parquet data to read.
        logger: Receives the progress message and is forwarded to the loader.

    Returns:
        The result of ``load_parquet`` for ``bronze_path``.
    """
    logger.info(f"Loading Bronze parquet from: {bronze_path}")
    bronze_df = load_parquet(spark=spark, path=bronze_path, logger=logger)
    return bronze_df


def transform_and_cast(
    source_name: str,
    df: DataFrame,
    schema_dict: dict,
    logger: logging.Logger,
) -> DataFrame:
    """Drop rows with missing key columns, then apply the target schema.

    Args:
        source_name: Source table name; selects which key columns must be
            non-null. Names not listed below get no row filtering at all.
        df: Input DataFrame for that table.
        schema_dict: Schema description handed to ``apply_strict_schema``.
        logger: Receives progress messages.

    Returns:
        The filtered DataFrame after ``apply_strict_schema``.

    Note:
        ``count()`` is called on the result solely to log the row count.
    """
    # Columns that must be populated for a row to be kept, per source table.
    required_keys = {
        "customers": ("ID",),
        "products": ("ID",),
        "invoices": ("ID", "ClienteID", "ProductoID"),
    }

    logger.info(f"Transforming and casting table: {source_name}")

    filtered = df
    for key_column in required_keys.get(source_name, ()):
        filtered = filtered.filter(F.col(key_column).isNotNull())

    typed = apply_strict_schema(filtered, schema_dict)
    row_total = typed.count()
    logger.info(f"[{source_name}] Row count after cast: {row_total}")
    return typed


def write_to_silver_parquet(df: DataFrame, silver_path: str, logger: logging.Logger) -> None:
    """Persist a DataFrame as parquet at the given Silver location.

    Args:
        df: Data to write.
        silver_path: Destination path passed to ``save_parquet``.
        logger: Receives the progress message and is forwarded to the writer.
    """
    logger.info(f"Saving to Silver parquet: {silver_path}")
    save_parquet(
        data=df,
        path=silver_path,
        logger=logger,
    )


def write_to_sql_staging(
    spark: SparkSession,
    df: DataFrame,
    staging_table: str,
    logger: logging.Logger,
) -> None:
    """Write a DataFrame to a SQL Server staging table.

    The connector is opened as a context manager for the duration of the
    write. The table is written with ``mode="overwrite"`` and a batch size
    of 1000.

    Args:
        spark: Session used to construct the ``SQLServerConnector``.
        df: Data to write.
        staging_table: Name of the destination table.
        logger: Receives the progress message and is passed to the connector.
    """
    write_options = {"batchsize": 1000}

    logger.info(f"Writing to SQL Server staging: {staging_table}")
    with SQLServerConnector(spark, logger=logger) as connector:
        connector.write_dataframe(
            df,
            table=staging_table,
            mode="overwrite",
            options=write_options,
        )

In [7]:
# Per-table metadata collected during this run, keyed by the table's config key.
metadata_silver: dict = {}
tables_cfg = etl_config["tables"]

for table_key, table_conf in tables_cfg.items():
    source_name = table_conf["source"]
    target_name = table_conf["target"]
    schema_dict = table_conf["schema"]
    print(
       source_name,
       target_name,
       schema_dict
    )

    # Tables without an entry in the Bronze metadata are logged and skipped
    # instead of aborting the whole run.
    bronze_info = bronze_metadata.get(source_name)
    if bronze_info is None:
        logger.error(f"No Bronze metadata for '{source_name}', skipping.")
        continue

    df_bronze = load_bronze_df(
        spark=spark,
        bronze_path=bronze_info["path"],
        logger=logger
    )

    df_transformed = transform_and_cast(
        source_name=source_name,
        df=df_bronze,
        schema_dict=schema_dict,
        logger=logger
    )

    # The transformed frame feeds three separate Spark actions below (parquet
    # write, SQL write, count). Caching it avoids re-reading and re-transforming
    # the Bronze data for each action, and keeps the recorded row count
    # consistent with what was actually written.
    df_transformed = df_transformed.cache()
    try:
        silver_path = f"{BASE_SILVER}/temp_tables/{target_name}"
        write_to_silver_parquet(
            df=df_transformed,
            silver_path=silver_path,
            logger=logger
        )

        db_table_full = f"silver_prod.temp_tables.{target_name}"
        write_to_sql_staging(
            spark=spark,
            df=df_transformed,
            staging_table=db_table_full,
            logger=logger
        )

        record_count = df_transformed.count()
    finally:
        # Release the cached data even if one of the writes fails.
        df_transformed.unpersist()

    metadata_silver[table_key] = {
        "path"        : silver_path,
        "load_date"   : LOAD_TS,
        "record_count": record_count,
        "db_table"    : db_table_full
    }

logger.info(f"Finished {STEP_NAME_2}")

customers customers_stg {'ID': 'LongType', 'Nombre': 'StringType', 'Ubicacion': 'StringType', 'Segmento': 'StringType'}
products products_stg {'ID': 'LongType', 'Nombre': 'StringType', 'Categoria': 'StringType', 'Precio': 'DecimalType(18,14)'}
invoices invoices_stg {'ID': 'LongType', 'Fecha': 'DateType', 'ClienteID': 'LongType', 'ProductoID': 'LongType', 'Cantidad': 'LongType', 'Total': 'DecimalType(18,14)'}


In [12]:
# Persist the metadata collected for this step to <METADATA_PATH>/<STEP_NAME_2>.json.
output_metadata_file = Path(f"{METADATA_PATH}/{STEP_NAME_2}.json")
logger.info(f"Writing Silver metadata to: {output_metadata_file}")

# NOTE(review): `save_metadata` is imported from both utils.trackers and
# utils.spark_helpers in the import cell; the later import (utils.spark_helpers)
# is the one bound here - confirm that is the intended implementation.
save_metadata(metadata=metadata_silver, metadata_path=output_metadata_file)
logger.info("Metadata for Step 02 saved.")