In [1]:
import io
import sys

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

from pyspark.sql import SparkSession

import numpy as np

In [2]:
# Path where the selected model is saved
MODEL_PATH = 'spark_ml_model'

In [3]:
# Reuse the active Spark session if there is one; otherwise create it for this job.
spark = (
    SparkSession.builder
    .appName('PySparkMLFitJob')
    .getOrCreate()
)

In [4]:
spark

# 1. Get data

In [5]:
# Read the train and test sets from parquet files in the working directory.
train, test = (
    spark.read.parquet(path)
    for path in ('train.parquet', 'test.parquet')
)

In [6]:
train.show(5)

+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|ad_id|target_audience_count|has_video|is_cpm|is_cpc|         ad_cost|day_count|              ctr|
+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|    1|     10707.2440058622|        1|     1|     0|201.829292651124|       15|0.431740082807281|
|    5|     10643.3872649482|        1|     1|     0|192.577221699704|       15|0.809264519216201|
|    6|     11418.7085911347|        1|     1|     0|204.104562956739|       11|0.909738306804039|
|    7|     10109.3278687796|        1|     1|     0|194.255798599684|       12|0.941221039774456|
|    8|     10665.1119991977|        1|     1|     0|202.658042557742|       14|0.986790019690954|
+-----+---------------------+---------+------+------+----------------+---------+-----------------+
only showing top 5 rows



In [7]:
test.show(5)

+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|ad_id|target_audience_count|has_video|is_cpm|is_cpc|         ad_cost|day_count|              ctr|
+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|    2|     11012.2068140534|        1|     1|     0|196.691891825393|       17| 0.50005065193925|
|    3|     9923.69112524699|        1|     1|     0|202.617038691842|       15|0.637132195277704|
|    4|     10202.3140990505|        1|     1|     0|203.496891469936|       15|0.783706394973096|
|   10|     10239.9431887051|        1|     1|     0|195.804239443196|       15| 1.01044552869544|
|   13|     8373.52511906263|        1|     1|     0|202.221614839989|       13| 1.05570252090352|
+-----+---------------------+---------+------+------+----------------+---------+-----------------+
only showing top 5 rows



# 2. Pipeline building

In [8]:
# Pack the six input columns into one vector column named 'features',
# which both regressors in this notebook read through featuresCol.
features = VectorAssembler(
    outputCol='features',
    inputCols=[
        'has_video',
        'is_cpm',
        'is_cpc',
        'ad_cost',
        'day_count',
        'target_audience_count',
    ],
)

# Linear regression (LR) part

In [9]:
# Linear regression on the assembled vector, predicting 'ctr' with squared-error loss.
# regParam is not set here, so it keeps the estimator's default.
lr = LinearRegression(
    featuresCol='features',
    labelCol='ctr',
    solver='normal',
    loss='squaredError',
)

In [10]:
# Hyper-parameter grid for the linear regression.
# regParam has to be part of the search: the estimator above leaves it at its
# default (0.0), and with no penalty neither elasticNetParam (the L1/L2 mix)
# nor standardization changes the solution, so every candidate would be the
# same fit and the "best" one would just be the first grid entry.
# 0.0 is kept in the list so the unregularized model still competes.
lr_paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.0, 0.01, 0.1])
    .addGrid(lr.standardization, [True, False])
    .addGrid(lr.elasticNetParam, [0.2, 0.5, 0.8, 1.0])
    .build()
)

In [11]:
# Tune the linear regression on a random 80/20 train/validation split of the
# data passed to fit(), scoring every grid candidate by RMSE against 'ctr'.
# The explicit seed pins the random split so reruns on the same data compare
# candidates on the same partition (42 matches the seed used for the random
# forest in this notebook).
lr_tvs = TrainValidationSplit(
    estimator=lr,
    estimatorParamMaps=lr_paramGrid,
    evaluator=RegressionEvaluator(labelCol='ctr', predictionCol='prediction', metricName='rmse'),
    trainRatio=0.8,
    seed=42,
)

### build pipeline

In [12]:
# Chain feature assembly with the tuned linear regression so that fit() and
# transform() can be called on the raw columns.
lr_pipeline = Pipeline(stages=[features, lr_tvs])

In [13]:
# Fit the whole pipeline on the training set; the grid search in lr_tvs runs here.
lr_model = lr_pipeline.fit(train)

In [14]:
# Apply the fitted pipeline to the test set; the result carries a 'prediction' column.
lr_prediction = lr_model.transform(test)

In [15]:
lr_prediction.select("prediction", "ctr", "features").show(10)

