In [0]:
# Reload the project library automatically (here, the .py modules under includes) whenever
# it changes. For production, decide between removing this - to avoid any surprises, and
# restarting the kernel whenever the library changes - or keeping it so that changes are
# picked up automatically.
%load_ext autoreload
%autoreload 2

# To disable autoreload, run %autoreload 0

In [0]:
#IMPORT LIBRARIES
# NOTE(review): json, importlib, StructType and datetime are not referenced in the cells
# of this notebook shown here — confirm before removing.
from pyspark.sql import SparkSession
import json
import sys
import importlib
import pyspark.sql.functions as F
from pyspark.sql.types import StructType
from pyspark.sql.window import Window
from datetime import datetime
from delta.tables import DeltaTable

# Put the project root on the import path so the includes/ and schema/ packages resolve.
sys.path.append("/Workspace/BI-OVC")

# This works because each folder contains an __init__.py.
from includes import control_functions
from includes import validations
from schema import fin_act_sch


In [0]:
# Reuse the active Spark session if there is one (getOrCreate); otherwise start one named "bi-ovc-test".
spark = SparkSession.builder.appName("bi-ovc-test").getOrCreate()

In [0]:
#GENERIC PARAMETERS
# Identify this notebook's run in the process-control tables.
process_source_name = 'Load BI OVC'
process_step_name = 'raw to bronze'
source_system_code = 'sapbr'
sys_modified_by_name = 'NBK - Load Finance - raw to bronze'

# Status column of the run record, and the value written to it when the run fails.
sys_status_column_name, error_status_code = 'sys_status_code', 'E'

# Success flag; reassigned from log_process_run_start in the LOG START cell.
fn_status = True


In [0]:
#LOG START
# Register the start of this run. The helper returns the run id plus a success flag,
# and both are checked in the following cell.
process_run_id, fn_status = control_functions.log_process_run_start(
    process_source_name,
    process_step_name,
    source_system_code,
    sys_modified_by_name,
)

In [0]:
#CHECK LOG START RESULT
# NOTE(review): status 'E' is written for EVERY run that obtained an id (> 0), not only
# for failed starts. This looks like a pessimistic default (the row stays 'E' unless a
# later step such as log_process_run_end resets it) — confirm in control_functions
# before changing it.
# The None guard lets a failed start (which may return no id) reach the explicit error
# below instead of crashing on `None > 0`.
if process_run_id is not None and process_run_id > 0:
    control_functions.log_process_run_update_value(process_run_id, sys_modified_by_name, sys_status_column_name, error_status_code)

if fn_status == False:
    print(f"❌ ERROR STARTING PROCESS")
    # A bare `raise` here (no active exception) would only produce the unhelpful
    # "RuntimeError: No active exception to reraise"; raise the same type with a real message.
    raise RuntimeError("Error starting process: log_process_run_start reported a failure")

In [0]:
#FILE PARAMETERS FROM NOTEBOOK
# Raw files are selected by name: <prefix>...<.extension>, compared case-insensitively.
sourceFileNamePrefix, sourceFileExtension = 'tdf_fin_variable_cost_act_sapbr', 'txt'
# Reader options for the raw files. The delimiter is the two characters backslash + 't',
# presumably unescaped to a tab by Spark's CSV reader — confirm before changing.
sourceFileDelimiter = '\\t'
sourceFileEncoding = 'ISO-8859-1'
# Key passed to fin_act_sch.get_schema, and the bronze Delta table folder name.
sourceSchema = 'fin-act-raw-sapbr'
tableName = 'tb_fin_variable_cost_act_sapbr'

In [0]:
#GET DATA FROM CONTROL TABLE
# Fetch the bucket/folder setup for this process + step. On any failure, mark the
# run as 'E', print the cause and re-raise so the job stops here.
try:
    df = control_functions.get_process_source_parameters(
        process_source_name, process_step_name
    )
except Exception as setup_err:
    control_functions.log_process_run_update_value(
        process_run_id, sys_modified_by_name, sys_status_column_name, error_status_code
    )
    print(f"❌ ERROR GETTING PROCESS SETUP PARAMETERS: {setup_err}")
    raise

