# Intro ML SUP en BD

Creación de la sesión Spark:

In [1]:
#import SparkSession
from pyspark.sql import SparkSession


In [2]:
# Crear el spark session object, llamarle "supervised_ml"
supervised_ml=SparkSession.builder.appName('Class 5').getOrCreate()




## Regression 

Carga de datos, archivo *Linear_regression_dataset.csv*:

In [20]:
# Carga de datos
df=supervised_ml.read.csv('Linear_regression_dataset.csv',inferSchema=True,header=True)



Se invocan las librerias correcpondientes a **LinearRegression**, asi como las de OneHotEncoder, StringIndexer, VectorAssembler:

In [21]:
# Importacion de libs y operaciones
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression



Se visualizan algunos datos:

In [22]:
print((df.count(), len(df.columns))) 

(1232, 6)


Se muestran los primeros 10 datos:

In [23]:
# primeros 10 datos
df.show(10)



+-----+-----+-----+-----+-----+-----+
|var_1|var_2|var_3|var_4|var_5|label|
+-----+-----+-----+-----+-----+-----+
|  734|  688|   81|0.328|0.259|0.418|
|  700|  600|   94| 0.32|0.247|0.389|
|  712|  705|   93|0.311|0.247|0.417|
|  734|  806|   69|0.315| 0.26|0.415|
|  613|  759|   61|0.302| 0.24|0.378|
|  748|  676|   85|0.318|0.255|0.422|
|  669|  588|   97|0.315|0.251|0.411|
|  667|  845|   68|0.324|0.251|0.381|
|  758|  890|   64| 0.33|0.274|0.436|
|  726|  670|   88|0.335|0.268|0.422|
+-----+-----+-----+-----+-----+-----+
only showing top 10 rows



## Feature Engineering

Creamos un solo vector con todos los features i.e 'var_1', 'var_2', 'var_3', 'var_4', 'var_5', a este le llamaremos "features" y como salida colocamos a 'label':

In [24]:
# Vector Ensamblador
df_assembler = VectorAssembler(inputCols=['var_1',
 'var_2',
 'var_3',
 'var_4',
 'var_5'], outputCol="features")
df = df_assembler.transform(df)


In [26]:
# visulizacion de vector ensamblado compuesto por features y label
df.printSchema()
df.show(5)


root
 |-- var_1: integer (nullable = true)
 |-- var_2: integer (nullable = true)
 |-- var_3: integer (nullable = true)
 |-- var_4: double (nullable = true)
 |-- var_5: double (nullable = true)
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

