# Import Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import abs, avg, stddev, format_number
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import DecisionTreeRegressor, RandomForestRegressor

# Create Spark Session

In [2]:
spark = SparkSession.builder.appName("TP_Part_Two").getOrCreate()

# Load Dataset from the file

In [3]:
row_data = spark.read.csv("Bike Rental UCI dataset.csv", inferSchema=True, header = True)

In [4]:
row_data.show()

+------+---+----+---+-------+----------+----------+----+----+---------+---------+----+------+
|season| yr|mnth| hr|holiday|workingday|weathersit|temp| hum|windspeed|dayOfWeek|days|demand|
+------+---+----+---+-------+----------+----------+----+----+---------+---------+----+------+
|     1|  0|   1|  0|      0|         0|         1|0.24|0.81|      0.0|      Sat|   0|    16|
|     1|  0|   1|  1|      0|         0|         1|0.22| 0.8|      0.0|      Sat|   0|    40|
|     1|  0|   1|  2|      0|         0|         1|0.22| 0.8|      0.0|      Sat|   0|    32|
|     1|  0|   1|  3|      0|         0|         1|0.24|0.75|      0.0|      Sat|   0|    13|
|     1|  0|   1|  4|      0|         0|         1|0.24|0.75|      0.0|      Sat|   0|     1|
|     1|  0|   1|  5|      0|         0|         2|0.24|0.75|   0.0896|      Sat|   0|     1|
|     1|  0|   1|  6|      0|         0|         1|0.22| 0.8|      0.0|      Sat|   0|     2|
|     1|  0|   1|  7|      0|         0|         1| 0.2|0.86

# Getting Predictions, using the provided notebook

In [5]:
indexer = StringIndexer(inputCol='dayOfWeek', outputCol='day_cat')
indexed_data =indexer.fit(row_data).transform(row_data)

In [6]:
indexed_data.show()

+------+---+----+---+-------+----------+----------+----+----+---------+---------+----+------+-------+
|season| yr|mnth| hr|holiday|workingday|weathersit|temp| hum|windspeed|dayOfWeek|days|demand|day_cat|
+------+---+----+---+-------+----------+----------+----+----+---------+---------+----+------+-------+
|     1|  0|   1|  0|      0|         0|         1|0.24|0.81|      0.0|      Sat|   0|    16|    0.0|
|     1|  0|   1|  1|      0|         0|         1|0.22| 0.8|      0.0|      Sat|   0|    40|    0.0|
|     1|  0|   1|  2|      0|         0|         1|0.22| 0.8|      0.0|      Sat|   0|    32|    0.0|
|     1|  0|   1|  3|      0|         0|         1|0.24|0.75|      0.0|      Sat|   0|    13|    0.0|
|     1|  0|   1|  4|      0|         0|         1|0.24|0.75|      0.0|      Sat|   0|     1|    0.0|
|     1|  0|   1|  5|      0|         0|         2|0.24|0.75|   0.0896|      Sat|   0|     1|    0.0|
|     1|  0|   1|  6|      0|         0|         1|0.22| 0.8|      0.0|      Sat| 

In [7]:
vec = VectorAssembler(
  inputCols= ['season',
    'mnth',
    'hr',
    'holiday',
    'workingday',
    'weathersit',
    'temp',
    'hum',
    'windspeed',
    'days',
    'day_cat']
  ,
   outputCol = 'features'                  
 )

In [8]:
data = vec.transform(indexed_data)

In [9]:
data.show()

