In [None]:
import xmltodict 
import pandas as pd
from os import listdir
import os
from data import CWTR, CUILES
from helpers.dict import unpack

In [None]:
montos_cargas = { 
    "normal": {
    1 :     19621.44, 
    3 :     9895.16, 
    30 :    9895.16, 
    31 :    19790.33, 
    32:     19790.33,
    51:     19621.44, # VER CON VALE
    103:    0,},

    "especial" : {
    1 :     23938.16, 
    3 :     12072.10, 
    30 :    12072.10, 
    31 :    24144.20, 
    32:     24144.20,
    51:     23938.16, # VER CON VALE
    103:    0,}
}

provincias_especiales = [
    'Neuquén',
    'Chubut',
    'Río Negro',
    'Santa Cruz',
    'La Pampa',
]

### COMPOSICIÓN DEL XML

In [None]:
from data import CUILES

def obtener_legajo(cuil):
    cuil = int(cuil)
    return CUILES.query('cuil == @cuil').iloc[0].at["legajo"]

In [None]:
cwtr_provincias = CWTR[['legajo', 'mes', 'provincia']].drop_duplicates().reset_index()

In [None]:
aux1 = cwtr_provincias.copy()
aux1['count'] = 1

resumen1 = aux1[['legajo', 'provincia', 'count']].groupby(['legajo', 'provincia'], as_index=False).sum()

aux2 = resumen1[['legajo']].copy()
aux2['count'] = 1


resumen2 = aux2[['legajo', 'count']].groupby(['legajo'], as_index=False).sum()
aux3 = resumen2.query('count > 1')

LEGAJOS_MULTIPROVINCIA = list(aux3['legajo'])

LEGAJOS_MULTIPROVINCIA

In [None]:
def obtener_provincia_mes(legajo_buscado, mes_buscado):

    if int(legajo_buscado) not in LEGAJOS_MULTIPROVINCIA:
        raise IndexError(f'El legajo "{legajo_buscado}" sólo tiene una provincia, no siendo necesario el parámetro mes. \nSe debe usar la función: \n  obtener_provincia()')
    
    # for i, row in cwtr_provincias.iterrows():
    #     legajo, mes, provincia = unpack(row, 'legajo', 'mes', 'provincia')

    #     if legajo_buscado == legajo and mes_buscado == mes:
    #         return provincia

    return cwtr_provincias.query('legajo == @legajo_buscado and mes == @mes_buscado').iloc[0].at['provincia']

In [None]:
def obtener_provincia(legajo_buscado):

    if int(legajo_buscado) in LEGAJOS_MULTIPROVINCIA:
        raise IndexError(f'El legajo "{legajo_buscado}" posee más de una provincia asociada. \nSe debe consultar para un mes determinado usando la función: \n  obtener_provincia_mes()')
    
    return resumen1.query('legajo == @legajo_buscado').iloc[0].at['provincia']

In [None]:
def apertura_deducciones_por_mes(cuil, tipo_deduccion, mes_desde, mes_hasta, monto_mensual):
    '''
    Esta funcion devuelve un df con la cantidad de registros necesarios, segun el intervalo desde hasta (periodo) por cada deducción.
    '''
    datos_572 = [[cuil, tipo_deduccion, monto_mensual]]
    df_datos_572 = pd.DataFrame(datos_572, columns=["cuil","codigo", "importe"])

    cant_filas = (mes_hasta - mes_desde + 1)

    df_datos_572 = pd.concat([df_datos_572]*cant_filas, ignore_index=True)

    df_datos_572["mes"] = list(range(mes_desde, mes_hasta + 1))

    return df_datos_572

