### IVA intra grupo y extra grupo en comunidades definidas por algoritmo de Louvain


In [53]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import pyspark
import pandas as pd
import warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)

from pyspark_dist_explore import hist
import matplotlib.pyplot as plt
from pyspark.sql.types import StringType,TimestampType


In [54]:

spark = SparkSession.builder \
  .appName("Test")  \
  .config("spark.yarn.access.hadoopFileSystems","abfs://data@datalakesii.dfs.core.windows.net/") \
  .config("spark.executor.memory", "64g") \
  .config("spark.driver.memory", "12g")\
  .config("spark.executor.cores", "12") \
  .config("spark.executor.instances", "24") \
  .config("spark.driver.maxResultSize", "12g") \
  .getOrCreate()

warnings.filterwarnings('ignore', category=DeprecationWarning)
sc=spark.sparkContext
sc.setLogLevel ('ERROR')
spark.conf.set("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")



In [55]:
spark.stop

<bound method SparkSession.stop of <pyspark.sql.session.SparkSession object at 0x7fb14b93cbe0>>

Leemos la data de los arcos comerciales.

In [56]:
spark.read.parquet("abfs://data@datalakesii.dfs.core.windows.net/DatosOrigen/LibSDF/JBA_ARCOS_E").createOrReplaceTempView("arcos")
spark.sql("SELECT PARU_RUT_E0, PARU_RUT_E2, Monto_IVA FROM arcos where Monto_IVA>0 order by PARU_RUT_E2 asc").createOrReplaceTempView("arcos")

In [57]:
spark.sql('select * from arcos').show()



+--------------------+--------------------+---------+
|         PARU_RUT_E0|         PARU_RUT_E2|Monto_IVA|
+--------------------+--------------------+---------+
|NaXazswsZodNytTsS...|+++0Re2TkLe14DpcD...|     4890|
|+0yKx5GSw/Q5FoSrp...|+++4/3jzUwtDPSSo3...|    50031|
|2R4UUuZeZouDs1ML3...|+++4/3jzUwtDPSSo3...|  2670934|
|/UHFkJ2XWLMi0oOW5...|+++4/3jzUwtDPSSo3...| 20959536|
|0Mc7YtTQj2BeLrOxn...|+++4/3jzUwtDPSSo3...|    30122|
|2ekh6DZal3FYd54WD...|+++4/3jzUwtDPSSo3...|    23023|
|+t/u9Evh7+r4KNza6...|+++4/3jzUwtDPSSo3...|    23562|
|xir6OxC/tCKNSXtX6...|+++4/3jzUwtDPSSo3...|  1414873|
|s04kNuIGjOmmDq3ip...|+++4/3jzUwtDPSSo3...|   228641|
|vMIN0af1h36ymVDyc...|+++4/3jzUwtDPSSo3...|     4332|
|25elxlIB0aBbDst3o...|+++4/3jzUwtDPSSo3...|    90696|
|mNKzwe6Z5TAI7rkgK...|+++4/3jzUwtDPSSo3...|  1092499|
|/+eRts6KZwhgL3x63...|+++4/3jzUwtDPSSo3...|  2281443|
|uTu+52CkIZ2DJ6ddQ...|+++4/3jzUwtDPSSo3...|     7524|
|LD33vBbV2xSePcVnE...|+++4/3jzUwtDPSSo3...|   670661|
|o+qwedorOHCucSLN1...|+++4/3

                                                                                

Ahora la data de las sociedades que incluye las caracteristicas.

In [58]:
contaminados = spark.read.options(header=True,inferSchema=True,delimiter=",").csv("/home/cdsw/data/processed/comunidades_louvain/comunidades_resolution_study_louvain.csv")
contaminados = contaminados.withColumnRenamed("comunidad_0.8", "comunidad_08")
contaminados.createOrReplaceTempView("contaminados")

                                                                                

In [59]:
spark.sql('select * from contaminados').show()

