In [28]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

### read and prepare data

In [2]:
spark = SparkSession.builder.appName("SparkML").getOrCreate()

In [3]:
train_df = spark.read.options(header=True, inferschema="true").parquet("train.parquet")
test_df = spark.read.options(header=True, inferschema="true").parquet("test.parquet")

In [4]:
feature = VectorAssembler(inputCols=["target_audience_count", "has_video", "is_cpm", "is_cpc", "ad_cost", "day_count"], 
                          outputCol="features")

### estimate linear regression model

In [5]:
lr = LinearRegression(labelCol="ctr", featuresCol="features",maxIter=40)

In [6]:
paramGrid_lr = ParamGridBuilder().\
               addGrid(lr.regParam, [0.01, 0.1, 0.4, 0.5]).\
               addGrid(lr.elasticNetParam, [0.0, 0.5, 0.8, 1]).\
               build()

In [7]:
tvs_lr = TrainValidationSplit(estimator=lr,
                              estimatorParamMaps=paramGrid_lr,
                              evaluator=RegressionEvaluator(labelCol="ctr", predictionCol="prediction", metricName="rmse"),
                              trainRatio=0.7)

In [8]:
pipeline_lr = Pipeline(stages=[feature ,tvs_lr])

In [9]:
model_lr = pipeline_lr.fit(train_df)

In [10]:
lr_predictions = model_lr.transform(test_df)

In [11]:
lr_predictions.select("ctr", "prediction").show(5)

+-----------------+------------------+
|              ctr|        prediction|
+-----------------+------------------+
| 0.50005065193925|1.3459177576029333|
|0.637132195277704|  2.13722625375439|
|0.783706394973096|1.9340292712486686|
| 1.01044552869544|1.9093309290332527|
| 1.05570252090352|3.2669200529349043|
+-----------------+------------------+
only showing top 5 rows



In [12]:
evaluator = RegressionEvaluator(labelCol="ctr", predictionCol="prediction", metricName="rmse")

In [13]:
evaluator.evaluate(lr_predictions)

0.4277501414479523

### DecisionTreeRegressor

In [14]:
tr = DecisionTreeRegressor(labelCol="ctr", featuresCol="features")

In [15]:
paramGrid_tr = ParamGridBuilder().\
               addGrid(tr.maxDepth, [5, 7, 9, 12, 15]).\
               addGrid(tr.maxBins, [10, 20, 50, 100]).\
               addGrid(tr.minInstancesPerNode, [2, 3, 4, 5]).\
               addGrid(tr.minInfoGain, [0.0, 0.1, 0.3, 0.5]).\
               build()

In [16]:
tvs_tr = TrainValidationSplit(estimator=tr,
                              estimatorParamMaps=paramGrid_tr,
                              evaluator=RegressionEvaluator(labelCol="ctr", predictionCol="prediction", metricName="rmse"),
                              trainRatio=0.7)

In [17]:
pipeline_tr = Pipeline(stages=[feature ,tvs_tr])

In [18]:
model_tr = pipeline_tr.fit(train_df)

In [19]:
tr_predictions = model_tr.transform(test_df)

In [20]:
evaluator.evaluate(tr_predictions)

0.24993742654147597

### RandomForestRegressor

In [21]:
rfr = RandomForestRegressor(labelCol="ctr", featuresCol="features")

In [22]:
paramGrid_rfr = ParamGridBuilder().\
                addGrid(rfr.maxDepth, [5, 7, 9, 12, 15]).\
                addGrid(rfr.maxBins, [10, 20, 50, 100]).\
                addGrid(rfr.minInstancesPerNode, [2, 3, 4, 5]).\
                addGrid(rfr.minInfoGain, [0.0, 0.1, 0.3, 0.5]).\
                addGrid(rfr.numTrees, [5, 10, 15, 20, 25]).\
                build()

In [23]:
tvs_rfr = TrainValidationSplit(estimator=rfr,
                               estimatorParamMaps=paramGrid_tr,
                               evaluator=RegressionEvaluator(labelCol="ctr", predictionCol="prediction", metricName="rmse"),
                               trainRatio=0.7)

In [24]:
pipeline_rfr = Pipeline(stages=[feature ,tvs_rfr])

In [25]:
model_rfr = pipeline_rfr.fit(train_df)

In [26]:
rfr_predictions = model_rfr.transform(test_df)

In [27]:
evaluator.evaluate(rfr_predictions)

0.30012409768669673

### GradientBoostTreeRegressor

In [29]:
gbtr = GBTRegressor(labelCol="ctr", featuresCol="features")

In [30]:
paramGrid_gbtr = ParamGridBuilder().\
                 addGrid(gbtr.maxDepth, [5, 7, 9, 12, 15]).\
                 addGrid(gbtr.maxBins, [10, 20, 50, 100]).\
                 addGrid(gbtr.minInstancesPerNode, [2, 3, 4, 5]).\
                 addGrid(gbtr.minInfoGain, [0.0, 0.1, 0.3, 0.5]).\
                 addGrid(gbtr.maxIter, [10, 20, 40, 60]).\
                 build()

In [31]:
tvs_gbtr = TrainValidationSplit(estimator=gbtr,
                                estimatorParamMaps=paramGrid_gbtr,
                                evaluator=RegressionEvaluator(labelCol="ctr", predictionCol="prediction", metricName="rmse"),
                                trainRatio=0.7)

In [34]:
pipeline_gbtr = Pipeline(stages=[feature ,tvs_gbtr])

In [35]:
model_gbtr = pipeline_gbtr.fit(train_df)

KeyboardInterrupt: 

очень долго, оценивает на данной сетке бустинг((....времени почти уже не осталось до дедлайна, поэтому на текущий момент выбыраю модель из тех что есть.  
Наименьшую ошибку показала модель DecisionTreeRegressor. Посмотрим на параметры ее лучшей модели. Сохраним на всякий случай

In [45]:
model_tr.save("best_model")

In [61]:
from pprint import pprint

In [62]:
pprint(model_tr.stages[-1].bestModel.explainParams())

('cacheNodeIds: If false, the algorithm will pass trees to executors to match '
 'instances with nodes. If true, the algorithm will cache node IDs for each '
 'instance. Caching can speed up training of deeper trees. (default: False)\n'
 'checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint '
 '(-1). E.g. 10 means that the cache will get checkpointed every 10 '
 'iterations. Note: this setting will be ignored if the checkpoint directory '
 'is not set in the SparkContext (default: 10)\n'
 'featuresCol: features column name (default: features, current: features)\n'
 'impurity: Criterion used for information gain calculation '
 '(case-insensitive). Supported options: variance (default: variance)\n'
 'labelCol: label column name (default: label, current: ctr)\n'
 'maxBins: Max number of bins for discretizing continuous features.  Must be '
 'at least 2 and at least number of categories for any categorical feature. '
 '(default: 32, current: 100)\n'
 'maxDepth: Maximum 

Мы настраивали параметры maxDepth, maxBins, minInstancesPerNode, minInfoGain.  
Модель подобрала следующие параметры соответсвенно 7, 100, 2, 0. Последний параметр можно не указывать в пайплане, поскольку по умолчанию он тоже нулевой. Таким образом сделаем модель с полученными параметрами.