In [3]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("Auto-MPG-Efficiency _predictin").getOrCreate()

In [None]:
# Đọc tệp CSV bằng Spark
df = spark.read.csv('/Reduced-Data.csv', header=True, inferSchema=True)


                                                                                

In [6]:
df.show(30)

+---+------+------+------------+---------------+---+--------+-------+---------+-------------+-----------------+-------------------+
|  r|m (kg)|    Mt|Ewltp (g/km)|             Ft| Fm|ec (cm3)|ep (KW)|z (Wh/km)|Erwltp (g/km)|Fuel consumption |Electric range (km)|
+---+------+------+------------+---------------+---+--------+-------+---------+-------------+-----------------+-------------------+
|  1|1337.0|1446.0|       126.0|            lpg|  B|   999.0|   74.0|     NULL|          1.7|              7.8|               NULL|
|  1|1670.0|1782.0|       125.0|         petrol|  H|  2487.0|  131.0|     NULL|          0.8|              5.5|               NULL|
|  1|2044.0|2187.0|         0.0|       electric|  E|    NULL|  221.0|    172.0|         NULL|             NULL|              440.0|
|  1|1493.0|1576.0|       135.0|         petrol|  M|  1199.0|   96.0|     NULL|          2.0|              6.0|               NULL|
|  1|1649.0|1814.0|       131.0|         petrol|  H|  1598.0|  132.0|     NU

In [7]:
df.printSchema()

root
 |-- r: integer (nullable = true)
 |-- m (kg): double (nullable = true)
 |-- Mt: double (nullable = true)
 |-- Ewltp (g/km): double (nullable = true)
 |-- Ft: string (nullable = true)
 |-- Fm: string (nullable = true)
 |-- ec (cm3): double (nullable = true)
 |-- ep (KW): double (nullable = true)
 |-- z (Wh/km): double (nullable = true)
 |-- Erwltp (g/km): double (nullable = true)
 |-- Fuel consumption : double (nullable = true)
 |-- Electric range (km): double (nullable = true)



In [8]:
from pyspark.sql.functions import col, sum

# Kiểm tra số lượng giá trị thiếu trong mỗi cột
missing_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
missing_counts.show()




+---+------+------+------------+---+---+--------+-------+---------+-------------+-----------------+-------------------+
|  r|m (kg)|    Mt|Ewltp (g/km)| Ft| Fm|ec (cm3)|ep (KW)|z (Wh/km)|Erwltp (g/km)|Fuel consumption |Electric range (km)|
+---+------+------+------------+---+---+--------+-------+---------+-------------+-----------------+-------------------+
|  0|   425|523557|       20891|  0|  1| 1670374|  35372|  8374652|      4966748|          3183164|            8398594|
+---+------+------+------------+---+---+--------+-------+---------+-------------+-----------------+-------------------+



                                                                                

In [9]:
df = df.drop('r')

In [10]:
df.show(5)

+------+------+------------+--------+---+--------+-------+---------+-------------+-----------------+-------------------+
|m (kg)|    Mt|Ewltp (g/km)|      Ft| Fm|ec (cm3)|ep (KW)|z (Wh/km)|Erwltp (g/km)|Fuel consumption |Electric range (km)|
+------+------+------------+--------+---+--------+-------+---------+-------------+-----------------+-------------------+
|1337.0|1446.0|       126.0|     lpg|  B|   999.0|   74.0|     NULL|          1.7|              7.8|               NULL|
|1670.0|1782.0|       125.0|  petrol|  H|  2487.0|  131.0|     NULL|          0.8|              5.5|               NULL|
|2044.0|2187.0|         0.0|electric|  E|    NULL|  221.0|    172.0|         NULL|             NULL|              440.0|
|1493.0|1576.0|       135.0|  petrol|  M|  1199.0|   96.0|     NULL|          2.0|              6.0|               NULL|
|1649.0|1814.0|       131.0|  petrol|  H|  1598.0|  132.0|     NULL|         0.59|              5.8|               NULL|
+------+------+------------+----

In [11]:
# Lấy số hàng
num_rows = df.count()

# Lấy số cột
num_columns = len(df.columns)

# Hiển thị kết quả giống như df.shape trong Pandas
print((num_rows, num_columns))




