In [0]:
%load_ext autoreload
%autoreload 2
%reload_ext autoreload
import sys
import configparser
import logging
import inspect
from pyspark.sql.functions import count, lit, current_timestamp
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, StringType

# Logger configuration: INFO for the notebook, WARN for the py4j gateway logger.
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("py4j").setLevel(logging.WARN)
logger = logging.getLogger(__name__)

prod = True  # Selects production paths (True) or the developer repo paths (False)

# Global variables

root_repo = "/Workspace/Shared/MITAFO"
config_files = {
    "general": f"{root_repo}/CGRLS_0010/Conf/CF_GRLS_PROCESS.py.properties",
    "connection": f"{root_repo}/CGRLS_0010/Conf/CF_GRLS_CONN.py.properties",
    # NOTE(review): the non-prod path is hard-coded to a personal repo checkout.
    "process": f"{root_repo}/ANCIN_0030/Jobs/04_GEN_ACRED_MOVS/DISPERSIONES/Conf/CF_PART_PROC.py.properties"
    if prod
    else "/Workspace/Repos/mronboye@emeal.nttdata.com/QueryConfigLab.ide/"
    "MITAFO/ANCIN_0030/Jobs/04_GEN_ACRED_MOVS/"
    "DISPERSIONES/Conf/"
    "CF_PART_PROC.py.properties",
}

# Path of the running notebook; used as the failure-notification message.
notebook_name = (
    dbutils.notebook.entry_point.getDbutils()
    .notebook()
    .getContext()
    .notebookPath()
    .get()
)
message = "NB Error: " + notebook_name
source = "ETL"

process_name = "root"

# Load the shared helper notebooks. They presumably provide conf_init_values,
# conf_conn_values, notification_raised, getting_statement and query_table,
# which the rest of the notebook uses without any other visible import.
_external_functions_dir = f"{root_repo}/CGRLS_0010/Notebooks"
if _external_functions_dir not in sys.path:  # avoid duplicate entries on re-run
    sys.path.append(_external_functions_dir)
try:
    from NB_GRLS_DML_FUNCTIONS import *
    from NB_GRLS_SIMPLE_FUNCTIONS import *
except Exception as e:
    logger.error("Error al cargar funciones externas: %s", e)
    # Every later step depends on these helpers: stop here instead of failing
    # further down with an unrelated-looking NameError.
    raise

global_params = {}  # Job input parameters (filled by input_values)
global_confs = {}  # Process configuration keys (filled by conf_process_values)

def input_values() -> dict:
    """Create the job widgets, read them and publish the values in ``global_params``.

    Widgets are registered under lower-case names (``sr_folio``, ...) with an
    empty default. Their stripped values are stored in ``global_params`` under
    the upper-case names (``SR_FOLIO``, ...). A ``"status"`` entry is then set
    to ``"1"`` when every value in ``global_params`` is non-empty, otherwise to
    ``"0"`` (and an error is logged).

    Returns:
        dict: the module-level ``global_params`` dictionary itself.
    """
    widget_names = (
        "SR_FOLIO_REL",
        "SR_PROCESO",
        "SR_FECHA_LIQ",
        "SR_TIPO_MOV",
        "SR_REPROCESO",
        "SR_SUBPROCESO",
        "SR_USUARIO",
        "SR_INSTANCIA_PROCESO",
        "SR_ORIGEN_ARC",
        "SR_ID_SNAPSHOT",
        "SR_FECHA_ACC",
        "SR_FOLIO",
        "SR_SUBETAPA",
    )

    # Register every widget first (lower-case name, empty default) ...
    for widget_name in widget_names:
        dbutils.widgets.text(widget_name.lower(), "")

    # ... then read them back, keyed in upper case for the rest of the notebook.
    widget_values = {
        widget_name.upper(): dbutils.widgets.get(widget_name.lower()).strip()
        for widget_name in widget_names
    }
    global_params.update(widget_values)

    # Every stored value must be non-empty for the run to be valid.
    if all(global_params.values()):
        global_params["status"] = "1"
    else:
        logger.error("Valores de entrada vacíos o nulos")
        global_params["status"] = "0"

    return global_params


