# Import libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import numpy as np

# Load and check data

In [None]:
# Khởi tạo Spark Session
spark = SparkSession.builder \
    .appName("HPG_Stock_Prediction_PySpark") \
    .getOrCreate()

In [3]:
# Load Data
df = spark.read.csv('HPG_stock_price.csv', header=True, inferSchema=True)

In [None]:
# Hiển thị 5 dòng đầu tiên của DataFrame
df.show(5)

+----------+----+----+----+-----+-------+
|      time|open|high| low|close| volume|
+----------+----+----+----+-----+-------+
|2007-11-15|2.05|2.05|1.72|  2.0|1306330|
|2007-11-16|1.91|1.91|1.91| 1.91| 248510|
|2007-11-19|1.81|1.81|1.81| 1.81| 120480|
|2007-11-20|1.73|1.73|1.73| 1.73|  58710|
|2007-11-21|1.65| 1.8|1.65|  1.8| 728080|
+----------+----+----+----+-----+-------+
only showing top 5 rows



In [None]:
# Hiển thị schema của DataFrame
df.printSchema()

root
 |-- time: date (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)



# Clean Data

In [None]:
# Hiển thị các dòng có volume bằng 0
df.filter(F.col('volume') == 0).show(5)

+----------+-----+-----+-----+-----+------+
|      time| open| high|  low|close|volume|
+----------+-----+-----+-----+-----+------+
|2018-01-23|12.05|12.05|12.05|12.05|     0|
|2018-01-24|12.05|12.05|12.05|12.05|     0|
+----------+-----+-----+-----+-----+------+



In [7]:
# Hàm ffill giá trị trong cột target_col dựa trên thứ tự của order_col
def ffill_value(df, order_col, target_col):
    df = df.withColumn(target_col, F.when(F.col(target_col) == 0, None).otherwise(F.col(target_col)))

    window_spec = Window.orderBy(order_col)\
                        .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    # 4. Thực hiện ffill bằng hàm last() với ignorenulls=True
    df = df.withColumn(
        target_col, 
        F.last(target_col, ignorenulls=True).over(window_spec)
    )
    return df

In [8]:
df = ffill_value(df, 'time', 'volume')

In [9]:
df.filter(F.col('time') > '2018-01-21').show(5)

+----------+-----+-----+-----+-----+--------+
|      time| open| high|  low|close|  volume|
+----------+-----+-----+-----+-----+--------+
|2018-01-22|11.27|12.09|11.27|12.05| 6662390|
|2018-01-23|12.05|12.05|12.05|12.05| 6662390|
|2018-01-24|12.05|12.05|12.05|12.05| 6662390|
|2018-01-25| 12.3|12.32|11.56|11.65|13773130|
|2018-01-26|11.75|12.22|11.56|12.11| 4998360|
+----------+-----+-----+-----+-----+--------+
only showing top 5 rows



In [10]:
stats = df.summary()
# Làm tròn tất cả các cột trừ cột 'summary'
stats.select("summary", *[F.round(F.col(c), 2).alias(c) for c in stats.columns if c != "summary"]).show()

+-------+------+------+------+------+------------+
|summary|  open|  high|   low| close|      volume|
+-------+------+------+------+------+------------+
|  count|4499.0|4499.0|4499.0|4499.0|      4499.0|
|   mean|  8.95|  9.07|  8.84|  8.95|  9383893.81|
| stddev|  8.91|  9.02|   8.8|  8.89|1.48433256E7|
|    min|  0.57|  0.57|  0.57|  0.57|        50.0|
|    25%|  1.51|  1.52|  1.49|  1.51|    484850.0|
|    50%|  4.97|  5.02|  4.92|  4.97|   2801350.0|
|    75%| 15.95| 16.19| 15.68| 15.95| 1.4222396E7|
|    max| 33.25| 33.48| 32.97| 33.25|  2.159991E8|
+-------+------+------+------+------+------------+



In [None]:
# 1. Định nghĩa Window
window_spec = Window.orderBy("time")

# 2. Tính Log Price (lp)
df = df.withColumn("lp", F.log(F.col("close")))

# 3. Tính Daily Log Return (ret_1d)
# F.lag(col, 1) lấy giá trị của dòng ngay trước đó
df = df.withColumn("ret_1d", F.col("lp") - F.lag(F.col("lp"), 1).over(window_spec))


In [12]:
df.show(5)

