# Programa: Ciencia de Datos, Curso: Big Data
# Proyecto Final, Machine Learning Para Big Data

## Alonso Nuñez Sanchez

Esta notebook agrupa los conocimientos adquiridos durante el curso, para resolver un problema real con datos reales.

Muestra la aplicación de Machine Learning usando librerias de Spark y MLLib.

Usa distintos algoritmos de Clasificación para tratar un conjunto de datos de más de 22.000 registros. Cada registro es un día de operación de una tienda, el rango de los datos corresponde a 365 días de un año

Estos datos corresponden a las ventas diarias de un negocio, la clase a clasificar corresponde a la clasificación del día como de altas ventas (1) o bajas (0).

Los datos son leídos de una Base de Datos de postgreSQL y posteriormente manejados en memoria en estructuras data frame de Spark

El objetivo es tener una herramienta que permita predecir si un día futuro será de altas o bajas ventas, y así el negocio tome decisiones sobre horarios, cantidad de empleados, pedidos de materia prima, etc. , con una certeza mayor que solo la experiencia de los gerentes.


Iniciacialización de Spark y lectura de la Base de Datos PostgreSQL


In [1]:
import findspark
findspark.init('C:\spark')

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import DataFrameReader
from pyspark.sql.functions import col, date_format, udf 
from pyspark.sql.types import DateType

spark = SparkSession \
    .builder \
    .appName("Basic JDBC pipeline") \
    .config("spark.driver.extraClassPath", "C:\Tarea3\postgresql-42.2.9.jar") \
    .config("spark.executor.extraClassPath", "C:\Tarea3\postgresql-42.2.9.jar") \
    .getOrCreate()

# Lee los datos de la tabla "clientes" en la base de datos "BigData_ProyectoFinal"
# usando el conector JDBC de Postgresql y crea una estructura tipo dataframe de Spark
df = spark \
    .read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/BigData_ProyectoFinal") \
    .option("user", "postgres") \
    .option("password", "zeroone") \
    .option("dbtable", "clientes") \
    .load()

#muestra el contenido del dataframe (los primeros 10 registros)
df.limit(10).toPandas()



Unnamed: 0,id_tieda,tipo_tienda,provincia,supervisor,mes_del_anio,semana_del_anio,es_quincena,es_feriado,dia_de_la_semana,horas_operacion,pos_en_uso,clientes_totales,clientes_vip,label
0,12,FS,SAN JOSE,2015622_080,1,1,VERDADERO,Anio Nuevo ...,Lunes,11,4,210,23,0
1,12,FS,SAN JOSE,2015622_080,1,1,VERDADERO,NO ...,Martes,11,4,306,17,1
2,12,FS,SAN JOSE,2015622_080,1,1,VERDADERO,NO ...,Miercoles,11,4,203,12,0
3,12,FS,SAN JOSE,2015622_080,1,1,FALSO,NO ...,Jueves,11,4,192,13,0
4,12,FS,SAN JOSE,2015622_080,1,1,FALSO,NO ...,Viernes,12,4,229,16,0
5,12,FS,SAN JOSE,2015622_080,1,1,FALSO,NO ...,Sabado,15,4,225,22,0
6,12,FS,SAN JOSE,2015622_080,1,2,FALSO,NO ...,Domingo,16,4,224,26,0
7,12,FS,SAN JOSE,2015622_080,1,2,FALSO,NO ...,Lunes,12,4,179,14,0
8,12,FS,SAN JOSE,2015622_080,1,2,FALSO,NO ...,Martes,11,4,268,15,0
9,12,FS,SAN JOSE,2015622_080,1,2,FALSO,NO ...,Miercoles,10,4,215,15,0


Para empezar, analizamos los datos.

Características como el tamaño, esquema, tipos, etc

In [2]:

print("Cantidad de registros: " ,(df.count()))
print ("\nCantidad de columnas: ", len(df.columns))

Cantidad de registros:  22633

Cantidad de columnas:  14


Visualizamos los features y las clases
La clase de este caso es de tipo binaria, se llama "label" y representa el volumen de ventas (alto = 1 o bajo = 0)

Imprimiendo el Schema, podemos ver los nombres de los features y el tipo de datos (integer, string, etc)

In [3]:
df.printSchema()

root
 |-- id_tieda: integer (nullable = true)
 |-- tipo_tienda: string (nullable = true)
 |-- provincia: string (nullable = true)
 |-- supervisor: string (nullable = true)
 |-- mes_del_anio: integer (nullable = true)
 |-- semana_del_anio: integer (nullable = true)
 |-- es_quincena: string (nullable = true)
 |-- es_feriado: string (nullable = true)
 |-- dia_de_la_semana: string (nullable = true)
 |-- horas_operacion: integer (nullable = true)
 |-- pos_en_uso: integer (nullable = true)
 |-- clientes_totales: integer (nullable = true)
 |-- clientes_vip: integer (nullable = true)
 |-- label: integer (nullable = true)