In [0]:
#PARAMETERS FROM CONTROL TABLE
# Read every bucket/folder setting from the first row of the setup result, using a
# single Spark action instead of one collect() job per column. Taking all values
# from one row also keeps them consistent with each other.
control_columns = [
    'source_bucket_name',
    'source_bucket_folder_key',
    'target_bucket_name',
    'target_bucket_folder_key',
    'archive_bucket_name',
    'archive_bucket_folder_key',
]
control_row = df.select(*control_columns).first()

if control_row is None:
    # No setup row for this process/step: mark the run as failed and stop with a clear
    # message instead of an opaque IndexError.
    control_functions.log_process_run_update_value(process_run_id, sys_modified_by_name, sys_status_column_name, error_status_code)
    print(f"❌ ERROR: NO PROCESS SETUP PARAMETERS FOUND FOR '{process_source_name}' / '{process_step_name}'")
    raise ValueError(f"No process setup parameters found for '{process_source_name}' / '{process_step_name}'")

SourceBucket = control_row['source_bucket_name']
SourceBucketFolderKey = control_row['source_bucket_folder_key']
TargetBucket = control_row['target_bucket_name']
TargetBucketFolderKey = control_row['target_bucket_folder_key']
ArchiveBucket = control_row['archive_bucket_name']
ArchiveBucketFolderKey = control_row['archive_bucket_folder_key']

In [0]:
# READ SCHEMAS - from .py
# Look up the raw-file schema registered under `sourceSchema` in the schema package's
# fin_act_sch module. On failure, mark the run as 'E' and re-raise.
try:
    raw_schema = fin_act_sch.get_schema(sourceSchema)
except Exception as schema_err:
    control_functions.log_process_run_update_value(
        process_run_id, sys_modified_by_name, sys_status_column_name, error_status_code
    )
    print(f"❌ ERROR READING SOURCE SCHEMA: {schema_err}")
    raise

In [0]:
#GET FILE NAMES FROM RAW SOURCE
# List the raw folder and keep only entries named <prefix>...<.extension>
# (case-insensitive), sorted so that files are processed in a stable order.
files_to_process = []

raw_generic_path = "s3://" + SourceBucket + "/" + SourceBucketFolderKey + "/"

try:
    files = dbutils.fs.ls(raw_generic_path)

    files_to_process = sorted(
        entry
        for entry in files
        if entry.name.lower().startswith(sourceFileNamePrefix.lower())
        and entry.name.lower().endswith(f".{sourceFileExtension.lower()}")
    )

    if len(files_to_process) == 0:
        print(f"⚠️ NO MATCHING FILES")

except Exception as list_err:
    control_functions.log_process_run_update_value(
        process_run_id, sys_modified_by_name, sys_status_column_name, error_status_code
    )
    print(f"❌ ERROR GETTING FILE NAME FROM SOURCE: {list_err}")
    raise

In [0]:
# LOAD RAW FILES INTO BRONZE
# For every matching raw file:
#   - read it and tag each row with DW_ROW_ID;
#   - run the physical ('PH') validations;
#   - write accepted rows to the bronze Delta table and rejected rows to the
#     "<bucket>-rejected" Delta table;
#   - archive the file.
archive_path = "s3://" + ArchiveBucket + "/" + ArchiveBucketFolderKey
bronze_generic_path = "s3://" + TargetBucket + "/"
bronze_delta_path = bronze_generic_path.rstrip("/") + "/" + tableName
bronze_rejected_delta_path = bronze_generic_path.rstrip("/") + "-rejected" + "/" + tableName

source_cnt = 0
target_cnt = 0
# Per-table flags, set to 1 once this run has written to that table. Only the first write
# of the run closes the table's previously current rows, so rows loaded from earlier files
# of this same run stay current. Bronze and rejected are independent tables, so each one
# needs its own flag (a shared flag left old rejected rows flagged as current).
curr_row_flg_updated = 0
curr_row_flg_updated_rejected = 0


def _fail_run(message):
    """Mark the current process run as errored ('E') and print `message`."""
    control_functions.log_process_run_update_value(process_run_id, sys_modified_by_name, sys_status_column_name, error_status_code)
    print(message)


