In [1]:
from sklearn.datasets import load_iris
import pandas as pd
import numpy as np

In [2]:
# Load the bundled iris dataset and pull out the feature matrix and labels.
iris = load_iris()
iris_data, iris_label = iris.data, iris.target

# Turn names such as 'sepal length (cm)' into identifier-style 'sepal_length'.
iris_columns = [
    feature_name.replace(' (cm)', '').replace('al ', 'al_')
    for feature_name in iris.feature_names
]

type(iris_data), iris_data.shape, type(iris_label), iris_label.shape, iris.feature_names, iris_columns

(numpy.ndarray,
 (150, 4),
 numpy.ndarray,
 (150,),
 ['sepal length (cm)',
  'sepal width (cm)',
  'petal length (cm)',
  'petal width (cm)'],
 ['sepal_length', 'sepal_width', 'petal_length', 'petal_width'])

In [3]:
from pyspark.sql import SparkSession
import mlflow

# Local Spark session using all cores; spark.jars.packages asks Spark to
# resolve the mlflow-spark jar when the session starts.
spark = (
    SparkSession.builder.config("spark.jars.packages", "org.mlflow:mlflow-spark:1.11.0")
    .master("local[*]")
    .getOrCreate()
)
# Point MLflow at the tracking server address and enable autologging for pyspark.ml.
# NOTE(review): assumes a tracking server is listening on 127.0.0.1:5000 — confirm before Run All.
mlflow.set_tracking_uri("http://127.0.0.1:5000")
mlflow.pyspark.ml.autolog()

spark

:: loading settings :: url = jar:file:/workspace/apache-spark-ml/.venv/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.mlflow#mlflow-spark added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b4ac4571-095b-4c70-abd2-aa1292857581;1.0
	confs: [default]
	found org.mlflow#mlflow-spark;1.11.0 in central
	found org.slf4j#slf4j-api;1.7.25 in central
:: resolution report :: resolve 301ms :: artifacts dl 13ms
	:: modules in use:
	org.mlflow#mlflow-spark;1.11.0 from central in [default]
	org.slf4j#slf4j-api;1.7.25 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org

In [4]:
# Assemble features plus label into one pandas frame, then hand it to Spark.
iris_pdf = pd.DataFrame(data=iris_data, columns=iris_columns).assign(target=iris_label)
iris_sdf = spark.createDataFrame(iris_pdf)
iris_sdf.show()

                                                                                

+------------+-----------+------------+-----------+------+
|sepal_length|sepal_width|petal_length|petal_width|target|
+------------+-----------+------------+-----------+------+
|         5.1|        3.5|         1.4|        0.2|     0|
|         4.9|        3.0|         1.4|        0.2|     0|
|         4.7|        3.2|         1.3|        0.2|     0|
|         4.6|        3.1|         1.5|        0.2|     0|
|         5.0|        3.6|         1.4|        0.2|     0|
|         5.4|        3.9|         1.7|        0.4|     0|
|         4.6|        3.4|         1.4|        0.3|     0|
|         5.0|        3.4|         1.5|        0.2|     0|
|         4.4|        2.9|         1.4|        0.2|     0|
|         4.9|        3.1|         1.5|        0.1|     0|
|         5.4|        3.7|         1.5|        0.2|     0|
|         4.8|        3.4|         1.6|        0.2|     0|
|         4.8|        3.0|         1.4|        0.1|     0|
|         4.3|        3.0|         1.1|        0.1|     

In [5]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

# 70/30 split with a fixed seed.
train_sdf, test_sdf = iris_sdf.randomSplit([0.7, 0.3], seed=0)
# Pack the columns listed in iris_columns into a single vector column named 'features'.
vector_assembler = VectorAssembler(inputCols=iris_columns, outputCol='features')
train_sdf_vectorized = vector_assembler.transform(train_sdf)
# Unfitted estimator; it reads the assembled 'features' column and the 'target' label.
dt = DecisionTreeClassifier(featuresCol='features', labelCol='target', maxDepth=10)

In [6]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# 2 x 2 = 4 candidate settings for the decision tree.
param_grid = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [5, 10])
    .addGrid(dt.minInstancesPerNode, [3, 6])
    .build()
)
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol='target', predictionCol='prediction', metricName='accuracy')
# seed pins the fold assignment so avgMetrics are repeatable across runs,
# matching the seeded randomSplit used to build the training data.
cv = CrossValidator(estimator=dt, estimatorParamMaps=param_grid, evaluator=evaluator_accuracy, numFolds=3, seed=0)
cv_model = cv.fit(train_sdf_vectorized)

