## Importaciones

In [1]:
import os
import math
import random
import numpy as np
from numba import njit
import pandas as pd
from datetime import datetime
from typing import Dict, Any, Tuple, List
import optuna
from catboost import CatBoostClassifier
from sklearn.model_selection import train_test_split
from modules.labeling_lib import get_labels_one_direction
from modules.labeling_lib import sliding_window_clustering
from modules.tester_lib import tester_one_direction
from modules.export_lib import export_model_to_ONNX
import warnings
warnings.filterwarnings("ignore")

In [2]:
# Obtener precios
def get_prices(hyper_params) -> pd.DataFrame:
    """Load the close-price history for one symbol/timeframe.

    The file ``<symbol>_<timeframe>.csv`` is read from
    ``hyper_params["history_path"]`` as a whitespace-separated table that
    must contain the columns ``<DATE>``, ``<TIME>`` and ``<CLOSE>``.

    Returns:
        A DataFrame indexed by ``time`` (date and time joined and parsed
        as datetimes) with a single ``close`` column; rows containing
        NaN values are dropped.
    """
    file_name = f"{hyper_params['symbol']}_{hyper_params['timeframe']}.csv"
    raw = pd.read_csv(os.path.join(hyper_params["history_path"], file_name), sep=r"\s+")

    # Date and time live in separate text columns; join them before parsing.
    stamps = pd.to_datetime(raw['<DATE>'] + ' ' + raw['<TIME>'], format='mixed')

    prices = pd.DataFrame({'time': stamps, 'close': raw['<CLOSE>']})
    return prices.set_index('time').dropna()
# Ingeniería de características
@njit
def compute_features(close, periods, periods_meta, stats):
    """Build a matrix of rolling-window statistics over ``close``.

    Args:
        close: price sequence (get_features passes a 1-D NumPy array).
        periods: window lengths for the regular features.
        periods_meta: window lengths for the meta features (std only).
        stats: names of the statistics computed for every regular period.
            Recognised names: "std", "skew", "kurt", "zscore", "mean",
            "range", "median", "mad", "var", "entropy", "slope".

    Returns:
        Array of shape (len(close), len(periods) * len(stats) + len(periods_meta)).
        Columns are ordered period-major (every stat of the first period,
        then every stat of the second, ...), followed by one std column
        per meta period. Row ``i`` of a column with window ``win`` is
        computed from ``close[i - win:i]`` (the current element is not
        included); rows with ``i < win`` stay NaN. A stat name that is not
        recognised leaves its column entirely NaN.
    """
    n = len(close)
    total_features = (len(periods) * len(stats)) + len(periods_meta)
    features = np.full((n, total_features), np.nan)

    # Sample standard deviation (denominator is size - 1).
    # NOTE(review): a window of length 1 makes the denominator zero.
    def std_manual(x):
        m = np.mean(x)
        return np.sqrt(np.sum((x - m) ** 2) / (x.size - 1))

    # Mean of cubed standardised values; 0.0 when the std is zero.
    def skew_manual(x):
        m = np.mean(x)
        s = std_manual(x)
        return np.mean(((x - m) / s) ** 3) if s != 0 else 0.0

    # Mean of fourth-power standardised values minus 3; 0.0 when the std is zero.
    def kurt_manual(x):
        m = np.mean(x)
        s = std_manual(x)
        return np.mean(((x - m) / s) ** 4) - 3 if s != 0 else 0.0
    
    # Z-score of the first element of x. Windows are passed reversed, so
    # x[0] is the most recent value of the window (close[i - 1]).
    def zscore_manual(x):
        m = np.mean(x)
        s = std_manual(x)
        return (x[0] - m) / s if s != 0 else 0.0
    
    # Entropy (natural log) of a 10-bin equal-width histogram of x;
    # 0.0 when all values are equal.
    def entropy_manual(x):
        bins = 10
        minv = np.min(x)
        maxv = np.max(x)
        width = (maxv - minv) / bins
        if width == 0:
            return 0.0
        hist = np.zeros(bins)
        for val in x:
            idx = int((val - minv) / width)
            if idx == bins:  # edge case: the maximum falls on the upper edge, keep it in the last bin
                idx -= 1
            hist[idx] += 1
        total = x.size
        entropy = 0.0
        for i in range(bins):
            p = hist[i] / total
            if p > 0:
                entropy -= p * np.log(p)
        return entropy

    # Least-squares slope of x against its positions 0..size-1. Because the
    # window is passed reversed, position 0 is the most recent value.
    def slope_manual(x):
        n = x.size
        x_idx = np.arange(n)
        x_mean = np.mean(x_idx)
        y_mean = np.mean(x)
        numerator = np.sum((x_idx - x_mean) * (x - y_mean))
        denominator = np.sum((x_idx - x_mean) ** 2)
        return numerator / denominator if denominator != 0 else 0.0

    # Regular periods: one column per (period, stat) pair
    col = 0
    for win in periods:
        for s in stats:
            for i in range(win, n):
                # The win values preceding i, reversed (newest first)
                window = close[i - win:i][::-1]
                if s == "std":
                    features[i, col] = std_manual(window)
                elif s == "skew":
                    features[i, col] = skew_manual(window)
                elif s == "kurt":
                    features[i, col] = kurt_manual(window)
                elif s == "zscore":
                    features[i, col] = zscore_manual(window)
                elif s == "mean":
                    features[i, col] = np.mean(window)
                elif s == "range":
                    features[i, col] = np.max(window) - np.min(window)
                elif s == "median":
                    features[i, col] = np.median(window)
                elif s == "mad":
                    features[i, col] = np.mean(np.abs(window - np.mean(window)))
                elif s == "var":
                    features[i, col] = np.var(window)
                elif s == "entropy":
                    features[i, col] = entropy_manual(window)
                elif s == "slope":
                    features[i, col] = slope_manual(window)
            col += 1  # Move to the next column once every row is filled for this stat and window

    # Meta periods: one std column per window
    for win in periods_meta:
        for i in range(win, n):
            window = close[i - win:i][::-1]
            features[i, col] = std_manual(window)
        col += 1

    return features

