In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BikeSharing_Modeling").getOrCreate()
joined_df = spark.read.parquet("data/joined_bike_data.parquet")


In [2]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# Features
categorical_cols = ["season", "yr", "mnth", "hr", "holiday", "weekday", "workingday", "weathersit"]
numerical_cols = ["temp", "atemp", "hum", "windspeed"]
target_col = "cnt"

# Index + OneHot
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_idx") for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=f"{col}_idx", outputCol=f"{col}_ohe") for col in categorical_cols]

# Assemble all features
feature_cols = [f"{col}_ohe" for col in categorical_cols] + numerical_cols
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
  
# Scale
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# Full preprocessing pipeline
preprocessing = Pipeline(stages=indexers + encoders + [assembler, scaler])
model_input = preprocessing.fit(joined_df).transform(joined_df)


In [3]:
train_df, test_df = model_input.randomSplit([0.8, 0.2], seed=42)


Linear Regression

In [4]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="scaled_features", labelCol="cnt")
lr_model = lr.fit(train_df)
lr_preds = lr_model.transform(test_df)

# Evaluation
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="cnt", predictionCol="prediction", metricName="rmse")
print("RMSE:", evaluator.evaluate(lr_preds))


RMSE: 102.66531858745806


Decision Tree Regressor

In [5]:
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(featuresCol="scaled_features", labelCol="cnt")
dt_model = dt.fit(train_df)
dt_preds = dt_model.transform(test_df)

dt_rmse = evaluator.evaluate(dt_preds)
dt_r2 = evaluator.setMetricName("r2").evaluate(dt_preds)

print("Decision Tree RMSE:", dt_rmse)
print("Decision Tree R²:", dt_r2)


Decision Tree RMSE: 129.00656526720593
Decision Tree R²: 0.5124402234396651


Random Forest Regressor

In [6]:
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(featuresCol="scaled_features", labelCol="cnt", numTrees=100)
rf_model = rf.fit(train_df)
rf_preds = rf_model.transform(test_df)

rf_rmse = evaluator.evaluate(rf_preds)
rf_r2 = evaluator.setMetricName("r2").evaluate(rf_preds)

print("Random Forest RMSE:", rf_rmse)
print("Random Forest R²:", rf_r2)

Random Forest RMSE: 0.5495092615117809
Random Forest R²: 0.5495092615117809


Gradient-Boosted Tree Regressor

In [7]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(featuresCol="scaled_features", labelCol="cnt", maxIter=50)
gbt_model = gbt.fit(train_df)
gbt_preds = gbt_model.transform(test_df)

gbt_rmse = evaluator.evaluate(gbt_preds)
gbt_r2 = evaluator.setMetricName("r2").evaluate(gbt_preds)

print("GBT RMSE:", gbt_rmse)
print("GBT R²:", gbt_r2)

GBT RMSE: 0.8808462474791239
GBT R²: 0.8808462474791239


Results Summary Table

In [17]:
print("\n--- Model Comparison ---")
print(f"Linear Regression    | RMSE: {evaluator.evaluate(lr_preds):.2f} | R²: {evaluator.setMetricName('r2').evaluate(lr_preds):.4f}")
print(f"Decision Tree        | RMSE: {dt_rmse:.2f} | R²: {dt_r2:.4f}")
print(f"Random Forest        | RMSE: {rf_rmse:.2f} | R²: {rf_r2:.4f}")
print(f"Gradient BoostedTree | RMSE: {gbt_rmse:.2f} | R²: {gbt_r2:.4f}")



--- Model Comparison ---
Linear Regression    | RMSE: 0.69 | R²: 0.6912
Decision Tree        | RMSE: 0.73 | R²: 0.7270
Random Forest        | RMSE: 0.76 | R²: 0.7645
Gradient BoostedTree | RMSE: 0.79 | R²: 0.7923


 Hyperparameter Tuning – Linear Regression

In [9]:
# Split data
lr = LinearRegression(featuresCol="scaled_features", labelCol="cnt")



In [10]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning   import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# 1. Split the *preprocessed* data (with scaled_features)  
train_data, test_data = model_input.randomSplit([0.8, 0.2], seed=42)

