In [1]:
import polars as pl
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from typing import List, Tuple
from sklearn.preprocessing import RobustScaler
from sklearn.model_selection import TimeSeriesSplit, RandomizedSearchCV
from sklearn.metrics import mean_squared_error, mean_absolute_error, accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
from sklearn.ensemble import VotingClassifier
from lightgbm import LGBMRegressor, LGBMClassifier
from xgboost import XGBClassifier
from scipy.stats import ks_2samp
from imblearn.over_sampling import SMOTE
from tqdm import tqdm
import warnings
import os
import time
import joblib

plt.style.use('ggplot')
sns.set_theme(style="whitegrid")
os.makedirs('plots', exist_ok=True)
warnings.filterwarnings('ignore')
joblib.parallel_backend('threading')

def create_target(
    df: pl.DataFrame,
    instrument_id: str,
    forward_periods: int = 50,
    time_window: float = 3.0,
    use_time: bool = False,
    target_type: str = 'price',
    price_type: str = 'mid_price'
) -> pl.DataFrame:
    """
    Создает целевую переменную для регрессии  и классификации .
    """
    timestamp_col = 'exchange_time'
    price_col = f'{instrument_id}_{price_type}'
    
    result_df = df.clone()
    
    if use_time:
        result_df = result_df.with_columns([
            (pl.col(timestamp_col) / 1e9).alias('timestamp_seconds')
        ])
        future_prices = []
        print("Создание целевой переменной с учетом времени...")
        for row in tqdm(result_df.iter_rows(named=True), total=result_df.height, desc="Обработка строк"):
            current_time = row['timestamp_seconds']
            target_time = current_time + time_window
            future_rows = result_df.filter(pl.col('timestamp_seconds') >= target_time)
            future_price = future_rows[0, price_col] if future_rows.height > 0 else row[price_col]
            future_prices.append(future_price)
        result_df = result_df.with_columns([
            pl.Series(future_prices).alias('future_price')
        ])
    else:
        result_df = result_df.with_columns([
            pl.col(price_col).shift(-forward_periods).alias('future_price')
        ])
    
    if target_type == 'price':
        result_df = result_df.with_columns([
            pl.col('future_price').fill_null(pl.col(price_col).mean())
            .replace([float('inf'), float('-inf')], pl.col(price_col).mean())
            .alias('future_price')
        ])
    elif target_type == 'returns':
        result_df = result_df.with_columns([
            ((pl.col('future_price') - pl.col(price_col)) / (pl.col(price_col) + 1e-6))
            .fill_null(0.0).replace([float('inf'), float('-inf')], 0.0)
            .alias('future_returns')
        ])
    elif target_type == 'direction':
        result_df = result_df.with_columns([
            (pl.col('future_price') > pl.col(price_col)).cast(pl.Int8).alias('future_direction')
        ])
    
    result_df = result_df.drop_nulls(subset=['future_price'])
    
    # Визуализация распределения цп
    plt.figure(figsize=(10, 6))
    if target_type == 'price':
        sns.histplot(result_df['future_price'].to_numpy(), bins=100)
        plt.title('Распределение future_price')
        plt.savefig('plots/future_price_distribution.png')
    elif target_type == 'returns':
        sns.histplot(result_df['future_returns'].to_numpy(), bins=100)
        plt.title('Распределение future_returns')
        plt.savefig('plots/future_returns_distribution.png')
    elif target_type == 'direction':
        sns.countplot(x=result_df['future_direction'].to_numpy())
        plt.title('Распределение future_direction')
        plt.savefig('plots/future_direction_distribution.png')
    plt.close()
    
    print(f"Target created: type={target_type}, price_type={price_type}, "
          f"use_time={use_time}, forward_periods={forward_periods}, time_window={time_window}")
    
    return result_df

