## Adding labels to contaminated entity data

This notebook adds labels to the contaminated tax entities.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import pyspark
import pandas as pd
import warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)
from pyspark.sql.types import StringType,TimestampType
from pyspark_dist_explore import hist
import matplotlib.pyplot as plt

In [2]:
spark = SparkSession.builder \
  .appName("Test")  \
  .config("spark.yarn.access.hadoopFileSystems","abfs://data@datalakesii.dfs.core.windows.net/") \
  .config("spark.executor.memory", "24g") \
  .config("spark.driver.memory", "12g")\
  .config("spark.executor.cores", "12") \
  .config("spark.executor.instances", "24") \
  .config("spark.driver.maxResultSize", "12g") \
  .getOrCreate()

warnings.filterwarnings('ignore', category=DeprecationWarning)
sc=spark.sparkContext
sc.setLogLevel ('ERROR')
spark.conf.set("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")


24/04/15 19:25:40 WARN SparkConf: The configuration key 'spark.yarn.access.hadoopFileSystems' has been deprecated as of Spark 3.0 and may be removed in the future. Please use the new key 'spark.kerberos.access.hadoopFileSystems' instead.
Setting spark.hadoop.yarn.resourcemanager.principal to hvega.externo
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

## Reading the contamination data

In [3]:
spark.read.options(header=True,inferSchema=True,delimiter=",").csv("/home/cdsw/data/processed/contaminados_processed_iva_representante_total.csv").createOrReplaceTempView("contaminados_total")
spark.sql('select * from contaminados_total').createOrReplaceTempView("contaminados")
spark.sql('select count(*) from contaminados_total').show()




+--------+
|count(1)|
+--------+
| 4336704|
+--------+



                                                                                

## Adding labels

Along with the contamination score, we add further labels to each node so that each group can be characterized once the community-detection algorithms are run.

1. Contamination value, obtained from the propagation.

2. How much the taxpayer has paid (code 89 of the F29) from 2020 to date.

3. How much the taxpayer buys or sells (net total) from 2020 to date.

4. Geographic information, taken from the office assigned to each taxpayer.

5. Taxpayer age, approximated by the number of documents issued or by the days elapsed since the constitution date.

## How much the taxpayer has paid (code 89 of form 29)

In [4]:
spark.read.parquet("abfs://data@datalakesii.dfs.core.windows.net/DatosOrigen/DW/DW_TRN_F29_E").createOrReplaceTempView("tmp_f29")
spark.sql('select count(distinct(CONT_RUT)) from tmp_f29').show()




+------------------------+
|count(DISTINCT CONT_RUT)|
+------------------------+
|                 2867844|
+------------------------+



                                                                                

In [5]:
# Keep only rows with validity code 1, which corresponds to valid forms.
spark.sql("select CONT_RUT,F29_C_89,CONCAT(CAST(F29_AGNO_MES_TRIBUTARIO_VO AS VARCHAR(10) ),'01') as fecha,TIVA_COD_VALIDEZ from tmp_f29 where TIVA_COD_VALIDEZ=1").createOrReplaceTempView("tmp_f29")
df=spark.sql("select * from tmp_f29")
df=df.withColumn("fecha",to_timestamp(col("fecha").cast("string"), "yyyyMMdd"))
df.schema
df.createOrReplaceTempView("tmp_f29")
# In Spark datetime patterns 'MM' is month-of-year; 'mm' means minute-of-hour.
spark.sql("select * from tmp_f29 where fecha >= TO_TIMESTAMP( '2020-01-01', 'yyyy-MM-dd' ) order by cont_rut asc").createOrReplaceTempView("tmp_f29")
spark.sql("select * from tmp_f29").show()
spark.sql("select CONT_RUT,sum(F29_C_89) as total_pago_f29 from tmp_f29  group by CONT_RUT").createOrReplaceTempView("tmp_f29")
#spark.sql("select count(distinct(CONT_RUT)) from tmp_f29 where total_pago_f29 is not null ").show()

                                                                                

+--------------------+--------+-------------------+----------------+
|            CONT_RUT|F29_C_89|              fecha|TIVA_COD_VALIDEZ|
+--------------------+--------+-------------------+----------------+
|+++4/3jzUwtDPSSo3...|     0.0|2020-08-01 00:00:00|               1|
|+++4/3jzUwtDPSSo3...|     0.0|2020-09-01 00:00:00|               1|
|+++4/3jzUwtDPSSo3...|     0.0|2022-11-01 00:00:00|               1|
|+++4/3jzUwtDPSSo3...|     0.0|2023-01-01 00:00:00|               1|
|+++4/3jzUwtDPSSo3...|     0.0|2022-09-01 00:00:00|               1|
|+++4/3jzUwtDPSSo3...|     0.0|2022-12-01 00:00:00|               1|
|+++4/3jzUwtDPSSo3...|     0.0|2021-09-01 00:00:00|               1|
|+++4/3jzUwtDPSSo3...|     0.0|2021-08-01 00:00:00|               1|
|+++4/3jzUwtDPSSo3...|     0.0|2022-03-01 00:00:00|               1|
|+++4/3jzUwtDPSSo3...|     0.0|2022-08-01 00:00:00|               1|
|+++4/3jzUwtDPSSo3...|     0.0|2023-06-01 00:00:00|               1|
|+++4/3jzUwtDPSSo3...|     0.0|202
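
The logic of this cell can be checked on a handful of rows without a cluster. The sketch below is plain Python on made-up data, not the Spark job itself: the RUT values, the amounts, and the helper name `total_paid_since` are hypothetical, and the tax period is assumed to be a `YYYYMM` integer (which is what appending `01` and parsing with `yyyyMMdd` suggests). It builds a date from the period, keeps periods on or after 2020-01-01, and sums code 89 per taxpayer.

```python
from collections import defaultdict
from datetime import datetime


def total_paid_since(rows, start=datetime(2020, 1, 1)):
    """Sum code-89 payments per taxpayer for tax periods on or after `start`.

    `rows` holds (cont_rut, period, amount) tuples, where `period` is an
    integer such as 202008 (assumed YYYYMM layout).
    """
    totals = defaultdict(float)
    for cont_rut, period, amount in rows:
        # Same trick as the SQL: append "01" and parse the result as a date.
        fecha = datetime.strptime(f"{period}01", "%Y%m%d")
        # Like SQL SUM, skip missing amounts instead of failing on them.
        if fecha >= start and amount is not None:
            totals[cont_rut] += amount
    return dict(totals)


# Hypothetical sample rows.
rows = [
    ("A", 201912, 100.0),  # before the cutoff, ignored
    ("A", 202001, 50.0),   # first period that counts
    ("A", 202008, 25.0),
    ("B", 202306, 10.0),
]
totals = total_paid_since(rows)
print(totals)
```

Note that the first period of 2020 is included only because the comparison is `>=` against midnight of 2020-01-01.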

In [6]:
spark.sql("select contaminados.cont_rut, score, total_pago_f29 from contaminados left join tmp_f29 on contaminados.cont_rut=tmp_f29.CONT_RUT").createOrReplaceTempView("contaminados")

In [7]:
#spark.sql("select count(*) from contaminados").show()
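
Each label is attached with a left join, so the number of rows in `contaminados` should stay at its original value as long as the right-hand table has one row per `cont_rut`; the commented-out count above looks like a check for exactly that. A minimal sketch in plain Python on made-up rows (the `left_join` helper and the sample values are hypothetical, and this is not the Spark implementation) shows how duplicate keys on the right would inflate the row count:

```python
def left_join(left_rows, right_rows, key):
    """Left join two lists of dicts on `key`, mimicking SQL LEFT JOIN semantics."""
    index = {}
    for row in right_rows:
        index.setdefault(row[key], []).append(row)

    joined = []
    for left in left_rows:
        matches = index.get(left[key])
        if not matches:
            # Unmatched left row: kept once, right-hand columns stay absent
            # (they would be NULL in SQL).
            joined.append(dict(left))
        else:
            # One output row per matching right-hand row.
            for right in matches:
                joined.append({**left, **right})
    return joined


# Hypothetical sample data.
nodes = [{"cont_rut": "A", "score": 0.9}, {"cont_rut": "B", "score": 0.1}]
unique_right = [{"cont_rut": "A", "total_pago_f29": 75.0}]
duplicated_right = unique_right + [{"cont_rut": "A", "total_pago_f29": 1.0}]

same_size = left_join(nodes, unique_right, "cont_rut")
inflated = left_join(nodes, duplicated_right, "cont_rut")
print(len(nodes), len(same_size), len(inflated))
```

In the notebook, `tmp_f29` is grouped by `CONT_RUT` before the join, so it has one row per taxpayer.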

## How much the taxpayer buys or sells (IVA_neto)

In [8]:
spark.sql("select dhdr_rut_emisor,dhdr_rut_recep,dhdr_iva,dhdr_mnt_total,dhdr_fch_emis_int from dwbgdata.header_dte_consolidada_enc_sas_analitica where (dtdc_codigo ='33' or dtdc_codigo='34')").createOrReplaceTempView("consolidada")
df=spark.sql("select * from consolidada")
df=df.withColumn("dhdr_fch_emis_int",to_timestamp(col("dhdr_fch_emis_int").cast("string"), "yyyyMMdd"))
df.createOrReplaceTempView("consolidada")
#spark.sql("select count(*) from contaminados").show()

Hive Session ID = 01e9018f-781e-4b51-92e0-6ac297bfee68


In [9]:
spark.sql("select * from consolidada where dhdr_fch_emis_int >= TO_TIMESTAMP( '2020-01-01', 'yyyy-MM-dd')").createOrReplaceTempView("consolidada")

spark.sql("select dhdr_rut_emisor, sum(dhdr_iva) as iva_emitido from consolidada group by dhdr_rut_emisor").createOrReplaceTempView("emisor")
spark.sql("select dhdr_rut_recep, sum(dhdr_iva) as iva_recibido from consolidada group by dhdr_rut_recep").createOrReplaceTempView("receptor")

spark.sql("select * from emisor full outer join receptor on emisor.dhdr_rut_emisor=receptor.dhdr_rut_recep").createOrReplaceTempView("consolidada")
spark.sql("select case when dhdr_rut_emisor is null then dhdr_rut_recep else dhdr_rut_emisor end as cont_rut, case when iva_emitido is null then 0 else iva_emitido end as iva_emitido, case when iva_recibido is null then 0 else iva_recibido end as iva_recibido from consolidada").createOrReplaceTempView("consolidada")

spark.sql("select cont_rut, (iva_recibido-iva_emitido) as iva_neto from consolidada").createOrReplaceTempView("consolidada")
#spark.sql("select count(*) from consolidada").show()

# When an invoice is issued, the issuer must pay the IVA. When an invoice is received, the receiver withholds the IVA,
# so the net IVA received is iva_recibido - iva_emitido.

spark.sql("select contaminados.cont_rut as cont_rut, score, total_pago_f29,consolidada.iva_neto as IVA_neto from contaminados left join consolidada on contaminados.cont_rut=consolidada.CONT_RUT").createOrReplaceTempView("contaminados")

#spark.sql("select count(*) from contaminados").show()
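
The full outer join and the `case when ... is null then 0` expressions above can be pictured with a small example. The following is a sketch in plain Python on invented invoices, not the Spark code: the helper name `net_iva` and the sample values are hypothetical. The union of the two key sets plays the role of the full outer join, and a default of 0 stands in for the NULL replacement.

```python
from collections import defaultdict


def net_iva(invoices):
    """Return {rut: iva_recibido - iva_emitido} over all issuers and receivers.

    `invoices` holds (rut_emisor, rut_recep, iva) tuples.
    """
    emitido = defaultdict(float)
    recibido = defaultdict(float)
    for rut_emisor, rut_recep, iva in invoices:
        emitido[rut_emisor] += iva
        recibido[rut_recep] += iva

    # Union of keys = full outer join; .get(..., 0.0) = NULL replaced by 0.
    ruts = set(emitido) | set(recibido)
    return {rut: recibido.get(rut, 0.0) - emitido.get(rut, 0.0) for rut in ruts}


# Hypothetical invoices: (issuer, receiver, IVA amount).
invoices = [
    ("A", "B", 19.0),
    ("A", "C", 38.0),
    ("B", "A", 9.5),
]
result = net_iva(invoices)
print(result)
```

Here `C` only receives invoices and still gets a value, which is the reason for using a full outer join rather than an inner join.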

## Geographic information

In [10]:
#spark.sql("select * from dw.dw_trn_negocios_e").show()
spark.sql("select CONT_RUT, UNOP_DES_REGIONAL,NEGO_FECHA_VIGENCIA from dw.dw_trn_negocios_e  left join dw.dw_dim_unidad_operativa on dw.dw_trn_negocios_e.UNOP_UNIDAD= dw.dw_dim_unidad_operativa.UNOP_UNIDAD").createOrReplaceTempView("geo")
spark.sql("WITH RankedData AS (SELECT CONT_RUT, UNOP_DES_REGIONAL,NEGO_FECHA_VIGENCIA, ROW_NUMBER() OVER (PARTITION BY CONT_RUT ORDER BY NEGO_FECHA_VIGENCIA DESC) AS RowNum FROM geo) SELECT CONT_RUT, UNOP_DES_REGIONAL,NEGO_FECHA_VIGENCIA FROM RankedData WHERE RowNum = 1").createOrReplaceTempView("geo")
#spark.sql("select * from geo").show()
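
The `ROW_NUMBER() ... WHERE RowNum = 1` query keeps one row per taxpayer: the one with the most recent `NEGO_FECHA_VIGENCIA`. A rough equivalent in plain Python is sketched below, assuming non-null dates; the helper name `latest_regional_office`, the office names, and the dates are all hypothetical.

```python
from datetime import date


def latest_regional_office(records):
    """Keep, for each cont_rut, the office with the most recent validity date.

    `records` holds (cont_rut, regional_office, valid_from) tuples.
    """
    latest = {}
    for cont_rut, office, valid_from in records:
        current = latest.get(cont_rut)
        # Replace the stored entry only when a strictly newer date shows up.
        if current is None or valid_from > current[1]:
            latest[cont_rut] = (office, valid_from)
    return {rut: office for rut, (office, _) in latest.items()}


# Hypothetical business records.
records = [
    ("A", "Office North", date(2019, 1, 1)),
    ("A", "Office South", date(2022, 5, 1)),
    ("B", "Office East", date(2020, 3, 1)),
]
offices = latest_regional_office(records)
print(offices)
```

When two records share the same latest date, this sketch keeps the first one seen, whereas `ROW_NUMBER` gives no guarantee about which tied row is ranked first.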

In [11]:
spark.sql("select  contaminados.cont_rut as cont_rut, score, total_pago_f29,IVA_neto,UNOP_DES_REGIONAL as unidad_regional from contaminados left join  geo on contaminados.cont_rut=geo.CONT_RUT").createOrReplaceTempView("contaminados")

In [12]:
#spark.sql("select count(*) from contaminados").show()

## Documents issued

In [13]:
spark.sql("select dhdr_rut_emisor, count(*) as n_documentos from dwbgdata.header_dte_consolidada_enc_sas_analitica where (dtdc_codigo ='33' or dtdc_codigo='34') group by dhdr_rut_emisor").createOrReplaceTempView("emitidos")

In [14]:
#spark.sql("select count(distinct(dhdr_rut_recep)) from  dwbgdata.header_dte_consolidada_enc_sas_analitica where (dtdc_codigo ='33' or dtdc_codigo='34')").show()

In [15]:
#spark.sql("select count(*) from emitidos").show()

In [16]:
spark.sql("select  contaminados.cont_rut, score, total_pago_f29,IVA_neto,unidad_regional, n_documentos from contaminados left join emitidos on contaminados.cont_rut=emitidos.dhdr_rut_emisor").createOrReplaceTempView("contaminados")
#spark.sql("select count(*) from contaminados").show()

## Taxpayer lifetime (in days)

In [17]:
spark.read.parquet("abfs://data@datalakesii.dfs.core.windows.net/DatosOrigen/DW/DW_TRN_CONTRIBUYENTES_E").createOrReplaceTempView("contribuyente")
#spark.sql('select count(*) from contribuyente where (cont_fecha_constitucion_vo is null or cont_fecha_constitucion_vo> NOW())').show()
#spark.sql('select count(*) from contribuyente where (cont_fecha_constitucion_vo is not null or cont_fecha_constitucion_vo<= NOW())').show()
#spark.sql('select cont_rut, count(*) as c from contribuyente group by cont_rut').createOrReplaceTempView("contribuyente")
#spark.sql('select cont_rut, c from contribuyente order by c desc').show()
spark.sql("select cont_rut,cont_fecha_constitucion_vo, case when (cont_fecha_constitucion_vo is null or cont_fecha_constitucion_vo> NOW()) then NULL else DATEDIFF( NOW(),cont_fecha_constitucion_vo) end as lifetime from dw.dw_trn_contribuyentes_e order by cont_fecha_constitucion_vo desc ").createOrReplaceTempView("aux")


In [18]:
spark.sql('select * from aux where lifetime is not null').show()



+--------------------+--------------------------+--------+
|            cont_rut|cont_fecha_constitucion_vo|lifetime|
+--------------------+--------------------------+--------+
|xWHI3pWiVUy93+tLZ...|       2024-03-25 03:00:00|      21|
|lxBxF+gczV1MOGIgZ...|       2024-03-25 03:00:00|      21|
|y+/AAozWrn7mF8KLW...|       2024-03-25 03:00:00|      21|
|3itfDixx663/fPkHj...|       2024-03-25 03:00:00|      21|
|HIJn1q5OKNDG70MIc...|       2024-03-25 03:00:00|      21|
|pYndYDFhoDRPw3nUU...|       2024-03-25 03:00:00|      21|
|5bpfGujgG2FooWCGW...|       2024-03-25 03:00:00|      21|
|IPBKjLZju85eOmyX2...|       2024-03-25 03:00:00|      21|
|FtzmlZ+LHfnXfWz5Z...|       2024-03-25 03:00:00|      21|
|YmY0VS/Yh58Ekjjf/...|       2024-03-25 03:00:00|      21|
|uujQqyoN8AqFDf7Rl...|       2024-03-25 03:00:00|      21|
|RzxB+Jyh0Hj8mh3MB...|       2024-03-25 03:00:00|      21|
|IUh3ah7y1gdpzyYD5...|       2024-03-25 03:00:00|      21|
|Ar+iHTwh1V//K2wW4...|       2024-03-25 03:00:00|      2

                                                                                

In [19]:
spark.sql("select cont_rut, case when (cont_fecha_constitucion_vo is null or cont_fecha_constitucion_vo> NOW()) then NULL else DATEDIFF( NOW(),cont_fecha_constitucion_vo) end as lifetime from dw.dw_trn_contribuyentes_e order by cont_fecha_constitucion_vo desc ").createOrReplaceTempView("lifetime")
spark.sql("select  contaminados.cont_rut, score,  total_pago_f29,IVA_neto,unidad_regional, n_documentos,lifetime from contaminados left join lifetime on contaminados.cont_rut=lifetime.cont_rut").createOrReplaceTempView("contaminados")
#spark.sql("select count(*) from contaminados where lifetime is not null").show()
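
The `case when` expression used for `lifetime` reduces to a small rule: no value when the constitution date is missing or in the future, otherwise the number of days up to today. A minimal sketch in plain Python follows; the function name `lifetime_days` is hypothetical, and the run date of 2024-04-15 is an assumption taken from the timestamps in the session log.

```python
from datetime import date


def lifetime_days(constitution_date, today):
    """Days since constitution, or None when the date is missing or in the future."""
    if constitution_date is None or constitution_date > today:
        return None
    return (today - constitution_date).days


run_date = date(2024, 4, 15)  # assumed run date
print(lifetime_days(date(2024, 3, 25), run_date))
print(lifetime_days(None, run_date))
print(lifetime_days(date(2030, 1, 1), run_date))
```

With that run date, a constitution date of 2024-03-25 gives 21 days, which agrees with the sample rows shown for the `aux` view.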

## Prior contamination

In [20]:
spark.read.options(header=True,inferSchema=True,delimiter=",").csv("/home/cdsw/data/processed/contaminados.csv").createOrReplaceTempView("contaminados_inicio")

                                                                                

In [21]:
#spark.sql("select count(*) from contaminados_inicio").show()

In [22]:
spark.sql("select contaminados.cont_rut, contaminados.score,  total_pago_f29,IVA_neto,unidad_regional, n_documentos,lifetime , case when contaminados_inicio.cont_rut is null then 0 else 1 end as alerta_inicial from contaminados left join contaminados_inicio on contaminados.cont_rut=contaminados_inicio.cont_rut").createOrReplaceTempView("contaminados")
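
The `alerta_inicial` column is a membership flag: 1 when the taxpayer appears in the initial contamination file, 0 otherwise. The idea can be sketched in plain Python as a set lookup; the helper name `add_initial_alert` and the sample rows are hypothetical, and this is an illustration rather than the Spark join itself.

```python
def add_initial_alert(nodes, initial_ruts):
    """Return copies of `nodes` with alerta_inicial = 1 if the RUT was initially flagged."""
    flagged = set(initial_ruts)
    return [
        {**node, "alerta_inicial": 1 if node["cont_rut"] in flagged else 0}
        for node in nodes
    ]


# Hypothetical sample data.
nodes = [{"cont_rut": "A", "score": 0.9}, {"cont_rut": "B", "score": 0.1}]
labeled = add_initial_alert(nodes, initial_ruts=["A"])
print(labeled)
```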

## Saving the data before the community search

In [23]:


contaminados=spark.sql("select * from contaminados")
contaminados.write.mode('overwrite').format("parquet").save("abfs://data@datalakesii.dfs.core.windows.net/DatosOrigen/lr-629/propuesta_f29_comunidades/data_contaminados_with_labels")

contaminados=contaminados.toPandas()
# Save the contamination values together with the additional columns
contaminados.to_csv('/home/cdsw/data/processed/contaminados_processed_iva_representante_total_with_labels.csv', index=False)

