In [1]:
# `spark` (SparkSession) is not defined in this notebook — presumably provided
# by the pyspark kernel/launcher. Evaluating it displays a session summary.
spark

In [2]:
# SparkContext behind the session; likewise presumably provided by the environment.
sc

# Data

In [8]:
!hadoop fs -rm -r /raw/simple-ml

Deleted /raw/simple-ml


In [13]:
!hadoop fs -mkdir -p /raw/simple-ml

In [16]:
!hadoop fs -put /home/cloudera/Hadoop/spark/data/simple-ml.json /raw/simple-ml

In [17]:
!hadoop fs -ls /raw/simple-ml

Found 1 items
-rw-r--r--   1 cloudera supergroup       7559 2018-02-13 13:55 /raw/simple-ml/simple-ml.json


# MLlib in action

In [18]:
df = spark.read.json("/raw/simple-ml")

In [19]:
df.orderBy("value2").show()

+-----+----+------+------------------+
|color| lab|value1|            value2|
+-----+----+------+------------------+
|green|good|     1|14.386294994851129|
|green| bad|    16|14.386294994851129|
| blue| bad|     8|14.386294994851129|
| blue| bad|     8|14.386294994851129|
| blue| bad|    12|14.386294994851129|
|green| bad|    16|14.386294994851129|
|green|good|    12|14.386294994851129|
|  red|good|    35|14.386294994851129|
|  red|good|    35|14.386294994851129|
|  red| bad|     2|14.386294994851129|
|  red| bad|    16|14.386294994851129|
|  red| bad|    16|14.386294994851129|
| blue| bad|     8|14.386294994851129|
|green|good|     1|14.386294994851129|
|green|good|    12|14.386294994851129|
| blue| bad|     8|14.386294994851129|
|  red|good|    35|14.386294994851129|
| blue| bad|    12|14.386294994851129|
|  red| bad|    16|14.386294994851129|
|green|good|    12|14.386294994851129|
+-----+----+------+------------------+
only showing top 20 rows



## Transformers

In [20]:
from pyspark.ml.feature import RFormula

In [21]:
supervised = RFormula().setFormula("lab ~ color + value1 + value2")

In [22]:
fittedDF = supervised.fit(df)

In [23]:
preparedDF = fittedDF.transform(df)

In [24]:
preparedDF.printSchema()

root
 |-- color: string (nullable = true)
 |-- lab: string (nullable = true)
 |-- value1: long (nullable = true)
 |-- value2: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = true)



In [25]:
# First 20 prepared rows; long feature vectors are truncated in the display.
preparedDF.show(n=20, truncate=True)

