# Data documentos (obtencion de muestra estratificada))

En este notebook se obtiene caracteristicas unicas documentos electronicos tipo 33 previo a la union con la data de cada contribuyente. 

In [1]:
##Se importan packages necesarios
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import pyspark
import pandas as pd
import warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)
from pyspark.sql.types import StringType,TimestampType
import matplotlib.pyplot as plt
from pyspark.sql import functions as F

In [2]:
#inicio de sesion en spark
ss_name = 'Lectura de datos Dashboard'
wg_conn = "spark.kerberos.access.hadoopFileSystems"
db_conn = "abfs://data@datalakesii.dfs.core.windows.net/"

spark = SparkSession.builder \
      .appName(f"Ejecucion algoritmo {ss_name}")  \
      .config(wg_conn, db_conn) \
      .config("spark.executor.memory", "6g") \
      .config("spark.driver.memory", "12g")\
      .config("spark.executor.cores", "4") \
      .config("spark.executor.instances", "5") \
      .config("spark.driver.maxResultSize", "12g") \
      .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")
spark.conf.set("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")
spark.conf.set("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED")
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")
spark.conf.set("spark.sql.debug.maxToStringFields", "2000")
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

Setting spark.hadoop.yarn.resourcemanager.principal to hvega.externo


### Documentos electronicos tipo 33 y caracteristicas

In [3]:
# Seleccionamos de la dte emiso, receptor, folio, monto total, hora y fecha de emision respectivas

dte=spark.sql("select dhdr_folio,dtdc_codigo,dhdr_fch_emis, dhdr_rut_emisor,dhdr_dv_emisor,dhdr_rut_recep,dhdr_dv_recep,dhdr_mnt_total,dhdr_iva,dhdr_tmst_firma from DWBGDATA.HEADER_DTE_CONSOLIDADA_ENC_SAS_ANALITICA where dtdc_codigo=33")

Hive Session ID = 2e0460e2-370c-447c-bcf6-b24f83f33797


## Muestreo de ultimos meses

In [4]:
# 2. Obtener la última fecha de emisión
ultima_fecha_emision = dte.agg(F.max("dhdr_fch_emis")).collect()[0][0]

meses_antes = F.add_months(F.lit(ultima_fecha_emision), -3)

# 4. Filtrar el DataFrame 
dte = dte.filter(dte["dhdr_fch_emis"] >= meses_antes)


                                                                                

In [5]:
dte.columns

['dhdr_folio',
 'dtdc_codigo',
 'dhdr_fch_emis',
 'dhdr_rut_emisor',
 'dhdr_dv_emisor',
 'dhdr_rut_recep',
 'dhdr_dv_recep',
 'dhdr_mnt_total',
 'dhdr_iva',
 'dhdr_tmst_firma']

## Agregar variables de promedio y desviacion estandar de montos e iva para emisor y receptor en el periodo estudiado

Se agrega el monto total de emision para el emisor respectivo y el monto total recibido para ese receptor, ambos calculos realizados en la ventana de tiempo correspondiente.

In [6]:
# Agregar estadísticas para emis_CONT_RUT
agg_emisor = dte.groupBy("dhdr_rut_emisor").agg(
    F.avg("dhdr_mnt_total").alias("avg_dhdr_mnt_total_emisor"),
    F.stddev("dhdr_mnt_total").alias("stddev_dhdr_mnt_total_emisor"),
    F.avg("dhdr_iva").alias("avg_dhdr_iva_emisor"),
    F.stddev("dhdr_iva").alias("stddev_dhdr_iva_emisor")
)

# Agregar estadísticas para recep_CONT_RUT
agg_receptor = dte.groupBy("dhdr_rut_recep").agg(
    F.avg("dhdr_mnt_total").alias("avg_dhdr_mnt_total_receptor"),
    F.stddev("dhdr_mnt_total").alias("stddev_dhdr_mnt_total_receptor"),
    F.avg("dhdr_iva").alias("avg_dhdr_iva_receptor"),
    F.stddev("dhdr_iva").alias("stddev_dhdr_iva_receptor")
)