def get_features(data: pd.DataFrame, hp):
    """Compute the feature table for a price frame.

    Args:
        data: frame with a ``close`` column; its index is reused for the result.
        hp: hyper-parameter mapping providing ``periods``, ``periods_meta``
            and ``stats``.

    Returns:
        A DataFrame with one ``<period>_<stat>_feature`` column per
        period/stat pair, one ``<period>_std_meta_feature`` column per meta
        period and the original ``close`` column, with rows containing NaN
        removed. If every computed value is NaN, an empty frame carrying
        only the index is returned instead.

    Raises:
        ValueError: if ``hp["stats"]`` is empty.
    """
    prices = data['close'].values
    row_index = data.index
    windows = hp["periods"]
    meta_windows = hp["periods_meta"]
    stat_names = hp["stats"]

    if len(stat_names) < 1:
        raise ValueError("La lista de estadísticas está vacía.")

    matrix = compute_features(prices, np.array(windows), np.array(meta_windows), stat_names)
    if np.isnan(matrix).all():
        return pd.DataFrame(index=row_index)

    # Column names follow the column order produced by compute_features:
    # period-major regular features first, meta features last.
    names = [f"{w}_{name}_feature" for w in windows for name in stat_names]
    names += [f"{w}_std_meta_feature" for w in meta_windows]

    frame = pd.DataFrame(matrix, columns=names, index=row_index)
    frame["close"] = data["close"]
    return frame.dropna()
    
def test_model_one_direction(
        dataset: pd.DataFrame,
        result:  list,
        forward: datetime,
        backward: datetime,
        markup:  float,
        direction: str,
        plt: bool = False):
    """Score a (main model, meta model) pair with ``tester_one_direction``.

    Args:
        dataset: feature frame containing a ``close`` column; columns whose
            name contains ``meta_feature`` feed the meta model, all other
            feature columns feed the main model.
        result: sequence whose first item is the main model and whose second
            item is the meta model; both must expose ``predict_proba``.
        forward, backward, markup, direction, plt: passed through unchanged
            to ``tester_one_direction``.

    Returns:
        Whatever ``tester_one_direction`` returns for the labelled frame.
    """
    frame = dataset.copy()

    feature_block = frame.drop(columns=['close'])
    is_meta = feature_block.columns.str.contains('meta_feature')
    meta_inputs = feature_block.loc[:, is_meta]
    main_inputs = feature_block.loc[:, ~is_meta]

    # Probability of the positive class from each model.
    frame['labels'] = result[0].predict_proba(main_inputs)[:, 1]
    frame['meta_labels'] = result[1].predict_proba(meta_inputs)[:, 1]

    # Turn probabilities into hard 0.0 / 1.0 signals using a 0.5 threshold.
    signal_cols = ['labels', 'meta_labels']
    frame[signal_cols] = (frame[signal_cols] > 0.5).astype(float)

    return tester_one_direction(frame, forward, backward, markup, direction, plt)

