In [37]:
# %pip install pyspark -qq

In [38]:
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import *

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
import ast
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor

In [39]:
# Khởi tạo Spark Session
# spark = SparkSession.builder.appName("kafka").getOrCreate()
# sc = spark.sparkContext


spark = SparkSession.builder \
    .appName("YourApp") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

In [40]:
def evaluator(predictions):
    # Tạo RegressionEvaluator
    evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="min_delay", metricName="mse")
    mse = evaluator.evaluate(predictions)

    evaluator.setMetricName("rmse")
    rmse = evaluator.evaluate(predictions)

    evaluator.setMetricName("mae")
    mae = evaluator.evaluate(predictions)

    evaluator.setMetricName("r2")
    r2 = evaluator.evaluate(predictions)

    # In ra các chỉ số đánh giá
    print("MSE:", mse)
    print("RMSE:",rmse)
    print("MAE:", mae)
    print("R-squared:",r2)

# 1. Bus

In [41]:
bus = spark.read.csv("../data/features_data/bus_feature_pd.csv", inferSchema=True, header=True)
bus.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 49051 entries, 0 to 49050
Data columns (total 23 columns):
 #   Column             Non-Null Count  Dtype         
---  ------             --------------  -----         
 0   date               49051 non-null  object        
 1   route              49051 non-null  int32         
 2   time               49051 non-null  datetime64[ns]
 3   day                49051 non-null  object        
 4   location           49051 non-null  object        
 5   incident           49051 non-null  object        
 6   min_delay          49051 non-null  int32         
 7   min_gap            49051 non-null  int32         
 8   direction          49051 non-null  object        
 9   vehicle            49051 non-null  int32         
 10  day_month          49051 non-null  int32         
 11  month              49051 non-null  int32         
 12  hour               49051 non-null  int32         
 13  min                49051 non-null  int32         
 14  day_in

In [42]:
string_to_list_udf = udf(lambda s: ast.literal_eval(s), ArrayType(FloatType()))

bus = bus.withColumn("scaled_features_list", string_to_list_udf("scaled"))

list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())

bus = bus.withColumn("scaled_features_vector", list_to_vector_udf("scaled_features_list"))

In [43]:
train_data, dev_data, test_data = bus.randomSplit([0.7, 0.15, 0.15], seed=42)

In [44]:
train_data

DataFrame[date: date, route: int, time: timestamp, day: string, location: string, incident: string, min_delay: int, min_gap: int, direction: string, vehicle: int, day_month: int, month: int, hour: int, min: int, day_indexed: int, incident_indexed: int, direction_indexed: int, location_indexed: int, location_w2v: string, features: string, scaled: string, features_index: string, scaled_index: string, scaled_features_list: array<float>, scaled_features_vector: vector]

### 1.2 Ridge regression

In [12]:
# # Tạo một mô hình
# model = LinearRegression(featuresCol="scaled_features_vector", labelCol="min_delay", elasticNetParam=0.0)

# # Tạo một ParamGridBuilder
# param_grid = ParamGridBuilder() \
#     .addGrid(model.regParam, [0.1, 0.01, 0.001]) \
#     .build()

# # Tạo một CrossValidator
# crossval = CrossValidator(estimator=model,
#                           estimatorParamMaps=param_grid,
#                           evaluator=RegressionEvaluator(metricName="rmse", labelCol="min_delay"),
#                           numFolds=3)

# fitModel = crossval.fit(dev_data)

# best_model = fitModel.bestModel

# print("regParam:", best_model.getOrDefault("regParam"))

24/06/28 22:44:59 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [45]:
# Tạo một mô hình Ridge Regression
ridge = LinearRegression(featuresCol="scaled_features_vector", labelCol="min_delay", elasticNetParam=0.0, regParam=0.001)
# ridge = fitModel.bestModel
ridge_model = ridge.fit(train_data)


                                                                                

In [46]:
ridge_predictions = ridge_model.transform(test_data)
evaluator(ridge_predictions)

                                                                                

MSE: 16.603038100324348
RMSE: 4.074682576633958
MAE: 1.7349701565562081
R-squared: 0.7981149544162256


                                                                                

In [22]:
# MSE: 17.144832245575387
# RMSE: 4.140631865497751
# MAE: 1.7613354508365346
# R-squared: 0.7890555745043117

### decision trees

In [20]:
# # Tạo một mô hình
# model = DecisionTreeRegressor(featuresCol="scaled_features_vector", labelCol="min_delay")