+----------+--------------------+------------------+--------------+---------+--------------------+------------+--------+--------------+-------------+----------------+-------------+------------------+-------------+------------------+-------------+------------------+-------------+------------------+-------------+------------------+-------------+------------------+-------------+------------------+------------+------------------+-------------+------------------+-------------+------------------+
|Unnamed: 0|            cont_rut|             score|total_pago_f29| IVA_neto|     unidad_regional|n_documentos|lifetime|alerta_inicial|comunidad_0.0| modularidad_0.0|comunidad_0.1|   modularidad_0.1|comunidad_0.2|   modularidad_0.2|comunidad_0.3|   modularidad_0.3|comunidad_0.4|   modularidad_0.4|comunidad_0.5|   modularidad_0.5|comunidad_0.6|   modularidad_0.6|comunidad_0.7|   modularidad_0.7|comunidad_08|   modularidad_0.8|comunidad_0.9|   modularidad_0.9|comunidad_1.0|   modularidad_1.0|
+-------

Ahora hacemos el cruce de los datos para asignar la comunidad a cada entidad de los arcos comerciales, tanto emisor como receptor. 

In [60]:
spark.sql('select PARU_RUT_E0,contaminados.comunidad_08 as comunidad_emisor, PARU_RUT_E2, Monto_IVA from arcos left join contaminados on arcos.PARU_RUT_E0=contaminados.cont_rut').createOrReplaceTempView('arcos')

In [61]:
spark.sql('select PARU_RUT_E0 as emisor,comunidad_emisor,PARU_RUT_E2 as receptor,contaminados.comunidad_08 as comunidad_receptor, Monto_IVA from arcos  left join contaminados on arcos.PARU_RUT_E2=contaminados.cont_rut').createOrReplaceTempView('arcos')

In [62]:
spark.sql('select * from arcos where comunidad_emisor is not null or comunidad_receptor is not null').createOrReplaceTempView('arcos')

spark.sql('select * from arcos where comunidad_emisor is null or comunidad_receptor is null').show()


[Stage 520:>                                                        (0 + 3) / 3]

+--------------------+----------------+--------------------+------------------+---------+
|              emisor|comunidad_emisor|            receptor|comunidad_receptor|Monto_IVA|
+--------------------+----------------+--------------------+------------------+---------+
|BJmaUqTnhu56rAeIU...|            null|+kAO4J/HF/BHMc7du...|                 1|    19000|
|MP6d8c6MQEwozlAf+...|            null|AgHClE8vUGuNWnXlZ...|                 6|     9500|
|Z7OYCkj/kjqp/ONZ3...|            null|CpL0jbiNG6xErwixU...|                 1|   152000|
|mR8KpjMdF+DB+1rcl...|            null|Kq16VMi2rl6putzer...|                 3|    34200|
|XHxz2+oWsKZjlTFeT...|            null|WjiWlHOqho6fHK01o...|                12|    19000|
|tAYC9s9eTlb8dZFkt...|            null|ZoSJEz1r4ELZe5oOd...|                 1|    31934|
|ZZRM3LDY2l2dIDLXp...|            null|pgnXtYCxJbXlDqc9R...|                 1|    31933|
|EHD0OdMgUSKOdVm8U...|            null|z5JmOio9X1bMDZXdY...|                 1|    22800|
|/n+h+44VQ

                                                                                

Se calcula la emision intragrupo, extragrupo, recepcion intragrupo y extragrupo

In [63]:
spark.sql('select comunidad_emisor as comunidad, sum(Monto_IVA) as emision_intragrupo from arcos where comunidad_emisor=comunidad_receptor and comunidad_emisor is not null group by comunidad_emisor order by comunidad_emisor asc').createOrReplaceTempView('emision_intra')
spark.sql('select comunidad_emisor as comunidad, sum(Monto_IVA) as emision_extragrupo from arcos where comunidad_emisor<>comunidad_receptor and comunidad_emisor is not null group by comunidad_emisor order by comunidad_emisor asc').createOrReplaceTempView('emision_extra')

