# Machine Learning con la librería MLlib  de Spark

En el presente script construiremos una red neuronal para clasificación múltiple

Todos los procesos serán coleccionados en un Pipeline explícito 

* No mostramos la salida de los Stages del Pipeline

## Load the libraries

In [1]:
import os
import numpy as np
import pandas as pd
from pyspark.sql.types import * #<-- importa todos los tipos de dato como: StringType, FloatType DoubleType, DateType, etc.
from pyspark.ml import Pipeline
from pyspark.sql import functions as f
from pyspark.sql.functions import udf, StringType
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import MultilayerPerceptronClassifier, MultilayerPerceptronClassificationModel
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer, MinMaxScaler

## Initialize Spark Session

In [2]:
spark = SparkSession.builder.appName('deep_learning').getOrCreate()

In [3]:
spark

## Carga y limpieza de datos

In [4]:
data = (spark.read
          .format("csv")
          .option('header', 'true')
          .load("./data/Iris_data/Iris.csv"))

In [5]:
data.show(5)

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
+---+-------------+------------+-------------+------------+-----------+
only showing top 5 rows



### Seleccionamos variables para trabajar:

In [6]:
# Columnas que no usaremos:
unused_cols = ['Id'] #<-- vacío si usaremos todas las columnas

In [7]:
# Columnas que usaremos:
selected_cols = [ c for c in data.columns if c not in unused_cols ]
selected_cols

['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm', 'Species']

In [8]:
data = data.select( selected_cols )
data.show(5)

+-------------+------------+-------------+------------+-----------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+-------------+------------+-------------+------------+-----------+
|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
+-------------+------------+-------------+------------+-----------+
only showing top 5 rows



In [9]:
data.printSchema()

root
 |-- SepalLengthCm: string (nullable = true)
 |-- SepalWidthCm: string (nullable = true)
 |-- PetalLengthCm: string (nullable = true)
 |-- PetalWidthCm: string (nullable = true)
 |-- Species: string (nullable = true)



### Ajustamos tipos de datos:

In [10]:
# Extraemos nombres de columnas categóricas:
categ_cols = ['Species']#[item[0] for item in data.dtypes if item[1].startswith('string')]
categ_cols

['Species']

In [11]:
# A las columnas categóricas les asignamos el tipo "String"
# A las columnas numéricas les asignamos el tipo "Double"

for col in data.columns:    
    if col in categ_cols:
        # Asignamos el tipo "String"
        data = data.withColumn( col , data[col].cast( StringType() ) )
    else:
        # Asignamos el tipo "Double"
        data = data.withColumn( col , data[col].cast( DoubleType() ) )

In [12]:
data.printSchema()

root
 |-- SepalLengthCm: double (nullable = true)
 |-- SepalWidthCm: double (nullable = true)
 |-- PetalLengthCm: double (nullable = true)
 |-- PetalWidthCm: double (nullable = true)
 |-- Species: string (nullable = true)



### Identificamos columnas numéricas y categóricas

In [13]:
data_types = data.dtypes
numerical_columns = [ item[0] for item in data.dtypes if item[1] != 'string' ]
categoric_columns = [ item[0] for item in data.dtypes if item[1].startswith('string') ]

In [14]:
print('Columnas numéricas: \n', numerical_columns)
print('\nColumnas categóricas: \n', categoric_columns)

Columnas numéricas: 
 ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']

Columnas categóricas: 
 ['Species']


### Trabajamos con campos nulos:

In [15]:
from pyspark.sql.functions import isnull, when, count, col

# Mostramos el número de campos vacíos en cada columna:
df_nulls = data.select([count(when(isnull(c), True)).alias(c) for c in data.columns]).toPandas()
df_nulls

Unnamed: 0,SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species
0,0,0,0,0,0


### Obtenemos algunos gráficos

In [16]:
#from pandas_profiling import ProfileReport

#df_pandas = data.toPandas()

#pfr = ProfileReport(df_pandas)
#pfr.to_notebook_iframe()
#pfr

