In [None]:
# Re-import necessary modules after kernel reset
import pandas as pd
import datetime
import xgboost as xgb
import matplotlib.pyplot as plt
import psycopg2
import logging
from sklearn.metrics import mean_squared_error, r2_score

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, window, first, lag, lead
from pyspark.sql.window import Window
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.evaluation import RegressionMetrics

# Re-define the core functions due to kernel reset
def connect_to_postgres(dbname="airflow", user="airflow", password="airflow",
                        host="localhost", port="5432"):
    """Open a psycopg2 connection to PostgreSQL.

    Connection settings are keyword parameters; the defaults are the local
    ``airflow`` database/user on ``localhost:5432`` that this notebook uses,
    so calling with no arguments connects exactly as before.

    Returns:
        An open psycopg2 connection, or ``None`` if connecting failed
        (the failure is logged at ERROR level).
    """
    try:
        return psycopg2.connect(
            dbname=dbname,
            user=user,
            password=password,
            host=host,
            port=port,
        )
    except Exception as e:
        # Best-effort: callers check for None instead of handling exceptions.
        logging.error(f"❌ Failed to connect to PostgreSQL: {e}")
        return None

def load_data_from_postgres(symbol):
    """Load all ``crypto_data`` rows whose ``symbol`` equals ``symbol``.

    Returns:
        A pandas DataFrame with every column of ``crypto_data``, or ``None``
        when no database connection could be opened.
    """
    conn = connect_to_postgres()
    if conn is None:
        return None
    try:
        # Bind the symbol as a query parameter (psycopg2 uses %s placeholders)
        # instead of formatting it into the SQL text, so a value containing
        # quotes cannot change the statement.
        return pd.read_sql(
            "SELECT * FROM crypto_data WHERE symbol = %s;",
            conn,
            params=(symbol,),
        )
    finally:
        # Close the connection even if the query fails.
        conn.close()

def prepare_timeseries_data(spark_df: DataFrame,
                             start: datetime.datetime = datetime.datetime(2023, 1, 1, 0, 0, 0),
                             end: datetime.datetime = datetime.datetime(2025, 10, 1, 0, 0, 0),
                             window_duration: str = "3 hours",
                             lag_days: int = 14) -> DataFrame:
    """Turn raw price rows into a supervised next-close regression table.

    Steps:
      1. keep rows whose ``time`` lies in ``[start, end]`` (inclusive);
      2. bucket rows into tumbling windows of ``window_duration`` and keep the
         earliest row (by ``time``) of each bucket;
      3. record the bucket start as ``time_group``;
      4. add lag features ``<col>_b_<i>`` for ``col`` in open/high/low/close/volume
         and ``i`` = 1..``lag_days``. Lags count buckets, not calendar days,
         despite the parameter name;
      5. drop rows missing any lag;
      6-7. add the label ``NEXT_CLOSE`` (the next bucket's ``close``) and drop
         the final row, which has no label;
      8. drop ``symbol``, ``time`` and ``time_group``.

    Returns:
        A DataFrame whose columns are the remaining input columns, then the lag
        columns, with ``NEXT_CLOSE`` last. ``transform_to_labeledpoint`` relies
        on that position.
    """
    from pyspark.sql.functions import row_number  # not in the file-level import list

    # Step 1: restrict to the requested time range.
    spark_df = spark_df.filter((col("time") >= start) & (col("time") <= end))

    # Steps 2-3: keep exactly one row per time bucket, namely the earliest one.
    # groupBy + first() is avoided on purpose. first() is non-deterministic after
    # a shuffle, and it picks each column independently, so one output row could
    # mix values from different input rows.
    bucket_spec = Window.partitionBy("time_group").orderBy(col("time"))
    spark_df = (
        spark_df
        .withColumn("_bucket", window(col("time"), window_duration))
        .withColumn("time_group", col("_bucket.start"))
        .withColumn("_rank", row_number().over(bucket_spec))
        .filter(col("_rank") == 1)
        .drop("_bucket", "_rank", "time")
    )

    # Step 4: lag features, added in a single projection rather than one
    # withColumn per feature, which keeps the query plan small.
    # NOTE: this window has no partitionBy, so Spark orders all rows in a single
    # partition; presumably fine because the input is a single symbol.
    history_spec = Window.orderBy("time_group")
    lag_columns = [
        lag(feature, i).over(history_spec).alias(f"{feature}_b_{i}")
        for i in range(1, lag_days + 1)
        for feature in ("open", "high", "low", "close", "volume")
    ]
    spark_df = spark_df.select("*", *lag_columns)

    # Step 5: drop rows without a full lag history (the first lag_days buckets).
    spark_df = spark_df.dropna()

    # Step 6: regression target = the next bucket's close.
    spark_df = spark_df.withColumn("NEXT_CLOSE", lead("close", 1).over(history_spec))

    # Step 7: the last row has no next close, so drop it.
    spark_df = spark_df.dropna()

    # Step 8: drop identifier and time columns that are not model features.
    spark_df = spark_df.drop("symbol", "time", "time_group")

    return spark_df