# # Tạo một ParamGridBuilder
# param_grid = ParamGridBuilder() \
#             .addGrid(model.maxDepth, [2, 5, 10, 20, 30]) \
#             .addGrid(model.maxBins, [32, 64, 128]) \
#             .build()

# # Tạo một CrossValidator
# crossval = CrossValidator(estimator=model,
#                           estimatorParamMaps=param_grid,
#                           evaluator=RegressionEvaluator(metricName="rmse", labelCol="min_delay"),
#                           numFolds=3)

# fitModel = crossval.fit(dev_data)

# best_model = fitModel.bestModel

# print("maxDepth:", best_model.getOrDefault("maxDepth"))
# print("maxBins:", best_model.getOrDefault("maxBins"))

24/06/28 22:47:51 WARN CacheManager: Asked to cache already cached data.
24/06/28 22:47:51 WARN CacheManager: Asked to cache already cached data.

maxDepth: 5
maxBins: 128


                                                                                

In [47]:
decision_tree = DecisionTreeRegressor(featuresCol="scaled_features_vector", labelCol="min_delay", maxDepth=5, maxBins=128)
decision_tree_model = decision_tree.fit(train_data)

                                                                                

In [48]:
decision_tree_predictions = decision_tree_model.transform(test_data)
evaluator(decision_tree_predictions)



MSE: 11.240706612913698
RMSE: 3.352716303672844
MAE: 1.2377103149265793
R-squared: 0.8633183545547857


                                                                                

In [None]:
# MSE: 11.149358988243241
# RMSE: 3.3390655860948946
# MAE: 1.1966344948117174
# R-squared: 0.8628219225051258

### random forests

In [9]:
# # Tạo một mô hình
# model = RandomForestRegressor(featuresCol="scaled_features_vector", labelCol="min_delay")

# # Tạo một ParamGridBuilder
# param_grid = ParamGridBuilder() \
#             .addGrid(model.maxDepth, [5, 10, 15]) \
#             .addGrid(model.maxBins, [32, 64, 128]) \
#             .addGrid(model.numTrees, [10, 20, 30]) \
#             .build()

# # Tạo một CrossValidator
# crossval = CrossValidator(estimator=model,
#                           estimatorParamMaps=param_grid,
#                           evaluator=RegressionEvaluator(metricName="rmse", labelCol="min_delay"),
#                           numFolds=3)

# fitModel = crossval.fit(dev_data)

# best_model = fitModel.bestModel

# print("maxDepth:", best_model.getOrDefault("maxDepth"))
# print("maxBins:", best_model.getOrDefault("maxBins"))
# print("numTrees:", best_model.getOrDefault("numTrees"))

24/06/28 22:54:06 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/06/28 22:54:21 WARN DAGScheduler: Broadcasting large task binary with size 1077.7 KiB
24/06/28 22:54:21 WARN DAGScheduler: Broadcasting large task binary with size 1521.0 KiB
24/06/28 22:54:22 WARN DAGScheduler: Broadcasting large task binary with size 1018.7 KiB
24/06/28 22:54:22 WARN DAGScheduler: Broadcasting large task binary with size 1546.7 KiB
24/06/28 22:54:23 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/06/28 22:54:25 WARN DAGScheduler: Broadcasting large task binary with size 1014.3 KiB
24/06/28 22:54:26 WARN DAGScheduler: Broadcasting large task binary with size 1420.5 KiB
24/06/28 22:54:27 WARN DAGScheduler: Broadcasting large task binary with size 1479.4 KiB
24/06/28 22:54:28 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
24/06/28 22:54:3

maxDepth: 15
maxBins: 128
numTrees: 20


In [49]:
random_forests = RandomForestRegressor(featuresCol="scaled_features_vector", labelCol="min_delay", maxDepth=15, maxBins=128, numTrees= 20)
random_forests_model = random_forests.fit(train_data)

24/07/02 23:21:42 WARN DAGScheduler: Broadcasting large task binary with size 1418.8 KiB
24/07/02 23:21:43 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
24/07/02 23:21:44 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB
24/07/02 23:21:46 WARN DAGScheduler: Broadcasting large task binary with size 5.3 MiB
24/07/02 23:21:47 WARN DAGScheduler: Broadcasting large task binary with size 7.6 MiB
24/07/02 23:21:50 WARN DAGScheduler: Broadcasting large task binary with size 10.2 MiB
24/07/02 23:21:51 WARN DAGScheduler: Broadcasting large task binary with size 1128.7 KiB
24/07/02 23:21:53 WARN DAGScheduler: Broadcasting large task binary with size 13.1 MiB
24/07/02 23:21:55 WARN DAGScheduler: Broadcasting large task binary with size 1184.7 KiB
                                                                                