## Extraemos las clases de datos que hay en cada columna

In [17]:
# Extraemos el número de clases de datos en cada columna
from pyspark.sql.functions import countDistinct, col

data.select( [ countDistinct( col(c) ).alias(c) for c in data.columns ] ).toPandas()

Unnamed: 0,SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species
0,35,23,43,22,3


## Identificamos la columna de labels y las columnas de features:

In [18]:
label_col = "Species" #"x_3" # <-- nombre de la columna que usaremos como labels

# Obtenemos las clases de valores de la columna de labels:
classes = data.select([ label_col ]).distinct().toPandas()
classes

Unnamed: 0,Species
0,Iris-virginica
1,Iris-setosa
2,Iris-versicolor


In [19]:
# Numero de clases de salida:
n_class_out = len( classes )
n_class_out

3

In [20]:
# Columnas que formaran los features:
feature_cols = [col for col in selected_cols if col != label_col]
feature_cols

['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']

### Split the dataset into Train, Validation and Test

In [21]:
train, validation, test  = data.randomSplit([0.7, 0.2, 0.1], 1234)

In [22]:
train.show(5)

+-------------+------------+-------------+------------+-----------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+-------------+------------+-------------+------------+-----------+
|          4.4|         3.0|          1.3|         0.2|Iris-setosa|
|          4.4|         3.2|          1.3|         0.2|Iris-setosa|
|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|          4.6|         3.2|          1.4|         0.2|Iris-setosa|
|          4.6|         3.4|          1.4|         0.3|Iris-setosa|
+-------------+------------+-------------+------------+-----------+
only showing top 5 rows



# Inicia creación de Pipeline

Antes de llegar a este paso debemos tener el dataset limpio y listo para trabajar.

Estaremos usando la función `StringIndexer` que asigna un valor entero a cada categoría de datos, iniciando forzosamente dede 0.

0 se asigna a la categoría más frecuente, 1 a la siguiente categoría más frecuente y así sucesivamente.


### Stage 1: Creación de columna de labels codificados

In [23]:
Stage_1 = StringIndexer(inputCol = label_col, outputCol = "labels")
Stage_1

StringIndexer_11f67d3ac934

### Stage 2: Transformación de features categóricos a numéricos

In [24]:
# Extraemos nombres de columnas categóricas que formaran los features (ie. no incluyen el label):
categoric_cols_features = [ c for c in categoric_columns if c != label_col ]
categoric_cols_features

[]

In [25]:
# Creamos nombres de nuevas columnas numéricas que formaran los features:
categoric_cols_features_num = [ c+'_num' for c in categoric_cols_features]
categoric_cols_features_num

[]

In [26]:
# Agregamos nuevas columnas numéricas para los features:
Stage_2 = []
for i in range( len(categoric_cols_features) ):    
    st_i = StringIndexer(inputCol = categoric_cols_features[i] , outputCol = categoric_cols_features_num[i] , handleInvalid='keep')
    Stage_2.append(st_i)

Stage_2

[]

### Stage 3: Creación de columna de vectores de features

In [27]:
# Extraemos columnas numéricas para los features:
numeric_cols_features = [col for col in numerical_columns if col != label_col]
numeric_cols_features

['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']

In [28]:
# Coleccionamos todas las columnas numericas que formaran a los features:
required_features = numeric_cols_features + categoric_cols_features_num
required_features

['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']

In [29]:
Stage_3 = VectorAssembler(inputCols=required_features, outputCol='features')
Stage_3

VectorAssembler_473d7abbd748

### Stage 4: Creación de columna de vectores de features reescalados (entre 0 y 1)

In [30]:
### Reescalamos los features para tomen valore entre 0 y 1:
Stage_4 = MinMaxScaler(min=0.0, max=1.0, inputCol="features", outputCol="scaled_features")
Stage_4

MinMaxScaler_23a36ce26ff6

### Stage 5: Construcción del modelo