def conf_process_values(arg_config_file: str, arg_process_name: str) -> tuple:
    """Read the process configuration keys and publish them in ``global_confs``.

    Args:
        arg_config_file: path of the INI-style ``.properties`` file.
        arg_process_name: section of the file to read.

    Returns:
        tuple: ``(sql_conf_file, debug, conn_schema_001, conn_schema_002,
        catalog_name, schema_name, status)``. ``status`` is ``"1"`` on success.
        On any read/parse error (missing file, section or option included),
        every value, including ``status``, is ``"0"`` and the error is logged.
    """
    keys = [
        "sql_conf_file",  # must stay first (the caller reads conf_values[0])
        "debug",  # must stay second (the caller reads conf_values[1])
        "conn_schema_001",
        "conn_schema_002",
        "catalog_name",
        "schema_name",
    ]

    try:
        config = configparser.ConfigParser()
        # read() silently ignores files it cannot open; a missing file then
        # surfaces below as configparser.NoSectionError.
        config.read(arg_config_file)
        result = {key: config.get(arg_process_name, key) for key in keys}
        result["status"] = "1"
    except (configparser.Error, ValueError, OSError) as error:
        # configparser.Error covers NoSectionError, NoOptionError and parsing
        # errors, none of which derive from ValueError/OSError.
        logger.error("Error en la función %s: %s", inspect.stack()[0][3], error)
        result = {key: "0" for key in keys}
        result["status"] = "0"

    # Publish the values for the rest of the notebook.
    global_confs.update(result)

    return tuple(result.values())

# Global handler for uncaught exceptions.
# NOTE(review): IPython/Jupyter kernels normally handle cell exceptions
# themselves and do not call sys.excepthook (IPython offers
# InteractiveShell.set_custom_exc for that) — confirm this handler actually
# fires in this Databricks runtime.
def global_exception_handler(exc_type, exc_value, exc_traceback):
    """Notify about an uncaught exception, then raise a generic error.

    KeyboardInterrupt is delegated to the default ``sys.__excepthook__``.
    For any other exception the error is logged and, if the global
    ``webhook_url`` has already been created by the main cell, a notification
    is sent with the current widget values. Finally
    ``Exception("An error raised")`` is raised, chained to the original error.
    """
    if issubclass(exc_type, KeyboardInterrupt):
        # Let KeyboardInterrupt be handled normally.
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return

    message = f"Uncaught exception: {exc_value}"
    source = "ETL"
    input_parameters = dbutils.widgets.getAll().items()
    # Log the error and notify.
    logger.error("Please review log messages")

    # webhook_url is a global assigned later by the main cell; an exception
    # raised before that point must not turn into a NameError here.
    hook_url = globals().get("webhook_url")
    if hook_url is None:
        logger.error("webhook_url not initialised, notification skipped: %s", message)
    else:
        notification_raised(hook_url, -1, message, source, input_parameters)
    raise Exception("An error raised") from exc_value


# Install the handler as the global sys exception hook.
sys.excepthook = global_exception_handler


if __name__ == "__main__":
    # Raw widget values, attached to every failure notification below.
    input_parameters = dbutils.widgets.getAll().items()

    # General process configuration: notification webhook, channel, status flag.
    webhook_url, channel, failed_task = conf_init_values(
        config_files["general"], process_name, "TEMP_PROCESS"
    )

    def _abort_run(log_text: str, error_text: str) -> None:
        """Log ``log_text``, send the failure notification and raise ``error_text``."""
        logger.error(log_text)
        notification_raised(webhook_url, -1, message, source, input_parameters)
        raise Exception(error_text)

    # Read the job widgets into global_params and validate them.
    input_values()
    if global_params["status"] == "0":
        _abort_run("Revisar mensajes en los logs", "Error en los valores de entrada, revisar logs")

    if failed_task == "0":
        _abort_run("Please review log messages", "Process ends")

    # Process configuration: element 0 is the SQL JSON file name, element 1 the
    # debug flag and the last element the status flag (see conf_process_values).
    process_name = "root"
    conf_values = conf_process_values(config_files["process"], process_name)
    if conf_values[-1] == "0":
        _abort_run("Revisar mensajes en los logs", "Error en la configuración del proceso, revisar logs")

    # Oracle JDBC connection settings.
    conn_name_ora = "jdbc_oracle"
    (
        conn_options, conn_additional_options, conn_user, conn_key, conn_url, scope, failed_task,
    ) = conf_conn_values(config_files["connection"], conn_name_ora)
    if failed_task == "0":
        _abort_run("Revisar mensajes en los logs", "Error en la configuración de la conexión, revisar logs")

    # Full path of the JSON file holding the SQL templates.
    sql_conf_file = (
        f"{root_repo}/ANCIN_0030/Jobs/04_GEN_ACRED_MOVS/DISPERSIONES/JSON/{conf_values[0]}"
        if prod
        else f"/Workspace/Repos/mronboye@emeal.nttdata.com/QueryConfigLab.ide/MITAFO/ANCIN_0030/Jobs/04_GEN_ACRED_MOVS/DISPERSIONES/JSON/{conf_values[0]}"
    )
    # Debug mode is on only when the configured value is "true" (any case).
    debug = conf_values[1].lower() == "true"