(10734656, 11)


                                                                                

In [12]:
# Điền giá trị 0 vào các ô null trong cột 'Electric range (km)'
df = df.fillna({'Electric range (km)': 0})


In [13]:
df.show(5)

+------+------+------------+--------+---+--------+-------+---------+-------------+-----------------+-------------------+
|m (kg)|    Mt|Ewltp (g/km)|      Ft| Fm|ec (cm3)|ep (KW)|z (Wh/km)|Erwltp (g/km)|Fuel consumption |Electric range (km)|
+------+------+------------+--------+---+--------+-------+---------+-------------+-----------------+-------------------+
|1337.0|1446.0|       126.0|     lpg|  B|   999.0|   74.0|     NULL|          1.7|              7.8|                0.0|
|1670.0|1782.0|       125.0|  petrol|  H|  2487.0|  131.0|     NULL|          0.8|              5.5|                0.0|
|2044.0|2187.0|         0.0|electric|  E|    NULL|  221.0|    172.0|         NULL|             NULL|              440.0|
|1493.0|1576.0|       135.0|  petrol|  M|  1199.0|   96.0|     NULL|          2.0|              6.0|                0.0|
|1649.0|1814.0|       131.0|  petrol|  H|  1598.0|  132.0|     NULL|         0.59|              5.8|                0.0|
+------+------+------------+----

In [14]:
# Loại bỏ cột 'z (Wh/km)' khỏi Spark DataFrame
df = df.drop('z (Wh/km)')


In [15]:
df.show(5)

+------+------+------------+--------+---+--------+-------+-------------+-----------------+-------------------+
|m (kg)|    Mt|Ewltp (g/km)|      Ft| Fm|ec (cm3)|ep (KW)|Erwltp (g/km)|Fuel consumption |Electric range (km)|
+------+------+------------+--------+---+--------+-------+-------------+-----------------+-------------------+
|1337.0|1446.0|       126.0|     lpg|  B|   999.0|   74.0|          1.7|              7.8|                0.0|
|1670.0|1782.0|       125.0|  petrol|  H|  2487.0|  131.0|          0.8|              5.5|                0.0|
|2044.0|2187.0|         0.0|electric|  E|    NULL|  221.0|         NULL|             NULL|              440.0|
|1493.0|1576.0|       135.0|  petrol|  M|  1199.0|   96.0|          2.0|              6.0|                0.0|
|1649.0|1814.0|       131.0|  petrol|  H|  1598.0|  132.0|         0.59|              5.8|                0.0|
+------+------+------------+--------+---+--------+-------+-------------+-----------------+-------------------+
o

In [16]:
df = df.drop('Mt')


In [17]:
df = df.drop('Ewltp (g/km)')


In [18]:
df = df.drop('Erwltp (g/km)')


In [19]:
df.show(5)

+------+--------+---+--------+-------+-----------------+-------------------+
|m (kg)|      Ft| Fm|ec (cm3)|ep (KW)|Fuel consumption |Electric range (km)|
+------+--------+---+--------+-------+-----------------+-------------------+
|1337.0|     lpg|  B|   999.0|   74.0|              7.8|                0.0|
|1670.0|  petrol|  H|  2487.0|  131.0|              5.5|                0.0|
|2044.0|electric|  E|    NULL|  221.0|             NULL|              440.0|
|1493.0|  petrol|  M|  1199.0|   96.0|              6.0|                0.0|
|1649.0|  petrol|  H|  1598.0|  132.0|              5.8|                0.0|
+------+--------+---+--------+-------+-----------------+-------------------+
only showing top 5 rows



In [20]:
# Lấy số hàng
num_rows = df.count()

# Lấy số cột
num_columns = len(df.columns)

# Hiển thị kết quả giống như df.shape trong Pandas
print((num_rows, num_columns))




(10734656, 7)


                                                                                

In [21]:
# Kiểm tra số lượng giá trị thiếu trong mỗi cột
missing_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
missing_counts.show()




+------+---+---+--------+-------+-----------------+-------------------+
|m (kg)| Ft| Fm|ec (cm3)|ep (KW)|Fuel consumption |Electric range (km)|
+------+---+---+--------+-------+-----------------+-------------------+
|   425|  0|  1| 1670374|  35372|          3183164|                  0|
+------+---+---+--------+-------+-----------------+-------------------+



                                                                                