def _ensure_delta_path(delta_path):
    """Create a placeholder Delta table at `delta_path` when listing that path fails (nothing there yet)."""
    try:
        dbutils.fs.ls(delta_path)
    except Exception:
        # One-column placeholder; the real columns are added by the mergeSchema append
        # performed in _write_to_delta.
        spark.createDataFrame([], "DW_VALID_FROM_DT timestamp").write.format("delta").save(delta_path)


def _write_to_delta(df_out, delta_path, source_file_name, close_current_rows):
    """Add the DW_* audit columns to `df_out` and write it to the Delta table at `delta_path`.

    Args:
        df_out: rows to write.
        delta_path: location of the target Delta table.
        source_file_name: stored in DW_FILE_NAME.
        close_current_rows: when true and the table already has DW_CURR_ROW_FLG, rows
            currently flagged as current are first set to false and get a DW_VALID_TO_DT.

    Appends (with mergeSchema) when `delta_path` is already a Delta table, otherwise
    overwrites. Exceptions propagate to the caller.
    """
    load_ts = F.from_utc_timestamp(F.current_timestamp(), "Brazil/East")
    df_enriched = (
        df_out
        .withColumn('DW_VALID_FROM_DT', load_ts)
        .withColumn('DW_VALID_TO_DT', F.lit(None).cast("timestamp"))
        .withColumn('DW_CURR_ROW_FLG', F.lit(True))
        .withColumn('DW_FILE_NAME', F.lit(source_file_name))
    )

    if DeltaTable.isDeltaTable(spark, delta_path):
        if close_current_rows:
            delta_table = DeltaTable.forPath(spark, delta_path)
            # The placeholder table created by _ensure_delta_path has no DW_CURR_ROW_FLG yet.
            if "DW_CURR_ROW_FLG" in delta_table.toDF().columns:
                delta_table.update(
                    condition="DW_CURR_ROW_FLG = true",
                    set={
                        "DW_CURR_ROW_FLG": "false",
                        "DW_VALID_TO_DT": load_ts,
                    },
                )
        mode = "append"
    else:
        mode = "overwrite"  # first load

    df_enriched.write.format("delta").mode(mode).option("mergeSchema", "true").save(delta_path)


