## Modelo Para Predecir Sector a partir de ref1

In [2]:
import sys
from pyspark.sql import *
import matplotlib as plt
import numpy as np
import pandas as pd
from optimus.optimus import Optimus
from datetime import datetime
from pyspark.sql.functions import col, asc
from pyspark.sql.types import *
from pyspark import SparkContext
from pyspark.sql import *

In [3]:
op =Optimus()
tr = op.load.csv("/FileStore/tables/transacs_out.csv", header=None, encoding="Latin1")
tr = (tr
      .withColumnRenamed("_c0", "id_trn_ach")
      .withColumnRenamed("_c1", "id_cliente")
      .withColumnRenamed("_c2", "fechahora")
      .withColumnRenamed("_c3", "valor_trx")
      .withColumnRenamed("_c4", "ref1")
      .withColumnRenamed("_c5", "sector")
      .withColumnRenamed("_c6", "subsector")
      .withColumnRenamed("_c7", "descripcion"))
tr.dtypes

In [4]:
drop_list = ['id_cliente', 'fechahora', 'valor_trx', 'subsector', 'descripcion']
data = tr.select([column for column in tr.columns if column not in drop_list])
data.show(5)

## Matriz con datos de ref1 para predecir sector

In [6]:
#display(data.filter("ref1!=''"))
data1=data.cols.trim("*")
data1=data1.filter("ref1!=''")
display(data1)


id_trn_ach,ref1,sector
230435642,CC,
222356110,Referencia: Contrato: Valor: CC,
309137749,CC,
324614737,CC,
235344690,MEDICINA PREPAGADA COLSANITAS CE,
320049316,RECAUDO COLSANITAS CC,
230519178,CC,
241307506,Pago de la Planilla Cesantias CEDULA DE CIUDADANIA,SERVICIOS FINANCIEROS
316193109,CC,
282076350,Pago de la factura # CONJUNTO RESIDENCIAL PIETRA SANTA PH IDC,


In [7]:
data1= data1.filter("sector is null")
#display(data1.filter("sector is null"))
display(data1)

id_trn_ach,ref1,sector
230435642,CC,
222356110,Referencia: Contrato: Valor: CC,
309137749,CC,
324614737,CC,
235344690,MEDICINA PREPAGADA COLSANITAS CE,
320049316,RECAUDO COLSANITAS CC,
230519178,CC,
316193109,CC,
282076350,Pago de la factura # CONJUNTO RESIDENCIAL PIETRA SANTA PH IDC,
300800637,Pago de la factura # CONJUNTO RESIDENCIAL PIETRA SANTA PH IDC,


##Entrenando el modelo

In [9]:
data=data.dropna()

In [10]:
data.groupBy("sector") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(5)
#sorted(gdf.agg({"*":"count"}).collect())

In [11]:
data.groupBy("ref1") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(5)

In [12]:
display(data)

id_trn_ach,ref1,sector
241307506,Pago de la Planilla Cesantias CEDULA DE CIUDADANIA,SERVICIOS FINANCIEROS
359125394,CPV,SERVICIOS FINANCIEROS
285847659,Pago Factura Asociado BancoomevaPFA,SERVICIOS FINANCIEROS
319249942,Presentación y Pago del Impuesto Predial Unificado AAAXDFT,GOBIERNO
335337578,EDIF MIRABELL PH BOG,SERVICIOS FINANCIEROS
365833134,EDIF MIRABELL PH BOG,SERVICIOS FINANCIEROS
346095462,Pago de Saldo,MEDIOS DE COMUNICACION
228320905,Referencia de pago express No |,MEDIOS DE COMUNICACION
249117129,Referencia de pago express No |,MEDIOS DE COMUNICACION
278487871,Referencia pago express No | CC,MEDIOS DE COMUNICACION


##Procedimiento

Se va a utilizar el Modelo Pipeline: Spark Machine Learning Peipelines Api, el cual es similar a Scikit _ Learn. Este pipeline incluye tres pasos

