# Engine AML

## Ambientar Parquets

In [7]:
import os
from pyspark.sql import SparkSession

pg_jdbc_jar_path = "/Users/mi19291/Downloads/postgresql-42.7.7.jar"

spark = SparkSession.builder \
    .appName("PostgresToParquet") \
    .master("local[*]") \
    .config("spark.jars", pg_jdbc_jar_path) \
    .getOrCreate()

db_user = "mi19291"
db_password = "1ALEMAN9"
db_url = "jdbc:postgresql://localhost:5432/data4pyspark"
db_table = "public.TSCA010_SIPEMPLEADO"

try:
    print(f"Leyendo la tabla '{db_table}' desde PostgreSQL")
    
    df = spark.read \
        .format("jdbc") \
        .option("url", db_url) \
        .option("dbtable", db_table) \
        .option("user", db_user) \
        .option("password", db_password) \
        .option("driver", "org.postgresql.Driver") \
        .load()

    print("¡Lectura completada!")
    df.printSchema()

    output_parquet_path = f"parquet/tablas/{db_table.split('.')[1].lower()}"
    print(f"\nGuardando los datos en formato Parquet en: '{output_parquet_path}'")

    df.write.mode("overwrite").parquet(output_parquet_path)

    print("Proceso completado")
    
    df.show(5)

except Exception as e:
    print(f"Ocurrió un error: {e}")

Leyendo la tabla 'public.TSCA010_SIPEMPLEADO' desde PostgreSQL
¡Lectura completada!
root
 |-- cd_usuario: string (nullable = true)
 |-- cd_cent_costo: integer (nullable = true)
 |-- cd_puesto: string (nullable = true)
 |-- nb_empleado: string (nullable = true)
 |-- nb_appaterno: string (nullable = true)
 |-- nb_apmaterno: string (nullable = true)


Guardando los datos en formato Parquet en: 'parquet/tablas/tsca010_sipempleado'
Proceso completado
+----------+-------------+---------+---------------+------------+------------+
|cd_usuario|cd_cent_costo|cd_puesto|    nb_empleado|nb_appaterno|nb_apmaterno|
+----------+-------------+---------+---------------+------------+------------+
|   MB78894|         5258|      ZD0|PATRICIA NOHEMI|      ALANIS|       SILVA|
|   MB78894|         5258|      ZD1|PATRICIA NOHEMI|      ALANIS|       SILVA|
|   MI29444|         5258|      ZH0|      MONSERRAT|     MORALES|     SANTANA|
|   MB02837|         5258|      5B6|          KENIA|     CARDOZA|     SANCHE

## Variables De Configuración Global

In [8]:
CD_SISTEMA = ['MDE', 'CCC', 'EFE', 'SPI', 'CBI', 'CHQ', 'AML', 'ATM']
CD_ALERTA = ['202403013601', '202309015801', '202501008301', '1073130', '1122165', '9230800069501', '9230800069502', '2506271215', '2506301306', '2402295596601', '2307315770001', '1155423', '2501000828', '2503001736', '51472272', '9240200078701', '9240200078702', '2503315305201', '2025020758', '2501315997401', '2412316727501']
CD_CASO = [1071288, 994651, 1185772, 1084150, 1061041, 968335, 1246109, 1176615, 1197600, 1198373, 1180636, 1168766]
NU_CLIENTE_LIST = ['D1714830', '97690262', 'E1395002', 'E1447669', '75816071', 'A4782243', 'A2480820', 'D5631652', 'J2018888', 'E1332464', '98903126', '98903258', '98903277', '98903330', '98903339']
CD_TIPOLOGIA_LIST = ['SW', 'DC', 'E3', 'MD', 'MP', 'PS', 'R7', 'RD', 'RN', 'CQ', 'M2', 'RX']

## Lectura de Tablas

In [9]:
tsca003_df = spark.read.parquet("parquet/tablas/tsca003_oficina")
tsca006_df = spark.read.parquet("parquet/tablas/tsca006_sistema")
tsca008_df = spark.read.parquet("parquet/tablas/tsca008_pregunta")
tsca009_df = spark.read.parquet("parquet/tablas/tsca009_respuesta")
tsca010_df = spark.read.parquet("parquet/tablas/tsca010_sipempleado")
tsca011_df = spark.read.parquet("parquet/tablas/tsca011_env_cuest")
tsca012_df = spark.read.parquet("parquet/tablas/tsca012_parametro")
tsca013_df = spark.read.parquet("parquet/tablas/tsca013_alerta")
tsca014_df = spark.read.parquet("parquet/tablas/tsca014_caso")
tsca016_df = spark.read.parquet("parquet/tablas/tsca016_cliente")
tsca017_df = spark.read.parquet("parquet/tablas/tsca017_cuenta")
tsca018_df = spark.read.parquet("parquet/tablas/tsca018_det_catalog")
tsca019_df = spark.read.parquet("parquet/tablas/tsca019_mov_bg")
tsca026_df = spark.read.parquet("parquet/tablas/tsca026_relctecta")
tsca027_df = spark.read.parquet("parquet/tablas/tsca027_op_relev")
tsca029_df = spark.read.parquet("parquet/tablas/tsca029_tp_operac")
tsca034_df = spark.read.parquet("parquet/tablas/tsca034_kyc")
tsca040_df = spark.read.parquet("parquet/tablas/tsca040_inst_mon")
tsca046_df = spark.read.parquet("parquet/tablas/tsca046_ropeinu")
tsca051_df = spark.read.parquet("parquet/tablas/tsca051_act_banxico")
tsca057_df = spark.read.parquet("parquet/tablas/tsca057_op_med")
tsca060_df = spark.read.parquet("parquet/tablas/tsca060_opis")
tsca065_df = spark.read.parquet("parquet/tablas/tsca065_centro_resp")
tsca068_df = spark.read.parquet("parquet/tablas/tsca068_act_kyc")
tsca069_df = spark.read.parquet("parquet/tablas/tsca069_riesgo_pais")
tsca071_df = spark.read.parquet("parquet/tablas/tsca071_prd_sbprd")
tsca100_df = spark.read.parquet("parquet/tablas/tsca100_sippuesto")
tsca106_df = spark.read.parquet("parquet/tablas/tsca106_pais")
tsca108_df = spark.read.parquet("parquet/tablas/tsca108_cat_oper")
tsca120_df = spark.read.parquet("parquet/tablas/tsca120_estados")
tsca217_df = spark.read.parquet("parquet/tablas/tsca217_rel_caso_sia")
tsca240_df = spark.read.parquet("parquet/tablas/tsca240_oficina_gr")