Ejecutamos un análisis estadístico de los datos, para conocer media, desviación estandar, minimo y máximo de los features

In [4]:
df.describe().toPandas()

Unnamed: 0,summary,id_tieda,tipo_tienda,provincia,supervisor,mes_del_anio,semana_del_anio,es_quincena,es_feriado,dia_de_la_semana,horas_operacion,pos_en_uso,clientes_totales,clientes_vip,label
0,count,22633.0,22633,22633,22633,22633.0,22633.0,22633,22633,22633,22633.0,22633.0,22633.0,22633.0,22633.0
1,mean,61.04347633985773,,,,6.529492334202271,26.730216939866565,,,,11.44982989440198,4.1691335660319,259.6871382494588,18.814739539610304,0.3190915919232978
2,stddev,30.795570212598,,,,3.4496745756308216,15.06362244385176,,,,3.354689991394997,1.3261874953284474,111.18561891445763,11.719809410253008,0.4661349033335475
3,min,12.0,FS,ALAJUELA,1670344_080,1.0,1.0,FALSO,Anexion del Partido de Nicoya ...,Domingo,0.0,0.0,1.0,0.0,0.0
4,max,111.0,Mall,SAN JOSE,2032238_080,12.0,53.0,VERDADERO,Anio Nuevo ...,Viernes,23.0,8.0,932.0,142.0,1.0


Validamos la distribución de los datos entre clases, para identificar si está balanceado

In [5]:
df.groupBy('label').count().toPandas()

Unnamed: 0,label,count
0,1,7222
1,0,15411


Una distribución aproximada de 32% para la clase 1 y 38% para la clase 0

Es aceptable para el ejercicio y para la realidad de los datos en el negocio en cuestión

Por lo tanto, continuamos con el análisis

Analizamos distribución de algunos features que con conocimiento del negocio se consideran relevantes, tales como:

1. Tipo de tienda
2. Si es o no quincena
3. El día de la semana

In [6]:
df.groupBy('tipo_tienda').count().toPandas()

Unnamed: 0,tipo_tienda,count
0,Mall,5857
1,FS,12773
2,IS,4003


In [7]:
df.groupBy('es_quincena').count().toPandas()

Unnamed: 0,es_quincena,count
0,VERDADERO,7743
1,FALSO,14890


In [8]:
df.groupBy('dia_de_la_semana').count().toPandas()

Unnamed: 0,dia_de_la_semana,count
0,Jueves,3221
1,Miercoles,3226
2,Viernes,3222
3,Sabado,3226
4,Martes,3223
5,Domingo,3227
6,Lunes,3288


Convertimos datos catergóricos en numéricos, usando OneHotEncoder

En este caso serán:

1. Tipo de tienda
2. Si es o no quincena
3. Día de la semana

In [9]:
#import required libraries
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

In [10]:
tipo_tienda_indexer = StringIndexer(inputCol="tipo_tienda", outputCol="tipo_tienda_index").fit(df)
df = tipo_tienda_indexer.transform(df)
tipo_tienda_encoder = OneHotEncoder(inputCol="tipo_tienda_index", outputCol="tipo_tienda_vec")
df = tipo_tienda_encoder.transform(df)

es_quincena_indexer = StringIndexer(inputCol="es_quincena", outputCol="es_quincena_index").fit(df)
df = es_quincena_indexer.transform(df)
es_quincena_encoder = OneHotEncoder(inputCol="es_quincena_index", outputCol="es_quincena_vec")
df = es_quincena_encoder.transform(df)

dia_de_la_semana_indexer = StringIndexer(inputCol="dia_de_la_semana", outputCol="dia_de_la_semana_index").fit(df)
df = dia_de_la_semana_indexer.transform(df)
dia_de_la_semana_encoder = OneHotEncoder(inputCol="dia_de_la_semana_index", outputCol="dia_de_la_semana_vec")
df = dia_de_la_semana_encoder.transform(df)

In [11]:
df.select(['tipo_tienda','tipo_tienda_index','tipo_tienda_vec']).show(20,False)
df.select(['es_quincena','es_quincena_index','es_quincena_vec']).show(20,False)
df.select(['dia_de_la_semana','dia_de_la_semana_index','dia_de_la_semana_vec']).show(20,False)

