In [0]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType, DoubleType
import pyspark.pandas as pd
pd.set_option('display.max_rows', 20)
from pyspark.ml.feature import VectorAssembler

In [0]:
dfRaw = spark.read.format("csv") \
    .option("header", "true") \
    .option("delimiter", ",") \
    .option("encoding", "ISO-8859-1") \
    .schema(
        StructType([
            StructField("user_id", IntegerType(), True),
            StructField("age", IntegerType(), True),
            StructField("gender", StringType(), True),
            StructField("country", StringType(), True),
            StructField("traffic_source", StringType(), True),
            StructField("antiguedad_cliente", IntegerType(), True),
            StructField("total_ordenes", IntegerType(), True),
            StructField("total_gastado", DoubleType(), True),
            StructField("ticket_promedio", DoubleType(), True),
            StructField("dias_desde_ultima_compra", IntegerType(), True),
            StructField("num_categorias_compradas", IntegerType(), True),
            StructField("num_compras_0_30", IntegerType(), True),
            StructField("uso_cupon", IntegerType(), True),
            StructField("apertura_email_rate", DoubleType(), True),
            StructField("visitas_web_mes", IntegerType(), True),
            StructField("dispositivo_preferido", StringType(), True),
            StructField("ratio_devoluciones", DoubleType(), True),
            StructField("EstadoCliente", StringType(), True)
        ])
    ) \
    .load("dbfs:///FileStore/Proyecto_Ecommerce.csv")
dfRaw.show()

+-------+---+------+-------------+--------------+------------------+-------------+-------------+---------------+------------------------+------------------------+----------------+---------+-------------------+---------------+---------------------+------------------+-------------+
|user_id|age|gender|      country|traffic_source|antiguedad_cliente|total_ordenes|total_gastado|ticket_promedio|dias_desde_ultima_compra|num_categorias_compradas|num_compras_0_30|uso_cupon|apertura_email_rate|visitas_web_mes|dispositivo_preferido|ratio_devoluciones|EstadoCliente|
+-------+---+------+-------------+--------------+------------------+-------------+-------------+---------------+------------------------+------------------------+----------------+---------+-------------------+---------------+---------------------+------------------+-------------+
|  43944| 68|     F|        Japan|        Search|              2220|            4|      1188.86|         297.22|                     561|                    

In [0]:
from pyspark.sql.functions import col, sum, when

dfNulls = dfRaw.select([
    sum(when(col(c).isNull() | (col(c) == ""), 1).otherwise(0)).alias(c)
    for c in dfRaw.columns
])

dfNulls.show()

+-------+---+------+-------+--------------+------------------+-------------+-------------+---------------+------------------------+------------------------+----------------+---------+-------------------+---------------+---------------------+------------------+-------------+
|user_id|age|gender|country|traffic_source|antiguedad_cliente|total_ordenes|total_gastado|ticket_promedio|dias_desde_ultima_compra|num_categorias_compradas|num_compras_0_30|uso_cupon|apertura_email_rate|visitas_web_mes|dispositivo_preferido|ratio_devoluciones|EstadoCliente|
+-------+---+------+-------+--------------+------------------+-------------+-------------+---------------+------------------------+------------------------+----------------+---------+-------------------+---------------+---------------------+------------------+-------------+
|      0|  0|     0|      0|             0|                 0|            0|            0|              0|                       0|                       0|               0|  

In [0]:
from pyspark.ml.feature import StringIndexer

In [0]:
indexadorGender = StringIndexer(inputCol="gender", outputCol="gender_indexed").fit(dfRaw)
indexadorGender.labels

Out[14]: ['M', 'F']

In [0]:
indexadorCountry = StringIndexer(inputCol="country", outputCol="country_indexed").fit(dfRaw)
indexadorCountry.labels

Out[15]: ['China',
 'United States',
 'Brasil',
 'South Korea',
 'United Kingdom',
 'France',
 'Germany',
 'Spain',
 'Japan',
 'Australia',
 'Belgium',
 'Poland',
 'Colombia',
 'Austria',
 'Deutschland',
 'EspaÃ±a']

In [0]:
indexadorTraffic_source = StringIndexer(inputCol="traffic_source", outputCol="traffic_source_indexed").fit(dfRaw)
indexadorTraffic_source.labels

Out[16]: ['Search', 'Organic', 'Facebook', 'Email', 'Display']

In [0]:
indexadordispositivo_preferido = StringIndexer(inputCol="dispositivo_preferido", outputCol="dispositivo_preferido_indexed").fit(dfRaw)
indexadordispositivo_preferido.labels

Out[17]: ['Mobile', 'Desktop', 'App']

In [0]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
    indexadordispositivo_preferido,
    indexadorTraffic_source,
    indexadorGender,
    indexadorCountry
])
dfRaw1 = pipeline.fit(dfRaw).transform(dfRaw)
dfRaw1.show()

+-------+---+------+-------------+--------------+------------------+-------------+-------------+---------------+------------------------+------------------------+----------------+---------+-------------------+---------------+---------------------+------------------+-------------+-----------------------------+----------------------+--------------+---------------+
|user_id|age|gender|      country|traffic_source|antiguedad_cliente|total_ordenes|total_gastado|ticket_promedio|dias_desde_ultima_compra|num_categorias_compradas|num_compras_0_30|uso_cupon|apertura_email_rate|visitas_web_mes|dispositivo_preferido|ratio_devoluciones|EstadoCliente|dispositivo_preferido_indexed|traffic_source_indexed|gender_indexed|country_indexed|
+-------+---+------+-------------+--------------+------------------+-------------+-------------+---------------+------------------------+------------------------+----------------+---------+-------------------+---------------+---------------------+------------------+----