## Consulta 0

In [21]:
try:
    from pyspark.sql.functions import (
    col, when, coalesce, to_date, date_add, 
    date_format, lit, concat
    )

    al = tsca013_df.alias("AL")
    oficina_gest = tsca003_df.alias("oficina_gest")
    oficina_cte = tsca003_df.alias("oficina_cte")

    mi_query = al.join(
        oficina_gest,
        col("AL.CD_OFICINA_GEST") == col("oficina_gest.CD_OFICINA"),
        "left"
    )

    mi_query = mi_query.filter(
        col("AL.CD_ALERTA").isin(CD_ALERTA) &
        col("AL.CD_SISTEMA").isin(CD_SISTEMA) &
        col("AL.CD_TIPOLOGIA").isin(CD_TIPOLOGIA_LIST)
    )

    mi_query = mi_query.select(
        col("AL.CD_SISTEMA").alias("cdSistema"),
        col("AL.CD_ALERTA").alias("cdAlerta"),
        col("AL.FH_ALERTA").alias("fhAlerta"),
        col("AL.FH_ENVIO").alias("fhEnvio"),
        col("AL.CD_TIPOLOGIA").alias("cdTipologia"),
        col("AL.CD_DIVISA").alias("cdDivisa"),
        # col("AL.ST_ALERTA").alias("cdStAlerta"),
        # col("AL.TX_MOTIVO_DESCARTE").alias("txMotivoDescarte"),
        # col("AL.ST_REP_AUT").alias("stRepAut"),
        # col("AL.NU_SCORE_MC").alias("scoreMc"),
        col("AL.NU_SCORE_MD").alias("scoreMdl"),
        col("AL.FH_VEN_LIB_SIA").alias("fhVenLibSia"),
        # col("AL.FH_REP_AUT").alias("fhRepAut"),
        col("AL.FH_ASIGNACION").alias("fhAsignacion"),
        # col("AL.FH_VEN_REV").alias("fhVenRev"),
        col("AL.CD_SEG").alias("cdSeg"),
        # col("AL.NU_SESION_ORIGEN").alias("cdSesion"),
        col("AL.IM_MONTO_ALERTA").alias("imMontoAlerta"),
        col("AL.CD_CLIENTE").alias("cdCliente"),
        col("AL.NU_CUENTA").alias("nuCuenta"),
        col("AL.CD_CASO").alias("cdCaso"),
        col("AL.CD_PERIODO").alias("cdPeriodo"),
        col("AL.CD_CTE").alias("cdCte"),
        
        when(col("AL.CD_ACCION") == 'D', 'ALERTA')
            .when(col("AL.CD_ACCION") == 'N', 'NOTIFICACION')
            .when(col("AL.CD_ACCION") == 'I', 'INEPA')
            .alias("cdAccion"),
        
        coalesce(col("AL.NB_NOMBRE"), col("AL.NB_CTE"), col("AL.NB_NOMBRE_CTE")).alias("nbCte"),

        when(col("TP_PER_JURI") == "F", "PERSONA FISICA")
            .when(col("TP_PER_JURI") == "M", "PERSONA MORAL")
            .when(col("TP_PER_JURI") == "G", "GOBIERNO")
            .alias("tpPerJuri"),
        
        concat(col("oficina_gest.CD_OFICINA"), lit(" - "), col("oficina_gest.NB_OFICINA")).alias("cdOficinaGest"),
        
        col("AL.NB_BANCA").alias("nbBanca"),
        # col("AL.CD_MATCH").alias("cdMatch"),
        col("AL.IM_SCORE").alias("imScore"),
        col("AL.CT_MOVIMIENTO").alias("ctMovimiento"),
        col("AL.CD_DIRECTA").alias("cdDirecta"),
        col("AL.NU_SCORE_CORTE").alias("nuScoreCorte"),
        # coalesce(col("AL.NB_NOMBRE"), col("AL.NB_CTE"), col("AL.NB_NOMBRE_CTE")).alias("nbNombreCte"),
        # col("AL.NB_APPATERNO_CTE").alias("nbAppaternoCte"),
        # col("AL.NB_APMATERNO_CTE").alias("nbApmaternoCte"),
        # col("AL.NU_OPER").alias("nuOper"),
        # col("AL.FH_NAC").alias("fhNac"),
        # col("AL.NU_EDAD").alias("nuEdad"),
        # col("AL.CD_SECTOR").alias("cdSector"),
        # col("AL.FH_APERTURA").alias("fhApertura"),
        # col("AL.NB_SUC_APERTURA").alias("nbSucApertura"),
        # col("AL.CD_CENTRO_SUP").alias("cdCentroSup"),
        # col("AL.NB_DIVISION").alias("nbDivision"),
        # col("AL.NB_ESTADO").alias("nbEstado"),
        col("AL.NB_ACT_ECO_BXICO").alias("nbActividad"),
        # col("AL.TP_SECTOR").alias("tpSector"),
  
        # date_format(
        #     date_add(to_date(lit("01-01-1900"), "dd-MM-yyyy"), col("AL.NU_ANTIGUEDAD") - 2),
        #     "dd/MM/yyyy"
        # ).alias("nuAntiguedad"),
        
        # col("AL.CD_SUCURSAL").alias("cdSucursal"),
        # col("AL.CD_DIR_ZONA").alias("cdDirZona"),
        # col("AL.FH_GEN").alias("fhGen"),
        # col("AL.CD_ACT_ECO_BXICO").alias("cdActividad"),
        
        # coalesce(col("AL.NU_DEP"), lit(0)).alias("nuDep"),
        # coalesce(col("AL.IM_IMPORTE_DEP"), lit(0)).alias("imImporteDep"),
        # coalesce(col("AL.IM_IMPORTE_RET"), lit(0)).alias("imImporteRet"),
        # coalesce(col("AL.NU_RET"), lit(0)).alias("nuRet"),
        # coalesce(col("AL.NU_CV"), lit(0)).alias("nuCv"),
        # coalesce(col("AL.IM_IMPORTE_CV"), lit(0)).alias("imImporteCv"),
        # coalesce(col("AL.CD_MOV_OTRO"), lit(0)).alias("cdMovOtro"),
        # coalesce(col("AL.IM_IMP_OTRO"), lit(0)).alias("imImpOtro"),
        
        # col("AL.NU_FOLIO_IFI").alias("nuFolioIfi"),
        
        # concat(col("oficina_cte.CD_OFICINA"), lit(" - "), col("oficina_cte.NB_OFICINA")).alias("crGestorCte"),
        
        # coalesce(col("AL.NB_NOMBRE"), col("AL.NB_CTE"), col("AL.NB_NOMBRE_CTE")).alias("nbNombre"),
    )

    mi_query.show()
    mi_query.coalesce(1).write.csv("parquet/salidas_pyspark/consulta_zero.csv", header=True)