+-----+----+------+------------------+--------------------+-----+
|color| lab|value1|            value2|            features|label|
+-----+----+------+------------------+--------------------+-----+
|green|good|     1|14.386294994851129|[0.0,1.0,1.0,14.3...|  1.0|
| blue| bad|     8|14.386294994851129|[0.0,0.0,8.0,14.3...|  0.0|
| blue| bad|    12|14.386294994851129|[0.0,0.0,12.0,14....|  0.0|
|green|good|    15| 38.97187133755819|[0.0,1.0,15.0,38....|  1.0|
|green|good|    12|14.386294994851129|[0.0,1.0,12.0,14....|  1.0|
|green| bad|    16|14.386294994851129|[0.0,1.0,16.0,14....|  0.0|
|  red|good|    35|14.386294994851129|[1.0,0.0,35.0,14....|  1.0|
|  red| bad|     1| 38.97187133755819|[1.0,0.0,1.0,38.9...|  0.0|
|  red| bad|     2|14.386294994851129|[1.0,0.0,2.0,14.3...|  0.0|
|  red| bad|    16|14.386294994851129|[1.0,0.0,16.0,14....|  0.0|
|  red|good|    45| 38.97187133755819|[1.0,0.0,45.0,38....|  1.0|
|green|good|     1|14.386294994851129|[0.0,1.0,1.0,14.3...|  1.0|
| blue| ba

In [26]:
# 70/30 train/test split. A fixed seed keeps the split, and therefore every
# downstream metric, reproducible across Restart & Run All.
train, test = preparedDF.randomSplit([0.7, 0.3], seed=42)

## Estimators

In [27]:
from pyspark.ml.classification import LogisticRegression

In [28]:
lr = LogisticRegression() \
    .setLabelCol("label") \
    .setFeaturesCol("features")

In [29]:
fittedLR = lr.fit(train)

In [30]:
fittedLR.transform(train).select("label", "prediction").show()

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       1.0|
|  0.0|       1.0|
+-----+----------+
only showing top 20 rows



## Evaluators

In [35]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [36]:
evaluator = BinaryClassificationEvaluator()   \
    .setMetricName("areaUnderROC") \
    .setRawPredictionCol("prediction") \
    .setLabelCol("label")

In [37]:
evaluator.evaluate(fittedLR.transform(test))

0.9090909090909091

## Writing the model

In [39]:
# Persist the fitted logistic-regression model to the relative path "model",
# replacing any previous save. The path presumably resolves against the user's
# default filesystem home directory — verify for your cluster.
fittedLR.write().overwrite().save("model")

In [40]:
# List what the save produced (data/ and metadata/ subdirectories in the output below).
!hadoop fs -ls model

Found 2 items
drwxr-xr-x   - cloudera cloudera          0 2018-02-13 13:57 model/data
drwxr-xr-x   - cloudera cloudera          0 2018-02-13 13:56 model/metadata


## Pipeline

In [43]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import RFormula
from pyspark.ml.classification import LogisticRegression

In [44]:
train, test = df.randomSplit([0.7, 0.3])

In [45]:
rForm = RFormula()
lr = LogisticRegression() \
    .setLabelCol("label") \
    .setFeaturesCol("features")

In [46]:
stages = [rForm, lr]
pipeline = Pipeline().setStages(stages)

## Hyperparameter tuning

In [47]:
from pyspark.ml.tuning import ParamGridBuilder

In [48]:
params = ParamGridBuilder() \
    .addGrid(rForm.formula, ["lab ~ . + color:value1", "lab ~ . + color:value1 + color:value2"]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])  \
    .addGrid(lr.regParam, [0.1, 2.0])  \
    .build()

In [49]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [50]:
evaluator = BinaryClassificationEvaluator()   \
    .setMetricName("areaUnderROC") \
    .setRawPredictionCol("prediction") \
    .setLabelCol("label")

In [51]:
from pyspark.ml.tuning import TrainValidationSplit

In [52]:
tvs  = TrainValidationSplit() \
    .setTrainRatio(0.75) \
    .setEstimatorParamMaps(params) \
    .setEstimator(pipeline) \
    .setEvaluator(evaluator)

In [53]:
tvsFitted = tvs.fit(train)

In [54]:
evaluator.evaluate(tvsFitted.transform(test))

0.9130434782608696

## Saving the best model

In [55]:
# Best candidate found by the train/validation search. Presumably a fitted
# PipelineModel, since the tuned estimator was `pipeline`.
best = tvsFitted.bestModel

In [56]:
# Persist it, replacing any earlier save at "bestmodel".
best.write().overwrite().save("bestmodel")

## Loading the model

In [57]:
from pyspark.ml.pipeline import PipelineModel 

In [58]:
model = PipelineModel.load("bestmodel")

In [59]:
evaluator.evaluate(model.transform(test))

0.9130434782608696

In [60]:
# Displaying the transformed DataFrame shows its schema: the pipeline appends
# features, label, rawPrediction, probability and prediction columns.
model.transform(test)

DataFrame[color: string, lab: string, value1: bigint, value2: double, features: vector, label: double, rawPrediction: vector, probability: vector, prediction: double]

In [61]:
l = [('green', 1, 14.386294994), ('blue', 1, 14.386294994)]

df = spark.createDataFrame(l, ['color', 'value1', 'value2'])

In [62]:
df_fitted = model.transform(df)

In [63]:
df_fitted.show()

+-----+------+------------+--------------------+--------------------+--------------------+----------+
|color|value1|      value2|            features|       rawPrediction|         probability|prediction|
+-----+------+------------+--------------------+--------------------+--------------------+----------+
|green|     1|14.386294994|[0.0,1.0,1.0,14.3...|[-0.3258182494576...|[0.41925845399245...|       1.0|
| blue|     1|14.386294994|(7,[2,3,6],[1.0,1...|[1.37391263353745...|[0.79801156290428...|       0.0|
+-----+------+------------+--------------------+--------------------+--------------------+----------+