In [50]:
random_forests_predictions = random_forests_model.transform(test_data)
evaluator(random_forests_predictions)



MSE: 10.118249850435042
RMSE: 3.180919654822335
MAE: 1.1891150901873278
R-squared: 0.8769668948574423


                                                                                

In [51]:
# MSE: 9.978423469054947
# RMSE: 3.158864268855967
# MAE: 1.1805625191047229
# R-squared: 0.8772287313236498

### gradient-boosted trees

In [11]:
# # Tạo một mô hình
# model = GBTRegressor(featuresCol="scaled_features_vector", labelCol="min_delay")

# # Tạo một ParamGridBuilder
# param_grid = ParamGridBuilder() \
#             .addGrid(model.maxDepth, [2, 5, 10]) \
#             .addGrid(model.maxBins, [32, 64, 128]) \
#             .addGrid(model.maxIter, [10, 20, 15]) \
#             .build()

# # Tạo một CrossValidator
# crossval = CrossValidator(estimator=model,
#                           estimatorParamMaps=param_grid,
#                           evaluator=RegressionEvaluator(metricName="rmse", labelCol="min_delay"),
#                           numFolds=3)

# fitModel = crossval.fit(dev_data)

# best_model = fitModel.bestModel

# print("maxDepth:", best_model.getOrDefault("maxDepth"))
# print("maxBins:", best_model.getOrDefault("maxBins"))
# print("maxIter:", best_model.getOrDefault("maxIter"))

24/06/28 23:05:50 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/06/28 23:05:55 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/06/28 23:06:57 WARN DAGScheduler: Broadcasting large task binary with size 1003.4 KiB
24/06/28 23:06:57 WARN DAGScheduler: Broadcasting large task binary with size 1000.2 KiB
24/06/28 23:06:57 WARN DAGScheduler: Broadcasting large task binary with size 1001.4 KiB
24/06/28 23:06:57 WARN DAGScheduler: Broadcasting large task binary with size 1003.6 KiB
24/06/28 23:06:58 WARN DAGScheduler: Broadcasting large task binary with size 1008.1 KiB
24/06/28 23:06:58 WARN DAGScheduler: Broadcasting large task binary with size 1016.3 KiB
24/06/28 23:06:58 WARN DAGScheduler: Broadcasting large task binary with size 1027.3 KiB
24/06/28 23:06:58 WARN DAGScheduler: Broadcasting large task binary with size 104

maxDepth: 2
maxBins: 128
maxIter: 20


In [52]:
gbt = GBTRegressor(featuresCol="scaled_features_vector", labelCol="min_delay", maxDepth=5, maxBins=128, maxIter=20)
gbt_model = gbt.fit(train_data)

                                                                                

In [53]:
gbt_predictions = gbt_model.transform(test_data)
evaluator(gbt_predictions)

                                                                                

MSE: 9.86400380320847
RMSE: 3.1407011642638767
MAE: 1.0983258690286752
R-squared: 0.8800584058522178


                                                                                

In [None]:
# MSE: 10.536299213621342
# RMSE: 3.245966606978781
# MAE: 1.111105763901118
# R-squared: 0.8703648100702991

# 2. Subway

In [21]:
subway = spark.read.csv("../data/features_data/subway_feature_pd.csv", inferSchema=True, header=True)
subway.toPandas().info()

                                                                                

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7382 entries, 0 to 7381
Data columns (total 25 columns):
 #   Column           Non-Null Count  Dtype         
---  ------           --------------  -----         
 0   date             7382 non-null   object        
 1   time             7382 non-null   datetime64[ns]
 2   day              7382 non-null   object        
 3   station          7382 non-null   object        
 4   code             7382 non-null   object        
 5   min_delay        7382 non-null   int32         
 6   min_gap          7382 non-null   int32         
 7   bound            7382 non-null   object        
 8   line             7382 non-null   object        
 9   vehicle          7382 non-null   int32         
 10  day_month        7382 non-null   int32         
 11  month            7382 non-null   int32         
 12  hour             7382 non-null   int32         
 13  min              7382 non-null   int32         
 14  at_station       7382 non-null   int32  