In [7]:
# Unir las estadísticas agregadas para emis_CONT_RUT
dte= dte.join(agg_emisor, on="dhdr_rut_emisor", how="left")

# Unir las estadísticas agregadas para recep_CONT_RUT
dte = dte.join(agg_receptor, on="dhdr_rut_recep", how="left")


In [8]:
from pyspark.sql import functions as F

dte = dte.withColumn("anio", F.year("dhdr_tmst_firma")) \
         .withColumn("mes", F.month("dhdr_tmst_firma")) \
         .withColumn("dia", F.dayofmonth("dhdr_tmst_firma")) \
         .withColumn("hora", F.hour("dhdr_tmst_firma")) \
         .withColumn("es_fin_de_semana", 
             F.when(F.date_format("dhdr_tmst_firma", "u").cast("int").isin([6, 7]), 1).otherwise(0)) \
         .withColumn("bloque_horario", 
             F.when((F.col("hora") >= 0) & (F.col("hora") < 6), "Madrugada")
              .when((F.col("hora") >= 6) & (F.col("hora") < 12), "Mañana")
              .when((F.col("hora") >= 12) & (F.col("hora") < 19), "Tarde")
              .otherwise("Noche")) \
         .withColumn("dia_semana", F.dayofweek("dhdr_tmst_firma")) \
         .withColumn("semana_mes", 
             (F.dayofmonth("dhdr_tmst_firma") - 1) / 7 + 1)


In [9]:
dte=dte.drop('dhdr_tmst_firma','dhdr_fch_emis')

In [10]:
dte.columns

['dhdr_rut_recep',
 'dhdr_rut_emisor',
 'dhdr_folio',
 'dtdc_codigo',
 'dhdr_dv_emisor',
 'dhdr_dv_recep',
 'dhdr_mnt_total',
 'dhdr_iva',
 'avg_dhdr_mnt_total_emisor',
 'stddev_dhdr_mnt_total_emisor',
 'avg_dhdr_iva_emisor',
 'stddev_dhdr_iva_emisor',
 'avg_dhdr_mnt_total_receptor',
 'stddev_dhdr_mnt_total_receptor',
 'avg_dhdr_iva_receptor',
 'stddev_dhdr_iva_receptor',
 'anio',
 'mes',
 'dia',
 'hora',
 'es_fin_de_semana',
 'bloque_horario',
 'dia_semana',
 'semana_mes']

## Sample estratificado con consideracion de tipo de contribuyente

In [11]:
# Ruta donde está el DataFrame de contribuyentes en formato Parquet
ruta_contribuyentes = "abfs://data@datalakesii.dfs.core.windows.net/DatosOrigen/lr-629/APA/Analisis_factura/data_contribuyentes"

# Leer el DataFrame desde la ruta especificada
contribuyentes = spark.read.format("parquet").load(ruta_contribuyentes)


In [12]:
contribuyentes.filter(F.col('CONT_RUT').isNull()).count()

0

In [13]:
recuento = contribuyentes.groupBy("ES_PERSONA", "ES_EMPRESA").count()
# Mostrar el resultado
recuento.show()



+----------+--------------------+--------+
|ES_PERSONA|          ES_EMPRESA|   count|
+----------+--------------------+--------+
|      null|Segmento Pequeñas...|  296711|
|      null|                null|  145603|
|         1|                null|25381961|
|      null|Segmento Micro Em...| 3290283|
|      null|Segmento Grandes ...|   76036|
|      null|Segmento Medianas...|   50209|
+----------+--------------------+--------+



                                                                                

In [14]:
contribuyentes = contribuyentes.withColumn(
    "tipo_contribuyente",
    F.when(F.col("ES_EMPRESA").isNotNull(), F.col("ES_EMPRESA"))
     .when(F.col("ES_PERSONA").isNotNull(), "Persona")
)

In [15]:
contribuyentes.groupBy("tipo_contribuyente").count().show()



