# Reto: Contribuyentes exonerados
Objetivo: Crear un modelo entrenado con un corpus de internet que permita identificar el buen uso del beneficio

#### Nota: Para poder correr de forma local el código de PySpark 
Es necesario configurar las variables de entorno del sistema **"SPARK_HOME: C:\ProgramData\Anaconda3\Lib\site-packages\pyspark"** y **"JAVA_HOME: C:\Program Files\Java\jdk1.8.0_144"**


### 01 Se procede a cargar los datos 

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext
from pyspark import SQLContext
sc = SparkContext("local", "App Name") 
spark = SQLContext(sc)#spark antes era sql.

In [3]:
import os
import pandas as pd
directorio = 'D:/usuarios/rlinares/_teen2/3.reto_exonerados_01/01.DB/'
archivo1 = 'data_exonerados_modelo.csv'
archivo2 = 'Educacion_Bi.csv'
fullname1 = os.path.join(directorio,archivo1)
fullname2 = os.path.join(directorio,archivo2)
data = pd.read_csv(fullname2, sep = ',',encoding='iso-8859-1')
data.head(20)

Unnamed: 0,categoria,tipo,fuente,ruc,detalle
0,otros,Club,cpe_oct,20144684631,ALQ. STAND FESTIVAL DE ANIVERSARIO
1,otros,Club,cpe_oct,20144684631,ALQ. STAND FESTIVAL DE ANIVERSARIO
2,otros,Club,cpe_oct,20144684631,COMISION FESTIVAL DE ANIVERSARIO
3,otros,Club,cpe_oct,20144684631,COMISION FESTIVAL DE ANIVERSARIO
4,otros,Club,cpe_oct,20144684631,ALQ. STAND FESTIVAL DE ANIVERSARIO
5,otros,Club,cpe_oct,20144684631,COMISION FESTIVAL DE ANIVERSARIO 2017
6,otros,Club,cpe_oct,20144684631,ALQ DE STAND - FESTIVAL DE ANIVERSARIO 2017
7,otros,Club,cpe_oct,20100093911,INGRESO NIÇï¿½O CON CONSUMO CHOSICA
8,otros,Club,cpe_oct,20100093911,INGRESO ADULTO CON CONSUMO CHOSICA
9,otros,Club,cpe_oct,20100093911,R.C.


In [4]:
#spark. antes era sql.
data = spark.createDataFrame(data) #Creando un dataframe de PySpark

In [5]:
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.functions import udf, col
import unicodedata

class Test():
    def __init__(self, df):
        self.df = df

    def clearAccents(self, columns):
        # Filtramos todas las columnas string de un dataFrame
        validCols = [c for (c, t) in filter(lambda t: t[1] == 'string', self.df.dtypes)]
        # Si es None o [] se proporciona con parametros de columnas
        if (columns == "*"): columns = validCols[:]

        # Recibe una cadena como argumento
        def remove_accents(inputStr):
            # Primero, normaliza la cadena:
            nfkdStr = unicodedata.normalize('NFKD', inputStr)
            # Mantenga los caracteres que no tienen otros caracteres combinados (es decir, acentos caracteres)
            withOutAccents = u"".join([c for c in nfkdStr if not unicodedata.combining(c)])
            return withOutAccents

        function = udf(lambda x: remove_accents(x) if x != None else x, StringType())
        exprs = [function(col(c)).alias(c) if (c in columns) and (c in validCols) else c for c in self.df.columns]
        self.df = self.df.select(*exprs)


In [6]:
from pyspark.sql import functions as F
def clean_column_row(value):
    if type(value)!=str:
        return value
    value=value.strip()
    #value=value.trim()
    value=value.lower()
    if len(value)==0:
        return None
    return value
    
clean_column=F.udf(clean_column_row,"string")

def depure_string(col):
    if type(col)!=str:
        return col
    return F.regexp_replace(F.regexp_replace(F.regexp_replace(col, "\?", ""), "[^a-z]", " "), "\\s+", " ")