def engineer_features(df: pl.DataFrame, instrument_id: str) -> Tuple[pl.DataFrame, List[str]]:
    """
    Признаки из рыночных данных.
    """
    start_time = time.time()
    mid_col = f'{instrument_id}_mid_price'
    imbalance_col = f'{instrument_id}_book_imbalance'
    spread_col = f'{instrument_id}_spread'
    bid_vol_col = 'bid_qty_0'
    ask_vol_col = 'ask_qty_0'
    
    result_df = df.clone()
    
    
    required_cols = ['bid_price_0', 'ask_price_0', bid_vol_col, ask_vol_col]
    missing_cols = [col for col in required_cols if col not in result_df.columns]
    if missing_cols:
        raise ValueError(f"Missing columns in DataFrame: {missing_cols}")
    
    # Базовые признаки
    result_df = result_df.with_columns([
        ((pl.col('bid_price_0') + pl.col('ask_price_0')) / 2).alias(mid_col),
        ((pl.col(bid_vol_col) - pl.col(ask_vol_col)) / (pl.col(bid_vol_col) + pl.col(ask_vol_col) + 1e-6)).alias(imbalance_col),
        (pl.col('ask_price_0') - pl.col('bid_price_0')).alias(spread_col)
    ]).with_columns([
        pl.col(mid_col).fill_null(pl.col(mid_col).mean()).replace([float('inf'), float('-inf')], pl.col(mid_col).mean()),
        pl.col(imbalance_col).fill_null(0.0).replace([float('inf'), float('-inf')], 0.0),
        pl.col(spread_col).fill_null(pl.col(spread_col).mean()).replace([float('inf'), float('-inf')], pl.col(spread_col).mean())
    ])
    
    # Возвраты
    for window in tqdm([1, 5, 10, 20], desc="Создание возвратов"):
        result_df = result_df.with_columns([
            ((pl.col(mid_col) / pl.col(mid_col).shift(window) - 1))
            .fill_null(0.0).replace([float('inf'), float('-inf')], 0.0)
            .alias(f'returns_{window}')
        ])
    
    # SMA и EMA
    for window in tqdm([5, 10, 20], desc="Создание SMA и EMA"):
        result_df = result_df.with_columns([
            pl.col(mid_col).rolling_mean(window).fill_null(pl.col(mid_col).mean())
            .replace([float('inf'), float('-inf')], pl.col(mid_col).mean())
            .alias(f'sma_{window}'),
            pl.col(mid_col).ewm_mean(span=window).fill_null(pl.col(mid_col).mean())
            .replace([float('inf'), float('-inf')], pl.col(mid_col).mean())
            .alias(f'ema_{window}')
        ])
    
    # Волатильность
    for window in tqdm([10, 20], desc="Создание волатильности"):
        result_df = result_df.with_columns([
            pl.col('returns_1').rolling_std(window).fill_null(0.0)
            .replace([float('inf'), float('-inf')], 0.0)
            .alias(f'volatility_{window}')
        ])
    
    # Дисбаланс и спред
    result_df = result_df.with_columns([
        pl.col(imbalance_col).rolling_mean(10).fill_null(0.0).replace([float('inf'), float('-inf')], 0.0).alias('imbalance_ma_10'),
        pl.col(imbalance_col).shift(1).fill_null(0.0).replace([float('inf'), float('-inf')], 0.0).alias('imbalance_lag_1'),
        (pl.col(spread_col) / (pl.col(mid_col) + 1e-6)).fill_null(0.0).replace([float('inf'), float('-inf')], 0.0).alias('relative_spread')
    ])
    
    # Объемы
    result_df = result_df.with_columns([
        ((pl.col(bid_vol_col) - pl.col(ask_vol_col)) / (pl.col(bid_vol_col) + pl.col(ask_vol_col) + 1e-6))
        .rolling_mean(10).fill_null(0.0).replace([float('inf'), float('-inf')], 0.0)
        .alias('trade_flow_10'),
        (pl.col(bid_vol_col) + pl.col(ask_vol_col)).rolling_sum(5).fill_null(0.0)
        .replace([float('inf'), float('-inf')], 0.0)
        .alias('total_volume_5')
    ])
    
    # RSI
    def calculate_rsi(prices: pl.Series, period: int = 14) -> pl.Series:
        delta = prices.diff()
        gain = delta.clip(lower_bound=0)
        loss = (-delta).clip(lower_bound=0)
        avg_gain = gain.rolling_mean(period)
        avg_loss = loss.rolling_mean(period)
        rs = avg_gain / (avg_loss + 1e-6)
        rsi = 100 - (100 / (1 + rs))
        return rsi.fill_null(50.0).replace([float('inf'), float('-inf')], 50.0)
    
    result_df = result_df.with_columns([
        calculate_rsi(pl.col(mid_col), period=14).alias('rsi_14')
    ])
    
    # MACD
    result_df = result_df.with_columns([
        (pl.col(mid_col).ewm_mean(span=12) - pl.col(mid_col).ewm_mean(span=26))
        .fill_null(0.0).replace([float('inf'), float('-inf')], 0.0)
        .alias('macd')
    ])
    
    # Формирование списка признаков
    exclude_patterns = ['future_', 'exchange_time', 'adapter_time', 'instrument_id']
    feature_cols = [col for col in result_df.columns if not any(pattern in col for pattern in exclude_patterns)]
    feature_cols = [col for col in feature_cols if col != instrument_id]
    
    print(f"Количество признаков: {len(feature_cols)}")
    print(f"Инженерия признаков завершена за {time.time() - start_time:.2f} секунд")
    return result_df, feature_cols

