**Instalando Pyspark**

In [1]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

!pip install -q findspark
import findspark
findspark.init()

findspark.find()

'/content/spark-3.0.0-bin-hadoop3.2'

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

**Conectando con drive**

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**Data**

In [4]:
from pyspark.sql import functions as F

In [413]:
path = '/content/drive/MyDrive/data_dataton/Matriz_infobra_unicos.txt'
df = spark.read.csv(path, header = True, sep ='|')
df.count()

15350

In [414]:
df.show(3,False)

+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+--------------------------------------------------------------------------------------------------------------------------------+-----------+------------------+------------------------------------+----------------------------------------------------------------------------------+------------------------------------+-----------------+---------------------+------------+-----------------------+--------------+---------------------+-------------+------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [415]:
df2 = df.select( 
        F.trim(F.col('NOBR_ID')).alias('NOBR_ID'),
        F.trim(F.col('NOBR_TIPEJE')).alias('NOBR_TIPEJE'),
        F.trim(F.col('COPA_DESCRI')).alias('COPA_DESCRI'),
        F.trim(F.col('RUBRO')).alias('RUBRO'),
        F.trim(F.col('NOBR_CODSNIP')).alias('NOBR_CODSNIP'),
        F.col('NSNP_MONTOVERI').cast('Decimal(38,2)').alias('NSNP_MONTOVERI'),
        F.col('NSNP_MONTOAPR').cast('Decimal(38,2)').alias('NSNP_MONTOAPR'),
        F.trim(F.col('CODIGO_UNICO')).alias('CODIGO_UNICO'),
        F.col('FECHA_INICIO').cast('Date').alias('FECHA_INICIO'),
        F.col('FECHA_ACTUALIZACION').cast('Date').alias('FECHA_ACTUALIZACION'),
        F.trim(F.col('UNIDAD_OPMI')).alias('UNIDAD_OPMI'),

        F.trim(F.col('SECTOR')).alias('SECTOR'),
        F.trim(F.col('ENTIDAD')).alias('ENTIDAD'),
        F.trim(F.col('FUNCION')).alias('FUNCION'),
        F.trim(F.col('NIVEL')).alias('NIVEL'),

        F.col('MTO_VIABLE').cast('Decimal(38,2)').alias('MTO_VIABLE'),
        F.col('COSTO_ACTUALIZADO').cast('Decimal(38,2)').alias('COSTO_ACTUALIZADO'),
        
        F.col('FEC_REGISTRO').cast('Date').alias('FEC_REGISTRO'),
        F.col('FEC_VIABLE').cast('Date').alias('FEC_VIABLE'),
        F.trim(F.col('TIPO_FORMATO')).alias('TIPO_FORMATO'),

        F.col('CERTIFICADO_ANO_ACTUAL').cast('Decimal(38,2)').alias('CERTIFICADO_ANO_ACTUAL'),
        F.col('GIRADO_ANO_ACTUAL').cast('Decimal(38,2)').alias('GIRADO_ANO_ACTUAL'),
        F.col('COMPROMETIDO_ANO_ACTUAL').cast('Decimal(38,2)').alias('COMPROMETIDO_ANO_ACTUAL'),

        F.col('PIM_ANO_VIGENTE').cast('Decimal(38,2)').alias('PIM_ANO_VIGENTE'),
        F.col('MTO_PMI_1').cast('Decimal(38,2)').alias('MTO_PMI_1'),
        F.col('MTO_PMI_2').cast('Decimal(38,2)').alias('MTO_PMI_2'),
        F.col('MTO_PMI_3').cast('Decimal(38,2)').alias('MTO_PMI_3'),
        F.col('MTO_PMI_4').cast('Decimal(38,2)').alias('MTO_PMI_4'),
        F.col('DEV_ANO_VIGENTE').cast('Decimal(38,2)').alias('DEV_ANO_VIGENTE'),
        F.col('DEV_ACUMULADO').cast('Decimal(38,2)').alias('DEV_ACUMULADO'),

        F.col('FEC_INI_EJ').cast('Date').alias('FEC_INI_EJ'),
        F.col('FEC_FIN_EJ').cast('Date').alias('FEC_FIN_EJ'),

        F.trim(F.col('MODAL_EJEC')).alias('MODAL_EJEC'),
        F.trim(F.col('MARCO')).alias('MARCO'),
        F.trim(F.col('IND_REG_PMI')).alias('IND_REG_PMI'),
        F.trim(F.col('DES_PROGRAMA')).alias('DES_PROGRAMA'),
        F.trim(F.col('DES_SUB_PROGRAMA')).alias('DES_SUB_PROGRAMA'),

        F.col('DIFF_MONTO').cast('Decimal(38,2)').alias('DIFF_MONTO'),
        F.col('DIFF_PORC_MONTO').cast('Decimal(10,4)').alias('DIFF_PORC_MONTO'),
        F.col('DIFF_FECHA_INI_EJE_ET').cast('Decimal(38,4)').alias('DIFF_FECHA_INI_EJE_ET'),
        F.col('DIFF_FECHA_NOW_ET').cast('Decimal(38,4)').alias('DIFF_FECHA_NOW_ET'),
        F.col('DIFF_FECHA_NOW_INI_EJE').cast('Decimal(38,4)').alias('DIFF_FECHA_NOW_INI_EJE'),

        F.trim(F.col('CDPT_CD_DPTO')).alias('CDPT_CD_DPTO'),
        F.trim(F.col('CDPT_DPTO')).alias('CDPT_DPTO'),
        F.trim(F.col('CDST_CODIGO')).alias('CDST_CODIGO'),


        F.col('DOBA_FCHREP_count').cast('Decimal(38,4)').alias('DOBA_FCHREP_count'),
        F.col('DOBA_FCHREP_min').cast('Date').alias('DOBA_FCHREP_min'),
        F.col('DOBA_FCHREP_max').cast('Date').alias('DOBA_FCHREP_max'),
        F.col('NOBA_FISICO_PRO_min').cast('Decimal(38,4)').alias('NOBA_FISICO_PRO_min'),
        F.col('NOBA_FISICO_PRO_max').cast('Decimal(38,4)').alias('NOBA_FISICO_PRO_max'),
        F.col('NOBA_FISICO_REA_min').cast('Decimal(38,4)').alias('NOBA_FISICO_REA_min'),
        F.col('NOBA_FISICO_REA_max').cast('Decimal(38,4)').alias('NOBA_FISICO_REA_max'),
        F.col('NOBA_VALZDO_PRO_min').cast('Decimal(38,4)').alias('NOBA_VALZDO_PRO_min'),
        F.col('NOBA_VALZDO_PRO_max').cast('Decimal(38,4)').alias('NOBA_VALZDO_PRO_max'),
        F.col('NOBA_VALZDO_REA_min').cast('Decimal(38,4)').alias('NOBA_VALZDO_REA_min'),
        F.col('NOBA_VALZDO_REA_max').cast('Decimal(38,4)').alias('NOBA_VALZDO_REA_max'),
        F.col('Monto actualizado S/_count').cast('Decimal(38,4)').alias('Monto actualizado S/_count'),
        F.col('Monto actualizado S/_min').cast('Decimal(38,4)').alias('Monto actualizado S/_min'),
        F.col('Monto actualizado S/_max').cast('Decimal(38,4)').alias('Monto actualizado S/_max')
        ).distinct()