# 2. Define the model using the correct features column  
lr = LinearRegression(featuresCol="scaled_features", labelCol="cnt")

# 3. Build a small grid of hyperparameters  
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam,      [0.01, 0.1, 0.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])  # Ridge → ElasticNet → Lasso
             .build())

# 4. Define the evaluator  
evaluator = RegressionEvaluator(labelCol="cnt", predictionCol="prediction", metricName="rmse")

# 5. Set up TrainValidationSplit  
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           trainRatio=0.8,
                           seed=42)

# 6. Fit on train_data  
lr_tuned = tvs.fit(train_data)

# 7. Get best model & evaluate on test_data  
best_lr = lr_tuned.bestModel
preds = best_lr.transform(test_data)

rmse = evaluator.evaluate(preds)
r2   = evaluator.setMetricName("r2").evaluate(preds)

print(f"Tuned LR → RMSE: {rmse:.3f}, R²: {r2:.4f}")
print("Best regParam:     ", best_lr.getOrDefault("regParam"))
print("Best elasticNetParam:", best_lr.getOrDefault("elasticNetParam"))


Tuned LR → RMSE: 102.669, R²: 0.6912
Best regParam:      0.01
Best elasticNetParam: 1.0


Decision Tree Regressor

In [11]:
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(featuresCol="scaled_features", labelCol="cnt")

paramGrid_dt = (ParamGridBuilder()
                .addGrid(dt.maxDepth, [3, 5, 10])
                .addGrid(dt.minInstancesPerNode, [1, 2, 4])
                .build())

tvs_dt = TrainValidationSplit(estimator=dt,
                              estimatorParamMaps=paramGrid_dt,
                              evaluator=evaluator,
                              trainRatio=0.8,
                              seed=42)

dt_model = tvs_dt.fit(train_data)
dt_preds = dt_model.bestModel.transform(test_data)
dt_rmse  = evaluator.evaluate(dt_preds)
dt_r2    = evaluator.setMetricName("r2").evaluate(dt_preds)


Random Forest Regressor

In [12]:
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(featuresCol="scaled_features", labelCol="cnt")

paramGrid_rf = (ParamGridBuilder()
                .addGrid(rf.numTrees, [10, 50])
                .addGrid(rf.maxDepth, [5, 10])
                .build())

tvs_rf = TrainValidationSplit(estimator=rf,
                              estimatorParamMaps=paramGrid_rf,
                              evaluator=evaluator,
                              trainRatio=0.8,
                              seed=42)

rf_model = tvs_rf.fit(train_data)
rf_preds = rf_model.bestModel.transform(test_data)
rf_rmse  = evaluator.evaluate(rf_preds)
rf_r2    = evaluator.setMetricName("r2").evaluate(rf_preds)


Gradient Boosted Trees (GBT)

In [13]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(featuresCol="scaled_features", labelCol="cnt")

paramGrid_gbt = (ParamGridBuilder()
                 .addGrid(gbt.maxIter, [10, 20])
                 .addGrid(gbt.maxDepth, [3, 5])
                 .build())

tvs_gbt = TrainValidationSplit(estimator=gbt,
                               estimatorParamMaps=paramGrid_gbt,
                               evaluator=evaluator,
                               trainRatio=0.8,
                               seed=42)

gbt_model = tvs_gbt.fit(train_data)
gbt_preds = gbt_model.bestModel.transform(test_data)
gbt_rmse  = evaluator.evaluate(gbt_preds)
gbt_r2    = evaluator.setMetricName("r2").evaluate(gbt_preds)


Results

In [14]:
# Make sure evaluator is reset to RMSE
evaluator.setMetricName("rmse")
lr_rmse = evaluator.evaluate(lr_preds)

# Then reset evaluator to R²
evaluator.setMetricName("r2")
lr_r2 = evaluator.evaluate(lr_preds)


In [15]:
results = {
    "Model": ["Linear Regression", "Decision Tree", "Random Forest", "GBT"],
    "RMSE":  [lr_rmse, dt_rmse, rf_rmse, gbt_rmse],
    "R²":    [lr_r2, dt_r2, rf_r2, gbt_r2]
}

import pandas as pd
results_df = pd.DataFrame(results)
results_df