+----------------+-----------------+---------------+
|tipo_tienda     |tipo_tienda_index|tipo_tienda_vec|
+----------------+-----------------+---------------+
|FS              |0.0              |(2,[0],[1.0])  |
|FS              |0.0              |(2,[0],[1.0])  |
|FS              |0.0              |(2,[0],[1.0])  |
|FS              |0.0              |(2,[0],[1.0])  |
|FS              |0.0              |(2,[0],[1.0])  |
|FS              |0.0              |(2,[0],[1.0])  |
|FS              |0.0              |(2,[0],[1.0])  |
|FS              |0.0              |(2,[0],[1.0])  |
|FS              |0.0              |(2,[0],[1.0])  |
|FS              |0.0              |(2,[0],[1.0])  |
|FS              |0.0              |(2,[0],[1.0])  |
|FS              |0.0              |(2,[0],[1.0])  |
|FS              |0.0              |(2,[0],[1.0])  |
|FS              |0.0              |(2,[0],[1.0])  |
|FS              |0.0              |(2,[0],[1.0])  |
|FS              |0.0              |(2,[0],[1.

Creamos un único vector de features y clase usando "VectorAssembler"

Este vector será usando en el entrenamiento del modelo
   

In [12]:
from pyspark.ml.feature import VectorAssembler

Construimos el vector usando solo los siguientes atributos:
1. tipo_tienda
2. es_quincena
3. dia_de_la_semana
4. horas_operacion   <<<--- este corresponde a la cantidad de horas que operó la tienda en el día espcífico
5. pos_en_uso    <<<-- corresponde a la cantidad de puntos de venta (cajas registradoras) que operaron en el día

In [13]:
df_assembler = VectorAssembler(inputCols=                             
['tipo_tienda_vec',
 'es_quincena_vec',
 'dia_de_la_semana_vec',
 'horas_operacion',
 'pos_en_uso'], 
                               
outputCol="features")

df = df_assembler.transform(df)

Imprimimos los primeros 20 resultados, con los datos categóricos en vector

In [14]:
df.select(['features','label']).show(20,False)

+----------------------------------------+-----+
|features                                |label|
+----------------------------------------+-----+
|(11,[0,3,9,10],[1.0,1.0,11.0,4.0])      |0    |
|(11,[0,7,9,10],[1.0,1.0,11.0,4.0])      |1    |
|(11,[0,5,9,10],[1.0,1.0,11.0,4.0])      |0    |
|(11,[0,2,9,10],[1.0,1.0,11.0,4.0])      |0    |
|(11,[0,2,8,9,10],[1.0,1.0,1.0,12.0,4.0])|0    |
|(11,[0,2,6,9,10],[1.0,1.0,1.0,15.0,4.0])|0    |
|(11,[0,2,4,9,10],[1.0,1.0,1.0,16.0,4.0])|0    |
|(11,[0,2,3,9,10],[1.0,1.0,1.0,12.0,4.0])|0    |
|(11,[0,2,7,9,10],[1.0,1.0,1.0,11.0,4.0])|0    |
|(11,[0,2,5,9,10],[1.0,1.0,1.0,10.0,4.0])|0    |
|(11,[0,2,9,10],[1.0,1.0,11.0,4.0])      |0    |
|(11,[0,2,8,9,10],[1.0,1.0,1.0,11.0,4.0])|0    |
|(11,[0,6,9,10],[1.0,1.0,14.0,4.0])      |0    |
|(11,[0,4,9,10],[1.0,1.0,13.0,4.0])      |0    |
|(11,[0,3,9,10],[1.0,1.0,11.0,4.0])      |0    |
|(11,[0,7,9,10],[1.0,1.0,11.0,4.0])      |0    |
|(11,[0,5,9,10],[1.0,1.0,11.0,4.0])      |0    |
|(11,[0,2,9,10],[1.0

Ya que tenemos los datos depurados y los features seleccionados, contruimos el modelo de aprendizaje automático

In [15]:
#seleccionamos los datos para features y label, que usará el modelo
model_df=df.select(['features','label'])

#partimos los datos en entrenamiento (75%) y prueba (25%)
training_df,test_df = model_df.randomSplit([0.75,0.25])


Exploramos los conjuntos de entrenamiento y pruebas, para confirmar que mantiene una proporción similar al conjunto original (aprox 30% de clase 1)

In [16]:
print ("conjunto de entrenamiento: ",training_df.count())
training_df.groupBy('label').count().toPandas()


conjunto de entrenamiento:  17052


Unnamed: 0,label,count
0,1,5447
1,0,11605


In [17]:
print ("conjunto de prueba: ",test_df.count())
test_df.groupBy('label').count().toPandas()

conjunto de prueba:  5581


Unnamed: 0,label,count
0,1,1775
1,0,3806


Usamos los conjuntos training_df,test_df para los siguientes algoritmos:

**LOGISTIC REGRESSION**

In [18]:
from pyspark.ml.classification import LogisticRegression

#Entrenar
log_reg=LogisticRegression().fit(training_df)
lr_summary=log_reg.summary

#Probar
model_predictions = log_reg.transform(test_df)

#Evaluar
model_predictions = log_reg.evaluate(test_df)
print ("Logistic regression \n")
print ("Accuracy: ",model_predictions.accuracy)
print ("weighted Precision: ",model_predictions.weightedPrecision)
print("Area under ROC: ",model_predictions.areaUnderROC)
print("Recall por label: ",model_predictions.recallByLabel)


Logistic regression 

Accuracy:  0.83390073463537
weighted Precision:  0.830579978477522
Area under ROC:  0.8876522614404243
Recall por label:  [0.906463478717814, 0.6783098591549296]


**DECISION TREE**

In [19]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Entrenar
dt = DecisionTreeClassifier().fit(training_df)

# Probar
predictions = dt.transform(test_df)

# Evaluar
train_output_df = dt.transform(training_df)
test_output_df = predictions

metrics = ['weightedPrecision', 'weightedRecall', 'accuracy']
print ("Decision Tree \n")
for metric in metrics:
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName=metric)
    print('Train ' + metric + ' = ' + str(evaluator.evaluate(train_output_df)))
    print('Test ' + metric + ' = ' + str(evaluator.evaluate(test_output_df)))
    print("\n")


Decision Tree 

Train weightedPrecision = 0.8238403280177266
Test weightedPrecision = 0.829716632492625


Train weightedRecall = 0.8275275627492376
Test weightedRecall = 0.8331840172012184


Train accuracy = 0.8275275627492377
Test accuracy = 0.8331840172012184




**RANDOM FOREST**

In [20]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Entrenar
rf = RandomForestClassifier(numTrees=20).fit(training_df)

#Probar
predictions = rf.transform(test_df)

#Evaluar
train_output_df = rf.transform(training_df)
test_output_df = predictions

metrics = ['weightedPrecision', 'weightedRecall', 'accuracy']
print ("Random Forest \n")
for metric in metrics:
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName=metric)
    print('Train ' + metric + ' = ' + str(evaluator.evaluate(train_output_df)))
    print('Test ' + metric + ' = ' + str(evaluator.evaluate(test_output_df)))
    print("\n")

Random Forest 

Train weightedPrecision = 0.8193227877460927
Test weightedPrecision = 0.8236682577747989


Train weightedRecall = 0.8213699272812574
Test weightedRecall = 0.8258376635011646


Train accuracy = 0.8213699272812574
Test accuracy = 0.8258376635011647




**GRADIENT-BOOSTED TREE**

In [21]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Entrenar
gbt = GBTClassifier(maxIter=50).fit(training_df)

#Probar
predictions = gbt.transform(test_df)

#Evaluar
train_output_df = gbt.transform(training_df)
test_output_df = predictions

metrics = ['weightedPrecision', 'weightedRecall', 'accuracy']
print ("Gradient-Boosted Tree \n")
for metric in metrics:
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName=metric)
    print('Train ' + metric + ' = ' + str(evaluator.evaluate(train_output_df)))
    print('Test ' + metric + ' = ' + str(evaluator.evaluate(test_output_df)))
    print("\n")


Gradient-Boosted Tree 

Train weightedPrecision = 0.85496042303414
Test weightedPrecision = 0.8550221553564078


Train weightedRecall = 0.8555594651653765
Test weightedRecall = 0.8550438989428417


Train accuracy = 0.8555594651653765
Test accuracy = 0.8550438989428418




## Conclusiones:

Los 4 modelos dieron resultados similares, siendo el de mejores métricas el "Gradient-Boosted Tree configurado en 50 iteraciones (después de probar varios hiperparámetros)

Comparando los resultados del train y el test, se puede concluir que el modelo esta fitting (ni over ni under)

Con resultados de métrias cercanas al 85% podemos decir que el conjunto de datos y los modelos construidos a partir de estos, serían útilies para predecir días de altas ventas.

Puede usarse como herramienta para que el departamento de Operaciones de las tiendas se prepare con horarios, cantidad de personal, previsión de materia prima, etc., y así evitar errores en proyecciones que pudieran generar desperdicios (en caso de suponer un día de altas ventas cuando no lo sea) o no lograr potenciar sus ventas y dar mala experiencia a sus clientes (por suponer un día de bajas ventas, cuando eran altas realmente). También sería útil para un departamento de Mercadeo que conozca con mayor certeza qué días generar acciones para atraer más clientes, o que día sus promociones serán más efectivas si están destinadas a días bajo o altos de ventas.