except Exception as e:
    print(e)


+---------+-------------+----------+----------+-----------+--------+-----------+-----------+------------+-----+-------------+---------+--------------------+-------+---------+--------+------------+--------------------+--------------+--------------------+---------------+-------+------------+---------+------------+--------------------+
|cdSistema|     cdAlerta|  fhAlerta|   fhEnvio|cdTipologia|cdDivisa|   scoreMdl|fhVenLibSia|fhAsignacion|cdSeg|imMontoAlerta|cdCliente|            nuCuenta| cdCaso|cdPeriodo|   cdCte|    cdAccion|               nbCte|     tpPerJuri|       cdOficinaGest|        nbBanca|imScore|ctMovimiento|cdDirecta|nuScoreCorte|         nbActividad|
+---------+-------------+----------+----------+-----------+--------+-----------+-----------+------------+-----+-------------+---------+--------------------+-------+---------+--------+------------+--------------------+--------------+--------------------+---------------+-------+------------+---------+------------+-----------------

## Consulta 2

In [None]:
try:
    from pyspark.sql.functions import col, concat, lit, trim, row_number
    from pyspark.sql.window import Window

    t10_func = tsca010_df.alias("t10_func")
    t100_func = tsca100_df.alias("t100_func")

    funcionarios_df = t10_func.join(
        t100_func,
        trim(col("t100_func.CD_PUESTO")) == trim(col("t10_func.CD_PUESTO"))
    ).filter(
        col("t10_func.CD_USUARIO").isNotNull() &
        trim(col("t100_func.CD_PERFIL")).isin('0001', '0022', '0041')
    ).select(
        col("t10_func.CD_CENT_COSTO").cast("integer").alias("CD_CENT_COSTO_INT"),
        concat(
            col("t10_func.NB_EMPLEADO"), lit(' '), 
            col("t10_func.NB_APPATERNO"), lit(' '), 
            col("t10_func.NB_APMATERNO")
        ).alias("nombre_completo_funcionario")
    )

    window_spec = Window.partitionBy("CD_CENT_COSTO_INT").orderBy(lit(1))

    funcionario_elegido_df = funcionarios_df.withColumn("rn", row_number().over(window_spec)) \
        .filter(col("rn") == 1) \
        .select("CD_CENT_COSTO_INT", "nombre_completo_funcionario")

    t14 = tsca014_df.alias("T14")
    t16 = tsca016_df.alias("T16")
    t13 = tsca013_df.alias("T13")
    t06 = tsca006_df.alias("T06")
    t03 = tsca003_df.alias("T03")
    t10 = tsca010_df.alias("T10")
    t65 = tsca065_df.alias("T65")

    cr_gestora = tsca065_df.alias("cr_gestora")
    oficina_riesgo = tsca003_df.alias("oficina_riesgo")
    cr_mercado = tsca065_df.alias("cr_mercado")
    cr_division = tsca065_df.alias("cr_division")

    mi_query2 = t14.join(t16, (col("T14.CD_CASO") == col("T16.CD_CASO")) & (col("T14.CD_CLIENTE") == col("T16.CD_CLIENTE")), "left") \
        .join(t13, col("T14.CD_CASO") == col("T13.CD_CASO"), "left") \
        .join(t06, col("T13.CD_SISTEMA") == col("T06.CD_SISTEMA"), "left") \
        .join(t03, col("T13.CD_OFICINA_GEST") == col("T03.CD_OFICINA"), "left") \
        .join(t10, col("T03.CD_OFICINA") == col("T10.CD_CENT_COSTO").cast("integer"), "left") \
        .join(t65, col("T16.CD_OFICINA") == col("T65.CD_OFICINA").cast("integer"), "left") \
        .join(cr_gestora, col("T65.CD_CENTRO_RESP7") == col("cr_gestora.CD_OFICINA"), "left") \
        .join(oficina_riesgo, col("T65.CD_CENTRO_RESP7").cast("integer") == col("oficina_riesgo.CD_OFICINA"), "left") \
        .join(cr_mercado, col("T65.CD_CENTRO_RESP6") == col("cr_mercado.CD_OFICINA"), "left") \
        .join(cr_division, col("T65.CD_CENTRO_RESP5") == col("cr_division.CD_OFICINA"), "left") \
        .join(funcionario_elegido_df, col("T65.CD_CENTRO_RESP7").cast("integer") == col("funcionario_elegido_df.CD_CENT_COSTO_INT"), "left")

    mi_query2 = mi_query2.filter(
        col("T13.CD_ALERTA").isin(CD_ALERTA) &
        col("T13.CD_SISTEMA").isin(CD_SISTEMA)
    )

    mi_query2 = mi_query2.select(
        col("T13.CD_ALERTA").alias("nuFolioAlertaIR"),
        col("T13.CD_CASO").alias("cdCaso"),
        col("T06.CD_SISTEMA").alias("nbSistemaIR"),
        col("T13.NU_CUENTA").alias("nuCuentaIR"),
        col("T13.IM_MONTO_ALERTA").alias("montoIR"),
        col("T13.CD_DIVISA").alias("divisaIR"),
        col("T13.FH_ALERTA").alias("fhAlertaIR"),
        col("T13.FH_ENVIO").alias("fhEnvioIR"),
        col("T13.CD_SESION").alias("cdSesionIR"),
        concat(col("cr_gestora.CD_OFICINA"), lit(' '), col("cr_gestora.NB_CENTRO_RESP")).alias("nbOficinaGestoraIR"),
        col("oficina_riesgo.NB_RIESGO").alias("nbRiesgoIR"),
        col("cr_mercado.NB_CENTRO_RESP").alias("nbMercadoIR"),
        concat(col("cr_division.CD_OFICINA"), lit(' '), col("cr_division.NB_CENTRO_RESP")).alias("nbDivisionIR"),
        col("funcionario_elegido_df.nombre_completo_funcionario").alias("nbFuncionarioIR")
    )

    mi_query2 = mi_query2.distinct()

    mi_query2.show()