+------------------+-----------------+--------------------+
|        prediction|              ctr|            features|
+------------------+-----------------+--------------------+
|1.3012103231330485| 0.50005065193925|[1.0,1.0,0.0,196....|
| 2.102279397581597|0.637132195277704|[1.0,1.0,0.0,202....|
|1.8965282187698946|0.783706394973096|[1.0,1.0,0.0,203....|
|1.8716882642094221| 1.01044552869544|[1.0,1.0,0.0,195....|
|3.2460924047550845| 1.05570252090352|[1.0,1.0,0.0,202....|
| 2.209269655580119|  1.0842802285893|[1.0,1.0,0.0,198....|
|1.9751670095789402| 1.15800342076409|[1.0,1.0,0.0,196....|
|2.2974898870563933| 1.19586163368964|[1.0,1.0,0.0,199....|
|2.2098412135490664| 1.20804684294711|[1.0,1.0,0.0,200....|
|2.3757978487189426| 1.23803310280742|[1.0,1.0,0.0,204....|
+------------------+-----------------+--------------------+
only showing top 10 rows



In [16]:
# RMSE between the 'prediction' column and the true 'ctr'; used to score the test-set predictions.
lr_evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='ctr',
    predictionCol='prediction',
)

In [17]:
# Test-set RMSE of the tuned linear regression pipeline.
lr_tvs_rmse = lr_evaluator.evaluate(lr_prediction)

In [18]:
lr_tvs_rmse

0.4277179180033145

In [19]:
np.round(lr_tvs_rmse, 3)

0.428

In [20]:
# Report the hyper-parameters of the winning grid candidate.
# The fitted model exposes public Param getters, so there is no need to reach
# into the private py4j handle (_java_obj).
lr_best_model = lr_model.stages[-1].bestModel
print('standardization: {}'.format(lr_best_model.getStandardization()))
print('elasticNetParam: {}'.format(lr_best_model.getElasticNetParam()))

standardization: True
elasticNetParam: 0.2


In [21]:
lr_model.stages[-1].bestModel

LinearRegressionModel: uid=LinearRegression_4a596de0480d, numFeatures=6

# Random forest (RF) part

In [22]:
# Random forest regressor predicting 'ctr' from the assembled vector.
# numTrees and maxDepth are starting values only: the parameter grid built
# for this estimator overrides both for each candidate.
rf = RandomForestRegressor(
    featuresCol='features',
    labelCol='ctr',
    numTrees=10,
    maxDepth=10,
    bootstrap=True,
    seed=42,
)

In [23]:
# 3 tree depths x 2 forest sizes = 6 candidate random forests.
rf_paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.maxDepth, [4, 7, 10])
    .addGrid(rf.numTrees, [10, 20])
    .build()
)

In [24]:
# Tune the random forest on a random 80/20 train/validation split of the data
# passed to fit(), scoring every grid candidate by RMSE against 'ctr'.
# The explicit seed pins the random split so reruns on the same data compare
# candidates on the same partition (42 matches the seed of the estimator itself).
rf_tvs = TrainValidationSplit(
    estimator=rf,
    estimatorParamMaps=rf_paramGrid,
    evaluator=RegressionEvaluator(labelCol='ctr', predictionCol='prediction', metricName='rmse'),
    trainRatio=0.8,
    seed=42,
)

### build pipeline

In [25]:
# Chain feature assembly with the tuned random forest so that fit() and
# transform() can be called on the raw columns.
rf_pipeline = Pipeline(stages=[features, rf_tvs])

In [26]:
# Fit the whole pipeline on the training set; the grid search in rf_tvs runs here.
rf_model = rf_pipeline.fit(train)

In [27]:
# Apply the fitted pipeline to the test set; the result carries a 'prediction' column.
rf_prediction = rf_model.transform(test)

In [28]:
rf_prediction.select("prediction", "ctr", "features").show(10)

+------------------+-----------------+--------------------+
|        prediction|              ctr|            features|
+------------------+-----------------+--------------------+
|3.4912131168792184| 0.50005065193925|[1.0,1.0,0.0,196....|
|3.6134328199433994|0.637132195277704|[1.0,1.0,0.0,202....|
|3.6137126239618618|0.783706394973096|[1.0,1.0,0.0,203....|
|3.4838028482770214| 1.01044552869544|[1.0,1.0,0.0,195....|
|3.6134328199433994| 1.05570252090352|[1.0,1.0,0.0,202....|
| 3.657848283721369|  1.0842802285893|[1.0,1.0,0.0,198....|
|3.4838028482770214| 1.15800342076409|[1.0,1.0,0.0,196....|
|  3.65002900725183| 1.19586163368964|[1.0,1.0,0.0,199....|
|3.6134328199433994| 1.20804684294711|[1.0,1.0,0.0,200....|
| 3.629956268753129| 1.23803310280742|[1.0,1.0,0.0,204....|
+------------------+-----------------+--------------------+
only showing top 10 rows



In [29]:
# RMSE between the 'prediction' column and the true 'ctr'; used to score the test-set predictions.
rf_evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='ctr',
    predictionCol='prediction',
)

In [30]:
# Test-set RMSE of the tuned random forest pipeline.
rf_tvs_rmse = rf_evaluator.evaluate(rf_prediction)

In [31]:
rf_tvs_rmse

0.32922209388736795

In [32]:
np.round(rf_tvs_rmse, 3)

0.329

In [33]:
# Report the hyper-parameters of the winning grid candidate.
# Values are read through the public Params API of the fitted model rather
# than the private py4j handle (_java_obj).
# NOTE(review): getOrDefault is used instead of the named getters because
# getNumTrees may not be a plain method on tree-ensemble models in every
# PySpark 3.x release — confirm before switching to getters.
rf_best_model = rf_model.stages[-1].bestModel
print('MaxDepth: {}'.format(rf_best_model.getOrDefault('maxDepth')))
print('NumTrees: {}'.format(rf_best_model.getOrDefault('numTrees')))

MaxDepth: 10
NumTrees: 10


In [34]:
rf_model.stages[-1].bestModel

RandomForestRegressionModel: uid=RandomForestRegressor_62a876314e91, numTrees=10, numFeatures=6

## Compare the models and save the better one

In [35]:
# Persist whichever pipeline has the lower test RMSE; on a tie the linear
# regression pipeline is kept. Anything already stored at MODEL_PATH is overwritten.
best_model = rf_model if lr_tvs_rmse > rf_tvs_rmse else lr_model
best_model.write().overwrite().save(MODEL_PATH)