In [1]:
# Evaluate `sc` to confirm that a SparkContext is available.
# NOTE(review): `sc` is not created anywhere in this notebook — presumably it is
# injected by the PySpark kernel/shell; confirm before running elsewhere.
sc

<pyspark.context.SparkContext at 0x7f4bc6b07710>

In [2]:
from pyspark.sql import SQLContext
# Wrap the existing SparkContext `sc`; `sqlc` is the entry point used below for
# createDataFrame(...) and read.format(...).load(...).
sqlc = SQLContext(sc)

## Spark ML Tuning

### Elements:
- Model (Estimator or Pipeline)
- Set of ParamMaps (to perform the grid search) - you should use the ***ParamGridBuilder*** utility
- Evaluator (to assess the fitness of the model)

## Cross-Validation

- Splits the dataset into K folds
- Each fold serves once as the test set, while the remaining K-1 folds are used for training (for K = 3: 2/3 training and 1/3 test)
- For each ParamMap, it will fit K models (one per fold) and compute the average of the K evaluation metrics (according to the Evaluator)
- Based on the metrics, it will determine the best set of parameters
- Then it will fit the model one final time, using this set of parameters and the whole dataset
- This is VERY computationally expensive: K models are fitted for every ParamMap in the grid, plus the final refit

In [3]:
!rm -rf metastore_db/*.lck

training = sqlc.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0),
    (10, "spark compile", 1.0),
    (11, "hadoop software", 0.0)
], ["id", "text", "label"])

In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer

# Stage 1: split the raw "text" column into a "words" column.
tokenizer = Tokenizer().setInputCol("text").setOutputCol("words")

# Stage 2: hash the tokenizer's output column into a "features" vector.
hashingTF = HashingTF().setInputCol(tokenizer.getOutputCol()).setOutputCol("features")

# Stage 3: logistic regression capped at 10 iterations.
lr = LogisticRegression().setMaxIter(10)

# Chain the three stages; the whole pipeline is the estimator that gets tuned below.
pipeline = Pipeline().setStages([tokenizer, hashingTF, lr])

In [5]:
from pyspark.ml.tuning import ParamGridBuilder

# Grid search space: 3 numFeatures values x 2 regParam values = 6 ParamMaps.
paramGrid = (
    ParamGridBuilder()
    .addGrid(hashingTF.numFeatures, [10, 100, 1000])
    .addGrid(lr.regParam, [0.1, 0.01])
    .build()
)

In [6]:
# Inspect the generated grid: a list with one ParamMap (dict of Param -> value)
# per combination of numFeatures and regParam.
paramGrid

[{Param(parent=u'LogisticRegression_4a49a578ae4aae2ac322', name='regParam', doc='regularization parameter (>= 0).'): 0.1,
  Param(parent=u'HashingTF_4b558295ebfb444a0fac', name='numFeatures', doc='number of features.'): 10},
 {Param(parent=u'LogisticRegression_4a49a578ae4aae2ac322', name='regParam', doc='regularization parameter (>= 0).'): 0.1,
  Param(parent=u'HashingTF_4b558295ebfb444a0fac', name='numFeatures', doc='number of features.'): 100},
 {Param(parent=u'LogisticRegression_4a49a578ae4aae2ac322', name='regParam', doc='regularization parameter (>= 0).'): 0.1,
  Param(parent=u'HashingTF_4b558295ebfb444a0fac', name='numFeatures', doc='number of features.'): 1000},
 {Param(parent=u'LogisticRegression_4a49a578ae4aae2ac322', name='regParam', doc='regularization parameter (>= 0).'): 0.01,
  Param(parent=u'HashingTF_4b558295ebfb444a0fac', name='numFeatures', doc='number of features.'): 10},
 {Param(parent=u'LogisticRegression_4a49a578ae4aae2ac322', name='regParam', doc='regularization 

In [7]:
from pyspark.ml.tuning import CrossValidator

# 3-fold cross-validation of the pipeline over every ParamMap in paramGrid,
# scored with BinaryClassificationEvaluator.
# A fixed seed makes the random fold assignment repeatable, so the averaged
# metrics and the selected model are the same on every run.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3,
                          seed=42)

# Runs the full grid search and keeps the best model.
cvModel = crossval.fit(training)

In [8]:
# Evaluator metric averaged over the folds, one entry per ParamMap in paramGrid
# (6 values for the 3 x 2 grid).
cvModel.avgMetrics

[0.5833333333333333,
 0.5833333333333333,
 0.6111111111111112,
 0.5833333333333333,
 0.5833333333333333,
 0.6111111111111112]

In [9]:
# The model fitted with the best-scoring ParamMap; since the estimator is a
# Pipeline, this is a PipelineModel.
cvModel.bestModel

PipelineModel_453db7cd8407daaccfb0

In [10]:
# Fitted stages of the best pipeline, in the order given to Pipeline(stages=...):
# tokenizer, hashingTF, lr.
cvModel.bestModel.stages

[Tokenizer_4884a4a41b967001185a,
 HashingTF_4b558295ebfb444a0fac,
 LogisticRegression_4a49a578ae4aae2ac322]

In [11]:
# Index 2 is the last pipeline stage, i.e. the fitted logistic regression
# (stages are [tokenizer, hashingTF, lr]).
lr_best = cvModel.bestModel.stages[2]

In [12]:
# Learned weights of the best logistic regression. The vector length equals the
# numFeatures value selected by the grid search (1000 in the recorded output).
lr_best.coefficients

SparseVector(1000, {6: -1.4383, 66: -0.7317, 94: -0.4712, 105: 2.0311, 170: -0.1318, 181: -0.7851, 217: 0.9701, 234: -0.7317, 248: 0.755, 282: 0.755, 315: -0.7966, 361: 0.2486, 417: -0.0229, 463: 1.0866, 644: 0.8376, 695: 1.0866, 722: 0.2602, 878: 0.534, 953: -0.7851})

In [13]:
# Summary object attached to the fitted logistic regression.
# NOTE(review): per the PySpark API this summarises the fit on the training data — confirm.
lr_summary = lr_best.summary

In [14]:
# Area under the ROC curve taken from the model summary.
# NOTE(review): the recorded 1.0 presumably reflects the 12 training rows, not
# held-out data; compare with the much lower cross-validated averages in
# cvModel.avgMetrics before reading it as model quality.
lr_summary.areaUnderROC

1.0

In [15]:
# Unlabeled documents (no "label" column) to score with the tuned pipeline.
# Plain int ids replace the Python 2-only long literals (4L, 5L, ...), which are a
# SyntaxError on Python 3; PySpark maps Python ints to the same long column type,
# so the resulting DataFrame is unchanged.
test = sqlc.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadoop")
], ["id", "text"])

In [16]:
# Apply the fitted cross-validation model to the unlabeled documents.
prediction = cvModel.transform(test)

# Keep only the columns worth inspecting.
columns_to_show = ["id", "text", "probability", "prediction"]
selected = prediction.select(*columns_to_show)

# Only four rows here, so collecting them to the driver for printing is fine.
scored_rows = selected.collect()
for scored_row in scored_rows:
    print(scored_row)

Row(id=4, text=u'spark i j k', probability=DenseVector([0.2661, 0.7339]), prediction=1.0)
Row(id=5, text=u'l m n', probability=DenseVector([0.9209, 0.0791]), prediction=0.0)
Row(id=6, text=u'mapreduce spark', probability=DenseVector([0.4429, 0.5571]), prediction=1.0)
Row(id=7, text=u'apache hadoop', probability=DenseVector([0.8584, 0.1416]), prediction=0.0)


## Train-Validation Split

- It uses the entire dataset in a single split (no folds), so each set of parameters is evaluated only once
- The dataset is split into a training set and a test (validation) set according to the ***trainRatio*** parameter (e.g. 0.8 means 80% training and 20% validation)
- It will fit a model for each set of parameters and evaluate its metrics (according to the Evaluator)
- Based on the metrics, it will determine the best set of parameters
- Then it will fit the model one final time, using this set of parameters and the whole dataset
- This is much less expensive than cross-validation, but it may not yield reliable results if the dataset is not large enough

In [17]:
import os

from pyspark.ml.regression import LinearRegression

# Sample regression data shipped with the Spark distribution. Resolve it from
# SPARK_HOME when that is set, falling back to the original install location so
# the notebook still runs unchanged on the machine it was written on.
spark_home = os.environ.get("SPARK_HOME", "/home/ubuntu/spark")
data_path = os.path.join(spark_home, "data", "mllib", "sample_linear_regression_data.txt")

data = sqlc.read.format("libsvm").load(data_path)

# 70/30 train/test split; the fixed seed keeps the split (and every number
# computed from it below) the same across runs.
# NOTE: this rebinds `test` and `lr`, which held the text DataFrame and the
# LogisticRegression estimator in the cross-validation section.
train, test = data.randomSplit([0.7, 0.3], seed=42)

# regParam here is only an initial value; the parameter grid built in the next
# cell supplies the regParam values that are actually searched.
lr = LinearRegression(maxIter=20, regParam=0.1)

In [18]:
from pyspark.ml.tuning import ParamGridBuilder

# Grid search space: 2 regParam values x 3 elasticNetParam values = 6 ParamMaps.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.01])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

In [19]:
# Inspect the generated grid: one ParamMap per combination of regParam and
# elasticNetParam.
paramGrid

[{Param(parent=u'LinearRegression_4e67b8a0403fd4f2477e', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
  Param(parent=u'LinearRegression_4e67b8a0403fd4f2477e', name='regParam', doc='regularization parameter (>= 0).'): 0.1},
 {Param(parent=u'LinearRegression_4e67b8a0403fd4f2477e', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
  Param(parent=u'LinearRegression_4e67b8a0403fd4f2477e', name='regParam', doc='regularization parameter (>= 0).'): 0.01},
 {Param(parent=u'LinearRegression_4e67b8a0403fd4f2477e', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.5,
  Param(parent=u'LinearRegression_4e67b8a0403fd4f2477e', name='regParam', doc='reg

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit

# Single train/validation split: 80% of the input is used to fit each ParamMap
# and the remaining 20% to score it with RegressionEvaluator.
# A fixed seed makes that random split, and therefore the selected parameters,
# repeatable across runs.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           trainRatio=0.8,
                           seed=42)

# Tune on the training portion only; `test` stays untouched for the final check.
model = tvs.fit(train)

In [21]:
# The linear regression model fitted with the best-scoring ParamMap.
model.bestModel

LinearRegression_4e67b8a0403fd4f2477e

In [22]:
# Fitted coefficients of the best linear model (10 values in the recorded output).
model.bestModel.coefficients

DenseVector([-0.7614, 1.3539, -0.0553, 2.7065, 0.2539, 1.5791, -0.0471, 0.6942, -0.8981, 0.9647])

In [23]:
# R^2 from the best model's summary; the recorded value (~0.05) means the model
# explains very little of the variance in the label.
# NOTE(review): presumably computed on the data the model was fitted on, not on
# `test` — confirm against the PySpark docs.
model.bestModel.summary.r2

0.0513340249451959

In [24]:
# Score the held-out 30% split (`test` from randomSplit) with the tuned model.
# This rebinds `prediction`, which earlier held the text-classification results.
prediction = model.transform(test)

In [25]:
# Preview five scored rows. Limiting in Spark before converting means only those
# five rows are brought to the driver, instead of collecting the whole DataFrame
# into pandas and slicing it afterwards.
prediction.limit(5).toPandas()

Unnamed: 0,label,features,prediction
0,-28.571479,"(-0.45971844654, -0.548942938693, 0.3342291457...",-0.727912
1,-26.805483,"(0.457255270422, -0.576096954, -0.20809839485,...",0.0984
2,-23.510884,"(-0.468353842218, 0.146954018594, 0.9113612952...",0.504391
3,-23.48744,"(-0.519535443126, 0.808035794841, 0.8498613208...",0.640791
4,-20.212077,"(0.560906580841, -0.920190439115, 0.9083058651...",1.700844
