In [None]:
%load_ext dataflow.magics 

In [None]:
import json
command = {
    "compartmentId": "ocid1.compartment.oc1..aaaaaaaaxex562zeky4c3pdibw4umq3irexcnf4qrho3fnvjbybl3nyd56wq",
    "displayName": "Proceso Rezago Similitud 3.0",
    "language": "PYTHON",
    "sparkVersion": "3.2.1",
    "driverShape": "VM.Standard.E4.Flex",
    "executorShape": "VM.Standard.E4.Flex",
    "driverShapeConfig":{"ocpus":4,"memoryInGBs":32},   #4-32
    "executorShapeConfig":{"ocpus":16,"memoryInGBs":128}, #16-128
    "numExecutors": 32, #10  #15 executors
    "type": "SESSION",
    "logsBucketUri": "oci://dataflow-logs@ax1trrrsni7e/",
    "configuration": {"spark.archives":"oci://bucket-test1@ax1trrrsni7e/conda_environments/cpu/pyspark32_p38_cpu_v3/2.0/pyspark32_p38_cpu_v3v2_0#conda","spark.driver.maxResultSize":"100g"}# 200GB nextstep

}
command = f'\'{json.dumps(command)}\''
 
%create_session -l python -c $command

In [None]:
%%spark

from pyspark.sql import SparkSession
import pyspark.sql.functions as F 
import os
import re
import pandas as pd
import pyspark
import time

from fuzzywuzzy import fuzz
from datetime import datetime
from pyspark.conf import SparkConf
from pyspark.sql.types import FloatType, IntegerType


################################################################################################################
##        Carga Rezagos y Afiliados(MAC) Limpios Desde DataIntegration - Solo Columnas Necesarias             ##
################################################################################################################
file_name_rezago = 'previred_rezagos_files/tmp/rezagos_input.parquet'
file_name_afiliado = 'previred_rezagos_files/tmp/afiliados_input.parquet'

start = time.time()
input_rezagos_master = spark.read.parquet(f"oci://previred_rezagos_files@ax1trrrsni7e/{file_name_rezago}")
input_rezagos_master = input_rezagos_master.drop_duplicates().limit(5)    #...Evaluar si es necesario.
end = time.time()
print('Tiempo de Carga de Rezagos               : ',end - start, 'Segundos. \n', 'Cantidad: ', str(input_rezagos_master.count()), '\n')

start = time.time()
input_afiliados_master = spark.read.parquet(f"oci://previred_rezagos_files@ax1trrrsni7e/{file_name_afiliado}")
input_afiliados_master = input_afiliados_master.drop_duplicates()#.limit(1) #...Evaluar si es necesario.
end = time.time()
print('Tiempo de Carga de Afiliados             : ',end - start , 'Segundos. \n', 'Cantidad: ', str(input_afiliados_master.count()), '\n')
################################################################################################################


################################################################################################################
##Carga Rezagos y Afiliados(MAC) Limpios Desde DataIntegration - Para Completitud de Columnas en salida final ##
################################################################################################################

#file_paths_tmp_rzg  = [f"oci://previred_rezagos_files@ax1trrrsni7e/previred_rezagos_files/tmp/tmp_db_rzg"]
file_paths_tmp_afil = [f"oci://previred_rezagos_files@ax1trrrsni7e/previred_rezagos_files/tmp/tmp_db_afil"]

start    = time.time()
df_tmp_rezago = spark.read.csv("oci://previred_rezagos_files@ax1trrrsni7e/previred_rezagos_files/tmp/rezagos_actualizada.csv", sep=',', header=True)
df_tmp_rezago = df_tmp_rezago.drop_duplicates()     #.limit(1) #...Evaluar si es necesario.
end = time.time()   
print('Tiempo de Carga de RezagosMaster   : ',end - start , 'Segundos. \n', 'Cantidad: ', str(df_tmp_rezago.count()), '\n')