#### 1. Tokenización: 
Utilizando la expresion regular `regexTokenizer`
####2. Quitar palabras Vacías
Se utiliza la expresion `stopwordsRemover` para quitar pronombres, articulos, preposiciones
####3. Contar Vectores
Mediante la expresión `countVectors`

In [15]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# tokenizar la expresión
regexTokenizer = RegexTokenizer(inputCol="ref1", outputCol="words", pattern="\\W")

# Quitar palabras vacias
add_stopwords = ["http","https","amp","rt","t","c","the"] 
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

# Conteo de palabras
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

Ahora utilizamos la expresión `StringIndexer` para codificar una columna de etiquetas en cadena en una columna de indices en cadena. Los indices se encuentran ordenados por frecuencias de etiqueta [0,numLabels], cuya etiqueta más frecuente tiene el índice 0.
La etiqueta de Sector se codificara con el fin de etiquetar indices , en este caso la etiqueta más frecuente es MEDIOS DE COMUNIOCACIÓN, por tanto se indexo como 0

In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "sector", outputCol = "label")
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# ajustar el pipeline a las columnas de entrenamiento
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)

### Partición de los datos
Se realiza la partición de los datos, donde se dispone del 70% para entrenamiento y 30% para prueba

In [19]:
# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

### Entrenamiento del Modelo
Se entreno el modelo de regresión logistica utilizando caracteristicas de un vector de conteo, el cual realizara prediciiones y registros en el conjunto de pruebas, luego se observa las 10 mejores predicciones con la probabilidad más alta

