### Iris 데이터 세트를 Spark DataFrame으로 변환

In [0]:
from sklearn.datasets import load_iris
import pandas as pd
import numpy as np

# Load the iris data set and keep the feature matrix and the class labels separately.
iris = load_iris()
iris_data, iris_label = iris.data, iris.target

# Wrap the numpy feature matrix in a pandas DataFrame and attach the label column.
iris_columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
iris_pdf = pd.DataFrame(iris_data, columns=iris_columns).assign(label=iris_label)

# Convert the pandas DataFrame into a Spark DataFrame and preview the first five rows.
iris_sdf = spark.createDataFrame(iris_pdf)
display(iris_sdf.limit(5))



sepal_length,sepal_width,petal_length,petal_width,label
5.1,3.5,1.4,0.2,0
4.9,3.0,1.4,0.2,0
4.7,3.2,1.3,0.2,0
4.6,3.1,1.5,0.2,0
5.0,3.6,1.4,0.2,0


### Feature Vectorization 변환과 Estimator 객체 생성.

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

iris_columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']

# Transformer that packs the four measurement columns into a single 'features' vector column.
vector_assembler = VectorAssembler(inputCols=iris_columns, outputCol='features')

# Seeded 70/30 split into training and test DataFrames.
train_sdf, test_sdf = iris_sdf.randomSplit([0.7, 0.3], seed=0)

# Apply feature vectorization to the training data.
train_sdf_vectorized = vector_assembler.transform(train_sdf)

# Estimator that will be tuned by the cross validation below.
dt = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='label',
    maxDepth=8,
)

### CrossValidator로 교차 검증과 하이퍼파라미터 튜닝 수행. 
* ParamGridBuilder로 하이퍼파라미터 튜닝을 위한 그리드 서치(Grid Search)용 Param Grid를 생성.
* 교차 검증 시 성능 평가할 Evaluator 생성. 
* CrossValidator 객체 생성시 Estimator객체, 그리드 서치용 Param Grid, 성능 평가용 Evaluator 객체, Fold 수를 인자로 입력함. 
* CrossValidator 객체의 fit(입력DataFrame) 메소드를 호출하여 교차 검증과 하이퍼파라미터 튜닝 수행. fit() 수행 후 CrossValidatorModel 반환. 
* 반환된 CrossValidatorModel은 avgMetrics 속성에 하이퍼파라미터 조합(Param Map)별 교차 검증 평균 성능 평가점수를, bestModel 속성에 최적 하이퍼파라미터로 refit된 EstimatorModel 객체를 가짐.

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Build the grid-search param grid that CrossValidator uses for hyperparameter tuning.
# Spark ML's maxDepth corresponds to scikit-learn's max_depth. minInstancesPerNode is the minimum
# number of instances each child must have after a split, so its closest scikit-learn counterpart
# is min_samples_leaf (not min_samples_split).
param_grid = ParamGridBuilder().addGrid(dt.maxDepth, [5, 10])\
                               .addGrid(dt.minInstancesPerNode, [3, 6])\
                               .build()

# Evaluator that CrossValidator applies to each fold; no DataFrame is passed at this point.
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')

# Create the CrossValidator from the estimator, the param grid, the evaluator and the fold count.
# An explicit seed pins the fold assignment, matching the seeded randomSplit and
# TrainValidationSplit used elsewhere in this notebook.
cv = CrossValidator(estimator=dt, estimatorParamMaps=param_grid, evaluator=evaluator_accuracy, numFolds=3, seed=0)

# Run cross validation and hyperparameter tuning. The input must already be feature-vectorized
# because the estimator reads the 'features' column.
cv_model = cv.fit(train_sdf_vectorized)

In [0]:
# Show the type of the object returned by CrossValidator.fit().
print(type(cv_model))

<class 'pyspark.ml.tuning.CrossValidatorModel'>


In [0]:
# param_grid is a list whose elements are dicts. Each dict maps a Param object (which carries its parent uid, name and doc string) to the value to try for that hyperparameter.
print('param_grid type:', type(param_grid), 'param_grid:', param_grid)

