# Pipeline and Grid Search
+ Импорт данных
+ Преобразование признаков (PolynomialExpansion)
+ Масштабирование
+ Обучение и применение модели
+ Объединение в Pipeline
+ Grid Search
+ Cross Validation
## Импорт данных

In [12]:
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// Read the iris dataset from Parquet and name its two columns "label" and "Features".
val data = sqlContext.read
  .parquet("/user/supp.bda08/iris.parquet")
  .toDF("label", "Features")

// Preview the first five rows.
data.show(5)

// Random 85% / 15% split with a fixed seed.
val splited = data.randomSplit(Array(0.85, 0.15), 11)

// The first part is the training set (cached); the second part is held out for testing.
val train = splited.head.cache()
val test = splited.last

+-----+-----------------+
|label|         Features|
+-----+-----------------+
|  0.0|[5.1,3.5,1.4,0.2]|
|  0.0|[4.9,3.0,1.4,0.2]|
|  0.0|[4.7,3.2,1.3,0.2]|
|  0.0|[4.6,3.1,1.5,0.2]|
|  0.0|[5.0,3.6,1.4,0.2]|
+-----+-----------------+
only showing top 5 rows



data = [label: double, Features: vector]
splited = Array([label: double, Features: vector], [label: double, Features: vector])
train = [label: double, Features: vector]
test = [label: double, Features: vector]


lastException: Throwable = null


[label: double, Features: vector]

## Преобразование признаков
Попробуем для начала полиномиальное преобразование признаков второй степени
с помощью трансформера PolynomialExpansion.

In [13]:
import org.apache.spark.ml.feature.PolynomialExpansion

// Transformer that reads the raw "Features" vector and writes its degree-2
// polynomial expansion to "polyFeatures".
val polynomialExpansion = {
  val expander = new PolynomialExpansion()
  expander.setDegree(2)
  expander.setInputCol("Features")
  expander.setOutputCol("polyFeatures")
}

// The expansion is applied directly (no fitting step) to both splits.
val polyDF = polynomialExpansion.transform(train)
val polyDF_test = polynomialExpansion.transform(test)

// Preview the expanded feature vectors of the training split.
polyDF.select("polyFeatures").show(5)

polynomialExpansion: org.apache.spark.ml.feature.PolynomialExpansion = poly_8b0f52847fd1
polyDF: org.apache.spark.sql.DataFrame = [label: double, Features: vector, polyFeatures: vector]
polyDF_test: org.apache.spark.sql.DataFrame = [label: double, Features: vector, polyFeatures: vector]