except:
    pass


+---------------+-------+-----------+--------------------+----------+--------+----------+----------+----------+-----------------------------------------+----------+----------------+----------------------------------+-------------------------------------+
|nufolioalertair|cdcaso |nbsistemair|nucuentair          |montoir   |divisair|fhalertair|fhenvioir |cdsesionir|nboficinagestorair                       |nbriesgoir|nbmercadoir     |nbdivisionir                      |nbfuncionarioir                      |
+---------------+-------+-----------+--------------------+----------+--------+----------+----------+----------+-----------------------------------------+----------+----------------+----------------------------------+-------------------------------------+
|202501008301   |1185772|MDE        |00744485002914964810|115150.00 |MXP     |2025-01-31|2025-02-14|14-2025   |5258 CERCADO                             |MEDIO ALTO|DZ MONTERREY SUR|3390 DIV. NOROESTE                |CLAUDIA ESTHELA CAB

## Consulta 6

In [18]:
try:
    from pyspark.sql.functions import (
    col, when, year, count, sum as _sum, 
    coalesce, lit, asc
)
    
    t60 = tsca060_df.alias("T60")
    t13 = tsca013_df.alias("T13")
    t69 = tsca069_df.alias("T69")
    t106 = tsca106_df.alias("T106")

    pre_agg_df = t60.join(
        t13,
        (col("T13.CD_CASO") == col("T60.CD_CASO")) & (col("T13.CD_CLIENTE") == col("T60.NU_CLIENTE"))
    ).join(
        t69,
        col("T60.CD_PAIS_BENEF") == col("T69.CD_ISO"),
        "left"
    ).join(
        t106,
        col("T60.CD_PAIS_BENEF") == col("T106.CD_PAIS_UIF"), 
        "left"
    ).filter(
        col("T13.CD_CASO").isin(CD_CASO) &
        col("T13.CD_SISTEMA").isin(CD_SISTEMA) &
        col("T13.CD_ALERTA").isin(CD_ALERTA)
    )

    transformed_df = pre_agg_df.select(
        col("T13.CD_CASO").alias("cdCaso"),
        col("T60.NU_CUENTA").alias("numCuenta"),
        col("T60.CD_REFERENCIA").alias("cdReferencia"),
        col("T60.IM_INST_CAMBIO").alias("imInstrumento"),
        col("T69.NB_NIVEL_RIESGO").alias("nivelRiesgo"),
        col("T69.CD_PARAISO_FISC").alias("cdParaisoFiscal"),
        
        when(col("T60.TP_TRANSACC") == 'R', 'RECIBIDAS')
        .when(col("T60.TP_TRANSACC") == 'E', 'ENVIADAS')
        .alias("tpTransaccion"),
        
        year(col("T60.FH_FOLIO")).alias("fhFolio"),

        coalesce(col("T106.NB_PAIS"), col("T60.CD_PAIS_BENEF")).alias("cdPaisDestino")
    )

    final_df = transformed_df.groupBy(
        "cdCaso",
        "numCuenta",
        "tpTransaccion",
        "nivelRiesgo",
        "cdParaisoFiscal",
        "cdPaisDestino",
        "fhFolio"
    ).agg(
        coalesce(count(col("cdReferencia")), lit(0)).alias("numOperaciones"),
        
        coalesce(_sum(col("imInstrumento")), lit(0)).alias("montoTransacciones")
    )

    final_df = final_df.select(
        "cdCaso",
        "numCuenta",
        "tpTransaccion",
        "cdPaisDestino",
        "cdParaisoFiscal",
        "nivelRiesgo",
        "numOperaciones",
        "montoTransacciones",
        "fhFolio"
    ).orderBy(
        asc("fhFolio")
    )

    final_df.show()