In [22]:
df = df.dropna()


In [23]:
# Lấy số hàng
num_rows = df.count()

# Lấy số cột
num_columns = len(df.columns)

# Hiển thị kết quả giống như df.shape trong Pandas
print((num_rows, num_columns))



(7550972, 7)


                                                                                

In [24]:
df.show(5)

+------+------+---+--------+-------+-----------------+-------------------+
|m (kg)|    Ft| Fm|ec (cm3)|ep (KW)|Fuel consumption |Electric range (km)|
+------+------+---+--------+-------+-----------------+-------------------+
|1337.0|   lpg|  B|   999.0|   74.0|              7.8|                0.0|
|1670.0|petrol|  H|  2487.0|  131.0|              5.5|                0.0|
|1493.0|petrol|  M|  1199.0|   96.0|              6.0|                0.0|
|1649.0|petrol|  H|  1598.0|  132.0|              5.8|                0.0|
|1560.0|petrol|  H|  1987.0|  112.0|              5.2|                0.0|
+------+------+---+--------+-------+-----------------+-------------------+
only showing top 5 rows



In [25]:
# Hiển thị các giá trị unique trong cột 'Ft'
df.select("Ft").distinct().show()

# Hiển thị các giá trị unique trong cột 'Fm'
df.select("Fm").distinct().show()


                                                                                

+---------------+
|             Ft|
+---------------+
|         petrol|
|             ng|
|diesel/electric|
|            e85|
|            lpg|
|petrol/electric|
|         diesel|
|         DIESEL|
|PETROL/ELECTRIC|
|         PETROL|
|            LPG|
+---------------+





+---+
| Fm|
+---+
|  F|
|  B|
|  P|
|  M|
|  H|
+---+



                                                                                

In [26]:
# Loại bỏ khoảng trắng thừa trong tên cột "Fuel consumption "
df = df.withColumnRenamed("Fuel consumption ", "Fuel consumption")

In [27]:
from pyspark.sql.functions import lower, regexp_replace
# 1. Chuẩn hóa cột 'Ft' về chữ thường và thay dấu gạch chéo bằng dấu gạch ngang
df = df.withColumn("Ft", lower(regexp_replace("Ft", "/", "-")))

# Hiển thị các giá trị unique trong cột 'Ft' sau khi chuẩn hóa
print("Unique values in 'Ft' after normalization:")
df.select("Ft").distinct().show()

Unique values in 'Ft' after normalization:




+---------------+
|             Ft|
+---------------+
|         petrol|
|petrol-electric|
|             ng|
|            e85|
|            lpg|
|         diesel|
|diesel-electric|
+---------------+



                                                                                

In [28]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler

In [29]:
# Mã hóa cột phân loại
indexer_Ft = StringIndexer(inputCol="Ft", outputCol="Ft_index")
indexer_Fm = StringIndexer(inputCol="Fm", outputCol="Fm_index")
encoder_Ft = OneHotEncoder(inputCol="Ft_index", outputCol="Ft_encoded")
encoder_Fm = OneHotEncoder(inputCol="Fm_index", outputCol="Fm_encoded")

# Tạo VectorAssembler cho các đặc trưng trước khi scale
assembler = VectorAssembler(
    inputCols=["m (kg)", "Ft_encoded", "Fm_encoded", "ec (cm3)", "ep (KW)", "Electric range (km)"],
    outputCol="assembled_features"
)


In [30]:
# Chuẩn hóa các đặc trưng số
scaler = StandardScaler(inputCol="assembled_features", outputCol="features")

In [31]:
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [32]:
# 1. Mô hình Linear Regression
lr = LinearRegression(featuresCol="features", labelCol="Fuel consumption", regParam=0.1)
pipeline_lr = Pipeline(stages=[indexer_Ft, indexer_Fm, encoder_Ft, encoder_Fm, assembler, scaler, lr])

In [33]:
# 2. Mô hình Random Forest Regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="Fuel consumption", numTrees=50, maxDepth=10)  

pipeline_rf = Pipeline(stages=[indexer_Ft, indexer_Fm, encoder_Ft, encoder_Fm, assembler, scaler, rf])