In [64]:
spark.sql('select comunidad_receptor as comunidad, sum(Monto_IVA) as recepcion_intragrupo from arcos where comunidad_emisor=comunidad_receptor and comunidad_receptor is not null group by comunidad_receptor order by comunidad_receptor asc').createOrReplaceTempView('recepcion_intra')
spark.sql('select comunidad_receptor as comunidad, sum(Monto_IVA) as recepcion_extragrupo from arcos where comunidad_emisor<>comunidad_receptor and comunidad_receptor is not null group by comunidad_receptor order by comunidad_receptor asc').createOrReplaceTempView('recepcion_extra')

In [65]:
spark.sql('select case when emision_intra.comunidad is null then emision_extra.comunidad else emision_intra.comunidad end as com, emision_intragrupo, emision_extragrupo from emision_intra full outer join emision_extra on emision_extra.comunidad=emision_intra.comunidad').createOrReplaceTempView('emision')
spark.sql('select * from emision').show()
spark.sql('select case when recepcion_intra.comunidad is null then recepcion_extra.comunidad else recepcion_intra.comunidad end as com, recepcion_intragrupo, recepcion_extragrupo from recepcion_intra full outer join recepcion_extra on recepcion_extra.comunidad=recepcion_intra.comunidad order by recepcion_intra.comunidad asc').createOrReplaceTempView('recepcion')
spark.sql('select * from recepcion order by recepcion_intragrupo desc').show()

                                                                                

+---+------------------+------------------+
|com|emision_intragrupo|emision_extragrupo|
+---+------------------+------------------+
|  1|    18883969532823|    13094520778587|
|  2|     6612360765255|     2306182712429|
|  3|      811418080006|     1507114062611|
|  4|      355492631604|      562530997191|
|  5|       27373968512|       21513986799|
|  6|      298853122898|      562339304661|
|  7|       29750102570|       39977276949|
|  8|      318985562053|      577701774857|
|  9|       72985671961|      290984459122|
| 10|      256190710566|      240826234065|
| 11|        3642903158|       79704446933|
| 12|        6898025906|       19043895192|
| 13|       38787924381|      199441058819|
| 14|       31124401295|      223527531891|
| 15|        7628903743|       28772911474|
| 16|       50344956494|      211516779728|
| 17|        6459516481|      197846026253|
| 18|       97520393341|      102317288006|
| 19|       14076918764|       67038174250|
| 20|       39391884609|      14

[Stage 560:>                                                        (0 + 1) / 1]

+---+--------------------+--------------------+
|com|recepcion_intragrupo|recepcion_extragrupo|
+---+--------------------+--------------------+
|  1|      18883969532823|       9878493899684|
|  2|       6612360765255|       5276448416911|
|146|       3426187781739|       3681590947732|
|  3|        811418080006|       1077447010147|
| 50|        372919288286|        456189769101|
| 54|        355680050113|        491876921408|
|  4|        355492631604|        455719914133|
|  8|        318985562053|        454312514529|
|  6|        298853122898|        638597377465|
| 10|        256190710566|        337198355007|
|137|        252533153968|         90221373401|
|160|        249812314783|        523512494229|
|291|        223203676505|        147008740498|
| 63|        165265230730|        116285977205|
| 69|        132507376357|         14514940795|
| 81|        116004222966|         90565277021|
| 18|         97520393341|        200538172049|
| 41|         93846609628|         96807

                                                                                

In [66]:
aux=spark.sql('select case when emision.com is null then recepcion.com else emision.com end as com, emision_intragrupo, emision_extragrupo,  recepcion_intragrupo, recepcion_extragrupo from emision full outer join recepcion on emision.com=recepcion.com')
aux.show()
aux.createOrReplaceTempView('iva_comunidad')



+---+------------------+------------------+--------------------+--------------------+
|com|emision_intragrupo|emision_extragrupo|recepcion_intragrupo|recepcion_extragrupo|
+---+------------------+------------------+--------------------+--------------------+
|  1|    18883969532823|    13094520778587|      18883969532823|       9878493899684|
|  2|     6612360765255|     2306182712429|       6612360765255|       5276448416911|
|  3|      811418080006|     1507114062611|        811418080006|       1077447010147|
|  4|      355492631604|      562530997191|        355492631604|        455719914133|
|  5|       27373968512|       21513986799|         27373968512|        121173161157|
|  6|      298853122898|      562339304661|        298853122898|        638597377465|
|  7|       29750102570|       39977276949|         29750102570|          9732261777|
|  8|      318985562053|      577701774857|        318985562053|        454312514529|
|  9|       72985671961|      290984459122|         72

                                                                                