+------+---+----+---+-------+----------+----------+----+----+---------+---------+----+------+-------+--------------------+
|season| yr|mnth| hr|holiday|workingday|weathersit|temp| hum|windspeed|dayOfWeek|days|demand|day_cat|            features|
+------+---+----+---+-------+----------+----------+----+----+---------+---------+----+------+-------+--------------------+
|     1|  0|   1|  0|      0|         0|         1|0.24|0.81|      0.0|      Sat|   0|    16|    0.0|(11,[0,1,5,6,7],[...|
|     1|  0|   1|  1|      0|         0|         1|0.22| 0.8|      0.0|      Sat|   0|    40|    0.0|(11,[0,1,2,5,6,7]...|
|     1|  0|   1|  2|      0|         0|         1|0.22| 0.8|      0.0|      Sat|   0|    32|    0.0|(11,[0,1,2,5,6,7]...|
|     1|  0|   1|  3|      0|         0|         1|0.24|0.75|      0.0|      Sat|   0|    13|    0.0|(11,[0,1,2,5,6,7]...|
|     1|  0|   1|  4|      0|         0|         1|0.24|0.75|      0.0|      Sat|   0|     1|    0.0|(11,[0,1,2,5,6,7]...|
|     1|  0|   1

In [10]:
modelData = data.select('features', 'demand') 

In [11]:
modelData.show()

+--------------------+------+
|            features|demand|
+--------------------+------+
|(11,[0,1,5,6,7],[...|    16|
|(11,[0,1,2,5,6,7]...|    40|
|(11,[0,1,2,5,6,7]...|    32|
|(11,[0,1,2,5,6,7]...|    13|
|(11,[0,1,2,5,6,7]...|     1|
|[1.0,1.0,5.0,0.0,...|     1|
|(11,[0,1,2,5,6,7]...|     2|
|(11,[0,1,2,5,6,7]...|     3|
|(11,[0,1,2,5,6,7]...|     8|
|(11,[0,1,2,5,6,7]...|    14|
|[1.0,1.0,10.0,0.0...|    36|
|[1.0,1.0,11.0,0.0...|    56|
|[1.0,1.0,12.0,0.0...|    84|
|[1.0,1.0,13.0,0.0...|    94|
|[1.0,1.0,14.0,0.0...|   106|
|[1.0,1.0,15.0,0.0...|   110|
|[1.0,1.0,16.0,0.0...|    93|
|[1.0,1.0,17.0,0.0...|    67|
|[1.0,1.0,18.0,0.0...|    35|
|[1.0,1.0,19.0,0.0...|    37|
+--------------------+------+
only showing top 20 rows



In [12]:
trainData, testData = modelData.randomSplit([0.8, 0.2], seed=42)

In [13]:
lr = LinearRegression(labelCol='demand', featuresCol='features')

In [14]:
lrModel = lr.fit(trainData)

In [15]:
summary = lrModel.summary

In [16]:
summary.predictions.show(n=20, truncate = False)

+---------------------------------------------------+------+-------------------+
|features                                           |demand|prediction         |
+---------------------------------------------------+------+-------------------+
|(11,[0,1,2,5,6,7],[1.0,1.0,1.0,1.0,0.22,0.8])      |40.0  |-81.80522447908571 |
|(11,[0,1,2,5,6,7],[1.0,1.0,2.0,1.0,0.22,0.8])      |32.0  |-74.06777891809314 |
|(11,[0,1,2,5,6,7],[1.0,1.0,4.0,1.0,0.24,0.75])     |1.0   |-43.03579858882388 |
|(11,[0,1,2,5,6,7],[1.0,1.0,6.0,1.0,0.22,0.8])      |2.0   |-43.11799667412288 |
|(11,[0,1,2,5,6,7],[1.0,1.0,7.0,1.0,0.2,0.86])      |3.0   |-52.917028752154295|
|(11,[0,1,2,5,6,7],[1.0,1.0,9.0,1.0,0.32,0.76])     |14.0  |16.312628978738484 |
|(11,[0,1,5,6,7,9],[1.0,1.0,1.0,0.18,0.55,13.0])    |28.0  |-48.53013555702124 |
|(11,[0,1,5,6,7,9],[1.0,12.0,2.0,0.24,0.7,721.0])   |26.0  |15.609075964982377 |
|(11,[0,1,5,6,7,9],[2.0,3.0,1.0,0.58,0.68,442.0])   |156.0 |140.31702758754537 |
|(11,[0,1,5,6,7,9],[2.0,4.0,

In [17]:
summary.explainedVariance

12783.68672467488

In [18]:
summary.meanAbsoluteError

105.68803342719963

In [19]:
testResults = lrModel.evaluate(testData)

In [20]:
testResults.residuals.show(n=10)

+------------------+
|         residuals|
+------------------+
| 63.77324414981645|
|20.086016344853636|
|101.86191142323328|
| 40.95449465191133|
| 78.04506776392819|
| 73.24248625339271|
| 68.09330075539387|
| -7.61768619583404|
| 77.29469338875032|
|  66.0127866248553|
+------------------+
only showing top 10 rows



In [21]:
testResults.residuals.groupBy().avg().show() 

+-------------------+
|     avg(residuals)|
+-------------------+
|-1.2691883539840256|
+-------------------+



In [22]:
df= testResults.residuals
df.select(abs(df.residuals)).groupBy().avg().show()

+-------------------+
|avg(abs(residuals))|
+-------------------+
| 107.92946359993586|
+-------------------+



In [23]:
print ("r2=%g"%testResults.r2)   # my model explains x % of the variance of the data
print ("rootMeanSquaredError=%g"%testResults.rootMeanSquaredError)
print ("meanAbsoluteError=%g"%testResults.meanAbsoluteError)

r2=0.37981
rootMeanSquaredError=143.168
meanAbsoluteError=107.929


In [24]:
insights = lrModel.evaluate(data)
pred = insights.predictions
pred_res = pred.withColumn('res_abs', abs(pred.prediction-pred.demand))

# Getting Insights from our results

In [33]:
pred_res.show()

+------+---+----+---+-------+----------+----------+----+----+---------+---------+----+------+-------+--------------------+-------------------+-------------------+
|season| yr|mnth| hr|holiday|workingday|weathersit|temp| hum|windspeed|dayOfWeek|days|demand|day_cat|            features|         prediction|            res_abs|
+------+---+----+---+-------+----------+----------+----+----+---------+---------+----+------+-------+--------------------+-------------------+-------------------+
|     1|  0|   1|  0|      0|         0|         1|0.24|0.81|      0.0|      Sat|   0|    16|    0.0|(11,[0,1,5,6,7],[...| -85.86191142323328| 101.86191142323328|
|     1|  0|   1|  1|      0|         0|         1|0.22| 0.8|      0.0|      Sat|   0|    40|    0.0|(11,[0,1,2,5,6,7]...| -81.80522447908571| 121.80522447908571|
|     1|  0|   1|  2|      0|         0|         1|0.22| 0.8|      0.0|      Sat|   0|    32|    0.0|(11,[0,1,2,5,6,7]...| -74.06777891809314| 106.06777891809314|
|     1|  0|   1|  3| 

In [25]:
hourly_insights = pred_res.groupBy('hr').agg(
    format_number(avg('prediction'), 2).alias('avg_prediction'), 
    format_number(avg('demand'), 2).alias('avg_demand'), 
    format_number(stddev('prediction'), 2).alias('stddev_prediction'), 
    format_number(stddev('demand'), 2).alias('stddev_demand')
).sort('hr')

hourly_insights.show()

+---+--------------+----------+-----------------+-------------+
| hr|avg_prediction|avg_demand|stddev_prediction|stddev_demand|
+---+--------------+----------+-----------------+-------------+
|  0|         78.62|     53.90|            77.75|        42.31|
|  1|         81.31|     33.38|            76.72|        33.54|
|  2|         85.72|     22.87|            75.31|        26.58|
|  3|         91.14|     11.73|            73.02|        13.24|
|  4|         95.36|      6.35|            72.19|         4.14|
|  5|         98.51|     19.89|            73.14|        13.20|
|  6|        104.27|     76.04|            73.98|        55.08|
|  7|        116.39|    212.06|            77.08|       161.44|
|  8|        134.60|    359.01|            81.83|       235.19|
|  9|        156.96|    219.31|            84.54|        93.70|
| 10|        180.86|    173.67|            87.95|       102.21|
| 11|        203.87|    208.14|            89.53|       127.50|
| 12|        223.55|    253.32|         

In [26]:
seasonal_insights = pred_res.groupBy('season').agg(
    format_number(avg('prediction'), 2).alias('avg_prediction'), 
    format_number(avg('demand'), 2).alias('avg_demand'), 
    format_number(stddev('prediction'), 2).alias('stddev_prediction'), 
    format_number(stddev('demand'), 2).alias('stddev_demand')
).sort('season')

seasonal_insights.show()

+------+--------------+----------+-----------------+-------------+
|season|avg_prediction|avg_demand|stddev_prediction|stddev_demand|
+------+--------------+----------+-----------------+-------------+
|     1|        113.67|    111.11|            98.49|       119.22|
|     2|        192.91|    208.34|           107.50|       188.36|
|     3|        257.95|    236.02|           101.29|       197.71|
|     4|        190.12|    198.87|            95.96|       182.97|
+------+--------------+----------+-----------------+-------------+



In [27]:
dayOfWeek_insights = pred_res.groupBy('dayOfWeek').agg(
    format_number(avg('prediction'), 2).alias('avg_prediction'), 
    format_number(avg('demand'), 2).alias('avg_demand'), 
    format_number(stddev('prediction'), 2).alias('stddev_prediction'), 
    format_number(stddev('demand'), 2).alias('stddev_demand')
).sort('dayOfWeek')

dayOfWeek_insights.show()

+---------+--------------+----------+-----------------+-------------+
|dayOfWeek|avg_prediction|avg_demand|stddev_prediction|stddev_demand|
+---------+--------------+----------+-----------------+-------------+
|      Fri|        195.14|    196.14|           115.58|       174.08|
|      Mon|        185.45|    183.74|           111.62|       179.51|
|      Sat|        185.01|    190.21|           115.28|       179.82|
|      Sun|        184.45|    177.47|           109.86|       168.17|
|      Thr|        196.94|    196.44|           112.48|       188.01|
|      Tue|        190.85|    191.24|           111.02|       187.82|
|      Wed|        190.28|    191.13|           115.80|       190.89|
+---------+--------------+----------+-----------------+-------------+



# Add Dummies to improve the model

I aim to get a bigger R2, and a smaller MAE

In [28]:
print ("r2=%g"%testResults.r2)   
print ("meanAbsoluteError=%g"%testResults.meanAbsoluteError)

r2=0.37981
meanAbsoluteError=107.929


In [29]:
indexers = [
    StringIndexer(inputCol="season", outputCol="season_index"),
    StringIndexer(inputCol="holiday", outputCol="holiday_index"),
    StringIndexer(inputCol="workingday", outputCol="workingday_index"),
    StringIndexer(inputCol="weathersit", outputCol="weathersit_index"),
    StringIndexer(inputCol="dayOfWeek", outputCol="dayOfWeek_index"),
    StringIndexer(inputCol="day_cat", outputCol="day_cat_index")
]

encoders = [
    OneHotEncoder(inputCol="season_index", outputCol="season_dummy"),
    OneHotEncoder(inputCol="holiday_index", outputCol="holiday_dummy"),
    OneHotEncoder(inputCol="workingday_index", outputCol="workingday_dummy"),
    OneHotEncoder(inputCol="weathersit_index", outputCol="weathersit_dummy"),
    OneHotEncoder(inputCol="dayOfWeek_index", outputCol="dayOfWeek_dummy"),
    OneHotEncoder(inputCol="day_cat_index", outputCol="day_cat_dummy")
]

assembler = VectorAssembler(inputCols=[
    "season_dummy", "holiday_dummy", "workingday_dummy", "weathersit_dummy",
    "dayOfWeek_dummy", "day_cat_dummy", "temp", "hum", "windspeed", "days", "hr", "features"
], outputCol="feature")

lr = LinearRegression(featuresCol="feature", labelCol="demand")

pipeline = Pipeline(stages=indexers + encoders + [assembler, lr])

train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

model = pipeline.fit(train_data)

predictions2 = model.transform(test_data)

evaluator = RegressionEvaluator(labelCol="demand", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions2)

evaluator_r2 = RegressionEvaluator(labelCol="demand", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(predictions2)

print(f"Mean Absolute Error (MAE): {mae}")
print(f"R-squared (R2): {r2}")

Mean Absolute Error (MAE): 106.15093901048469
R-squared (R2): 0.39247535338541106


In [30]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.01, 0.001])  
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])  
             .addGrid(lr.maxIter, [10, 50, 100]) 
             .build())


evaluator = RegressionEvaluator(labelCol="demand", predictionCol="prediction", metricName="mae")

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5) 


cvModel = crossval.fit(train_data)

bestModel = cvModel.bestModel

predictions = bestModel.transform(test_data)

mae = evaluator.evaluate(predictions)
print(f"Mean Absolute Error (MAE): {mae}")

evaluator_r2 = RegressionEvaluator(labelCol="demand", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print(f"R-squared (R2): {r2}")

Mean Absolute Error (MAE): 106.11679195429892
R-squared (R2): 0.3919153067721559


In [31]:
indexers = [
    StringIndexer(inputCol="season", outputCol="season_index"),
    StringIndexer(inputCol="holiday", outputCol="holiday_index"),
    StringIndexer(inputCol="workingday", outputCol="workingday_index"),
    StringIndexer(inputCol="weathersit", outputCol="weathersit_index"),
    StringIndexer(inputCol="dayOfWeek", outputCol="dayOfWeek_index"),
    StringIndexer(inputCol="day_cat", outputCol="day_cat_index")
]

encoders = [
    OneHotEncoder(inputCol="season_index", outputCol="season_dummy"),
    OneHotEncoder(inputCol="holiday_index", outputCol="holiday_dummy"),
    OneHotEncoder(inputCol="workingday_index", outputCol="workingday_dummy"),
    OneHotEncoder(inputCol="weathersit_index", outputCol="weathersit_dummy"),
    OneHotEncoder(inputCol="dayOfWeek_index", outputCol="dayOfWeek_dummy"),
    OneHotEncoder(inputCol="day_cat_index", outputCol="day_cat_dummy")
]

assembler = VectorAssembler(inputCols=[
    "season_dummy", "holiday_dummy", "workingday_dummy", "weathersit_dummy",
    "dayOfWeek_dummy", "day_cat_dummy", "temp", "hum", "windspeed", "days", "hr", "features"
], outputCol="feature")

dt = DecisionTreeRegressor(featuresCol="feature", labelCol="demand")
rf = RandomForestRegressor(featuresCol="feature", labelCol="demand")

pipeline_dt = Pipeline(stages=indexers + encoders + [assembler, dt])
pipeline_rf = Pipeline(stages=indexers + encoders + [assembler, rf])

model_dt = pipeline_dt.fit(train_data)
model_rf = pipeline_rf.fit(train_data)

predictions_dt = model_dt.transform(test_data)
predictions_rf = model_rf.transform(test_data)

evaluator = RegressionEvaluator(labelCol="demand", predictionCol="prediction", metricName="mae")

mae_dt = evaluator.evaluate(predictions_dt)
r2_dt = evaluator_r2.evaluate(predictions_dt)
print(f"DecisionTree MAE: {mae_dt}")
print(f"DecisionTree R-squared (R2): {r2_dt}")

mae_rf = evaluator.evaluate(predictions_rf)
r2_rf = evaluator_r2.evaluate(predictions_rf)
print(f"RandomForest MAE: {mae_rf}")
print(f"RandomForest R-squared (R2): {r2_rf}")                 

DecisionTree MAE: 72.71133082643351
DecisionTree R-squared (R2): 0.636476770275656
RandomForest MAE: 72.17396989630456
RandomForest R-squared (R2): 0.6537463632597177