In [0]:
indexadorEstado = StringIndexer(inputCol="EstadoCliente", outputCol="EstadoCliente_indexed").fit(dfRaw1)
indexadorEstado.labels

Out[24]: ['Churned', 'Activo', 'Nuevo']

In [0]:
dfRaw2 = indexadorEstado.transform(dfRaw1)
dfRaw2.show()

+-------+---+------+-------------+--------------+------------------+-------------+-------------+---------------+------------------------+------------------------+----------------+---------+-------------------+---------------+---------------------+------------------+-------------+-----------------------------+----------------------+--------------+---------------+---------------------+
|user_id|age|gender|      country|traffic_source|antiguedad_cliente|total_ordenes|total_gastado|ticket_promedio|dias_desde_ultima_compra|num_categorias_compradas|num_compras_0_30|uso_cupon|apertura_email_rate|visitas_web_mes|dispositivo_preferido|ratio_devoluciones|EstadoCliente|dispositivo_preferido_indexed|traffic_source_indexed|gender_indexed|country_indexed|EstadoCliente_indexed|
+-------+---+------+-------------+--------------+------------------+-------------+-------------+---------------+------------------------+------------------------+----------------+---------+-------------------+---------------+-

In [0]:
dfRaw3 = VectorAssembler(
    inputCols = [
        "age",
        "antiguedad_cliente",
        "total_ordenes",
        "total_gastado",
        "ticket_promedio",
        "dias_desde_ultima_compra",
        "num_categorias_compradas",
        "num_compras_0_30",
        "uso_cupon",
        "apertura_email_rate",
        "visitas_web_mes",
        "ratio_devoluciones",
        "dispositivo_preferido_indexed",
        "traffic_source_indexed",
        "gender_indexed",
        "country_indexed"
    ],
    outputCol="features"
).transform(dfRaw2)

dfRaw3.show()  

+-------+---+------+-------------+--------------+------------------+-------------+-------------+---------------+------------------------+------------------------+----------------+---------+-------------------+---------------+---------------------+------------------+-------------+-----------------------------+----------------------+--------------+---------------+---------------------+--------------------+
|user_id|age|gender|      country|traffic_source|antiguedad_cliente|total_ordenes|total_gastado|ticket_promedio|dias_desde_ultima_compra|num_categorias_compradas|num_compras_0_30|uso_cupon|apertura_email_rate|visitas_web_mes|dispositivo_preferido|ratio_devoluciones|EstadoCliente|dispositivo_preferido_indexed|traffic_source_indexed|gender_indexed|country_indexed|EstadoCliente_indexed|            features|
+-------+---+------+-------------+--------------+------------------+-------------+-------------+---------------+------------------------+------------------------+----------------+-----

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator

In [0]:
algoritmo = DecisionTreeClassifier(
  labelCol = "EstadoCliente_indexed", 
  featuresCol = "features"
)

In [0]:
evaluador = MulticlassClassificationEvaluator(
  labelCol = "EstadoCliente_indexed",
  predictionCol = "prediction",
  metricName="accuracy"
)


In [0]:
mallaDeParametros = ParamGridBuilder().\
addGrid(algoritmo.maxDepth, [5, 10, 15]).\
addGrid(algoritmo.impurity, ["entropy", "gini"]).\
build()


In [0]:
validacionCruzada = CrossValidator(
  estimator = algoritmo,
  estimatorParamMaps = mallaDeParametros,
  evaluator = evaluador,
  numFolds = 5 
)  

In [0]:
modelosGenerados = validacionCruzada.fit(dfRaw3)

In [0]:
modelo = modelosGenerados.bestModel

In [0]:
dfPrediccion = modelo.transform(dfRaw3)
dfPrediccion.show()

+-------+---+------+-------------+--------------+------------------+-------------+-------------+---------------+------------------------+------------------------+----------------+---------+-------------------+---------------+---------------------+------------------+-------------+-----------------------------+----------------------+--------------+---------------+---------------------+--------------------+-----------------+--------------------+----------+
|user_id|age|gender|      country|traffic_source|antiguedad_cliente|total_ordenes|total_gastado|ticket_promedio|dias_desde_ultima_compra|num_categorias_compradas|num_compras_0_30|uso_cupon|apertura_email_rate|visitas_web_mes|dispositivo_preferido|ratio_devoluciones|EstadoCliente|dispositivo_preferido_indexed|traffic_source_indexed|gender_indexed|country_indexed|EstadoCliente_indexed|            features|    rawPrediction|         probability|prediction|
+-------+---+------+-------------+--------------+------------------+-------------+--

In [0]:
dfPrediccion.select(
  dfPrediccion["EstadoCliente_indexed"],
  dfPrediccion["prediction"],
  dfPrediccion["probability"]
).show(5, False)

+---------------------+----------+-------------+
|EstadoCliente_indexed|prediction|probability  |
+---------------------+----------+-------------+
|0.0                  |0.0       |[1.0,0.0,0.0]|
|0.0                  |0.0       |[1.0,0.0,0.0]|
|1.0                  |1.0       |[0.0,1.0,0.0]|
|1.0                  |1.0       |[0.0,1.0,0.0]|
|1.0                  |1.0       |[0.0,1.0,0.0]|
+---------------------+----------+-------------+
only showing top 5 rows



In [0]:
evaluador = MulticlassClassificationEvaluator(
  labelCol="EstadoCliente_indexed", 
  predictionCol="prediction", 
  metricName="accuracy"
)
evaluador.evaluate(dfPrediccion)

Out[39]: 0.9946269233956739