Ваша команда специалистов по данным работала с небольшой выборкой объёмом 4 ГБ из общего датасета
и рекомендует применять линейную регрессию со следующими параметрами:

maxIter=40, regParam=0.4, elasticNetParam=0.8

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

### Инициализируем Spark и загружаем данные

In [2]:
# Create the Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("PySparkML")
    .getOrCreate()
)

In [3]:
# Evaluate the session object so the notebook displays it.
spark

In [4]:
# Read the pre-split training and test sets from Parquet files
# (paths are relative to the notebook's working directory).
train_data = spark.read.parquet('train.parquet')
test_data = spark.read.parquet('test.parquet')

In [5]:
# Print the first training row to inspect the columns.
train_data.show(1)

+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|ad_id|target_audience_count|has_video|is_cpm|is_cpc|         ad_cost|day_count|              ctr|
+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|    1|     10707.2440058622|        1|     1|     0|201.829292651124|       15|0.431740082807281|
+-----+---------------------+---------+------+------+----------------+---------+-----------------+
only showing top 1 row



In [6]:
# Print the first test row to inspect the columns.
test_data.show(1)

+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|ad_id|target_audience_count|has_video|is_cpm|is_cpc|         ad_cost|day_count|              ctr|
+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|    6|     11418.7085911347|        1|     1|     0|204.104562956739|       11|0.909738306804039|
+-----+---------------------+---------+------+------+----------------+---------+-----------------+
only showing top 1 row



### Линейная модель, предложенная нам учёными

In [7]:
# Baseline: the elastic-net linear regression recommended by the data science team.
#
# Every column except the label `ctr` goes into the feature vector. Selecting by
# name instead of by position (`columns[:7]`) yields the same columns for the
# current schema but keeps working if the column order ever changes.
# NOTE(review): this still feeds `ad_id` to the model; it looks like a row
# identifier rather than a predictive signal - confirm whether it belongs here.
feature = VectorAssembler(
    inputCols=[c for c in train_data.columns if c != "ctr"],
    outputCol="features",
)
lr = LinearRegression(
    labelCol="ctr",
    featuresCol="features",
    maxIter=40,
    regParam=0.4,
    elasticNetParam=0.8,
)

# Assemble the features and fit the regression in one pipeline, then score the test set.
pipeline = Pipeline(stages=[feature, lr])
lr_model = pipeline.fit(train_data)
lr_prediction = lr_model.transform(test_data)

# The evaluator is already configured for RMSE, so no per-call metric override is needed.
evaluator = RegressionEvaluator(labelCol="ctr", predictionCol="prediction", metricName="rmse")
evaluator.evaluate(lr_prediction)

0.40972941379994193

### Decision tree regression

In [8]:
# Baseline decision tree regressor with default hyperparameters.
dt = DecisionTreeRegressor(featuresCol="features", labelCol="ctr")

# Two-stage pipeline: the VectorAssembler held in `feature`, followed by the tree.
pipeline = Pipeline(stages=[feature, dt])

# Fitting the pipeline assembles the feature vector and trains the tree.
model = pipeline.fit(train_data)

# Score the test set and show one scored row as a sanity check.
predictions = model.transform(test_data)
predictions.select("prediction", "ctr", "features").show(1)

# Test-set RMSE between `prediction` and the true `ctr`.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="ctr",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# The fitted tree is the second pipeline stage; printing it gives a one-line summary.
treeModel = model.stages[1]
print(treeModel)