In [34]:
# 3. Mô hình Gradient Boosting Regressor
gbt = GBTRegressor(featuresCol="features", labelCol="Fuel consumption")
pipeline_gbt = Pipeline(stages=[indexer_Ft, indexer_Fm, encoder_Ft, encoder_Fm, assembler, scaler, gbt])

In [36]:
# Chia dữ liệu thành tập huấn luyện và kiểm tra
train_data, test_data = df.randomSplit([0.8, 0.2], seed=1234)


In [36]:
# # Khởi tạo Linear Regression với giá trị regParam nhỏ
# lr = LinearRegression(featuresCol="features", labelCol="Fuel consumption", regParam=0.1)

# # Xây dựng pipeline như trước
# pipeline_lr = Pipeline(stages=[indexer_Ft, indexer_Fm, encoder_Ft, encoder_Fm, assembler, scaler, lr])


In [38]:
# Huấn luyện và dự đoán với Linear Regression
model_lr = pipeline_lr.fit(train_data)
predictions_lr = model_lr.transform(test_data)

24/12/03 07:10:47 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/12/03 07:11:16 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [39]:
from pyspark.ml.evaluation import RegressionEvaluator

# Đánh giá các mô hình với các chỉ số: RMSE, MAE, MSE, R2
evaluator_rmse = RegressionEvaluator(labelCol="Fuel consumption", predictionCol="prediction", metricName="rmse")
evaluator_mae = RegressionEvaluator(labelCol="Fuel consumption", predictionCol="prediction", metricName="mae")
evaluator_mse = RegressionEvaluator(labelCol="Fuel consumption", predictionCol="prediction", metricName="mse")
evaluator_r2 = RegressionEvaluator(labelCol="Fuel consumption", predictionCol="prediction", metricName="r2")

# Đánh giá cho mô hình Linear Regression
rmse_lr = evaluator_rmse.evaluate(predictions_lr)
mae_lr = evaluator_mae.evaluate(predictions_lr)
mse_lr = evaluator_mse.evaluate(predictions_lr)
r2_lr = evaluator_r2.evaluate(predictions_lr)

# In ra các kết quả đánh giá cho mô hình Linear Regression
print(f"Linear Regression - RMSE: {rmse_lr}, MAE: {mae_lr}, MSE: {mse_lr}, R2: {r2_lr}")




Linear Regression - RMSE: 0.642590342562376, MAE: 0.4534623074340169, MSE: 0.41292234835443165, R2: 0.8707315082013685


                                                                                

In [40]:
# Huấn luyện và dự đoán với Random Forest
model_rf = pipeline_rf.fit(train_data)
predictions_rf = model_rf.transform(test_data)

24/12/02 06:55:45 WARN DAGScheduler: Broadcasting large task binary with size 1326.4 KiB
24/12/02 06:56:33 WARN DAGScheduler: Broadcasting large task binary with size 1970.2 KiB
24/12/02 06:57:23 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB


In [41]:
# Huấn luyện và dự đoán với Gradient Boosting
model_gbt = pipeline_gbt.fit(train_data)
predictions_gbt = model_gbt.transform(test_data)

                                                                                

In [42]:
# Đánh giá cho mô hình Random Forest
rmse_rf = evaluator_rmse.evaluate(predictions_rf)
mae_rf = evaluator_mae.evaluate(predictions_rf)
mse_rf = evaluator_mse.evaluate(predictions_rf)
r2_rf = evaluator_r2.evaluate(predictions_rf)

# In kết quả đánh giá cho mô hình Random Forest
print(f"Random Forest Regressor - RMSE: {rmse_rf}, MAE: {mae_rf}, MSE: {mse_rf}, R2: {r2_rf}")

# Đánh giá cho mô hình Gradient Boosting
rmse_gbt = evaluator_rmse.evaluate(predictions_gbt)
mae_gbt = evaluator_mae.evaluate(predictions_gbt)
mse_gbt = evaluator_mse.evaluate(predictions_gbt)
r2_gbt = evaluator_r2.evaluate(predictions_gbt)

# In kết quả đánh giá cho mô hình Gradient Boosting
print(f"Gradient Boosting Regressor - RMSE: {rmse_gbt}, MAE: {mae_gbt}, MSE: {mse_gbt}, R2: {r2_gbt}")


                                                                                