In [21]:
#Logistic Regression using Count Vector Features

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("id_trn_ach","ref1","sector","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

In [22]:
display(predictions)

id_trn_ach,ref1,sector,words,filtered,features,label,rawPrediction,probability,prediction
215122439,Obligaciones seleccionadas por el usuario (ref: ) CC,GOBIERNO,"List(obligaciones, seleccionadas, por, el, usuario, ref, cc)","List(obligaciones, seleccionadas, por, el, usuario, ref, cc)","List(0, 9213, List(2, 14, 25, 78, 91, 148, 216), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",2.0,"List(1, 10, List(), List(2.772971257714097, 1.9292877644396234, 3.9453614796352356, 1.1447849576150724, 1.480258982548745, -0.5047783341987545, -2.1227208655254914, -2.4274166807716346, -2.516254555452494, -3.7014940060070733))","List(1, 10, List(), List(0.19275506988881447, 0.08290843630064268, 0.6225416519999775, 0.037834977858098855, 0.05291612113846975, 0.007269377944087386, 0.001441563360631867, 0.001062933327730203, 9.725775165377725E-4, 2.972906650096677E-4))",2.0
215123596,CPV,SERVICIOS FINANCIEROS,List(cpv),List(cpv),"List(0, 9213, List(13), List(1.0))",1.0,"List(1, 10, List(), List(2.707435116390643, 4.425638154388275, 2.1311655822453117, 1.425778128857572, 0.4665611953784687, -0.43543035456731255, -2.0994271228947863, -2.416956892239868, -2.5059419215054044, -3.698821886052687))","List(1, 10, List(), List(0.13184241659648624, 0.734955853216441, 0.07409424482558839, 0.03659641951735145, 0.01402348570908886, 0.005690180243072984, 0.0010776094940865828, 7.844404539549308E-4, 7.176526207398034E-4, 2.1769732318999596E-4))",1.0
215123613,CONJ RES GUALI PH,SERVICIOS FINANCIEROS,"List(conj, res, guali, ph)","List(conj, res, guali, ph)","List(0, 9213, List(64, 76, 99, 506), List(1.0, 1.0, 1.0, 1.0))",1.0,"List(1, 10, List(), List(2.426723736962087, 5.055525867313431, 1.9441632061927026, 1.3048693887059564, 0.4646012418904374, -0.4618596953546766, -2.1043160893635724, -2.4205484903226835, -2.5093279998801523, -3.699831166143529))","List(1, 10, List(), List(0.06240546275433719, 0.8647625374246434, 0.038516714306496426, 0.020323918982519402, 0.008771697283997648, 0.0034731757716057713, 6.720738755534151E-4, 4.898679511713473E-4, 4.482523512290557E-4, 1.3629929844630607E-4))",1.0
215123633,Referencia de pago express No | CC,MEDIOS DE COMUNICACION,"List(referencia, de, pago, express, no, cc)","List(referencia, de, pago, express, no, cc)","List(0, 9213, List(0, 1, 2, 4, 6, 17), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",0.0,"List(1, 10, List(), List(5.276380183576645, 2.164739725866507, 1.6324192723511397, 1.5790735951612822, 0.5128428350973939, -0.4304398539808395, -2.0973542878610267, -2.42192799811386, -2.5155280260893553, -3.7002054460063345))","List(1, 10, List(), List(0.9017573907661409, 0.0401533172365928, 0.023579663031026356, 0.022354752255984375, 0.00769682700990082, 0.0029967413397441454, 5.65871105414968E-4, 4.0903168114999033E-4, 3.724834417269445E-4, 1.1392213231883343E-4))",0.0
215125608,Recarga express,MEDIOS DE COMUNICACION,"List(recarga, express)","List(recarga, express)","List(0, 9213, List(4, 10), List(1.0, 1.0))",0.0,"List(1, 10, List(), List(4.02962481475416, 3.272833476492395, 2.062518335750137, 1.3516157993859423, 0.4575634497424687, -0.44684460491824357, -2.1018411565687383, -2.418699746749999, -2.5074586238807166, -3.699311744004273))","List(1, 10, List(), List(0.580434683465461, 0.2723222040155586, 0.08118015316049301, 0.03987574569409268, 0.016308980582584492, 0.006601572362614341, 0.0012615123837884609, 9.189282033683964E-4, 8.40880130271514E-4, 2.5534000176750466E-4))",0.0
215126063,Referencia de pago express No | CC,MEDIOS DE COMUNICACION,"List(referencia, de, pago, express, no, cc)","List(referencia, de, pago, express, no, cc)","List(0, 9213, List(0, 1, 2, 4, 6, 17), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",0.0,"List(1, 10, List(), List(5.276380183576645, 2.164739725866507, 1.6324192723511397, 1.5790735951612822, 0.5128428350973939, -0.4304398539808395, -2.0973542878610267, -2.42192799811386, -2.5155280260893553, -3.7002054460063345))","List(1, 10, List(), List(0.9017573907661409, 0.0401533172365928, 0.023579663031026356, 0.022354752255984375, 0.00769682700990082, 0.0029967413397441454, 5.65871105414968E-4, 4.0903168114999033E-4, 3.724834417269445E-4, 1.1392213231883343E-4))",0.0
215126232,Transaccion_para_generacion_de_certificados_de_tradicion_y_libertad IDC,GOBIERNO,"List(transaccion_para_generacion_de_certificados_de_tradicion_y_libertad, idc)","List(transaccion_para_generacion_de_certificados_de_tradicion_y_libertad, idc)","List(0, 9213, List(11, 12), List(1.0, 1.0))",2.0,"List(1, 10, List(), List(2.4277513507745105, 2.5595849023393, 4.380312551326262, 1.3078566952618689, 0.455237351289878, -0.4286469971824673, -2.1041168849744296, -2.419841878306576, -2.483644879141608, -3.694492211386586))","List(1, 10, List(), List(0.10268320747778167, 0.11715316071869965, 0.723579044959977, 0.03350698543737467, 0.01428392235107426, 0.005901757837923944, 0.0011049282020915826, 8.057799118729408E-4, 7.559744975347807E-4, 2.2523860566950899E-4))",2.0
215126447,URB ARROYO DE LOS BERNAL MED,SERVICIOS FINANCIEROS,"List(urb, arroyo, de, los, bernal, med)","List(urb, arroyo, de, los, bernal, med)","List(0, 9213, List(0, 54, 174, 199, 613, 1787), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",1.0,"List(1, 10, List(), List(2.2035129795438944, 4.893351293722485, 1.8820028820998635, 1.2874942681406403, 0.8334349269140129, -0.4180472835004228, -2.05364516753969, -2.4095196603019637, -2.516496654379898, -3.702087584700488))","List(1, 10, List(), List(0.05808227574619468, 0.8555109278445117, 0.04211274621271874, 0.02323923098531679, 0.014757958551340785, 0.004221963438636218, 8.225898235212597E-4, 5.76273971076511E-4, 5.178088702218012E-4, 1.5822455646164206E-4))",1.0
215127742,CPV,SERVICIOS FINANCIEROS,List(cpv),List(cpv),"List(0, 9213, List(13), List(1.0))",1.0,"List(1, 10, List(), List(2.707435116390643, 4.425638154388275, 2.1311655822453117, 1.425778128857572, 0.4665611953784687, -0.43543035456731255, -2.0994271228947863, -2.416956892239868, -2.5059419215054044, -3.698821886052687))","List(1, 10, List(), List(0.13184241659648624, 0.734955853216441, 0.07409424482558839, 0.03659641951735145, 0.01402348570908886, 0.005690180243072984, 0.0010776094940865828, 7.844404539549308E-4, 7.176526207398034E-4, 2.1769732318999596E-4))",1.0
215128243,Transaccion_para_generacion_de_certificados_de_tradicion_y_libertad IDC,GOBIERNO,"List(transaccion_para_generacion_de_certificados_de_tradicion_y_libertad, idc)","List(transaccion_para_generacion_de_certificados_de_tradicion_y_libertad, idc)","List(0, 9213, List(11, 12), List(1.0, 1.0))",2.0,"List(1, 10, List(), List(2.4277513507745105, 2.5595849023393, 4.380312551326262, 1.3078566952618689, 0.455237351289878, -0.4286469971824673, -2.1041168849744296, -2.419841878306576, -2.483644879141608, -3.694492211386586))","List(1, 10, List(), List(0.10268320747778167, 0.11715316071869965, 0.723579044959977, 0.03350698543737467, 0.01428392235107426, 0.005901757837923944, 0.0011049282020915826, 8.057799118729408E-4, 7.559744975347807E-4, 2.2523860566950899E-4))",2.0


##Evaluación del Modelo

In [24]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

#### Predicción en la matriz data1, es decir para los datos cuyo sector no esta definido pero tenemos  ref1

In [26]:
display(data1)

id_trn_ach,ref1,sector
230435642,CC,
222356110,Referencia: Contrato: Valor: CC,
309137749,CC,
324614737,CC,
235344690,MEDICINA PREPAGADA COLSANITAS CE,
320049316,RECAUDO COLSANITAS CC,
230519178,CC,
316193109,CC,
282076350,Pago de la factura # CONJUNTO RESIDENCIAL PIETRA SANTA PH IDC,
300800637,Pago de la factura # CONJUNTO RESIDENCIAL PIETRA SANTA PH IDC,


In [27]:
label_stringIdx = StringIndexer(inputCol = "sector", outputCol = "label")

pipeline1 = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors])
pipelineFit1 = pipeline1.fit(data1)
dataset1 = pipelineFit1.transform(data1)

In [28]:
dataset1 = dataset1.drop("sector")

In [29]:
dataset1.show()

In [30]:
predictions = lrModel.transform(dataset1)


In [31]:
predictions.show()

In [32]:
predictions.filter(predictions['prediction'] == 0) \
    .select("id_trn_ach","ref1","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)