In [None]:
def apertura_cargas_por_mes(cuil, tipo_deduccion, mes_desde, mes_hasta, parentesco, porcentaje_deduccion):
    '''
    Esta funcion devuelve un df con la cantidad de registros necesarios, segun el intervalo desde hasta (periodo) por cada deducción.
    '''
    print(cuil)
    legajo = obtener_legajo(cuil)

    if legajo not in LEGAJOS_MULTIPROVINCIA:

        provincia = obtener_provincia(legajo)
        tipo_mni = 'especial' if provincia in provincias_especiales else 'normal'
        montos_seg_zona = montos_cargas[tipo_mni]

        monto_mensual = montos_seg_zona[parentesco] * porcentaje_deduccion / 100

        return apertura_deducciones_por_mes(cuil, tipo_deduccion, mes_desde, mes_hasta, monto_mensual)

    else:

        df_out = pd.DataFrame()
        for mes in range(mes_desde, mes_hasta + 1):
            
            provincia = obtener_provincia_mes(legajo, mes)
            tipo_mni = 'especial' if provincia in provincias_especiales else 'normal'
            montos_seg_zona = montos_cargas[tipo_mni]

            monto_mensual = montos_seg_zona[parentesco] * porcentaje_deduccion / 100

            df_mes = apertura_deducciones_por_mes(cuil, tipo_deduccion, mes, mes, monto_mensual)
            df_out = pd.concat([df_out, df_mes])
        
        return df_out 


In [None]:
def cargas_por_mes(xml):

    prefijo = "Q-CargFam-"

    empleado = xml["presentacion"]["empleado"]["cuit"]
    cargas = xml["presentacion"]["cargasFamilia"]["cargaFamilia"]
    df_por_mes_cargas = pd.DataFrame([], columns=["cuil", "codigo", "importe", "mes"])
     
    if isinstance(cargas, dict):
        cargas = [cargas]

    for i in cargas:
        parentesco = int(i["parentesco"])
        tipo_deduccion_carga = i["parentesco"]
        porcentaje_deduccion = int(i["porcentajeDeduccion"])
        mes_desde = int(i["mesDesde"])
        mes_hasta = int(i["mesHasta"])

        df = apertura_cargas_por_mes(empleado, prefijo + tipo_deduccion_carga, mes_desde, mes_hasta, parentesco, porcentaje_deduccion)    
        df_por_mes_cargas = pd.concat([df_por_mes_cargas, df], ignore_index=True)
        
    return(df_por_mes_cargas)

In [None]:
CODIGOS_DEDUCC_MENSUAL = [1, 3, 4, 5, 8, 11, 22, 32] #solamente dejar los que necesito que se procesen mensualmente

def deducciones_totales_por_mes(xml):

    prefijo = "Q-Deduc-"

    empleado = xml["presentacion"]["empleado"]["cuit"]
    deducciones = xml["presentacion"]["deducciones"]["deduccion"]
    df_por_mes = pd.DataFrame([], columns=["cuil", "codigo", "importe", "mes"])
    
    if isinstance(deducciones, dict):
        deducciones = [deducciones]
    
    for i in deducciones:
        tipo_deduccion = int(i["@tipo"])
        if tipo_deduccion in CODIGOS_DEDUCC_MENSUAL:
            periodo = i["periodos"]["periodo"]
            
            #compruebo si es un diccionario y lo paso a listsa
            if isinstance(periodo, dict):
                periodo = [periodo]
            
            #para cada vaalor, determino los valores, ya sea que tengo una deduccion o muchas
            for j in periodo: 
                mes_desde = int(j["@mesDesde"])
                mes_hasta = int(j["@mesHasta"])
                monto_mensual = float(j["@montoMensual"])
                df = apertura_deducciones_por_mes(empleado, prefijo + str(tipo_deduccion), mes_desde, mes_hasta, monto_mensual)
                df_por_mes = pd.concat([df_por_mes, df], ignore_index=True)            
        else:
            monto_anual = float(i["montoTotal"])
            df = apertura_deducciones_por_mes(empleado, prefijo + str(tipo_deduccion), 13, 13, monto_anual)
            df_por_mes = pd.concat([df_por_mes, df], ignore_index=True)

    return df_por_mes

