## Calculo del numero de trabajadores para los ultimos periodos tributarios

 ### Se crea sesion de Spark

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd
import numpy as np
import time
import os
from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, col, concat, lit
from pyspark.sql.types import IntegerType
import warnings
warnings.filterwarnings("ignore")
warnings.filterwarnings("ignore", category=UserWarning, module="pyspark")

from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.ensemble import RandomForestClassifier
from joblib import dump, load
from sklearn.preprocessing import MinMaxScaler

In [4]:
spark = SparkSession.builder \
    .appName("Test") \
    .config("spark.yarn.access.hadoopFileSystems","abfs://data@datalakesii.dfs.core.windows.net/") \
    .master("local[*]") \
    .getOrCreate()

spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")

24/04/18 02:54:19 WARN SparkConf: The configuration key 'spark.yarn.access.hadoopFileSystems' has been deprecated as of Spark 3.0 and may be removed in the future. Please use the new key 'spark.kerberos.access.hadoopFileSystems' instead.
24/04/18 02:54:20 WARN SparkConf: The configuration key 'spark.yarn.access.hadoopFileSystems' has been deprecated as of Spark 3.0 and may be removed in the future. Please use the new key 'spark.kerberos.access.hadoopFileSystems' instead.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/18 02:54:20 WARN SparkConf: The configuration key 'spark.yarn.access.hadoopFileSystems' has been deprecated as of Spark 3.0 and may be removed in the future. Please use the new key 'spark.kerberos.access.hadoopFileSystems' instead.
24/04/18 02:54:20 WARN SparkConf: The configuration key 'spark.yarn.access.h

## Calculo de trabajadores segun DJ1887 y DJ1879

### Cantidad de trabajadores con contrato y honorarios de la última declaración (más rentas)

#### DJ1887