+--------------------+--------+
|  tipo_contribuyente|   count|
+--------------------+--------+
|Segmento Pequeñas...|  296711|
|                null|  145603|
|Segmento Micro Em...| 3290283|
|Segmento Grandes ...|   76036|
|Segmento Medianas...|   50209|
|             Persona|25381961|
+--------------------+--------+



                                                                                

In [16]:
# 1. Realizar el left join para incluir la columna 'tipo_contribuyente' en el DataFrame de emisores
dte_contribuyentes_emisor = dte.join(
    contribuyentes,
    (dte["dhdr_rut_emisor"] == contribuyentes["CONT_RUT"]) &
    (dte["dhdr_dv_emisor"] == contribuyentes["CONT_DV"]),
    how="left"
)

# 2. Seleccionar las columnas necesarias y manejar valores nulos en 'tipo_contribuyente'
resultado_final_emisor = dte_contribuyentes_emisor.select(
    dte["*"],  # Todas las columnas de dte
    contribuyentes["tipo_contribuyente"]  # La nueva columna
)

resultado_final_emisor = resultado_final_emisor.withColumn(
    "tipo_contribuyente",
    F.when(F.col("tipo_contribuyente").isNull(), "indefinido")
     .otherwise(F.col("tipo_contribuyente"))
)

# 3. Contar los valores únicos de 'CONT_RUT' por 'tipo_contribuyente'
conteo_por_clase_emisor = resultado_final_emisor.groupBy("tipo_contribuyente") \
                                   .agg(F.countDistinct("dhdr_rut_emisor").alias("num_cont_rut_unicos"))

# Mostrar el conteo de valores únicos por clase
conteo_por_clase_emisor.show()

# 4. Determinar el tamaño mínimo para el muestreo equilibrado
min_tamano = conteo_por_clase_emisor.agg(F.min("num_cont_rut_unicos")).first()[0]

# 5. Calcular la fracción de muestreo para cada clase
fracciones_emisor = conteo_por_clase_emisor.select(
    "tipo_contribuyente",
    (F.lit(min_tamano) / F.col("num_cont_rut_unicos")).alias("fraccion")
).rdd.collectAsMap()

# 6. Realizar el muestreo equilibrado utilizando las fracciones calculadas
dataset_final_emisor = resultado_final_emisor.sampleBy("tipo_contribuyente", fracciones_emisor, seed=42)

# 7. Contar el tamaño final del dataset muestreado
#print(f"Tamaño del dataset final calibración emisor: {dataset_final_emisor.count()}")

# 8. Eliminar la columna 'tipo_contribuyente' para el dataset final
dataset_final_emisor = dataset_final_emisor.drop('tipo_contribuyente')

                                                                                ]]]

+--------------------+-------------------+
|  tipo_contribuyente|num_cont_rut_unicos|
+--------------------+-------------------+
|Segmento Pequeñas...|             174939|
|Segmento Micro Em...|             299706|
|          indefinido|               1431|
|Segmento Grandes ...|              15913|
|Segmento Medianas...|              31860|
|             Persona|                341|
+--------------------+-------------------+



                                                                                0]]]

In [17]:
# 2. Hacer el left join para hacer un sample de receptores
dte_contribuyentes_recep = dte.join(
    contribuyentes,
    (dte["dhdr_rut_recep"] == contribuyentes["CONT_RUT"]) &
    (dte["dhdr_dv_recep"] == contribuyentes["CONT_DV"]),
    how="left"
)

resultado_final_recep = dte_contribuyentes_recep.select(
    dte["*"],  # Todas las columnas de dte
    contribuyentes["tipo_contribuyente"]  # La nueva columna
)

resultado_final_recep = resultado_final_recep.withColumn(
    "tipo_contribuyente",
    F.when(F.col("tipo_contribuyente").isNull(), "indefinido")
     .otherwise(F.col("tipo_contribuyente"))
)

