### Union de data

In [34]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import pyspark
import pandas as pd
import warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)
from pyspark.sql.types import StringType,TimestampType
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [35]:
ss_name = 'Agregación de labels a contaminados'
wg_conn = "spark.kerberos.access.hadoopFileSystems"
db_conn = "abfs://data@datalakesii.dfs.core.windows.net/"

spark = SparkSession.builder \
      .appName(f"Ejecucion algoritmo {ss_name}")  \
      .config(wg_conn, db_conn) \
      .config("spark.executor.memory", "6g") \
      .config("spark.driver.memory", "12g")\
      .config("spark.executor.cores", "4") \
      .config("spark.executor.instances", "10") \
      .config("spark.driver.maxResultSize", "12g") \
      .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")
spark.conf.set("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")
spark.conf.set("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED")
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")

In [36]:
louvain= spark.read.csv("/home/cdsw/data/lovain_agrupacion/louvain_all.csv", header=True, inferSchema=True)
louvain.columns

                                                                                

KeyboardInterrupt: 

In [None]:
louvain.count()

In [None]:
contaminacion = spark.read.parquet("abfs://data@datalakesii.dfs.core.windows.net/DatosOrigen/lr-629/propuesta_f29_comunidades/data_contaminados_with_labels")
contaminacion.columns

In [None]:
join_spark_louvain = louvain.join(contaminacion, louvain["RUT_UNICO"] == contaminacion["cont_rut"], how='left')
join_spark_louvain = join_spark_louvain.drop("cont_rut")
join_spark_louvain.columns

In [None]:
join_spark_louvain.count()

In [None]:
oscuridad= spark.read.parquet("abfs://data@datalakesii.dfs.core.windows.net/DatoOrigen/lr-629/Oscuridad/final/oscuridad")
oscuridad.columns

In [None]:
resultado_intermedio = join_spark_louvain.join(oscuridad, join_spark_louvain["RUT_UNICO"] == oscuridad["CONT_RUT"], how='left')
resultado_intermedio  = resultado_intermedio.drop("CONT_RUT")
resultado_intermedio.columns

In [None]:
resultado_intermedio.count()

In [None]:
familiaridad= spark.read.parquet("abfs://data@datalakesii.dfs.core.windows.net/DatoOrigen/lr-629/Oscuridad/final/familiaridad")
familiaridad.columns

In [None]:
familiaridad.select("RUT_SOCIEDAD").count()

In [None]:
df_spark= resultado_intermedio.join(familiaridad,resultado_intermedio["RUT_UNICO"]==familiaridad["RUT_SOCIEDAD"],how='left')
df_spark  = df_spark.drop("RUT_SOCIEDAD")
df_spark.columns

In [None]:
df_spark.count()

### Se agrega cantidad de trabajadores

In [None]:
# Paso 1: Cargar el archivo CSV que contiene la información de trabajadores
df_trabajadores = spark.read.csv("/home/cdsw/data/trabajadores/trabajadores_last_declaration.csv", header=True, inferSchema=True)

# Verificar la estructura del DataFrame de trabajadores
df_trabajadores.printSchema()

df_trabajadores = df_trabajadores.drop("_c0")
# Mostrar una muestra de los datos de trabajadores
df_trabajadores.show(5)

# Paso 2: Unir df_spark con df_trabajadores utilizando la columna CONT_RUT en ambos DataFrames
df_spark_completo = df_spark.join(df_trabajadores, df_spark["RUT_UNICO"]==df_trabajadores["CONT_RUT"], how="left")
df_spark_completo = df_spark_completo.drop("CONT_RUT")
# Paso 3: Visualizar el DataFrame resultante
df_spark_completo.columns

df_spark_completo.count()

### Se agregan los patrimonios