In [0]:
import json  # used below; previously only reachable through a star import

# Load the JSON file holding the SQL templates for this process.
with open(sql_conf_file, encoding="utf-8") as f:
    file_config_sql = json.load(f)

# Flatten the "steps" section into (step_id, sql_text) pairs; each step's
# "value" is a list of lines joined with newlines. A file without a "steps"
# key yields an empty list.
conf_values = [
    (fields["step_id"], "\n".join(fields["value"]))
    for fields in file_config_sql.get("steps", [])
]

### Extracción de `CIERREN_ETL.TTSISGRAL_ETL_DISPERSION`

In [0]:
# Step 001: extract rows from CIERREN_ETL.TTSISGRAL_ETL_DISPERSION for the
# current folio and related folio.
query_statement = "001"

params = [
    global_params["SR_FOLIO"],
    global_params["SR_FOLIO_REL"],
]

statement, failed_task = getting_statement(conf_values, query_statement, params)

if failed_task == "0":
    logger.error("No value %s found", statement)
    logger.error("Please review log messages")
    notification_raised(webhook_url, -1, message, source, global_params)
    raise Exception("Process ends")

# Fill the positional placeholders of the SQL template with the parameters.
# NOTE(review): widget values are interpolated directly into the SQL text.
formatted_statement = statement.format(*params)

DF_100_CRE_ETL_DISPERSION, failed_task = query_table(
    conn_name_ora, spark, formatted_statement, conn_options, conn_user, conn_key
)

# Check the query status before touching the result: on failure the returned
# object may not be a usable DataFrame, and .cache() would crash before the
# error notification is sent.
if failed_task == "0":
    logger.error("Please review log messages")
    notification_raised(webhook_url, -1, message, source, input_parameters)
    raise Exception("An error raised")

# Mark the result for caching (lazy until the first action).
DF_100_CRE_ETL_DISPERSION.cache()

if debug:
    display(DF_100_CRE_ETL_DISPERSION)

### Extracción de `CIERREN.TTAFOGRAL_MATRIZ_CONVIVENCIA`

In [0]:
# Step 002: extract rows from CIERREN.TTAFOGRAL_MATRIZ_CONVIVENCIA for the folio.
query_statement = "002"

params = [
    global_params["SR_FOLIO"],
]

statement, failed_task = getting_statement(conf_values, query_statement, params)

if failed_task == "0":
    logger.error("No value %s found", statement)
    logger.error("Please review log messages")
    notification_raised(webhook_url, -1, message, source, global_params)
    raise Exception("Process ends")

# Fill the positional placeholders of the SQL template with the parameters.
formatted_statement = statement.format(*params)

DF_200_MATRIZ, failed_task = query_table(
    conn_name_ora, spark, formatted_statement, conn_options, conn_user, conn_key
)

# Check the query status before touching the result (see step 001).
if failed_task == "0":
    logger.error("Please review log messages")
    notification_raised(webhook_url, -1, message, source, input_parameters)
    raise Exception("An error raised")

# Mark the result for caching (lazy until the first action).
DF_200_MATRIZ.cache()

if debug:
    display(DF_200_MATRIZ)

Se hace un inner join de las 2 consultas anteriores a través de la key `FTN_NUM_CTA_INVDUAL` con el objetivo de mapear `FTN_ID_MARCA`.

In [0]:
# Inner-join the dispersion rows (query 001) with the coexistence matrix
# (query 002) on FTN_NUM_CTA_INVDUAL, keeping every dispersion column and
# adding FTN_ID_MARCA from the matrix.
main_df = DF_100_CRE_ETL_DISPERSION.join(
    DF_200_MATRIZ,
    on=DF_100_CRE_ETL_DISPERSION["FTN_NUM_CTA_INVDUAL"]
    == DF_200_MATRIZ["FTN_NUM_CTA_INVDUAL"],
    how="inner",
).select(DF_100_CRE_ETL_DISPERSION["*"], DF_200_MATRIZ["FTN_ID_MARCA"])