Random Forest Regressor - RMSE: 0.459345460385933, MAE: 0.29886178090016313, MSE: 0.21099825197716474, R2: 0.933945387276971




Gradient Boosting Regressor - RMSE: 0.4395075680133984, MAE: 0.27975731812069365, MSE: 0.19316690234105205, R2: 0.9395276273358593


                                                                                

In [43]:
# Lấy tên các đặc trưng từ các cột đã chọn trong VectorAssembler
feature_names = ["m (kg)", "Ft_encoded", "Fm_encoded", "ec (cm3)", "ep (KW)", "Electric range (km)"]

# Lấy mức độ quan trọng của các đặc trưng trong Random Forest
importances_rf = model_rf.stages[-1].featureImportances

# Kết hợp tên đặc trưng với mức độ quan trọng
feature_importance_pairs = list(zip(feature_names, importances_rf))

# Sắp xếp các đặc trưng theo mức độ quan trọng giảm dần
sorted_features_rf = sorted(feature_importance_pairs, key=lambda x: x[1], reverse=True)

# Hiển thị các đặc trưng và mức độ quan trọng đã sắp xếp
print("Feature importances in Random Forest:")
for feature, importance in sorted_features_rf:
    print(f"{feature}: {importance}")


Feature importances in Random Forest:
ec (cm3): 0.14164604098088543
m (kg): 0.08495555765798497
Ft_encoded: 0.02741216133899213
Fm_encoded: 0.021924811257044943
ep (KW): 0.00570934113881092
Electric range (km): 0.0004987253843715504


In [44]:
# Lấy mức độ quan trọng của các đặc trưng trong Gradient Boosting
importances_gbt = model_gbt.stages[-1].featureImportances

# Kết hợp tên đặc trưng với mức độ quan trọng
feature_importance_pairs_gbt = list(zip(feature_names, importances_gbt))

# Sắp xếp các đặc trưng theo mức độ quan trọng giảm dần
sorted_features_gbt = sorted(feature_importance_pairs_gbt, key=lambda x: x[1], reverse=True)

# Hiển thị các đặc trưng và mức độ quan trọng đã sắp xếp
print("Feature importances in Gradient Boosting:")
for feature, importance in sorted_features_gbt:
    print(f"{feature}: {importance}")


Feature importances in Gradient Boosting:
m (kg): 0.1545350368508505
Fm_encoded: 0.059860843372079406
Ft_encoded: 0.0071888438628988395
Electric range (km): 0.006980139019134383
ep (KW): 0.0011479806061374246
ec (cm3): 0.0005768130444917685


In [45]:
# Lấy hệ số của mô hình Linear Regression
coefficients_lr = model_lr.stages[-1].coefficients

# Kết hợp tên đặc trưng với hệ số
feature_coef_pairs = list(zip(feature_names, coefficients_lr))

# Sắp xếp các đặc trưng theo hệ số (giá trị tuyệt đối của hệ số)
sorted_features_lr = sorted(feature_coef_pairs, key=lambda x: abs(x[1]), reverse=True)

# Hiển thị các đặc trưng và hệ số đã sắp xếp
print("Feature coefficients in Linear Regression:")
for feature, coef in sorted_features_lr:
    print(f"{feature}: {coef}")


Feature coefficients in Linear Regression:
ec (cm3): -0.49254603652947726
m (kg): 0.463846832505227
Ft_encoded: 0.33863534555981156
ep (KW): 0.19137058907520393
Fm_encoded: -0.15778084968056003
Electric range (km): 0.06521368400994926


In [47]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# Tạo đối tượng RegressionEvaluator để đánh giá mô hình
evaluator = RegressionEvaluator(labelCol="Fuel consumption", predictionCol="prediction", metricName="rmse")

# Mô hình Linear Regression
lr = LinearRegression(featuresCol="features", labelCol="Fuel consumption")

# Xây dựng ParamGrid để thử nghiệm các tham số của Linear Regression
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.01, 0.001, 0.0001])  # Thử nghiệm với các giá trị nhỏ hơn của regParam
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])  # Thử nghiệm Ridge và Lasso (ElasticNet = 0.0 cho Ridge, 1.0 cho Lasso)
             .addGrid(lr.maxIter, [10, 20])  # Tăng số vòng lặp
             .addGrid(lr.tol, [1e-6, 1e-4])  # Thử nghiệm với các giá trị khác nhau của tol
             .build())