In [None]:
spark.read.parquet("abfs://data@datalakesii.dfs.core.windows.net/DatoOrigen/lr-629/Agrupacion_empresas_similares/patrimonio/patrimonios_incompletos").createOrReplaceTempView("pat_2022_incompleto")
spark.sql('select  RUT_SOCIEDAD as CONT_RUT, SUM(PAT_2022) as PAT_2022 from pat_2022_incompleto group by RUT_SOCIEDAD').createOrReplaceTempView("pat_2022_incompleto")

In [None]:
# Realizar la unión de df_spark_completo con pat_2022_incompleto
df_spark_con_pat = df_spark_completo.join(
    spark.table("pat_2022_incompleto"),  # Usamos la tabla temporal creada
    df_spark_completo["RUT_UNICO"] == spark.table("pat_2022_incompleto")["CONT_RUT"],
    how="left"  # Unión por la columna CONT_RUT, y 'left' para mantener todas las filas de df_spark_completo
)
df_spark_con_pat.drop("cont_rut")
df_spark_con_pat.printSchema()
df_spark_con_pat.count()

In [None]:
# Seleccionar y renombrar las columnas
df_contribuyentes = df_spark_con_pat.select(
    "RUT_UNICO", "SOCIEDAD", "CONTADOR","NATURAL","REPRESENTANTE",
    df_spark_con_pat["score"].alias("Contaminacion"),
    df_spark_con_pat["Total_pago_f29"],
    df_spark_con_pat["IVA_neto"],
    df_spark_con_pat["Unidad_regional"],
    df_spark_con_pat["n_documentos"].alias("Numero_documentos"),
    df_spark_con_pat["lifetime"].alias("Tiempo_de_vida_dias"),
    df_spark_con_pat["Alerta_inicial"],
    df_spark_con_pat["Value"].alias("Oscuridad"),
    df_spark_con_pat["FAMILIARES"].alias("Familiares"),
    df_spark_con_pat["TOTAL"].alias("Personas_naturales_socios"),
    df_spark_con_pat["TASA_FAMILIARIDAD"].alias("Tasa_familiaridad"),
    df_spark_con_pat["COM_INICIAL"].alias("Grupo_economico_inicial"),
    df_spark_con_pat["`comunidad_0.109257`"].alias("Grupo_louvain"),
    df_spark_con_pat["cantidad_trabajadores_honorarios"].alias("Trabajadores_honorarios"),
    df_spark_con_pat["cantidad_trabajadores_dependientes"].alias("Trabajadores_dependientes"),
    df_spark_con_pat["PAT_2022"].alias("Patrimonio_2022")
)

In [None]:
df_contribuyentes.count()

### Ventas a regiones extremas

In [None]:
#INICIALMENTE A INCLUIR PERO DEBIDO A PROBLEMAS DE PROCESAMIENTO NO INCLUIDO EN LA VERSION FINAL

# Crear la vista temporal a partir del dataframe df_contribuyentes
"""df_contribuyentes.createOrReplaceTempView("contribuyentes")"""

# Realizar la consulta seleccionando solo los dhdr_rut_emisor que están en RUT_UNICO de df_contribuyentes
"""spark.sql('''
SELECT d.dhdr_rut_emisor, d.dhdr_cmna_origen
FROM dwbgdata.header_dte_consolidada_enc_sas_analitica d
INNER JOIN contribuyentes c
ON d.dhdr_rut_emisor = c.RUT_UNICO
''').createOrReplaceTempView("documentos_filtrados")"""


