# Alpha Signal Comparison Notebook

Ce notebook a pour objectif de comparer systématiquement différents signaux et méthodes statistiques pour la prédiction court terme sur cryptomonnaies. Il s'appuie sur tous les modules du projet pour garantir robustesse et reproductibilité.

## Plan :
1. Chargement des features extraites
2. Synchronisation des séries temporelles
3. Boucle sur signaux et méthodes statistiques
4. Calcul des scores (valeur, p-value, lag optimal, robustesse)
5. Visualisations (heatmaps, distributions, etc.)
6. Tableau de synthèse et documentation des résultats
7. Export des meilleurs signaux/paramètres pour l'équipe backtest

In [13]:
# Import des librairies principales
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import warnings
from tqdm.notebook import tqdm  # Ajout pour la barre de chargement
warnings.filterwarnings('ignore')

# Import des modules du projet
import sys
sys.path.append('..')

from feature_engineering.order_book_features import OrderBookFeatureExtractor
from feature_engineering.time_series_features import TimeSeriesFeatureExtractor
from feature_engineering.synchronization import AsynchronousSync, SyncConfig
from statistical_analysis.transfer_entropy import TransferEntropyAnalyzer
from statistical_analysis.causality_tests import CausalityTester
from statistical_analysis.correlation_analysis import CrossCorrelationAnalyzer

plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

## 1. Chargement des données features extraites

On charge les features déjà extraites (order book, time series, etc.) depuis le dossier `processed_data/` ou `data_cache/` pour éviter tout retraitement inutile.

In [14]:
# Détection automatique des fichiers de features disponibles
feature_files = {}
data_cache_path = '../data_cache/'
processed_data_path = '../processed_data/'

# Recherche dans data_cache (parquet)
for fname in os.listdir(data_cache_path):
    if fname.endswith('.parquet'):
        symbol = fname.split('_')[0]
        feature_files[symbol] = os.path.join(data_cache_path, fname)

# Recherche dans processed_data (csv ou parquet)
for fname in os.listdir(processed_data_path):
    if fname.endswith('.parquet') or fname.endswith('.csv'):
        symbol = fname.split('_')[0]
        feature_files[symbol] = os.path.join(processed_data_path, fname)

print(f"Fichiers de features détectés : {feature_files}")

# Chargement des DataFrames
crypto_data = {}
for symbol, path in feature_files.items():
    if path.endswith('.parquet'):
        crypto_data[symbol] = pd.read_parquet(path)
    elif path.endswith('.csv'):
        crypto_data[symbol] = pd.read_csv(path, index_col=0, parse_dates=True)
    else:
        print(f"Format non supporté pour {path}")

for symbol, df in crypto_data.items():
    print(f"{symbol}: shape={df.shape}, colonnes={list(df.columns)[:8]} ...")

Fichiers de features détectés : {'ETH': '../data_cache/ETH_4a25fd7dc5f00c1639dfde83bf483c73.parquet', 'BTC': '../data_cache/BTC_e5b6226914df159d942f4575aa84f65d.parquet'}
ETH: shape=(53700040, 6), colonnes=['timestamp_us', 'symbol', 'price', 'volume', 'side', 'level'] ...
BTC: shape=(17601860, 6), colonnes=['timestamp_us', 'symbol', 'price', 'volume', 'side', 'level'] ...
ETH: shape=(53700040, 6), colonnes=['timestamp_us', 'symbol', 'price', 'volume', 'side', 'level'] ...
BTC: shape=(17601860, 6), colonnes=['timestamp_us', 'symbol', 'price', 'volume', 'side', 'level'] ...


## 2. Synchronisation asynchrone des séries temporelles

On synchronise les séries de tous les symboles sans arrondir les timestamps, en utilisant la classe AsynchronousSync du projet.

In [None]:
# Configuration de la synchronisation
symbols = list(crypto_data.keys())
sync_config = SyncConfig(
    tolerance_us=1_000_000,  # 1 seconde
    interpolation_method='linear',
    resampling_frequency_us=1_000_000,
    enable_cross_symbol_features=True
)
synchronizer = AsynchronousSync(config=sync_config, symbols=symbols)