+-----+-----+-----+-----+-----+-----+--------------------+
|var_1|var_2|var_3|var_4|var_5|label|            features|
+-----+-----+-----+-----+-----+-----+--------------------+
|  734|  688|   81|0.328|0.259|0.418|[734.0,688.0,81.0...|
|  700|  600|   94| 0.32|0.247|0.389|[700.0,600.0,94.0...|
|  712|  705|   93|0.311|0.247|0.417|[712.0,705.0,93.0...|
|  734|  806|   69|0.315| 0.26|0.415|[734.0,806.0,69.0...|
|  613|  759|   61|0.302| 0.24|0.378|[613.0,759.0,61.0...|
+-----+-----+-----+-----+-----+-----+--------------------+
only showing top 5 rows



Partimos a continuación el set de datos en 75% training y 25% testing:

In [30]:
# Particion del data set
model_df=df.select(["features","label"])
train,test = model_df.randomSplit([0.75,0.25])


print(f"Size of train Dataset : {train.count()}" )
print(f"Size of test Dataset : {test.count()}" )

Size of train Dataset : 913
Size of test Dataset : 319


Creamos el Regresor Lineal: 

In [33]:
lr = LinearRegression()

Entrenamos el modelo de regresión lineal:

In [43]:
# Fit the model, le llamamos lr_model
lr_model = lr.fit(train)



Creamos el dataframe de prediciones (*predictions_df*) a partir del modelo de entrenamiento y el conjunto de datos test: 

In [44]:
predictions_df = lr_model.transform(test)

Visualizamos el contenido de *predictions_df*:

In [45]:
# visulizacion de predictions_df
predictions_df.select("prediction","label","features").show(5)



+-------------------+-----+--------------------+
|         prediction|label|            features|
+-------------------+-----+--------------------+
| 0.3107811187948488|0.311|[463.0,527.0,67.0...|
| 0.3187016812301118|0.332|[486.0,610.0,61.0...|
| 0.3282567662404914|0.317|[522.0,621.0,72.0...|
|0.34197350732810483| 0.34|[531.0,734.0,55.0...|
| 0.3357290410825567| 0.33|[533.0,660.0,62.0...|
+-------------------+-----+--------------------+
only showing top 5 rows



Ahora, evaluamos el modelo de Regresión Lineal, con los datos de TEST:

In [47]:
# evaluacion del modelo, le llamaremos model_predictions
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="label",metricName="r2")



Imprimimos el valor de R2:

In [48]:
# valor de R2
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(predictions_df))



R Squared (R2) on test data = 0.83179


Imprimimos el valor del meanSquaredError:

In [51]:
# valor del meanSquaredError
test_result = lr_model.evaluate(test)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 0.0127924


## Regresión con Árboles de Decisión

Importamos la librería *DecisionTreeRegressor*: 

In [52]:
# import lib
from pyspark.ml.regression import DecisionTreeRegressor




Creamos el Regresor DT, le llamaremos *dec_tree*:

In [53]:
# dec_tree
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'label')




Entrenamos el modelo:

In [54]:
# Train model, le llamaremos dec_tree_model
dec_tree_model = dt.fit(train)




Cuánto es la profundidad máxima por defecto, de este algoritmo?

R/ 30

Desplegamos las *featureImportances*:

In [55]:
dec_tree_model.featureImportances

SparseVector(5, {0: 0.9685, 1: 0.0128, 2: 0.0057, 3: 0.0037, 4: 0.0093})

Evaluamos el modelo con los datos de entrenamiento:

In [57]:
# Make predictions, le llamaremos model_predictions 
model_predictions = dec_tree_model.transform(test)




In [58]:
# visualizamos

model_predictions.show()

+--------------------+-----+-------------------+
|            features|label|         prediction|
+--------------------+-----+-------------------+
|[463.0,527.0,67.0...|0.311|               0.32|
|[486.0,610.0,61.0...|0.332| 0.3271666666666667|
|[522.0,621.0,72.0...|0.317|               0.32|
|[531.0,734.0,55.0...| 0.34|             0.3358|
|[533.0,660.0,62.0...| 0.33| 0.3271666666666667|
|[550.0,631.0,76.0...|0.318|0.34815384615384615|
|[554.0,536.0,77.0...|0.339|0.34815384615384615|
|[556.0,675.0,67.0...|0.348|0.34815384615384615|
|[558.0,688.0,67.0...| 0.35|0.34815384615384615|
|[558.0,740.0,60.0...| 0.36|0.34815384615384615|
|[564.0,648.0,74.0...|0.337|0.34815384615384615|
|[567.0,587.0,84.0...|0.349|0.34815384615384615|
|[569.0,620.0,77.0...|0.349|0.34815384615384615|
|[570.0,578.0,82.0...|0.363|0.34815384615384615|
|[571.0,577.0,83.0...|0.368|0.34815384615384615|
|[573.0,634.0,75.0...|0.342|0.34815384615384615|
|[573.0,717.0,62.0...|0.345|0.34815384615384615|
|[573.0,860.0,54.0..

Usando *RegressionEvaluator* calculamos e imprimimos el valor de las metricas R2 y RMSE:

In [59]:
# R2 value of the model on test data 
dt_evaluator = RegressionEvaluator(metricName='r2')
dt_r2 = dt_evaluator.evaluate(model_predictions)
print(f'The r-square value of DecisionTreeRegressor is {dt_r2}')

# RMSE value of the model on test data 




The r-square value of DecisionTreeRegressor is 0.7744509853846291


## Gradient-Boosted Tree Regressor

Importamos a GBTRegressor


In [61]:
# import
from pyspark.ml.regression import GBTRegressor




Creamos el Regresor GBTR, le llamaremos gbt:


In [62]:
# regresor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'label', maxIter=10)



Entrenamos el modelo:

In [64]:
# Train model, le llamaremos gbt_model
gbt_model = gbt.fit(train)






Desplegamos las featureImportances:

In [103]:
#Importances
gbt_model.featureImportances



SparseVector(5, {0: 0.2372, 1: 0.1879, 2: 0.1844, 3: 0.1762, 4: 0.2142})

Evaluamos el modelo con los datos de entrenamiento, le llamaremos model_predictions:

In [65]:
# Model
model_predictions = gbt_model.transform(test)





Desplegamos los valores del *model_predictions*

In [67]:
# show 
model_predictions.select('prediction', 'label', 'features').show(5)




+-------------------+-----+--------------------+
|         prediction|label|            features|
+-------------------+-----+--------------------+
| 0.3173249211305496|0.311|[463.0,527.0,67.0...|
|0.32732610958781505|0.332|[486.0,610.0,61.0...|
|0.32017509788836557|0.317|[522.0,621.0,72.0...|
| 0.3355940946602219| 0.34|[531.0,734.0,55.0...|
|0.32646390288063354| 0.33|[533.0,660.0,62.0...|
+-------------------+-----+--------------------+
only showing top 5 rows



Usando RegressionEvaluator calculamos e imprimimos el valor de las metricas R2 y RMSE:

In [70]:
 #Select (prediction, true label) and compute test error
gbt_evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")

gbt_evaluatorR2 = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")
# R2 value of the model on test data 
r2 = gbt_evaluatorR2.evaluate(model_predictions)
print("R2 on test data = %g" % r2)



# RMSE value of the model on test data 
rmse = gbt_evaluator.evaluate(model_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)




R2 on test data = 0.782634
Root Mean Squared Error (RMSE) on test data = 0.0145419


 ## Exploracion de datos...

Usaremos el dataset https://archive.ics.uci.edu/ml/datasets/Bank+Marketing 

Indique a grandes razgos de que se trata este dataset:


Carga de datos, archivo bank_data.csv:


In [72]:
# Load csv Dataset 
spark=SparkSession.builder.appName('Class 5-Exploracion').getOrCreate()


df=spark.read.csv('bank_data.csv',inferSchema=True,header=True)

Determine la cantidad de datos en el dataset:

In [73]:
#number of records
print((df.count(),len(df.columns)))





(41188, 21)


A que dato corresponde cada columna?

In [75]:
# columns values
df.columns





['age',
 'job',
 'marital',
 'education',
 'default',
 'housing',
 'loan',
 'contact',
 'month',
 'day_of_week',
 'duration',
 'campaign',
 'pdays',
 'previous',
 'poutcome',
 'emp.var.rate',
 'cons.price.idx',
 'cons.conf.idx',
 'euribor3m',
 'nr.employed',
 'target_class']

Imprima el Schema:

In [76]:
#dataype of input data - Schema
df.printSchema()





root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- emp.var.rate: double (nullable = true)
 |-- cons.price.idx: double (nullable = true)
 |-- cons.conf.idx: double (nullable = true)
 |-- euribor3m: double (nullable = true)
 |-- nr.employed: double (nullable = true)
 |-- target_class: string (nullable = true)



En cuanto a la salida, como es la distrubución de clases?

In [77]:
# YES/NO Class Distribution
df.limit(5).toPandas().head()

#R/SI ES UN PROBLEM DE DISTRIBUCION, USANDO LA COLUMNA TARGET_CLASS



Unnamed: 0,age,job,marital,education,default,housing,loan,contact,month,day_of_week,...,campaign,pdays,previous,poutcome,emp.var.rate,cons.price.idx,cons.conf.idx,euribor3m,nr.employed,target_class
0,56,housemaid,married,basic.4y,no,no,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
1,57,services,married,high.school,unknown,no,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
2,37,services,married,high.school,no,yes,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
3,40,admin.,married,basic.6y,no,no,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
4,56,services,married,high.school,no,no,yes,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no


Una tarea típica, resulta de convertir los valores binarios en 1 y 0, usando como referencia "label", convierta los no/yes en 0/1:

In [78]:
from pyspark.sql import functions as F
from pyspark.sql import *

In [101]:
# Ingrese acá la instrucción: 
df = df.withColumn("target_class", F.when(df["target_class"] == "no","0")
      .otherwise("1"))


In [102]:
# New 1/0 Class Distribution
df.limit(5).toPandas().head()






Unnamed: 0,age,job,marital,education,default,housing,loan,contact,month,day_of_week,...,campaign,pdays,previous,poutcome,emp.var.rate,cons.price.idx,cons.conf.idx,euribor3m,nr.employed,target_class
0,56,housemaid,married,basic.4y,no,no,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,0
1,57,services,married,high.school,unknown,no,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,0
2,37,services,married,high.school,no,yes,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,0
3,40,admin.,married,basic.6y,no,no,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,0
4,56,services,married,high.school,no,no,yes,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,0


A continuación se presenta un ejercicio de Deep Learning para su revisión...

# Deep Learning 

Importamos las librerias necesarias:

In [104]:
import os
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.sql import functions as f
from pyspark.sql.functions import udf, StringType
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer

Inicializamos la sesion SPARK:

In [105]:
spark = SparkSession.builder.appName('deep_learning').getOrCreate()

Leemos el dataset:

In [106]:
data = spark.read.csv('dl_data.csv', header=True, inferSchema=True)

In [107]:
data.printSchema()

root
 |-- Visit_Number_Bucket: string (nullable = true)
 |-- Page_Views_Normalized: double (nullable = true)
 |-- Orders_Normalized: integer (nullable = true)
 |-- Internal_Search_Successful_Normalized: double (nullable = true)
 |-- Internal_Search_Null_Normalized: double (nullable = true)
 |-- Email_Signup_Normalized: double (nullable = true)
 |-- Total_Seconds_Spent_Normalized: double (nullable = true)
 |-- Store_Locator_Search_Normalized: double (nullable = true)
 |-- Mapped_Last_Touch_Channel: string (nullable = true)
 |-- Mapped_Mobile_Device_Type: string (nullable = true)
 |-- Mapped_Browser_Type: string (nullable = true)
 |-- Mapped_Entry_Pages: string (nullable = true)
 |-- Mapped_Site_Section: string (nullable = true)
 |-- Mapped_Promo_Code: string (nullable = true)
 |-- Maped_Product_Name: string (nullable = true)
 |-- Mapped_Search_Term: string (nullable = true)
 |-- Mapped_Product_Collection: string (nullable = true)



Renombramos la columna TARGET:

In [108]:
data = data.withColumnRenamed('Orders_Normalized', 'label')

In [109]:
data.printSchema()

root
 |-- Visit_Number_Bucket: string (nullable = true)
 |-- Page_Views_Normalized: double (nullable = true)
 |-- label: integer (nullable = true)
 |-- Internal_Search_Successful_Normalized: double (nullable = true)
 |-- Internal_Search_Null_Normalized: double (nullable = true)
 |-- Email_Signup_Normalized: double (nullable = true)
 |-- Total_Seconds_Spent_Normalized: double (nullable = true)
 |-- Store_Locator_Search_Normalized: double (nullable = true)
 |-- Mapped_Last_Touch_Channel: string (nullable = true)
 |-- Mapped_Mobile_Device_Type: string (nullable = true)
 |-- Mapped_Browser_Type: string (nullable = true)
 |-- Mapped_Entry_Pages: string (nullable = true)
 |-- Mapped_Site_Section: string (nullable = true)
 |-- Mapped_Promo_Code: string (nullable = true)
 |-- Maped_Product_Name: string (nullable = true)
 |-- Mapped_Search_Term: string (nullable = true)
 |-- Mapped_Product_Collection: string (nullable = true)



Partimos lo datos en Train, Validation y Test:

In [110]:
train, validation, test  = data.randomSplit([0.7, 0.2, 0.1], 1234)

Construimos el Pipeline

In [111]:
categorical_columns = [item[0] for item in data.dtypes if item[1].startswith('string')]
numeric_columns = [item[0] for item in data.dtypes if item[1].startswith('double')]

indexers = [StringIndexer(inputCol=column, outputCol='{0}_index'.format(column)) for column in categorical_columns]

featuresCreator = VectorAssembler(inputCols=[indexer.getOutputCol() for indexer in indexers] + numeric_columns, outputCol="features")

layers = [len(featuresCreator.getInputCols()), 4, 2, 2]

classifier = MultilayerPerceptronClassifier(labelCol='label', featuresCol='features', maxIter=100, layers=layers, blockSize=128, seed=1234)

pipeline = Pipeline(stages=indexers + [featuresCreator, classifier])

Entrenamos...

In [112]:
model = pipeline.fit(train)

Validamos y Evaluamos

In [113]:
train_output_df = model.transform(train)
validation_output_df = model.transform(validation)
test_output_df = model.transform(test)

Llevamos a cabo, algunas predicciones:

In [114]:
train_predictionAndLabels = train_output_df.select("prediction", "label")
validation_predictionAndLabels = validation_output_df.select("prediction", "label")
test_predictionAndLabels = test_output_df.select("prediction", "label")

metrics = ['weightedPrecision', 'weightedRecall', 'accuracy']

for metric in metrics:
    evaluator = MulticlassClassificationEvaluator(metricName=metric)
    print('Train ' + metric + ' = ' + str(evaluator.evaluate(train_predictionAndLabels)))
    print('Validation ' + metric + ' = ' + str(evaluator.evaluate(validation_predictionAndLabels)))
    print('Test ' + metric + ' = ' + str(evaluator.evaluate(test_predictionAndLabels)))

Train weightedPrecision = 0.9704752959909059
Validation weightedPrecision = 0.9715411724152063
Test weightedPrecision = 0.9729448678984032
Train weightedRecall = 0.9699076779245063
Validation weightedRecall = 0.9709349593495935
Test weightedRecall = 0.9725118483412322
Train accuracy = 0.9699076779245063
Validation accuracy = 0.9709349593495935
Test accuracy = 0.9725118483412323


Puede mejorar el test accuracy del modelo variando alguno de los hyperparametros?

In [None]:
Si claro, el block size, iteraciones y los layers pueden ser factores importantes para mejorar.