def preprocess_data(
    df: pl.DataFrame,
    feature_cols: List[str],
    target_col: str,
    n_splits: int = 3,
    outlier_threshold: float = 3.0,
    sample_size: int = 50000
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, RobustScaler, np.ndarray]:
    """
    Предобработка данных: масштабирование, удаление выбросов, разделение на выборки.
    """
    start_time = time.time()
    
    if sample_size:
        df = df.sample(n=sample_size, shuffle=False)
    
    X = df.select(feature_cols).to_numpy()
    y = df[target_col].to_numpy()
    
  
    X = np.nan_to_num(X, nan=np.nanmean(X, axis=0), posinf=np.nanmean(X, axis=0), neginf=np.nanmean(X, axis=0))
    y = np.nan_to_num(y, nan=np.nanmean(y), posinf=np.nanmean(y), neginf=np.nanmean(y))
    
    # Удаление выбросов
    z_scores = np.abs((y - np.mean(y)) / np.std(y))
    mask = z_scores < outlier_threshold
    X, y = X[mask], y[mask]
    indices = np.arange(len(mask))[mask]
    
   
    tscv = TimeSeriesSplit(n_splits=n_splits)
    train_idx, val_idx = list(tscv.split(X))[-1]
    test_size = 0.15
    split_idx = int(len(X) * (1 - test_size))
    
    X_train, X_val, X_test = X[train_idx], X[val_idx], X[split_idx:]
    y_train, y_val, y_test = y[train_idx], y[val_idx], y[split_idx:]
    train_indices, val_indices, test_indices = indices[train_idx], indices[val_idx], indices[split_idx:]
    
   
    scaler = RobustScaler()
    X_train = scaler.fit_transform(X_train)
    X_val = scaler.transform(X_val)
    X_test = scaler.transform(X_test)
    
    print(f"Размеры выборок: train={X_train.shape}, val={X_val.shape}, test={X_test.shape}")
    print(f"Предобработка завершена за {time.time() - start_time:.2f} секунд")
    
    return X_train, X_val, X_test, y_train, y_val, y_test, scaler, train_indices, val_indices, test_indices

def check_data_distribution(
    X_train: np.ndarray,
    X_val: np.ndarray,
    X_test: np.ndarray,
    feature_cols: List[str],
    y_train: np.ndarray,
    y_val: np.ndarray,
    y_test: np.ndarray
) -> None:
    """
    Проверка распределения признаков и целевой переменной.
    """
    print("Проверка распределения признаков...")
    for i, col in enumerate(feature_cols):
        train_val_p = ks_2samp(X_train[:, i], X_val[:, i]).pvalue
        train_test_p = ks_2samp(X_train[:, i], X_test[:, i]).pvalue
        print(f"{col}: Train-Val p-value={train_val_p:.4f}, Train-Test p-value={train_test_p:.4f}")
    
    plt.figure(figsize=(10, 6))
    sns.kdeplot(y_train, label='Train', color='blue')
    sns.kdeplot(y_val, label='Validation', color='orange')
    sns.kdeplot(y_test, label='Test', color='green')
    plt.title('Распределение целевой переменной')
    plt.legend()
    plt.savefig('plots/target_split_distribution.png')
    plt.close()