In [None]:
def df_otro_empleador(xml):

    prefijo = 'Q-OE-'

    empleado = xml["presentacion"]["empleado"]["cuit"]
    ingresos_oe = xml["presentacion"]["ganLiqOtrosEmpEnt"]["empEnt"]["ingresosAportes"]["ingAp"]
    df_otro_empleador = pd.DataFrame([], columns=["cuil", "codigo", "importe", "mes"])

    if isinstance(ingresos_oe, dict):
        ingresos_oe = [ingresos_oe]

    for mes in ingresos_oe:
        for k in list(mes.keys()):
            if k == "@mes":
                continue 
            valor = float(mes[k])
            if valor != 0:
                datos_df_oe = [empleado, prefijo + k, valor, int(mes["@mes"])]
                df_oe_new = pd.DataFrame([datos_df_oe], columns=["cuil", "codigo", "importe", "mes"])
                df_otro_empleador = pd.concat([df_otro_empleador, df_oe_new], ignore_index=True)
        
    return(df_otro_empleador)

In [None]:
def df_ret_per_pago(xml):

    prefijo = 'Q-RetPerPagCu-'

    empleado = xml["presentacion"]["empleado"]["cuit"]
    rpp = xml["presentacion"]["retPerPagos"]["retPerPago"]
    df_por_rrp = pd.DataFrame([], columns=["cuil", "codigo", "importe", "mes"])
    
    if isinstance(rpp, dict):
        rpp = [rpp]
    
    for i in rpp:
        tipo_deduccion = int(i["@tipo"])            
        monto_mensual = float(i["montoTotal"])
        datos_df_rpp = [empleado, prefijo + str(tipo_deduccion), monto_mensual, 13]
        df = pd.DataFrame([datos_df_rpp], columns=["cuil", "codigo", "importe", "mes"])
        df_por_rrp = pd.concat([df_por_rrp, df], ignore_index=True) 

    return df_por_rrp

In [None]:
carpeta_xml = "../bases-ganancias-2022/in/formularios572/"

In [None]:
CUILES_NO_PROCESAR = []
archivo_cuiles = list(CUILES['cuil'])

for f in listdir(carpeta_xml):
    cuil = int(f[0:11])
    if cuil not in archivo_cuiles:
        CUILES_NO_PROCESAR.append(cuil)

CUILES_NO_PROCESAR

In [None]:
from helpers.progressbar import ProgressBar
def deducciones_finales(carpeta):

    listdir(carpeta_xml)
    df_deducciones = pd.DataFrame()

    p = ProgressBar(len(listdir(carpeta_xml)), segmentos=120)
    for a in listdir(carpeta_xml):
        path = carpeta_xml + "\\" + a

        with open(path, encoding='utf-8') as f:
            xml = xmltodict.parse(f.read())

            cuit = xml['presentacion']['empleado']['cuit']

            if int(cuit) in CUILES_NO_PROCESAR:
                print('Cuit salteado:', cuit)
                p.next()
                continue

            if "deducciones" in xml["presentacion"]:
                df_deducc_mes_por_empleado = deducciones_totales_por_mes(xml)
                df_deducciones = pd.concat([df_deducc_mes_por_empleado, df_deducciones], ignore_index=True)

            if "cargasFamilia" in xml["presentacion"]:
                df_cargas_por_empleado_mes = cargas_por_mes(xml)
                df_deducciones = pd.concat([df_cargas_por_empleado_mes, df_deducciones], ignore_index=True)

            if "ganLiqOtrosEmpEnt" in xml["presentacion"]:
                df_ing_aport_otro_empleador = df_otro_empleador(xml)
                df_deducciones = pd.concat([df_ing_aport_otro_empleador, df_deducciones], ignore_index=True)

            if "retPerPagos" in xml["presentacion"]:
                df_rpp = df_ret_per_pago(xml)
                df_deducciones= pd.concat([df_rpp, df_deducciones], ignore_index=True)

        p.next()
                
    return(df_deducciones)


In [None]:
df = deducciones_finales(carpeta_xml)

In [None]:
df.to_clipboard()

In [None]:
df.to_excel("deducciones_572web.xlsx")

In [None]:
codigos = list(df['codigo'].unique())
codigos.sort()
for c in codigos:
    print(c)

In [None]:
df.to_parquet('../bases-ganancias-2022/middle/572web.parquet', compression='brotli')