dbtech@debian10:~/Spark$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://debian10.local:4042
Spark context available as 'sc' (master = local[*], app id = local-1675586548395).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/

Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_202)
Type in expressions to have them evaluated.
Type :help for more information.

...

scala> // Pipelining Our Workflow

scala> // p 420

scala> val Array(train, test) = df.randomSplit(Array(0.7, 0.3))
train: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [color: string, lab: string ... 2 more fields]
test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [color: string, lab: string ... 2 more fields]

scala> val rForm = new RFormula()
rForm: org.apache.spark.ml.feature.RFormula = RFormula: uid=rFormula_5d2f990bdd05

scala> val lr = new LogisticRegression().setLabelCol("label").setFeaturesCol("features")
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_c929337cd6f4

scala> // p 421

scala> import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.Pipeline

scala> val stages = Array(rForm, lr)
stages: Array[org.apache.spark.ml.Estimator[_ >: org.apache.spark.ml.classification.LogisticRegressionModel with org.apache.spark.ml.feature.RFormulaModel <: org.apache.spark.ml.Model[_ >: org.apache.spark.ml.classification.LogisticRegressionModel with org.apache.spark.ml.feature.RFormulaModel <: org.apache.spark.ml.Transformer with org.apache.spark.ml.param.shared.HasLabelCol with org.apache.spark.ml.param.shared.HasFeaturesCol with org.apache.spark.ml.util.MLWritable] with org.apache.spark.ml.param.shared.HasLabelCol with org.apache.spark.ml.param.shared.HasFeaturesCol with org.apache.spark.ml.util.MLWritable] with org.apache.spark.ml.param.shared.HasLabelCol with org.apache.spark.ml.param.shared.HasFeaturesCol with org.apache.spark.ml.util.DefaultParamsWritable{...

scala> val pipeline = new Pipeline().setStages(stages)
pipeline: org.apache.spark.ml.Pipeline = pipeline_ef9fc1ede161

scala> // Training and Evaluation:

scala> import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.tuning.ParamGridBuilder

scala> val params = (new ParamGridBuilder()
     |   .addGrid(rForm.formula, Array(
     |     "lab ~ . + color:value1",
     |     "lab ~ . + color:value1 + color:value2"))
     |   .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
     |   .addGrid(lr.regParam, Array(0.1, 2.0))
     |   .build()
     | )
params: Array[org.apache.spark.ml.param.ParamMap] = Array({
	logreg_c929337cd6f4-elasticNetParam: 0.0,
	rFormula_5d2f990bdd05-formula: lab ~ . + color:value1,
	logreg_c929337cd6f4-regParam: 0.1
}, {
	logreg_c929337cd6f4-elasticNetParam: 0.5,
	rFormula_5d2f990bdd05-formula: lab ~ . + color:value1,
	logreg_c929337cd6f4-regParam: 0.1
}, {
	logreg_c929337cd6f4-elasticNetParam: 1.0,
	rFormula_5d2f990bdd05-formula: lab ~ . + color:value1,
	logreg_c929337cd6f4-regParam: 0.1
}, {
	logreg_c929337cd6f4-elasticNetParam: 0.0,
	rFormula_5d2f990bdd05-formula: lab ~ . + color:value1,
	logreg_c929337cd6f4-regParam: 2.0
}, {
	logreg_c929337cd6f4-elasticNetParam: 0.5,
	rFormula_5d2f990bdd05-formula: lab ~ . + color:value1,
	logreg_c929337cd6f4-regParam: 2.0
}, {
	logreg_c929337c...
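Note that the elided lines ("...") must already have imported the feature and classifier classes and created df, or the first commands above would not compile. A minimal sketch of that preamble, assuming the book's simple-ml example dataset (the exact path is an assumption; adjust to wherever the data lives):

import org.apache.spark.ml.feature.RFormula
import org.apache.spark.ml.classification.LogisticRegression

// Hypothetical path to the simple-ml JSON data shipped with the book's
// example repository; df carries color, lab, value1, and value2 columns.
val df = spark.read.json("/data/simple-ml")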
scala> // p 422

scala> import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

scala> val evaluator = (new BinaryClassificationEvaluator()
     |   .setMetricName("areaUnderROC")
     |   .setRawPredictionCol("prediction")
     |   .setLabelCol("label")
     | )
evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = BinaryClassificationEvaluator: uid=binEval_38d30199ac9a, metricName=areaUnderROC, numBins=1000

scala> import org.apache.spark.ml.tuning.TrainValidationSplit
import org.apache.spark.ml.tuning.TrainValidationSplit

scala> val tvs = (new TrainValidationSplit()
     |   .setTrainRatio(0.75) // also the default.
     |   .setEstimatorParamMaps(params) // error: type mismatch
     |   .setEstimator(pipeline)
     |   .setEvaluator(evaluator)
     | )
tvs: org.apache.spark.ml.tuning.TrainValidationSplit = tvs_ea9c544d061a

scala> // p 423

scala> val tvsFitted = tvs.fit(train)
tvsFitted: org.apache.spark.ml.tuning.TrainValidationSplitModel = TrainValidationSplitModel: uid=tvs_ea9c544d061a, bestModel=pipeline_ef9fc1ede161, trainRatio=0.75

scala> evaluator.evaluate(tvsFitted.transform(test)) // 0.9166666666666667
res8: Double = 0.7651821862348178

scala> import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.PipelineModel

scala> import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.classification.LogisticRegressionModel

scala> val trainedPipeline = tvsFitted.bestModel.asInstanceOf[PipelineModel]
trainedPipeline: org.apache.spark.ml.PipelineModel = pipeline_ef9fc1ede161

scala> val TrainedLR = trainedPipeline.stages(1).asInstanceOf[LogisticRegressionModel]
TrainedLR: org.apache.spark.ml.classification.LogisticRegressionModel = LogisticRegressionModel: uid=logreg_c929337cd6f4, numClasses=2, numFeatures=7

scala> val summaryLR = TrainedLR.summary
summaryLR: org.apache.spark.ml.classification.LogisticRegressionTrainingSummary = org.apache.spark.ml.classification.BinaryLogisticRegressionTrainingSummaryImpl@780eb352

scala> summaryLR.objectiveHistory // 0.6751425885789243, 0.5543659647777687, 0.473776...
res9: Array[Double] = Array(0.6719583059278968, 0.6475031325504873, 0.5887448750830291, 0.5817096472502368, 0.5800299510636623, 0.5790220897159984, 0.5787477711186043, 0.5786388131062691, 0.5784587857917531, 0.5780775104907241, 0.577918539244777, 0.5779135596254581, 0.5778821561258602, 0.5778810892221772, 0.5778795041325092, 0.5778776761948066, 0.5778775477073494, 0.5778775130411241, 0.577877496470387, 0.5778774811957399, 0.577877466886602, 0.5778774517581886, 0.5778774421090169, 0.5778774410762949, 0.5778774408394689, 0.5778774391599338, 0.5778774386511226, 0.5778774385861873, 0.577877438138773, 0.577877438073588, 0.5778774379134698, 0.5778774377171001, 0.5778774376810277, 0.5778774376302049, 0.577877437624541)

scala> // Persisting and Applying Models:

scala> tvsFitted.write.overwrite().save("/tmp/modelLocation")

scala> // p 424

scala> import org.apache.spark.ml.tuning.TrainValidationSplitModel
import org.apache.spark.ml.tuning.TrainValidationSplitModel

scala> val model = TrainValidationSplitModel.load("/tmp/modelLocation")
model: org.apache.spark.ml.tuning.TrainValidationSplitModel = TrainValidationSplitModel: uid=tvs_ea9c544d061a, bestModel=pipeline_ef9fc1ede161, trainRatio=0.75

scala> model.transform(test)
res11: org.apache.spark.sql.DataFrame = [color: string, lab: string ... 7 more fields]

scala>
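The reloaded model should behave exactly like tvsFitted did before saving. A short sanity-check sketch, reusing evaluator and test from this session:

// Re-score the held-out set; this should reproduce res8 (0.7651...).
val reloadedAUC = evaluator.evaluate(model.transform(test))

// Dig the winning LogisticRegressionModel back out of the best pipeline
// to inspect the fitted coefficients and intercept.
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.LogisticRegressionModel
val reloadedLR = model.bestModel.asInstanceOf[PipelineModel]
  .stages(1).asInstanceOf[LogisticRegressionModel]
println(s"coefficients=${reloadedLR.coefficients} intercept=${reloadedLR.intercept}")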