In [416]:
df2.show(5,False)

+-------+-----------+------------+----------------------------+------------+--------------+-------------+------------+------------+-------------------+-----------+-----------------+---------------------------------------------------+----------+-----+-----------+-----------------+------------+----------+---------------------+----------------------+-----------------+-----------------------+---------------+---------+---------+---------+---------+---------------+-------------+----------+----------+----------------------------------------+-----+-----------+-----------------+---------------------------------+------------+---------------+------------------------+-----------------+----------------------+------------+---------+-----------+-----------------+---------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------------+-----------------------

In [421]:
df3 = df2.filter(F.coalesce(F.col('CODIGO_UNICO'),F.lit('9999999999'))!='2425620').fillna(0)\
         .withColumn('modalidad_ejec',
                     F.when(F.col('COPA_DESCRI')=='Por Contrata', F.lit('Contrata'))\
                      .when(F.col('COPA_DESCRI')=='Adm. Directa', F.lit('Directo'))\
                      .otherwise(F.lit('Otro'))
                     )\
          .withColumn('rubro_2',
                      F.when(F.col('RUBRO')=='TRANSPORTES Y COMUNICACIONES', F.lit('Transporte_Comunicaciones'))\
                       .when(F.col('RUBRO')=='VIVIENDA, CONSTRUCCIÓN Y SANEAMIENTO', F.lit('Vivienda_Construccion'))\
                       .when(F.col('RUBRO')=='EDUCACIÓN/CULTURA', F.lit('Educacion'))\
                       .when(F.col('RUBRO')=='OTRAS INFRAESTRUCTURAS', F.lit('Otras_infra'))\
                       .when(F.col('RUBRO')=='AGRICULTURA', F.lit('Agricultura'))\
                       .when(F.col('RUBRO')=='ENERGÍA Y MINAS', F.lit('Energia_minas'))\
                       .when(F.col('RUBRO')=='SALUD', F.lit('Salud'))\
                       .otherwise(F.lit('Otro'))
                      )\
          .withColumn('sector_2',
                      F.when(F.col('SECTOR')=='GOBIERNOS LOCALES', F.lit('Gobierno_local'))\
                       .when(F.col('SECTOR')=='GOBIERNOS REGIONALES', F.lit('Gobierno_regional'))\
                       .otherwise(F.lit('Otro'))
                      )\
          .withColumn('funcion_2',
                      F.when(F.col('FUNCION')=='TRANSPORTE', F.lit('Transporte'))\
                       .when(F.col('FUNCION').isin('SANEAMIENTO','SALUD Y SANEAMIENTO','SALUD'), F.lit('Salud_Saneamiento'))\
                       .when(F.col('FUNCION').isin('EDUCACIÓN','EDUCACION Y CULTURA'), F.lit('Educacion'))\
                       .when(F.col('FUNCION')=='CULTURA Y DEPORTE', F.lit('Cultura_deporte'))\
                       .when(F.col('FUNCION').isin('AGROPECUARIA','AGRARIA'), F.lit('Agro'))\
                       .when(F.col('FUNCION')=='VIVIENDA Y DESARROLLO URBANO', F.lit('Vivienda'))\
                       .when(F.col('FUNCION').isin('ENERGÍA','ENERGIA Y RECURSOS MINERALES','AMBIENTE'), F.lit('Energia_ambiente'))\
                       .when(F.col('FUNCION').isin('PLANEAMIENTO, GESTIÓN Y RESERVA DE CONTINGENCIA','ORDEN PÚBLICO Y SEGURIDAD',
                                                   'DEFENSA Y SEGURIDAD NACIONAL','ADMINISTRACION Y PLANEAMIENTO'), F.lit('gestion_seguridad'))\
                       .when(F.col('FUNCION').isin('PROTECCIÓN SOCIAL','ASISTENCIA Y PREVISION SOCIAL '), F.lit('social'))\
                       .otherwise('Otro')
                      )\
            .withColumn('nivel_2',
                        F.when(F.col('NIVEL').isin('GL','GN','GR'), F.col('NIVEL')).otherwise(F.lit('Otro'))
                        )\
            .withColumn('tipo_formato_2',
                        F.when(F.col('TIPO_FORMATO')=='PROYECTO DE INVERSION', F.lit('p_inversion'))\
                         .when(F.col('TIPO_FORMATO').isin('FUR','IOARR'), F.lower(F.col('TIPO_FORMATO')) )\
                         .otherwise(F.lit('Otro'))
                        )\
            .withColumn('marco_2',
                        F.when(F.col('MARCO').isin('SNIP','INVIERTE'), F.lower(F.col('MARCO')))\
                        .otherwise(F.lit('Otro'))
                        )\
            .withColumn('marca_reg_pmi',
                        F.when(F.col('IND_REG_PMI')=='SI', F.lit(1)).otherwise(F.lit(0))
                        )\
            .withColumn('rg_diff_proc_monto',
                        F.when(F.col('DIFF_PORC_MONTO')<=0, F.lit('01.<=0'))\
                         .when(F.col('DIFF_PORC_MONTO')<=0.15, F.lit('02.<=15%'))\
                         .when(F.col('DIFF_PORC_MONTO')<=0.50, F.lit('03.<=50%'))\
                         .otherwise('04.>50%')
                        )\
            .withColumn('departamento',
                        F.when(F.col('CDPT_DPTO').isin('CUSCO','PIURA','LAMBAYEQUE','LORETO','LIMA'), F.lower(F.col('CDPT_DPTO')))\
                         .otherwise('Otro')
                        )\
            .withColumn('diff_inicio_viable', F.months_between(F.col('FECHA_INICIO'), F.col('FEC_VIABLE')))\
            .withColumn('diff_today_inicio', F.months_between(F.to_date(F.lit('2022-05-07'),'yyyy-MM-dd'), F.col('FECHA_INICIO')))

