##  Linear Regression (Spark)  
**Mục tiêu**  
1. Xây mô hình hồi quy tuyến tính trên dữ liệu chất lượng không khí 2020-2021 (đã EDA & làm sạch).  
2. Đánh giá bằng RMSLE và RMSE (chỉ số thực), chọn mô hình tốt nhất.  
3. Lưu model


In [11]:
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import time
import matplotlib.pyplot as plt
import numpy as np

# Khởi tạo sparksession
spark = SparkSession.builder.appName("LinearRegressionAQI").getOrCreate()

df = spark.read.csv(r"E:\BTLPTDLL-2025\datasets\processed_data_2020_2021.csv", header=True, inferSchema=True)
print("Spark đã được khởi tạo!")
print("Rows:", df.count(), "  Cols:", len(df.columns))


Spark đã được khởi tạo!
Rows: 3941   Cols: 16


In [12]:
# 3. Tiền xử lý dữ liệu
# Loại bỏ các dòng thiếu AQI index
df = df.filter(df["AQI index"].isNotNull())
# Đổi tên/loại cho một số cột (optional)
print("Rows:", df.count(), "  Cols:", len(df.columns))

Rows: 3941   Cols: 16


### Đặc trưng gốc sử dụng

Trước khi tiến hành xây dựng pipeline, chúng ta phân loại các feature theo 3 nhóm cơ bản dựa trên tên cột của dataset:

| Nhóm        | Cột                                                                 |
|-------------|---------------------------------------------------------------------|
| Numeric     | `Wind`, `CO`, `Dew`, `Humidity`, `NO2`, `O3`, `PM10`, `PM2.5`, `Pressure`, `SO2`, `Temperature` |
| Categorical | `Station name`, `Dominent pollutant`, `Status`                      |
| Date/Time   | `Date`                                                              |

- **AQI index** là nhãn mục tiêu (target) cho bài toán hồi quy tuyến tính.
- Các đặc trưng số (numeric) sẽ được sử dụng trực tiếp hoặc chuẩn hóa.
- Các đặc trưng phân loại (categorical) sẽ được mã hóa (encoding) trước khi đưa vào mô hình.
- Có thể trích xuất thêm đặc trưng thời gian từ cột `Date` nếu cần.


### Baseline Linear Regression

Pipeline baseline gồm:
1. **StringIndexer** cho các cột phân loại (`Station name`, `Dominent pollutant`, `Status`).
2. **VectorAssembler** gom các feature số và đã mã hóa thành vector đặc trưng.
3. **LinearRegression** dự đoán trên target `AQI index`.

Feature set sử dụng toàn bộ các đặc trưng gốc đã liệt kê ở trên (bao gồm cả numeric và categorical).


In [16]:
# Chuyển đổi nhãn air_quality thành chỉ số số (label)
indexer = StringIndexer(inputCols=["Station name", "Dominent pollutant", "Status"],
                        outputCols=["StationIndex", "DominentPollutantIndex", "StatusIndex"],
                        handleInvalid="keep")

feature_cols = ['Wind',  'CO', 'Dew', 'Humidity', 'NO2', 'O3', 'PM10', '`PM2.5`', 'Pressure',
                 'SO2', 'Temperature', 'StationIndex', 'DominentPollutantIndex', 'StatusIndex']
target_col = "AQI index"
assembler = VectorAssembler(inputCols=feature_cols,
                            outputCol='features_raw')

scaler = StandardScaler(inputCol="features_raw",
                        outputCol="features",
                        withMean=True,
                        withStd=True)

linear_reg = LinearRegression(featuresCol="features",
                              labelCol=target_col,
                              maxIter=1000,
                              regParam=0.01,
                              elasticNetParam=0.0)
# Thêm cột id để chia K-fold
data_with_id = df.withColumn("id", F.monotonically_increasing_id())

# chia dữ liệu thành K phần
k = 5
folds = data_with_id.randomSplit([1.0 / k] * k, seed=42)

print(f"\n=== Chia dữ liệu thành {k} folds ===")
for i, fold in enumerate(folds):
    print(f"Fold {i+1}: {fold. count()} samples")