+--------------------+
|        polyFeatures|
+--------------------+
|[4.3,18.49,3.0,12...|
|[4.4,19.360000000...|
|[4.4,19.360000000...|
|[4.4,19.360000000...|
|[4.5,20.25,2.3,10...|
+--------------------+
only showing top 5 rows



## Масштабирование
Сделаем масштабирование признаков с помощью StandardScaler

In [14]:
import org.apache.spark.ml.feature.StandardScaler

// Scaler over the expanded features: standard-deviation scaling is on,
// mean-centering is off. The result is written to "features".
// NOTE(review): the output column "features" differs from the raw input column
// "Features" only by letter case; if the session resolves column names
// case-insensitively the two may collide — confirm the case-sensitivity setting.
val scaler = {
  val standardScaler = new StandardScaler()
  standardScaler.setWithMean(false)
  standardScaler.setWithStd(true)
  standardScaler.setInputCol("polyFeatures")
  standardScaler.setOutputCol("features")
}

// Scaling statistics are fitted on the training split only ...
val scalerModel = scaler.fit(polyDF)

// ... and the fitted model is then applied to both splits.
val transformed = scalerModel.transform(polyDF)
val transformed_test = scalerModel.transform(polyDF_test)

transformed.show(5)

scaler: org.apache.spark.ml.feature.StandardScaler = stdScal_1bdced9ed3c7
scalerModel: org.apache.spark.ml.feature.StandardScalerModel = stdScal_1bdced9ed3c7
transformed: org.apache.spark.sql.DataFrame = [label: double, features: vector, polyFeatures: vector]
transformed_test: org.apache.spark.sql.DataFrame = [label: double, features: vector, polyFeatures: vector]


+-----+--------------------+--------------------+
|label|            features|        polyFeatures|
+-----+--------------------+--------------------+
|  0.0|[5.09330426783669...|[4.3,18.49,3.0,12...|
|  0.0|[5.21175320429801...|[4.4,19.360000000...|
|  0.0|[5.21175320429801...|[4.4,19.360000000...|
|  0.0|[5.21175320429801...|[4.4,19.360000000...|
|  0.0|[5.33020214075933...|[4.5,20.25,2.3,10...|
+-----+--------------------+--------------------+
only showing top 5 rows



## Обучение и применение модели
Обучим модель случайного леса с тремя классами. Для этого обернем наш классификатор в модель OneVsRest

In [4]:
import org.apache.spark.ml.classification.{OneVsRest, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Base learner: a random forest with 10 trees.
val classifier = new RandomForestClassifier().setNumTrees(10)

// One-vs-rest wrapper around the base learner.
val ovr = new OneVsRest().setClassifier(classifier)

// Fit on the scaled training split.
val model = ovr.fit(transformed)

// Score the scaled test split; the evaluator is used with its default settings.
val evaluator = new MulticlassClassificationEvaluator()
val preds = model.transform(transformed_test)
val f1_score = evaluator.evaluate(preds)

println(s"f1 score = ${f1_score}")

classifier: org.apache.spark.ml.classification.RandomForestClassifier = rfc_b1c47e4c5ac6
ovr: org.apache.spark.ml.classification.OneVsRest = oneVsRest_2b2b0c05dcd4
model: org.apache.spark.ml.classification.OneVsRestModel = oneVsRest_2b2b0c05dcd4
preds: org.apache.spark.sql.DataFrame = [label: double, Features: vector, polyFeatures: vector, features: vector, prediction: double]
evaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_ae2f905a07ae
f1_score: Double = 0.8584356819650937


f1 score = 0.8584356819650937


## Pipeline
Сложим все эти шаги в единый пайплайн 

In [5]:
import org.apache.spark.ml.Pipeline

// Rebuild the manual steps above as pipeline stages. The polynomial degree and
// the number of trees are set explicitly so that this pipeline uses the same
// configuration as the step-by-step run (degree 2, 10 trees) instead of
// silently falling back to library defaults.
val polynomialExpansion = new PolynomialExpansion()
  .setInputCol("Features")
  .setOutputCol("polyFeatures")
  .setDegree(2)

// Standard-deviation scaling on, mean-centering off; output goes to "features".
val scaler = new StandardScaler()
  .setInputCol("polyFeatures")
  .setOutputCol("features")
  .setWithStd(true)
  .setWithMean(false)

// Random forest base learner wrapped in a one-vs-rest classifier.
val classifier = new RandomForestClassifier()
  .setNumTrees(10)
val ovr = new OneVsRest()
ovr.setClassifier(classifier)

// Chain expansion -> scaling -> classification and fit everything on the raw
// training split in a single call.
val pipeline = new Pipeline()
  .setStages(Array(polynomialExpansion, scaler, ovr))
val pipelineModel = pipeline.fit(train)

// Apply the fitted pipeline to the raw test split and score the predictions.
val preds = pipelineModel.transform(test)
val evaluator = new MulticlassClassificationEvaluator()
val f1_score = evaluator.evaluate(preds)

println(s"f1 score = ${f1_score}")

polynomialExpansion: org.apache.spark.ml.feature.PolynomialExpansion = poly_3433f58d3201
scaler: org.apache.spark.ml.feature.StandardScaler = stdScal_65fd456627e5
classifier: org.apache.spark.ml.classification.RandomForestClassifier = rfc_748a484d64c6
ovr: org.apache.spark.ml.classification.OneVsRest = oneVsRest_791cc8b3b8bd
pipeline: org.apache.spark.ml.Pipeline = pipeline_0c382ee84476
pipelineModel: org.apache.spark.ml.PipelineModel = pipeline_0c382ee84476
preds: org.apache.spark.sql.DataFrame = [label: double, Features: vector, polyFeatures: vector, features: vector, prediction: double]
evaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_fed14d6bd7f2
f1_score: Double = 0.8584356819650937


f1 score = 0.8584356819650937


## Grid Search
Создадим простую сетку параметров:
+ Степень полинома (2, 3)
+ Количество деревьев в алгоритме случайного леса (5, 10, 20, 30)

In [6]:
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}

// Parameter grid: every combination of polynomial degree and number of trees.
val paramGrid = {
  val degreeCandidates = Array(2, 3)
  val numTreesCandidates = Array(5, 10, 20, 30)

  val gridBuilder = new ParamGridBuilder()
  gridBuilder.addGrid(polynomialExpansion.degree, degreeCandidates)
  gridBuilder.addGrid(classifier.numTrees, numTreesCandidates)
  gridBuilder.build()
}

paramGrid = 


Array({
	poly_3433f58d3201-degree: 2,
	rfc_748a484d64c6-numTrees: 5
}, {
	poly_3433f58d3201-degree: 3,
	rfc_748a484d64c6-numTrees: 5
}, {
	poly_3433f58d3201-degree: 2,
	rfc_748a484d64c6-numTrees: 10
}, {
	poly_3433f58d3201-degree: 3,
	rfc_748a484d64c6-numTrees: 10
}, {
	poly_3433f58d3201-degree: 2,
	rfc_748a484d64c6-numTrees: 20
}, {
	poly_3433f58d3201-degree: 3,
	rfc_748a484d64c6-numTrees: 20
}, {
	poly_3433f58d3201-degree: 2,
	rfc_748a484d64c6-numTrees: 30
}, {
	poly_3433f58d3201-degree: 3,
	rfc_748a484d64c6-numTrees: 30
})


[{
	poly_3433f58d3201-degree: 2,
	rfc_748a484d64c6-numTrees: 5
}, {
	poly_3433f58d3201-degree: 3,
	rfc_748a484d64c6-numTrees: 5
}, {
	poly_3433f58d3201-degree: 2,
	rfc_748a484d64c6-numTrees: 10
}, {
	poly_3433f58d3201-degree: 3,
	rfc_748a484d64c6-numTrees: 10
}, {
	poly_3433f58d3201-degree: 2,
	rfc_748a484d64c6-numTrees: 20
}, {
	poly_3433f58d3201-degree: 3,
	rfc_748a484d64c6-numTrees: 20
}, {
	poly_3433f58d3201-degree: 2,
	rfc_748a484d64c6-numTrees: 30
}, {
	poly_3433f58d3201-degree: 3,
	rfc_748a484d64c6-numTrees: 30
}]

## Cross Validation
Создадим модель кросс-валидации по нашему пайплайну, 3 фолдам и сетке параметров.
В результате estimator вернет нам лучшую модель по выбранной метрике. 

In [7]:
// 3-fold cross-validation of the whole pipeline over the parameter grid,
// scored with a fresh multiclass evaluator.
val cv = {
  val foldEvaluator = new MulticlassClassificationEvaluator
  val crossValidator = new CrossValidator()
  crossValidator.setNumFolds(3)
  crossValidator.setEstimatorParamMaps(paramGrid)
  crossValidator.setEvaluator(foldEvaluator)
  crossValidator.setEstimator(pipeline)
}

cv = cv_d683d14201c6


cv_d683d14201c6

In [8]:
// Run the cross-validated grid search on the training split.
val model: org.apache.spark.ml.tuning.CrossValidatorModel = cv.fit(train)

model = cv_d683d14201c6


cv_d683d14201c6

Посмотрим на результаты лучшей модели: метрику и параметры

In [9]:
// Score the cross-validated model on the held-out test split.
val f1 = {
  val testPredictions = model.transform(test)
  evaluator.evaluate(testPredictions)
}

// Pair every candidate parameter map with its average cross-validation metric
// and keep the parameter map whose metric is the largest.
val best_params = {
  val scoredCandidates = model.getEstimatorParamMaps.zip(model.avgMetrics)
  val (winningParams, _) = scoredCandidates.maxBy { case (_, avgMetric) => avgMetric }
  winningParams
}

f1 = 0.9056122448979592
best_params = 


{
	poly_3433f58d3201-degree: 2,
	rfc_748a484d64c6-numTrees: 30
}