def split_data(spark_df, prediction_days=750):
    """Split ``spark_df`` by position into a leading train set and trailing test set.

    The last ``prediction_days`` rows, in the DataFrame's current row order,
    form the test set and all earlier rows form the training set. For the
    output of ``prepare_timeseries_data`` that order is presumably
    chronological (it comes from a time-ordered window); verify this if the
    upstream pipeline changes.

    Returns:
        ``(train_df, test_df)``, both with the schema of ``spark_df``.

    Raises:
        ValueError: if ``spark_df`` has fewer than ``prediction_days`` rows.
    """
    total_rows = spark_df.count()
    train_rows = total_rows - prediction_days
    if train_rows < 0:
        raise ValueError(
            f"split_data needs at least {prediction_days} rows, got {total_rows}"
        )

    # Tag each row with its position. zipWithIndex follows partition order and
    # the order within each partition, so index i is the i-th row of spark_df.
    # This replaces limit() + subtract(). subtract() is a set difference
    # (EXCEPT DISTINCT), so it de-duplicates the test set and drops any test
    # row that equals a training row. It also shuffles, losing row order.
    schema = spark_df.schema
    indexed = spark_df.rdd.zipWithIndex()
    train_df = (
        indexed.filter(lambda pair: pair[1] < train_rows)
        .map(lambda pair: pair[0])
        .toDF(schema)
    )
    test_df = (
        indexed.filter(lambda pair: pair[1] >= train_rows)
        .map(lambda pair: pair[0])
        .toDF(schema)
    )
    return train_df, test_df

def transform_to_labeledpoint(df: DataFrame):
    """Convert every row of ``df`` into an MLlib ``LabeledPoint``.

    The final column of each row becomes the label and all preceding columns
    form a dense feature vector, so the target column must be last.
    """
    def _row_to_point(row):
        *features, label = row
        return LabeledPoint(label, Vectors.dense(features))

    return df.rdd.map(_row_to_point)

def train_random_forest_regressor(train_rdd, num_trees=1000, max_depth=7, impurity="variance", max_bins=300, seed=13579):
    """Train an MLlib random-forest regressor on an RDD of ``LabeledPoint``.

    Args:
        train_rdd: training data as ``LabeledPoint`` records.
        num_trees: number of trees in the forest.
        max_depth: maximum depth of each tree.
        impurity: split criterion passed straight to MLlib.
        max_bins: maximum number of bins used when discretizing features.
        seed: random seed, for reproducible forests.

    Returns:
        The trained MLlib random-forest model.
    """
    forest_options = dict(
        categoricalFeaturesInfo={},  # empty map: no feature is declared categorical
        numTrees=num_trees,
        featureSubsetStrategy="auto",
        impurity=impurity,
        maxDepth=max_depth,
        maxBins=max_bins,
        seed=seed,
    )
    return RandomForest.trainRegressor(train_rdd, **forest_options)