Se cargan las DJ1887 para los periodos de estudio (en este caso son las declaraciones asociada a los anio 2020, 2021 y 2022.

In [5]:
dj1887 = spark.read.parquet("abfs://data@datalakesii.dfs.core.windows.net/DatosOrigen/DW/DW_TRN_DJR_F1887_E")
dj1887 = dj1887.filter((dj1887.PERI_AGNO_MES_TRIBUTARIO == 202000) | (dj1887.PERI_AGNO_MES_TRIBUTARIO == 202100) | (dj1887.PERI_AGNO_MES_TRIBUTARIO == 202200))

                                                                                

Se agrupa por rut y periodo, y se calculan las sumas de la rentas de los empleados y la cantidad de estos.

In [6]:
cols = ['CONT_RUT','CONT_DV','PERI_AGNO_MES_TRIBUTARIO','F1887_RENTA_TOTAL_EXENTA','F1887_RENTA_NETA','F1887_IMPTO_RETE','F1887_MAYOR_RETE','F1887_RENTA_EXEN','F1887_REBAJA_ZON','F1887_INGRESO_ENE_SA_C37','F1887_INGRESO_FEB_SA_C38','F1887_INGRESO_MAR_SA_C39','F1887_INGRESO_ABR_SA_C40','F1887_INGRESO_MAY_SA_C41','F1887_INGRESO_JUN_SA_C42','F1887_INGRESO_JUL_SA_C43','F1887_INGRESO_AGO_SA_C44','F1887_INGRESO_SEP_SA_C45','F1887_INGRESO_OCT_SA_C46','F1887_INGRESO_NOV_SA_C47','F1887_INGRESO_DIC_SA_C48']
rentas = dj1887.select(cols).groupBy(cols[:3]).sum('F1887_RENTA_TOTAL_EXENTA','F1887_RENTA_NETA','F1887_IMPTO_RETE','F1887_MAYOR_RETE','F1887_RENTA_EXEN','F1887_REBAJA_ZON','F1887_INGRESO_ENE_SA_C37','F1887_INGRESO_FEB_SA_C38','F1887_INGRESO_MAR_SA_C39','F1887_INGRESO_ABR_SA_C40','F1887_INGRESO_MAY_SA_C41','F1887_INGRESO_JUN_SA_C42','F1887_INGRESO_JUL_SA_C43','F1887_INGRESO_AGO_SA_C44','F1887_INGRESO_SEP_SA_C45','F1887_INGRESO_OCT_SA_C46','F1887_INGRESO_NOV_SA_C47','F1887_INGRESO_DIC_SA_C48').toPandas()
count_trabajadores = dj1887.groupBy(cols[:3]).agg(countDistinct('CONT_RUT_INFO').alias("cantidad_trabajadores_dependientes")).toPandas()

                                                                                

Obtenemos un unico dataframe para calcular el numero de trabajadores con la suma de las rentas por campo

In [7]:
df_count_rentas = count_trabajadores.merge(rentas, how='left', on=['CONT_RUT','CONT_DV','PERI_AGNO_MES_TRIBUTARIO'])

In [20]:
df_count_rentas

Unnamed: 0,CONT_RUT,CONT_DV,PERI_AGNO_MES_TRIBUTARIO,cantidad_trabajadores_dependientes,sum(F1887_RENTA_TOTAL_EXENTA),sum(F1887_RENTA_NETA),sum(F1887_IMPTO_RETE),sum(F1887_MAYOR_RETE),sum(F1887_RENTA_EXEN),sum(F1887_REBAJA_ZON),...,sum(F1887_INGRESO_MAR_SA_C39),sum(F1887_INGRESO_ABR_SA_C40),sum(F1887_INGRESO_MAY_SA_C41),sum(F1887_INGRESO_JUN_SA_C42),sum(F1887_INGRESO_JUL_SA_C43),sum(F1887_INGRESO_AGO_SA_C44),sum(F1887_INGRESO_SEP_SA_C45),sum(F1887_INGRESO_OCT_SA_C46),sum(F1887_INGRESO_NOV_SA_C47),sum(F1887_INGRESO_DIC_SA_C48)
0,99CbDFIrQY8OlcggvWfLWA==,1,202000,7,0,24435557,152930,0,25514464,0,...,,,,,,,,,,
1,1wnCd8jt7YReILfIUea5lA==,9,202000,48,0,254673509,3198233,0,11981109,0,...,,,,,,,,,,
2,gt/4j3OaP8caoTXo9uOBdQ==,8,202000,7,0,18579713,0,0,0,0,...,,,,,,,,,,
3,8/0MZDiTigJ/gJIThpxFvQ==,5,202000,27,0,220275725,6149288,0,27768789,0,...,,,,,,,,,,
4,8ratwuLD0N+52/fR0z2mTg==,9,202000,11,12376511,46896115,533678,0,0,0,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1173915,JJBk9da8db66bujuZeNV3Q==,0,202100,1,0,3970534,0,0,0,0,...,326749.0,326749.0,326749.0,326749.0,326749.0,326749.0,326749.0,326749.0,345100.0,350606.0
1173916,pkn/O7KQY/uCt+UvVCUkEg==,0,202100,1,0,7208910,0,0,4251450,0,...,593712.0,593512.0,593456.0,593500.0,593500.0,593536.0,593475.0,593194.0,592782.0,592696.0
1173917,0wdZrHeSqv7EKr7YJs/raQ==,1,202000,1,0,2957654,0,0,0,0,...,,,,,,,,,,
1173918,X+wusbzKwcoOTTTR5xuJPw==,4,202200,0,0,4215217,0,0,0,0,...,333478.0,333478.0,333478.0,333478.0,344202.0,344202.0,344202.0,344202.0,344202.0,344202.0


Se selecciona la última declaración

In [26]:
# calculamos el ultimo periodo disponible para cada rut
dj1887_last = df_count_rentas.sort_values(by=['CONT_RUT','CONT_DV','PERI_AGNO_MES_TRIBUTARIO'], ascending=False) \
                                    .groupby(['CONT_RUT','CONT_DV'])\
                                    .first()


In [27]:
dj1887_last=dj1887_last.reset_index(drop=False)
dj1887_last

Unnamed: 0,CONT_RUT,CONT_DV,PERI_AGNO_MES_TRIBUTARIO,cantidad_trabajadores_dependientes,sum(F1887_RENTA_TOTAL_EXENTA),sum(F1887_RENTA_NETA),sum(F1887_IMPTO_RETE),sum(F1887_MAYOR_RETE),sum(F1887_RENTA_EXEN),sum(F1887_REBAJA_ZON),...,sum(F1887_INGRESO_MAR_SA_C39),sum(F1887_INGRESO_ABR_SA_C40),sum(F1887_INGRESO_MAY_SA_C41),sum(F1887_INGRESO_JUN_SA_C42),sum(F1887_INGRESO_JUL_SA_C43),sum(F1887_INGRESO_AGO_SA_C44),sum(F1887_INGRESO_SEP_SA_C45),sum(F1887_INGRESO_OCT_SA_C46),sum(F1887_INGRESO_NOV_SA_C47),sum(F1887_INGRESO_DIC_SA_C48)
0,+++4/3jzUwtDPSSo3PxUgQ==,K,202200,3,0,100335428,11057741,0,2974910,0,...,7964654.0,8131831.0,8131398.0,8168393.0,8168732.0,7591093.0,8166825.0,8162064.0,8153792.0,8199533.0
1,+++VKgYcn1igYZjkTHXjgA==,5,202200,1,0,1306000,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,++/ILpGAKOZrH+0u+L1tdw==,1,202000,11,0,2025493,0,0,0,0,...,,,,,,,,,,
3,++/UTl2mwL/J484yKcuZrg==,0,202200,2,0,20241701,245967,0,0,0,...,1726589.0,1720037.0,1713484.0,1708570.0,1708570.0,1693826.0,1688912.0,1667617.0,1646321.0,1638130.0
4,++/dFaxQQDYaKe8zO/rKoQ==,2,202200,7,0,11172701,0,0,0,0,...,858125.0,858125.0,858125.0,1325312.0,1292500.0,1713750.0,1713750.0,842500.0,842500.0,1404166.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
533479,zzxiVVlRVmrHIZn8ZjPTVQ==,5,202200,23,0,72350541,362416,0,4623164,0,...,7502194.0,7006238.0,6194317.0,5810360.0,6306282.0,5773516.0,4508210.0,5056608.0,4869184.0,4859469.0
533480,zzydn/q5KUdW0PcyxWSyzg==,8,202200,2,146471,4497040,0,0,0,0,...,582420.0,338642.0,338642.0,338642.0,370784.0,349532.0,349532.0,349532.0,349532.0,349532.0
533481,zzzEfsBUrus3+LpvpwcHig==,9,202200,2,0,7414505,0,0,0,0,...,816250.0,816250.0,816250.0,842500.0,842500.0,842500.0,842500.0,842500.0,842500.0,842500.0
533482,zzzo7RUe8Wp/sBVZrqC1BQ==,9,202200,1,2348275,5534665,0,0,0,0,...,483549.0,481714.0,479820.0,478444.0,478444.0,474315.0,299527.0,466976.0,461013.0,458719.0


In [28]:
dj1887_last = dj1887_last[['CONT_RUT','cantidad_trabajadores_dependientes']]

Se guarda en un archivo .csv

In [29]:
dj1887_last.to_csv("/home/cdsw/data/processed/trabajadores/dj1887_last_period.csv")


#### DJ1879

Se cargan las DJ1879 para los periodos de estudio

In [30]:
EXAMPLE_SQL_QUERY = """
select *
from dw.dw_trn_djr_f1879_e
where PERI_AGNO_MES_TRIBUTARIO BETWEEN 202000 AND 202200
"""
dj1879 = spark.sql(EXAMPLE_SQL_QUERY)
dj1879.createOrReplaceTempView('DJ1879')


24/04/18 03:28:57 WARN HiveClientImpl: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
Hive Session ID = 15f4c7c2-9907-4ba6-8019-1334e3e41328
24/04/18 03:29:00 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [31]:
dj1879.columns

['PERI_AGNO_MES_TRIBUTARIO',
 'HEADER_FORM_KEY_VO',
 'TIFO_COD_FORM',
 'TIFO_COD_FORM_VERSION',
 'CONT_RUT',
 'CONT_DV',
 'F1879_RUT_DECLA_VO',
 'F1879_DV_DECLA_VO',
 'CONT_RUT_INFO',
 'CONT_DV_INFO',
 'F1879_RUT_INFO_VO',
 'F1879_DV_INFO_VO',
 'F1879_RETE_HONO',
 'F1879_RETE_DIRE10',
 'F1879_RETE_DIRE20',
 'F1879_CERTIFICADO',
 'F1879_CORRELATIVO',
 'F1879_PERIODO_PAGO',
 'F1879_RETE_HONO_ARTES',
 'F1879_RETE_HONO_ISLA',
 'F1879_RUT_FNCR',
 'F1879_DV_FNCR',
 'CONT_RUT_FNCR',
 'CONT_DV_FNCR',
 'F1879_SEVE',
 'F1879_BORRA_VO',
 'F1879_ORIGEN_VO',
 'F1879_ESTADO_VX',
 'TIMO_COD_MONEDA',
 'F1879_FECHA_CONVERSION',
 'F1879_FECHA_EXTRACCION_SO',
 'F1879_PAGO_PRESTAMO_C29',
 'F1879_FECHA_CARGA_DW',
 'c33']

Se filtra para el mismo año del F29 asociado, se agrupa por rut y periodo, luego se calculan las retenciones a honorarios y la cantidad de estos.

In [32]:
EXAMPLE_SQL_QUERY = """
select CONT_RUT,
PERI_AGNO_MES_TRIBUTARIO,
count(distinct CONT_RUT_INFO) as cantidad_trabajadores_honorarios,
sum(F1879_RETE_HONO) as total_monto_honorario
from DJ1879
group by CONT_RUT,PERI_AGNO_MES_TRIBUTARIO
"""

spark.sql(EXAMPLE_SQL_QUERY).createOrReplaceTempView('DJ1879')


In [33]:
spark.sql('select * from DJ1879').show()



+--------------------+------------------------+--------------------------------+---------------------+
|            CONT_RUT|PERI_AGNO_MES_TRIBUTARIO|cantidad_trabajadores_honorarios|total_monto_honorario|
+--------------------+------------------------+--------------------------------+---------------------+
|MX00ds03bncXIVIDk...|                  202000|                              12|              2434345|
|o+FW24YXhu8O1RsL8...|                  202000|                               7|               473347|
|yi949NL99XDQj3rwY...|                  202000|                              11|              6224048|
|3zaD5DhBpmVXzIDyk...|                  202000|                               8|               449048|
|ZD/OuUJ29SunRqUMb...|                  202000|                              22|               366046|
|rvFQdRZ3RDW0WT+3u...|                  202000|                              13|              3783154|
|NJX/iehPCW07Bv12J...|                  202000|                          

                                                                                

In [41]:
dj1879_pd = spark.sql('select * from DJ1879').toPandas()


                                                                                

Seleccionamos los datos del periodo informado mas reciente en cada caso

In [39]:
# calculamos el ultimo periodo disponible para cada rut
dj1879_last = dj1879_pd.sort_values(by=['CONT_RUT', 'PERI_AGNO_MES_TRIBUTARIO'], ascending=False) \
                                    .groupby(['CONT_RUT'])\
                                    .first()

In [42]:
dj1879_last=dj1879_last.reset_index(drop=False)

In [43]:
dj1879_last= dj1879_last[['CONT_RUT', 'cantidad_trabajadores_honorarios']]

Se guarda en un archivo .csv

In [44]:
dj1879_last.to_csv("/home/cdsw/data/processed/trabajadores/dj1879_last_period.csv")

Tambien generaremos un dataset donde haya tanto el numero de trabajadores dependientes como el numero de trabajadores a honorarios 

### Dataset final

In [45]:
# Realizar un outer join en la columna 'CONT_RUT'
merged_df = dj1879_last.merge(dj1887_last, on='CONT_RUT', how='outer')

# Seleccionar las columnas de interés
columns_to_keep = ['CONT_RUT', 'cantidad_trabajadores_honorarios', 'cantidad_trabajadores_dependientes']
final_df = merged_df[columns_to_keep]
final_df = final_df.fillna(0)

In [47]:
final_df.to_csv("/home/cdsw/data/processed/trabajadores/trabajadores_last_declaration.csv")