In [None]:
"""
spark.sql('select dhdr_rut_emisor,dhdr_cmna_origen from documentos_filtrados').createOrReplaceTempView("emitidos")
spark.sql('select * from libsdf.DICCIONARIO_CMNA_DTE_ARFI').createOrReplaceTempView("comunas")
spark.sql('select dhdr_rut_emisor,comuna_obtenida from emitidos left join comunas on emitidos.dhdr_cmna_origen=comunas.dhdr_dir_origen').createOrReplaceTempView("puntos_venta")

spark.sql('select dhdr_rut_emisor, comuna_obtenida, count(*) as c  from puntos_venta where dhdr_rut_emisor is not null group by dhdr_rut_emisor,comuna_obtenida').createOrReplaceTempView("puntos_venta")

spark.sql('select dhdr_rut_emisor,REPLACE(comuna_obtenida,"�", "N") AS comuna_obtenida from puntos_venta').createOrReplaceTempView("puntos_venta")

spark.sql('select COMU_DES_COMUNA, COMU_DES_REGION from dw.dim_comuna').createOrReplaceTempView("comunas")

spark.sql('select  dhdr_rut_emisor,comuna_obtenida,COMU_DES_COMUNA, COMU_DES_REGION from puntos_venta left join comunas on puntos_venta.comuna_obtenida=comunas.COMU_DES_COMUNA Where comuna_obtenida is not null and COMU_DES_COMUNA IS not  NULL').createOrReplaceTempView("puntos_venta_final")

df=spark.sql('select * from puntos_venta_final')
"""

In [None]:
# Definir los patrones de las regiones extremas
# I region de Tarapaca
# XV reion de Arica y Parinacota
# XI regino de Aysen
#XII region de Magallanes
"""
extreme_regions_norte = ["XV R", "I RE"]
extreme_regions_sur = ["XI R","XII "]

# Añadir una nueva columna con los primeros tres caracteres de la región
df = df.withColumn("region_prefix", substring(col("COMU_DES_REGION"), 1, 4))

# Clasificar las regiones
df = df.withColumn(
    "extremo_norte",
    when(col("region_prefix").isin(extreme_regions_norte), 1).otherwise(0)
)
df = df.withColumn(
    "extremo_sur",
    when(col("region_prefix").isin(extreme_regions_sur), 1).otherwise(0)
)

"""

In [None]:
# Agrupar por dhdr_rut_emisor y calcular la suma de extremo_norte y extremo_sur
"""df_grouped = df.groupBy("dhdr_rut_emisor").agg(
    sum("extremo_norte").alias("total_extremo_norte"),
    sum("extremo_sur").alias("total_extremo_sur")
)

# Crear una columna adicional que indique si distribuye al extremo norte y al extremo sur
df_extremos = df_grouped.withColumn(
    "distribuye_extremos",
    when((col("total_extremo_norte") > 0) & (col("total_extremo_sur") > 0), "si").otherwise("no")
)
"""

In [None]:
"""df_extremos.count()"""

In [None]:
"""df_extremos.write.mode('overwrite').format("parquet").save("abfs://data@datalakesii.dfs.core.windows.net/DatosOrigen/lr-629/riesgo_fraude/proyecto_visualizacion_racionales_positivos/data_puntos_venta")
df_extremos=spark.read.parquet("abfs://data@datalakesii.dfs.core.windows.net/DatosOrigen/lr-629/riesgo_fraude/proyecto_visualizacion_racionales_positivos/data_puntos_venta")
df_extremos.count()"""

In [None]:
# Realizar el left join entre df_contribuyentes y df_extremos en base a las columnas especificadas
"""df_final = df_contribuyentes.join(
    df_extremos.select('dhdr_rut_emisor','distribuye_extremos'),  # Seleccionamos las columnas de df_extremos
    df_contribuyentes['RUT_UNICO'] == df_extremos['dhdr_rut_emisor'],  # Condición del join
    how='left'  # Especificamos que sea un left join
)


# Eliminar la columna 'dhdr_rut_emisor' del resultado final, ya que no la deseas en el DataFrame final
df_final = df_final.drop(df_extremos['dhdr_rut_emisor'])"""

In [None]:
"""df_final.columns"""

In [None]:
"""df_final.count()"""

### Remanente contaminado

In [None]:
remanente=spark.read.parquet("abfs://data@datalakesii.dfs.core.windows.net/DatosOrigen/lr-629/riesgo_fraude/remanente_contaminado/remanente_hist_updated")


In [None]:
remanente.columns