In [31]:
# Tamaño (dimensión) de cada feature:
dim_xi = len( required_features )
dim_xi

4

In [32]:
# Definimos el número de neuronas en cada capa de la red:
layers = [ dim_xi, 2, n_class_out]
layers

[4, 2, 3]

In [33]:
# Batch_size
batch_size = round( 0.15*train.count()  )# 128
batch_size

15

In [34]:
# Declaramos el clasificador: 
Stage_5 = MultilayerPerceptronClassifier(
    labelCol='labels', 
    featuresCol='scaled_features', 
    maxIter=200, 
    layers=layers, 
    blockSize=batch_size, 
    seed=1234,
    stepSize=0.05,
    solver='l-bfgs'
    )
    
Stage_5

MultilayerPerceptronClassifier_a58cfd7067b7

## Ensamble del Pipeline

In [35]:
# Coleccionamos todos los Satges en una lista:
Stages_list = [Stage_1] + Stage_2 + [ Stage_3, Stage_4, Stage_5]
Stages_list

[StringIndexer_11f67d3ac934,
 VectorAssembler_473d7abbd748,
 MinMaxScaler_23a36ce26ff6,
 MultilayerPerceptronClassifier_a58cfd7067b7]

In [36]:
pipeline = Pipeline(stages = Stages_list )
pipeline

Pipeline_ada19e020024

## Ejecución del Pipeline

In [37]:
# Ejecutamos el pipeline y se entrena el modelo con los datos de entrenamiento:

model = pipeline.fit(train)

model # <-- Contiene el modelo entrenado

PipelineModel_de54e83ba00d

## Predecimos datos de entrenamiento, validación y pruebas:

In [38]:
cols_to_show = ['labels', 'scaled_features', 'rawPrediction','probability','prediction']

In [39]:
# Predecimos datos de entrenamiento:
train_prediction = model.transform(train)
train_prediction.select(cols_to_show).show(5)

