# Polynomial Regression Model

In [1]:
# import findspark
# findspark.init()
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("youtube")
sc = SparkContext(conf = conf)

In [2]:
# We start by importing SparkSession, which is our new API in Spark for doing DataFrame adn DataSet operations.
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

In [3]:
spark = SparkSession.builder.appName("YT").getOrCreate()

In [4]:
#import data
df = spark.read.csv("features_scaled_df.csv",header=True, inferSchema=True)

In [5]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [6]:
#transform to vector
assembler = VectorAssembler(
    inputCols= ['titleLen', 'subscriberCount', 'avgViewCount', 'humanCount', 'HOW TO & STYLE', 'SPORTS', 'TRAVEL', 'Negative', 'titleINTJ'],
    outputCol="features")

output = assembler.transform(df)
output.select("features").show(truncate=False)

+----------------------------------------------------------------------------------+
|features                                                                          |
+----------------------------------------------------------------------------------+
|(9,[0,1,2],[0.275,0.0656551412939252,0.014979859821399953])                       |
|(9,[0,1,2],[0.375,0.0656551412939252,0.014979859821399953])                       |
|(9,[0,1,2,3],[0.225,0.0656551412939252,0.014979859821399953,0.11764705882352941]) |
|(9,[0,1,2,3],[0.3,0.0656551412939252,0.014979859821399953,0.058823529411764705])  |
|(9,[0,1,2],[0.275,0.0656551412939252,0.014979859821399953])                       |
|(9,[0,1,2],[0.275,0.0656551412939252,0.014979859821399953])                       |
|(9,[0,1,2],[0.375,0.0656551412939252,0.014979859821399953])                       |
|(9,[0,1,2],[0.425,0.0656551412939252,0.014979859821399953])                       |
|(9,[0,1,2],[0.425,0.0656551412939252,0.014979859821399953])     

In [8]:
data = output.withColumn("label",output["viewCount"])

In [9]:
from pyspark.ml.feature import PolynomialExpansion

In [10]:
polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(data)

In [11]:
polyData = polyDF.select(["polyFeatures","label"]).withColumnRenamed("polyFeatures", "features")
polyData.show()

+--------------------+---------+
|            features|    label|
+--------------------+---------+
|(219,[0,1,2,3,4,5...| 184447.0|
|(219,[0,1,2,3,4,5...| 217619.0|
|(219,[0,1,2,3,4,5...| 437777.0|
|(219,[0,1,2,3,4,5...| 191070.0|
|(219,[0,1,2,3,4,5...| 572569.0|
|(219,[0,1,2,3,4,5...| 848767.0|
|(219,[0,1,2,3,4,5...| 291800.0|
|(219,[0,1,2,3,4,5...| 218842.0|
|(219,[0,1,2,3,4,5...| 786868.0|
|(219,[0,1,2,3,4,5...| 249576.0|
|(219,[0,1,2,3,4,5...| 250697.0|
|(219,[0,1,2,3,4,5...| 355497.0|
|(219,[0,1,2,3,4,5...| 830987.0|
|(219,[0,1,2,3,4,5...| 129057.0|
|(219,[0,1,2,3,4,5...| 219546.0|
|(219,[0,1,2,3,4,5...| 555464.0|
|(219,[0,1,2,3,4,5...|1132180.0|
|(219,[0,1,2,3,4,5...| 406073.0|
|(219,[0,1,2,3,4,5...| 131059.0|
|(219,[0,1,2,3,4,5...| 209937.0|
+--------------------+---------+
only showing top 20 rows



In [12]:
# Split the data into training and test sets (30% held out for testing)
(trainingPolyData, testPolyData) = polyData.randomSplit([0.7, 0.3], seed = 123)

In [17]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
lr = LinearRegression()

In [18]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# TrainValidationSplit will try all combinations of values and determine best model using
# the evaluator.
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

crossval = CrossValidator(estimator=lr,
                      estimatorParamMaps=paramGrid,
                      evaluator=RegressionEvaluator(),
                      numFolds=2)  # use 3+ folds in practice

#choose the best set of parameters.
cvModel = crossval.fit(trainingPolyData)

In [19]:
zip(cvModel.avgMetrics, paramGrid)

<zip at 0x7f9e302b5488>

In [21]:
#parameters used
bestLR = cvModel.bestModel
bestLR.extractParamMap()

{Param(parent='LinearRegression_4ec4817545809e44dd6d', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LinearRegression_4ec4817545809e44dd6d', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.5,
 Param(parent='LinearRegression_4ec4817545809e44dd6d', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0.'): 1.35,
 Param(parent='LinearRegression_4ec4817545809e44dd6d', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LinearRegression_4ec4817545809e44dd6d', name='fitIntercept', doc='whether to fit an intercept term'): False,
 Param(parent='LinearRegression_4ec4817545809e44dd6d', name='labelCol', doc='label column name'): 'label',
 Param(parent='LinearRegression_4ec4817545809e44dd6d', name='loss', doc='The loss function to be optimized. Supported options: squaredError, h

In [34]:
#with the best features
lr = LinearRegression(featuresCol = 'features', labelCol='label', aggregationDepth=2, maxIter=100, regParam=0.01, elasticNetParam=0.5, epsilon=1.35, fitIntercept = False, standardization = True)

In [35]:
model = lr.fit(trainingPolyData)

In [36]:
#make predictions
lr_predictions = model.transform(testPolyData)
lr_predictions.select("prediction","label","features").show(10)

+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
|10873.956538430633|194.0|(219,[0,1,2,3,4,5...|
|10873.956538430633|248.0|(219,[0,1,2,3,4,5...|
|10873.956538430633|267.0|(219,[0,1,2,3,4,5...|
| 875.8916101522123|115.0|(219,[0,1,2,3,4,5...|
| 875.8916101522123|118.0|(219,[0,1,2,3,4,5...|
| 875.8916101522123|141.0|(219,[0,1,2,3,4,5...|
| 875.8916101522123|141.0|(219,[0,1,2,3,4,5...|
| 875.8916101522123|146.0|(219,[0,1,2,3,4,5...|
| 875.8916101522123|166.0|(219,[0,1,2,3,4,5...|
| 875.8916101522123|170.0|(219,[0,1,2,3,4,5...|
+------------------+-----+--------------------+
only showing top 10 rows



In [37]:
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="label",metricName="mae")
mae = lr_evaluator.evaluate(lr_predictions)
print("Mean Absolute Error (MAE) on test data = %g" % mae)

Mean Absolute Error (MAE) on test data = 2.41071e+06


In [38]:
# from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="label",metricName="mse")
mse = lr_evaluator.evaluate(lr_predictions)
print("Mean Squared Error (MSE) on test data = %g" % mse)

Mean Squared Error (MSE) on test data = 5.99599e+14


In [41]:
mae

2410706.7710912465

In [39]:
mse

599598500178390.9

In [40]:
import math
rmse = math.sqrt(mse)
rmse

24486700.475531425