# Synchronisation vectorisée des séries temporelles (sans boucle ligne à ligne)
dfs = []
for symbol, df in crypto_data.items():
    idx = df.index
    # Conversion index en microsecondes
    if hasattr(idx, "to_pydatetime"):
        ts_us = (idx.to_series().apply(lambda x: int(x.timestamp() * 1e6)))
    else:
        ts_us = pd.Series(idx.values.astype(np.int64), index=idx)
        if ts_us.max() < 1e12:
            ts_us = ts_us * 1_000_000
    df = df.copy()
    df['timestamp_us'] = ts_us.values
    df = df.reset_index(drop=True)
    # On préfixe les colonnes pour chaque symbole
    df = df.rename(columns={col: f"{symbol}_{col}" for col in df.columns if col != 'timestamp_us'})
    dfs.append(df[['timestamp_us'] + [c for c in df.columns if c != 'timestamp_us']])

# Fusion outer join sur timestamp_us
from functools import reduce
sync_df = reduce(lambda left, right: pd.merge(left, right, on='timestamp_us', how='outer'), dfs)
sync_df = sync_df.sort_values('timestamp_us').reset_index(drop=True)
sync_df['timestamp'] = pd.to_datetime(sync_df['timestamp_us'] // 1_000_000, unit='s')
sync_df = sync_df.set_index('timestamp')
print(f"DataFrame synchronisé shape: {sync_df.shape}")

# Si besoin, on peut passer sync_df à la classe AsynchronousSync pour interpolation/traitement supplémentaire
# synchronizer = AsynchronousSync(config=sync_config, symbols=symbols)
# sync_points = synchronizer.process_dataframe(sync_df)  # À adapter selon l'API

## 3. Boucle sur signaux et méthodes statistiques

Pour chaque signal pertinent (mid_price, spread, imbalance, etc.) et chaque méthode (Transfer Entropy, Granger, corrélation croisée), on calcule la force du signal pour chaque paire de symboles.

In [None]:
# Définition des signaux et méthodes à tester
SIGNALS = ['mid_price', 'spread', 'imbalance', 'bid_volume_l1', 'ask_volume_l1']
METHODS = ['transfer_entropy', 'granger', 'correlation']

# Initialisation des analyseurs
te_analyzer = TransferEntropyAnalyzer(method='ksg', max_workers=4)
granger_tester = CausalityTester()
corr_analyzer = CrossCorrelationAnalyzer()

results = []

for signal in SIGNALS:
    # Vérifie que le signal existe pour tous les symboles
    available = [f"{sym}_{signal}" in sync_df.columns for sym in symbols]
    if not all(available):
        print(f"Signal {signal} absent pour certains symboles, ignoré.")
        continue
    for i, source in enumerate(symbols):
        for j, target in enumerate(symbols):
            if i == j:
                continue
            source_col = f"{source}_{signal}"
            target_col = f"{target}_{signal}"
            x = sync_df[source_col].values
            y = sync_df[target_col].values
            # Nettoyage NaN
            mask = np.isfinite(x) & np.isfinite(y)
            x = x[mask]
            y = y[mask]
            if len(x) < 200:
                continue
            # Méthode Transfer Entropy
            try:
                te_res = te_analyzer.analyze_transfer_entropy(x, y, source, target, lags_us=[1_000_000, 2_000_000, 5_000_000])
                best_te = max(te_res, key=lambda r: r.transfer_entropy)
                results.append({
                    'source': source,
                    'target': target,
                    'signal': signal,
                    'method': 'transfer_entropy',
                    'score': best_te.transfer_entropy,
                    'p_value': best_te.p_value,
                    'lag_us': best_te.lag_us,
                    'significant': best_te.p_value < 0.05
                })
            except Exception as e:
                print(f"TE erreur {source}->{target} {signal}: {e}")
            # Méthode Granger
            try:
                granger = granger_tester.granger_causality_test(x, y, max_lags=5)
                pval = granger.get('X->Y', {}).get('p_value', 1.0)
                results.append({
                    'source': source,
                    'target': target,
                    'signal': signal,
                    'method': 'granger',
                    'score': -np.log10(pval+1e-10),
                    'p_value': pval,
                    'lag_us': None,
                    'significant': pval < 0.05
                })
            except Exception as e:
                print(f"Granger erreur {source}->{target} {signal}: {e}")
            # Méthode Corrélation croisée
            try:
                corr = corr_analyzer.cross_correlation(x, y, max_lag=10)
                max_corr = np.max(np.abs(corr.correlation))
                lag = corr.lags[np.argmax(np.abs(corr.correlation))]
                results.append({
                    'source': source,
                    'target': target,
                    'signal': signal,
                    'method': 'correlation',
                    'score': max_corr,
                    'p_value': None,
                    'lag_us': lag*1_000_000,
                    'significant': max_corr > 0.2
                })
            except Exception as e:
                print(f"Corr erreur {source}->{target} {signal}: {e}")

results_df = pd.DataFrame(results)
print(f"Nombre total de tests réalisés : {len(results_df)}")
results_df.head()

## 4. Visualisations et synthèse des résultats

On visualise les scores des signaux/méthodes et on synthétise les résultats pour identifier les signaux robustes et ceux qui ne fonctionnent pas.

In [None]:
# Heatmap des scores par signal et méthode
import matplotlib.ticker as mticker

for method in METHODS:
    pivot = results_df[results_df['method'] == method].pivot_table(
        index=['source', 'target'], columns='signal', values='score', aggfunc='max')
    if pivot.empty:
        continue
    plt.figure(figsize=(12, 8))
    sns.heatmap(pivot, annot=True, fmt='.3f', cmap='viridis', cbar_kws={'label': method})
    plt.title(f'Heatmap {method} (score max par signal)')
    plt.ylabel('Source → Target')
    plt.xlabel('Signal')
    plt.tight_layout()
    plt.show()

# Distribution des scores et des p-values
plt.figure(figsize=(15, 5))
plt.subplot(1, 2, 1)
results_df[results_df['method']=='transfer_entropy']['score'].hist(bins=30, alpha=0.7)
plt.title('Distribution Transfer Entropy')
plt.xlabel('TE')
plt.subplot(1, 2, 2)
results_df[results_df['method']=='granger']['p_value'].hist(bins=30, alpha=0.7)
plt.title('Distribution p-value Granger')
plt.xlabel('p-value')
plt.tight_layout()
plt.show()

# Tableau de synthèse des meilleurs signaux
best_signals = results_df[results_df['significant']].groupby(['signal', 'method']).agg(
    n=('score', 'count'),
    mean_score=('score', 'mean'),
    min_pval=('p_value', 'min')
).sort_values('mean_score', ascending=False)
print("Tableau de synthèse des signaux/méthodes les plus robustes :")
display(best_signals.head(10))

# Export des meilleurs signaux pour l'équipe backtest
export_cols = ['source', 'target', 'signal', 'method', 'score', 'p_value', 'lag_us', 'significant']
best_signals_df = results_df[results_df['significant']][export_cols]
best_signals_df.to_csv('../results/best_alpha_signals.csv', index=False)
print(f"Exporté {len(best_signals_df)} signaux robustes dans ../results/best_alpha_signals.csv")

## 1. Extraction des features avancées à partir des carnets de commandes

Avant la synchronisation, nous extrayons des features avancées (mid_price, spread, imbalance, bid/ask_volume_l1, etc.) pour chaque symbole à partir des données de carnet de commandes de niveau 1.

In [15]:
# Extraction vectorisée des features avancées (mid_price, spread, imbalance, bid/ask_volume_l1) pour chaque symbole
from feature_engineering.order_book_features import OrderBookFeatureExtractor

ob_extractor = OrderBookFeatureExtractor()

for symbol, df in crypto_data.items():
    # On suppose que chaque ligne du DataFrame correspond à un snapshot de carnet (niveau 1)
    # On regroupe les lignes par timestamp_us (ou index) pour former des snapshots
    # Ici, on suppose que chaque ligne = un niveau, donc on regroupe par timestamp
    if 'timestamp_us' in df.columns:
        ts_col = 'timestamp_us'
    elif 'timestamp' in df.columns:
        ts_col = 'timestamp'
    else:
        # Si index est datetime ou float/int timestamp
        if hasattr(df.index, 'to_pydatetime') or np.issubdtype(df.index.dtype, np.datetime64):
            ts_col = None
            df = df.reset_index().rename(columns={'index': 'timestamp'})
        else:
            ts_col = None
            df = df.reset_index().rename(columns={'index': 'timestamp'})
    
    # On regroupe par timestamp pour reconstituer les snapshots
    order_book_data = []
    for ts, group in df.groupby(ts_col if ts_col else 'timestamp'):
        # On convertit le timestamp en microsecondes
        if isinstance(ts, pd.Timestamp):
            timestamp_us = int(ts.timestamp() * 1e6)
        elif isinstance(ts, (float, np.floating)) and ts > 1e12:
            timestamp_us = int(ts)
        elif isinstance(ts, (float, np.floating)):
            timestamp_us = int(ts * 1e6)
        elif isinstance(ts, (int, np.integer)) and ts > 1e12:
            timestamp_us = ts
        elif isinstance(ts, (int, np.integer)):
            timestamp_us = ts * 1_000_000
        else:
            continue
        # On convertit le group en liste de dicts (snapshots)
        snapshots = group.to_dict('records')
        order_book_data.append((symbol, timestamp_us, snapshots))
    # Extraction batchée des features
    features_df = ob_extractor.extract_batch_features(order_book_data)
    features_df = features_df.set_index('timestamp_us')
    crypto_data[symbol] = features_df
print("Features avancées extraites et injectées dans crypto_data.")

KeyboardInterrupt: 

In [None]:
# Configuration de la synchronisation
symbols = list(crypto_data.keys())
sync_config = SyncConfig(
    tolerance_us=1_000_000,  # 1 seconde
    interpolation_method='linear',
    resampling_frequency_us=1_000_000,
    enable_cross_symbol_features=True
)
synchronizer = AsynchronousSync(config=sync_config, symbols=symbols)

# Synchronisation vectorisée des séries temporelles (sans boucle ligne à ligne)
dfs = []
for symbol, df in crypto_data.items():
    idx = df.index
    # Conversion index en microsecondes
    if hasattr(idx, "to_pydatetime"):
        ts_us = (idx.to_series().apply(lambda x: int(x.timestamp() * 1e6)))
    else:
        ts_us = pd.Series(idx.values.astype(np.int64), index=idx)
        if ts_us.max() < 1e12:
            ts_us = ts_us * 1_000_000
    df = df.copy()
    df['timestamp_us'] = ts_us.values
    df = df.reset_index(drop=True)
    # On préfixe les colonnes pour chaque symbole
    df = df.rename(columns={col: f"{symbol}_{col}" for col in df.columns if col != 'timestamp_us'})
    dfs.append(df[['timestamp_us'] + [c for c in df.columns if c != 'timestamp_us']])

# Fusion outer join sur timestamp_us
from functools import reduce
sync_df = reduce(lambda left, right: pd.merge(left, right, on='timestamp_us', how='outer'), dfs)
sync_df = sync_df.sort_values('timestamp_us').reset_index(drop=True)
sync_df['timestamp'] = pd.to_datetime(sync_df['timestamp_us'] // 1_000_000, unit='s')
sync_df = sync_df.set_index('timestamp')
print(f"DataFrame synchronisé shape: {sync_df.shape}")

# Si besoin, on peut passer sync_df à la classe AsynchronousSync pour interpolation/traitement supplémentaire
# synchronizer = AsynchronousSync(config=sync_config, symbols=symbols)
# sync_points = synchronizer.process_dataframe(sync_df)  # À adapter selon l'API

In [None]:
# Définition des signaux et méthodes à tester
SIGNALS = ['mid_price', 'spread', 'imbalance', 'bid_volume_l1', 'ask_volume_l1']
METHODS = ['transfer_entropy', 'granger', 'correlation']

# Initialisation des analyseurs
te_analyzer = TransferEntropyAnalyzer(method='ksg', max_workers=4)
granger_tester = CausalityTester()
corr_analyzer = CrossCorrelationAnalyzer()

results = []

for signal in SIGNALS:
    # Vérifie que le signal existe pour tous les symboles
    available = [f"{sym}_{signal}" in sync_df.columns for sym in symbols]
    if not all(available):
        print(f"Signal {signal} absent pour certains symboles, ignoré.")
        continue
    for i, source in enumerate(symbols):
        for j, target in enumerate(symbols):
            if i == j:
                continue
            source_col = f"{source}_{signal}"
            target_col = f"{target}_{signal}"
            x = sync_df[source_col].values
            y = sync_df[target_col].values
            # Nettoyage NaN
            mask = np.isfinite(x) & np.isfinite(y)
            x = x[mask]
            y = y[mask]
            if len(x) < 200:
                continue
            # Méthode Transfer Entropy
            try:
                te_res = te_analyzer.analyze_transfer_entropy(x, y, source, target, lags_us=[1_000_000, 2_000_000, 5_000_000])
                best_te = max(te_res, key=lambda r: r.transfer_entropy)
                results.append({
                    'source': source,
                    'target': target,
                    'signal': signal,
                    'method': 'transfer_entropy',
                    'score': best_te.transfer_entropy,
                    'p_value': best_te.p_value,
                    'lag_us': best_te.lag_us,
                    'significant': best_te.p_value < 0.05
                })
            except Exception as e:
                print(f"TE erreur {source}->{target} {signal}: {e}")
            # Méthode Granger
            try:
                granger = granger_tester.granger_causality_test(x, y, max_lags=5)
                pval = granger.get('X->Y', {}).get('p_value', 1.0)
                results.append({
                    'source': source,
                    'target': target,
                    'signal': signal,
                    'method': 'granger',
                    'score': -np.log10(pval+1e-10),
                    'p_value': pval,
                    'lag_us': None,
                    'significant': pval < 0.05
                })
            except Exception as e:
                print(f"Granger erreur {source}->{target} {signal}: {e}")
            # Méthode Corrélation croisée
            try:
                corr = corr_analyzer.cross_correlation(x, y, max_lag=10)
                max_corr = np.max(np.abs(corr.correlation))
                lag = corr.lags[np.argmax(np.abs(corr.correlation))]
                results.append({
                    'source': source,
                    'target': target,
                    'signal': signal,
                    'method': 'correlation',
                    'score': max_corr,
                    'p_value': None,
                    'lag_us': lag*1_000_000,
                    'significant': max_corr > 0.2
                })
            except Exception as e:
                print(f"Corr erreur {source}->{target} {signal}: {e}")

results_df = pd.DataFrame(results)
print(f"Nombre total de tests réalisés : {len(results_df)}")
results_df.head()

In [None]:
# Heatmap des scores par signal et méthode
import matplotlib.ticker as mticker

for method in METHODS:
    pivot = results_df[results_df['method'] == method].pivot_table(
        index=['source', 'target'], columns='signal', values='score', aggfunc='max')
    if pivot.empty:
        continue
    plt.figure(figsize=(12, 8))
    sns.heatmap(pivot, annot=True, fmt='.3f', cmap='viridis', cbar_kws={'label': method})
    plt.title(f'Heatmap {method} (score max par signal)')
    plt.ylabel('Source → Target')
    plt.xlabel('Signal')
    plt.tight_layout()
    plt.show()

# Distribution des scores et des p-values
plt.figure(figsize=(15, 5))
plt.subplot(1, 2, 1)
results_df[results_df['method']=='transfer_entropy']['score'].hist(bins=30, alpha=0.7)
plt.title('Distribution Transfer Entropy')
plt.xlabel('TE')
plt.subplot(1, 2, 2)
results_df[results_df['method']=='granger']['p_value'].hist(bins=30, alpha=0.7)
plt.title('Distribution p-value Granger')
plt.xlabel('p-value')
plt.tight_layout()
plt.show()

# Tableau de synthèse des meilleurs signaux
best_signals = results_df[results_df['significant']].groupby(['signal', 'method']).agg(
    n=('score', 'count'),
    mean_score=('score', 'mean'),
    min_pval=('p_value', 'min')
).sort_values('mean_score', ascending=False)
print("Tableau de synthèse des signaux/méthodes les plus robustes :")
display(best_signals.head(10))

# Export des meilleurs signaux pour l'équipe backtest
export_cols = ['source', 'target', 'signal', 'method', 'score', 'p_value', 'lag_us', 'significant']
best_signals_df = results_df[results_df['significant']][export_cols]
best_signals_df.to_csv('../results/best_alpha_signals.csv', index=False)
print(f"Exporté {len(best_signals_df)} signaux robustes dans ../results/best_alpha_signals.csv")

In [None]:
# Signaux/méthodes jamais significatifs
fail_signals = results_df.groupby(['signal', 'method'])['significant'].sum()
fail_signals = fail_signals[fail_signals == 0]
if not fail_signals.empty:
    print("Signaux/méthodes sans valeur prédictive :")
    print(fail_signals)
else:
    print("Tous les couples signal/méthode ont au moins un cas significatif.")

## 6. Conclusion et recommandations pour le backtest

Ce notebook permet d'identifier objectivement les signaux et méthodes les plus robustes pour la prédiction court terme. Les résultats exportés peuvent être transmis à l'équipe backtest pour intégration dans les stratégies de trading.

In [None]:
# Diagnostic des colonnes synchronisées et présence des signaux attendus
print("Colonnes du DataFrame synchronisé :")
print(list(sync_df.columns)[:30])  # Affiche les 30 premières colonnes

for signal in ['mid_price', 'spread', 'imbalance', 'bid_volume_l1', 'ask_volume_l1']:
    for sym in symbols:
        col = f"{sym}_{signal}"
        print(f"Présence de {col} :", col in sync_df.columns)