start    = time.time()
dfs_afil = [spark.read.parquet(file_afil) for file_afil in file_paths_tmp_afil]
if len(dfs_afil) > 1:
    df_tmp_afiliado = dfs_afil[0]
    df_tmp_afiliado = [df_tmp_afiliado.unionAll(d) for d in dfs_afil[1:]][0]
else:
    df_tmp_afiliado = dfs_afil[0]
end = time.time()
print('Tiempo de Carga y Unión de AfiliadosTmp : ', end - start , 'Segundos. \n', 'Cantidad: ', str(df_tmp_afiliado.count()), '\n')
################################################################################################################

In [None]:
%%spark
spark.sparkContext.setLogLevel("ERROR")

################################################################################################################
##            Similitud [Rezagos VS Afiliados(MAC)] Por Dígito Verificador y Guardado en Bucket               ##
################################################################################################################
print('Iniciando Calculo De Similitudes')
start_master = time.time()
df_tmp_rezago = df_tmp_rezago.repartition(50)
digito_verificador = ['0','1','2','3','4','5','6','7','8','9','K']


for var in digito_verificador:
    print(str(datetime.now().time()) + ' - Procesando Similitud con digito verificador  ' + str(var))

    df_b = input_afiliados_master.filter(F.col('DV') == var)
    df_b = df_b.fillna('')
    df_b = df_b.repartition(50)
    
    #Dataframe df_tmp_afiliado Añadido en un dataframe con el universo mas disminuido. (TABLA UTILIZADA SOLO PARA COMPLETITUD DE COLUMNAS)
    df_c = df_tmp_afiliado.filter(F.col('DV') == var)
    df_c = df_c.fillna('')
    df_c = df_c.repartition(50)
    
    df_a_1 = df_a.select('ID','RUT_AFILIADO','NOM_TRABAJADOR','APELLIDO_PATERNO','APELLIDO_MATERNO','NOM_COMPLETO','NOM_SIN_AM','DV_AFILIADO')
    df_b_1 = df_b.select('RUT','NOMBRES','AP_PATERNO','AP_MATERNO','NOM_COMPLETO_F','NOM_SIN_AM_F','DV') 
    
    df1_x_df2 = df_a_1.crossJoin(df_b_1)
    
    #...Similitud concatenadas (LEVENSHTEIN)...
    #...df = df1_x_df2.withColumn('SIMILITUD', F.round(100 - (F.levenshtein(F.col("NOM_COMPLETO"), F.col("NOM_COMPLETO_F"))*100) / F.greatest(F.length(F.col("NOM_COMPLETO")), F.length(F.col("NOM_COMPLETO_F"))), 0).cast(IntegerType()))
    #...df = df.withColumn('SIMILITUD_SIN_APMA', F.round(100 - (F.levenshtein(F.col("NOM_SIN_AM"), F.col("NOM_SIN_AM_F"))*100) / F.greatest(F.length(F.col("NOM_SIN_AM")), F.length(F.col("NOM_SIN_AM_F"))), 0).cast(IntegerType()))
    
    #...Similitud concatenadas (FUZZY WUZZY)
    df = df1_x_df2.withColumn('SIMILITUD', (F.udf(fuzz.ratio)(F.col("NOM_COMPLETO"), F.col("NOM_COMPLETO_F"))))
    df = df.withColumn('SIMILITUD_SIN_APMA', (F.udf(fuzz.ratio)(F.col("NOM_SIN_AM"), F.col("NOM_SIN_AM_F"))))
    print('\tSimilitud concatenadas     [COMPLETADA]')

    #...Encontrar primer nombre del rezago.
    df = df.withColumn('PRIMER_NOM_TRABAJADOR', F.split(F.col('NOM_TRABAJADOR'), ' ')[0] )
    df = df.withColumn('EXISTENCIA', F.when(F.udf(fuzz.token_set_ratio)(F.col('PRIMER_NOM_TRABAJADOR'), F.col('NOMBRES')) >= 100, 1 ).otherwise(0))
    print('\tExistencia                 [COMPLETADA]')

    #...Similitudes de Nombres
    df = df.withColumn('SIMILITUDNOMBRE', F.round(100 - (F.levenshtein(F.col("NOM_TRABAJADOR"), F.col("NOMBRES"))*100) / F.greatest(F.length(F.col("NOMBRES")), F.length(F.col("NOM_TRABAJADOR"))), 0).cast(IntegerType()))
    df = df.withColumn('SIMILITUDNOMBREREZAGOAPP', F.round(100 - (F.levenshtein(F.col("NOMBRES"), F.col("APELLIDO_PATERNO"))*100) / F.greatest(F.length(F.col("NOMBRES")), F.length(F.col("APELLIDO_PATERNO"))), 0).cast(IntegerType()))
    df = df.withColumn('SIMILITUDNOMBREREZAGOAPM', F.round(100 - (F.levenshtein(F.col("NOMBRES"), F.col("APELLIDO_MATERNO"))*100) / F.greatest(F.length(F.col("NOMBRES")), F.length(F.col("APELLIDO_MATERNO"))), 0).cast(IntegerType()))
    print('\tSimilitud Nombres          [COMPLETADA]')

    #Similitud de apellido paterno
    df = df.withColumn('SIMILITUDAPEPATERNO', F.round(100 - (F.levenshtein(F.col("AP_PATERNO"), F.col("APELLIDO_PATERNO"))*100) / F.greatest(F.length(F.col("AP_PATERNO")), F.length(F.col("APELLIDO_PATERNO"))), 0).cast(IntegerType()))
    df = df.withColumn('SIMILITUDAPPATREZAGONOMBRE', F.round(100 - (F.levenshtein(F.col("AP_PATERNO"), F.col("NOM_TRABAJADOR"))*100) / F.greatest(F.length(F.col("AP_PATERNO")), F.length(F.col("NOM_TRABAJADOR"))), 0).cast(IntegerType()))
    df = df.withColumn('SIMILITUDAPPATREZAGOAPMAT', F.round(100 - (F.levenshtein(F.col("AP_PATERNO"), F.col("APELLIDO_MATERNO"))*100) / F.greatest(F.length(F.col("AP_PATERNO")), F.length(F.col("APELLIDO_MATERNO"))), 0).cast(IntegerType()))
    print('\tSimilitud apellido Paterno [COMPLETADA]') 

    #Similitud de apellido materno
    df = df.withColumn(
        'SIMILITUDAPEMATERNO', 
        F.when(F.isnull(F.round(100 - (F.levenshtein(F.col("AP_MATERNO"), F.col("APELLIDO_MATERNO"))*100) / F.greatest(F.length(F.col("AP_MATERNO")), F.length(F.col("APELLIDO_MATERNO"))), 0).cast(IntegerType())), F.lit(0))
        .otherwise(F.round(100 - (F.levenshtein(F.col("AP_MATERNO"), F.col("APELLIDO_MATERNO"))*100) / F.greatest(F.length(F.col("AP_MATERNO")), F.length(F.col("APELLIDO_MATERNO"))), 0).cast(IntegerType())))
    df = df.withColumn('SIMILITUDAPMATREZAGONOMBRE', F.round(100 - (F.levenshtein(F.col("AP_MATERNO"), F.col("NOM_TRABAJADOR"))*100) / F.greatest(F.length(F.col("AP_MATERNO")), F.length(F.col("NOM_TRABAJADOR"))), 0).cast(IntegerType()))
    df = df.withColumn('SIMILITUDAPMATREZAGOAPPAT', F.round(100 - (F.levenshtein(F.col("AP_MATERNO"), F.col("APELLIDO_PATERNO"))*100) / F.greatest(F.length(F.col("AP_MATERNO")), F.length(F.col("APELLIDO_PATERNO"))), 0).cast(IntegerType()))
    print('\tSimilitud apellido Materno [COMPLETADA]') 

    #Similitud de rut 
    df = df.withColumn('SIMILITUD_RUT_AFILIADO', F.round(100 - (F.levenshtein(F.col("RUT_AFILIADO"), F.col("RUT"))*100) / F.greatest(F.length(F.col("RUT_AFILIADO")), F.length(F.col("RUT"))), 0).cast(IntegerType()) )
    print('\tSimilitud Rut              [COMPLETADA]')

    # ...FILTRO DE SIMILITUD...
    df = df.filter(F.col("SIMILITUD") >= 60) 
    
    print( str(datetime.now().time()) + ' - Similitud Terminanda con digito verificador. ' + str(var))
        
    ################################################################################################################
    ##                      Armado de Salida Por Dígito Verificador y Guardado en Bucket                          ##
    ################################################################################################################
    
    print( str(datetime.now().time()) + ' - Rellenando y Uniendo Columnas faltantes.      ' + str(var))
        
    #...Agregamos alias a dataframe para simular consultas como si fuera base de datos. 
    #.. A: Dataframe de similitudes  -  B: Dataframe de TMP_REZAGO  -  C: Dataframe de TMP_AFILIADO
    df = df.alias('A') 
    df_tmp_rezago = df_tmp_rezago.alias('B')
    df_c = df_c.alias('C')
    
    #..Se realiza merge de tablas.
    df = df.join(df_c, F.col('A.RUT') == F.col('C.RUT'), how='left').drop(F.col('C.RUT'))
    df = df.join(df_tmp_rezago, F.col('A.ID') == F.col('B.ID'), how='left').drop(F.col('B.RUT_AFILIADO')).drop(F.col('B.APELLIDO_PATERNO'))
 
    #...Definimos formatos de salida para columnas NECESARIAS.
    df = df.withColumn('RUT', F.col('A.RUT').cast(IntegerType()))
    df = df.withColumn('ROL_AFILIADO', F.lit(000))
    df = df.withColumn('NACIONALIDAD', F.lit(' '))
    df = df.withColumn('CORREO_ELECTRONICO', F.lit(' '))
    df = df.withColumn('TELEFONO', F.lit(' '))
    df = df.withColumn('ESTADO_CASO', F.lit(' '))
    df = df.withColumn('FECHA_CREACION_CASO', F.lit(' '))
    df = df.withColumn('FECHA_ACTUALIZACION_CASO', F.lit(' '))
    df = df.withColumn('DOCUMENTACION', F.lit(' '))
    df = df.withColumn('STATUS', F.lit(' '))
    df = df.withColumn('RUT_EMPLEADOR_PAGO', F.lit(' '))
    df = df.withColumn('CRITERIO_DE_ACLARACION', F.lit(' '))
    df = df.withColumn('FECHA_RESOLUCION', F.lit(' '))
    df = df.withColumn('RUT_AFILIADO', F.regexp_replace(F.col('A.RUT_AFILIADO'),"''",'0')).drop(F.col('A.RUT_AFILIADO'))
    
    #...Creación de salida con estructura final.
    df = df.select(
        'RUT',
        'A.DV',
        'ROL_AFILIADO',
        'A.NOMBRES',
        'A.AP_PATERNO',
        'A.AP_MATERNO',
        'C.PERIODO',
        'NACIONALIDAD',
        'C.SEXO',
        'C.FECHA_NACIMIENTO',
        'CORREO_ELECTRONICO',
        'TELEFONO',
        'ESTADO_CASO',
        'FECHA_CREACION_CASO',
        'FECHA_ACTUALIZACION_CASO',
        'DOCUMENTACION',
        'C.COD_INSTITUCION',
        'C.FECHA_INGRESO_SISTEMA',
        'C.FECHA_INCORPORACION_AFP',
        'C.SITUACION_AFILIADO',
        'A.ID',
        'STATUS',
        'RUT_EMPLEADOR_PAGO',
        'CRITERIO_DE_ACLARACION',
        'FECHA_RESOLUCION',
        'B.AFP',
        'B.RUT_EMPLEADOR',
        'B.DV_EMPLEADOR',
        'B.RAZON_SOCIAL',
        'B.MAIL_EMPLEADOR',        
        'B.DOMICILIO_EMPLEADOR',
        'B.COMUNA_EMPLEADOR',
        'RUT_AFILIADO',
        'A.DV_AFILIADO',
        'A.APELLIDO_PATERNO',
        'A.APELLIDO_MATERNO',
        F.col('A.NOM_TRABAJADOR').alias('NOMBRES_TRABAJADOR'),
        'B.TIPO_PRODUCTO',
        'B.P_DEVENGADO',
        'B.REM_IMPONIBLE',
        'B.MTO_COTIZACION',
        'B.FECHA_PAGO',
        'B.LLAVE_INTERNA_AFP',
        F.col('A.NOM_TRABAJADOR').alias('NOMBRE_REZAGO'),
        F.col('A.NOMBRES').alias('NOMBRE_AFILIADO'),
        'EXISTENCIA',
        'SIMILITUD',
        'SIMILITUD_SIN_APMA',
        'SIMILITUD_RUT_AFILIADO',
        'SIMILITUDNOMBRE',
        'SIMILITUDAPEPATERNO',
        'SIMILITUDAPEMATERNO',
        'SIMILITUDNOMBREREZAGOAPP',
        'SIMILITUDNOMBREREZAGOAPM',
        'SIMILITUDAPPATREZAGONOMBRE', 
        'SIMILITUDAPPATREZAGOAPMAT', 
        'SIMILITUDAPMATREZAGONOMBRE',
        'SIMILITUDAPMATREZAGOAPPAT'
    )
    
    ###
    #df.createOrReplaceTempView('TMP')
    #spark.sql('SELECT COUNT(*) FROM TMP').show()
    ###
    
    df = df.repartition(10)
    df.persist(pyspark.StorageLevel.DISK_ONLY)

    print( str(datetime.now().time()) + ' - Escribiendo en bucket con digito verificador  ' + str(var))
    
    start = time.time()
    df.write.format("parquet")\
    .option("header", "true")\
    .mode("overwrite")\
    .save(F"oci://previred_rezagos_files@ax1trrrsni7e/previred_rezagos_files/proceso_rezago_3_0/dv_{var}")
    end = time.time()
    print(f'Tiempo guardando en bucket con dígito verificador {var}: ', end - start, '\n')