def train_and_evaluate(
    X_train: np.ndarray,
    X_val: np.ndarray,
    X_test: np.ndarray,
    y_train: np.ndarray,
    y_val: np.ndarray,
    y_test: np.ndarray,
    feature_cols: List[str],
    df: pl.DataFrame,
    train_indices: np.ndarray,
    val_indices: np.ndarray,
    test_indices: np.ndarray,
    regression_params: dict,
    classification_params: dict
) -> dict:
    """
    Обучение и оценка моделей.
    """
    results = {'regression': {}, 'classification_lgbm': {}, 'classification_xgb': {}, 'classification_ensemble': {}}
    
    # Регрессия
    start_time = time.time()
    print("Обучение регрессии...")
    reg_model = LGBMRegressor(random_state=42)
    reg_grid = RandomizedSearchCV(
        reg_model, regression_params, n_iter=10, cv=3, scoring='neg_mean_squared_error', n_jobs=2, random_state=42
    )
    reg_grid.fit(X_train, y_train)
    reg_best = reg_grid.best_estimator_
    
    for split, X, y in [('train', X_train, y_train), ('val', X_val, y_val), ('test', X_test, y_test)]:
        y_pred = reg_best.predict(X)
        results['regression'][f'rmse_{split}'] = mean_squared_error(y, y_pred, squared=False)
        results['regression'][f'mae_{split}'] = mean_absolute_error(y, y_pred)
    
    plt.figure(figsize=(10, 6))
    y_pred_test = reg_best.predict(X_test)
    plt.scatter(y_test, y_pred_test, alpha=0.5)
    plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--')
    plt.xlabel('Фактические значения')
    plt.ylabel('Предсказанные значения')
    plt.title('Фактические vs Предсказанные (Регрессия)')
    plt.savefig('plots/predictions_scatter.png')
    plt.close()
    
    plt.figure(figsize=(10, 6))
    sns.barplot(x=reg_best.feature_importances_, y=feature_cols)
    plt.title('Важность признаков (Регрессия)')
    plt.savefig('plots/feature_importance_regression.png')
    plt.close()
    
    print(f"Регрессия завершена за {time.time() - start_time:.2f} секунд")
    
    # Классификация
    start_time = time.time()
    print("Обучение классификации...")
    y_train_class = df['future_direction'][train_indices].to_numpy()
    y_val_class = df['future_direction'][val_indices].to_numpy()
    y_test_class = df['future_direction'][test_indices].to_numpy()
    
    y_train_class = np.nan_to_num(y_train_class, nan=0)
    smote = SMOTE(random_state=42, sampling_strategy='auto')
    X_train_resampled, y_train_class_resampled = smote.fit_resample(X_train, y_train_class)
    
    # LightGBM
    lgbm_model = LGBMClassifier(random_state=42, class_weight='balanced')
    lgbm_grid = RandomizedSearchCV(
        lgbm_model, classification_params, n_iter=10, cv=3, scoring='f1', n_jobs=2, random_state=42
    )
    lgbm_grid.fit(X_train_resampled, y_train_class_resampled)
    lgbm_best = lgbm_grid.best_estimator_
    
    # XGBoost
    xgb_model = XGBClassifier(
        random_state=42, 
        scale_pos_weight=len(y_train_class[y_train_class == 0]) / len(y_train_class[y_train_class == 1])
    )
    xgb_grid = RandomizedSearchCV(
        xgb_model, classification_params, n_iter=10, cv=3, scoring='f1', n_jobs=2, random_state=42
    )
    xgb_grid.fit(X_train_resampled, y_train_class_resampled)
    xgb_best = xgb_grid.best_estimator_
    
    # Ансамбль
    ensemble_model = VotingClassifier(
        estimators=[('lgbm', lgbm_best), ('xgb', xgb_best)],
        voting='soft'
    )
    ensemble_model.fit(X_train_resampled, y_train_class_resampled)
    
    # Оценка моделей
    for model_name, model in [('lgbm', lgbm_best), ('xgb', xgb_best), ('ensemble', ensemble_model)]:
        for split, X, y in [('train', X_train, y_train_class), ('val', X_val, y_val_class), ('test', X_test, y_test_class)]:
            y_pred = model.predict(X)
            y_proba = model.predict_proba(X)[:, 1]
            results[f'classification_{model_name}'][f'accuracy_{split}'] = accuracy_score(y, y_pred)
            if split == 'test':
                results[f'classification_{model_name}']['precision_test'] = precision_score(y, y_pred)
                results[f'classification_{model_name}']['recall_test'] = recall_score(y, y_pred)
                results[f'classification_{model_name}']['f1_test'] = f1_score(y, y_pred)
            results[f'classification_{model_name}'][f'roc_auc_{split}'] = roc_auc_score(y, y_proba)
        
        if model_name != 'ensemble':
            plt.figure(figsize=(10, 6))
            sns.barplot(x=model.feature_importances_, y=feature_cols)
            plt.title(f'Важность признаков ({model_name.upper()} Классификация)')
            plt.savefig(f'plots/feature_importance_classification_{model_name}.png')
            plt.close()
        
        cm = confusion_matrix(y_test_class, model.predict(X_test))
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
        plt.title(f'Матрица ошибок ({model_name.upper()} Классификация)')
        plt.xlabel('Предсказанный класс')
        plt.ylabel('Фактический класс')
        plt.savefig(f'plots/confusion_matrix_{model_name}.png')
        plt.close()
    
    print(f"Классификация завершена за {time.time() - start_time:.2f} секунд")
    return results