# Release the source caches and drop the Python references.
# NOTE(review): this happens before main_df is materialised, so main_df will
# likely re-read its sources instead of reusing these caches — confirm.
DF_100_CRE_ETL_DISPERSION.unpersist()
DF_200_MATRIZ.unpersist()
del DF_100_CRE_ETL_DISPERSION, DF_200_MATRIZ

# Mark the joined flow for caching; the following steps build on it.
main_df.cache()

if debug:
    display(main_df)

### Extracción de `CIERREN.TCCRXGRAL_TIPO_SUBCTA`

In [0]:
# Step 003: extract the CIERREN.TCCRXGRAL_TIPO_SUBCTA catalogue (no parameters).
query_statement = "003"

params = [
]

statement, failed_task = getting_statement(conf_values, query_statement, params)

if failed_task == "0":
    logger.error("No value %s found", statement)
    logger.error("Please review log messages")
    notification_raised(webhook_url, -1, message, source, global_params)
    raise Exception("Process ends")

# Fill the positional placeholders of the SQL template (none for this step).
formatted_statement = statement.format(*params)

DF_300_TipoSubcta, failed_task = query_table(
    conn_name_ora, spark, formatted_statement, conn_options, conn_user, conn_key
)

# Check the query status before touching the result (see step 001).
if failed_task == "0":
    logger.error("Please review log messages")
    notification_raised(webhook_url, -1, message, source, input_parameters)
    raise Exception("An error raised")

# Mark the result for caching (lazy until the first action).
DF_300_TipoSubcta.cache()

if debug:
    display(DF_300_TipoSubcta)

Hacemos un inner join entre `DF_300_TipoSubcta` y los datos del flujo (`main_df`) a través de la key `FCN_ID_TIPO_SUBCTA`, con el propósito de mapear los campos `FCN_ID_PLAZO` y `FCN_ID_REGIMEN`.

In [0]:
# Inner-join the flow with the sub-account catalogue (query 003) on
# FCN_ID_TIPO_SUBCTA to add FCN_ID_PLAZO and FCN_ID_REGIMEN.
main_df = main_df.join(
    DF_300_TipoSubcta,
    on=main_df["FCN_ID_TIPO_SUBCTA"] == DF_300_TipoSubcta["FCN_ID_TIPO_SUBCTA"],
    how="inner",
).select(
    main_df["*"],
    DF_300_TipoSubcta["FCN_ID_PLAZO"],
    DF_300_TipoSubcta["FCN_ID_REGIMEN"],
)

# The catalogue is no longer needed: release its cache and the reference.
DF_300_TipoSubcta.unpersist()
del DF_300_TipoSubcta

if debug:
    display(main_df)

Luego modificamos el campo `FCN_ID_REGIMEN` según esta regla:
```sql
CASE
    WHEN FCN_ID_SIEFORE IN (82, 83) THEN 140
    ELSE FCN_ID_REGIMEN
END
```

In [0]:
from pyspark.sql.functions import when, col

# Regime override: rows whose FCN_ID_SIEFORE is 82 or 83 get FCN_ID_REGIMEN = 140;
# every other row (including a null FCN_ID_SIEFORE) keeps its current value.
main_df = main_df.withColumn(
    "FCN_ID_REGIMEN",
    when((col("FCN_ID_SIEFORE") == 82) | (col("FCN_ID_SIEFORE") == 83), 140)
    .otherwise(col("FCN_ID_REGIMEN")),
)

if debug:
    display(main_df)

### Extracción de `CIERREN.TCAFOGRAL_VALOR_ACCION`

In [0]:
# Step 004: extract rows from CIERREN.TCAFOGRAL_VALOR_ACCION for SR_FECHA_ACC.
query_statement = "004"

params = [
    global_params["SR_FECHA_ACC"]
]

statement, failed_task = getting_statement(conf_values, query_statement, params)

if failed_task == "0":
    logger.error("No value %s found", statement)
    logger.error("Please review log messages")
    notification_raised(webhook_url, -1, message, source, global_params)
    raise Exception("Process ends")

# Fill the positional placeholders of the SQL template with the parameters.
formatted_statement = statement.format(*params)

DF_400_valorAccion, failed_task = query_table(
    conn_name_ora, spark, formatted_statement, conn_options, conn_user, conn_key
)

# Check the query status before touching the result (see step 001).
if failed_task == "0":
    logger.error("Please review log messages")
    notification_raised(webhook_url, -1, message, source, input_parameters)
    raise Exception("An error raised")

# Mark the result for caching (lazy until the first action).
DF_400_valorAccion.cache()

