In [1]:
import io
import sys
from pyspark.sql import SparkSession
import pandas as pd

# Path to save the model
MODEL_PATH = 'spark_ml_model'

In [32]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import  VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor

In [3]:
spark = SparkSession.builder.appName("PySparkML").getOrCreate()

In [4]:
spark

In [5]:
# Load training data
train = spark.read.parquet('train.parquet')

In [6]:
train.show(5)

+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|ad_id|target_audience_count|has_video|is_cpm|is_cpc|         ad_cost|day_count|              ctr|
+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|    1|     10707.2440058622|        1|     1|     0|201.829292651124|       15|0.431740082807281|
|    5|     10643.3872649482|        1|     1|     0|192.577221699704|       15|0.809264519216201|
|    6|     11418.7085911347|        1|     1|     0|204.104562956739|       11|0.909738306804039|
|    7|     10109.3278687796|        1|     1|     0|194.255798599684|       12|0.941221039774456|
|    8|     10665.1119991977|        1|     1|     0|202.658042557742|       14|0.986790019690954|
+-----+---------------------+---------+------+------+----------------+---------+-----------------+
only showing top 5 rows



In [7]:
train.filter("ctr is null").count()

0

In [8]:
train.printSchema()

root
 |-- ad_id: integer (nullable = true)
 |-- target_audience_count: double (nullable = true)
 |-- has_video: integer (nullable = true)
 |-- is_cpm: integer (nullable = true)
 |-- is_cpc: integer (nullable = true)
 |-- ad_cost: double (nullable = true)
 |-- day_count: integer (nullable = true)
 |-- ctr: double (nullable = true)



In [9]:
train.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
ad_id,160151,100032.56564117614,57731.67777527107,1,200000
target_audience_count,160151,5996.667183284404,1116.256587870187,1170.47237880852,11418.7085911347
has_video,160151,0.5001404924102878,0.5000015412959168,0,1
is_cpm,160151,0.49996565741081855,0.5000015598546711,0,1
is_cpc,160151,0.5000343425891814,0.500001559854671,0,1
ad_cost,160151,109.99234379081676,90.02253879189986,18.7135343648365,213.027986239779
day_count,160151,15.002179193386242,2.0195705602836544,6,24
ctr,160151,4.9978372893810805,0.9974185644196207,0.431740082807281,9.69490945375879


In [10]:
# Load testing data
test = spark.read.parquet('test.parquet')

In [11]:
test.show(5)

+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|ad_id|target_audience_count|has_video|is_cpm|is_cpc|         ad_cost|day_count|              ctr|
+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|    2|     11012.2068140534|        1|     1|     0|196.691891825393|       17| 0.50005065193925|
|    3|     9923.69112524699|        1|     1|     0|202.617038691842|       15|0.637132195277704|
|    4|     10202.3140990505|        1|     1|     0|203.496891469936|       15|0.783706394973096|
|   10|     10239.9431887051|        1|     1|     0|195.804239443196|       15| 1.01044552869544|
|   13|     8373.52511906263|        1|     1|     0|202.221614839989|       13| 1.05570252090352|
+-----+---------------------+---------+------+------+----------------+---------+-----------------+
only showing top 5 rows



In [129]:
train.columns

['ad_id',
 'target_audience_count',
 'has_video',
 'is_cpm',
 'is_cpc',
 'ad_cost',
 'day_count',
 'ctr']

In [12]:
feature = VectorAssembler(inputCols=['ad_id', 'target_audience_count', 'has_video', 'is_cpm', 'is_cpc', 'ad_cost', 'day_count'], \
                          outputCol="features")

# Decision tree regression

In [13]:
# Train a DecisionTree model.
dt = DecisionTreeRegressor(labelCol="ctr", featuresCol="features")


# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[feature, dt])

# Train model.  This also runs the indexer.
model_dt = pipeline.fit(train)

# Make predictions.
predictions = model_dt.transform(test)


# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="ctr", predictionCol="prediction")

In [16]:
# Select example rows to display.

model_dt.transform(test)\
    .select("features", "ctr", "prediction")\
   .show(5)