+----------+----+----+----+-----+-------+------------------+--------------------+
|      time|open|high| low|close| volume|                lp|              ret_1d|
+----------+----+----+----+-----+-------+------------------+--------------------+
|2007-11-15|2.05|2.05|1.72|  2.0|1306330|0.6931471805599453|                NULL|
|2007-11-16|1.91|1.91|1.91| 1.91| 248510|0.6471032420585384|-0.04604393850140687|
|2007-11-19|1.81|1.81|1.81| 1.81| 120480|0.5933268452777344|-0.05377639678080404|
|2007-11-20|1.73|1.73|1.73| 1.73|  58710|0.5481214085096876|-0.04520543676804678|
|2007-11-21|1.65| 1.8|1.65|  1.8| 728080|0.5877866649021191| 0.03966525639243146|
+----------+----+----+----+-----+-------+------------------+--------------------+
only showing top 5 rows



# Data preprocessing

In [None]:
def create_features(df):
    # 1. Định nghĩa các Window
    # Window cho Moving Average, Volatility (từ n-1 dòng trước đến dòng hiện tại)
    def w_spec(n):
        return Window.orderBy("time").rowsBetween(-(n-1), 0)
    
    # Window cho các hàm diff, lag (chỉ cần thứ tự)
    w_order = Window.orderBy("time")

    # 2. Price Movements & Momentum
    df = df.withColumn("Momentum_5", F.col("close") - F.lag("close", 5).over(w_order))
    df = df.withColumn("Momentum_10", F.col("close") - F.lag("close", 10).over(w_order))

    # 3. Moving Averages
    df = df.withColumn("MA_5", F.avg("close").over(w_spec(5)))
    df = df.withColumn("MA_10", F.avg("close").over(w_spec(10)))
    df = df.withColumn("MA_20", F.avg("close").over(w_spec(20)))

    # 4. Volatility
    df = df.withColumn("Volatility_5", F.stddev("close").over(w_spec(5)))
    df = df.withColumn("Volatility_10", F.stddev("close").over(w_spec(10)))

    # 5. RSI
    diff = F.col("close") - F.lag("close", 1).over(w_order)
    gain = F.when(diff > 0, diff).otherwise(0)
    loss = F.when(diff < 0, -diff).otherwise(0)
    
    avg_gain = F.avg(gain).over(w_spec(14))
    avg_loss = F.avg(loss).over(w_spec(14))
    rs = avg_gain / avg_loss
    df = df.withColumn("RSI", 100 - (100 / (1 + rs)))

    # 6. MACD
    ema12 = F.avg("close").over(w_spec(12))
    ema26 = F.avg("close").over(w_spec(26))
    df = df.withColumn("MACD", ema12 - ema26)

    return df.dropna()

In [None]:
def create_lagged_dataset(df, target='ret_1d'):
    lags = [1, 2, 3, 5, 10, 20]
    features = ['volume', 'MA_5', 'MA_10', 'MA_20', 
                'Momentum_5', 'Momentum_10', 'Volatility_5', 'Volatility_10', 'RSI', 'MACD']
    
    # 1. Kiểm tra các cột thực sự tồn tại trong df
    valid_features = [f for f in features if f in df.columns]
    
    # 2. Định nghĩa Window để lấy dữ liệu quá khứ
    window_spec = Window.orderBy("time")
    
    # 3. Tạo các cột lagged cho từng đặc trưng
    lagged_column_names = []
    for lag in lags:
        for feat in valid_features:
            col_name = f"{feat}_lag_{lag}"
            df = df.withColumn(col_name, F.lag(feat, lag).over(window_spec))
            lagged_column_names.append(col_name)
    
    # 4. Lấy giá trị giá đóng cửa ngày hôm trước (last_prices)
    df = df.withColumn("last_price", F.lag("close", 1).over(window_spec))
    
    # 5. Loại bỏ các dòng có giá trị Null (do phép lag tạo ra ở đầu tập dữ liệu)
    df = df.dropna()
    
    # 6. Gộp các cột lagged thành một Vector duy nhất
    assembler = VectorAssembler(inputCols=lagged_column_names, outputCol="features_vector")
    df_final = assembler.transform(df)
    
    # Tại thời điểm này, df_final đã có các cột: 
    # ['time', 'features_vector' (chứa X), target (y), 'last_price']
    
    return df_final, lagged_column_names