In [422]:
df3.show(3,False)

+-------+-----------+------------+----------------------------+------------+--------------+-------------+------------+------------+-------------------+-----------+-----------------+---------------------------------------------------+----------+-----+-----------+-----------------+------------+----------+---------------------+----------------------+-----------------+-----------------------+---------------+---------+---------+---------+---------+---------------+-------------+----------+----------+----------------------------------------+-----+-----------+-----------------+----------------+------------+---------------+------------------------+-----------------+----------------------+------------+---------+-----------+-----------------+---------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------------+------------------------+---------------

In [259]:
c = 'UNIDAD_OPMI'
#df2.groupBy(c).count().orderBy(c).show(100,False)
df3.groupBy(c).agg(F.count('NOBR_ID').alias('count'), F.round(F.mean('DIFF_PORC_MONTO'),2).alias('mean_diff_proc_monto')).orderBy(F.col('count').desc()).show(100,False)
#df3.groupBy(c).agg(F.count('NOBR_ID').alias('count'), F.round(F.mean('DIFF_PORC_MONTO'),2).alias('mean_diff_proc_monto')).orderBy(c).show(100,False)

+-----------+-----+--------------------+
|UNIDAD_OPMI|count|mean_diff_proc_monto|
+-----------+-----+--------------------+
|0.0        |11803|0.32                |
|null       |314  |0.00                |
|21252.0    |88   |0.57                |
|21558.0    |87   |0.33                |
|21780.0    |63   |0.32                |
|21127.0    |60   |0.12                |
|21295.0    |60   |0.47                |
|21468.0    |57   |0.40                |
|21626.0    |41   |0.03                |
|21564.0    |38   |0.37                |
|27151.0    |38   |0.03                |
|21571.0    |34   |0.37                |
|21250.0    |33   |0.27                |
|22696.0    |33   |0.00                |
|22132.0    |31   |0.09                |
|21266.0    |31   |0.29                |
|21577.0    |30   |0.16                |
|21789.0    |30   |0.12                |
|21239.0    |29   |0.27                |
|22098.0    |28   |0.38                |
|21570.0    |28   |0.29                |
|21658.0    |28 

In [423]:
df3.show(3,False)

+-------+-----------+------------+----------------------------+------------+--------------+-------------+------------+------------+-------------------+-----------+-----------------+---------------------------------------------------+----------+-----+-----------+-----------------+------------+----------+---------------------+----------------------+-----------------+-----------------------+---------------+---------+---------+---------+---------+---------------+-------------+----------+----------+----------------------------------------+-----+-----------+-----------------+----------------+------------+---------------+------------------------+-----------------+----------------------+------------+---------+-----------+-----------------+---------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------------+------------------------+---------------

In [511]:
df4 = df3.select(
    F.col('CODIGO_UNICO').alias('CODIGO_UNICO'),
    F.col('NSNP_MONTOVERI').alias('monto_verificacion'),
    F.col('NSNP_MONTOAPR').alias('monto_aprobacion'),
    F.col('MTO_VIABLE').alias('monto_viable'),
    F.col('COSTO_ACTUALIZADO').alias('costo_actualizado'),
    F.col('DEV_ACUMULADO').alias('dev_acumulado'),
    F.col('DIFF_MONTO').alias('diff_monto'),
    F.col('DIFF_PORC_MONTO').alias('diff_porc_monto'),
    F.col('DIFF_FECHA_INI_EJE_ET').alias('diff_fecha_ini_eje_et'),
    F.col('DIFF_FECHA_NOW_ET').alias('diff_fecha_now_et'),
    F.col('DIFF_FECHA_NOW_INI_EJE').alias('diff_fecha_now_ini_eje'),
    F.col('DOBA_FCHREP_count').alias('cant_avances'),
    F.col('NOBA_FISICO_PRO_min').alias('porc_fisico_prog_min'),
    F.col('NOBA_FISICO_PRO_max').alias('porc_fisico_prog_max'),
    F.col('NOBA_FISICO_REA_min').alias('porc_fisico_real_min'),
    F.col('NOBA_FISICO_REA_max').alias('porc_fisico_real_max'),
    F.col('NOBA_VALZDO_PRO_min').alias('monto_avance_prog_min'),
    F.col('NOBA_VALZDO_PRO_max').alias('monto_avance_prog_max'),
    F.col('NOBA_VALZDO_REA_min').alias('monto_avance_real_min'),
    F.col('NOBA_VALZDO_REA_max').alias('monto_avance_real_max'),
    'modalidad_ejec',
    'rubro_2',
    'sector_2',
    'funcion_2',
    'nivel_2',
    'tipo_formato_2',
    'marco_2',
    'marca_reg_pmi',
    'rg_diff_proc_monto',
    'departamento',
    'diff_inicio_viable',
    'diff_today_inicio'
)

In [512]:
df4.show(5,False)

+------------+------------------+----------------+------------+-----------------+-------------+------------+---------------+------------------------+-----------------+----------------------+------------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+---------------------+---------------------+--------------+-------------------------+--------------+----------------+-------+--------------+-------+-------------+------------------+------------+------------------+-----------------+
|CODIGO_UNICO|monto_verificacion|monto_aprobacion|monto_viable|costo_actualizado|dev_acumulado|diff_monto  |diff_porc_monto|diff_fecha_ini_eje_et   |diff_fecha_now_et|diff_fecha_now_ini_eje|cant_avances|porc_fisico_prog_min|porc_fisico_prog_max|porc_fisico_real_min|porc_fisico_real_max|monto_avance_prog_min|monto_avance_prog_max|monto_avance_real_min|monto_avance_real_max|modalidad_ejec|rubro_2                  |sector_2      |funcio

In [513]:
df_pd = df4.toPandas()

In [514]:
df_pd.to_csv('/content/drive/MyDrive/data_dataton/matriz1.txt', index=False, sep='|')