if debug:
    display(DF_400_valorAccion)

Luego hacemos un inner join entre los datos del flujo (`main_df`) y `DF_400_valorAccion` con la finalidad de mapear los campos `FCN_ID_VALOR_ACCION` y `FCN_VALOR_ACCION`. El inner join se hace a través de las keys `FCN_ID_SIEFORE` y `FCN_ID_REGIMEN`.

In [0]:
# Inner-join the flow with the query 004 rows (CIERREN.TCAFOGRAL_VALOR_ACCION)
# on FCN_ID_SIEFORE and FCN_ID_REGIMEN, adding FCN_ID_VALOR_ACCION and
# FCN_VALOR_ACCION.
main_df = main_df.join(
    DF_400_valorAccion,
    on=(main_df["FCN_ID_SIEFORE"] == DF_400_valorAccion["FCN_ID_SIEFORE"])
    & (main_df["FCN_ID_REGIMEN"] == DF_400_valorAccion["FCN_ID_REGIMEN"]),
    how="inner",
).select(
    main_df["*"],
    DF_400_valorAccion["FCN_ID_VALOR_ACCION"],
    DF_400_valorAccion["FCN_VALOR_ACCION"],
)

# Release the cache and the reference of the joined-in frame.
DF_400_valorAccion.unpersist()
del DF_400_valorAccion

if debug:
    display(main_df)

### Extracción de `CIERREN.TFAFOGRAL_CONFIG_CONCEP_MOV`

In [0]:
# Step 005: extract CIERREN.TFAFOGRAL_CONFIG_CONCEP_MOV (no parameters).
query_statement = "005"

params = [
]

statement, failed_task = getting_statement(conf_values, query_statement, params)

if failed_task == "0":
    logger.error("No value %s found", statement)
    logger.error("Please review log messages")
    notification_raised(webhook_url, -1, message, source, global_params)
    raise Exception("Process ends")

# Fill the positional placeholders of the SQL template (none for this step).
formatted_statement = statement.format(*params)

DF_500_ConfigConcept, failed_task = query_table(
    conn_name_ora, spark, formatted_statement, conn_options, conn_user, conn_key
)

# Check the query status before touching the result (see step 001).
if failed_task == "0":
    logger.error("Please review log messages")
    notification_raised(webhook_url, -1, message, source, input_parameters)
    raise Exception("An error raised")

# Mark the result for caching (lazy until the first action).
DF_500_ConfigConcept.cache()

if debug:
    display(DF_500_ConfigConcept)

Luego hacemos un inner join de `main_df` con `DF_500_ConfigConcept` a través de la key `FFN_ID_CONCEPTO_MOV`, con la finalidad de traernos el campo `FTN_DEDUCIBLE`.

In [0]:
# Inner-join the flow with the concept configuration (query 005) on
# FFN_ID_CONCEPTO_MOV to add FTN_DEDUCIBLE.
main_df = main_df.join(
    DF_500_ConfigConcept,
    on=main_df["FFN_ID_CONCEPTO_MOV"] == DF_500_ConfigConcept["FFN_ID_CONCEPTO_MOV"],
    how="inner",
).select(main_df["*"], DF_500_ConfigConcept["FTN_DEDUCIBLE"])

# Release the cache and the reference of the joined-in frame.
DF_500_ConfigConcept.unpersist()
del DF_500_ConfigConcept

if debug:
    display(main_df)

### Extracción de `CIERREN_ETL.TTSISGRAL_ETL_DISPERSION`

In [0]:
# Step 006: second extraction from CIERREN_ETL.TTSISGRAL_ETL_DISPERSION for the folio.
query_statement = "006"

params = [
    global_params["SR_FOLIO"]
]

statement, failed_task = getting_statement(conf_values, query_statement, params)

if failed_task == "0":
    logger.error("No value %s found", statement)
    logger.error("Please review log messages")
    notification_raised(webhook_url, -1, message, source, global_params)
    raise Exception("Process ends")

# Fill the positional placeholders of the SQL template with the parameters.
formatted_statement = statement.format(*params)

DF_100_CRE_ETL_DISPERSION_VIV, failed_task = query_table(
    conn_name_ora, spark, formatted_statement, conn_options, conn_user, conn_key
)

# Check the query status before touching the result (see step 001).
if failed_task == "0":
    logger.error("Please review log messages")
    notification_raised(webhook_url, -1, message, source, input_parameters)
    raise Exception("An error raised")