end = time.time()
tiempo_transcurrido = end - start_master
print(f'El Proceso Demoro ', int(tiempo_transcurrido // 60), ' Minutos y ' , int(tiempo_transcurrido % 60), ' Segundos.')

In [None]:
%%spark

###############################################################################################################
##          Unimos PARQUETs y Guardamos 10 Particiones Unificadas Para Generar Tabla Externa                 ##
###############################################################################################################
start = time.time()

# Le damos la direccion de todos los parquet dentro del bucket
file_paths = ["oci://previred_rezagos_files@ax1trrrsni7e/previred_rezagos_files/proceso_rezago_3_0/dv_*"]

dfs = [spark.read.parquet(file_path) for file_path in file_paths]

# Se unifica a 1 solo data frame
merged_df = dfs[0].unionAll(dfs[1:]) if len(dfs) > 1 else dfs[0]

# se fuera que existan solo 10 reparticiones.
merged_df = merged_df.repartition(10)

# Se crea el nuevo input que sera la tabla externa al autonomos.
merged_df.write.format("parquet")\
    .option("header", "true")\
    .mode("overwrite")\
    .save("oci://previred_rezagos_files@ax1trrrsni7e/previred_rezagos_files/proceso_rezago_3_0/Salida_Final")

end = time.time()
print(f'Tiempo guardando en bucket: ', end - start, '\n')

In [None]:
#...Importante stopear la sesión de spark. Para CLOUD ejecutar '%stop_session'
%stop_session