if files_to_process:
    for file in files_to_process:
        #READ SOURCE FILE AND ADD COLUMN NAMES TO DATAFRAME
        file_path = raw_generic_path + file.name

        # NOTE(review): .options() stringifies its values, so the StructType passed as
        # `schema=` is most likely ignored by the CSV reader (columns default to strings
        # named _c0, _c1, ...; the toDF rename below suggests this is what happens).
        # .schema(raw_schema) would apply the declared types but could null out malformed
        # values before the PH validations see them — confirm the intent before changing.
        try:
            df_raw = spark.read.options(encoding=sourceFileEncoding, delimiter=sourceFileDelimiter, header=False, schema=raw_schema).csv(file_path)
        except Exception as e:
            _fail_run(f"❌ ERROR READING FILE {file_path}: {e}")
            raise

        # Rename columns positionally to the names declared in raw_schema.
        df_raw = df_raw.toDF(*[field.name for field in raw_schema.fields])

        # DW_ROW_ID = "<process_run_id>_<row number>" so validation errors can point at a row.
        # NOTE(review): ordering by a constant pulls every row into a single partition and the
        # numbering follows Spark's read order (presumably not guaranteed to match the file's
        # line order). The numbering also restarts at 1 for each file, so ids repeat across
        # files loaded in the same run.
        df_raw = df_raw.withColumn(
            "DW_ROW_ID",
            F.concat(F.lit(str(process_run_id) + "_"), F.row_number().over(Window.orderBy(F.lit(1))))
        )

        # Accumulate source rows before any validation (summed over all files of the run).
        source_cnt += df_raw.count()

        #PH VALIDATIONS
        # Get all PH validations that apply to the current process.
        try:
            df_validations = validations.get_object_validation(process_run_id, 'PH')
        except Exception as e:
            _fail_run(f"❌ ERROR GETTING OBJECT VALIDATIONS LIST: {e}")
            raise

        # Split the file into validated and rejected records.
        try:
            df_bronze, df_bronze_rejected = validations.physical_validation(df_raw, df_validations, process_run_id)
        except Exception as e:
            _fail_run(f"❌ ERROR IN PHYSICAL VALIDATIONS FN: {e}")
            raise

        #WRITE IN BRONZE
        # Count once: used both for the empty check and for target_cnt.
        bronze_row_cnt = df_bronze.count()
        if bronze_row_cnt > 0:
            try:
                _ensure_delta_path(bronze_delta_path)
                _write_to_delta(df_bronze, bronze_delta_path, file.name, curr_row_flg_updated == 0)
            except Exception as e:
                _fail_run(f"❌ ERROR WRITING FILE {bronze_delta_path}: {e}")
                raise
            curr_row_flg_updated = 1
        else:
            print(f"⚠️ EMPTY BRONZE DATAFRAME")

        #WRITE IN BRONZE_REJECTED
        if df_bronze_rejected.limit(1).count() > 0:
            try:
                _ensure_delta_path(bronze_rejected_delta_path)
                _write_to_delta(df_bronze_rejected, bronze_rejected_delta_path, file.name, curr_row_flg_updated_rejected == 0)
            except Exception as e:
                _fail_run(f"❌ ERROR WRITING FILE {bronze_rejected_delta_path}: {e}")
                raise
            curr_row_flg_updated_rejected = 1
        else:
            print("⚠️ EMPTY BRONZE REJECTED DATAFRAME")

        # Accumulate rows moved to target (not rejected), summed over all files of the run.
        target_cnt += bronze_row_cnt

        #ARCHIVE: always move the processed file to the archive folder
        source_path = raw_generic_path.rstrip("/")
        target_path = archive_path.rstrip("/")
        source_file_full = f"{source_path}/{file.name}"
        target_file_full = f"{target_path}/{file.name}"

        # Warn (but continue) when the source file is no longer in the raw folder.
        try:
            source_file_names = [entry.name for entry in dbutils.fs.ls(source_path)]
        except Exception as e:
            _fail_run(f"❌ ERROR ACCESSING SOURCE PATH: {e}")
            raise
        if file.name not in source_file_names:
            print(f"⚠️ FILE NOT FOUND: {source_file_full}")

        # Create the archive folder when it does not exist yet.
        try:
            dbutils.fs.ls(target_path)
        except Exception:
            dbutils.fs.mkdirs(target_path)

        # Copy + delete = move. Disabled for now: UNCOMMENT TO ACTUALLY ARCHIVE.
        #dbutils.fs.cp(source_file_full, target_file_full)
        #dbutils.fs.rm(source_file_full)

        # NOTE(review): this is printed even while the cp/rm above are commented out.
        print(f"✅ File Archived: {source_file_full} → {target_file_full}")

    #LOG SOURCE & TARGET RECORD COUNT
    control_functions.log_process_run_update_value(process_run_id, sys_modified_by_name, 'process_run_source_record_count', source_cnt)
    control_functions.log_process_run_update_value(process_run_id, sys_modified_by_name, 'process_run_target_record_count', target_cnt)
else:
    print(f"⚠️ NO FILES TO PROCESS")


In [0]:
#table = "`latam-md-finance`.bronze.tb_fin_variable_cost_act_sapbr" 
#spark.sql("DROP TABLE IF EXISTS " + table + "")
#spark.sql("CREATE TABLE IF NOT EXISTS " + table + " USING DELTA LOCATION 's3a://latam-md-finance-bronze/tb_fin_variable_cost_act_sapbr'")

#print(table)

In [0]:
#LOG END
# Close the run record. `x` holds the helper's result, which the next cell compares
# against False.
x = control_functions.log_process_run_end(
    process_run_id,
    sys_modified_by_name,
)

In [0]:
#IF errors, set row status to E
# Mark the run as failed when log_process_run_end reported failure (result == False).
if x == False:
    control_functions.log_process_run_update_value(
        process_run_id,
        sys_modified_by_name,
        sys_status_column_name,
        error_status_code,
    )