**SEGMENTACION**

In [515]:
import pandas as pd
pd.options.display.max_columns = 200
pd.options.display.max_rows = 200
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.clustering import KMeans
import math
import numpy as np
from pyspark.sql.types import *

In [516]:
df = spark.read.csv('/content/drive/MyDrive/data_dataton/matriz1.txt', header= True, sep='|').fillna('0')
df.show(5,False)

+------------+------------------+----------------+------------+-----------------+-------------+------------+---------------+------------------------+-----------------+----------------------+------------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+---------------------+---------------------+--------------+-------------------------+--------------+----------------+-------+--------------+-------+-------------+------------------+------------+------------------+-----------------+
|CODIGO_UNICO|monto_verificacion|monto_aprobacion|monto_viable|costo_actualizado|dev_acumulado|diff_monto  |diff_porc_monto|diff_fecha_ini_eje_et   |diff_fecha_now_et|diff_fecha_now_ini_eje|cant_avances|porc_fisico_prog_min|porc_fisico_prog_max|porc_fisico_real_min|porc_fisico_real_max|monto_avance_prog_min|monto_avance_prog_max|monto_avance_real_min|monto_avance_real_max|modalidad_ejec|rubro_2                  |sector_2      |funcio

In [517]:
lista_num = ['monto_verificacion','monto_aprobacion','monto_viable','costo_actualizado','dev_acumulado','diff_monto','diff_porc_monto','diff_fecha_ini_eje_et',
                  'diff_fecha_now_et','diff_fecha_now_ini_eje','cant_avances','porc_fisico_prog_min','porc_fisico_prog_max','porc_fisico_real_min','porc_fisico_real_max',
                  'monto_avance_prog_min','monto_avance_prog_max','monto_avance_real_min','monto_avance_real_max','marca_reg_pmi','diff_inicio_viable','diff_today_inicio']

for c in lista_num:
  df = df.withColumn(c, F.col(c).cast('Decimal(38,2)'))

df =df.withColumn('DIFF_PORC', F.col('diff_porc_monto'))

In [518]:
df.printSchema()

