In [17]:
import pandas as pd
import numpy as np

def dc_backtest(ticks: pd.DataFrame, theta: float, Y: float, volume: int = 1000):
    """
    Simple Python reimplementation of your DC strategy.
    ticks: DataFrame with columns ['timestamp','bid','ask'].
    theta: DC threshold in percent.
    Y: scaling factor.
    Returns: total PnL (simplified: ask-bid midpoint, 1 unit per trade).
    """
    price_series = (ticks['bid'] + ticks['ask']) / 2
    last_extreme = price_series.iloc[0]
    looking_for_upturn = True
    entry_price = None
    max_overshoot = 0.0
    pnl = 0.0

    for price in price_series:
        if looking_for_upturn:
            if (price - last_extreme) / last_extreme * 100 >= theta:
                entry_price = price
                max_overshoot = 0.0
                looking_for_upturn = False
                last_extreme = price
            elif price < last_extreme:
                last_extreme = price
        else:
            if price > last_extreme:
                overshoot = (price - last_extreme) / last_extreme * 100
                max_overshoot = max(max_overshoot, overshoot)
                last_extreme = price

            scaled_threshold = theta * Y * np.exp(-max_overshoot)
            if (last_extreme - price) / last_extreme * 100 >= scaled_threshold:
                if entry_price is not None:
                    pnl += (price - entry_price) * volume
                    entry_price = None
                looking_for_upturn = True
                last_extreme = price
    return pnl

# Example usage with Bayesian optimization:
from skopt import gp_minimize
from skopt.space import Real
from skopt.utils import use_named_args

# Load your tick data here
ticks = pd.read_csv('eurusd_ticks.csv')  # columns: timestamp,bid,ask

space = [
    Real(0.1, 1.0, name='theta'),  # adjust bounds as needed
    Real(0.3, 0.9, name='Y'),
]

@use_named_args(space)
def objective(theta, Y):
    return -dc_backtest(ticks, theta, Y)  # negative PnL for minimization

res = gp_minimize(objective, space, n_calls=25, n_initial_points=5, random_state=42)
print("Best θ,Y:", res.x, "Best PnL:", -res.fun)


Best θ,Y: [0.6406946334587516, 0.7144225777696879] Best PnL: 87.16499999999992


In [18]:
import pandas as pd
import numpy as np
import xgboost as xgb
from sklearn.model_selection import train_test_split
from skopt import gp_minimize
from skopt.space import Real, Integer
from skopt.utils import use_named_args

# -----------------------------
# Step 1: Feature + Label Creation
# -----------------------------
def create_dc_features(ticks, theta, Y, lookahead=20):
    price_series = (ticks['bid'] + ticks['ask']) / 2
    last_extreme = price_series.iloc[0]
    looking_for_upturn = True
    max_overshoot = 0.0

    features = []
    labels = []

    for i, price in enumerate(price_series):
        feature = [price, last_extreme, (price - last_extreme)/last_extreme*100, max_overshoot]

        if looking_for_upturn:
            if (price - last_extreme)/last_extreme*100 >= theta:
                features.append(feature)
                future_prices = price_series[i+1:i+1+lookahead]
                pnl = future_prices.max() - price if len(future_prices) > 0 else 0
                labels.append(1 if pnl > 0 else 0)
                looking_for_upturn = False
                last_extreme = price
                max_overshoot = 0.0
            elif price < last_extreme:
                last_extreme = price
        else:
            if price > last_extreme:
                overshoot = (price - last_extreme)/last_extreme*100
                max_overshoot = max(max_overshoot, overshoot)
                last_extreme = price

            scaled_threshold = theta * Y * np.exp(-max_overshoot)
            if (last_extreme - price)/last_extreme*100 >= scaled_threshold:
                looking_for_upturn = True
                last_extreme = price
                max_overshoot = 0.0

    return np.array(features), np.array(labels)

# -----------------------------
# Step 2: Meta-labeled DC Backtest
# -----------------------------
def dc_meta_backtest(ticks, theta, Y, clf, volume=1000):
    price_series = (ticks['bid'] + ticks['ask']) / 2
    last_extreme = price_series.iloc[0]
    looking_for_upturn = True
    entry_price = None
    max_overshoot = 0.0
    pnl = 0.0

    for i, price in enumerate(price_series):
        feature = [price, last_extreme, (price - last_extreme)/last_extreme*100, max_overshoot]

        if looking_for_upturn:
            if (price - last_extreme)/last_extreme*100 >= theta:
                pred = clf.predict([feature])[0]
                if pred == 1:
                    entry_price = price
                looking_for_upturn = False
                last_extreme = price
                max_overshoot = 0.0
            elif price < last_extreme:
                last_extreme = price
        else:
            if price > last_extreme:
                overshoot = (price - last_extreme)/last_extreme*100
                max_overshoot = max(max_overshoot, overshoot)
                last_extreme = price

            scaled_threshold = theta * Y * np.exp(-max_overshoot)
            if (last_extreme - price)/last_extreme*100 >= scaled_threshold:
                if entry_price is not None:
                    pnl += (price - entry_price) * volume
                    entry_price = None
                looking_for_upturn = True
                last_extreme = price
                max_overshoot = 0.0

    return pnl

# -----------------------------
# Step 3: Load tick data
# -----------------------------
ticks = pd.read_csv('eurusd_ticks.csv')  # timestamp,bid,ask

# -----------------------------
# Step 4: Bayesian Optimization
# -----------------------------
space = [
    Real(0.1, 1.0, name='theta'),    # DC threshold
    Real(0.3, 0.9, name='Y'),        # scaling factor
    Integer(50, 300, name='n_estimators'),  # XGBoost param
    Integer(2, 5, name='max_depth'),        # XGBoost param
]

@use_named_args(space)
def objective(theta, Y, n_estimators, max_depth):
    X, y = create_dc_features(ticks, theta, Y)
    if len(y) < 10:
        return 1e6  # avoid tiny datasets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    clf = xgb.XGBClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        learning_rate=0.1,
        use_label_encoder=False,
        eval_metric='logloss',
        random_state=42
    )
    clf.fit(X_train, y_train)
    pnl = dc_meta_backtest(ticks, theta, Y, clf)
    return -pnl  # maximize PnL

res = gp_minimize(objective, space, n_calls=25, n_initial_points=5, random_state=42)

print("Best parameters:")
print("θ (DC threshold):", res.x[0])
print("Y (scaling factor):", res.x[1])
print("n_estimators:", res.x[2])
print("max_depth:", res.x[3])
print("Best PnL:", -res.fun)


Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.


Best parameters:
θ (DC threshold): 0.6043167766179658
Y (scaling factor): 0.7496517032407144
n_estimators: 185
max_depth: 2
Best PnL: 78.70000000000155