In [None]:
# Definir una ventana particionada por "receptor" y ordenada por "dcv_ptributario" en orden descendente
window_spec = Window.partitionBy("receptor").orderBy(F.desc("dcv_ptributario"))

# Agregar una columna de rank que identifique el valor máximo por receptor
remanente_con_rank = remanente.withColumn("rank", F.rank().over(window_spec))

# Filtrar para quedarse solo con el valor máximo de "dcv_ptributario" para cada "receptor"
resultado_max = remanente_con_rank.filter(F.col("rank") == 1)

# Seleccionar las columnas deseadas: "dcv_ptributario", "receptor" y "remanente_cont"
remanente_final = resultado_max.select("dcv_ptributario", "receptor", "remanente_cont")

In [None]:
# Realizar el join entre df_contribuyentes y remanente_final en las columnas RUT_UNICO y receptor
joined_df = df_contribuyentes.join(remanente_final, df_contribuyentes["RUT_UNICO"] == remanente_final["receptor"], "left")

# Renombrar las columnas 'dcv_ptributario' a 'ultimo_periodo_remanente_contaminado'
# y mantener 'remanente_cont' sin cambios
joined_df_renamed = joined_df.withColumnRenamed("dcv_ptributario", "ultimo_periodo_remanente_contaminado")

# Eliminar la columna 'receptor' ya que no se quiere en el resultado final
final_df = joined_df_renamed.drop("receptor")

In [None]:
final_df.createOrReplaceTempView('contribuyentes')
final_df.write.mode('overwrite').format("parquet").save("abfs://data@datalakesii.dfs.core.windows.net/DatosOrigen/lr-629/riesgo_fraude/proyecto_visualizacion_racionales_positivos/data_contribuyentes")

### Calculo de flujo de IVA intra y extragrupo para cada contribuyente

In [None]:
spark.read.parquet("abfs://data@datalakesii.dfs.core.windows.net/DatosOrigen/LibSDF/JBA_ARCOS_E").createOrReplaceTempView("arcos")
spark.sql("SELECT PARU_RUT_E0, PARU_RUT_E2, Monto_IVA FROM arcos where Monto_IVA>0 order by PARU_RUT_E2 asc").createOrReplaceTempView("arcos")

In [None]:

spark.sql('''
    SELECT 
        arcos.PARU_RUT_E0 AS emisor,
        c1.grupo_louvain AS comunidad_emisor,
        arcos.PARU_RUT_E2 AS receptor,
        c2.grupo_louvain AS comunidad_receptor,
        arcos.Monto_IVA
    FROM arcos
    LEFT JOIN contribuyentes AS c1 ON arcos.PARU_RUT_E0 = c1.RUT_UNICO
    LEFT JOIN contribuyentes AS c2 ON arcos.PARU_RUT_E2 = c2.RUT_UNICO
    WHERE c1.grupo_louvain IS NOT NULL OR c2.grupo_louvain IS NOT NULL
''').createOrReplaceTempView('arcos')


In [None]:
resultados_emisor = spark.sql("""
    SELECT 
        emisor,
        SUM(CASE 
                WHEN comunidad_emisor = comunidad_receptor THEN Monto_IVA 
                ELSE 0 
            END) AS total_iva_intragrupo,
        SUM(CASE 
                WHEN comunidad_emisor <> comunidad_receptor AND comunidad_receptor IS NOT NULL THEN Monto_IVA 
                ELSE 0 
            END) AS total_iva_extragrupo,
        COUNT(CASE 
                WHEN comunidad_emisor = comunidad_receptor THEN 1 
                ELSE NULL 
            END) AS arcos_intragrupo_emisor,
        COUNT(CASE 
                WHEN comunidad_emisor <> comunidad_receptor AND comunidad_receptor IS NOT NULL THEN 1 
                ELSE NULL 
            END) AS arcos_extragrupo_emisor
    FROM 
        arcos
    WHERE 
        comunidad_emisor IS NOT NULL OR comunidad_receptor IS NOT NULL
    GROUP BY 
        emisor
""")