# Tạo các bước trong pipeline
indexer_Ft = StringIndexer(inputCol="Ft", outputCol="Ft_index")
indexer_Fm = StringIndexer(inputCol="Fm", outputCol="Fm_index")
encoder_Ft = OneHotEncoder(inputCol="Ft_index", outputCol="Ft_encoded")
encoder_Fm = OneHotEncoder(inputCol="Fm_index", outputCol="Fm_encoded")
assembler = VectorAssembler(
    inputCols=["m (kg)", "Ft_encoded", "Fm_encoded", "ec (cm3)", "ep (KW)", "Electric range (km)"],
    outputCol="assembled_features"
)
scaler = StandardScaler(inputCol="assembled_features", outputCol="features")

# Pipeline cho Linear Regression
pipeline_lr = Pipeline(stages=[indexer_Ft, indexer_Fm, encoder_Ft, encoder_Fm, assembler, scaler, lr])

# Tạo CrossValidator để thực hiện cross-validation với số lượng folds lớn hơn
crossval = CrossValidator(estimator=pipeline_lr, 
                          estimatorParamMaps=paramGrid, 
                          evaluator=evaluator, 
                          numFolds=3)  # Sử dụng 5-fold cross-validation để cải thiện độ chính xác

# Huấn luyện mô hình với cross-validation
model_cv = crossval.fit(train_data)

# Dự đoán với mô hình đã được tinh chỉnh
predictions_cv = model_cv.transform(test_data)

# Đánh giá mô hình tinh chỉnh
rmse_cv = evaluator.evaluate(predictions_cv)
mae_cv = RegressionEvaluator(labelCol="Fuel consumption", predictionCol="prediction", metricName="mae").evaluate(predictions_cv)
r2_cv = RegressionEvaluator(labelCol="Fuel consumption", predictionCol="prediction", metricName="r2").evaluate(predictions_cv)

print(f"RMSE after GridSearch: {rmse_cv}")
print(f"MAE after GridSearch: {mae_cv}")
print(f"R2 after GridSearch: {r2_cv}")

# Kiểm tra tham số tốt nhất
best_model = model_cv.bestModel
lr_model = best_model.stages[-1]  # Lấy mô hình LinearRegression từ pipeline

# Truy xuất tham số tốt nhất
print(f"Best regParam: {lr_model.getRegParam()}")
print(f"Best elasticNetParam: {lr_model.getElasticNetParam()}")
print(f"Best maxIter: {lr_model.getMaxIter()}")
print(f"Best tol: {lr_model.getTol()}")




RMSE after GridSearch: 0.6349936339099485
MAE after GridSearch: 0.45106002488739616
R2 after GridSearch: 0.8737698681333412
Best regParam: 0.0001
Best elasticNetParam: 0.0
Best maxIter: 10
Best tol: 0.0001


                                                                                

In [42]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# Tạo đối tượng RegressionEvaluator để đánh giá mô hình
evaluator = RegressionEvaluator(labelCol="Fuel consumption", predictionCol="prediction", metricName="rmse")

# Mô hình Random Forest Regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="Fuel consumption")

# Xây dựng ParamGrid cho Random Forest với số lượng tham số rộng hơn và giới hạn độ sâu cây
paramGrid_rf = (ParamGridBuilder()
                .addGrid(rf.numTrees, [ 50])  # Tăng số lượng cây
                .addGrid(rf.maxDepth, [ 10])  # Tăng độ sâu tối đa của cây, có thể giảm độ sâu để tránh overfitting
                #.addGrid(rf.minInstancesPerNode, [10])  # Thử nghiệm với các giá trị minInstancesPerNode
               #.addGrid(rf.maxBins, [16])  # Thử nghiệm với các giá trị maxBins (giới hạn độ phân giải)
                #.addGrid(rf.subsamplingRate, [0.6])  # Thử nghiệm với tỷ lệ mẫu con
                .build())