In [7]:
Tdata = Test(data)
Tdata.clearAccents(columns="*")
data = Tdata.df

for col in data.columns:
    data=data.withColumn(col,clean_column(col))

data=data.withColumn('detalle',depure_string('detalle'))
data.head()

Row(categoria='otros', tipo='club', fuente='cpe_oct', ruc='20144684631', detalle='alq stand festival de aniversario')

In [8]:
print(data.count())
data = data.select('categoria','tipo','detalle')
data = data.dropDuplicates()
data = data.na.drop()

1877


In [9]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml import Pipeline

regexTokenizer = RegexTokenizer(inputCol="detalle", outputCol="words", pattern="\\W")

add_stopwords = ["http","https","r","f","g","kg","x", "n","s","m","00","ml","1"] 
stopwordsSpanish = StopWordsRemover.loadDefaultStopWords("spanish")

stopwordsRemover1 = StopWordsRemover(inputCol="words", outputCol="filtered1").setStopWords(add_stopwords)
stopwordsRemover2 = StopWordsRemover(inputCol="filtered1", outputCol="filtered").setStopWords(stopwordsSpanish)

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover1,stopwordsRemover2])
pipelineFit = pipeline.fit(data.filter(data['categoria'] == 'educacion'))
data_edu = pipelineFit.transform(data.filter(data['categoria'] == 'educacion'))

In [10]:
from pyspark.sql.functions import explode, desc
data_word = data_edu.select(explode(data_edu.filtered).alias('word'),'categoria')
data_word_edu = data_word.groupBy('categoria','word').count().orderBy(desc('count')).select('word')
data_word_edu.head()

Row(word='educacion')

In [11]:
data_wedu= data_word_edu.toPandas()
stopwords_educacion = data_wedu['word'].tolist() #Listado de palabras referidas a educacion

In [12]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
# stop words
add_stopwords = stopwords_educacion # stop words sobre educacion
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover1, stopwordsRemover2])
pipelineFit = pipeline.fit(data.filter(data['categoria'] != 'educacion'))
data_sinedu = pipelineFit.transform(data.filter(data['categoria'] != 'educacion'))

In [13]:
data = data_edu.union(data_sinedu) # se une la data de sin educacion a con educacion
data.show()