except:
    pass



+-------+--------------------+-------------+--------------+---------------+-----------+--------------+------------------+-------+
| cdCaso|           numCuenta|tpTransaccion| cdPaisDestino|cdParaisoFiscal|nivelRiesgo|numOperaciones|montoTransacciones|fhFolio|
+-------+--------------------+-------------+--------------+---------------+-----------+--------------+------------------+-------+
|1176615|00741513380119752161|     ENVIADAS|         CHINA|           null|  RIESGO 10|             2|          62744.00|   2023|
|1176615|00741513380119752161|     ENVIADAS|ESTADOS UNIDOS|           null|   RIESGO 2|             1|          38500.00|   2023|
|1185772|00744485002914964810|    RECIBIDAS|        MEXICO|           null|   RIESGO 2|             1|            198.16|   2023|
|1176615|00741513380119752161|     ENVIADAS|         CHINA|           null|  RIESGO 10|             6|          88753.80|   2024|
|1185772|00744485002914964810|    RECIBIDAS|        MEXICO|           null|   RIESGO 2|   

## Consulta 7

In [None]:
try:
    from pyspark.sql.functions import col, lit, concat

    valid_clients_df = tsca013_df.filter(
        col("CD_CASO").isin(CD_CASO) &
        col("CD_ALERTA").isin(CD_ALERTA) &
        col("CD_SISTEMA").isin(CD_SISTEMA)
    ).select("CD_CLIENTE").distinct()

    t27 = tsca027_df.alias("T27")
    t16_1 = tsca016_df.alias("T16_1")
    inst_mon_1 = tsca040_df.alias("inst_mon_1")
    tp_operac_1 = tsca029_df.alias("tp_operac_1")
    oficina_1 = tsca003_df.alias("oficina_1")

    relevantes_df = t27.join(
        t16_1,
        (col("T16_1.CD_CLIENTE") == col("T27.CD_CLIENTE")) & (col("T16_1.CD_CASO") == col("T27.CD_CASO"))
    ).join(
        valid_clients_df,
        col("T27.CD_CLIENTE") == col("valid_clients_df.CD_CLIENTE"), 
        "semi"
    ).join(
        inst_mon_1, col("T27.CD_INST_MON").cast("integer") == col("inst_mon_1.CD_INS_MON"), "left"
    ).join(
        tp_operac_1, col("T27.TP_OPERACION") == col("tp_operac_1.TP_OPERAC"), "left"
    ).join(
        oficina_1, col("T27.CD_OFICINA") == col("oficina_1.CD_OFICINA"), "left"
    ).filter(
        col("T27.CD_CASO").isin(CD_CASO)
    )

    relevantes_df = relevantes_df.select(
        col("T27.CD_CASO").alias("cdCaso"),
        col("T27.FH_OPERACION").alias("fhOperacion"),
        col("T27.CD_DIVISA").alias("nbMoneda"),
        col("inst_mon_1.NB_INS_MON").alias("nbInstrumento"),
        col("tp_operac_1.NB_OPERAC").alias("nbTipoTransaccion"),
        col("T27.NU_CUENTA").alias("nuCuentas"),
        col("T27.CD_FOLIO").alias("nbReferencia"),
        col("T27.IM_MONTO").alias("imOperacion"),
        concat(col("T27.CD_OFICINA"), lit(' - '), col("oficina_1.NB_OFICINA")).alias("nbOficina"),
        lit('RELEVANTES').alias("tpOperacion")
    )

    t57 = tsca057_df.alias("T57")
    t16_2 = tsca016_df.alias("T16_2")
    inst_mon_2 = tsca040_df.alias("inst_mon_2")
    tp_operac_2 = tsca029_df.alias("tp_operac_2")
    oficina_2 = tsca003_df.alias("oficina_2")

    medianas_df = t57.join(
        t16_2, 
        (col("T16_2.CD_CLIENTE") == col("T57.CD_CLIENTE")) & (col("T16_2.CD_CASO") == col("T57.CD_CASO"))
    ).join(
        valid_clients_df,
        col("T57.CD_CLIENTE") == col("valid_clients_df.CD_CLIENTE"), 
        "semi"
    ).join(
        inst_mon_2, col("T57.CD_INST_MONE").cast("integer") == col("inst_mon_2.CD_INS_MON"), "left"
    ).join(
        tp_operac_2, col("T57.TP_OPERACION") == col("tp_operac_2.TP_OPERAC"), "left"
    ).join(
        oficina_2, col("T57.CD_OFICINA") == col("oficina_2.CD_OFICINA"), "left"
    ).filter(
        col("T57.CD_CASO").isin(CD_CASO)
    )

    medianas_df = medianas_df.select(
        col("T57.CD_CASO").alias("cdCaso"),
        col("T57.FH_OPERACION").alias("fhOperacion"),
        col("T57.CD_DIVISA").alias("nbMoneda"),
        col("inst_mon_2.NB_INS_MON").alias("nbInstrumento"),
        col("tp_operac_2.NB_OPERAC").alias("nbTipoTransaccion"),
        col("T57.NU_CUENTA").alias("nuCuentas"),
        col("T57.CD_FOLIO").alias("nbReferencia"),
        col("T57.IM_IMPORTE").alias("imOperacion"),
        concat(col("T57.CD_OFICINA"), lit(' - '), col("oficina_2.NB_OFICINA")).alias("nbOficina"),
        lit('MEDIANAS').alias("tpOperacion")
    )

    mi_query_final = relevantes_df.unionByName(medianas_df)

    mi_query_final.show()
    