pipline = Pipeline(stages=[indexer ,assembler, scaler, linear_reg])
#  Khởi tạo các evaluator
rmse_evaluator = RegressionEvaluator(
    labelCol=target_col, 
    predictionCol="prediction", 
    metricName="rmse"
)

mae_evaluator = RegressionEvaluator(
    labelCol=target_col, 
    predictionCol="prediction", 
    metricName="mae"
)

r2_evaluator = RegressionEvaluator(
    labelCol=target_col, 
    predictionCol="prediction", 
    metricName="r2"
)

mse_evaluator = RegressionEvaluator(
    labelCol=target_col, 
    predictionCol="prediction", 
    metricName="mse"
)

print("Khởi tạo mô hình và evaluators thành công!")



=== Chia dữ liệu thành 5 folds ===
Fold 1: 811 samples
Fold 2: 785 samples
Fold 3: 824 samples
Fold 4: 791 samples
Fold 5: 730 samples
Khởi tạo mô hình và evaluators thành công!


In [17]:
# lưu trữ kết quả
results = dict()
traing_times = []

# Huan luyện và đánh giá mô hình trên từng fold
for i in range(k):
    print(f"\n=== Fold {i+1} ===")
    # Tạo tập huấn luyện và tập kiểm tra
    test_fold = folds[i]
    train_fold = data_with_id.subtract(test_fold)
    
    # Huấn luyện mô hình
    start_time = time.time()
    model = pipline.fit(train_fold)
    end_time = time.time()
    
    training_time = end_time - start_time
    traing_times.append(training_time)
    print(f"Thời gian huấn luyện: {training_time:.2f} giây")
    
    # Dự đoán trên tập kiểm tra
    predictions = model.transform(test_fold)
    
    # Đánh giá mô hình
    rmse = rmse_evaluator.evaluate(predictions)
    mae = mae_evaluator.evaluate(predictions)
    r2 = r2_evaluator.evaluate(predictions)
    mse = mse_evaluator.evaluate(predictions)
    
 # In kết quả từng fold
    print(f"\nKẾT QUẢ: ")
    print(f"-" * 40)
    print(f"RMSE (Root Mean Squared Error) :  {rmse:.5f}")
    print(f"MAE (Mean Absolute Error)      : {mae:.5f}")
    print(f"R² (R-squared)                 : {r2:.5f}")
    print(f"MSE (Mean Squared Error)       : {mse:.5f}")
    print(f"Training Time                  : {training_time:.3f} giây")
    print(f"-" * 40)
    
        # Hiển thị một số dự đoán mẫu
    print(f"\nMẪU DỰ ĐOÁN (5 dòng đầu):")
    predictions. select(target_col, "prediction").show(5)
    # Lưu kết quả
    results["rmse"] = rmse
    results["mae"] = mae
    results["r2"] = r2
    results["mse"] = mse


=== Fold 1 ===
Thời gian huấn luyện: 1.82 giây

KẾT QUẢ: 
----------------------------------------
RMSE (Root Mean Squared Error) :  28.90281
MAE (Mean Absolute Error)      : 23.49708
R² (R-squared)                 : 0.65439
MSE (Mean Squared Error)       : 835.37267
Training Time                  : 1.818 giây
----------------------------------------

MẪU DỰ ĐOÁN (5 dòng đầu):
+---------+------------------+
|AQI index|        prediction|
+---------+------------------+
|    163.0|129.20591017589993|
|     76.0| 49.81089440570686|
|    132.0|116.90402755278325|
|    147.0|120.66731848402942|
|    110.0|103.55813819847285|
+---------+------------------+
only showing top 5 rows

=== Fold 2 ===
Thời gian huấn luyện: 1.71 giây

KẾT QUẢ: 
----------------------------------------
RMSE (Root Mean Squared Error) :  27.88075
MAE (Mean Absolute Error)      : 23.33700
R² (R-squared)                 : 0.68684
MSE (Mean Squared Error)       : 777.33618
Training Time                  : 1.714 giây
---