def main():
    start_time = time.time()
    
    # Параметры для моделей
    instrument_id = 'BTC-USD'
    regression_params = {
        'n_estimators': [50, 100],
        'max_depth': [5, 7, 8],
        'learning_rate': [0.01, 0.1],
        'subsample': [0.8, 1.0],
        'colsample_bytree': [0.8, 1.0]
    }
    classification_params = {
        'n_estimators': [50, 100],
        'max_depth': [5, 7, 8],
        'learning_rate': [0.01, 0.1],
        'subsample': [0.8, 1.0],
        'colsample_bytree': [0.8, 1.0]
    }
    
    
    print("Загрузка данных...")
    df = pl.read_parquet("C:/Users/wertu/quant_assignment/data/parsed_ondo/2820_2025-02-18_bybit_book_1_fc.parquet")
    
    # Проверка столбцов
    required_cols = ['bid_price_0', 'ask_price_0', 'bid_qty_0', 'ask_qty_0', 'exchange_time']
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing columns: {missing_cols}")
    
  
    df = df.with_columns([
        ((pl.col('bid_price_0') + pl.col('ask_price_0')) / 2).alias(f'{instrument_id}_mid_price')
        .fill_null(pl.col('bid_price_0').mean())
        .replace([float('inf'), float('-inf')], pl.col('bid_price_0').mean())
    ])
    
    
    df_regression = create_target(
        df=df, instrument_id=instrument_id, forward_periods=50, use_time=False, 
        target_type='price', price_type='mid_price'
    )
    df_classification = create_target(
        df=df, instrument_id=instrument_id, forward_periods=50, use_time=False, 
        target_type='direction', price_type='mid_price'
    )
    
    
    df_regression, feature_cols = engineer_features(df_regression, instrument_id)
    
    
    X_train, X_val, X_test, y_train, y_val, y_test, scaler, train_indices, val_indices, test_indices = preprocess_data(
        df=df_regression, feature_cols=feature_cols, target_col='future_price', 
        n_splits=3, outlier_threshold=3.0, sample_size=50000
    )
    
   
    check_data_distribution(X_train, X_val, X_test, feature_cols, y_train, y_val, y_test)
    
    
    results = train_and_evaluate(
        X_train, X_val, X_test, y_train, y_val, y_test, feature_cols, df_classification,
        train_indices, val_indices, test_indices, regression_params, classification_params
    )
    
    
    print("\nРезультаты регрессии:")
    for metric, value in results['regression'].items():
        print(f"{metric}: {value:.6f}")
    
    for model in ['lgbm', 'xgb', 'ensemble']:
        print(f"\nРезультаты классификации ({model.upper()}):")
        for metric, value in results[f'classification_{model}'].items():
            print(f"{metric}: {value:.6f}")
    
    print(f"\nПайплайн завершен за {time.time() - start_time:.2f} секунд")

if __name__ == "__main__":
    main()

Загрузка данных...
Target created: type=price, price_type=mid_price, use_time=False, forward_periods=50, time_window=3.0
Target created: type=direction, price_type=mid_price, use_time=False, forward_periods=50, time_window=3.0


Создание возвратов: 100%|████████████████████████████████████████████████████████████████| 4/4 [00:00<00:00, 29.65it/s]
Создание SMA и EMA: 100%|████████████████████████████████████████████████████████████████| 3/3 [00:00<00:00, 16.21it/s]
Создание волатильности: 100%|████████████████████████████████████████████████████████████| 2/2 [00:00<00:00, 14.57it/s]


Количество признаков: 26
Инженерия признаков завершена за 0.83 секунд
Размеры выборок: train=(37500, 26), val=(12500, 26), test=(7500, 26)
Предобработка завершена за 0.16 секунд
Проверка распределения признаков...
bid_price_0: Train-Val p-value=0.1888, Train-Test p-value=0.3088
bid_qty_0: Train-Val p-value=0.6742, Train-Test p-value=0.9385
ask_price_0: Train-Val p-value=0.1888, Train-Test p-value=0.2799
ask_qty_0: Train-Val p-value=0.4068, Train-Test p-value=0.3954
BTC-USD_mid_price: Train-Val p-value=0.1825, Train-Test p-value=0.2893
BTC-USD_book_imbalance: Train-Val p-value=0.9211, Train-Test p-value=0.5910
BTC-USD_spread: Train-Val p-value=0.9879, Train-Test p-value=0.9799
returns_1: Train-Val p-value=0.7923, Train-Test p-value=0.7733
returns_5: Train-Val p-value=0.9953, Train-Test p-value=0.9367
returns_10: Train-Val p-value=0.9887, Train-Test p-value=0.9780
returns_20: Train-Val p-value=0.4032, Train-Test p-value=0.9205
sma_5: Train-Val p-value=0.1764, Train-Test p-value=0.2823
em