param_grid type: <class 'list'> param_grid: [{Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5, Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 3}, {Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5, Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPer

In [0]:
# Inspect every instance attribute stored on the cv_model object.
vars(cv_model)

Out[7]: {'uid': 'CrossValidatorModel_3b9aeb4b1fb0',
 '_paramMap': {Param(parent='CrossValidatorModel_3b9aeb4b1fb0', name='estimator', doc='estimator to be cross-validated'): DecisionTreeClassifier_61f50b80b3b6,
  Param(parent='CrossValidatorModel_3b9aeb4b1fb0', name='estimatorParamMaps', doc='estimator param maps'): [{Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
    Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 3},
   {Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 inte

In [0]:
# cv_model.getEstimatorParamMaps() returns the param grid that was built with ParamGridBuilder().
cv_model.getEstimatorParamMaps()

Out[8]: [{Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
  Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 3},
 {Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
  Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discard

In [0]:
# Show each average metric next to its param map, as a list of (metric, param_map) tuples.
[
    (avg_metric, param_map)
    for avg_metric, param_map in zip(cv_model.avgMetrics, cv_model.getEstimatorParamMaps())
]

Out[9]: [(0.9440476190476191,
  {Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
   Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 3}),
 (0.9259103641456582,
  {Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
   Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer th

In [0]:
# Materialize the param maps as a plain list.
list(cv_model.getEstimatorParamMaps())

Out[10]: [{Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
  Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 3},
 {Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
  Param(parent='DecisionTreeClassifier_61f50b80b3b6', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discar

In [0]:
# Reduce each param map to a plain {param name: value} dict and print the result.
params = [
    {param.name: value for param, value in param_map.items()}
    for param_map in cv_model.getEstimatorParamMaps()
]
print(params)

[{'maxDepth': 5, 'minInstancesPerNode': 3}, {'maxDepth': 5, 'minInstancesPerNode': 6}, {'maxDepth': 10, 'minInstancesPerNode': 3}, {'maxDepth': 10, 'minInstancesPerNode': 6}]


In [0]:
import pandas as pd

# Pair each hyperparameter combination with its average cross-validation metric.
print(list(zip(params, cv_model.avgMetrics)))

# Collect the param names/values and their evaluation results into a pandas DataFrame.
cv_result = pd.DataFrame(dict(params=params, evaluation_result=cv_model.avgMetrics))

# NOTE(review): the Arrow fallback warning in the recorded output presumably comes from
# display() converting the dict-valued 'params' column - confirm.
display(cv_result)

[({'maxDepth': 5, 'minInstancesPerNode': 3}, 0.9440476190476191), ({'maxDepth': 5, 'minInstancesPerNode': 6}, 0.9259103641456582), ({'maxDepth': 10, 'minInstancesPerNode': 3}, 0.9440476190476191), ({'maxDepth': 10, 'minInstancesPerNode': 6}, 0.9259103641456582)]
  A field of type StructType expects a pandas.DataFrame, but got: <class 'pandas.core.series.Series'>
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


params,evaluation_result
"Map(maxDepth -> 5, minInstancesPerNode -> 3)",0.9440476190476192
"Map(maxDepth -> 5, minInstancesPerNode -> 6)",0.9259103641456582
"Map(maxDepth -> 10, minInstancesPerNode -> 3)",0.9440476190476192
"Map(maxDepth -> 10, minInstancesPerNode -> 6)",0.9259103641456582


In [0]:
import pandas as pd

# Helper that returns cross-validation results as a pandas DataFrame.
def get_cv_result_pdf(cv_model):
    """Summarize a fitted cross-validation model as a pandas DataFrame.

    Args:
        cv_model: object exposing getEstimatorParamMaps() (an iterable of
            {Param: value} mappings) and an avgMetrics sequence.

    Returns:
        pandas.DataFrame with a 'params' column holding {param name: value}
        dicts and an 'evaluation_result' column holding cv_model.avgMetrics.
    """
    readable_param_maps = []
    for param_map in cv_model.getEstimatorParamMaps():
        # Key by the Param's name instead of the Param object itself.
        readable_param_maps.append({param.name: value for param, value in param_map.items()})

    return pd.DataFrame({'params': readable_param_maps, 'evaluation_result': cv_model.avgMetrics})

### CrossValidatorModel을 이용하여 예측 수행. 
* CrossValidatorModel 객체의 transform(테스트 DataFrame)을 호출하여 예측. CrossValidatorModel객체는 최적의 하이퍼 파라미터로 학습되었음. 
* CrossValidatorModel 객체의 bestModel은 최적 하이퍼파라미터로 EstimatorModel이 재학습된 것임.

In [0]:
# Vectorize the test features with the same assembler, then predict with the tuned model.
test_sdf_vectorized = vector_assembler.transform(test_sdf)
predictions = cv_model.transform(test_sdf_vectorized)

evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy',
)

print("\n##### cv_model로 테스트 데이터 예측 결과 ######")
print('정확도:', evaluator_accuracy.evaluate(predictions))

display(predictions)


##### cv_model로 테스트 데이터 예측 결과 ######
정확도: 0.9791666666666666


sepal_length,sepal_width,petal_length,petal_width,label,features,rawPrediction,probability,prediction
4.3,3.0,1.1,0.1,0,"Map(vectorType -> dense, length -> 4, values -> List(4.3, 3.0, 1.1, 0.1))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
4.7,3.2,1.3,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(4.7, 3.2, 1.3, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
4.9,3.1,1.5,0.1,0,"Map(vectorType -> dense, length -> 4, values -> List(4.9, 3.1, 1.5, 0.1))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
4.9,3.1,1.5,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(4.9, 3.1, 1.5, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.0,3.0,1.6,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(5.0, 3.0, 1.6, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.2,4.1,1.5,0.1,0,"Map(vectorType -> dense, length -> 4, values -> List(5.2, 4.1, 1.5, 0.1))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.4,3.4,1.5,0.4,0,"Map(vectorType -> dense, length -> 4, values -> List(5.4, 3.4, 1.5, 0.4))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.4,3.4,1.7,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(5.4, 3.4, 1.7, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.5,3.5,1.3,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(5.5, 3.5, 1.3, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.5,4.2,1.4,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(5.5, 4.2, 1.4, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0


In [0]:
# After cross validation, cv_model is refit with the best hyperparameters and the resulting
# EstimatorModel is stored in its bestModel attribute.
best_dt_model = cv_model.bestModel
print("##### Cross validation 결과 Best Model ######", best_dt_model, sep="\n")

##### Cross validation 결과 Best Model ######
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_61f50b80b3b6, depth=3, numNodes=7, numClasses=3, numFeatures=4


In [0]:
# Predict on the vectorized test data directly with the best model.
best_model_predictions = best_dt_model.transform(test_sdf_vectorized)

evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy',
)

print("\n##### Best Model로 테스트 데이터 예측 결과 ######")
print('정확도:', evaluator_accuracy.evaluate(best_model_predictions))

display(best_model_predictions)


##### Best Model로 테스트 데이터 예측 결과 ######
정확도: 0.9791666666666666


sepal_length,sepal_width,petal_length,petal_width,label,features,rawPrediction,probability,prediction
4.3,3.0,1.1,0.1,0,"Map(vectorType -> dense, length -> 4, values -> List(4.3, 3.0, 1.1, 0.1))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
4.7,3.2,1.3,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(4.7, 3.2, 1.3, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
4.9,3.1,1.5,0.1,0,"Map(vectorType -> dense, length -> 4, values -> List(4.9, 3.1, 1.5, 0.1))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
4.9,3.1,1.5,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(4.9, 3.1, 1.5, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.0,3.0,1.6,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(5.0, 3.0, 1.6, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.2,4.1,1.5,0.1,0,"Map(vectorType -> dense, length -> 4, values -> List(5.2, 4.1, 1.5, 0.1))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.4,3.4,1.5,0.4,0,"Map(vectorType -> dense, length -> 4, values -> List(5.4, 3.4, 1.5, 0.4))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.4,3.4,1.7,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(5.4, 3.4, 1.7, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.5,3.5,1.3,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(5.5, 3.5, 1.3, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.5,4.2,1.4,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(5.5, 4.2, 1.4, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0


### Pipeline과 결합하여 사용하기
* CrossValidator에서 Estimator 대신 Pipeline을 적용할 수 있음. 
* Pipeline의 stage에 CrossValidator를 등록 시킬 수 있음.

In [0]:
# Show the vectorized training data: the original columns plus the assembled 'features' column.
display(train_sdf_vectorized)

sepal_length,sepal_width,petal_length,petal_width,label,features
4.4,2.9,1.4,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(4.4, 2.9, 1.4, 0.2))"
4.6,3.1,1.5,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(4.6, 3.1, 1.5, 0.2))"
4.6,3.4,1.4,0.3,0,"Map(vectorType -> dense, length -> 4, values -> List(4.6, 3.4, 1.4, 0.3))"
4.8,3.0,1.4,0.1,0,"Map(vectorType -> dense, length -> 4, values -> List(4.8, 3.0, 1.4, 0.1))"
4.8,3.4,1.6,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(4.8, 3.4, 1.6, 0.2))"
4.9,3.0,1.4,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(4.9, 3.0, 1.4, 0.2))"
5.0,3.4,1.5,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(5.0, 3.4, 1.5, 0.2))"
5.0,3.6,1.4,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(5.0, 3.6, 1.4, 0.2))"
5.1,3.5,1.4,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(5.1, 3.5, 1.4, 0.2))"
5.1,3.5,1.4,0.3,0,"Map(vectorType -> dense, length -> 4, values -> List(5.1, 3.5, 1.4, 0.3))"


#### CrossValidator에서 Estimator 대신 Pipeline을 적용

In [0]:
from pyspark.ml import Pipeline


train_sdf, test_sdf = iris_sdf.randomSplit([0.7, 0.3], seed=0)

iris_columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
stage_1 = VectorAssembler(inputCols=iris_columns, outputCol='features')
stage_2 = DecisionTreeClassifier(featuresCol='features', labelCol='label', maxDepth=10 )

# Create the Pipeline from the feature vectorization transformer and the estimator.
pipeline_01 = Pipeline(stages=[stage_1, stage_2])

# Grid-search param grid for hyperparameter tuning. The params must be referenced through
# the estimator stage itself (stage_2.maxDepth, stage_2.minInstancesPerNode).
param_grid_01 = ParamGridBuilder().addGrid(stage_2.maxDepth, [5, 7, 8, 10])\
                               .addGrid(stage_2.minInstancesPerNode, [3, 5, 6])\
                               .build()

evaluator_accuracy_01 = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')

# Pass the Pipeline, not a bare estimator, as the estimator argument of CrossValidator.
# An explicit seed pins the fold assignment, matching the seeded randomSplit above.
cv = CrossValidator(estimator=pipeline_01, estimatorParamMaps=param_grid_01, evaluator=evaluator_accuracy_01, numFolds=3, seed=0)

# Run cross validation and grid-search hyperparameter tuning.
# The DataFrame passed to fit() must NOT be feature-vectorized already, because the Pipeline
# performs the vectorization itself; pass the raw training DataFrame.
cv_model_01 = cv.fit(train_sdf)

cv_result_pdf = get_cv_result_pdf(cv_model_01)
display(cv_result_pdf)

params,evaluation_result
"Map(maxDepth -> 5, minInstancesPerNode -> 3)",0.9440476190476192
"Map(maxDepth -> 5, minInstancesPerNode -> 5)",0.9357142857142856
"Map(maxDepth -> 5, minInstancesPerNode -> 6)",0.9259103641456582
"Map(maxDepth -> 7, minInstancesPerNode -> 3)",0.9440476190476192
"Map(maxDepth -> 7, minInstancesPerNode -> 5)",0.9357142857142856
"Map(maxDepth -> 7, minInstancesPerNode -> 6)",0.9259103641456582
"Map(maxDepth -> 8, minInstancesPerNode -> 3)",0.9440476190476192
"Map(maxDepth -> 8, minInstancesPerNode -> 5)",0.9357142857142856
"Map(maxDepth -> 8, minInstancesPerNode -> 6)",0.9259103641456582
"Map(maxDepth -> 10, minInstancesPerNode -> 3)",0.9440476190476192


In [0]:
# test_sdf must not be feature-vectorized before calling transform() on this model,
# because the Pipeline inside it performs the vectorization.
predictions = cv_model_01.transform(test_sdf)
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy',
)
print(evaluator_accuracy.evaluate(predictions))

0.9791666666666666


#### Pipeline의 stage에 CrossValidator를 등록

In [0]:
# Build a Pipeline made of a feature vectorization stage and a cross validation stage.
stage_vectorized = VectorAssembler(inputCols=iris_columns, outputCol='features')
# Estimator used by the CrossValidator stage.
dt_estimator = DecisionTreeClassifier(featuresCol='features', labelCol='label')

# Param grid used by the CrossValidator stage.
param_grid_02 = ParamGridBuilder().addGrid(dt_estimator.maxDepth, [5, 7, 8, 10])\
                                  .addGrid(dt_estimator.minInstancesPerNode, [3, 5, 6])\
                                  .build()

evaluator_accuracy = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')

# CrossValidator used as the second Pipeline stage. An explicit seed pins the fold
# assignment, matching the seeded randomSplit used to create train_sdf.
stage_cv = CrossValidator(estimator=dt_estimator, estimatorParamMaps=param_grid_02, evaluator=evaluator_accuracy, numFolds=3, seed=0)

# Register the vectorization stage and the cross validation stage in the Pipeline.
pipeline_02 = Pipeline(stages=[stage_vectorized, stage_cv])

# Pipeline.fit() runs feature vectorization first and then cross validation.
pipeline_model_02 = pipeline_02.fit(train_sdf)


In [0]:
# List the stages of the fitted pipeline, then show the type of the last stage.
print(pipeline_model_02.stages)
print(type(pipeline_model_02.stages[-1]))

[VectorAssembler_b33d2a209fd1, CrossValidatorModel_8ee58b67a192]
<class 'pyspark.ml.tuning.CrossValidatorModel'>


In [0]:
# Take the last stage of the fitted pipeline and tabulate its cross-validation results.
cv_model_02 = pipeline_model_02.stages[-1]
cv_result_pdf = get_cv_result_pdf(cv_model_02)
display(cv_result_pdf)

params,evaluation_result
"Map(maxDepth -> 5, minInstancesPerNode -> 3)",0.9440476190476192
"Map(maxDepth -> 5, minInstancesPerNode -> 5)",0.9357142857142856
"Map(maxDepth -> 5, minInstancesPerNode -> 6)",0.9259103641456582
"Map(maxDepth -> 7, minInstancesPerNode -> 3)",0.9440476190476192
"Map(maxDepth -> 7, minInstancesPerNode -> 5)",0.9357142857142856
"Map(maxDepth -> 7, minInstancesPerNode -> 6)",0.9259103641456582
"Map(maxDepth -> 8, minInstancesPerNode -> 3)",0.9440476190476192
"Map(maxDepth -> 8, minInstancesPerNode -> 5)",0.9357142857142856
"Map(maxDepth -> 8, minInstancesPerNode -> 6)",0.9259103641456582
"Map(maxDepth -> 10, minInstancesPerNode -> 3)",0.9440476190476192


In [0]:
# Predict on the raw test DataFrame: the fitted pipeline applies its feature vectorization
# stage first and then the CrossValidatorModel's transform().
predictions = pipeline_model_02.transform(test_sdf)
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy',
)
print(evaluator_accuracy.evaluate(predictions))

0.9791666666666666


### TrainValidationSplit을 이용하여 하이퍼 파라미터 튜닝

In [0]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

# Vectorize the training features for the estimator.
vector_assembler = VectorAssembler(inputCols=iris_columns, outputCol='features')
train_sdf_vectorized = vector_assembler.transform(train_sdf)

dt_estimator = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='label',
    maxDepth=10,
)
tvs_param_grid = (
    ParamGridBuilder()
    .addGrid(dt_estimator.maxDepth, [5, 7, 8, 10])
    .addGrid(dt_estimator.minInstancesPerNode, [3, 5, 6])
    .build()
)

evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy',
)

# Create the TrainValidationSplit object (seeded, trainRatio=0.75).
tvs = TrainValidationSplit(
    estimator=dt_estimator,
    estimatorParamMaps=tvs_param_grid,
    evaluator=evaluator_accuracy,
    trainRatio=0.75,
    seed=0,
)
# Split into train and validation sets, then run the hyperparameter tuning.
tvs_model = tvs.fit(train_sdf_vectorized)

In [0]:
# validationMetrics is used here the way avgMetrics was used for cv_model.
print('validation metrics:', tvs_model.validationMetrics)
params = [
    {param.name: value for param, value in param_map.items()}
    for param_map in tvs_model.getEstimatorParamMaps()
]
print(params)

validation metrics: [0.95, 0.95, 0.9, 0.95, 0.95, 0.9, 0.95, 0.95, 0.9, 0.95, 0.95, 0.9]
[{'maxDepth': 5, 'minInstancesPerNode': 3}, {'maxDepth': 5, 'minInstancesPerNode': 5}, {'maxDepth': 5, 'minInstancesPerNode': 6}, {'maxDepth': 7, 'minInstancesPerNode': 3}, {'maxDepth': 7, 'minInstancesPerNode': 5}, {'maxDepth': 7, 'minInstancesPerNode': 6}, {'maxDepth': 8, 'minInstancesPerNode': 3}, {'maxDepth': 8, 'minInstancesPerNode': 5}, {'maxDepth': 8, 'minInstancesPerNode': 6}, {'maxDepth': 10, 'minInstancesPerNode': 3}, {'maxDepth': 10, 'minInstancesPerNode': 5}, {'maxDepth': 10, 'minInstancesPerNode': 6}]


In [0]:
import pandas as pd

# Helper that returns train/validation-split results as a pandas DataFrame.
def get_tvs_result_pdf(tvs_model):
    """Summarize a fitted train-validation-split model as a pandas DataFrame.

    Args:
        tvs_model: object exposing getEstimatorParamMaps() (an iterable of
            {Param: value} mappings) and a validationMetrics sequence.

    Returns:
        pandas.DataFrame with a 'params' column holding {param name: value}
        dicts and an 'evaluation_result' column holding
        tvs_model.validationMetrics.
    """
    readable_param_maps = []
    for param_map in tvs_model.getEstimatorParamMaps():
        # Key by the Param's name instead of the Param object itself.
        readable_param_maps.append({param.name: value for param, value in param_map.items()})

    return pd.DataFrame({'params': readable_param_maps, 'evaluation_result': tvs_model.validationMetrics})

# Tabulate the validation results of tvs_model and show the first ten rows.
tvs_result_pdf = get_tvs_result_pdf(tvs_model)
tvs_result_pdf.head(10)

Unnamed: 0,params,evaluation_result
0,"{'maxDepth': 5, 'minInstancesPerNode': 3}",0.95
1,"{'maxDepth': 5, 'minInstancesPerNode': 5}",0.95
2,"{'maxDepth': 5, 'minInstancesPerNode': 6}",0.9
3,"{'maxDepth': 7, 'minInstancesPerNode': 3}",0.95
4,"{'maxDepth': 7, 'minInstancesPerNode': 5}",0.95
5,"{'maxDepth': 7, 'minInstancesPerNode': 6}",0.9
6,"{'maxDepth': 8, 'minInstancesPerNode': 3}",0.95
7,"{'maxDepth': 8, 'minInstancesPerNode': 5}",0.95
8,"{'maxDepth': 8, 'minInstancesPerNode': 6}",0.9
9,"{'maxDepth': 10, 'minInstancesPerNode': 3}",0.95


In [0]:
# During fit(), tvs_model is refit with the best hyperparameters and the resulting
# EstimatorModel is stored in its bestModel attribute.
best_dt_model = tvs_model.bestModel

# Vectorize the current test split with the current assembler instead of relying on a
# test_sdf_vectorized variable left over from a much earlier cell.
test_sdf_vectorized = vector_assembler.transform(test_sdf)
best_model_predictions = best_dt_model.transform(test_sdf_vectorized)

print("\n##### Best Model로 테스트 데이터 예측 결과 ######")
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
print('정확도:', evaluator_accuracy.evaluate(best_model_predictions))

display(best_model_predictions)


##### Best Model로 테스트 데이터 예측 결과 ######
정확도: 0.9791666666666666


sepal_length,sepal_width,petal_length,petal_width,label,features,rawPrediction,probability,prediction
4.3,3.0,1.1,0.1,0,"Map(vectorType -> dense, length -> 4, values -> List(4.3, 3.0, 1.1, 0.1))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
4.7,3.2,1.3,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(4.7, 3.2, 1.3, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
4.9,3.1,1.5,0.1,0,"Map(vectorType -> dense, length -> 4, values -> List(4.9, 3.1, 1.5, 0.1))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
4.9,3.1,1.5,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(4.9, 3.1, 1.5, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.0,3.0,1.6,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(5.0, 3.0, 1.6, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.2,4.1,1.5,0.1,0,"Map(vectorType -> dense, length -> 4, values -> List(5.2, 4.1, 1.5, 0.1))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.4,3.4,1.5,0.4,0,"Map(vectorType -> dense, length -> 4, values -> List(5.4, 3.4, 1.5, 0.4))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.4,3.4,1.7,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(5.4, 3.4, 1.7, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.5,3.5,1.3,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(5.5, 3.5, 1.3, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
5.5,4.2,1.4,0.2,0,"Map(vectorType -> dense, length -> 4, values -> List(5.5, 4.2, 1.4, 0.2))","Map(vectorType -> dense, length -> 3, values -> List(38.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 3, values -> List(1.0, 0.0, 0.0))",0.0