def train_linear_regression(train_df, test_df):
    """Fit a Spark ML ``LinearRegression`` on ``train_df`` and score it on ``test_df``.

    ``NEXT_CLOSE`` is the label; every other column is assembled into the
    feature vector.

    Returns:
        ``(rmse, r2)`` measured on ``test_df``.
    """
    label_col = "NEXT_CLOSE"
    # Pick features by name rather than "every column but the last". The label
    # is selected by name below, so a positional feature list would silently
    # put the label into the features if column order ever changed.
    feature_cols = [name for name in train_df.columns if name != label_col]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    train_data = assembler.transform(train_df).select("features", label_col)
    test_data = assembler.transform(test_df).select("features", label_col)

    model = LinearRegression(featuresCol="features", labelCol=label_col).fit(train_data)
    predictions = model.transform(test_data)

    evaluator = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    # Same evaluator, with the metric overridden for this one call via a param map.
    r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
    return rmse, r2

def train_xgboost(train_df, test_df):
    """Fit an XGBoost regressor on ``train_df`` and score it on ``test_df``.

    Both Spark DataFrames are collected to the driver with ``toPandas()``.
    ``NEXT_CLOSE`` is the target; all other columns are features.

    Returns:
        ``(rmse, r2)`` measured on the test rows.
    """
    target = "NEXT_CLOSE"

    def _split_xy(spark_frame):
        frame = spark_frame.toPandas()
        return frame.drop(columns=[target]), frame[target]

    X_train, y_train = _split_xy(train_df)
    X_test, y_test = _split_xy(test_df)

    regressor = xgb.XGBRegressor(
        n_estimators=200,
        max_depth=6,
        learning_rate=0.1,
        objective="reg:squarederror",
    )
    regressor.fit(X_train, y_train)
    y_pred = regressor.predict(X_test)

    # RMSE computed manually as the square root of the MSE.
    mse = mean_squared_error(y_test, y_pred)
    return mse ** 0.5, r2_score(y_test, y_pred)

def evaluate_random_forest_model(model, test_rdd):
    """Score an MLlib regression model on an RDD of ``LabeledPoint``.

    Returns:
        ``(rmse, r2)`` from MLlib's ``RegressionMetrics``.
    """
    # Predictions and labels come from two separate jobs over test_rdd, and
    # zip() pairs them purely by position. If test_rdd's lineage contains a
    # shuffle, re-running it can reorder rows and misalign the pairs.
    # Persisting test_rdd lets both jobs read the same materialized rows.
    # It is persisted only if the caller has not already done so, and
    # released afterwards.
    persisted_here = not test_rdd.is_cached
    if persisted_here:
        test_rdd.cache()
    try:
        preds = model.predict(test_rdd.map(lambda x: x.features))
        # RegressionMetrics expects (prediction, observation) pairs.
        pred_label_rdd = preds.zip(test_rdd.map(lambda x: x.label))
        metrics = RegressionMetrics(pred_label_rdd)
        # Read the metrics here so they are computed before unpersist.
        return metrics.rootMeanSquaredError, metrics.r2
    finally:
        if persisted_here:
            test_rdd.unpersist()

# Create (or reuse) the Spark session, then list the 10 crypto symbols to train on.
spark = (
    SparkSession.builder
    .appName("CryptoPredict")
    .getOrCreate()
)

symbols = [
    "BNBUSDT",
    "BTCUSDT",
    "ETHUSDT",
    "XRPUSDT",
    "SOLUSDT",
    "LTCUSDT",
    "ETCUSDT",
    "PEPEUSDT",
    "DOGEUSDT",
    "ADAUSDT",
]

def _metrics_or_none(symbol: str, display_name: str, run):
    """Call ``run()`` (which returns ``(rmse, r2)``) and wrap the result in a dict.

    Any exception is printed and converted to ``{"rmse": None, "r2": None}``,
    so one failing model does not stop the remaining ones.
    """
    try:
        rmse, r2 = run()
    except Exception as e:
        print(f"❌ {display_name} failed for {symbol}: {e}")
        return {"rmse": None, "r2": None}
    return {"rmse": rmse, "r2": r2}