Unnamed: 0,Model,RMSE,R²
0,Linear Regression,102.665319,0.691218
1,Decision Tree,0.727018,0.727018
2,Random Forest,0.764511,0.764511
3,GBT,0.79228,0.79228


hyper paramater tuning 

In [20]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Reuse the same evaluator for all
evaluator = RegressionEvaluator(labelCol="cnt", predictionCol="prediction", metricName="rmse")

def cross_validate(model, param_grid, train_data, evaluator, num_folds=3):
    cv = CrossValidator(
        estimator=model,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=num_folds,
        parallelism=2
    )
    return cv.fit(train_data)

# --- 1. Linear Regression ---
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='scaled_features', labelCol='cnt')

lr_param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 0.3]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

lr_cv_model = cross_validate(lr, lr_param_grid, train_data, evaluator)
lr_best_model = lr_cv_model.bestModel
lr_rmse = evaluator.evaluate(lr_best_model.transform(test_data))
lr_r2 = RegressionEvaluator(labelCol="cnt", predictionCol="prediction", metricName="r2").evaluate(lr_best_model.transform(test_data))

print("Linear Regression Tuned RMSE:", lr_rmse)
print("Linear Regression Tuned R²:", lr_r2)

# --- 2. Decision Tree ---
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(featuresCol='scaled_features', labelCol='cnt')

dt_param_grid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [5, 10, 15]) \
    .addGrid(dt.maxBins, [32, 64]) \
    .build()

dt_cv_model = cross_validate(dt, dt_param_grid, train_data, evaluator)
dt_best_model = dt_cv_model.bestModel
dt_rmse = evaluator.evaluate(dt_best_model.transform(test_data))
dt_r2 = RegressionEvaluator(labelCol="cnt", predictionCol="prediction", metricName="r2").evaluate(dt_best_model.transform(test_data))

print("Decision Tree Tuned RMSE:", dt_rmse)
print("Decision Tree Tuned R²:", dt_r2)

# --- 3. Random Forest ---
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(featuresCol='scaled_features', labelCol='cnt')

rf_param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [20, 50]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

rf_cv_model = cross_validate(rf, rf_param_grid, train_data, evaluator)
rf_best_model = rf_cv_model.bestModel
rf_rmse = evaluator.evaluate(rf_best_model.transform(test_data))
rf_r2 = RegressionEvaluator(labelCol="cnt", predictionCol="prediction", metricName="r2").evaluate(rf_best_model.transform(test_data))

print("Random Forest Tuned RMSE:", rf_rmse)
print("Random Forest Tuned R²:", rf_r2)

# --- 4. GBT Regressor ---
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(featuresCol='scaled_features', labelCol='cnt')

gbt_param_grid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [3, 5]) \
    .addGrid(gbt.maxIter, [20, 50]) \
    .addGrid(gbt.stepSize, [0.1, 0.2]) \
    .build()

gbt_cv_model = cross_validate(gbt, gbt_param_grid, train_data, evaluator)
gbt_best_model = gbt_cv_model.bestModel
gbt_rmse = evaluator.evaluate(gbt_best_model.transform(test_data))


Linear Regression Tuned RMSE: 102.71859095063755
Linear Regression Tuned R²: 0.6908976186597764
Decision Tree Tuned RMSE: 86.10685246373131
Decision Tree Tuned R²: 0.7827900712801916
Random Forest Tuned RMSE: 89.65673028227825
Random Forest Tuned R²: 0.7645113190300485


comparaison table

In [21]:
tuned_results = pd.DataFrame({
    'Model': ['Linear Regression (Tuned)', 'Decision Tree (Tuned)', 'Random Forest (Tuned)', 'GBT (Tuned)'],
    'RMSE': [lr_rmse, dt_rmse, rf_rmse, gbt_rmse],
    'R²': [lr_r2, dt_r2, rf_r2, gbt_r2]
})

results_df = pd.concat([results_df, tuned_results], ignore_index=True)
results_df.sort_values(by="R²", ascending=False)