+------------------+-----------------+--------------------+
|        prediction|              ctr|            features|
+------------------+-----------------+--------------------+
|2.1819122574651977|0.909738306804039|[6.0,11418.708591...|
+------------------+-----------------+--------------------+
only showing top 1 row

Root Mean Squared Error (RMSE) on test data = 0.0798908
DecisionTreeRegressionModel (uid=DecisionTreeRegressor_8ee65b8a2fc8) of depth 5 with 63 nodes


### Random forest regression

In [9]:
# Baseline random forest regressor with default hyperparameters.
rf = RandomForestRegressor(featuresCol="features", labelCol="ctr")

# Two-stage pipeline: the VectorAssembler held in `feature`, followed by the forest.
pipeline = Pipeline(stages=[feature, rf])

# Fitting the pipeline assembles the feature vector and trains the forest.
model = pipeline.fit(train_data)

# Score the test set and show one scored row as a sanity check.
predictions = model.transform(test_data)
predictions.select("prediction", "ctr", "features").show(1)

# Test-set RMSE between `prediction` and the true `ctr`.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="ctr",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# The fitted forest is the second pipeline stage; printing it gives a one-line summary.
rfModel = model.stages[1]
print(rfModel)

+------------------+-----------------+--------------------+
|        prediction|              ctr|            features|
+------------------+-----------------+--------------------+
|2.7930838774918514|0.909738306804039|[6.0,11418.708591...|
+------------------+-----------------+--------------------+
only showing top 1 row

Root Mean Squared Error (RMSE) on test data = 0.117644
RandomForestRegressionModel (uid=RandomForestRegressor_11bc113c7c70) with 20 trees


### Gradient-boosted tree regression

In [10]:
# Baseline gradient-boosted trees regressor, limited to 10 boosting iterations.
gbt = GBTRegressor(featuresCol="features", labelCol="ctr", maxIter=10)

# Two-stage pipeline: the VectorAssembler held in `feature`, followed by the GBT model.
pipeline = Pipeline(stages=[feature, gbt])

# Fitting the pipeline assembles the feature vector and trains the ensemble.
model = pipeline.fit(train_data)

# Score the test set and show one scored row as a sanity check.
predictions = model.transform(test_data)
predictions.select("prediction", "ctr", "features").show(1)

# Test-set RMSE between `prediction` and the true `ctr`.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="ctr",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

# The fitted GBT model is the second pipeline stage; printing it gives a one-line summary.
gbtModel = model.stages[1]
print(gbtModel)

+------------------+-----------------+--------------------+
|        prediction|              ctr|            features|
+------------------+-----------------+--------------------+
|2.1996467185183977|0.909738306804039|[6.0,11418.708591...|
+------------------+-----------------+--------------------+
only showing top 1 row

Root Mean Squared Error (RMSE) on test data = 0.0736514
GBTRegressionModel (uid=GBTRegressor_fe2793dd1edb) with 10 trees


### Перед нами не стоит задача выбрать оптимальную модель. По результатам выше видно, что GBT-модель с параметрами по умолчанию даёт наилучший результат (наименьший RMSE). Её и используем: подберём для неё гиперпараметры.

In [11]:
# Assemble the first seven columns (everything before the label `ctr`) into `features`.
# NOTE(review): this includes `ad_id`, which looks like a row identifier rather
# than a predictive signal - confirm whether it should be a feature.
feature = VectorAssembler(
    inputCols=train_data.columns[:7],
    outputCol="features",
)

# GBT regressor whose hyperparameters are searched below.
gbt = GBTRegressor(featuresCol="features", labelCol="ctr", maxIter=10)

# Search grid: 8 tree depths x 6 bin counts, with maxIter held at 10.
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [2, 3, 4, 5, 6, 7, 8, 9])
    .addGrid(gbt.maxBins, [10, 16, 20, 24, 32, 36])
    .addGrid(gbt.maxIter, [10])
    .build()
)

# TrainValidationSplit needs an estimator, the grid of parameter maps and an
# evaluator; 80% of the rows are used for training and 20% for validation.
# NOTE(review): no seed is passed, so the train/validation split (and therefore
# the selected parameters) may differ between runs - consider fixing a seed.
tvs = TrainValidationSplit(
    estimator=gbt,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(
        predictionCol="prediction",
        labelCol="ctr",
        metricName="rmse",
    ),
    trainRatio=0.8,
)

# The search runs as the last pipeline stage so that it receives the assembled
# `features` column, which the raw `train_data` does not have.
pipeline = Pipeline(stages=[feature, tvs])
model = pipeline.fit(train_data)

In [12]:
# Score the test set with the fitted pipeline. Assuming `model` is still the
# pipeline fitted in the hyperparameter-search cell, its TrainValidationSplit
# stage predicts with the best parameter combination it found.
predictions = model.transform(test_data)
# Show one scored row as a sanity check.
predictions.select("features", "ctr", "prediction").show(1)

+--------------------+-----------------+------------------+
|            features|              ctr|        prediction|
+--------------------+-----------------+------------------+
|[6.0,11418.708591...|0.909738306804039|2.2414021351496434|
+--------------------+-----------------+------------------+
only showing top 1 row



In [13]:
# Score the test set with the current `model` and show one scored row.
predictions = model.transform(test_data)
predictions.select("features", "ctr", "prediction").show(1)

# Test-set RMSE between `prediction` and the true `ctr`.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="ctr",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

+--------------------+-----------------+------------------+
|            features|              ctr|        prediction|
+--------------------+-----------------+------------------+
|[6.0,11418.708591...|0.909738306804039|2.2414021351496434|
+--------------------+-----------------+------------------+
only showing top 1 row

Root Mean Squared Error (RMSE) on test data = 0.0634109