resultados_emisor.createOrReplaceTempView('resultados_emisor')


In [None]:
resultados_receptor = spark.sql("""
    SELECT 
        receptor,
        SUM(CASE 
                WHEN comunidad_receptor = comunidad_emisor THEN Monto_IVA 
                ELSE 0 
            END) AS total_iva_intragrupo_receptor,
        SUM(CASE 
                WHEN comunidad_receptor <> comunidad_emisor AND comunidad_emisor IS NOT NULL THEN Monto_IVA 
                ELSE 0 
            END) AS total_iva_extragrupo_receptor,
        COUNT(CASE 
                WHEN comunidad_receptor <> comunidad_emisor AND comunidad_emisor IS NOT NULL 
                THEN 1 
                ELSE NULL 
            END) AS arcos_extragrupo_receptor,
        COUNT(CASE 
                WHEN comunidad_receptor = comunidad_emisor THEN 1 
                ELSE NULL 
            END) AS arcos_intragrupo_receptor
    FROM 
        arcos
    WHERE 
        comunidad_receptor IS NOT NULL OR comunidad_emisor IS NOT NULL
    GROUP BY 
        receptor
""")


# Muestra el resultado para el receptor
resultados_receptor.createOrReplaceTempView('resultados_receptor')

In [None]:
resultados_final = spark.sql("""
    SELECT 
        COALESCE(e.emisor, r.receptor) AS contribuyente,
        COALESCE(e.total_iva_intragrupo, 0) AS total_iva_intragrupo_emisor,
        COALESCE(e.total_iva_extragrupo, 0) AS total_iva_extragrupo_emisor,
        COALESCE(r.total_iva_intragrupo_receptor, 0) AS total_iva_intragrupo_receptor,
        COALESCE(r.total_iva_extragrupo_receptor, 0) AS total_iva_extragrupo_receptor,
        arcos_intragrupo_emisor,
        arcos_extragrupo_emisor,
        arcos_intragrupo_receptor,
        arcos_extragrupo_receptor
        
    FROM 
        (SELECT emisor, total_iva_intragrupo, total_iva_extragrupo,arcos_intragrupo_emisor,arcos_extragrupo_emisor FROM resultados_emisor) e
    FULL OUTER JOIN 
        (SELECT receptor, total_iva_intragrupo_receptor, total_iva_extragrupo_receptor,arcos_intragrupo_receptor,arcos_extragrupo_receptor FROM resultados_receptor) r
    ON e.emisor = r.receptor
""")

# Muestra el resultado final
resultados_final.limit(10).show()
resultados_final.write.mode('overwrite').format("parquet").save("abfs://data@datalakesii.dfs.core.windows.net/DatosOrigen/lr-629/riesgo_fraude/proyecto_visualizacion_racionales_positivos/data_iva_contribuyentes")

### Calculo de flujo de IVA intra y extragrupo total para comunidad

In [None]:
# Emisión intra-grupo
spark.sql("""
    SELECT comunidad_emisor AS comunidad, SUM(Monto_IVA) AS emision_intragrupo, COUNT(*) AS arcos_emision_intra
    FROM arcos
    WHERE comunidad_emisor = comunidad_receptor AND comunidad_emisor IS NOT NULL
    GROUP BY comunidad_emisor
    ORDER BY comunidad_emisor ASC
""").createOrReplaceTempView('emision_intra')

# Emisión extra-grupo
spark.sql("""
    SELECT comunidad_emisor AS comunidad, SUM(Monto_IVA) AS emision_extragrupo, COUNT(*) AS arcos_emision_extra
    FROM arcos
    WHERE comunidad_emisor <> comunidad_receptor AND comunidad_emisor IS NOT NULL
    GROUP BY comunidad_emisor
    ORDER BY comunidad_emisor ASC
""").createOrReplaceTempView('emision_extra')

