In [1]:
import pandas as pd
import numpy as np
import config.config as constant
import os
import traceback
from datetime import datetime
import csv
from sqlalchemy import create_engine

pd.set_option('display.max_colwidth', None)
pd.set_option('display.width', None)

ERROR_03 = 'CARACTERES INVALIDOS;'
ERROR_04 = 'LARGO DEL REGISTRO NO VALIDO;'
ERROR_05 = 'FORMATO ID REZAGO NO NUMERICO;'
ERROR_06 = 'LARGO DE ID REZAGO NO VALIDO;'
ERROR_07 = 'ESTADO DEL REZAGO NO VALIDO; '

PARAMETRO_BUSQUEDA = 'crez20230908'
cod_inst= '1003'
ruta_salida_local = constant.LOCAL_OUTPUT_PATH.format(cod_ins_super=cod_inst)  
file_input_name = 'crez20230908_1030.cup'
fecha_carga_bd = '2023-09-01 21:00:00.000'
SERVER_GESTION= '172.25.51.40'

In [3]:
def validar_contenido(fullLocalPath = file_input_name, fileName = file_input_name):
    print(f'\t\t\t\t[VALIDAR CONTENIDO] Iniciando validación de contenido del archivo {fileName}')
    
    df= None
    df_valid= None
    df_invalid= None
    total = ''
    data= []
    informacion= {
        'Log': 'Sin información',
        'Excepcion' : '', 
        }
   
    try:
        with open(fullLocalPath, encoding='utf-8', errors='ignore') as f:
            count=0
            for line in f:
                if count > 0:
                    data.append(str(line.replace('\n','')))
                    # if constant.QA:                  #ELIMINAR
                    #     s = int(s) + 1           #ELIMINAR
                    #     s = str(s).zfill(25) #ELIMINAR                         
                    #     data.append(str(line.replace('\n','')) + s) #ELIMINAR
                    # else:
                    #     data.append(str(line.replace('\n','')))
                count += 1
            
        df= pd.DataFrame(data, columns=['Column_name'])
        df['Column_name'] = df['Column_name'].str.replace('\t', ' ')
        del data 
        df.index= df.index + 2
        if df is not None and len(df) > 0:
            #...Crear un diccionario que contenga el inicio y fin de cada columna
            columns = {}
            start = 0
            for column_name, length in constant.lengths.items():
                end = start + length
                columns[column_name] = (start, end)
                start = end
            #...Crear nuevas columnas usando str.slice()
            for column_name, (start, end) in columns.items():
                df[column_name] = df['Column_name'].str.slice(start, end)
            
            #...Validaciones y creación de columnas.
            df['longitud_total'] = df['Column_name'].str.len()
            df['nro_linea'] = df.index
            df['error_caracteres'] = np.where(df['Column_name'].str.contains(r'\\|;'), ERROR_03, '') #.. Código error 03
            df['error_longitud'] = np.where(df['Column_name'].str.len() == 277, '', ERROR_04) #.. Código error 04
            df['error_no_numerico_id_rzg_unico'] = np.where(df['error_longitud'] == '', np.where(df['id_rzg_unico'].str.isdigit(), '', ERROR_05 ), '') #.. Código error 05
            df['error_largo_id_rzg_unico'] = np.where((df['error_longitud'] == '') & (df['id_rzg_unico'].str.len() == constant.lengths['id_rzg_unico']), '', ERROR_06 ) #.. Código error 06
            df['error_estado_rezago'] = np.where((df['estado_rezago']== 'G') | (df['estado_rezago']== 'R'), '', ERROR_07 ) #.. Código error 07
            df['desc_error'] = df['error_caracteres']+df['error_longitud']+df['error_no_numerico_id_rzg_unico']+df['error_largo_id_rzg_unico']+df['error_estado_rezago']
            df['desc_error'] = df['desc_error'].str.strip() #.apply(lambda x: dividir_descripcion_error(x))
            df['registro_valido'] = np.where(df['desc_error'] == '', True, False) 
            
            #...Separando registros validos y no validos.
            df_valid = df[df['registro_valido']].reset_index(drop=True)
            df_invalid = df[~df['registro_valido']].reset_index(drop=True)
            
            total= str(len(df))
            
            informacion['Log'] = f'Validaciones terminadas correctamente. |Totales:{total}|Validos:{str(len(df_valid))}|No Validos:{str(len(df_invalid))}|'
            del df
            
        else:
            informacion['Log'] = f'Largo de dataframe no valido para ingresar a las validaciones [Largo: {str(len(df))}].'
    
    except Exception as e:
        informacion['Excepcion']= "__EXCEPTION_FILE: " + str(e) + str(traceback.format_exc(limit=1))
        informacion['Log'] = informacion['Excepcion']
        
    finally:
        print('\t\t\t\t\t' + str(informacion['Log']))
        
        return df_valid, df_invalid, informacion['Log']