# 1. Contar los valores únicos de CONT_RUT por tipo_contribuyente
conteo_por_clase_recep = resultado_final_recep.groupBy("tipo_contribuyente") \
                                   .agg(F.countDistinct("dhdr_rut_emisor").alias("num_cont_rut_unicos"))

# Mostrar el resultado
conteo_por_clase_recep.show()

# 3. Determinar el tamaño mínimo para el muestreo equilibrado
min_tamano = conteo_por_clase_recep.agg(F.min("num_cont_rut_unicos")).first()[0]

# 4. Calcular la fracción de muestreo para cada clase
fracciones_recep = conteo_por_clase_recep.select(
    "tipo_contribuyente",
    (F.lit(min_tamano) / F.col("num_cont_rut_unicos")).alias("fraccion")
).rdd.collectAsMap()

# 5. Realizar el muestreo equilibrado
dataset_final_recep = resultado_final_recep.sampleBy("tipo_contribuyente", fracciones_recep, seed=42)

# 6. Contar el tamaño final del dataset muestreado
#print(f"Tamaño del dataset fina calibracion emisor: {dataset_final_recep.count()}")
dataset_final_recep=dataset_final_recep.drop('tipo_contribuyente')

                                                                                200]]]

+--------------------+-------------------+
|  tipo_contribuyente|num_cont_rut_unicos|
+--------------------+-------------------+
|Segmento Pequeñas...|             309750|
|Segmento Micro Em...|             256233|
|          indefinido|              17308|
|Segmento Grandes ...|             324865|
|Segmento Medianas...|             250545|
|             Persona|              72126|
+--------------------+-------------------+



                                                                                200]]]

In [18]:
# Supongamos que ya tienes dataset_final_emisor y dataset_final_recep

# 1. Obtener el tamaño de ambos datasets
size_emisor = dataset_final_emisor.count()
size_recep = dataset_final_recep.count()

# 2. Identificar el dataset más grande y el más pequeño
if size_emisor > size_recep:
    # Resampleamos el dataset_emisor para que tenga el mismo tamaño que el dataset_recep
    fraccion_emisor = size_recep / size_emisor
    dataset_final_emisor_resampled = dataset_final_emisor.sample(withReplacement=False, fraction=fraccion_emisor, seed=42)
    dataset_final_recep_resampled = dataset_final_recep
else:
    # Resampleamos el dataset_recep para que tenga el mismo tamaño que el dataset_emisor
    fraccion_recep = size_emisor / size_recep
    dataset_final_recep_resampled = dataset_final_recep.sample(withReplacement=False, fraction=fraccion_recep, seed=42)
    dataset_final_emisor_resampled = dataset_final_emisor

# 3. Realizamos la unión de los dos datasets (ahora de tamaño similar)
dataset_final = dataset_final_emisor_resampled.union(dataset_final_recep_resampled)

# Eliminar filas duplicadas
dataset_final = dataset_final.dropDuplicates()

# 4. Contamos el tamaño final del dataset combinado
#print(f"Tamaño del dataset final combinado: {dataset_final.count()}")


                                                                                 200]]

In [19]:
print(f"Tamaño del dataset final combinado: {dataset_final.count()}")

[Stage 314:(158 + 14) / 200][Stage 315:(138 + 14) / 200][Stage 323:(366 + 4) / 383]]]]

Tamaño del dataset final combinado: 3450678


                                                                                

In [20]:

# Se guarda el archivo final en el datalake. 
dataset_final .write.mode('overwrite').format("parquet").save("abfs://data@datalakesii.dfs.core.windows.net/DatosOrigen/lr-629/APA/Analisis_factura/dtes_muestra_estratificada")


24/11/08 00:00:24 441 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /10.244.29.15:44834 is closed
[Stage 343:(4590 + -46) / 4544][Stage 350:(166 + -4) / 162][Stage 363:> (0 + 1) / 72]]]

------------------------------------------------------------------------------------------------------------------------

## Cesión de documentos tributarios

## Completar

In [21]:
doct=spark.table("hivenom.csn_doctos_final")
ces=spark.table("hivenom.csn_cesion_final")