# Tạo các bước trong pipeline
indexer_Ft = StringIndexer(inputCol="Ft", outputCol="Ft_index")
indexer_Fm = StringIndexer(inputCol="Fm", outputCol="Fm_index")
encoder_Ft = OneHotEncoder(inputCol="Ft_index", outputCol="Ft_encoded")
encoder_Fm = OneHotEncoder(inputCol="Fm_index", outputCol="Fm_encoded")
assembler = VectorAssembler(
    inputCols=["m (kg)", "Ft_encoded", "Fm_encoded", "ec (cm3)", "ep (KW)", "Electric range (km)"],
    outputCol="assembled_features"
)
scaler = StandardScaler(inputCol="assembled_features", outputCol="features")

# Pipeline cho Random Forest
pipeline_rf = Pipeline(stages=[indexer_Ft, indexer_Fm, encoder_Ft, encoder_Fm, assembler, scaler, rf])

# Tạo CrossValidator với số lần thử nhiều hơn (numFolds=5)
crossval_rf = CrossValidator(estimator=pipeline_rf, 
                             estimatorParamMaps=paramGrid_rf, 
                             evaluator=evaluator, 
                             numFolds=3) 
# Huấn luyện mô hình Random Forest với cross-validation
model_rf = crossval_rf.fit(train_data)

# Dự đoán với mô hình đã được tinh chỉnh
predictions_rf = model_rf.transform(test_data)

# Đánh giá mô hình Random Forest
rmse_rf = evaluator.evaluate(predictions_rf)
mae_rf = RegressionEvaluator(labelCol="Fuel consumption", predictionCol="prediction", metricName="mae").evaluate(predictions_rf)
r2_rf = RegressionEvaluator(labelCol="Fuel consumption", predictionCol="prediction", metricName="r2").evaluate(predictions_rf)

print(f"RMSE after GridSearch for Random Forest: {rmse_rf}")
print(f"MAE after GridSearch for Random Forest: {mae_rf}")
print(f"R2 after GridSearch for Random Forest: {r2_rf}")

# Kiểm tra tham số tốt nhất
best_model_rf = model_rf.bestModel
# Truy xuất tham số từ mô hình RandomForestRegressor
rf_model = best_model_rf.stages[-1]  # Lấy mô hình RandomForestRegressor
print(f"Best numTrees: {rf_model.getNumTrees()}")
print(f"Best maxDepth: {rf_model.getMaxDepth()}")
print(f"Best minInstancesPerNode: {rf_model.getMinInstancesPerNode()}")
# print(f"Best maxBins: {rf_model.getMaxBins()}")
# print(f"Best subsamplingRate: {rf_model.getSubsamplingRate()}")


24/12/03 07:42:30 WARN CacheManager: Asked to cache already cached data.
24/12/03 07:42:30 WARN CacheManager: Asked to cache already cached data.
24/12/03 07:42:49 WARN DAGScheduler: Broadcasting large task binary with size 1278.8 KiB
24/12/03 07:43:17 WARN DAGScheduler: Broadcasting large task binary with size 1872.7 KiB
24/12/03 07:43:48 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/12/03 07:47:30 WARN DAGScheduler: Broadcasting large task binary with size 1278.9 KiB
24/12/03 07:47:56 WARN DAGScheduler: Broadcasting large task binary with size 1872.8 KiB
24/12/03 07:48:25 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/12/03 07:52:32 WARN DAGScheduler: Broadcasting large task binary with size 1298.0 KiB
24/12/03 07:52:59 WARN DAGScheduler: Broadcasting large task binary with size 1901.4 KiB
24/12/03 07:53:28 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
24/12/03 07:54:34 WARN CacheManager: Asked to cache already ca

RMSE after GridSearch for Random Forest: 0.44341813045828027
MAE after GridSearch for Random Forest: 0.2941694747987062
R2 after GridSearch for Random Forest: 0.9384467219618372


                                                                                

TypeError: 'int' object is not callable

In [45]:
# Lấy mô hình RandomForestModel đã huấn luyện từ pipeline
rf_model = best_model_rf.stages[-1]  # Lấy mô hình RandomForestRegressor

# Truy xuất các tham số từ mô hình đã huấn luyện
print(f"Best numTrees: {rf_model._java_obj.getNumTrees()}")
print(f"Best maxDepth: {rf_model._java_obj.getMaxDepth()}")
print(f"Best minInstancesPerNode: {rf_model._java_obj.getMinInstancesPerNode()}")