In [65]:
# string_to_list_udf = udf(lambda s: ast.literal_eval(s), ArrayType(FloatType()))

# subway = subway.withColumn("scaled_features_list", string_to_list_udf("scaled_features"))
# list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())

# subway = subway.withColumn("scaled_features_vector", list_to_vector_udf("scaled_features_list"))

In [22]:
string_to_list_udf = udf(lambda s: ast.literal_eval(s), ArrayType(FloatType()))

subway = subway.withColumn("scaled_features_list", string_to_list_udf("scaled"))

list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())

subway = subway.withColumn("scaled_features_vector", list_to_vector_udf("scaled_features_list"))

In [23]:
subway.toPandas().info()

                                                                                

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7382 entries, 0 to 7381
Data columns (total 27 columns):
 #   Column                  Non-Null Count  Dtype         
---  ------                  --------------  -----         
 0   date                    7382 non-null   object        
 1   time                    7382 non-null   datetime64[ns]
 2   day                     7382 non-null   object        
 3   station                 7382 non-null   object        
 4   code                    7382 non-null   object        
 5   min_delay               7382 non-null   int32         
 6   min_gap                 7382 non-null   int32         
 7   bound                   7382 non-null   object        
 8   line                    7382 non-null   object        
 9   vehicle                 7382 non-null   int32         
 10  day_month               7382 non-null   int32         
 11  month                   7382 non-null   int32         
 12  hour                    7382 non-null   int32   

In [24]:
train_data, dev_data, test_data = subway.randomSplit([0.7, 0.15, 0.15], seed=42)

### 1.2 Ridge regression

In [77]:
# # Tạo một mô hình
# model = LinearRegression(featuresCol="scaled_features_vector", labelCol="min_delay", elasticNetParam=0.0)

# # Tạo một ParamGridBuilder
# param_grid = ParamGridBuilder() \
#     .addGrid(model.regParam, [0.1, 0.01, 0.001]) \
#     .build()

# # Tạo một CrossValidator
# crossval = CrossValidator(estimator=model,
#                           estimatorParamMaps=param_grid,
#                           evaluator=RegressionEvaluator(metricName="rmse", labelCol="min_delay"),
#                           numFolds=3)

# fitModel = crossval.fit(dev_data)

# best_model = fitModel.bestModel

# print("regParam:", best_model.getOrDefault("regParam"))

regParam: 0.1


In [25]:
# Tạo một mô hình Ridge Regression
ridge = LinearRegression(featuresCol="scaled_features_vector", labelCol="min_delay", elasticNetParam=0.0, regParam=0.1)
ridge_model = ridge.fit(train_data)


                                                                                

In [26]:
ridge_predictions = ridge_model.transform(test_data)
evaluator(ridge_predictions)

MSE: 7.442551664266598
RMSE: 2.728104042053125
MAE: 1.4477730151606374
R-squared: 0.7841985634475148


In [None]:
# MSE: 7.442551664266598
# RMSE: 2.728104042053125
# MAE: 1.4477730151606374
# R-squared: 0.7841985634475148

### decision trees

In [16]:
# # Tạo một mô hình
# model = DecisionTreeRegressor(featuresCol="scaled_features_vector", labelCol="min_delay")

# # Tạo một ParamGridBuilder
# param_grid = ParamGridBuilder() \
#             .addGrid(model.maxDepth, [2, 5, 10, 20, 30]) \
#             .addGrid(model.maxBins, [32, 64, 128]) \
#             .build()

# # Tạo một CrossValidator
# crossval = CrossValidator(estimator=model,
#                           estimatorParamMaps=param_grid,
#                           evaluator=RegressionEvaluator(metricName="rmse", labelCol="min_delay"),
#                           numFolds=3)

# fitModel = crossval.fit(dev_data)

# best_model = fitModel.bestModel

# print("maxDepth:", best_model.getOrDefault("maxDepth"))
# print("maxBins:", best_model.getOrDefault("maxBins"))

maxDepth: 5
maxBins: 64


In [27]:
decision_tree = DecisionTreeRegressor(featuresCol="scaled_features_vector", labelCol="min_delay", maxDepth=5, maxBins=128)
decision_tree_model = decision_tree.fit(train_data)

In [28]:
decision_tree_predictions = decision_tree_model.transform(test_data)
evaluator(decision_tree_predictions)

