# Load data from database

### create session

In [7]:

from pyspark.sql import SparkSession

jdbc_path = "/mnt/c/Users/user/Desktop/Quant-AI-Project/postgresql-42.7.1.jar"
spark = SparkSession.builder \
    .appName("ETA_Model_Training") \
    .config("spark.driver.memory", "4g") \
    .config("spark.jars", jdbc_path) \
    .getOrCreate()
print("âœ… Spark Session crÃ©Ã©e")

âœ… Spark Session crÃ©Ã©e


### load data

In [8]:

import os
from dotenv import load_dotenv

load_dotenv()

jdbc_url = f"jdbc:postgresql://localhost:5433/{os.getenv('DATABASE_NAME')}"
connection_properties = {
    "user": os.getenv('DATABASE_USER'),
    "password": os.getenv('DATABASE_PASSWORD'),
    "driver": "org.postgresql.Driver"
}
print("ðŸ“¥ Chargement des donnÃ©es Silver...")
df = spark.read.jdbc(
    url=jdbc_url,
    table="silver_table",
    properties=connection_properties
)

print(f"âœ… {df.count()} lignes chargÃ©es")
print(f"ðŸ“Š Colonnes disponibles: {df.columns}")

# Afficher un aperÃ§u
df.show(5)
df.printSchema()


ðŸ“¥ Chargement des donnÃ©es Silver...
âœ… 589 lignes chargÃ©es
ðŸ“Š Colonnes disponibles: ['open_time', 'open', 'high', 'low', 'close', 'volume', 'close_time', 'quote_asset_volume', 'number_of_trades', 'taker_buy_base_volume', 'taker_buy_quote_volume', 'close_t_plus_10', 'return', 'MA_5', 'MA_10', 'taker_ratio']
+-------------------+--------+--------+--------+--------+--------+--------------------+------------------+----------------+---------------------+----------------------+---------------+--------------------+-----------------+-----------------+-------------------+
|          open_time|    open|    high|     low|   close|  volume|          close_time|quote_asset_volume|number_of_trades|taker_buy_base_volume|taker_buy_quote_volume|close_t_plus_10|              return|             MA_5|            MA_10|        taker_ratio|
+-------------------+--------+--------+--------+--------+--------+--------------------+------------------+----------------+---------------------+----------------

In [None]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, abs as spark_abs, mean
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression



total_rows = df.count()

# STEP 1: DEFINE FEATURES
feature_cols = [
    'open', 'high', 'low', 'close',              
    'volume', 'quote_asset_volume',               
    'number_of_trades',                           
    'taker_buy_base_volume', 
    'taker_buy_quote_volume',  
    'return',                                    
    'MA_5', 
    'MA_10',                             
    'taker_ratio'
]

target_col = 'close_t_plus_10'


# Calculate split point (80% train, 20% test)
train_size = int(total_rows * 0.8)

# Get train data (first 80% chronologically)
train_df = df.orderBy("open_time").limit(train_size)

# Get test data (last 20% chronologically)
test_df = df.orderBy("open_time").subtract(train_df)

# Verify split
train_count = train_df.count()
test_count = test_df.count()

print(f"Train samples: {train_count} ({train_count/total_rows*100:.1f}%)")
print(f"Test samples: {test_count} ({test_count/total_rows*100:.1f}%)")

# Drop time columns after split (keep only features and target)
train_df = train_df.drop("open_time", "close_time")
test_df = test_df.drop("open_time", "close_time")

# Assemble features into vector
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features_raw",
    handleInvalid="skip"
)

# Scale features (helps with large Bitcoin price values)
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=True
)

lr_model = LinearRegression(
    featuresCol="features",
    labelCol=target_col,
    maxIter=100,
    regParam=0.1,              # Ridge regularization
    elasticNetParam=0.0,       # Pure Ridge (L2)
    standardization=False      # Already scaled
)


# Create pipeline
pipeline = Pipeline(stages=[
    assembler,
    scaler,
    lr_model
])
# STEP 2: TRAIN MODEL
model = pipeline.fit(train_df)
# 
# STEP 3: MAKE PREDICTIONS
train_pred = model.transform(train_df)
test_pred = model.transform(test_df)

# STEP 4: EVALUATE MODEL

# Define evaluators
rmse_evaluator = RegressionEvaluator(
    labelCol=target_col,
    predictionCol="prediction",
    metricName="rmse"
)

mae_evaluator = RegressionEvaluator(
    labelCol=target_col,
    predictionCol="prediction",
    metricName="mae"
)

r2_evaluator = RegressionEvaluator(
    labelCol=target_col,
    predictionCol="prediction",
    metricName="r2"
)

# Calculate metrics for TRAIN set
train_rmse = rmse_evaluator.evaluate(train_pred)
train_mae = mae_evaluator.evaluate(train_pred)
train_r2 = r2_evaluator.evaluate(train_pred)

# Calculate metrics for TEST set
test_rmse = rmse_evaluator.evaluate(test_pred)
test_mae = mae_evaluator.evaluate(test_pred)
test_r2 = r2_evaluator.evaluate(test_pred)

# Calculate MAPE (Mean Absolute Percentage Error) - Important for Bitcoin!
train_mape_df = train_pred.withColumn(
    "percentage_error", 
    spark_abs((col(target_col) - col("prediction")) / col(target_col)) * 100
)
train_mape = train_mape_df.select(mean("percentage_error")).first()[0]

test_mape_df = test_pred.withColumn(
    "percentage_error", 
    spark_abs((col(target_col) - col("prediction")) / col(target_col)) * 100
)
test_mape = test_mape_df.select(mean("percentage_error")).first()[0]

# DISPLAY RESULTS
print("\n" + "=" * 70)
print(" TRAINING SET METRICS")
print("=" * 70)
print(f"  RMSE: ${train_rmse:,.2f}")
print(f"  MAE:  ${train_mae:,.2f}")
print(f"  RÂ²:   {train_r2:.6f}")
print(f"  MAPE: {train_mape:.2f}%")

print("\n" + "=" * 70)
print(" TEST SET METRICS (MOST IMPORTANT)")
print("=" * 70)
print(f"  RMSE: ${test_rmse:,.2f}")
print(f"  MAE:  ${test_mae:,.2f}")
print(f"  RÂ²:   {test_r2:.6f}")
print(f"  MAPE: {test_mape:.2f}%")
print("=" * 70)

Total clean rows: 589
Train samples: 471 (80.0%)
Test samples: 118 (20.0%)
Pipeline stages:
  1. VectorAssembler - Combine features
  2. StandardScaler - Scale features
  3. RandomForestRegressor - Train model

 TRAINING SET METRICS
  RMSE: $80.61
  MAE:  $61.72
  RÂ²:   0.884222
  MAPE: 0.07%

 TEST SET METRICS (MOST IMPORTANT)
  RMSE: $84.04
  MAE:  $69.44
  RÂ²:   0.196408
  MAPE: 0.07%


In [10]:
model_path = "/mnt/c/Users/user/Desktop/Quant-AI-Project/ml/models/btc_price_predictor"
print(f"\nðŸ’¾ Saving model to: {model_path}")

model.write().overwrite().save(model_path)
print("âœ… Model saved successfully!")



ðŸ’¾ Saving model to: /mnt/c/Users/user/Desktop/Quant-AI-Project/ml/models/btc_price_predictor
âœ… Model saved successfully!