Best numTrees: 50
Best maxDepth: 10
Best minInstancesPerNode: 1


In [37]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

# Tạo đối tượng RegressionEvaluator để đánh giá mô hình
evaluator = RegressionEvaluator(labelCol="Fuel consumption", predictionCol="prediction", metricName="rmse")

# Mô hình Gradient Boosting Regressor
gbt = GBTRegressor(featuresCol="features", labelCol="Fuel consumption")

# Xây dựng ParamGrid cho GBT với các tham số ít hơn
paramGrid_gbt = (ParamGridBuilder()
                 .addGrid(gbt.maxIter, [10, 20])  # Số vòng lặp (iteration)
                 .addGrid(gbt.maxDepth, [5])  # Độ sâu tối đa
                 .addGrid(gbt.stepSize, [0.1, 0.2])  # Bước đi trong quá trình huấn luyện
                 .build())

# Tạo các bước trong pipeline
indexer_Ft = StringIndexer(inputCol="Ft", outputCol="Ft_index")
indexer_Fm = StringIndexer(inputCol="Fm", outputCol="Fm_index")
encoder_Ft = OneHotEncoder(inputCol="Ft_index", outputCol="Ft_encoded")
encoder_Fm = OneHotEncoder(inputCol="Fm_index", outputCol="Fm_encoded")
assembler = VectorAssembler(
    inputCols=["m (kg)", "Ft_encoded", "Fm_encoded", "ec (cm3)", "ep (KW)", "Electric range (km)"],
    outputCol="assembled_features"
)
scaler = StandardScaler(inputCol="assembled_features", outputCol="features")

# Pipeline cho Gradient Boosting Regressor
pipeline_gbt = Pipeline(stages=[indexer_Ft, indexer_Fm, encoder_Ft, encoder_Fm, assembler, scaler, gbt])

# Tạo CrossValidator với số lần thử ít hơn (numFolds=2)
crossval_gbt = CrossValidator(estimator=pipeline_gbt, 
                              estimatorParamMaps=paramGrid_gbt, 
                              evaluator=evaluator, 
                              numFolds=2)  

# Huấn luyện mô hình Gradient Boosting Regressor với cross-validation
model_gbt = crossval_gbt.fit(train_data)

# Dự đoán với mô hình đã được tinh chỉnh
predictions_gbt = model_gbt.transform(test_data)

# Đánh giá mô hình Gradient Boosting Regressor
rmse_gbt = evaluator.evaluate(predictions_gbt)
mae_gbt = RegressionEvaluator(labelCol="Fuel consumption", predictionCol="prediction", metricName="mae").evaluate(predictions_gbt)
r2_gbt = RegressionEvaluator(labelCol="Fuel consumption", predictionCol="prediction", metricName="r2").evaluate(predictions_gbt)

print(f"RMSE after GridSearch for GBTRegressor: {rmse_gbt}")
print(f"MAE after GridSearch for GBTRegressor: {mae_gbt}")
print(f"R2 after GridSearch for GBTRegressor: {r2_gbt}")

# Kiểm tra tham số tốt nhất
best_model_gbt = model_gbt.bestModel
print(f"Best maxIter: {best_model_gbt.stages[-1].getMaxIter()}")
print(f"Best maxDepth: {best_model_gbt.stages[-1].getMaxDepth()}")
print(f"Best stepSize: {best_model_gbt.stages[-1].getStepSize()}")


24/12/03 12:47:25 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS

RMSE after GridSearch for GBTRegressor: 0.403885364062183
MAE after GridSearch for GBTRegressor: 0.25061303261898626
R2 after GridSearch for GBTRegressor: 0.9489329789538877
Best maxIter: 20
Best maxDepth: 5
Best stepSize: 0.2


                                                                                

In [39]:
# Lưu mô hình Gradient Boosting đã huấn luyện
model_gbt.bestModel.save("/kaggle/working/GBTModel")


                                                                                

In [40]:
import shutil

# Nén thư mục mô hình thành file .zip
shutil.make_archive('/kaggle/working/GBTModel', 'zip', '/kaggle/working/GBTModel')


'/kaggle/working/GBTModel.zip'