+---------+-------------+--------------------+--------------------+--------------------+--------------------+
|categoria|         tipo|             detalle|               words|           filtered1|            filtered|
+---------+-------------+--------------------+--------------------+--------------------+--------------------+
|educacion|colegios_univ|pe direccin proye...|[pe, direccin, pr...|[pe, direccin, pr...|[pe, direccin, pr...|
|educacion|colegios_univ|pension ec ec kin...|[pension, ec, ec,...|[pension, ec, ec,...|[pension, ec, ec,...|
|educacion|    educacion|actividades de ev...|[actividades, de,...|[actividades, de,...|[actividades, eva...|
|educacion|    educacion|          didactica |         [didactica]|         [didactica]|         [didactica]|
|educacion|colegios_univ|     pri pension oct| [pri, pension, oct]| [pri, pension, oct]| [pri, pension, oct]|
|educacion|    educacion|constituyen los a...|[constituyen, los...|[constituyen, los...|[constituyen, asp...|
|educacion

In [14]:
import pyspark.sql.functions as f
import pandas as pd

data = data.withColumn('wordCount', f.size(f.col('filtered')))

wc = data.toPandas()
new_word = []
new_tipo = []
new_cat = []
for i in range(len(wc.filtered)):
    if wc.wordCount[i]<=5:
        new_word.append(wc.filtered[i])
        new_tipo.append(wc.tipo[i])
        new_cat.append(wc.categoria[i])
    else :
        n = round(wc.wordCount[i]/5,0)
        for j in range(n.astype(int)-1):
            new_word.append(wc.filtered[i][j*5:(j+1)*5])
            new_tipo.append(wc.tipo[i])
            new_cat.append(wc.categoria[i])
        new_word.append(wc.filtered[i][(n-1).astype(int)*5:wc.wordCount[i]])
        new_tipo.append(wc.tipo[i])
        new_cat.append(wc.categoria[i])
newdata = pd.DataFrame({
    'categoria': new_cat,
    'tipo': new_tipo,
    'filtered': new_word
})

In [15]:


ndata = spark.createDataFrame(newdata)
ndata = ndata.dropDuplicates()
ndata = ndata.na.drop()

In [16]:
ndata.show()

+---------+--------------------+-----------------+
|categoria|            filtered|             tipo|
+---------+--------------------+-----------------+
|educacion|[real, decreto, o...|        educacion|
|educacion|[motora, control,...|        educacion|
|educacion|[mismo, relacione...|        educacion|
|educacion|[proporciona, for...|        educacion|
|educacion|[mas, sencillos, ...|        educacion|
|educacion|[etc, fomentar, c...|        educacion|
|educacion|[diseno, curricul...|        educacion|
|    otros|      [canc, evento]|             club|
|    otros|[maqueta, pintura...|arte_vinosreserva|
|    otros|         [porc, pom]|     restaurantes|
|    otros|[lomito, fino, sa...|     restaurantes|
|    otros|    [obra, arte, aa]|arte_vinosreserva|
|educacion|[pasos, cambiar, ...|        educacion|
|educacion|[sistema, educati...|        educacion|
|educacion|[propuesta, curri...|        educacion|
|educacion|[edad, educacion,...|        educacion|
|educacion|[grandes, problem...

In [21]:
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.functions import udf, col
import unicodedata
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover

class Test():
    def __init__(self, df):
        self.df = df

    def clearAccents(self, columns):
        validCols = [c for (c, t) in filter(lambda t: t[1] == 'string', self.df.dtypes)]

        if (columns == "*"): columns = validCols[:]

        def remove_accents(inputStr):
            nfkdStr = unicodedata.normalize('NFKD', inputStr)
            withOutAccents = u"".join([c for c in nfkdStr if not unicodedata.combining(c)])
            return withOutAccents

        function = udf(lambda x: remove_accents(x) if x != None else x, StringType())
        exprs = [function(col(c)).alias(c) if (c in columns) and (c in validCols) else c for c in self.df.columns]
        self.df = self.df.select(*exprs)
    
    
#datacpe = spark.sql('''select CONCAT(ruc_emi,'-',cod_tip_cpe,'-',num_serie,'-',num_cpe) as tipo, 'fe' as categoria, detalle  from externo.cpe_det_mayo a inner join preprocesamiento.exonerados_asociacion_cepe_educacion_np86 b on a.ruc_emi=b.ddp_numruc''') 
archivo3 = 'cpe_.csv'
datacpe_pandas = os.path.join(directorio,archivo3)
data = pd.read_csv(datacpe_pandas, sep = ',',encoding='iso-8859-1')
datacpe = spark.createDataFrame(data)
#agregado por ricardo.

tdatacpe = Test(datacpe)
tdatacpe.clearAccents(columns="*")
datacpe = tdatacpe.df
for col in datacpe.columns: datacpe=datacpe.withColumn(col,clean_column(col))
datacpe=datacpe.withColumn('detalle',depure_string('detalle'))
datacpe = datacpe.dropDuplicates()
#datacpe = datacpe.na.drop()


##stop words
regexTokenizer = RegexTokenizer(inputCol="detalle", outputCol="words", pattern="\\W")
add_stopwords = ["http","https","r","f","g","kg","x", "n","s","m","00","ml","1"] # standard stop words
stopwordsSpanish = StopWordsRemover.loadDefaultStopWords("spanish")

stopwordsRemover1 = StopWordsRemover(inputCol="words", outputCol="filtered1").setStopWords(add_stopwords)
stopwordsRemover2 = StopWordsRemover(inputCol="filtered1", outputCol="filtered").setStopWords(stopwordsSpanish)

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover1, stopwordsRemover2]) 
pipelineFit = pipeline.fit(datacpe) 
datacpeset = pipelineFit.transform(datacpe)

In [22]:
#datacpeset.head()
from pyspark.sql.functions import explode, desc

data_word = datacpeset.select(explode(datacpeset.filtered).alias('word'),'categoria')
data_word.groupBy('categoria','word').count().orderBy(desc('count')).show(100, truncate = False)

+---------+---------------+-----+
|categoria|word           |count|
+---------+---------------+-----+
|fe       |pe             |6325 |
|fe       |cctld          |3147 |
|fe       |anual          |2705 |
|fe       |renovacion     |2161 |
|fe       |com            |1384 |
|fe       |registro       |1004 |
|fe       |curso          |748  |
|fe       |induccion      |730  |
|fe       |servicio       |676  |
|fe       |anos           |479  |
|fe       |general        |457  |
|fe       |orientacion    |448  |
|fe       |ensenanza      |402  |
|fe       |cuota          |384  |
|fe       |inscripcion    |365  |
|fe       |peru           |344  |
|fe       |cade           |338  |
|fe       |universitario  |327  |
|fe       |mayo           |324  |
|fe       |may            |297  |
|fe       |pension        |267  |
|fe       |edicion        |237  |
|fe       |lenguaje       |224  |
|fe       |ocupacional    |211  |
|fe       |provincia      |204  |
|fe       |historia       |191  |
|fe       |i  

In [23]:
from pyspark.ml.feature import NGram

bigram = NGram(n=2, inputCol='filtered', outputCol='bigrams')
df_bigram = bigram.transform(datacpeset)

df_words = df_bigram.select(explode(df_bigram.bigrams).alias('word'))
df_words.groupBy('word').count().orderBy(desc('count')).show(100,truncate = False)

+-----------------------+-----+
|word                   |count|
+-----------------------+-----+
|cctld pe               |3147 |
|anual cctld            |2686 |
|renovacion anual       |1809 |
|com pe                 |1384 |
|registro anual         |877  |
|anos cctld             |461  |
|orientacion induccion  |406  |
|induccion general      |406  |
|curso orientacion      |406  |
|servicio ensenanza     |397  |
|renovacion anos        |342  |
|inscripcion cade       |329  |
|cade universitario     |327  |
|ensenanza may          |285  |
|curso induccion        |265  |
|universitario provincia|204  |
|cuota unica            |151  |
|antamina hombre        |141  |
|hombre nuevo           |141  |
|induccion antamina     |141  |
|edu pe                 |127  |
|registro anos          |119  |
|universitario lima     |119  |
|ensenanza jun          |112  |
|org pe                 |96   |
|i bimestre             |95   |
|obras completas        |90   |
|aos i                  |81   |
|cursos 

In [24]:
from pyspark.ml.feature import NGram

trigram = NGram(n=3, inputCol='filtered', outputCol='trigrams')
df_trigram = trigram.transform(datacpeset)

df_words = df_trigram.select(explode(df_trigram.trigrams).alias('word'))
df_words.groupBy('word').count().orderBy(desc('count')).show(100,truncate = False)

+--------------------------------+-----+
|word                            |count|
+--------------------------------+-----+
|anual cctld pe                  |2686 |
|renovacion anual cctld          |1809 |
|registro anual cctld            |877  |
|anos cctld pe                   |461  |
|curso orientacion induccion     |406  |
|orientacion induccion general   |406  |
|renovacion anos cctld           |342  |
|inscripcion cade universitario  |323  |
|servicio ensenanza may          |285  |
|cade universitario provincia    |204  |
|antamina hombre nuevo           |141  |
|induccion antamina hombre       |141  |
|curso induccion antamina        |141  |
|registro anos cctld             |119  |
|cade universitario lima         |119  |
|servicio ensenanza jun          |112  |
|aos i bimestre                  |81   |
|curso induccion buenaventura    |60   |
|induccion hudbay anexo          |55   |
|curso induccion hudbay          |55   |
|cita nro fecha                  |54   |
|cursos internos

In [25]:
from pyspark.ml.feature import Word2Vec
word2Vec = Word2Vec(vectorSize=50, minCount=10, maxIter=50, inputCol='filtered', outputCol='featuresW2V')
word2Vec_model = word2Vec.fit(datacpeset)

In [26]:
word2Vec_model.findSynonyms("pension",30).show()

+----------+-------------------+
|      word|         similarity|
+----------+-------------------+
|    school| 0.6379492282867432|
| semestral| 0.5397517681121826|
|         p| 0.5149340629577637|
|        ec| 0.5117691159248352|
|    bloque| 0.4828224778175354|
|    kinder| 0.4563576281070709|
|    codigo| 0.4424918591976166|
|elementary|0.42747604846954346|
|    manana|0.42707183957099915|
|     ciclo|0.42615100741386414|
|    middle|0.41408154368400574|
| conflicto|0.40872353315353394|
|      high|0.39488163590431213|
|        er| 0.3848575949668884|
|  sistemas| 0.3771640360355377|
|     cuota|0.37505635619163513|
|   bolivia|0.37245815992355347|
| republica|0.37073081731796265|
|       hrs| 0.3691304326057434|
|   ingreso|0.36626991629600525|
+----------+-------------------+
only showing top 20 rows



In [35]:
#modificado
data_model = datacpeset.select('categoria','tipo','filtered').union(ndata.select('categoria','tipo','filtered'))



In [36]:
data_model.groupBy('categoria').count().show()

+---------+-----+
|categoria|count|
+---------+-----+
|categoria|    1|
|       fe| 8857|
|    otros| 1023|
|educacion| 1865|
+---------+-----+



In [37]:
data_model.show(truncate=False)

+---------+-------------------------+-------------------------------------------------------------------------------+
|categoria|tipo                     |filtered                                                                       |
+---------+-------------------------+-------------------------------------------------------------------------------+
|fe       |20111451592-01-f003-86911|[renovacion, anual, cctld, pe, planotec, com, pe]                              |
|fe       |20147829583-01-f01a-2104 |[lenguaje]                                                                     |
|fe       |20147829583-01-f05a-856  |[aprendizaje]                                                                  |
|fe       |20147829583-01-f01b-2388 |[ocupacional]                                                                  |
|fe       |20111451592-01-f003-84968|[renovacion, anual, cctld, pe, diamanteperu, com, pe]                          |
|fe       |20111451592-01-f003-85014|[renovacion, anual,

In [38]:
from pyspark.sql.functions import lit

df = spark.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()

+---+---+-----+---+
| x1| x2|   x3| x4|
+---+---+-----+---+
|  1|  a| 23.0|  0|
|  3|  B|-23.0|  0|
+---+---+-----+---+



In [39]:
from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()


+---+---+-----+---+--------------------+
| x1| x2|   x3| x4|                  x5|
+---+---+-----+---+--------------------+
|  1|  a| 23.0|  0| 9.744803446248903E9|
|  3|  B|-23.0|  0|1.026187963170189...|
+---+---+-----+---+--------------------+



In [40]:
lookup = spark.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, df_with_x5.x1 == lookup.k, "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))
    
df_with_x6.show()

+---+---+-----+---+--------------------+----+
| x1| x2|   x3| x4|                  x5|  x6|
+---+---+-----+---+--------------------+----+
|  1|  a| 23.0|  0| 9.744803446248903E9| foo|
|  3|  B|-23.0|  0|1.026187963170189...|null|
+---+---+-----+---+--------------------+----+



In [41]:
import pyspark.sql.functions as f
#|    otros| 1023|
#|educacion| 1865|
data_model_n = data_model.select('categoria','tipo','filtered',f.when(f.array_contains(data_model.filtered, "cctld"),'otros').when(data_model.categoria == "fe","fe").when(data_model.categoria == "otros","otros").when(data_model.categoria == "educacion","educacion").otherwise(0).alias('categoria_n'))



In [46]:
#modificado
#data_model_n.write.format("parquet").option("encoding","iso-8859-1").option("header",True).saveAsTable("Analytics4.tt_ExoneradosEducacion_Bi_al05")
#data_model_n.groupBy('categoria_n').count().show()
data_model_n.toPandas().to_csv('data_model_n_.csv')


In [47]:
#modificado
#data_model_n = spark.sql('select * from Analytics4.tt_ExoneradosEducacion_Bi_al05')

In [48]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[countVectors])
pipelineFit = pipeline.fit(data_model_n)
data_model_n = pipelineFit.transform(data_model_n)

In [49]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

dataset_otros_educ = data_model_n.filter(data_model_n.categoria_n != 'fe')
dataset_facturas= data_model_n.filter(data_model_n.categoria_n == 'fe')

label_stringIdx = StringIndexer(inputCol = "categoria_n", outputCol = "label")

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[label_stringIdx])
pipelineFit = pipeline.fit(dataset_otros_educ)
dataset_otros_educ = pipelineFit.transform(dataset_otros_educ)

pipelineFit = pipeline.fit(dataset_facturas)
dataset_facturas = pipelineFit.transform(dataset_facturas)

In [50]:
print('dataset_otros_educ')
print(dataset_otros_educ.groupBy('categoria_n','label').count().show())
print('dataset_facturas')
print(dataset_facturas.groupBy('categoria_n','label').count().show())

dataset_otros_educ
+-----------+-----+-----+
|categoria_n|label|count|
+-----------+-----+-----+
|          0|  2.0|    1|
|      otros|  0.0| 4170|
|  educacion|  1.0| 1865|
+-----------+-----+-----+

None
dataset_facturas
+-----------+-----+-----+
|categoria_n|label|count|
+-----------+-----+-----+
|         fe|  0.0| 5710|
+-----------+-----+-----+

None


In [52]:
#dataset_otros_educ = dataset #.filter(dataset.categoria_n != 'fe')

In [54]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [55]:
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
lrModel = lr.fit(dataset_otros_educ)

In [56]:
print("Coefficients: \n" + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))

Coefficients: 
3 X 1325 CSRMatrix
(0,0) 0.0925
(0,1) 0.1855
(1,0) -0.0915
(1,1) -0.1835
Intercept: [2.63967301301,2.18865751406,-4.82833052707]


In [57]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(dataset_otros_educ)
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(dataset_otros_educ)

#labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(dataset_facturas)
#featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(dataset_facturas)

In [58]:
(trainingData, testData) = dataset_otros_educ.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(40)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

treeModel = model.stages[2]
# summary only
print(treeModel)

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       0.0|         0.0|(1325,[0,1,2,3,4]...|
|       0.0|         0.0|(1325,[0,1,2,3],[...|
|       0.0|         0.0|(1325,[0,1,2,3,4]...|
|       0.0|         0.0|(1325,[0,1,3,9],[...|
|       0.0|         0.0|(1325,[0,1,2,3,4]...|
|       0.0|         0.0|(1325,[0,1,2,3],[...|
|       0.0|         0.0|(1325,[0,1,3,4,9]...|
|       0.0|         0.0|(1325,[0,1,2,4,5]...|
|       0.0|         0.0|(1325,[0,1,2,3,4]...|
|       0.0|         0.0|(1325,[0,1,2,3,4]...|
|       0.0|         0.0|(1325,[0,1,2,3,4]...|
|       0.0|         0.0|(1325,[0,1,2,3,4]...|
|       0.0|         0.0|(1325,[0,1,2,5],[...|
|       0.0|         0.0|(1325,[0,1,2,3],[...|
|       0.0|         0.0|(1325,[0,1,2,3,4]...|
|       0.0|         0.0|(1325,[0,1,2,3],[...|
|       0.0|         0.0|(1325,[0,1,2,5,11...|
|       0.0|         0.0|(1325,[0,1,2,3],[...|
|       0.0| 

In [59]:
predictions.groupBy('prediction','label').count().show()

+----------+-----+-----+
|prediction|label|count|
+----------+-----+-----+
|       1.0|  1.0|  528|
|       1.0|  0.0|  262|
|       0.0|  0.0|  962|
+----------+-----+-----+



In [160]:
data_factura = data_model_n.filter(data_model_n.categoria_n == 'fe')

In [161]:
from pyspark.ml import Pipeline

#modificado
labelIndexer = StringIndexer(inputCol="categoria_n", outputCol="indexedLabel",handleInvalid="skip")

pipeline = Pipeline(stages=[labelIndexer])
pipelineFit = pipeline.fit(data_factura)
data_factura = pipelineFit.transform(data_factura)


In [162]:
predictions = model.transform(data_factura)

# Select example rows to display.
#predictions.groupBy('prediction').count().show()

In [147]:
predictions

DataFrame[categoria: string, tipo: string, filtered: array<string>, categoria_n: string, features: vector, indexedLabel: double, indexedFeatures: vector, rawPrediction: vector, probability: vector, prediction: double]

In [163]:
predictions.show(3)

Py4JJavaError: An error occurred while calling o4111.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 664.0 failed 1 times, most recent failure: Lost task 0.0 in stage 664.0 (TID 29079, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$11: (vector) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: VectorIndexer encountered invalid value 1.0 on feature index 23. To handle or skip invalid value, try setting VectorIndexer.handleInvalid.
	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:400)
	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:356)
	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:431)
	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:431)
	... 19 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
	at sun.reflect.GeneratedMethodAccessor200.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$11: (vector) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: VectorIndexer encountered invalid value 1.0 on feature index 23. To handle or skip invalid value, try setting VectorIndexer.handleInvalid.
	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:400)
	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:356)
	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:431)
	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:431)
	... 19 more


In [127]:
predictions.show(1)

+---------+--------------------+----------+-----------+-----------------+------------+-----------------+------------------+--------------------+----------+
|categoria|                tipo|  filtered|categoria_n|         features|indexedLabel|  indexedFeatures|     rawPrediction|         probability|prediction|
+---------+--------------------+----------+-----------+-----------------+------------+-----------------+------------------+--------------------+----------+
|       fe|20147829583-01-f0...|[lenguaje]|         fe|(1325,[22],[1.0])|         0.0|(1325,[22],[1.0])|[634.0,1337.0,1.0]|[0.32150101419878...|       1.0|
+---------+--------------------+----------+-----------+-----------------+------------+-----------------+------------------+--------------------+----------+
only showing top 1 row



In [148]:
#predictions.toPandas().to_csv('predictions_.csv')
type(predictions)
predictions.head(5)

Py4JJavaError: An error occurred while calling o3732.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 657.0 failed 1 times, most recent failure: Lost task 0.0 in stage 657.0 (TID 28274, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$11: (vector) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: VectorIndexer encountered invalid value 1.0 on feature index 23. To handle or skip invalid value, try setting VectorIndexer.handleInvalid.
	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:400)
	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:356)
	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:431)
	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:431)
	... 19 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:3195)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3192)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3192)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:3225)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3192)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$11: (vector) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: VectorIndexer encountered invalid value 1.0 on feature index 23. To handle or skip invalid value, try setting VectorIndexer.handleInvalid.
	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:400)
	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:356)
	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:431)
	at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:431)
	... 19 more


AttributeError: 'SQLContext' object has no attribute 'version'

In [165]:
sc.version

'2.3.0'