#### Feature Engineering pour la prédiction du prix Bitcoin

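Ce notebook charge les données de la zone Silver (OHLCV et features déjà calculées),
prépare un découpage temporel train/test, puis entraîne et évalue des modèles de
régression pour prédire le prix de clôture à T+10.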

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, isnan, row_number, lead, lag, avg
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor, LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
# Create (or reuse) the Spark session named "Bitcoin" used by the cells below.
spark = (
    SparkSession.builder
    .appName("Bitcoin")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/21 16:40:21 WARN Utils: Your hostname, zoro, resolves to a loopback address: 127.0.1.1; using 192.168.68.111 instead (on interface wlp1s0)
26/01/21 16:40:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/21 16:40:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
def load_data(path: str):
    """Read the Silver-zone parquet dataset located at ``path``.

    Args:
        path: Location of the parquet files, handed to ``spark.read.parquet``.

    Returns:
        The DataFrame read through the module-level ``spark`` session.
    """
    return spark.read.parquet(path)


# Load the Silver dataset (path is relative to the notebook's working directory)
# and print a preview of its rows.
df_silver = load_data("../../Data/Silver")
df_silver.show()

                                                                                

+-------------------+--------+--------+--------+--------+-------+--------------------+------------------+----------------+---------------------+----------------------+---------------+----------+--------------------+-----------------+-----------------+-------------------+
|          open_time|    open|    high|     low|   close| volume|          close_time|quote_asset_volume|number_of_trades|taker_buy_base_volume|taker_buy_quote_volume|close_t_plus_10|prev_close|              return|             MA_5|            MA_10|        taker_ratio|
+-------------------+--------+--------+--------+--------+-------+--------------------+------------------+----------------+---------------------+----------------------+---------------+----------+--------------------+-----------------+-----------------+-------------------+
|2026-01-18 21:41:00|95380.76|95380.77|95380.76|95380.76|0.30774|2026-01-18 21:41:...|     29352.4770786|              81|              0.19962|         19039.9093074|        95400.0| 

In [4]:
# Print the column names, types and nullability of the Silver DataFrame.
df_silver.printSchema()

root
 |-- open_time: timestamp_ntz (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: double (nullable = true)
 |-- close_time: timestamp_ntz (nullable = true)
 |-- quote_asset_volume: double (nullable = true)
 |-- number_of_trades: long (nullable = true)
 |-- taker_buy_base_volume: double (nullable = true)
 |-- taker_buy_quote_volume: double (nullable = true)
 |-- close_t_plus_10: double (nullable = true)
 |-- prev_close: double (nullable = true)
 |-- return: double (nullable = true)
 |-- MA_5: double (nullable = true)
 |-- MA_10: double (nullable = true)
 |-- taker_ratio: double (nullable = true)



In [5]:
# Report, for each required column, whether it is present in df_silver.
features_requises = ["close_t_plus_10", "return", "MA_5", "MA_10", "taker_ratio"]

print("\nVerification des features:")
colonnes_disponibles = df_silver.columns
for feature in features_requises:
    statut = "OK" if feature in colonnes_disponibles else "MANQUANT"
    print(f"  {feature}: {statut}")


Verification des features:
  close_t_plus_10: OK
  return: OK
  MA_5: OK
  MA_10: OK
  taker_ratio: OK


In [6]:
from pyspark.sql.functions import col, isnan, sum
from pyspark.sql.types import DoubleType, FloatType

# Count missing values per column.
# isNull() does not report NaN, yet NaN can occur in float/double columns and
# VectorAssembler(handleInvalid="skip") drops rows containing either NULL or NaN.
# NaN is therefore counted as missing too, but only for floating-point columns:
# isnan() is not applicable to the timestamp columns.
float_cols = {
    field.name
    for field in df_silver.schema.fields
    if isinstance(field.dataType, (DoubleType, FloatType))
}

# F.sum is used explicitly so the Python builtin `sum` is not relied upon.
null_counts = df_silver.select([
    F.sum(
        (
            (F.col(c).isNull() | F.isnan(F.col(c)))
            if c in float_cols
            else F.col(c).isNull()
        ).cast("int")
    ).alias(c)
    for c in df_silver.columns
])
print("Valeurs NULL par colonne:")
null_counts.show()


Valeurs NULL par colonne:
+---------+----+----+---+-----+------+----------+------------------+----------------+---------------------+----------------------+---------------+----------+------+----+-----+-----------+
|open_time|open|high|low|close|volume|close_time|quote_asset_volume|number_of_trades|taker_buy_base_volume|taker_buy_quote_volume|close_t_plus_10|prev_close|return|MA_5|MA_10|taker_ratio|
+---------+----+----+---+-----+------+----------+------------------+----------------+---------------------+----------------------+---------------+----------+------+----+-----+-----------+
|        0|   0|   0|  0|    0|     0|         0|                 0|               0|                    0|                     0|              0|         0|     0|   0|    0|          0|
+---------+----+----+---+-----+------+----------+------------------+----------------+---------------------+----------------------+---------------+----------+------+----+-----+-----------+



In [7]:
# Target: future close price at T+10
target = "close_t_plus_10"

# Numeric columns of the dataset, grouped by kind.
num_cols = [
    # Prices
    "open", "high", "low", "close",
    # Volumes
    "volume", "quote_asset_volume",
    "taker_buy_base_volume", "taker_buy_quote_volume",
    # Computed features
    "MA_10", "prev_close", "return", "MA_5", "taker_ratio",
]

#### Définir features et target

In [8]:
# Columns used as model features.
feature_names = [
    "MA_5",
    "high",
    "low",
    "open",
    "close",
    "MA_10",
    "prev_close",
    "return",
]

# Alternative feature set (currently disabled):
# feature_names = ["return", "MA_5", "MA_10", "taker_ratio", "volume"]

    

#### Préparer les données

In [9]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def prepare_ml_data(df, feature_names, target_col, train_ratio=0.8):
    """Assemble, chronologically split and standardize the data for training.

    Steps:
      1. Assemble ``feature_names`` into a ``features_raw`` vector column;
         rows with invalid values in those columns are skipped.
      2. Number the rows by ascending ``open_time`` (``time_index`` column).
      3. Put the first ``int(total_rows * train_ratio)`` rows in the train set
         and the remaining rows in the test set.
      4. Fit a ``StandardScaler`` (centering and unit-variance scaling) on the
         train set only, then apply it to both sets to produce ``features``.
      5. Print a temporal-leak check and the size of each set.

    Args:
        df: Input DataFrame; must contain ``open_time``, ``target_col`` and
            every column listed in ``feature_names``.
        feature_names: Names of the columns to assemble into the feature vector.
        target_col: Name of the label column. It is only checked for presence
            here; the training functions read it later.
        train_ratio: Share of the rows, in chronological order, assigned to
            the train set. Must be strictly between 0 and 1.

    Returns:
        Tuple ``(df_train, df_test, scaler_model)`` where both DataFrames carry
        the scaled ``features`` column and ``scaler_model`` is the fitted scaler.

    Raises:
        ValueError: If ``train_ratio`` is outside (0, 1), if a required column
            is missing, or if the split would leave the train or test set empty.
    """
    # Validate before touching Spark so bad arguments fail fast and clearly.
    if not 0 < train_ratio < 1:
        raise ValueError(
            f"train_ratio must be strictly between 0 and 1, got {train_ratio!r}"
        )

    print("Preparation des donnees")

    required = [*feature_names, target_col, "open_time"]
    missing = [name for name in required if name not in df.columns]
    if missing:
        raise ValueError(f"Columns missing from the input DataFrame: {missing}")

    # 1. Feature vectorization
    assembler = VectorAssembler(
        inputCols=feature_names,
        outputCol="features_raw",
        handleInvalid="skip"
    )
    df = assembler.transform(df)

    # 2. Temporal index
    # The window has no partition, so Spark sorts everything in one partition.
    # Caching keeps that sort from being redone for every action below and keeps
    # the row numbering stable between the train and test filters.
    window = Window.orderBy("open_time")
    df = df.withColumn("time_index", F.row_number().over(window)).cache()

    # 3. Chronological split
    total_rows = df.count()
    split_point = int(total_rows * train_ratio)
    test_rows = total_rows - split_point
    if split_point == 0 or test_rows == 0:
        raise ValueError(
            f"Cannot split {total_rows} row(s) with train_ratio={train_ratio!r}: "
            "the train or the test set would be empty"
        )

    df_train_raw = df.filter(F.col("time_index") <= split_point)
    df_test_raw = df.filter(F.col("time_index") > split_point)

    # 4. Standardization (fitted on the train set only)
    scaler = StandardScaler(
        inputCol="features_raw",
        outputCol="features",
        withStd=True,
        withMean=True
    )

    scaler_model = scaler.fit(df_train_raw)
    df_train = scaler_model.transform(df_train_raw)
    df_test = scaler_model.transform(df_test_raw)

    # 5. Temporal-leak check: the train period must end strictly before the
    # test period starts (equal timestamps on both sides are flagged too).
    train_max_time = df_train_raw.agg(F.max("open_time")).collect()[0][0]
    test_min_time = df_test_raw.agg(F.min("open_time")).collect()[0][0]

    if train_max_time >= test_min_time:
        print("Attention : fuite temporelle detectee")
    else:
        print("Pas de fuite temporelle")

    # row_number() yields exactly 1..total_rows, so the split sizes are known
    # without launching two more Spark count jobs.
    print("Train:", split_point)
    print("Test:", test_rows)

    return df_train, df_test, scaler_model


In [10]:
# Build the chronological train/test sets with the function's default train_ratio.
df_train, df_test, scaler_model = prepare_ml_data(
    df=df_silver,
    feature_names=feature_names,
    target_col=target,
)

# df_train.show(5)
# df_test.show(5)

Preparation des donnees


26/01/21 16:40:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 1

Pas de fuite temporelle


26/01/21 16:40:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Train: 790


26/01/21 16:40:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Test: 198


In [11]:
def train_linear_regression(df_train, target_col, max_iter=100, reg_param=0.01):
    """Fit a Linear Regression model on the ``features`` column.

    Args:
        df_train: Training DataFrame.
        target_col: Name of the label column.
        max_iter: Maximum number of iterations.
        reg_param: Regularization strength; ``elasticNetParam`` is fixed at
            0.0, i.e. an L2 penalty.

    Returns:
        The fitted model. Its training RMSE is printed as a side effect.
    """
    print("[train_linear_regression] Entraînement...")

    estimator = LinearRegression(
        featuresCol="features",
        labelCol=target_col,
        maxIter=max_iter,
        regParam=reg_param,
        elasticNetParam=0.0,
    )
    fitted = estimator.fit(df_train)

    train_rmse = fitted.summary.rootMeanSquaredError
    print(f"  ✓ Modèle entraîné (RMSE train: {train_rmse:.2f})")

    return fitted

In [12]:
def train_random_forest(df_train, target_col, num_trees=100, max_depth=10,
                        min_instances=5, seed=42):
    """Fit a Random Forest regressor on the ``features`` column.

    Args:
        df_train: Training DataFrame.
        target_col: Name of the label column.
        num_trees: Number of trees in the forest.
        max_depth: Maximum depth of each tree.
        min_instances: Minimum number of instances per node.
        seed: Random seed, for reproducibility.

    Returns:
        The fitted model.
    """
    print("[train_random_forest] Entraînement...")
    print(f"  - Arbres: {num_trees}, Profondeur: {max_depth}")

    forest = RandomForestRegressor(
        featuresCol="features",
        labelCol=target_col,
        numTrees=num_trees,
        maxDepth=max_depth,
        minInstancesPerNode=min_instances,
        seed=seed,
    )
    fitted = forest.fit(df_train)

    print("  ✓ Modèle entraîné")
    return fitted

In [13]:
def train_gbt(df_train, target_col, max_iter=100, max_depth=6,
              step_size=0.1, seed=42):
    """Fit a Gradient Boosted Trees regressor on the ``features`` column.

    Args:
        df_train: Training DataFrame.
        target_col: Name of the label column.
        max_iter: Number of boosting iterations.
        max_depth: Maximum depth of each tree.
        step_size: Learning rate.
        seed: Random seed, for reproducibility.

    Returns:
        The fitted model.
    """
    # Imported locally: GBTRegressor is not part of the notebook's top imports.
    from pyspark.ml.regression import GBTRegressor

    print("[train_gbt] Entraînement...")
    print(f"  - Iterations: {max_iter}, Profondeur: {max_depth}, LR: {step_size}")

    booster = GBTRegressor(
        featuresCol="features",
        labelCol=target_col,
        maxIter=max_iter,
        maxDepth=max_depth,
        stepSize=step_size,
        seed=seed,
    )
    fitted = booster.fit(df_train)

    print("  ✓ Modèle entraîné")
    return fitted

In [14]:
def evaluate_model(model, df_test, target_col, model_name="Model"):
    """Score a fitted model on the test set.

    Args:
        model: Fitted model exposing ``transform``.
        df_test: Test DataFrame.
        target_col: Name of the label column.
        model_name: Model name, used only in the printed messages.

    Returns:
        Dict with the keys ``rmse``, ``mae`` and ``r2`` (metric values) and
        ``predictions`` (the DataFrame returned by ``model.transform``).
    """
    print(f"[evaluate_model] Évaluation {model_name}...")

    # Predictions on the test set
    predictions = model.transform(df_test)

    # One evaluator per metric, evaluated in the order rmse, mae, r2
    scores = {}
    for metric in ("rmse", "mae", "r2"):
        scorer = RegressionEvaluator(
            labelCol=target_col,
            predictionCol="prediction",
            metricName=metric,
        )
        scores[metric] = scorer.evaluate(predictions)

    print(
        f"  ✓ RMSE: ${scores['rmse']:.2f} | MAE: ${scores['mae']:.2f} | "
        f"R²: {scores['r2']:.4f}"
    )

    return {**scores, "predictions": predictions}

In [15]:
# 1) Data preparation
df_train, df_test, scaler_model = prepare_ml_data(
    df_silver, feature_names, target, train_ratio=0.8
)

# 2) Model training
model_lr = train_linear_regression(df_train, target)
model_rf = train_random_forest(df_train, target)
model_gbt = train_gbt(df_train, target)

# 3) Evaluation on the test set
metrics_lr = evaluate_model(model_lr, df_test, target, "Linear Regression")
metrics_rf = evaluate_model(model_rf, df_test, target, "Random Forest")
metrics_gbt = evaluate_model(model_gbt, df_test, target, "GBT")


Preparation des donnees


26/01/21 16:40:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 1

Pas de fuite temporelle
Train: 790


26/01/21 16:40:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Test: 198
[train_linear_regression] Entraînement...


26/01/21 16:40:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:36 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
26/01/21 16:40:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance

  ✓ Modèle entraîné (RMSE train: 171.16)
[train_random_forest] Entraînement...
  - Arbres: 100, Profondeur: 10


26/01/21 16:40:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:39 WARN DAGScheduler: Broadcasting large task binary with size 1111.0 KiB
26/01/21 16:40:40 WARN DAGScheduler: Broadcasting large task binary with size 1523.3 KiB
26/01/21 16:40:40 WARN DAGScheduler: Broadcasting large task binary with size 1928.2 KiB
26/01/21 16:40:40 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB


  ✓ Modèle entraîné
[train_gbt] Entraînement...
  - Iterations: 100, Profondeur: 6, LR: 0.1


26/01/21 16:40:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:40:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 1

  ✓ Modèle entraîné
[evaluate_model] Évaluation Linear Regression...


26/01/21 16:41:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:41:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:41:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:41:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:41:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:41:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 1

  ✓ RMSE: $80.09 | MAE: $65.76 | R²: 0.3012
[evaluate_model] Évaluation Random Forest...


26/01/21 16:41:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:41:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:41:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:41:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:41:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:41:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 1

  ✓ RMSE: $68.43 | MAE: $55.89 | R²: 0.4899
[evaluate_model] Évaluation GBT...


26/01/21 16:41:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:41:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:41:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:41:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:41:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 16:41:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/21 1

  ✓ RMSE: $91.47 | MAE: $73.30 | R²: 0.0886


In [None]:
# ===== SAVE THE MODEL =====
# The Random Forest was trained on the standardized "features" column, so the
# fitted scaler is persisted next to it: without the same scaler, new data
# cannot be transformed the way the saved model expects.
print("\nSauvegarde du modèle...")
model_rf.write().overwrite().save("../model/bitcoin_rfs_model")
print("Modèle sauvegardé: bitcoin_rfs_model/")
scaler_model.write().overwrite().save("../model/bitcoin_scaler_model")
print("Scaler sauvegardé: bitcoin_scaler_model/")


Sauvegarde du modèle...
Modèle sauvegardé: bitcoin_rfs_model/