In [None]:
def train_test_split(df, test_size=0.1):
    # 1. Tính toán điểm chia dựa trên tổng số dòng
    total_rows = df.count()
    split_idx = int(total_rows * (1 - test_size))
    
    # 2. Thêm cột số thứ tự (row index) sau khi đã sắp xếp theo thời gian
    df_with_idx = df.withColumn("row_idx", F.row_number().over(Window.orderBy("time")))
    
    # 3. Phân tách dựa trên row_idx
    train_df = df_with_idx.filter(F.col("row_idx") <= split_idx).drop("row_idx")
    test_df = df_with_idx.filter(F.col("row_idx") > split_idx).drop("row_idx")
    
    return train_df, test_df

In [16]:
df_features = create_features(df)
df_features.show(5)

+----------+----+----+----+-----+------+------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+--------------------+--------------------+------------------+--------------------+
|      time|open|high| low|close|volume|                lp|              ret_1d|          Momentum_5|         Momentum_10|              MA_5|             MA_10|             MA_20|        Volatility_5|       Volatility_10|               RSI|                MACD|
+----------+----+----+----+-----+------+------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+--------------------+--------------------+------------------+--------------------+
|2007-11-29|1.69| 1.7|1.67| 1.69|128110|0.5247285289349821|                 0.0| -0.1100000000000001|-0.31000000000000005|1.6959999999999997|1.7530000000000001|1.7754545454545458|0.031304951684997126| 0.07674633541

In [17]:
df_model, feature_cols = create_lagged_dataset(df_features)

In [18]:
# Hiển thị cấu trúc dữ liệu chuẩn bị cho ML
df_model.select("time", "features_vector", "ret_1d", "last_price").show(5, truncate=False)

+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [19]:
# Thực thi phân tách
train_data, test_data = train_test_split(df_model, test_size=0.1)

print(f"Train count: {train_data.count()}")
print(f"Test count: {test_data.count()}")

Train count: 4005
Test count: 446


In [None]:
def calculate_metrics(predictions_df, label_col='ret_1d', prediction_col='prediction'):
    """
    predictions_df: DataFrame chứa cột label thực tế và cột dự đoán từ mô hình
    """
    # 1. Khởi tạo Evaluator
    evaluator = RegressionEvaluator(labelCol=label_col, predictionCol=prediction_col)

    # 2. Tính toán từng chỉ số
    mae = evaluator.evaluate(predictions_df, {evaluator.metricName: "mae"})
    rmse = evaluator.evaluate(predictions_df, {evaluator.metricName: "rmse"})
    r2 = evaluator.evaluate(predictions_df, {evaluator.metricName: "r2"})

    return mae, rmse, r2

def evaluate_all_scales(predictions_df, label_col='ret_1d', pred_col='prediction', last_price_col='last_price'):
    """
    predictions_df: DataFrame đã có các cột label, prediction và last_price
    """
    
    # 1. Tính toán các chỉ số trên Log Return
    evaluator = RegressionEvaluator(labelCol=label_col, predictionCol=pred_col)
    r2_ret = evaluator.evaluate(predictions_df, {evaluator.metricName: "r2"})
    
    # 2. Chuyển đổi sang Giá thực tế (Price) trực tiếp trên DataFrame
    df_with_prices = predictions_df.withColumn(
        "y_actual_p", F.col(last_price_col) * F.exp(F.col(label_col))
    ).withColumn(
        "y_pred_p", F.col(last_price_col) * F.exp(F.col(pred_col))
    )
    
    # 3. Tính toán các chỉ số trên thang đo Giá (Price)
    price_evaluator = RegressionEvaluator(labelCol="y_actual_p", predictionCol="y_pred_p")
    
    mae_p = price_evaluator.evaluate(df_with_prices, {price_evaluator.metricName: "mae"})
    rmse_p = price_evaluator.evaluate(df_with_prices, {price_evaluator.metricName: "rmse"})
    r2_p = price_evaluator.evaluate(df_with_prices, {price_evaluator.metricName: "r2"})
    
    # 4. Trả về kết quả
    return {
        'Return_R2': r2_ret,
        'Price_R2': r2_p,
        'Price_MAE': mae_p,
        'Price_RMSE': rmse_p,
        'df_with_prices': df_with_prices
    }