# Mark the result for caching (lazy until the first action).
DF_100_CRE_ETL_DISPERSION_VIV.cache()

if debug:
    display(DF_100_CRE_ETL_DISPERSION_VIV)

Hacemos un left join de `main_df` con `DF_100_CRE_ETL_DISPERSION_VIV` a través de los campos: `FCN_ID_TIPO_SUBCTA`, `FCN_ID_SIEFORE`, `FTN_NUM_CTA_INVDUAL`, `FTC_TABLA_NCI_MOV`, `FTN_NO_LINEA` y `FFN_ID_CONCEPTO_MOV`. Del resultado final solo nos traemos el campo `FTF_MONTO_ACCIONES`.

In [0]:
# Left-join the flow with the query 006 rows on six key columns, adding
# FTF_MONTO_ACCIONES (null for flow rows without a match).
main_df = main_df.join(
    DF_100_CRE_ETL_DISPERSION_VIV,
    (main_df["FCN_ID_TIPO_SUBCTA"] == DF_100_CRE_ETL_DISPERSION_VIV["FCN_ID_TIPO_SUBCTA"]) &
    (main_df["FCN_ID_SIEFORE"] == DF_100_CRE_ETL_DISPERSION_VIV["FCN_ID_SIEFORE"]) &
    (main_df["FTN_NUM_CTA_INVDUAL"] == DF_100_CRE_ETL_DISPERSION_VIV["FTN_NUM_CTA_INVDUAL"]) &
    (main_df["FTC_TABLA_NCI_MOV"] == DF_100_CRE_ETL_DISPERSION_VIV["FTC_TABLA_NCI_MOV"]) &
    (main_df["FTN_NO_LINEA"] == DF_100_CRE_ETL_DISPERSION_VIV["FTN_NO_LINEA"]) &
    (main_df["FFN_ID_CONCEPTO_MOV"] == DF_100_CRE_ETL_DISPERSION_VIV["FFN_ID_CONCEPTO_MOV"]),
    "left"
).select(main_df["*"], DF_100_CRE_ETL_DISPERSION_VIV["FTF_MONTO_ACCIONES"])

# Release the cache: the method must be *called*; merely referencing
# `.unpersist` leaves the data cached.
DF_100_CRE_ETL_DISPERSION_VIV.unpersist()

# Drop the Python reference.
del DF_100_CRE_ETL_DISPERSION_VIV

if debug:
    display(main_df)

In [0]:
# Spot check of concept 225 rows. Gated by `debug` like every other display so
# production runs neither trigger an extra evaluation nor render row-level data.
if debug:
    display(main_df.filter(col("FFN_ID_CONCEPTO_MOV") == 225))

In [0]:
# display(main_df.filter(
#     (main_df["FFN_ID_CONCEPTO_MOV"] == 225) | 
#     (main_df["FFN_ID_CONCEPTO_MOV"] == 227)
# ))
# main_df.count()

### Guardamos `main_df` en una tabla Delta (antes ~~vista global~~) llamada `{catalog_name}.{schema_name}.temp_dispersion_mov_01_{global_params['SR_FOLIO']}`

In [0]:
# Persist main_df as the Delta table
# <catalog_name>.<schema_name>.temp_dispersion_mov_01_<SR_FOLIO>.
view_name = f"temp_dispersion_mov_01_{global_params['SR_FOLIO']}"
table_fqn = f"{global_confs['catalog_name']}.{global_confs['schema_name']}.{view_name}"

# Drop any previous table first (presumably so a schema change cannot break
# the overwrite), then write the flow.
# NOTE(review): the name embeds the SR_FOLIO widget value unquoted.
spark.sql(f"DROP TABLE IF EXISTS {table_fqn}")
main_df.write.format("delta").mode("overwrite").saveAsTable(table_fqn)

In [0]:
from pyspark.sql import DataFrame

# Drop everything cached in the Spark session.
spark.catalog.clearCache()

# Remove every global DataFrame (main_df included) from the notebook namespace,
# unpersisting it as it goes.
for df_name in list(globals()):
    if not isinstance(globals()[df_name], DataFrame):
        continue
    globals().pop(df_name).unpersist()

In [0]:
import gc

# Release main_df's cache only if it still exists: the preceding cleanup cell
# unpersists and deletes every global DataFrame, main_df included, so an
# unconditional reference here raises NameError.
if "main_df" in globals():
    main_df.unpersist()
    del main_df

# Run the garbage collector to free driver-side Python memory right away.
gc.collect()