# Recepción intra-grupo
spark.sql("""
    SELECT comunidad_receptor AS comunidad, SUM(Monto_IVA) AS recepcion_intragrupo, COUNT(*) AS arcos_reception_intra
    FROM arcos
    WHERE comunidad_emisor = comunidad_receptor AND comunidad_receptor IS NOT NULL
    GROUP BY comunidad_receptor
    ORDER BY comunidad_receptor ASC
""").createOrReplaceTempView('recepcion_intra')

# Recepción extra-grupo
spark.sql("""
    SELECT comunidad_receptor AS comunidad, SUM(Monto_IVA) AS recepcion_extragrupo, COUNT(*) AS arcos_reception_extra
    FROM arcos
    WHERE comunidad_emisor <> comunidad_receptor AND comunidad_receptor IS NOT NULL
    GROUP BY comunidad_receptor
    ORDER BY comunidad_receptor ASC
""").createOrReplaceTempView('recepcion_extra')

# Unir emisión intra y extra en una sola tabla
spark.sql("""
    SELECT COALESCE(emision_intra.comunidad, emision_extra.comunidad) AS com, 
           emision_intragrupo, emision_extragrupo, arcos_emision_intra, arcos_emision_extra
    FROM emision_intra
    FULL OUTER JOIN emision_extra ON emision_extra.comunidad = emision_intra.comunidad
""").createOrReplaceTempView('emision')

# Unir recepción intra y extra en una sola tabla
spark.sql("""
    SELECT COALESCE(recepcion_intra.comunidad, recepcion_extra.comunidad) AS com, 
           recepcion_intragrupo, recepcion_extragrupo, arcos_reception_intra as arcos_recepcion_intra, arcos_reception_extra as arcos_recepcion_extra
    FROM recepcion_intra
    FULL OUTER JOIN recepcion_extra ON recepcion_extra.comunidad = recepcion_intra.comunidad
    ORDER BY recepcion_intra.comunidad ASC
""").createOrReplaceTempView('recepcion')

# Unir las tablas de emisión y recepción
spark.sql("""
    SELECT COALESCE(emision.com, recepcion.com) AS com, 
           emision_intragrupo, emision_extragrupo, 
           recepcion_intragrupo, recepcion_extragrupo,
           arcos_emision_intra, arcos_emision_extra, 
           arcos_recepcion_intra, arcos_recepcion_extra
    FROM emision
    FULL OUTER JOIN recepcion ON emision.com = recepcion.com
""").createOrReplaceTempView('iva_comunidad')

# Cálculos adicionales
iva_comunidad=spark.sql("""
    SELECT com, 
           (emision_extragrupo / (emision_intragrupo + emision_extragrupo)) * 100 AS perct_emision_extra,
           (recepcion_extragrupo / (recepcion_intragrupo + recepcion_extragrupo)) * 100 AS perct_recepcion_extra,
           emision_extragrupo, recepcion_extragrupo,
           emision_intragrupo, recepcion_intragrupo,
           emision_extragrupo / emision_intragrupo AS tasa_emision_extra_intra,
           arcos_emision_intra, arcos_emision_extra,
           arcos_recepcion_intra, arcos_recepcion_extra
    FROM iva_comunidad
""")


### Se agrega tamanio de cada comunidad

In [None]:
# Paso 1: Contar el tamaño de cada comunidad en la tabla `contribuyentes`
grupo_tamanio = df_contribuyentes.groupBy("grupo_louvain").count().withColumnRenamed("count", "tamanio_comunidad")

# Paso 2: Unir la información del tamaño de comunidad con la tabla IVA_comunidad
iva_comunidad_con_tamanio = iva_comunidad.join(grupo_tamanio, iva_comunidad["com"] == grupo_tamanio["grupo_louvain"], how="left")

In [None]:
iva_comunidad_con_tamanio.write.mode('overwrite').format("parquet").save("abfs://data@datalakesii.dfs.core.windows.net/DatosOrigen/lr-629/riesgo_fraude/proyecto_visualizacion_racionales_positivos/data_iva_dashboard")

In [None]:
spark.stop()