def guardar_salida_local(df= None, cod_ins_super = cod_inst, ext='', type=True, fileName=file_input_name):
    informacion= {
        'Log': 'Sin información',
        'Excepcion' : '', 
        }
    output_name = ''
    try:
        
        if type:
            print(f'\t\t\t\t[REGISTROS VALIDOS] Guardado de salida local.')
            output_name = constant.LOCAL_OUTPUT_PATH.format(cod_ins_super = cod_ins_super) + fileName
            df['fecha_validacion'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            df['fecha_carga'] = fecha_carga_bd
            df['cod_afp'] = cod_ins_super
            columns = list(constant.lengths.keys()) + ['fecha_validacion', 'fecha_carga' , 'cod_afp']
            df = df[columns]
            df
            df.to_csv(output_name, sep = '\t', index=False, quoting=csv.QUOTE_NONE, quotechar='', escapechar='\\')
            
        else:
            print(f'\t\t\t\t[REGISTROS NO VALIDOS] Guardado de salida local.')
            output_name = constant.LOCAL_OUTPUT_PATH.format(cod_ins_super = cod_ins_super) + constant.OUTPUT_NOK_FILE.format(extafp = ext)
            df = df.rename(columns={
                'nro_linea': 'Nro. Linea',
                'Column_name': 'Información del Registro', 
                'desc_error': 'Descripción del Error'
                })
            df = df[['Nro. Linea','Información del Registro', 'Descripción del Error']]
            
            df.to_csv(output_name, sep = '\t', index=False, quoting=csv.QUOTE_NONE, quotechar='', escapechar='\\')
        
        informacion['Log'] = f'Archivo {output_name} copiado en el local.'
        
    except Exception as e:
        informacion['Excepcion'] = "EXCEPTION_FILE: " + str(e) + str(traceback.format_exc(limit = 1))
        informacion['Log'] = informacion['Excepcion']
        
    finally:
        print('\t\t\t\t\t' + str(informacion['Log']))
        
        return informacion['Log']

def connect_alchemy(df=None, table = constant.TBL_MAESTRO_REZAGO):
    print(f'\t\t\t\t[INSERTAR REGISTROS] Iniciando conexión a la base de datos.')
    
    informacion= {
        'Log': 'Sin información',
        'Excepcion' : '', 
        }
    try:
        print(f'\t\t\t\t\tCreando conexión a [{SERVER_GESTION}]-[{constant.BD_INTEGRA}] con sqlalchemy...')
        engine = create_engine(f'mssql+pymssql://{constant.UID}:{constant.PWD}@{SERVER_GESTION}/{constant.BD_INTEGRA}')
        
        print(f'\t\t\t\t\tInsertando en tabla ...')
        df.to_sql(table, con=engine, if_exists='append', schema=constant.SCHEMA_MAE, index=False, method="multi",chunksize=100,)

        informacion['Log'] = f'[OK] {str(len(df))} Registros cargados exitosamente.'
        
        engine.dispose()
        
    except Exception as e:
        informacion['Excepcion'] = "EXCEPTION_DB_ALCHEMY: " + str(e) + str(traceback.format_exc(limit=1))
        informacion['Log'] = informacion['Excepcion']
        
    finally:
        print('\t\t\t\t\t' + str(informacion['Log']))
        return informacion['Log']
    
def obtener_df_validos(ruta, parametro):
    print(f'\t\t\t\t[OBTENER DF VALIDOS] Buscando archivos validos en ruta {ruta}.')
    
    try:
        df= None
        df_concatenados= []
        informacion= {
            'Log': 'Sin información',
            'Excepcion' : '', 
        }
        
        lista_archivos = os.listdir(ruta)
        archivos_validos= []
        ultimo_archivo_del_dia = ''
        hora_archivo = []
        
        if len(lista_archivos) > 0:
            for f in lista_archivos:
                if str(f).startswith(parametro):
                    archivos_validos.append(f)
                    hora_archivo.append(int(f[13:17])) #... Esto es para agregar las horas y luego obtener el maximo
                else:
                    informacion['Log'] = 'No se encuentran registros validos a rescatar.'
                    
            if len(archivos_validos) > 0 and len(hora_archivo) > 0 :
                #... Ejemplo crez20230411_1332 
                for a in archivos_validos:
                    if parametro  + '_' + str(max(hora_archivo)).zfill(4) in a:
                        ultimo_archivo_del_dia = a
                        df = pd.read_csv(ruta + a, sep='\t', dtype=str)
                        break
                    
                # for a in archivos_validos:
                #     df = pd.read_csv(ruta + a, sep='\t', dtype=str)
                #     df_concatenados.append(df)
                # df = pd.concat(df_concatenados, ignore_index=True, axis=0)
                
                #df = df.drop_duplicates(subset=['id_rzg_unico'])
                df['fecha_carga'] = fecha_carga_bd
                
                informacion['Log'] = f'[OK] {str(len(df))} Registros obtenidos desde ultimo archivo cargado "{str(ultimo_archivo_del_dia)}".'
                
        else:
            informacion['Log'] = 'El directorio no contiene archivos.'

    except Exception as e:
        informacion['Excepcion'] = "EXCEPTION_FILE: " + str(e) + str(traceback.format_exc(limit = 1))
        informacion['Log'] = informacion['Excepcion']
    finally:
        print('\t\t\t\t\t' + str(informacion['Log']))
        
        return df, ultimo_archivo_del_dia, informacion['Log']

In [4]:
df_valid, df_invalid, log = validar_contenido()

				[VALIDAR CONTENIDO] Iniciando validación de contenido del archivo crez20230908_1030.cup
					Validaciones terminadas correctamente. |Totales:13713|Validos:13713|No Validos:0|


In [5]:
guardar_salida_local(df= df_valid) #..Validos
#guardar_salida_local(df= df_invalid , type=False) #..No validos

				[REGISTROS VALIDOS] Guardado de salida local.
					Archivo /Users/felipebravoespinosa/Documents/Empresas/Previred/servicios-automatizados/MaestroRezagos/reports/1003/Salida/crez20230908_1030.cup copiado en el local.


'Archivo /Users/felipebravoespinosa/Documents/Empresas/Previred/servicios-automatizados/MaestroRezagos/reports/1003/Salida/crez20230908_1030.cup copiado en el local.'

In [5]:
df = pd.read_csv(f'/Users/felipebravoespinosa/Documents/Empresas/Previred/servicios-automatizados/MaestroRezagos/reports/{cod_inst}/Salida/{file_input_name}', sep='\t')

In [6]:
df_valid, archivos_validos, log = obtener_df_validos(ruta_salida_local, PARAMETRO_BUSQUEDA)

				[OBTENER DF VALIDOS] Buscando archivos validos en ruta /Users/felipebravoespinosa/Documents/Empresas/Previred/servicios-automatizados/MaestroRezagos/reports/1035/Salida/.
					[OK] 43569 Registros obtenidos desde ultimo archivo cargado "crez20230901_1230.uno".


In [7]:
df_valid

Unnamed: 0,tipo_registro,rut_afiliado,dv_afiliado,ap_paterno,ap_materno,nombres,rut_pagador,dv_pagador,razon_social,mes_dev_rem_imp,...,per_traspaso_rez,tipo_cuenta,tipo_pago,tipo_planilla,nro_folio_planilla,tipo_recaudacion,id_rzg_unico,fecha_validacion,fecha_carga,cod_afp
0,D,00000000,,FERNANDEZ,RONDON,NAZARETH DE LOS ANGELES,76924218,K,LUIS RENATO LEIVA REYES CONTRATISTA EN SERVICIOS D,201912,...,00000000,1,R,E,2013201912028952,E,0000000000000000000066243,2023-09-04 09:18:12,2023-09-01 21:00:00.000,1035
1,D,00000000,,,,,00000001,9,NO TIENE,201912,...,00000000,6,R,D,1220132019120734,E,0000000000000000000119499,2023-09-04 09:18:12,2023-09-01 21:00:00.000,1035
2,D,00000000,,,,,00000001,9,NO TIENE,202001,...,00000000,6,R,D,1201320191207344,E,0000000000000000000119498,2023-09-04 09:18:12,2023-09-01 21:00:00.000,1035
3,D,00000000,,,,,00000000,,,202006,...,00000000,6,R,D,6201320200512091,X,0000000000000000000484668,2023-09-04 09:18:12,2023-09-01 21:00:00.000,1035
4,D,00000000,,,,,00000000,,,202010,...,00000000,6,R,D,1020132020081370,E,0000000000000000000590126,2023-09-04 09:18:12,2023-09-01 21:00:00.000,1035
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
43564,D,00000000,,,,,96640140,0,EMPRESAS SOLVENCIA SPA,202108,...,00000000,1,R,E,0000000000000014,M,0000000000000000001789430,2023-09-04 09:18:12,2023-09-01 21:00:00.000,1035
43565,D,00000000,,,,,96640140,0,EMPRESAS SOLVENCIA SPA,202204,...,00000000,1,R,E,0000000000000027,M,0000000000000000001789431,2023-09-04 09:18:12,2023-09-01 21:00:00.000,1035
43566,D,00000000,,,,,96640140,0,EMPRESAS SOLVENCIA SPA,202107,...,00000000,1,R,E,0000000000000013,M,0000000000000000001789429,2023-09-04 09:18:12,2023-09-01 21:00:00.000,1035
43567,D,00000000,,,,,96640140,0,EMPRESAS SOLVENCIA SPA,202108,...,00000000,1,R,E,0000000000000015,M,0000000000000000001789426,2023-09-04 09:18:12,2023-09-01 21:00:00.000,1035


In [8]:
connect_alchemy(df_valid)

				[INSERTAR REGISTROS] Iniciando conexión a la base de datos.
					Creando conexión a [172.25.51.40]-[previred_integracion] con sqlalchemy...
					Insertando en tabla ...
					[OK] 43569 Registros cargados exitosamente.


'[OK] 43569 Registros cargados exitosamente.'

In [None]:
# SELECT 
# 	cod_afp,
# 	fecha_carga,
# 	fecha_validacion ,
# 	COUNT(*)
# FROM previred_integracion.mae.maestro_rezagos
# WHERE CONVERT(CHAR(8), fecha_carga, 112)='20230804' 
# GROUP BY cod_afp, fecha_carga,fecha_validacion

# SELECT * FROM previred_integracion.mae.instituciones_rezagos ir 

# SELECT 
# 	cod_afp,
# 	fecha_carga  ,
# 	fecha_validacion ,
# 	COUNT(*)
# FROM previred_integracion.mae.maestro_rezagos WITH(NOLOCK)
# WHERE CONVERT(CHAR(8), fecha_validacion, 112)='20230807' 
# GROUP BY cod_afp, fecha_carga, fecha_validacion

In [32]:
import pymssql 
import pandas

def connect_pymssql(server, data_base, query=None, type=None, variable=False):
    print(f'\t\t\t\t[OBTENER TABLA BD] Iniciando conexión a la base de datos.')
    conn= None
    cursor = None
    df= None
    resp = None
    bitacora = False
    informacion= {
        'Log': 'Sin información',
        'Excepcion' : '', 
        }
    try:
        
        print(f'\t\t\t\t\tCreando conexión a [{constant.SERVER_GESTION}]-[{constant.BD_INTEGRA}] con pymssql...')
        conn = pymssql.connect(server, constant.UID, constant.PWD, data_base)
        cursor = conn.cursor()
       
        if type == 'pandas' and variable is False: 
            if str(query).upper().startswith('EXEC') or 'EXEC cuadre.' in str(query):
                if str(query).upper().startswith('EXEC'):
                    print(f'\t\t\t\t\t({server}-{data_base}) Enviando SP con pandas...   \n \t\t\t\t\t\t\t\t\t\t{query}')
                    dfs = []
                    for chunk in pandas.read_sql_query(query, conn, chunksize=100):
                        dfs.append(chunk) 
                    df = pandas.concat(dfs, ignore_index=True)
                    
                else:
                    print(f'\t\t\t\t\t({server}-{data_base}) Enviando SP con pandas...   \n \t\t\t\t\t\t\t\t\t\t{query}')
                    df = pandas.read_sql_query(query, conn) 
                    
            else:
                print(f'\t\t\t\t\t({server}-{data_base}) Enviando consulta SQL con pandas...   \n \t\t\t\t\t\t\t\t\t\t{query}')
                df = pandas.read_sql_query(open(file=query).read(), conn) 
                
            if bitacora is False and (df is not None and not df.empty):   
                informacion['Log'] = f'[OK] "{str(len(df))}" Registros obtenidos correctamente.'
            else:
                informacion['Log'] = f'[NOK] No se encuentran registros dentro de la base de datos.'
                
        elif type == 'pandas' and variable:
            if variable:
                print(f'\t\t\t\t\t({server}-{data_base}) Enviando query para obtener datos de calendario...   \n \t\t\t\t\t\t\t\t\t\t')
                df = pandas.read_sql_query(query, conn)
            if df is not None and not df.empty:   
                informacion['Log'] = f'[OK] "{str(len(df))}" Registros obtenidos correctamente.'
            else:
                informacion['Log'] = f'[NOK] No se encuentran registros dentro de la base de datos.'
                 
        else:
            cursor.execute(query)
            if cursor.description is not None:
                resp = list(cursor.fetchall()[0])[0] #Ejemplo salida fetchall [(3263,)]
                conn.commit()    
                bitacora = True
            else: 
                conn.commit()
                bitacora = True
            if bitacora:
                informacion['Log'] = f'[BITÁCORA OK] SP Bitácora ejecutado correctamente.'
            else:
                informacion['Log'] = f'[BITÁCORA NOK] SP Bitácora finalizado con error.'

    except Exception as e:
        informacion['Excepcion'] = "EXCEPTION_DB_PYMSSQL: " + str(e) + str(traceback.format_exc(limit=1))
        informacion['Log'] = informacion['Excepcion']
        
    finally:
        if conn:
            conn.close()
        if cursor:
            cursor.close()  
        print('\t\t\t\t\t' + str(informacion['Log']))
        resp = resp if bitacora else df
    
        return resp, informacion['Log']

In [36]:
#...Consolidado
from functions.file import guardar_consolidado_local

spLST = constant.SPLST_CONSOLIDADO_MAESTRO_REZAGO.format(
                            fecha_carga_desde = '20230731', 
                            fecha_carga_hasta=  '20230806'
                            )
df_consolidado, log = connect_pymssql(SERVER_GESTION, constant.BD_INTEGRA, spLST, type='pandas')
log = guardar_consolidado_local(df_consolidado, cod_inst)

				[OBTENER TABLA BD] Iniciando conexión a la base de datos.
					Creando conexión a [INTEGRACION_PROD]-[previred_integracion] con pymssql...
					(172.25.51.40-previred_integracion) Enviando SP con pandas...   
 										EXEC [previred_integracion].[mae].[spLST_Consolidado_Maestro_Rezagos] '20230731', '20230806'


  for chunk in pandas.read_sql_query(query, conn, chunksize=100):


					[OK] "1312689" Registros obtenidos correctamente.
[2023-08-08 10:31:02][1624.11mb/1355.12mb]:  				[DEPOSITAR CONSOLIDADO] Depositar archivo consolidado en casilla SFTP.
[2023-08-08 10:31:27][542.66mb/1126.31mb]:  					[OK] Archivo consolidado guardado en local. [/Users/felipebravoespinosa/Documents/Empresas/Previred/servicios-automatizados/MaestroRezagos/reports/1003/Salida/concrez20230808_0923.pre]