Tambien definimos algunas tasas de comparacion entre montos de IVA

In [67]:
spark.sql('select com, emision_extragrupo/(emision_intragrupo+emision_extragrupo)*100 as perct_emision_extra,recepcion_extragrupo/(recepcion_intragrupo+recepcion_extragrupo)*100 as perct_recepcion_extra,emision_extragrupo, recepcion_extragrupo,emision_intragrupo, emision_extragrupo/emision_intragrupo as tasa_emision_extra_intra from iva_comunidad').createOrReplaceTempView('iva_comunidad')

Ahora ya podemos agregar los datos de IVA del grupo a la estadistica general por grupo

In [68]:
data_comunidades = spark.read.options(header=True,inferSchema=True,delimiter=",").csv("/home/cdsw/data/processed/comunidades_louvain/estadistica_comunidades_louvain.csv")
data_comunidades = data_comunidades.withColumnRenamed("comunidad_0.8", "comunidad_08")
data_comunidades.createOrReplaceTempView("data_comunidades")

In [69]:
data=spark.sql('select * from data_comunidades left join iva_comunidad on data_comunidades.comunidad_08=iva_comunidad.com')
data=data.toPandas()
data=data.drop('com', axis=1)
data=data.sort_values(by='promedio_score', ascending=False)
data.head(15)

                                                                                

Unnamed: 0,comunidad_08,promedio_score,miembros,contaminados_iniciales,promedio_total_pago_f29,nulos_total_pago_f29,promedio_IVA_neto,nulos_IVA_neto,promedio_n_documentos,nulos_n_documentos,promedio_lifetime,nulos_lifetime,moda_unidad_regional,frecuencia_unidad_regional,perct_emision_extra,perct_recepcion_extra,emision_extragrupo,recepcion_extragrupo,emision_intragrupo,tasa_emision_extra_intra
651,454,1.000049,5,5,0.0,0,-42084750.0,0,29.0,0,670.0,0,SANTIAGO ORIENTE,5,53.680611,,208525664.0,,179930542.0,1.158923
307,523,1.000012,3,3,85817.67,0,-13751830.0,0,17.333333,0,466.333333,0,Sin Moda,0,49.84731,,40576844.0,,40825431.0,0.993911
428,443,1.000005,6,6,0.0,0,-19820320.0,0,28.166667,0,469.833333,0,Sin Moda,0,41.620534,,115957906.0,,162649535.0,0.712931
95,578,1.0,2,2,2307216.0,0,-7875643.0,0,21.0,0,1764.0,0,SANTIAGO ORIENTE,2,,,,,,
412,453,1.0,6,6,158358.0,1,-4805971.0,0,11.0,0,522.666667,0,SANTIAGO CENTRO,1,32.154332,,26626828.0,,56182631.0,0.473933
171,715,1.0,2,2,505465.0,0,-95941310.0,0,46.5,0,1224.5,0,7MA UNIDAD REGIONAL MAULE,2,,,,,,
274,425,1.0,10,2,0.0,9,-3416726.0,0,17.5,8,862.5,8,SANTIAGO PONIENTE,3,,,,,23803200.0,
287,563,1.0,2,1,,2,-20916050.0,0,13.5,0,977.0,0,SANTIAGO CENTRO,2,,,,,,
235,545,1.0,2,1,,2,-8507060.0,0,39.0,1,862.0,0,SANTIAGO CENTRO,1,,,9317220.0,,,
164,512,1.0,3,3,,3,-24447710.0,0,13.0,0,1626.5,1,8VA UNIDAD REGIONAL BIO BIO,2,,,,,,


Finalmente guardamos el archivo

In [70]:

data.to_csv('/home/cdsw/data/processed/comunidades_louvain/estadistica_comunidades_louvain_with_iva_balance.csv', index=False)