+------+--------------------+--------------------+--------------------+----------+
|labels|     scaled_features|       rawPrediction|         probability|prediction|
+------+--------------------+--------------------+--------------------+----------+
|   1.0|[0.0,0.4761904761...|[-60.976831522981...|[8.49815870494266...|       1.0|
|   1.0|[0.0,0.5714285714...|[-60.976831522981...|[8.49815870494266...|       1.0|
|   1.0|[0.05714285714285...|[-60.976831522981...|[8.49815870494266...|       1.0|
|   1.0|[0.05714285714285...|[-60.976831522981...|[8.49815870494266...|       1.0|
|   1.0|[0.05714285714285...|[-60.976831522981...|[8.49815870494266...|       1.0|
+------+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [40]:
# Predecimos datos de validación:
val_prediction = model.transform(validation)
val_prediction.select(cols_to_show).show(5)

+------+--------------------+--------------------+--------------------+----------+
|labels|     scaled_features|       rawPrediction|         probability|prediction|
+------+--------------------+--------------------+--------------------+----------+
|   1.0|[-0.0285714285714...|[-60.976831522981...|[8.49815870494266...|       1.0|
|   1.0|[0.0,0.4285714285...|[-60.976831522981...|[8.49815870494266...|       1.0|
|   1.0|[0.02857142857142...|[-60.976831522981...|[8.49815870494266...|       1.0|
|   1.0|[0.14285714285714...|[-60.976831522981...|[8.49815870494266...|       1.0|
|   1.0|[0.17142857142857...|[-60.976831522981...|[8.49815870494266...|       1.0|
+------+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [41]:
# Predecimos datos de pruebas:
test_prediction = model.transform(test)
test_prediction.select(cols_to_show).show(5)

+------+--------------------+--------------------+--------------------+----------+
|labels|     scaled_features|       rawPrediction|         probability|prediction|
+------+--------------------+--------------------+--------------------+----------+
|   1.0|[0.17142857142857...|[-60.976831522981...|[8.49815870494266...|       1.0|
|   1.0|[0.17142857142857...|[-60.976831522981...|[8.49815870494266...|       1.0|
|   1.0|[0.31428571428571...|[-60.976831522981...|[8.49815870494266...|       1.0|
|   0.0|[0.34285714285714...|[21.8418883945205...|[0.97142856271924...|       0.0|
|   1.0|[0.37142857142857...|[-60.976831522981...|[8.49815870494266...|       1.0|
+------+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



## Evaluate the Predictions

In [42]:
# Seleccionamos las columnas 'label' y 'prediction' de los DataFrames predichos:
train_Labels_Prediction = train_prediction.select('labels','prediction')
validation_Labels_Prediction = val_prediction.select('labels','prediction')
test_Labels_Prediction = test_prediction.select('labels','prediction')

In [43]:
# Elegimos las siguientes métricas:
metrics = ['weightedPrecision', 'weightedRecall', 'accuracy']

# Obtenemos los valores de cada métrica, aplicadas a los datos de entrenamiento, validación y pruebas:
for metric in metrics:
    # Declaramos la métrica actual:
    evaluator = MulticlassClassificationEvaluator(labelCol='labels', predictionCol='prediction', metricName=metric)
    
    f_train = evaluator.evaluate( train_Labels_Prediction )
    f_validation = evaluator.evaluate( validation_Labels_Prediction )
    f_test = evaluator.evaluate( test_Labels_Prediction )

    print('\nValores de metrica << ' + metric ,'>>' )
    print('Train = ' , f_train)
    print('Validation = ' , f_validation)
    print('Test = ' , f_test)


Valores de metrica << weightedPrecision >>
Train =  0.9901875901875902
Validation =  0.9752252252252253
Test =  0.8979591836734693

Valores de metrica << weightedRecall >>
Train =  0.9898989898989898
Validation =  0.972972972972973
Test =  0.8571428571428572

Valores de metrica << accuracy >>
Train =  0.98989898989899
Validation =  0.972972972972973
Test =  0.8571428571428571


### Guardamos el modelo entrenado:

In [44]:
import os
from os import system

path = './Models_trained/pipeline_model_Iris'

# En caso de que exista la ruta la borramos para volver a crearla:
if os.path.exists( path ):
    # Comando a ejecutar:
    comando = 'rm -r ' + path
    # Ejecutamos el comando:
    system( comando )
    print('Se ejecutó: ',comando)


# Guardamos el modelo entrenado:
model.save( path )

Se ejecutó:  rm -r ./Models_trained/pipeline_model_Iris


### Cargamos el modelo entrenado y hacemos una predicción:

In [45]:
# Cargamos el modelo:
#from pyspark.ml.classification import MultilayerPerceptronClassificationModel
from pyspark.ml import PipelineModel

modelo_NN = PipelineModel.load( path )

In [46]:
# Predecimos datos de pruebas:
predictions_test = modelo_NN.transform(test)
predictions_test.show(5)

+-------------+------------+-------------+------------+---------------+------+-----------------+--------------------+--------------------+--------------------+----------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|        Species|labels|         features|     scaled_features|       rawPrediction|         probability|prediction|
+-------------+------------+-------------+------------+---------------+------+-----------------+--------------------+--------------------+--------------------+----------+
|          5.0|         3.0|          1.6|         0.2|    Iris-setosa|   1.0|[5.0,3.0,1.6,0.2]|[0.17142857142857...|[-60.976831522981...|[8.49815870494266...|       1.0|
|          5.0|         3.3|          1.4|         0.2|    Iris-setosa|   1.0|[5.0,3.3,1.4,0.2]|[0.17142857142857...|[-60.976831522981...|[8.49815870494266...|       1.0|
|          5.5|         3.5|          1.3|         0.2|    Iris-setosa|   1.0|[5.5,3.5,1.3,0.2]|[0.31428571428571...|[-60.976831522981...|[8.4981

#### Calculamos la precisión manualmente:

In [47]:
wrong_test = predictions_test.filter( predictions_test['labels']!=predictions_test['prediction'] ).count()
right_test = predictions_test.filter( predictions_test['labels']==predictions_test['prediction'] ).count()

acc_test =  right_test/(right_test + wrong_test)

print( 'La precisión en las predicciones de los datos de prueba es del',round(acc_test * 100),'%' )

La precisión en las predicciones de los datos de prueba es del 86 %


### Consulta de predicciones individuales:

#### Creamos mapeo de labels:

Crearemos el DataFrame de pandas `labels_map` que contiene la relación entre los labels del dataset original y los labels reescalados que se usaron para entrenar el modelo.

In [48]:
Scaler_labels = Stage_1.fit(data)
data_transform = Scaler_labels.transform(data)
data_transform.show(5)

+-------------+------------+-------------+------------+-----------+------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|labels|
+-------------+------------+-------------+------------+-----------+------+
|          5.1|         3.5|          1.4|         0.2|Iris-setosa|   0.0|
|          4.9|         3.0|          1.4|         0.2|Iris-setosa|   0.0|
|          4.7|         3.2|          1.3|         0.2|Iris-setosa|   0.0|
|          4.6|         3.1|          1.5|         0.2|Iris-setosa|   0.0|
|          5.0|         3.6|          1.4|         0.2|Iris-setosa|   0.0|
+-------------+------------+-------------+------------+-----------+------+
only showing top 5 rows



In [49]:
# Obtenemos el mapeo entre la columna de labels reales y labels reescalados:
labels_map = data_transform.select( [label_col , 'labels'] ).groupBy( [label_col,'labels'] ).count().toPandas()
labels_map = labels_map.sort_values('labels')

# Mapeo de labels:
labels_map[['labels', label_col]]

Unnamed: 0,labels,Species
0,0.0,Iris-setosa
2,1.0,Iris-versicolor
1,2.0,Iris-virginica


In [50]:
# Definimos la función que asigna el nombre del label original al label codificado:
def f_map_label(x):
    for i in range( len(labels_map) ):
        if x == labels_map['labels'][i]:
            y = labels_map[label_col][i]
    return y

# Declaramos la función como UDF de Spark:
f_map_label_udf = udf( lambda x: f_map_label(x) )#, StringType() )

In [51]:
# Llamamos a las componentes de features que necesitamos para generar una predicción:
feature_cols

['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']

In [52]:
# Ingresamos features manualmente:
datos = [ 
    (5.5, 3.5, 1.3, 0.2), #<-- Registro 1
    (5.6, 3.0, 4.1, 1.3)  #<-- Registro 2
    ] 

sample_df = spark.createDataFrame(datos, feature_cols)

sample_df.show()

+-------------+------------+-------------+------------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|
+-------------+------------+-------------+------------+
|          5.5|         3.5|          1.3|         0.2|
|          5.6|         3.0|          4.1|         1.3|
+-------------+------------+-------------+------------+



In [53]:
sample_data_test = modelo_NN.transform(sample_df)
sample_data_test = sample_data_test.withColumn('prediction_true', f_map_label_udf("prediction"))
sample_data_test.show()


+-------------+------------+-------------+------------+-----------------+--------------------+--------------------+--------------------+----------+---------------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|         features|     scaled_features|       rawPrediction|         probability|prediction|prediction_true|
+-------------+------------+-------------+------------+-----------------+--------------------+--------------------+--------------------+----------+---------------+
|          5.5|         3.5|          1.3|         0.2|[5.5,3.5,1.3,0.2]|[0.31428571428571...|[-60.976831522981...|[8.49815870494266...|       1.0|Iris-versicolor|
|          5.6|         3.0|          4.1|         1.3|[5.6,3.0,4.1,1.3]|[0.34285714285714...|[21.8418883945205...|[0.97142856271924...|       0.0|    Iris-setosa|
+-------------+------------+-------------+------------+-----------------+--------------------+--------------------+--------------------+----------+---------------+



### Cerramos sesión Spark

In [54]:
spark.stop()