def _random_forest_metrics(train_df, test_df):
    """Train the MLlib random forest on ``train_df`` and return its ``(rmse, r2)`` on ``test_df``."""
    train_rdd = transform_to_labeledpoint(train_df)
    test_rdd = transform_to_labeledpoint(test_df)
    model = train_random_forest_regressor(train_rdd)
    return evaluate_random_forest_model(model, test_rdd)


def train_all_models_for_one_symbol(symbol: str):
    """Load ``symbol`` from Postgres, build features, and evaluate three regressors.

    Symbols whose median close is below 0.01 have open/high/low/close
    multiplied by 1e6 (volume is left as-is). Their RMSE values are therefore
    in scaled units.

    Returns:
        ``{"symbol": symbol, "results": {model_name: {"rmse": ..., "r2": ...}}}``,
        or ``None`` when there is no data or feature preparation fails.
        A model that fails to train gets ``None`` metrics.
    """
    print(f"\n🚀 Training models for: {symbol}")
    df = load_data_from_postgres(symbol)
    if df is None or df.empty:
        print(f"⚠️ Skipping {symbol} due to empty data.")
        return None

    # Auto-scale very small-priced coins so price features are not tiny.
    price_median = df["close"].median()
    scale_factor = 1.0
    if price_median < 0.01:  # threshold is a tunable heuristic
        scale_factor = 1e6
        print(f"🔧 Scaling {symbol} values by {scale_factor} due to small price.")
        for col_name in ["open", "high", "low", "close"]:
            df[col_name] = df[col_name] * scale_factor
        # Volume is intentionally left unscaled.
    else:
        print(f"✅ No scaling needed for {symbol} (median close = {price_median})")

    # split_data triggers a Spark job (count), so pipeline errors surface here.
    # Treat them like missing data for this symbol instead of aborting the run
    # over all symbols.
    try:
        spark_df = spark.createDataFrame(df)
        processed_df = prepare_timeseries_data(spark_df)
        train_df, test_df = split_data(processed_df)
    except Exception as e:
        print(f"❌ Data preparation failed for {symbol}: {e}")
        return None

    results = {}

    results["linear_regression"] = _metrics_or_none(
        symbol, "Linear Regression", lambda: train_linear_regression(train_df, test_df)
    )

    results["xgboost"] = _metrics_or_none(
        symbol, "XGBoost", lambda: train_xgboost(train_df, test_df)
    )
    xgb_r2 = results["xgboost"]["r2"]
    if xgb_r2 is not None and xgb_r2 < 0:
        print(f"⚠️ XGBoost R² < 0 for {symbol} — model might be worse than baseline.")

    results["random_forest"] = _metrics_or_none(
        symbol, "Random Forest", lambda: _random_forest_metrics(train_df, test_df)
    )
    rf_r2 = results["random_forest"]["r2"]
    if rf_r2 is not None and rf_r2 < 0:
        print(f"⚠️ Random Forest R² < 0 for {symbol}")

    return {"symbol": symbol, "results": results}


# Train every symbol in order; keep only the runs that produced a result
# (train_all_models_for_one_symbol returns None for skipped symbols).
all_results = [
    outcome
    for outcome in map(train_all_models_for_one_symbol, symbols)
    if outcome
]
# Flatten all_results into one record per (symbol, model) pair, printing each
# record as it is built. This used to be done twice; the first table was never
# displayed, because only a cell's last expression is shown.
records = []
for result in all_results:
    symbol = result["symbol"]
    for model, metrics in result["results"].items():
        records.append({
            "symbol": symbol,
            "model": model,
            "rmse": metrics["rmse"],
            "r2": metrics["r2"],
        })
        print(f"Model: {model}, Symbol: {symbol}, RMSE: {metrics['rmse']}, R2: {metrics['r2']}")

# Keep the summary table in a named variable for later cells; as the cell's
# last expression it is also what the notebook displays.
results_df = pd.DataFrame(records)
results_df



🚀 Training models for: BNBUSDT


  df = pd.read_sql(query, conn)


✅ No scaling needed for BNBUSDT (median close = 597.96)





🚀 Training models for: BTCUSDT


  df = pd.read_sql(query, conn)


✅ No scaling needed for BTCUSDT (median close = 78999.61)