In [None]:
def cross_validate_time_series(model_class, df_train, n_splits=5):
    total_rows = df_train.count()
    fold_size = total_rows // (n_splits + 1)
    
    # 1. Đánh số thứ tự để chia Fold
    window_spec = Window.orderBy("time")
    df_with_idx = df_train.withColumn("row_idx", F.row_number().over(window_spec))
    
    fold_metrics = []

    for fold in range(n_splits):
        # Xác định index cho Train và Validation của Fold hiện tại
        train_end_idx = fold_size * (fold + 1)
        val_end_idx = fold_size * (fold + 2)
        
        train_fold = df_with_idx.filter(F.col("row_idx") <= train_end_idx)
        val_fold = df_with_idx.filter((F.col("row_idx") > train_end_idx) & (F.col("row_idx") <= val_end_idx))
        
        # 2. Scaling (MinMaxScaler trong Spark ML)
        scaler = MinMaxScaler(inputCol="features_vector", outputCol="scaled_features")
        scaler_model = scaler.fit(train_fold)
        
        train_scaled = scaler_model.transform(train_fold)
        val_scaled = scaler_model.transform(val_fold)
        
        # 3. Fit & Predict
        model = model_class(featuresCol="scaled_features", labelCol="ret_1d")
        fitted_model = model.fit(train_scaled)
        predictions = fitted_model.transform(val_scaled)
        
        # 4. Đánh giá đa thang đo 
        # Chúng ta giả định hàm trả về các metrics
        res = evaluate_all_scales(predictions)
        
        # Chuyển đổi đơn vị (giả sử giá gốc tính theo đơn vị nghìn đồng)
        price_mae = res['Price_MAE'] * 1000
        price_rmse = res['Price_RMSE'] * 1000
        
        fold_metrics.append({
            'Fold': f'Fold {fold+1}',
            'Price_MAE': price_mae,
            'Price_RMSE': price_rmse,
            'Price_R2': res['Price_R2'],
            'Return_R2': res['Return_R2']
        })
        
        print(f"Finished Fold {fold+1}: Price MAE = {price_mae:.4f}, Price RMSE = {price_rmse:.4f}, Price R2 = {res['Price_R2']:.4f}")

    return fold_metrics

In [None]:
# Define Models
models = {
    'Linear Regression': LinearRegression(),
    'Random Forest': RandomForestRegressor(),
    'Gradient Boosting': GBTRegressor()
}
results_summary = {}
for model_name, model in models.items():
    print(f"\n---  {model_name} ---")
    metrics_ = cross_validate_time_series(model.__class__, train_data, n_splits=5)
    results_summary[model_name] = metrics_


--- Cross-Validation for Linear Regression ---
Finished Fold 1: Price MAE = 20.8326, Price RMSE = 27.3924, Price R2 = 0.9871
Finished Fold 2: Price MAE = 62.6716, Price RMSE = 144.3306, Price R2 = 0.9498
Finished Fold 3: Price MAE = 140.3482, Price RMSE = 221.5027, Price R2 = 0.9937
Finished Fold 4: Price MAE = 204.2486, Price RMSE = 329.1927, Price R2 = 0.9964
Finished Fold 5: Price MAE = 398.5656, Price RMSE = 534.5650, Price R2 = 0.9907

--- Cross-Validation for Random Forest ---
Finished Fold 1: Price MAE = 19.3422, Price RMSE = 26.0903, Price R2 = 0.9883
Finished Fold 2: Price MAE = 42.8671, Price RMSE = 60.1530, Price R2 = 0.9913
Finished Fold 3: Price MAE = 112.9724, Price RMSE = 175.3557, Price R2 = 0.9960
Finished Fold 4: Price MAE = 182.1383, Price RMSE = 293.7407, Price R2 = 0.9971
Finished Fold 5: Price MAE = 358.2115, Price RMSE = 480.4925, Price R2 = 0.9925

--- Cross-Validation for Gradient Boosting ---
Finished Fold 1: Price MAE = 22.8878, Price RMSE = 31.2729, Price R

In [None]:
def train_and_evaluate_final(train_df, test_df, model_class):
    # 1. Scaling dữ liệu
    scaler = MinMaxScaler(inputCol="features_vector", outputCol="scaled_features")
    scaler_model = scaler.fit(train_df)
    
    train_scaled = scaler_model.transform(train_df)
    test_scaled = scaler_model.transform(test_df)
    
    # 2. Huấn luyện mô hình trên toàn bộ tập Train
    rf = model_class(featuresCol="scaled_features", labelCol="ret_1d")
    final_model = rf.fit(train_scaled)
    
    # 3. Dự báo trên tập Test
    predictions = final_model.transform(test_scaled)
    
    # 4. Đánh giá kết quả bằng hàm đa thang đo đã viết ở trên
    results = evaluate_all_scales(predictions)
    
    # In kết quả
    print("-" * 30)
    print("KẾT QUẢ ĐÁNH GIÁ TRÊN TẬP TEST:")
    print(f"Price MAE:  {results['Price_MAE'] * 1000:,.2f} VNĐ")
    print(f"Price RMSE: {results['Price_RMSE'] * 1000:,.2f} VNĐ")
    print(f"Price R2:   {results['Price_R2']:.4f}")
    print("-" * 30)
    
    return final_model, results