In [22]:
# Realiza el inner join
dte_cesion = dte.join(
    doct,  # Asumiendo que ya tienes el DataFrame 'ces'
    (dte.dhdr_folio == doct.rdoc_folio) &
    (dte.dhdr_rut_emisor == doct.rdoc_rut_emisor_e) &
    (dte.dhdr_dv_emisor == doct.rdoc_dv_emisor),
    "left"
).select(
    dte.dhdr_folio,
    dte.dtdc_codigo,
    dte.dhdr_rut_emisor,
    dte.dhdr_dv_emisor,
    dte.dhdr_rut_recep,
    dte.dhdr_dv_recep,
    dte.dhdr_mnt_total,
    dte.dhdr_iva,
    dte.dhdr_fch_emis,  # Asegúrate de que esta columna exista en dte
    *doct.columns # Selecciona todas las columnas de 'ces'
)

# Agrega la columna 'cedido' para indicar si hay cruce
dte_cesion = dte_cesion.withColumn(
    "cedido",
    F.when(doct.rdoc_folio.isNotNull(), "Sí").otherwise("No")
)



AttributeError: 'DataFrame' object has no attribute 'dhdr_fch_emis'

In [None]:
dte_cesion.columns

In [None]:
# Agrupa por las columnas originales de dte y cuenta las veces que ha sido cedido
result = dte_cesion.groupBy(
    "dhdr_folio",
    "dtdc_codigo",
    "dhdr_fch_emis",
    "dhdr_rut_emisor",
    "dhdr_dv_emisor",
    "dhdr_rut_recep",
    "dhdr_dv_recep",
    "dhdr_mnt_total",
    "dhdr_iva"
).agg(
    F.count(F.when(F.col("cedido") == "Sí", 1)).alias("veces_cedido")  # Cuenta solo los cedidos
)

In [None]:
result.show()

In [None]:
# Realiza el left join con dte_cesion
dte_cesion_det = dte_cesion.join(
    ces,  # Asumiendo que ya tienes el DataFrame 'ces'
    dte_cesion.rdoc_codigo == ces.rdoc_codigo,  # Asegúrate de que esta columna exista
    "left"  # Cambiado a left join
).select(
    dte_cesion.dhdr_folio,
    dte_cesion.dhdr_rut_emisor,
    dte_cesion.dhdr_rut_recep
)

# Muestra el resultado (opcional)
dte_cesion_det.show()

In [None]:
result.show()

## Obtencion de documentos de RCV

In [None]:
rcv=spark.table("DWBGDATA.DCV_GENERIC_DET_CONSOLIDADO_SAS")

## Cruce con registros de RCV

In [None]:
# Realiza el left join
union= dte.join(
    rcv,
    (dte.dhdr_folio == rcv.det_folio_doc_ref) &
    (dte.dhdr_rut_emisor ==rcv.dcv_rut_emisor_e) &
    (dte.dhdr_rut_recep == rcv.det_rut_doc_e),
    "left"
)

# Agrega una columna para indicar si hubo cruce
union = union.withColumn(
    "cruce_rcv",
    F.when(rcv.det_folio_doc_ref.isNotNull(), "Sí").otherwise("No")
)


In [None]:
# Selecciona todas las columnas originales de dte y las columnas específicas de rcv
df_final = union.select(
    dte.dhdr_folio,
    dte.dtdc_codigo,
    dte.dhdr_fch_emis,
    dte.dhdr_rut_emisor,
    dte.dhdr_dv_emisor,
    dte.dhdr_rut_recep,
    dte.dhdr_dv_recep,
    dte.dhdr_mnt_total,
    dte.dhdr_iva,
    rcv.det_emisor_nota,
    rcv.det_fch_doc,
    rcv.det_fec_creacion,
    rcv.tipo_transaccion,
    F.when(rcv.det_folio_doc_ref.isNotNull(), "Sí").otherwise("No").alias("cruce_rcv")
)

# Muestra el resultado
df_final.show()


### Cesion de documentos

In [23]:
spark.stop()