MSE: 3.233354002637506
RMSE: 1.79815294194835
MAE: 0.9424879280879955
R-squared: 0.906246880085224


In [None]:
# MSE: 2.2079554382698614
# RMSE: 1.485919055086737
# MAE: 0.9015402672112608
# R-squared: 0.9327149447226201

### random forests

In [92]:
# # Tạo một mô hình
# model = RandomForestRegressor(featuresCol="scaled_features_vector", labelCol="min_delay")

# # Tạo một ParamGridBuilder
# param_grid = ParamGridBuilder() \
#             .addGrid(model.maxDepth, [5, 10, 15]) \
#             .addGrid(model.maxBins, [32, 64, 128]) \
#             .addGrid(model.numTrees, [10, 20, 30]) \
#             .build()

# # Tạo một CrossValidator
# crossval = CrossValidator(estimator=model,
#                           estimatorParamMaps=param_grid,
#                           evaluator=RegressionEvaluator(metricName="rmse", labelCol="min_delay"),
#                           numFolds=3)

# fitModel = crossval.fit(dev_data)

# best_model = fitModel.bestModel

# print("maxDepth:", best_model.getOrDefault("maxDepth"))
# print("maxBins:", best_model.getOrDefault("maxBins"))
# print("numTrees:", best_model.getOrDefault("numTrees"))

In [29]:
random_forests = RandomForestRegressor(featuresCol="scaled_features_vector", labelCol="min_delay", maxDepth=15, maxBins=128, numTrees= 30)
random_forests_model = random_forests.fit(train_data)

24/07/02 23:12:28 WARN DAGScheduler: Broadcasting large task binary with size 1010.1 KiB
24/07/02 23:12:28 WARN DAGScheduler: Broadcasting large task binary with size 1615.0 KiB
24/07/02 23:12:29 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
24/07/02 23:12:29 WARN DAGScheduler: Broadcasting large task binary with size 3.5 MiB
24/07/02 23:12:30 WARN DAGScheduler: Broadcasting large task binary with size 4.8 MiB
24/07/02 23:12:31 WARN DAGScheduler: Broadcasting large task binary with size 6.2 MiB
24/07/02 23:12:31 WARN DAGScheduler: Broadcasting large task binary with size 7.6 MiB
24/07/02 23:12:32 WARN DAGScheduler: Broadcasting large task binary with size 9.0 MiB


In [30]:
random_forests_predictions = random_forests_model.transform(test_data)
evaluator(random_forests_predictions)

MSE: 3.108324554349222
RMSE: 1.7630441158261532
MAE: 0.9522713789278683
R-squared: 0.9098721870725468


In [None]:
# MSE: 2.4413778377078024
# RMSE: 1.5624909080400444
# MAE: 0.9419932877792747
# R-squared: 0.9256016494192205

### gradient-boosted trees

In [None]:
# # Tạo một mô hình
# model = GBTRegressor(featuresCol="scaled_features_vector", labelCol="min_delay")

# # Tạo một ParamGridBuilder
# param_grid = ParamGridBuilder() \
#             .addGrid(model.maxDepth, [2, 5, 10]) \
#             .addGrid(model.maxBins, [32, 64, 128]) \
#             .addGrid(model.maxIter, [10, 20, 15]) \
#             .build()

# # Tạo một CrossValidator
# crossval = CrossValidator(estimator=model,
#                           estimatorParamMaps=param_grid,
#                           evaluator=RegressionEvaluator(metricName="rmse", labelCol="min_delay"),
#                           numFolds=3)

# fitModel = crossval.fit(dev_data)

# best_model = fitModel.bestModel

# print("maxDepth:", best_model.getOrDefault("maxDepth"))
# print("maxBins:", best_model.getOrDefault("maxBins"))
# print("maxIter:", best_model.getOrDefault("maxIter"))

In [31]:
gbt = GBTRegressor(featuresCol="scaled_features_vector", labelCol="min_delay", maxDepth=5, maxBins=128, maxIter=10)
gbt_model = gbt.fit(train_data)

In [32]:
gbt_predictions = gbt_model.transform(test_data)
evaluator(gbt_predictions)

                                                                                

MSE: 2.633249583453101
RMSE: 1.622729054233362
MAE: 0.8212140801305352
R-squared: 0.9236472827405747


In [None]:
# MSE: 1.8307272936196486
# RMSE: 1.3530437146004
# MAE: 0.7692065596140352
# R-squared: 0.9442105646636013