except:
    pass


+-------+-----------+--------+-------------+-----------------+--------------------+------------+-----------+------------------+-----------+
|cdcaso |fhoperacion|nbmoneda|nbinstrumento|nbtipotransaccion|nucuentas           |nbreferencia|imoperacion|nboficina         |tpoperacion|
+-------+-----------+--------+-------------+-----------------+--------------------+------------+-----------+------------------+-----------+
|1176615|2023-05-31 |MXP     |EFECTIVO     |RETIRO           |00741513000118876274|382860      |379500.00  |887 - CAMPUS NORTE|RELEVANTES |
|1176615|2023-06-13 |MXP     |EFECTIVO     |RETIRO           |00741513000118876274|149453      |406975.00  |887 - CAMPUS NORTE|RELEVANTES |
|1176615|2023-07-03 |MXP     |EFECTIVO     |RETIRO           |00741513000118876274|011328      |150000.00  |887 - CAMPUS NORTE|RELEVANTES |
|1176615|2023-06-23 |MXP     |EFECTIVO     |RETIRO           |00741513000118876274|284455      |550000.00  |887 - CAMPUS NORTE|RELEVANTES |
|1176615|2023-07-06 

## Consulta 8

In [None]:
try:
    from pyspark.sql.functions import (
    col, when, lit, concat, substring, length, date_format,
    coalesce
)

    # Falta la tsca101 con esa info se haría JOIN con tcte_df
    """
    sub_a_df = tsca101_df.alias("sub_a").select(
        col("sub_a.CD_CLIENTE"),
        col("sub_a.ST_CLA_INTER"),
        col("sub_a.CD_SEC_INTER"),
        col("sub_a.NU_CUENTA"),
        when(col("sub_a.ST_CTA").isNull(), None).otherwise(col("sub_a.FH_CANCELACION")).alias("FH_CANCELACION"),
        when(date_format(col("sub_a.FH_APERTURA_CTA"), 'dd/MM/yyyy') == '01/01/0001', col("sub_a.FH_ALTA_RELA_CTA"))
        .otherwise(col("sub_a.FH_APERTURA_CTA")).alias("FH_APERTURA_CTA")
    )
    """

    tcte_df = tsca016_df.alias("B").filter(
        col("B.CD_CASO").isin(CD_CASO)
    ).select(
        col("A.ST_CLA_INTER"),
        col("A.CD_SEC_INTER"),
        col("A.NU_CUENTA"),
        when(col("B.TP_PERSONA") == 'F', 
            concat(col("B.NB_NOMBRE"), lit(' '), col("B.NB_AP_PATERNO"), lit(' '), col("B.NB_AP_MATERNO")))
        .otherwise(concat(col("B.NB_NOMBRE"), col("B.NB_AP_PATERNO"), col("B.NB_AP_MATERNO")))
        .alias("NOMBRE"),
        col("B.CD_CASO"),
        col("A.FH_CANCELACION"),
        col("B.CD_CLIENTE"),
        col("A.FH_APERTURA_CTA")
    )

    prod_a_expr = when(length(col("A.NU_CUENTA_PAR")) > 10, substring(col("A.NU_CUENTA_PAR"), 11, 2)) \
        .otherwise(substring(col("A.NU_CUENTA_PAR"), 1, 2))

    t26_df = tsca026_df.alias("A") \
        .withColumn("PROD_A", prod_a_expr) \
        .join( 
            tsca014_df.alias("C"),
            (col("A.CD_CASO") == col("C.CD_CASO")) & (col("A.CD_CLIENTE_PAR") == col("C.CD_CLIENTE")),
            "inner"
        ).join(
            tsca071_df.alias("D"),
            col("PROD_A").cast("integer") == col("D.CD_PRODUCTO"),
            "left"
        ).filter(
            col("A.CD_CASO").isin(CD_CASO) &
            col("PROD_A").isin('01', '04', '11', '12', '14', '26', '27', '28', '29')
        ).select(
            col("B.IM_SALDO"),
            col("B.FH_APERTURA"),
            col("A.NU_CUENTA_PAR").alias("nuCuentaCCR"),
            col("A.NB_BLOQUEO").alias("nbEstatusCCR"),
            concat(col("D.CD_PRODUCTO"), lit(' - '), col("D.NB_PRODUCTO")).alias("nbProductoCCR"),
            col("B.CD_SUBPRODUCTO"),
            col("A.CD_CASO").alias("cdCaso"),
            col("A.CD_CLIENTE_PAR").alias("cdClienteCCR"),
            col("B.TP_CARGA"),
            col("PROD_A")
        )

    t71 = tsca071_df.alias("T71")

    final_df = t26_df.alias("T26").join(
        t71,
        (col("T26.PROD_A").cast("integer") == col("T71.CD_PRODUCTO")) &
        (col("T26.CD_SUBPRODUCTO").cast("integer") == col("T71.CD_SUBPRODUCTO").cast("integer")),
        "left"
    ).join(
        tcte_df.alias("TCTE"),
        (substring(col("T26.nuCuentaCCR"), -10, 10) == col("TCTE.NU_CUENTA")) &
        (col("T26.cdClienteCCR") == col("TCTE.CD_CLIENTE")),
        "left"
    )

    final_df = final_df.filter(
        col("T26.TP_CARGA").isNull() | (col("T26.TP_CARGA") == 'A')
    ).select(
        col("T26.cdCaso"),
        col("T26.cdClienteCCR"),
        date_format(coalesce(col("T26.FH_APERTURA"), col("TCTE.FH_APERTURA_CTA")), 'dd/MM/yyyy').alias("fhAperturaCCR"),
        col("T26.nuCuentaCCR"),
        col("T26.nbEstatusCCR"),
        col("T26.nbProductoCCR"),
        concat(col("T26.CD_SUBPRODUCTO"), lit(' - '), col("T71.NB_SUBPRODUCTO")).alias("nbSubproductoCCR"),
        concat(col("TCTE.ST_CLA_INTER"), lit(' - '), col("TCTE.CD_SEC_INTER"), lit(' '), col("TCTE.NOMBRE")).alias("nbTitularCCR"),
        date_format(col("TCTE.FH_CANCELACION"), 'dd/MM/yyyy').alias("fhCancelacionCCR"),
        lit('0').alias("imSaldoCCR")
    ).distinct()

    final_df.show()