## Main

In [3]:
def fit_final_models(clustered: pd.DataFrame,
                     meta: pd.DataFrame,
                     oos_data: pd.DataFrame,
                     hp: Dict[str, Any]) -> Tuple[float, Any, Any]:
    """Train the main model and the meta model, then score them out of sample.

    Args:
        clustered: labelled rows of one cluster; must contain ``labels``.
        meta: frame holding the ``*_meta_feature`` columns and a binary
            ``clusters`` target.
        oos_data: out-of-sample frame handed to ``test_model_one_direction``.
        hp: hyper-parameters. Optional ``cat_main_*`` / ``cat_meta_*`` keys
            override the CatBoost defaults; ``full forward``, ``forward``,
            ``markup`` and ``direction`` are required for the evaluation.

    Returns:
        ``(R2, model, meta_model)`` where ``R2`` is the score returned by
        ``test_model_one_direction`` (replaced by -1.0 when it is NaN).
    """
    meta_mask = meta.columns.str.contains('_meta_feature')

    # Main model: every column except the target and the meta features.
    main_features = clustered.drop(columns=['labels', *meta.columns[meta_mask]])
    main_target = clustered['labels'].astype('int16')

    # Meta model: meta features only, predicting cluster membership.
    meta_features = meta.loc[:, meta_mask]
    meta_target = meta['clusters'].astype('int16')

    # Random 70/30 splits; the main split is drawn before the meta split.
    main_split = train_test_split(
        main_features, main_target, train_size=0.7, shuffle=True)
    meta_split = train_test_split(
        meta_features, meta_target, train_size=0.7, shuffle=True)
    fit_X, val_X, fit_y, val_y = main_split
    fit_X_m, val_X_m, fit_y_m, val_y_m = meta_split

    # Settings shared by both CatBoost classifiers.
    shared_settings = {
        'use_best_model': True,
        'verbose': False,
        'thread_count': -1,
        'task_type': 'CPU',
    }

    model = CatBoostClassifier(
        iterations=hp.get('cat_main_iterations', 500),
        depth=hp.get('cat_main_depth', 6),
        learning_rate=hp.get('cat_main_learning_rate', 0.15),
        l2_leaf_reg=hp.get('cat_main_l2_leaf_reg', 3.0),
        custom_loss=['Accuracy'],
        eval_metric='Accuracy',
        **shared_settings,
    )
    model.fit(fit_X, fit_y, eval_set=(val_X, val_y), early_stopping_rounds=25)

    meta_model = CatBoostClassifier(
        iterations=hp.get('cat_meta_iterations', 500),
        depth=hp.get('cat_meta_depth', 6),
        learning_rate=hp.get('cat_meta_learning_rate', 0.15),
        l2_leaf_reg=hp.get('cat_meta_l2_leaf_reg', 3.0),
        custom_loss=['F1'],
        eval_metric='F1',
        **shared_settings,
    )
    meta_model.fit(fit_X_m, fit_y_m, eval_set=(val_X_m, val_y_m), early_stopping_rounds=15)

    # Out-of-sample evaluation of the pair.
    score = test_model_one_direction(
        oos_data,
        [model, meta_model],
        hp['full forward'],
        hp['forward'],
        hp['markup'],
        hp['direction'],
        plt=False,
    )
    score = -1.0 if math.isnan(score) else score
    return score, model, meta_model

# ----------------------------------------------------------------------------
#      ─── FUNCIÓN OBJETIVO PARA OPTUNA ───
# ----------------------------------------------------------------------------