In [27]:
# Define Models
models = {
    'Linear Regression': LinearRegression(),
    'Random Forest': RandomForestRegressor(),
    'Gradient Boosting': GBTRegressor()
}
results_summary = {}
model_ = {}
for model_name, model in models.items():
    print(f"\n---  {model_name} ---")
    model_[model_name], metrics_ = train_and_evaluate_final(train_data, test_data, model.__class__)
    results_summary[model_name] = metrics_


---  Linear Regression ---
------------------------------
KẾT QUẢ ĐÁNH GIÁ TRÊN TẬP TEST:
Price MAE:  293.80 VNĐ
Price RMSE: 434.09 VNĐ
Price R2:   0.9662
------------------------------

---  Random Forest ---
------------------------------
KẾT QUẢ ĐÁNH GIÁ TRÊN TẬP TEST:
Price MAE:  275.72 VNĐ
Price RMSE: 399.04 VNĐ
Price R2:   0.9714
------------------------------

---  Gradient Boosting ---
------------------------------
KẾT QUẢ ĐÁNH GIÁ TRÊN TẬP TEST:
Price MAE:  323.12 VNĐ
Price RMSE: 460.09 VNĐ
Price R2:   0.9620
------------------------------


In [None]:
def train_full_and_predict_next(df_final, model_class):
    # 1. Huấn luyện trên TOÀN BỘ dữ liệu
    scaler = MinMaxScaler(inputCol="features_vector", outputCol="scaled_features")
    scaler_model = scaler.fit(df_final)
    full_data_scaled = scaler_model.transform(df_final)
    
    # Fit mô hình
    model_instance = model_class(featuresCol="scaled_features", labelCol="ret_1d")
    final_model = model_instance.fit(full_data_scaled)
    
    # 2. Chuẩn bị dữ liệu để dự báo ngày mai
    # Lấy dòng cuối cùng của tập dữ liệu (chứa đặc trưng của ngày gần nhất)
    last_row = df_final.orderBy(F.col("time").desc()).limit(1)
    
    # Lấy giá đóng cửa gần nhất (Last Price) để tính giá dự báo
    last_price = last_row.select("close").collect()[0][0]
    last_date = last_row.select("time").collect()[0][0]
    
    # 3. Thực hiện dự báo Log Return cho ngày kế tiếp
    last_row_scaled = scaler_model.transform(last_row)
    
    # Predict
    prediction_df = final_model.transform(last_row_scaled)
    pred_log_ret = prediction_df.select("prediction").collect()[0][0]
    
    # 4. Chuyển đổi Log Return dự báo sang Giá dự báo (VNĐ)
    predicted_price = last_price * np.exp(pred_log_ret)
    
    print("=" * 50)
    print(f"DỰ BÁO NGÀY KẾ TIẾP")
    print(f"Ngày dữ liệu gần nhất: {last_date}")
    print(f"Giá đóng cửa gần nhất: {last_price:,.2f} VNĐ")
    print(f"GIÁ DỰ BÁO NGÀY MAI:   {predicted_price * 1000:,.2f} VNĐ")
    print("=" * 50)
    
    return final_model, predicted_price

# THỰC THI
for model_name, model in models.items():
    print(f"\n---  {model_name} ---")
    model, next_price = train_full_and_predict_next(df_model, model.__class__)


---  Linear Regression ---


DỰ BÁO NGÀY KẾ TIẾP
Ngày dữ liệu gần nhất: 2025-12-01
Giá đóng cửa gần nhất: 26.50 VNĐ
GIÁ DỰ BÁO NGÀY MAI:   26,433.32 VNĐ

---  Random Forest ---
DỰ BÁO NGÀY KẾ TIẾP
Ngày dữ liệu gần nhất: 2025-12-01
Giá đóng cửa gần nhất: 26.50 VNĐ
GIÁ DỰ BÁO NGÀY MAI:   26,575.45 VNĐ

---  Gradient Boosting ---
DỰ BÁO NGÀY KẾ TIẾP
Ngày dữ liệu gần nhất: 2025-12-01
Giá đóng cửa gần nhất: 26.50 VNĐ
GIÁ DỰ BÁO NGÀY MAI:   26,527.45 VNĐ