except:
    pass

+-------+------------+-------------+--------------------+-------------+---------------------+----------------+------------+----------------+----------+
|cdcaso |cdclienteccr|fhaperturaccr|nucuentaccr         |nbestatusccr |nbproductoccr        |nbsubproductoccr|nbtitularccr|fhcancelacionccr|imsaldoccr|
+-------+------------+-------------+--------------------+-------------+---------------------+----------------+------------+----------------+----------+
|1176615|D1714830    |null         |00741513000118876274|CUENTA ACTIVA|1 - CUENTA DE CHEQUES|null            |null        |null            |0         |
|1176615|D1714830    |null         |00741513000119752161|CUENTA ACTIVA|1 - CUENTA DE CHEQUES|null            |null        |null            |0         |
|1180636|E1395002    |null         |00747696000124019830|CUENTA ACTIVA|1 - CUENTA DE CHEQUES|null            |null        |null            |0         |
|1180636|E1395002    |null         |00747696000124019946|CUENTA ACTIVA|1 - CUENTA DE CHE

## Consulta 9

In [None]:
try:
    from pyspark.sql.functions import col, when, lit, concat_ws, trim

    t26 = tsca026_df.alias("T26")
    b = tsca016_df.alias("B")

    mi_query9 = t26.join(
        b,
        col("T26.CD_CLIENTE_PAR") == col("B.CD_CLIENTE"),
        "left"
    ).filter(
        col("T26.CD_CASO").isin(CD_CASO) &
        (~col("T26.CD_CLIENTE_PAR").isin(NU_CLIENTE_LIST))
    )

    mi_query9 = mi_query9.select(
        col("T26.CD_CASO").alias("cdCaso"),
        col("T26.CD_CLIENTE_PAR").alias("nuCtePartCCR"),
        
        trim(
            concat_ws(
                ' ',
                col("T26.CD_INTERVENCION"),
                col("T26.NU_SECUENCIA"),
                when(col("B.TP_PERSONA") == 'F', 
                    concat_ws(' ', col("B.NB_NOMBRE"), col("B.NB_AP_PATERNO"), col("B.NB_AP_MATERNO")))
                .otherwise(concat_ws('', col("B.NB_NOMBRE"), col("B.NB_AP_PATERNO"), col("B.NB_AP_MATERNO")))
            )
        ).alias("nbParticipesCCR")
    ).distinct()

    mi_query9.show()

except:
    pass

+-------+------------+-------------+--------------------+-------------+---------------------+----------------+------------+----------------+----------+
|cdcaso |cdclienteccr|fhaperturaccr|nucuentaccr         |nbestatusccr |nbproductoccr        |nbsubproductoccr|nbtitularccr|fhcancelacionccr|imsaldoccr|
+-------+------------+-------------+--------------------+-------------+---------------------+----------------+------------+----------------+----------+
|1176615|D1714830    |null         |00741513000118876274|CUENTA ACTIVA|1 - CUENTA DE CHEQUES|null            |null        |null            |0         |
|1176615|D1714830    |null         |00741513000119752161|CUENTA ACTIVA|1 - CUENTA DE CHEQUES|null            |null        |null            |0         |
|1180636|E1395002    |null         |00747696000124019830|CUENTA ACTIVA|1 - CUENTA DE CHEQUES|null            |null        |null            |0         |
|1180636|E1395002    |null         |00747696000124019946|CUENTA ACTIVA|1 - CUENTA DE CHE

## Consulta 10