root
 |-- CODIGO_UNICO: string (nullable = false)
 |-- monto_verificacion: decimal(38,2) (nullable = true)
 |-- monto_aprobacion: decimal(38,2) (nullable = true)
 |-- monto_viable: decimal(38,2) (nullable = true)
 |-- costo_actualizado: decimal(38,2) (nullable = true)
 |-- dev_acumulado: decimal(38,2) (nullable = true)
 |-- diff_monto: decimal(38,2) (nullable = true)
 |-- diff_porc_monto: decimal(38,2) (nullable = true)
 |-- diff_fecha_ini_eje_et: decimal(38,2) (nullable = true)
 |-- diff_fecha_now_et: decimal(38,2) (nullable = true)
 |-- diff_fecha_now_ini_eje: decimal(38,2) (nullable = true)
 |-- cant_avances: decimal(38,2) (nullable = true)
 |-- porc_fisico_prog_min: decimal(38,2) (nullable = true)
 |-- porc_fisico_prog_max: decimal(38,2) (nullable = true)
 |-- porc_fisico_real_min: decimal(38,2) (nullable = true)
 |-- porc_fisico_real_max: decimal(38,2) (nullable = true)
 |-- monto_avance_prog_min: decimal(38,2) (nullable = true)
 |-- monto_avance_prog_max: decimal(38,2) (nullable 

In [519]:
cols_rescaling = ['monto_verificacion','monto_aprobacion','monto_viable','costo_actualizado','dev_acumulado','diff_monto','diff_porc_monto','diff_fecha_ini_eje_et',
                  'diff_fecha_now_et','diff_fecha_now_ini_eje','cant_avances','porc_fisico_prog_min','porc_fisico_prog_max','porc_fisico_real_min','porc_fisico_real_max',
                  'monto_avance_prog_min','monto_avance_prog_max','monto_avance_real_min','monto_avance_real_max','diff_inicio_viable','diff_today_inicio']

vAssembler = VectorAssembler(inputCols=cols_rescaling, outputCol='feat_toSc')
mmScaler = MinMaxScaler(inputCol="feat_toSc", outputCol="feat_sc")

In [520]:
df_matriz = vAssembler.transform(df)
df_matriz = mmScaler.fit(df_matriz).transform(df_matriz)

In [521]:
df_matriz.show(2,False)

+------------+------------------+----------------+------------+-----------------+-------------+----------+---------------+----------------------+-----------------+----------------------+------------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+---------------------+---------------------+--------------+-------------------------+--------------+----------+-------+--------------+-------+-------------+------------------+------------+------------------+-----------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [522]:
# UDF para extraer los valores del VectorDense
udf_getValue = F.udf(lambda x,i: float(x[i]), FloatType())

In [523]:
for ind, col in enumerate(cols_rescaling):
    df_matriz = df_matriz.withColumn(col, udf_getValue(F.col('feat_sc'), F.lit(ind)) )

In [524]:
df_matriz = df_matriz.drop('feat_toSc','feat_sc')

In [525]:
df_matriz.describe().toPandas()

Unnamed: 0,summary,CODIGO_UNICO,monto_verificacion,monto_aprobacion,monto_viable,costo_actualizado,dev_acumulado,diff_monto,diff_porc_monto,diff_fecha_ini_eje_et,diff_fecha_now_et,diff_fecha_now_ini_eje,cant_avances,porc_fisico_prog_min,porc_fisico_prog_max,porc_fisico_real_min,porc_fisico_real_max,monto_avance_prog_min,monto_avance_prog_max,monto_avance_real_min,monto_avance_real_max,modalidad_ejec,rubro_2,sector_2,funcion_2,nivel_2,tipo_formato_2,marco_2,marca_reg_pmi,rg_diff_proc_monto,departamento,diff_inicio_viable,diff_today_inicio,DIFF_PORC
0,count,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346,15346,15346,15346,15346,15346,15346,15346.0,15346,15346,15346.0,15346.0,15346.0
1,mean,2243568.9955037143,0.0018615239525067,0.0015983289777643,0.0015285310332457,0.00185125539765,0.0040483682733429,0.4252131060746538,0.0012507746803487,0.6399061644728268,0.929239334969466,0.00019549068161084677,0.0484091016777026,7.007668575223543e-05,0.7431827381740771,6.9894982968403e-05,9.726203918259508e-05,0.0026803976865164,0.00019923393281418936,0.002685984897823,0.0004043496169214231,,,,,,,,0.243581,,,0.4637012798242027,0.2300995777749261,0.373426
2,stddev,320911.98746439605,0.0159014131734476,0.0137373373627108,0.0114532100159633,0.01339095931062,0.0220577062188853,0.0121901801235453,0.0083044295930049,0.2729372072816848,0.0449282313500752,0.0139808870324018,0.0724169252147731,0.0080723579109937,0.4082593965699014,0.0080723591766072,0.0080721572647352,0.0171872119441221,0.0088007792082982,0.0170062736920226,0.0085491886476467,,,,,,,,0.4292569198227213,,,0.0673061367285795,0.1640918657745247,9.11876196634554
3,min,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,Contrata,Agricultura,Gobierno_local,Agro,GL,Otro,Otro,0.0,01.<=0,Otro,0.0,0.0,-1.0
4,max,2545513.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,Otro,Vivienda_Construccion,Otro,social,Otro,p_inversion,snip,1.0,04.>50%,piura,1.0,1.0,1097.06


In [526]:
df_matriz_pd = df_matriz.toPandas()

In [527]:
df_matriz_pd.head()

Unnamed: 0,CODIGO_UNICO,monto_verificacion,monto_aprobacion,monto_viable,costo_actualizado,dev_acumulado,diff_monto,diff_porc_monto,diff_fecha_ini_eje_et,diff_fecha_now_et,diff_fecha_now_ini_eje,cant_avances,porc_fisico_prog_min,porc_fisico_prog_max,porc_fisico_real_min,porc_fisico_real_max,monto_avance_prog_min,monto_avance_prog_max,monto_avance_real_min,monto_avance_real_max,modalidad_ejec,rubro_2,sector_2,funcion_2,nivel_2,tipo_formato_2,marco_2,marca_reg_pmi,rg_diff_proc_monto,departamento,diff_inicio_viable,diff_today_inicio,DIFF_PORC
0,2130561,0.000327,0.000262,0.000262,0.000327,0.001072,0.424394,0.001175,1.0,0.952058,4.445229e-18,0.025316,4.986446e-06,1.0,9e-06,4.6e-05,0.001406,2.3e-05,0.002674,0.000108,Contrata,Transporte_Comunicaciones,Gobierno_local,Transporte,GL,p_inversion,snip,0.0,03.<=50%,loreto,0.48536,0.455947,0.29
1,2160726,0.001014,0.00075,0.00075,0.001014,0.003128,0.424987,0.001266,0.5,0.903596,4.445229e-18,0.018987,9.659693e-06,1.0,8e-06,2.6e-05,0.002084,3e-05,0.003408,0.000143,Contrata,Transporte_Comunicaciones,Gobierno_local,Transporte,GL,p_inversion,snip,0.0,03.<=50%,loreto,0.443535,0.423282,0.39
2,2184820,0.002577,0.007238,0.007238,0.002735,0.008925,0.412147,0.000355,0.5,0.92913,1.1600960000000001e-17,0.031646,6.614252e-07,0.7683,3e-06,4.6e-05,0.00116,0.000112,0.005539,0.00067,Contrata,Transporte_Comunicaciones,Gobierno_local,Transporte,GL,p_inversion,snip,0.0,01.<=0,loreto,0.424833,0.26316,-0.61
3,2243092,0.00024,0.00021,0.00021,0.00024,0.000778,0.424292,0.001075,1.0,0.941636,4.445229e-18,0.031646,3.708927e-06,1.0,5e-06,4.5e-05,0.000864,1.9e-05,0.001116,8.4e-05,Directo,Transporte_Comunicaciones,Gobierno_local,Transporte,GL,p_inversion,snip,0.0,03.<=50%,loreto,0.548244,0.219927,0.18
4,2248073,0.000231,0.000237,0.000237,0.000231,0.000718,0.424191,0.000911,0.5,0.903596,4.445229e-18,0.012658,7.525e-06,1.0,1.7e-05,4.6e-05,0.001503,1.7e-05,0.003641,7.6e-05,Contrata,Energia_minas,Gobierno_local,Energia_ambiente,GL,p_inversion,snip,0.0,01.<=0,loreto,0.451461,0.304872,0.0


In [528]:
df_dummies = pd.get_dummies(data=df_matriz_pd,columns=['modalidad_ejec','rubro_2','sector_2','funcion_2','nivel_2','tipo_formato_2','marco_2','rg_diff_proc_monto','departamento'])

In [529]:
df_dummies.head()

Unnamed: 0,CODIGO_UNICO,monto_verificacion,monto_aprobacion,monto_viable,costo_actualizado,dev_acumulado,diff_monto,diff_porc_monto,diff_fecha_ini_eje_et,diff_fecha_now_et,diff_fecha_now_ini_eje,cant_avances,porc_fisico_prog_min,porc_fisico_prog_max,porc_fisico_real_min,porc_fisico_real_max,monto_avance_prog_min,monto_avance_prog_max,monto_avance_real_min,monto_avance_real_max,marca_reg_pmi,diff_inicio_viable,diff_today_inicio,DIFF_PORC,modalidad_ejec_Contrata,modalidad_ejec_Directo,modalidad_ejec_Otro,rubro_2_Agricultura,rubro_2_Educacion,rubro_2_Energia_minas,rubro_2_Otras_infra,rubro_2_Otro,rubro_2_Salud,rubro_2_Transporte_Comunicaciones,rubro_2_Vivienda_Construccion,sector_2_Gobierno_local,sector_2_Gobierno_regional,sector_2_Otro,funcion_2_Agro,funcion_2_Cultura_deporte,funcion_2_Educacion,funcion_2_Energia_ambiente,funcion_2_Otro,funcion_2_Salud_Saneamiento,funcion_2_Transporte,funcion_2_Vivienda,funcion_2_gestion_seguridad,funcion_2_social,nivel_2_GL,nivel_2_GN,nivel_2_GR,nivel_2_Otro,tipo_formato_2_Otro,tipo_formato_2_fur,tipo_formato_2_ioarr,tipo_formato_2_p_inversion,marco_2_Otro,marco_2_invierte,marco_2_snip,rg_diff_proc_monto_01.<=0,rg_diff_proc_monto_02.<=15%,rg_diff_proc_monto_03.<=50%,rg_diff_proc_monto_04.>50%,departamento_Otro,departamento_cusco,departamento_lambayeque,departamento_lima,departamento_loreto,departamento_piura
0,2130561,0.000327,0.000262,0.000262,0.000327,0.001072,0.424394,0.001175,1.0,0.952058,4.445229e-18,0.025316,4.986446e-06,1.0,9e-06,4.6e-05,0.001406,2.3e-05,0.002674,0.000108,0.0,0.48536,0.455947,0.29,1,0,0,0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,0,0,1,0,0,1,0,0,1,0,0,0,0,0,1,0
1,2160726,0.001014,0.00075,0.00075,0.001014,0.003128,0.424987,0.001266,0.5,0.903596,4.445229e-18,0.018987,9.659693e-06,1.0,8e-06,2.6e-05,0.002084,3e-05,0.003408,0.000143,0.0,0.443535,0.423282,0.39,1,0,0,0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,0,0,1,0,0,1,0,0,1,0,0,0,0,0,1,0
2,2184820,0.002577,0.007238,0.007238,0.002735,0.008925,0.412147,0.000355,0.5,0.92913,1.1600960000000001e-17,0.031646,6.614252e-07,0.7683,3e-06,4.6e-05,0.00116,0.000112,0.005539,0.00067,0.0,0.424833,0.26316,-0.61,1,0,0,0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,0,0,1,0,0,1,1,0,0,0,0,0,0,0,1,0
3,2243092,0.00024,0.00021,0.00021,0.00024,0.000778,0.424292,0.001075,1.0,0.941636,4.445229e-18,0.031646,3.708927e-06,1.0,5e-06,4.5e-05,0.000864,1.9e-05,0.001116,8.4e-05,0.0,0.548244,0.219927,0.18,0,1,0,0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,0,0,0,1,0,0,1,0,0,1,0,0,0,0,0,1,0
4,2248073,0.000231,0.000237,0.000237,0.000231,0.000718,0.424191,0.000911,0.5,0.903596,4.445229e-18,0.012658,7.525e-06,1.0,1.7e-05,4.6e-05,0.001503,1.7e-05,0.003641,7.6e-05,0.0,0.451461,0.304872,0.0,1,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,1,0,0,0,0,0,0,1,0,0,0,0,0,0,1,0,0,1,1,0,0,0,0,0,0,0,1,0


In [530]:
df_dummies.describe()

Unnamed: 0,monto_verificacion,monto_aprobacion,monto_viable,costo_actualizado,dev_acumulado,diff_monto,diff_porc_monto,diff_fecha_ini_eje_et,diff_fecha_now_et,diff_fecha_now_ini_eje,cant_avances,porc_fisico_prog_min,porc_fisico_prog_max,porc_fisico_real_min,porc_fisico_real_max,monto_avance_prog_min,monto_avance_prog_max,monto_avance_real_min,monto_avance_real_max,diff_inicio_viable,diff_today_inicio,modalidad_ejec_Contrata,modalidad_ejec_Directo,modalidad_ejec_Otro,rubro_2_Agricultura,rubro_2_Educacion,rubro_2_Energia_minas,rubro_2_Otras_infra,rubro_2_Otro,rubro_2_Salud,rubro_2_Transporte_Comunicaciones,rubro_2_Vivienda_Construccion,sector_2_Gobierno_local,sector_2_Gobierno_regional,sector_2_Otro,funcion_2_Agro,funcion_2_Cultura_deporte,funcion_2_Educacion,funcion_2_Energia_ambiente,funcion_2_Otro,funcion_2_Salud_Saneamiento,funcion_2_Transporte,funcion_2_Vivienda,funcion_2_gestion_seguridad,funcion_2_social,nivel_2_GL,nivel_2_GN,nivel_2_GR,nivel_2_Otro,tipo_formato_2_Otro,tipo_formato_2_fur,tipo_formato_2_ioarr,tipo_formato_2_p_inversion,marco_2_Otro,marco_2_invierte,marco_2_snip,rg_diff_proc_monto_01.<=0,rg_diff_proc_monto_02.<=15%,rg_diff_proc_monto_03.<=50%,rg_diff_proc_monto_04.>50%,departamento_Otro,departamento_cusco,departamento_lambayeque,departamento_lima,departamento_loreto,departamento_piura
count,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0,15346.0
mean,0.001862,0.001598,0.001529,0.001851,0.004048,0.425222,0.001251,0.639906,0.929225,0.0001954907,0.048409,7.007782e-05,0.743183,6.989278e-05,9.7e-05,0.00268,0.000199,0.002686,0.000404,0.463713,0.230099,0.477844,0.472762,0.049394,0.072527,0.225336,0.038903,0.098397,0.037469,0.022481,0.262088,0.242799,0.812068,0.076632,0.111299,0.063404,0.067053,0.169947,0.061384,0.036035,0.222599,0.251727,0.051219,0.058712,0.01792,0.812068,0.089926,0.076632,0.021374,0.022807,0.078457,0.072788,0.825948,0.022807,0.297602,0.679591,0.343998,0.215757,0.296885,0.14336,0.019354,0.409618,0.140493,0.019614,0.135605,0.275316
std,0.015901,0.013737,0.011453,0.013391,0.022058,0.01219,0.008304,0.272929,0.044928,0.01398034,0.072416,0.008072114,0.408245,0.008072115,0.008072,0.017187,0.008799,0.017006,0.008548,0.067306,0.164093,0.499525,0.499274,0.216696,0.259367,0.417817,0.193369,0.29786,0.189914,0.148248,0.439785,0.428789,0.39067,0.266016,0.314513,0.243696,0.250122,0.375598,0.240041,0.186384,0.416005,0.434019,0.22045,0.235093,0.132665,0.39067,0.286085,0.266016,0.144631,0.149293,0.268898,0.259796,0.379166,0.149293,0.457218,0.466649,0.475056,0.41136,0.4569,0.350451,0.137769,0.491779,0.347509,0.138675,0.34238,0.446688
min,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,0.000166,0.000157,0.000155,0.00017,0.000336,0.424191,0.000911,0.5,0.906722,4.445229e-18,0.006329,2.287171e-07,0.525925,1.551865e-07,1.6e-05,0.000108,3e-06,6.8e-05,1.1e-05,0.416907,0.075417,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
50%,0.000444,0.000414,0.000408,0.00046,0.001031,0.424269,0.001002,0.5,0.928609,4.445229e-18,0.025316,1.965731e-06,1.0,1.81119e-06,4.4e-05,0.000623,1.7e-05,0.000554,6.8e-05,0.439735,0.222369,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
75%,0.001259,0.001172,0.00116,0.001306,0.003207,0.424688,0.001239,1.0,0.952058,8.456777e-18,0.056962,6.770852e-06,1.0,6.913969e-06,4.6e-05,0.001639,5.7e-05,0.001817,0.000234,0.490535,0.377047,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,1.0,1.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0
max,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0


In [531]:
df_dummies.to_csv('/content/drive/MyDrive/data_dataton/matriz2.txt', index=False, sep='|')

In [532]:
df = spark.read.csv('/content/drive/MyDrive/data_dataton/matriz2.txt', header= True, sep='|')
df.show(5,False)

+------------+------------------+----------------+-------------+-----------------+-------------+----------+---------------+---------------------+-----------------+----------------------+------------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+---------------------+---------------------+-------------+------------------+-----------------+---------+-----------------------+----------------------+-------------------+-------------------+-----------------+---------------------+-------------------+------------+-------------+---------------------------------+-----------------------------+-----------------------+--------------------------+-------------+--------------+-------------------------+-------------------+--------------------------+--------------+---------------------------+--------------------+------------------+---------------------------+----------------+----------+----------+----------+------------+---

In [533]:
columns = df.columns
columns.remove('CODIGO_UNICO')
columns.remove('rg_diff_proc_monto_01.<=0')
columns.remove('rg_diff_proc_monto_02.<=15%')
columns.remove('rg_diff_proc_monto_03.<=50%')
columns.remove('rg_diff_proc_monto_04.>50%')

In [534]:
columns

['monto_verificacion',
 'monto_aprobacion',
 'monto_viable',
 'costo_actualizado',
 'dev_acumulado',
 'diff_monto',
 'diff_porc_monto',
 'diff_fecha_ini_eje_et',
 'diff_fecha_now_et',
 'diff_fecha_now_ini_eje',
 'cant_avances',
 'porc_fisico_prog_min',
 'porc_fisico_prog_max',
 'porc_fisico_real_min',
 'porc_fisico_real_max',
 'monto_avance_prog_min',
 'monto_avance_prog_max',
 'monto_avance_real_min',
 'monto_avance_real_max',
 'marca_reg_pmi',
 'diff_inicio_viable',
 'diff_today_inicio',
 'DIFF_PORC',
 'modalidad_ejec_Contrata',
 'modalidad_ejec_Directo',
 'modalidad_ejec_Otro',
 'rubro_2_Agricultura',
 'rubro_2_Educacion',
 'rubro_2_Energia_minas',
 'rubro_2_Otras_infra',
 'rubro_2_Otro',
 'rubro_2_Salud',
 'rubro_2_Transporte_Comunicaciones',
 'rubro_2_Vivienda_Construccion',
 'sector_2_Gobierno_local',
 'sector_2_Gobierno_regional',
 'sector_2_Otro',
 'funcion_2_Agro',
 'funcion_2_Cultura_deporte',
 'funcion_2_Educacion',
 'funcion_2_Energia_ambiente',
 'funcion_2_Otro',
 'funcion

In [535]:
for c in columns:
  df = df.withColumn(c, F.col(c).cast('Decimal(38,2)'))

In [536]:
columns.append('CODIGO_UNICO')
columns.append('DIFF_PORC')

In [537]:
df = df.select(columns)

In [538]:
df.printSchema()

root
 |-- monto_verificacion: decimal(38,2) (nullable = true)
 |-- monto_aprobacion: decimal(38,2) (nullable = true)
 |-- monto_viable: decimal(38,2) (nullable = true)
 |-- costo_actualizado: decimal(38,2) (nullable = true)
 |-- dev_acumulado: decimal(38,2) (nullable = true)
 |-- diff_monto: decimal(38,2) (nullable = true)
 |-- diff_porc_monto: decimal(38,2) (nullable = true)
 |-- diff_fecha_ini_eje_et: decimal(38,2) (nullable = true)
 |-- diff_fecha_now_et: decimal(38,2) (nullable = true)
 |-- diff_fecha_now_ini_eje: decimal(38,2) (nullable = true)
 |-- cant_avances: decimal(38,2) (nullable = true)
 |-- porc_fisico_prog_min: decimal(38,2) (nullable = true)
 |-- porc_fisico_prog_max: decimal(38,2) (nullable = true)
 |-- porc_fisico_real_min: decimal(38,2) (nullable = true)
 |-- porc_fisico_real_max: decimal(38,2) (nullable = true)
 |-- monto_avance_prog_min: decimal(38,2) (nullable = true)
 |-- monto_avance_prog_max: decimal(38,2) (nullable = true)
 |-- monto_avance_real_min: decimal(3

**K-MEANS**

In [579]:
vars_segm = [
 'monto_verificacion',
 'monto_aprobacion',
 'monto_viable',
 #'costo_actualizado',
 'dev_acumulado',
 #'diff_monto',
 #'diff_fecha_ini_eje_et',
 #'diff_inicio_viable',
 #'diff_today_inicio',
 #'diff_fecha_now_et',
 #'diff_fecha_now_ini_eje',
 'cant_avances',
 #'porc_fisico_prog_min',
 #'porc_fisico_prog_max',
 #'porc_fisico_real_min',
 'porc_fisico_real_max',
 #'monto_avance_prog_min',
 #'monto_avance_prog_max',
 #'monto_avance_real_min',
 'monto_avance_real_max',
 'marca_reg_pmi',
 'modalidad_ejec_Contrata',
 'modalidad_ejec_Directo',
 #'modalidad_ejec_Otro',
 #'rubro_2_Agricultura',
 #'rubro_2_Educacion',
 #'rubro_2_Energia_minas',
 #'rubro_2_Otras_infra',
 #'rubro_2_Otro',
 #'rubro_2_Salud',
 #'rubro_2_Transporte_Comunicaciones',
 #'rubro_2_Vivienda_Construccion',
 'sector_2_Gobierno_local',
 'sector_2_Gobierno_regional',
 #'sector_2_Otro',
 'funcion_2_Agro',
 'funcion_2_Cultura_deporte',
 'funcion_2_Educacion',
 'funcion_2_Energia_ambiente',
 #'funcion_2_Otro',
 'funcion_2_Salud_Saneamiento',
 'funcion_2_Transporte',
 'funcion_2_Vivienda',
 #'funcion_2_gestion_seguridad',
 #'funcion_2_social',
 'nivel_2_GL',
 'nivel_2_GN',
 'nivel_2_GR',
 #'nivel_2_Otro',
 #'tipo_formato_2_Otro',
 #'tipo_formato_2_fur',
 'tipo_formato_2_ioarr',
 'tipo_formato_2_p_inversion',
 #'marco_2_Otro',
 'marco_2_invierte',
 'marco_2_snip',
 'departamento_loreto',
 'departamento_piura',
 'departamento_cusco'
 ]


In [580]:
df_input_segm = df

In [581]:
vecAssembler = VectorAssembler( inputCols = vars_segm, outputCol='features')
df_segm = vecAssembler.transform(df_input_segm)
df_segm.count()

15346

In [582]:
n_min = 2
n_max = 4

In [583]:
def fx_perfilamiento(df, cols, col_seg=None):
  pd_perf_gl = df.agg(*[F.mean(var).alias(var) for var in cols]).toPandas()
  pd_perf_gl.index=['global']

  if col_seg is None:
    return((pd_perf_gl,))
  else:
    pd_perf_gl = pd_perf_gl.reset_index().rename(columns={'index':col_seg})
    pd_perf_seg = df.groupBy(col_seg).agg(*[F.mean(var).alias(var) for var in cols]).orderBy('cluster').toPandas()
    pd_perf = pd.concat([pd_perf_gl,pd_perf_seg], axis=0).reset_index(drop=True)
        
    pd_perf_norm = pd_perf.copy()
    for col in cols:
      pd_perf_norm[col] = pd_perf[col]/pd_perf_gl[col][0]
      pd_perf_norm[col] = pd_perf_norm[col].apply(lambda x: round(x,3))
    return(pd_perf,pd_perf_norm)

In [584]:
vecAssembler = VectorAssembler( inputCols = vars_segm, outputCol='features')
df_segm = vecAssembler.transform(df_input_segm)
df_segm.count()

15346

In [585]:
kmeans = KMeans(k=3, featuresCol='features', predictionCol='prediction', seed=24041964) # 20200825
kmeansModel = kmeans.fit(df_segm)

In [586]:
udf_cluster_name = F.udf(lambda x: 'c' + ('0'+str(x+1))[-2:], StringType())

In [587]:
df_segm = kmeansModel.transform(df_segm)
df_segm = df_segm.withColumn('cluster', udf_cluster_name(F.col('prediction')))

In [588]:
df_dist = df_segm.groupBy('cluster').count().orderBy('cluster').toPandas()
df_dist['%'] = 100*df_dist['count']/df_dist['count'].sum()
df_dist

Unnamed: 0,cluster,count,%
0,c01,3261,21.249837
1,c02,9727,63.384595
2,c03,2358,15.365568


In [589]:
%%time
pd_result, pd_result_norm = fx_perfilamiento(df=df_segm, cols=vars_segm, col_seg='cluster')

CPU times: user 186 ms, sys: 29.9 ms, total: 216 ms
Wall time: 7.17 s


In [590]:
pd_result_norm.head(10)

Unnamed: 0,cluster,monto_verificacion,monto_aprobacion,monto_viable,dev_acumulado,cant_avances,porc_fisico_real_max,monto_avance_real_max,marca_reg_pmi,modalidad_ejec_Contrata,modalidad_ejec_Directo,sector_2_Gobierno_local,sector_2_Gobierno_regional,funcion_2_Agro,funcion_2_Cultura_deporte,funcion_2_Educacion,funcion_2_Energia_ambiente,funcion_2_Salud_Saneamiento,funcion_2_Transporte,funcion_2_Vivienda,nivel_2_GL,nivel_2_GN,nivel_2_GR,tipo_formato_2_ioarr,tipo_formato_2_p_inversion,marco_2_invierte,marco_2_snip,departamento_loreto,departamento_piura,departamento_cusco
0,global,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
1,c01,0.339,0.356,0.391,0.406,0.521,0.0,0.172,1.142,1.308,0.666,1.033,0.02,0.914,0.832,1.16,0.809,1.039,1.15,0.79,1.033,1.777,0.02,4.386,0.395,3.359,0.0,1.16,1.763,0.378
2,c02,0.305,0.258,0.278,0.631,1.104,0.0,0.975,0.92,0.83,1.26,1.231,0.0,0.89,1.173,0.867,1.1,1.028,1.106,0.975,1.231,0.0,0.0,0.0,1.211,0.362,1.313,0.877,0.777,1.319
3,c03,4.78,4.954,4.822,3.343,1.235,6.523,2.247,1.135,1.277,0.39,0.0,6.48,1.572,0.519,1.325,0.85,0.831,0.357,1.391,0.0,4.051,6.48,0.443,0.967,0.368,1.093,1.285,0.863,0.546


In [591]:
c = 'cluster'
#df2.groupBy(c).count().orderBy(c).show(100,False)
df_segm.groupBy(c).agg(F.count('CODIGO_UNICO').alias('count'), F.round(F.mean('DIFF_PORC'),2).alias('mean_diff_proc_monto')).orderBy(c).show(100,False)

+-------+-----+--------------------+
|cluster|count|mean_diff_proc_monto|
+-------+-----+--------------------+
|c01    |3261 |0.76                |
|c02    |9727 |0.26                |
|c03    |2358 |0.30                |
+-------+-----+--------------------+



In [592]:
df_segm.show(3,False)

+------------------+----------------+------------+-----------------+-------------+----------+---------------+---------------------+-----------------+----------------------+------------+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+---------------------+---------------------+-------------+------------------+-----------------+---------+-----------------------+----------------------+-------------------+-------------------+-----------------+---------------------+-------------------+------------+-------------+---------------------------------+-----------------------------+-----------------------+--------------------------+-------------+--------------+-------------------------+-------------------+--------------------------+--------------+---------------------------+--------------------+------------------+---------------------------+----------------+----------+----------+----------+------------+-----------------

In [593]:
df_segm_pd = df_segm.toPandas()

In [594]:
df_segm_pd.to_csv('/content/drive/MyDrive/data_dataton/cluster.txt', index=False, sep='|')