def objective(trial: optuna.trial.Trial, base_hp: Dict[str, Any], study=None) -> float:
    """Optuna objective: sample hyper-parameters, train per cluster, return the best score.

    Args:
        trial: Optuna trial used to sample the search space.
        base_hp: base hyper-parameters; a shallow copy is modified per trial.
        study: optional study. When given, the best model pair seen so far
            (across trials) and its settings are stored in its user attributes.

    Returns:
        The highest score below 1.0 obtained over the clusters with at least
        500 rows, or negative infinity when no statistic was selected or no
        cluster produced such a score.
    """
    hp = base_hp.copy()

    # --- Search space (suggest_* calls kept in their original order) ---
    hp['n_clusters'] = trial.suggest_int('n_clusters', 5, 60, step=5)
    hp['window_size'] = trial.suggest_int('window_size', 100, 500, step=10)
    hp['label_min'] = trial.suggest_int('label_min', 1, 5)
    hp['label_max'] = trial.suggest_int('label_max', hp['label_min']+5, 30)
    hp['markup'] = trial.suggest_float("markup", 0.05, 0.3)
    # CatBoost (main)
    hp['cat_main_iterations'] = trial.suggest_int('cat_main_iterations', 100, 1000, step=100)
    hp['cat_main_depth'] = trial.suggest_int('cat_main_depth', 4, 10)
    hp['cat_main_learning_rate'] = trial.suggest_float('cat_main_learning_rate', 0.01, 0.3, log=True)
    hp['cat_main_l2_leaf_reg'] = trial.suggest_float('cat_main_l2_leaf_reg', 1.0, 7.0)
    # CatBoost (meta)
    hp['cat_meta_iterations'] = trial.suggest_int('cat_meta_iterations', 100, 500, step=100)
    hp['cat_meta_depth'] = trial.suggest_int('cat_meta_depth', 4, 8)
    hp['cat_meta_learning_rate'] = trial.suggest_float('cat_meta_learning_rate', 0.03, 0.2, log=True)
    hp['cat_meta_l2_leaf_reg'] = trial.suggest_float('cat_meta_l2_leaf_reg', 1.0, 5.0)

    # Periods are not searched here: hp['periods'] and hp['periods_meta']
    # are taken unchanged from base_hp.

    # --- Statistic selection: one on/off switch per candidate statistic ---
    chosen_stats = []
    for stat_name in ("std", "skew", "kurt", "zscore", "mean", "range",
                      "median", "mad", "var", "entropy", "slope"):
        if trial.suggest_categorical(f"use_stat_{stat_name}", [True, False]):
            chosen_stats.append(stat_name)
    if not chosen_stats:
        return -np.inf
    hp["stats"] = chosen_stats

    # --- Full dataset, split into training range and out-of-sample range ---
    all_rows = get_features(get_prices(hp), hp)
    stamps = all_rows.index
    train_rows = all_rows[(stamps > hp['backward']) & (stamps < hp['forward'])]
    oos_rows = all_rows[(stamps >= hp['forward']) & (stamps < hp['full forward'])]

    # --- Sliding-window clustering of the training range ---
    clustered = sliding_window_clustering(
        train_rows,
        n_clusters=hp['n_clusters'],
        window_size=hp['window_size']
    )

    top_score = -math.inf
    for cluster_id in np.sort(clustered['clusters'].unique()):
        subset = clustered[clustered['clusters'] == cluster_id].copy()
        if len(subset) < 500:
            continue  # too few rows to train on

        subset = get_labels_one_direction(
            subset,
            markup=hp['markup'],
            min=hp['label_min'],
            max=hp['label_max'],
            direction=hp['direction'])
        subset = subset.drop(['close', 'clusters'], axis=1)

        # Meta target: 1 for rows of the current cluster, 0 otherwise.
        meta_frame = clustered.copy()
        meta_frame['clusters'] = (meta_frame['clusters'] == cluster_id).astype(int)

        score, model, meta_model = fit_final_models(
            subset,
            meta_frame.drop(['close'], axis=1),
            oos_rows,
            hp
        )

        # Only scores strictly below 1.0 that beat this trial's best count.
        if not (top_score < score < 1.0):
            continue
        top_score = score

        if study is None:
            continue
        # Persist only when this beats everything stored by earlier trials.
        if top_score > study.user_attrs.get("best_r2", -np.inf):
            study.set_user_attr("best_model", model)
            study.set_user_attr("best_meta_model", meta_model)
            study.set_user_attr("best_r2", top_score)
            study.set_user_attr("best_stats", hp["stats"])
            study.set_user_attr("best_periods", hp["periods"])
            study.set_user_attr("best_periods_meta", hp["periods_meta"])

    return top_score

# ----------------------------------------------------------------------------
#                 ─── PIPELINE DE OPTIMIZACIÓN + EXPORT ───
# ----------------------------------------------------------------------------