Unnamed: 0,Model,RMSE,R²
3,GBT,0.79228,0.79228
7,GBT (Tuned),57.122435,0.79228
5,Decision Tree (Tuned),86.106852,0.78279
2,Random Forest,0.764511,0.764511
6,Random Forest (Tuned),89.65673,0.764511
1,Decision Tree,0.727018,0.727018
0,Linear Regression,102.665319,0.691218
4,Linear Regression (Tuned),102.718591,0.690898


In [None]:

df = joined_df


In [None]:
from pyspark.sql.functions import sum

total_rentals = df.select(sum("cnt")).collect()[0][0]
print("Total Rentals:", total_rentals)

# Save to CSV manually if needed:
import pandas as pd
pd.DataFrame([{"Total Rentals": total_rentals}]).to_csv("kpi_total_rentals.csv", index=False)



In [None]:
from pyspark.sql.functions import avg

hourly_avg = df.groupBy("hr").agg(avg("cnt").alias("avg_rentals")).orderBy("hr")
hourly_avg_pd = hourly_avg.toPandas()

# Save
hourly_avg_pd.to_csv("kpi_avg_rentals_by_hour.csv", index=False)

# Plot
import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(10, 5))
sns.barplot(data=hourly_avg_pd, x="hr", y="avg_rentals", palette="viridis")
plt.title("Average Rentals by Hour")
plt.xlabel("Hour of the Day")
plt.ylabel("Average Rentals")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()



In [None]:
seasonal_totals = df.groupBy("season").agg(sum("cnt").alias("total_rentals")).orderBy("season")
seasonal_pd = seasonal_totals.toPandas()

# Save
seasonal_pd.to_csv("kpi_rentals_by_season.csv", index=False)

# Plot
plt.figure(figsize=(6, 4))
sns.barplot(data=seasonal_pd, x="season", y="total_rentals", palette="muted")
plt.title("Total Rentals by Season")
plt.xlabel("Season")
plt.ylabel("Total Rentals")
plt.tight_layout()
plt.show()



In [None]:
from pyspark.sql.functions import when

df_binned = df.withColumn("temp_range", when(df.temp < 0.3, "Low")
                                        .when((df.temp >= 0.3) & (df.temp < 0.6), "Medium")
                                        .otherwise("High"))

temp_range_avg = df_binned.groupBy("temp_range").agg(avg("cnt").alias("avg_rentals"))
temp_range_pd = temp_range_avg.toPandas()

# Save
temp_range_pd.to_csv("kpi_rentals_by_temp_range.csv", index=False)

# Plot
plt.figure(figsize=(6, 4))
sns.barplot(data=temp_range_pd, x="temp_range", y="avg_rentals", order=["Low", "Medium", "High"])
plt.title("Average Rentals by Temperature Range")
plt.xlabel("Temperature Range")
plt.ylabel("Average Rentals")
plt.tight_layout()
plt.show()


In [None]:
from pyspark.sql.functions import sum as _sum

# Total casual and registered users
totals = joined_df.agg(
    _sum("casual").alias("casual_total"),
    _sum("registered").alias("registered_total")
).collect()[0]

casual_total = totals["casual_total"]
registered_total = totals["registered_total"]

# Then visualize
import matplotlib.pyplot as plt

plt.figure(figsize=(6, 6))
plt.pie([casual_total, registered_total], labels=["Casual", "Registered"], autopct="%1.1f%%", startangle=90)
plt.title("Distribution of Casual vs Registered Users")
plt.show()



In [None]:
df.groupBy("weathersit").agg(avg("cnt").alias("avg_rentals")).orderBy("weathersit").show()


In [None]:
df.groupBy("dteday").agg(sum("cnt").alias("daily_total")).orderBy("daily_total", ascending=False).show(1)


In [None]:
df.select(avg("casual").alias("avg_casual"), avg("registered").alias("avg_registered")).show()


In [None]:
df.groupBy("weekday").agg(avg("cnt").alias("avg_rentals")).orderBy("weekday").show()


In [None]:
from pyspark.sql.functions import when

df = df.withColumn("temp_range", when(df.temp < 0.3, "Low")
                                  .when((df.temp >= 0.3) & (df.temp < 0.6), "Medium")
                                  .otherwise("High"))

df.groupBy("temp_range").agg(avg("cnt").alias("avg_rentals")).orderBy("temp_range").show()
