In [44]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import VectorIndexer
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [124]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.regression import LinearRegression

In [46]:
spark = SparkSession.builder.appName("PySparkML").getOrCreate()

In [72]:
spark

In [156]:
testData = spark.read.parquet("test.parquet")
trainingData = spark.read.parquet("train.parquet")

In [159]:
testData.show(5)

+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|ad_id|target_audience_count|has_video|is_cpm|is_cpc|         ad_cost|day_count|              ctr|
+-----+---------------------+---------+------+------+----------------+---------+-----------------+
|    6|     11418.7085911347|        1|     1|     0|204.104562956739|       11|0.909738306804039|
|   11|     9637.20484730933|        1|     1|     0|192.092306095236|       18| 1.02222752080496|
|   12|     9886.86231469735|        1|     1|     0|199.987605376721|       15| 1.02822730862374|
|   21|     9568.62828947957|        1|     1|     0|199.557502134239|       11| 1.23608059326114|
|   24|     8891.97983774145|        1|     1|     0|199.158928072324|       15| 1.25439490657975|
|   27|     9147.65018866017|        1|     1|     0|196.168514471689|       14| 1.30790620835937|
|   38|     10134.2838536135|        1|     1|     0| 199.27560146114|       15| 1.40625464282379|
|   45|   

In [64]:
train, validate = trainingData.randomSplit([0.8, 0.2])
print("We have %d training examples and %d validate examples." % (train.count(), validate.count()))

We have 79932 training examples and 19999 validate examples.


In [92]:
feature = VectorAssembler(inputCols=trainingData.columns[1:-1],outputCol="rawfeatures")
indexed_feature = VectorIndexer(inputCol="rawfeatures", outputCol="features", maxCategories=4)

In [180]:
gbt = GBTRegressor(labelCol="ctr", featuresCol="features")

pipeline_gbt = Pipeline(stages=[feature, indexed_feature, gbt])

paramGrid = ParamGridBuilder()\
              .addGrid(gbt.maxDepth, [2, 5])\
              .addGrid(gbt.maxIter, [10])\
              .build()

evaluator = RegressionEvaluator(metricName="rmse", labelCol="ctr", predictionCol="prediction")
cv = TrainValidationSplit(estimator=pipeline_gbt, evaluator=evaluator, estimatorParamMaps=paramGrid,trainRatio=0.8)

gbt_model = cv.fit(train)
#gbt_best_model = gbt_model.bestModel
gbt_predictions = gbt_model.transform(validate)

rmse_gbt = evaluator.evaluate(gbt_predictions)

print("RMSE on our test set: %g" % rmse_gbt)

RMSE on our test set: 0.257065


In [179]:
rf = RandomForestRegressor(labelCol="ctr", featuresCol="features")

pipeline_rf = Pipeline(stages=[feature, indexed_feature, rf])

paramGrid_rf = ParamGridBuilder()\
               .addGrid(rf.maxDepth, [2, 5, 10])\
               .addGrid(rf.maxBins, [5, 10, 20])\
               .addGrid(rf.numTrees, [5, 20, 50])\
               .build()

evaluator = RegressionEvaluator(metricName="rmse", labelCol="ctr", predictionCol="prediction")

cv_rf = TrainValidationSplit(estimator=pipeline_rf, evaluator=evaluator, estimatorParamMaps=paramGrid_rf,trainRatio=0.8)

rf_model = cv_rf.fit(train)
rf_best_model = rf_model.bestModel
rf_predictions = rf_best_model.transform(validate)

rmse_rf = evaluator.evaluate(rf_predictions)

print("RMSE on our test set: %g" % rmse_rf)

RMSE on our test set: 0.299541


In [160]:
dtr = DecisionTreeRegressor(labelCol="ctr", featuresCol="features")

pipeline_dtr = Pipeline(stages=[feature, indexed_feature, dtr])

paramGrid_dtr = ParamGridBuilder()\
                 .addGrid(dtr.maxDepth, [2, 5, 10, 20, 30])\
                 .addGrid(dtr.maxBins, [10, 20, 40, 80, 100])\
                 .build()

evaluator = RegressionEvaluator(metricName="rmse", labelCol="ctr", predictionCol="prediction")
tvs_dtr = TrainValidationSplit(estimator=pipeline_dtr, evaluator=evaluator, estimatorParamMaps=paramGrid_dtr,trainRatio=0.8)

dtr_model = tvs_dtr.fit(train)
dtr_best_model = dtr_model.bestModel

dtr_predictions = dtr_best_model.transform(validate)
rmse_dtr = evaluator.evaluate(dtr_predictions)
print("RMSE on our dtr test set: %g" % rmse_dtr)


RMSE on our dtr test set: 0.255257


In [None]:
dtr_best_model.write().overwrite().save('dtr_best_model')

In [None]:
model = PipelineModel.load('dtr_best_model')
dtr_best_model.transform(testData)\
            .select("ad_id","prediction")\
            .coalesce(1)\
            .write.csv(result+"/our_predictions.csv")

In [165]:
# print('Num Trees: {}'.format(model.bestModel._java_obj.getRegParam()))
print('Max Depth: {}'.format(dtr_model.bestModel.stages[-1]._java_obj.getMaxDepth()))
# print('Impurity: {}'.format(model.bestModel._java_obj.getImpurity()))

Max Depth: 5


In [108]:
feature_lg = VectorAssembler(inputCols=train.columns[:-1],outputCol="features")
feature_vector = feature_lg.transform(train)
lr = LinearRegression(maxIter=40, regParam=0.4, elasticNetParam=0.8,labelCol = 'ctr')
lrModel = lr.fit(feature_vector)
trainingSummary = lrModel.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 0.412181
r2: 0.829792
