In [151]:
df = spark.read.json("data/simple-ml")
df.orderBy("value2").show()

+-----+----+------+------------------+
|color| lab|value1|            value2|
+-----+----+------+------------------+
|green|good|     1|14.386294994851129|
|green| bad|    16|14.386294994851129|
| blue| bad|     8|14.386294994851129|
| blue| bad|     8|14.386294994851129|
| blue| bad|    12|14.386294994851129|
|green| bad|    16|14.386294994851129|
|green|good|    12|14.386294994851129|
|  red|good|    35|14.386294994851129|
|  red|good|    35|14.386294994851129|
|  red| bad|     2|14.386294994851129|
|  red| bad|    16|14.386294994851129|
|  red| bad|    16|14.386294994851129|
| blue| bad|     8|14.386294994851129|
|green|good|     1|14.386294994851129|
|green|good|    12|14.386294994851129|
| blue| bad|     8|14.386294994851129|
|  red|good|    35|14.386294994851129|
| blue| bad|    12|14.386294994851129|
|  red| bad|    16|14.386294994851129|
|green|good|    12|14.386294994851129|
+-----+----+------+------------------+
only showing top 20 rows



In [152]:
from pyspark.ml.feature import RFormula
supervised = RFormula(formula = "lab ~. + color:value1 + color:value2")
fittedRF = supervised.fit(df)

In [153]:
fittedRF

RFormulaModel: uid=RFormula_171df6d3f801, resolvedFormula=ResolvedRFormula(label=lab, terms=[color,value1,value2,{color,value1},{color,value2}], hasIntercept=true)

In [154]:
prepared_df = fittedRF.transform(df)
prepared_df.show()

+-----+----+------+------------------+--------------------+-----+
|color| lab|value1|            value2|            features|label|
+-----+----+------+------------------+--------------------+-----+
|green|good|     1|14.386294994851129|(10,[1,2,3,5,8],[...|  1.0|
| blue| bad|     8|14.386294994851129|(10,[2,3,6,9],[8....|  0.0|
| blue| bad|    12|14.386294994851129|(10,[2,3,6,9],[12...|  0.0|
|green|good|    15| 38.97187133755819|(10,[1,2,3,5,8],[...|  1.0|
|green|good|    12|14.386294994851129|(10,[1,2,3,5,8],[...|  1.0|
|green| bad|    16|14.386294994851129|(10,[1,2,3,5,8],[...|  0.0|
|  red|good|    35|14.386294994851129|(10,[0,2,3,4,7],[...|  1.0|
|  red| bad|     1| 38.97187133755819|(10,[0,2,3,4,7],[...|  0.0|
|  red| bad|     2|14.386294994851129|(10,[0,2,3,4,7],[...|  0.0|
|  red| bad|    16|14.386294994851129|(10,[0,2,3,4,7],[...|  0.0|
|  red|good|    45| 38.97187133755819|(10,[0,2,3,4,7],[...|  1.0|
|green|good|     1|14.386294994851129|(10,[1,2,3,5,8],[...|  1.0|
| blue| ba

In [160]:
train, test = prepared_df.randomSplit([0.2,0.8])

In [161]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="label", featuresCol = "features")

In [162]:
fitted_lr = lr.fit(train)

In [163]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()\
    .setMetricName("areaUnderROC")\
    .setRawPredictionCol("prediction")\
    .setLabelCol("label")


In [164]:
evaluator.evaluate(fitted_lr.transform(test))

1.0

**Spark Pipeline**

In [165]:
#methods
rForm = RFormula()
lr = LogisticRegression().setLabelCol("label").setFeaturesCol("features")

In [166]:
from pyspark.ml import Pipeline
stages = [rForm, lr]
pipeline = Pipeline().setStages(stages)

In [170]:
from pyspark.ml.tuning import ParamGridBuilder
params = ParamGridBuilder()\
            .addGrid(rForm.formula,[
                "lab~.+color:value1",
                "lab~.+color:value1 + color:value2"])\
            .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
            .addGrid(lr.regParam, [0.1,2.0])\
            .build()

In [171]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()\
    .setMetricName("areaUnderROC")\
    .setRawPredictionCol("prediction")\
    .setLabelCol("label")

In [172]:
from pyspark.ml.tuning import TrainValidationSplit
train, test = df.randomSplit([0.7, 0.3])
tvs = TrainValidationSplit()\
        .setTrainRatio(0.75)\
        .setEstimatorParamMaps(params)\
        .setEstimator(pipeline)\
        .setEvaluator(evaluator)
tvs_fitted = tvs.fit(train)

In [173]:
evaluator.evaluate(tvs_fitted.transform(test)) 

0.9210526315789473

In [150]:
trained_pipeline = tvs_fitted.bestModel
trained_lr = trained_pipeline.stages[1]
trained_lr.summary.objectiveHistory

[0.6927819059876473,
 0.6858019208354478,
 0.6429501066450108,
 0.6256794931772237,
 0.6151941181084452,
 0.6104129579953256,
 0.6053693651795746,
 0.5990637704815193,
 0.5946606861915436,
 0.5927849584988801,
 0.589377571005326,
 0.5888121739708371,
 0.5883054484661103,
 0.5881302495270053,
 0.5879840721225077,
 0.5879554489019854,
 0.587938349074488,
 0.5879244671322672,
 0.5879151435829033,
 0.587906773490357,
 0.5879027628326016,
 0.5879005468767974,
 0.5878973198729107,
 0.5878869548157649,
 0.5878379009881225,
 0.5878280828975404,
 0.5878064517553149,
 0.5878021130716453,
 0.587795929363375,
 0.5877945559300486,
 0.5877869802692381,
 0.5877861054209362,
 0.5877783102711365,
 0.5877777141776835,
 0.5877694424078204,
 0.5877668509369062,
 0.5877646665437557,
 0.5877487548668824,
 0.5877444512117623,
 0.5877375709055844,
 0.5877334766258682,
 0.5877316790097143,
 0.5877279106585566,
 0.5877255199223633,
 0.5877245415887306,
 0.5877219976327248,
 0.5877200836853761,
 0.58771792572589