In [None]:
try:
    prod_a_expr = when(length(trim(col("A.NU_CUENTA_PAR"))) > 10, substring(col("A.NU_CUENTA_PAR"), 11, 2)) \
        .otherwise(substring(col("A.NU_CUENTA_PAR"), 1, 2))

    t26_df = tsca026_df.alias("A") \
        .withColumn("PROD_A", prod_a_expr) \
        .join( # Reemplaza subconsulta en WHERE para A.CD_CLIENTE_PAR
            tsca014_df.alias("C14"),
            (col("A.CD_CASO") == col("C14.CD_CASO")) & (col("A.CD_CLIENTE_PAR") == col("C14.CD_CLIENTE")),
            "inner"
        ).join(
            tsca071_df.alias("C71"),
            col("PROD_A").cast("integer") == col("C71.CD_PRODUCTO"),
            "left"
        ).filter(
            col("A.CD_CASO").isin(CD_CASO) &
            (~col("PROD_A").isin('01', '04', '11', '12', '14', '26', '27', '28', '29'))
        ).select(
            col("A.NU_CUENTA_PAR").alias("nuCuentaCCR"),
            col("A.NB_BLOQUEO").alias("nbEstatusCCR"),
            when(length(concat(col("C71.CD_PRODUCTO"), lit(' - '), col("C71.NB_PRODUCTO"))) > 6, 
                concat(col("C71.CD_PRODUCTO"), lit(' - '), col("C71.NB_PRODUCTO")))
            .otherwise('NO EXISTE PRODUCTO EN CATALOGO ').alias("nbProductoCCR"),
            col("A.CD_CASO").alias("cdCaso"),
            col("A.CD_CLIENTE_PAR").alias("cdClienteCCR")
        ).distinct()
    
except:
    pass

In [None]:
miquery0 = spark.read.csv("parquet/salidas/query_zero.csv", sep=",", header=True)
miquery2 = spark.read.csv("parquet/salidas/query_dos.csv", sep=",", header=True)
miquery6 = spark.read.csv("parquet/salidas/query_seis.csv", sep=",", header=True)
miquery7 = spark.read.csv("parquet/salidas/query_siete.csv", sep=",", header=True)
miquery8 = spark.read.csv("parquet/salidas/query_ocho.csv", sep=",", header=True)
miquery9 = spark.read.csv("parquet/salidas/query_nueve.csv", sep=",", header=True)
miquery10 = spark.read.csv("parquet/salidas/query_diez.csv", sep=",", header=True)

## Verificar Salida SQL vs Salida Pyspark

In [32]:
consulta = "zero"
salida_sql = f"parquet/salidas_sql/query_{consulta}.csv"
salida_pyspark = f"parquet/salidas_pyspark/consulta_{consulta}"

salida_pyspark_df = spark.read.csv(salida_pyspark, sep=",", header=True).drop("tpPerJuri")
salida_sql_df = spark.read.csv(salida_sql, sep=",", header=True).select(salida_pyspark_df.columns)

print("Conteo Pyspark:", salida_pyspark_df.count())
print("Conteo SQL:", salida_sql_df.count())

Conteo Pyspark: 21
Conteo SQL: 21


In [None]:
def son_dfs_iguales(df1, df2):
    """
    Compara dos DataFrames de PySpark para ver si son idénticos en
    esquema, número de filas y contenido.

    Args:
        df1 (DataFrame): El primer DataFrame.
        df2 (DataFrame): El segundo DataFrame.

    Returns:
        bool: True si los DataFrames son iguales, False en caso contrario.
    """
    # 1. Comparar esquemas
    if df1.schema != df2.schema:
        print("Los esquemas no coinciden.")
        return False

    # 2. Comparar el número de filas
    if df1.count() != df2.count():
        print("El número de filas no coincide.")
        print("Conteo DF1:", df1.count())
        print("Conteo DF2:", df2.count())
        return False

    # 3. Comparar el contenido
    # Restar df2 de df1
    diff1 = df1.subtract(df2)
    if diff1.count() > 0:
        print("Hay filas en el primer DataFrame que no están en el segundo:")
        diff1.show()
        print("Filas Distintas:", diff1.count())
        
        return False

    # Restar df1 de df2
    diff2 = df2.subtract(df1)
    if diff2.count() > 0:
        print("Hay filas en el segundo DataFrame que no están en el primero:")
        diff2.show()
        print("Filas Distintas:", diff2.count())
        return False
    

    return True


print("Comparando df1 y df_igual:")
print(f"¿Son iguales? {son_dfs_iguales(salida_pyspark_df, salida_sql_df)}\n")

Comparando df1 y df_igual:
Hay filas en el primer DataFrame que no están en el segundo:
+---------+-------------+----------+----------+-----------+--------+-----------+-----------+------------+-----+-------------+---------+--------------------+-------+---------+--------+------------+--------------------+--------------------+---------------+-------+------------+---------+------------+--------------------+
|cdSistema|     cdAlerta|  fhAlerta|   fhEnvio|cdTipologia|cdDivisa|   scoreMdl|fhVenLibSia|fhAsignacion|cdSeg|imMontoAlerta|cdCliente|            nuCuenta| cdCaso|cdPeriodo|   cdCte|    cdAccion|               nbCte|       cdOficinaGest|        nbBanca|imScore|ctMovimiento|cdDirecta|nuScoreCorte|         nbActividad|
+---------+-------------+----------+----------+-----------+--------+-----------+-----------+------------+-----+-------------+---------+--------------------+-------+---------+--------+------------+--------------------+--------------------+---------------+-------+----------

In [36]:
for columna in salida_sql_df.columns:
    diferencias = salida_sql_df.select(columna).subtract(salida_pyspark_df.select(columna))
    if diferencias.count() > 0:
        diferencias.show()
        break

+---------+
|   cdCaso|
+---------+
|1071288.0|
|1246109.0|
|1198373.0|
|1197600.0|
|1176615.0|
|1061041.0|
|1084150.0|
| 968335.0|
|1180636.0|
|1168766.0|
|1185772.0|
| 994651.0|
+---------+