2023/06/05 11:12:37 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'adc701312b334efc879c5e845e498d73', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
23/06/05 11:13:44 ERROR Instrumentation: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "mlflow-artifacts"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:673)
	at org.apache.spark.ml.util

In [21]:
# Check the class of the object returned by cv.fit().
type(cv_model)

pyspark.ml.tuning.CrossValidatorModel

In [22]:
# Show the container type of the built grid together with its contents.
type(param_grid), param_grid

(list,
 [{Param(parent='DecisionTreeClassifier_79cf0245c728', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
   Param(parent='DecisionTreeClassifier_79cf0245c728', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 3},
  {Param(parent='DecisionTreeClassifier_79cf0245c728', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
   Param(parent='DecisionTreeClassifier_79cf0245c728', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be disc

In [23]:
# Dump the fitted model's instance attributes.
# NOTE(review): looks like leftover exploration — consider removing from the final notebook.
cv_model.__dict__

{'uid': 'CrossValidatorModel_b84077e485ed',
 '_paramMap': {Param(parent='CrossValidatorModel_b84077e485ed', name='estimator', doc='estimator to be cross-validated'): DecisionTreeClassifier_79cf0245c728,
  Param(parent='CrossValidatorModel_b84077e485ed', name='estimatorParamMaps', doc='estimator param maps'): [{Param(parent='DecisionTreeClassifier_79cf0245c728', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
    Param(parent='DecisionTreeClassifier_79cf0245c728', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 3},
   {Param(parent='DecisionTreeClassifier_79cf0245c728', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal nod

In [24]:
# The parameter maps that were searched, as stored on the fitted model.
cv_model.getEstimatorParamMaps()

[{Param(parent='DecisionTreeClassifier_79cf0245c728', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
  Param(parent='DecisionTreeClassifier_79cf0245c728', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 3},
 {Param(parent='DecisionTreeClassifier_79cf0245c728', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
  Param(parent='DecisionTreeClassifier_79cf0245c728', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as in

In [25]:
# Pair each averaged metric with a parameter map, matched by position.
list(zip(cv_model.avgMetrics, cv_model.getEstimatorParamMaps()))

[(0.9239774114774114,
  {Param(parent='DecisionTreeClassifier_79cf0245c728', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
   Param(parent='DecisionTreeClassifier_79cf0245c728', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 3}),
 (0.9154304029304029,
  {Param(parent='DecisionTreeClassifier_79cf0245c728', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
   Param(parent='DecisionTreeClassifier_79cf0245c728', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minIn

In [26]:
# Materialise the searched parameter maps as a fresh list.
list(cv_model.getEstimatorParamMaps())

[{Param(parent='DecisionTreeClassifier_79cf0245c728', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
  Param(parent='DecisionTreeClassifier_79cf0245c728', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 3},
 {Param(parent='DecisionTreeClassifier_79cf0245c728', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
  Param(parent='DecisionTreeClassifier_79cf0245c728', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as in

In [27]:
# Re-key every searched parameter map by plain parameter name.
params = [
    {param.name: value for param, value in param_map.items()}
    for param_map in cv_model.getEstimatorParamMaps()
]
params

[{'maxDepth': 5, 'minInstancesPerNode': 3},
 {'maxDepth': 5, 'minInstancesPerNode': 6},
 {'maxDepth': 10, 'minInstancesPerNode': 3},
 {'maxDepth': 10, 'minInstancesPerNode': 6}]

In [28]:
# Re-key every searched parameter map by plain parameter name.
# NOTE(review): this cell is a verbatim repeat of the cell directly before it and can be deleted.
params = [{p.name: v for p, v in m.items() } for m in cv_model.getEstimatorParamMaps()]
params

[{'maxDepth': 5, 'minInstancesPerNode': 3},
 {'maxDepth': 5, 'minInstancesPerNode': 6},
 {'maxDepth': 10, 'minInstancesPerNode': 3},
 {'maxDepth': 10, 'minInstancesPerNode': 6}]

In [29]:
# One row per searched parameter map alongside its averaged metric.
# (The earlier bare list(zip(...)) statement was dropped: its value was
# never displayed or stored.)
cv_result = pd.DataFrame({'params': params, 'evaluation_result': cv_model.avgMetrics})
cv_result

Unnamed: 0,params,evaluation_result
0,"{'maxDepth': 5, 'minInstancesPerNode': 3}",0.923977
1,"{'maxDepth': 5, 'minInstancesPerNode': 6}",0.91543
2,"{'maxDepth': 10, 'minInstancesPerNode': 3}",0.923977
3,"{'maxDepth': 10, 'minInstancesPerNode': 6}",0.91543


In [30]:
def get_cv_result_pdf(cv_model):
    """Tabulate cross-validation results as a pandas DataFrame.

    Args:
        cv_model: object exposing ``getEstimatorParamMaps()`` (a sequence of
            mappings whose keys have a ``name`` attribute) and ``avgMetrics``.

    Returns:
        DataFrame with a ``params`` column (dicts keyed by parameter name)
        and an ``evaluation_result`` column taken from ``avgMetrics``.
    """
    named_param_maps = []
    for param_map in cv_model.getEstimatorParamMaps():
        # Swap the parameter objects used as keys for their plain names.
        named_param_maps.append({param.name: value for param, value in param_map.items()})
    return pd.DataFrame({'params': named_param_maps, 'evaluation_result': cv_model.avgMetrics})

In [31]:
# Vectorise the held-out rows with the same assembler, then score them.
test_sdf_vectorized = vector_assembler.transform(test_sdf)
predictions = cv_model.transform(test_sdf_vectorized)
predictions.show()
# metricName has to be spelled out: when omitted the evaluator reports f1,
# which would not match the variable's name.
evaluation_accuracy = MulticlassClassificationEvaluator(labelCol='target', predictionCol='prediction', metricName='accuracy')
evaluation_accuracy.evaluate(predictions)

+------------+-----------+------------+-----------+------+-----------------+--------------+--------------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|target|         features| rawPrediction|         probability|prediction|
+------------+-----------+------------+-----------+------+-----------------+--------------+--------------------+----------+
|         4.6|        3.1|         1.5|        0.2|     0|[4.6,3.1,1.5,0.2]|[35.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|         5.1|        3.5|         1.4|        0.2|     0|[5.1,3.5,1.4,0.2]|[35.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|         5.0|        3.4|         1.5|        0.2|     0|[5.0,3.4,1.5,0.2]|[35.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|         5.4|        3.7|         1.5|        0.2|     0|[5.4,3.7,1.5,0.2]|[35.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|         5.7|        4.4|         1.5|        0.4|     0|[5.7,4.4,1.5,0.4]|[35.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|       

0.9545454545454546

In [32]:
# Pull out the single model that cross-validation selected.
best_dt_model = cv_model.bestModel
best_dt_model

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_79cf0245c728, depth=4, numNodes=9, numClasses=3, numFeatures=4

In [33]:
# Score the vectorised test rows with the extracted model directly.
best_model_predictions = best_dt_model.transform(test_sdf_vectorized)
best_model_predictions.show()
evaluation_accuracy.evaluate(best_model_predictions)

+------------+-----------+------------+-----------+------+-----------------+--------------+--------------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|target|         features| rawPrediction|         probability|prediction|
+------------+-----------+------------+-----------+------+-----------------+--------------+--------------------+----------+
|         4.6|        3.1|         1.5|        0.2|     0|[4.6,3.1,1.5,0.2]|[35.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|         5.1|        3.5|         1.4|        0.2|     0|[5.1,3.5,1.4,0.2]|[35.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|         5.0|        3.4|         1.5|        0.2|     0|[5.0,3.4,1.5,0.2]|[35.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|         5.4|        3.7|         1.5|        0.2|     0|[5.4,3.7,1.5,0.2]|[35.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|         5.7|        4.4|         1.5|        0.4|     0|[5.7,4.4,1.5,0.4]|[35.0,0.0,0.0]|       [1.0,0.0,0.0]|       0.0|
|       

0.9545454545454546

In [73]:
from pyspark.ml import Pipeline

# Seeded 70/30 split; the pipeline consumes the raw columns, so no
# pre-vectorised frame is needed here.
train_sdf, test_sdf = iris_sdf.randomSplit([0.7, 0.3], seed=0)
stage_1 = VectorAssembler(inputCols=iris_columns, outputCol='features')
stage_2 = DecisionTreeClassifier(featuresCol='features', labelCol='target', maxDepth=10)
pipeline_1 = Pipeline(stages=[stage_1, stage_2])

# 4 x 3 = 12 candidate settings for the tree stage.
param_grid_1 = (
    ParamGridBuilder()
    .addGrid(stage_2.maxDepth, [5, 7, 8, 10])
    .addGrid(stage_2.minInstancesPerNode, [3, 5, 6])
    .build()
)
evaluation_accuracy_1 = MulticlassClassificationEvaluator(labelCol='target', predictionCol='prediction', metricName='accuracy')
# seed pins the fold assignment so the reported metrics are repeatable.
cv = CrossValidator(estimator=pipeline_1, estimatorParamMaps=param_grid_1, evaluator=evaluation_accuracy_1, numFolds=3, seed=0)
cv_model_1 = cv.fit(train_sdf)
cv_result_pdf = get_cv_result_pdf(cv_model_1)
cv_result_pdf

                                                                                

Unnamed: 0,params,evaluation_result
0,"{'maxDepth': 5, 'minInstancesPerNode': 3}",0.93331
1,"{'maxDepth': 5, 'minInstancesPerNode': 5}",0.93331
2,"{'maxDepth': 5, 'minInstancesPerNode': 6}",0.93331
3,"{'maxDepth': 7, 'minInstancesPerNode': 3}",0.93331
4,"{'maxDepth': 7, 'minInstancesPerNode': 5}",0.93331
5,"{'maxDepth': 7, 'minInstancesPerNode': 6}",0.93331
6,"{'maxDepth': 8, 'minInstancesPerNode': 3}",0.93331
7,"{'maxDepth': 8, 'minInstancesPerNode': 5}",0.93331
8,"{'maxDepth': 8, 'minInstancesPerNode': 6}",0.93331
9,"{'maxDepth': 10, 'minInstancesPerNode': 3}",0.93331


In [74]:
# Score the raw test rows with the cross-validated pipeline model.
predictions_1 = cv_model_1.transform(test_sdf)
evaluation_accuracy_1.evaluate(predictions_1)

0.9545454545454546

In [103]:
# Variant layout: the CrossValidator is itself the last pipeline stage and
# wraps only the classifier.
stage_vectorized = VectorAssembler(inputCols=iris_columns, outputCol='features')
dt_estimator = DecisionTreeClassifier(featuresCol='features', labelCol='target')
param_grid_2 = (
    ParamGridBuilder()
    .addGrid(dt_estimator.maxDepth, [5, 7, 8, 10])
    .addGrid(dt_estimator.minInstancesPerNode, [3, 5, 6])
    .build()
)
evaluation_accuracy = MulticlassClassificationEvaluator(labelCol='target', predictionCol='prediction', metricName='accuracy')
# seed pins the fold assignment so the reported metrics are repeatable.
stage_cv = CrossValidator(estimator=dt_estimator, estimatorParamMaps=param_grid_2, evaluator=evaluation_accuracy, numFolds=3, seed=0)
pipeline_2 = Pipeline(stages=[stage_vectorized, stage_cv])
pipeline_model_2 = pipeline_2.fit(train_sdf)

'JavaPackage' object is not callable
2023/06/05 10:29:26 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '3586fe17e9624d20aa0597adb1f2d2b6', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
23/06/05 10:29:33 WARN BlockManager: Asked to remove block broadcast_6553, which does not exist


In [104]:
# Class of the final fitted stage. (The bare `pipeline_model_2.stages`
# statement that preceded this was dropped: only a cell's last expression
# is displayed, so it had no visible effect.)
type(pipeline_model_2.stages[-1])

pyspark.ml.tuning.CrossValidatorModel

In [105]:
# Take the last fitted pipeline stage and tabulate its per-grid results.
cv_model_2 = pipeline_model_2.stages[-1]
cv_result_pdf = get_cv_result_pdf(cv_model_2)
cv_result_pdf

Unnamed: 0,params,evaluation_result
0,"{'maxDepth': 5, 'minInstancesPerNode': 3}",0.93331
1,"{'maxDepth': 5, 'minInstancesPerNode': 5}",0.93331
2,"{'maxDepth': 5, 'minInstancesPerNode': 6}",0.93331
3,"{'maxDepth': 7, 'minInstancesPerNode': 3}",0.93331
4,"{'maxDepth': 7, 'minInstancesPerNode': 5}",0.93331
5,"{'maxDepth': 7, 'minInstancesPerNode': 6}",0.93331
6,"{'maxDepth': 8, 'minInstancesPerNode': 3}",0.93331
7,"{'maxDepth': 8, 'minInstancesPerNode': 5}",0.93331
8,"{'maxDepth': 8, 'minInstancesPerNode': 6}",0.93331
9,"{'maxDepth': 10, 'minInstancesPerNode': 3}",0.93331


In [106]:
# Score the raw test rows with the fitted pipeline.
# NOTE(review): this rebinds `predictions`, which an earlier cell also assigns.
predictions = pipeline_model_2.transform(test_sdf)
evaluation_accuracy.evaluate(predictions)

0.9545454545454546

In [107]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

# Vectorise the training rows up front; the split below works on the result.
vector_assembler = VectorAssembler(inputCols=iris_columns, outputCol='features')
train_sdf_vectorized = vector_assembler.transform(train_sdf)
dt_estimator = DecisionTreeClassifier(featuresCol='features', labelCol='target', maxDepth=10)
# 4 x 3 = 12 candidate settings.
tvs_param_grid = (
    ParamGridBuilder()
    .addGrid(dt_estimator.maxDepth, [5, 7, 8, 10])
    .addGrid(dt_estimator.minInstancesPerNode, [3, 5, 6])
    .build()
)
evaluation_accuracy = MulticlassClassificationEvaluator(
    labelCol='target',
    predictionCol='prediction',
    metricName='accuracy',
)
# Single seeded train/validation split with a 0.75 train ratio.
tvs = TrainValidationSplit(
    estimator=dt_estimator,
    estimatorParamMaps=tvs_param_grid,
    evaluator=evaluation_accuracy,
    trainRatio=0.75,
    seed=0,
)
tvs_model = tvs.fit(train_sdf_vectorized)

'JavaPackage' object is not callable
2023/06/05 10:30:53 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '9d112cf81cfc42b986bc0834c46cc300', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


In [108]:
# Validation metrics stored on the fitted train/validation-split model.
tvs_model.validationMetrics

[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]

In [109]:
# Re-key the searched parameter maps by plain parameter name.
params = [
    dict((param.name, value) for param, value in param_map.items())
    for param_map in tvs_model.getEstimatorParamMaps()
]
params

[{'maxDepth': 5, 'minInstancesPerNode': 3},
 {'maxDepth': 5, 'minInstancesPerNode': 5},
 {'maxDepth': 5, 'minInstancesPerNode': 6},
 {'maxDepth': 7, 'minInstancesPerNode': 3},
 {'maxDepth': 7, 'minInstancesPerNode': 5},
 {'maxDepth': 7, 'minInstancesPerNode': 6},
 {'maxDepth': 8, 'minInstancesPerNode': 3},
 {'maxDepth': 8, 'minInstancesPerNode': 5},
 {'maxDepth': 8, 'minInstancesPerNode': 6},
 {'maxDepth': 10, 'minInstancesPerNode': 3},
 {'maxDepth': 10, 'minInstancesPerNode': 5},
 {'maxDepth': 10, 'minInstancesPerNode': 6}]

In [110]:
def get_tvs_result_pdf(tvs_model):
    """Tabulate train/validation-split results as a pandas DataFrame.

    Args:
        tvs_model: object exposing ``getEstimatorParamMaps()`` (a sequence of
            mappings whose keys have a ``name`` attribute) and
            ``validationMetrics``.

    Returns:
        DataFrame with a ``params`` column (dicts keyed by parameter name)
        and an ``evaluation_result`` column taken from ``validationMetrics``.
    """
    readable_maps = [
        dict((param.name, value) for param, value in param_map.items())
        for param_map in tvs_model.getEstimatorParamMaps()
    ]
    result_columns = {
        'params': readable_maps,
        'evaluation_result': tvs_model.validationMetrics,
    }
    return pd.DataFrame(result_columns)

In [111]:
# Display the per-grid validation results as a table.
get_tvs_result_pdf(tvs_model)

Unnamed: 0,params,evaluation_result
0,"{'maxDepth': 5, 'minInstancesPerNode': 3}",1.0
1,"{'maxDepth': 5, 'minInstancesPerNode': 5}",1.0
2,"{'maxDepth': 5, 'minInstancesPerNode': 6}",1.0
3,"{'maxDepth': 7, 'minInstancesPerNode': 3}",1.0
4,"{'maxDepth': 7, 'minInstancesPerNode': 5}",1.0
5,"{'maxDepth': 7, 'minInstancesPerNode': 6}",1.0
6,"{'maxDepth': 8, 'minInstancesPerNode': 3}",1.0
7,"{'maxDepth': 8, 'minInstancesPerNode': 5}",1.0
8,"{'maxDepth': 8, 'minInstancesPerNode': 6}",1.0
9,"{'maxDepth': 10, 'minInstancesPerNode': 3}",1.0


In [112]:
# Score the test rows with the model chosen by the train/validation split.
# NOTE(review): rebinds `best_dt_model` and `best_model_predictions` from earlier cells, and relies on
# `test_sdf_vectorized` built in an earlier cell — confirm this survives Restart & Run All.
best_dt_model = tvs_model.bestModel
best_model_predictions = best_dt_model.transform(test_sdf_vectorized)
evaluation_accuracy.evaluate(best_model_predictions)

0.9545454545454546