def optimize_and_export(symbol, timeframe, model_number, n_trials):
    """Run an Optuna study, then export the best model pair to ONNX.

    Args:
        symbol: instrument name; used to locate ``<symbol>_<timeframe>.csv``.
        timeframe: timeframe label; second part of the history file name.
        model_number: identifier forwarded to ``export_model_to_ONNX``.
        n_trials: number of Optuna trials to run.

    The objective stores the best models in the study's user attributes only
    when some trial produced a usable score. If no trial did, nothing is
    exported and a message is printed instead of failing with a ``KeyError``.
    """

    common_file_folder = r"/mnt/c/Users/Administrador/AppData/Roaming/MetaQuotes/Terminal/Common/Files/"
    mql5_files_folder = r'/mnt/c/Users/Administrador/AppData/Roaming/MetaQuotes/Terminal/6C3C6A11D1C3791DD4DBF45421BF8028/MQL5/Files/'
    mql5_include_folder = r'/mnt/c/Users/Administrador/AppData/Roaming/MetaQuotes/Terminal/6C3C6A11D1C3791DD4DBF45421BF8028/MQL5/Include/ajmtrz/include/Dmitrievsky'

    base_hp: Dict[str, Any] = {
        'symbol': symbol,
        'timeframe': timeframe,
        'models_export_path': mql5_files_folder,
        'include_export_path': mql5_include_folder,
        'history_path': common_file_folder,
        'best_models': [],
        'stats': [],
        'model_number': model_number,
        'markup': 0.20,
        'label_min'  : 1,
        'label_max'  : 15,
        'direction': 'buy',
        'n_clusters': 30,
        'window_size': 350,
        'periods': [i for i in range(5, 300, 30)],
        'periods_meta': [5],
        'backward': datetime(2020, 3, 26),
        'forward': datetime(2024, 1, 1),
        'full forward': datetime(2026, 1, 1),
    }

    study = optuna.create_study(direction='maximize')
    study.optimize(lambda t: objective(t, base_hp, study), n_trials=n_trials, show_progress_bar=True)


    print("\n┌───────────────────────────────────────────────┐")
    print("│      MEJOR RESULTADO = {:.4f}                 │".format(study.best_value))
    print("└───────────────────────────────────────────────┘\n")
    print("Parámetros óptimos:\n", study.best_params)

    # objective() only writes "best_model" when a cluster with enough rows
    # yields a score below 1.0. If that never happened, there is nothing to
    # export, so stop here with a clear message.
    if "best_model" not in study.user_attrs:
        print("No se encontró ningún modelo válido; se omite la exportación.")
        return

    # Retrieve the best model pair and the settings it was trained with
    base_hp.update(study.best_params)
    model      = study.user_attrs["best_model"]
    meta_model = study.user_attrs["best_meta_model"]
    best_r2    = study.user_attrs["best_r2"]
    base_hp['stats'] = study.user_attrs["best_stats"]
    base_hp['periods'] = study.user_attrs["best_periods"]
    base_hp['periods_meta'] = study.user_attrs["best_periods_meta"]
    # 'best_models' is passed explicitly below, so drop the placeholder entry
    # to avoid a duplicate keyword argument.
    base_hp.pop('best_models', None)
    print("Exportando modelos ONNX… R2 = {:.4f}".format(best_r2))
    export_model_to_ONNX(best_models=[model, meta_model], **base_hp)

if __name__ == "__main__":
    # range(10, 11) yields only 10, so a single study runs with model_number=10
    # for XAUUSD on the H1 timeframe, using 15 Optuna trials.
    for i in range(10, 11):
        optimize_and_export('XAUUSD', 'H1', i, n_trials=15)

[I 2025-04-20 15:59:25,121] A new study created in memory with name: no-name-49adaee3-bed2-4110-9df4-073d4876e0c0


  0%|          | 0/15 [00:00<?, ?it/s]

[I 2025-04-20 15:59:31,762] Trial 0 finished with value: 0.9542523867020981 and parameters: {'n_clusters': 15, 'window_size': 450, 'label_min': 5, 'label_max': 22, 'markup': 0.26459910029550193, 'cat_main_iterations': 600, 'cat_main_depth': 5, 'cat_main_learning_rate': 0.158734945932034, 'cat_main_l2_leaf_reg': 6.101452446324627, 'cat_meta_iterations': 200, 'cat_meta_depth': 8, 'cat_meta_learning_rate': 0.049851031717072905, 'cat_meta_l2_leaf_reg': 1.8444094162138387, 'use_stat_std': False, 'use_stat_skew': False, 'use_stat_kurt': True, 'use_stat_zscore': False, 'use_stat_mean': False, 'use_stat_range': False, 'use_stat_median': False, 'use_stat_mad': False, 'use_stat_var': True, 'use_stat_entropy': False, 'use_stat_slope': True}. Best is trial 0 with value: 0.9542523867020981.
[I 2025-04-20 15:59:34,923] Trial 1 finished with value: 0.8583393063753539 and parameters: {'n_clusters': 10, 'window_size': 350, 'label_min': 2, 'label_max': 24, 'markup': 0.1158476497816509, 'cat_main_iterati