+--------------------+-----------------+------------------+
|            features|              ctr|        prediction|
+--------------------+-----------------+------------------+
|[2.0,11012.206814...| 0.50005065193925|1.9373299139305638|
|[3.0,9923.6911252...|0.637132195277704|1.9373299139305638|
|[4.0,10202.314099...|0.783706394973096|1.9373299139305638|
|[10.0,10239.94318...| 1.01044552869544|1.9373299139305638|
|[13.0,8373.525119...| 1.05570252090352|  2.67739587858396|
+--------------------+-----------------+------------------+
only showing top 5 rows



In [21]:
model_dt.stages[-1]

DecisionTreeRegressionModel: uid=DecisionTreeRegressor_fd2293b649aa, depth=5, numNodes=63, numFeatures=7

In [22]:
rmse_dt = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
r2_dt = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
print("RMSE: ", rmse_dt)
print("R2:", r2_dt)

RMSE:  0.08669978079922172
R2: 0.9924213850962317


# Random forest regression

In [24]:
# Train a RandomForest model.
rf = RandomForestRegressor(labelCol="ctr", featuresCol="features")

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[feature, rf])

# Train model.  This also runs the indexer.
model_rf = pipeline.fit(train)

# Make predictions.
predictions = model_rf.transform(test)


# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="ctr", predictionCol="prediction")

rmse_rf = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
r2_rf = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
print("RMSE: ", rmse_rf)
print("R2:", r2_rf)

print(model_rf.stages[-1])  # summary only

RMSE:  0.13390959534254426
R2: 0.9819208821401785
RandomForestRegressionModel: uid=RandomForestRegressor_110e5caebf3e, numTrees=20, numFeatures=7


In [25]:
# Select example rows to display.

model_rf.transform(test)\
    .select("features", "ctr", "prediction")\
    .show(5)

+--------------------+-----------------+------------------+
|            features|              ctr|        prediction|
+--------------------+-----------------+------------------+
|[2.0,11012.206814...| 0.50005065193925| 2.730112009216512|
|[3.0,9923.6911252...|0.637132195277704|2.7433466438046983|
|[4.0,10202.314099...|0.783706394973096|  2.76309530327759|
|[10.0,10239.94318...| 1.01044552869544| 2.730112009216512|
|[13.0,8373.525119...| 1.05570252090352|2.7433466438046983|
+--------------------+-----------------+------------------+
only showing top 5 rows



# Gradient-boosted tree regression

In [26]:
# Train a GBT model.
gbt = GBTRegressor(labelCol="ctr", featuresCol="features", maxIter=10)

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[feature, gbt])

# Train model.  This also runs the indexer.
model_gbt = pipeline.fit(train)

# Make predictions.
predictions = model_gbt.transform(test)


# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="ctr", predictionCol="prediction")

rmse_gbt = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
r2_gbt = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
print("RMSE: ", rmse_gbt)
print("R2:", r2_gbt)
      

print(model_gbt.stages[-1])  # summary only

RMSE:  0.0860804871353769
R2: 0.9925292660110225
GBTRegressionModel: uid=GBTRegressor_45c859c8d3ab, numTrees=10, numFeatures=7


In [27]:
# Select example rows to display.

model_gbt.transform(test)\
    .select("features", "ctr", "prediction")\
    .show(5)

+--------------------+-----------------+------------------+
|            features|              ctr|        prediction|
+--------------------+-----------------+------------------+
|[2.0,11012.206814...| 0.50005065193925|1.8963457915257418|
|[3.0,9923.6911252...|0.637132195277704|1.8963457915257418|
|[4.0,10202.314099...|0.783706394973096|1.8963457915257418|
|[10.0,10239.94318...| 1.01044552869544|1.8963457915257418|
|[13.0,8373.525119...| 1.05570252090352| 2.660914265027981|
+--------------------+-----------------+------------------+
only showing top 5 rows



# Results

In [30]:

results = pd.DataFrame([['Decision tree regression', rmse_dt, r2_dt],
                        ['Random forest regression', rmse_rf, r2_rf],
                        ['Gradient-boosted tree regression', rmse_gbt, r2_gbt]], 
                       columns=['Название модели', 'RMSE', 'R2'])
results

Unnamed: 0,Название модели,RMSE,R2
0,Decision tree regression,0.0867,0.992421
1,Random forest regression,0.13391,0.981921
2,Gradient-boosted tree regression,0.08608,0.992529


In [34]:
gbt = GBTRegressor(labelCol="ctr", featuresCol="features", maxIter=20)

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[feature, gbt])

paramGrid = ParamGridBuilder()\
    .addGrid(rf.maxDepth, [2, 3, 4, 5, 6])\
    .addGrid(rf.numTrees, [3, 6, 8, 9, 10])\
    .build()


# In this case the estimator is simply the linear regression.
# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator = RegressionEvaluator(labelCol="ctr", predictionCol="prediction"), seed=42)

# Run TrainValidationSplit, and choose the best set of parameters.
model_gbt_tvs = tvs.fit(train)

In [35]:
model_gbt_tvs.transform(test)\
    .select("features", "ctr", "prediction")\
    .show(5)

+--------------------+-----------------+------------------+
|            features|              ctr|        prediction|
+--------------------+-----------------+------------------+
|[2.0,11012.206814...| 0.50005065193925|1.8953752934886685|
|[3.0,9923.6911252...|0.637132195277704|1.8953752934886685|
|[4.0,10202.314099...|0.783706394973096|1.9032662808268475|
|[10.0,10239.94318...| 1.01044552869544|1.8953752934886685|
|[13.0,8373.525119...| 1.05570252090352|2.6599437669909074|
+--------------------+-----------------+------------------+
only showing top 5 rows



In [36]:
# Make predictions.
predictions = model_gbt_tvs.transform(test)
rmse_tvs_gbt = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
r2_tvs_gbt  = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
print("RMSE: ", rmse_tvs_gbt)
print("R2:", r2_tvs_gbt)

RMSE:  0.08539828950356324
R2: 0.992647209623242


# Conclusion

As we can see, after tuning the parameters the result gets better, so we can save our model

In [58]:
# Save our best model

model_gbt_tvs.write().save(MODEL_PATH)

In [60]:
type(model_gbt_tvs)

pyspark.ml.tuning.TrainValidationSplitModel