In [73]:
import logging
import subprocess
import time
import os
import csv
import optuna
import pandas as pd

ASSET_GROUPS = {
    1: {
        "SP": {"p_val": 50, "tr_cost": 12},
        "NQ": {"p_val": 20, "tr_cost": 12},
    },
    2: {
        "CAD": {"p_val": 100000, "tr_cost": 10},
        "AUD": {"p_val": 100000, "tr_cost": 10},
        "XAU": {"p_val": 100, "tr_cost": 15},
        "XAG": {"p_val": 5000, "tr_cost": 10},
    },
}
logger = logging.getLogger()

def ensure_directory(path):
    """
    Ensure that a directory exists at the specified path.
    If the directory does not exist, it will be created.
    """
    if not os.path.isdir(path):
        os.makedirs(path, exist_ok=True)
        logger.info(f"Directory created: {path}")


def save_params(params, path, file="last_params"):
    """
    Save hyperparameters to a CSV file.
    """
    ensure_directory(path)
    with open(path+"/"+file+".csv", mode='w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(params.keys())
        writer.writerow(params.values())


def load_scores(params, results_path, stat_name = 'stats_fl'):
    """
    Load and return the mean of 'stats_fl' scores from a CSV file based on given parameters.

    Args:
        params (dict): A dictionary of parameters to filter the CSV file. The keys are the column names,
                       and the values are the values to filter by. The values can be of type int, float, or str.
        results_path (str): The path to the directory containing the 'results.csv' file.

    Returns:
        float: The mean of the 'stats_fl' column for the filtered rows.

    Raises:
        FileNotFoundError: If the 'results.csv' file does not exist in the specified directory.
        pd.errors.EmptyDataError: If the CSV file is empty or only contains headers.
        KeyError: If the specified columns in params do not exist in the CSV file.
    """
    query_conditions = []

    for key, value in params.items():
        if isinstance(value, int):
            query_conditions.append(f"{key} == {value}")
        elif isinstance(value, float):
            query_conditions.append(f"{key} == {round(value, 5)}")
        elif isinstance(value, str):
            query_conditions.append(f"{key} == '{value}'")

    query = " and ".join(query_conditions)
    file_path = results_path + "/results.csv"

    while True:
        tmp = pd.read_csv(file_path).query(query)
        if not tmp.empty:
            if not tmp[stat_name].empty:
                return tmp[stat_name].fillna(0).mean()
            else:
                logger.error("Empty %s column in %s", stat_name, file_path)
        time.sleep(0.1)

def optimize_strategy(n_trials,
                      param_space_func,
                      study_name,
                      r_script_path,
                      instrument_name,
                      group_data,
                      stat_name = 'stats_fl',
                      db_path="sqlite:///results/optuna_results.db"):

    logger.info("Optimizing %s for %s", study_name, instrument_name)
    params_path = f"results/{study_name}"
    results_path = f"results/{study_name}"

    params_dataset = {
        "instrument_name": instrument_name,
        "group_data": group_data,
        **ASSET_GROUPS[group_data][instrument_name]}

    def objective(trial):

        params_strat = param_space_func(trial)

        logger.info("Params: %s", params_strat)
        params_strat.update(params_dataset)
        save_params(params_strat, params_path)
        subprocess.check_call(["Rscript", r_script_path], shell=False,
                              stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
                              )
        score = load_scores(params_strat, results_path, stat_name)
        return score

    study = optuna.create_study(direction='maximize',
                                study_name=study_name + "_" +
                                params_dataset['instrument_name'],
                                storage=db_path, load_if_exists=True)
    study.optimize(objective, n_trials=n_trials)
    best_param = study.best_params
    best_param.update(params_dataset)
    logger.ingo("Best params: %s", best_param)
    save_params(best_param, results_path, "best_params_" +
                params_dataset["instrument_name"])
    return best_param

def param_space_ma(trial):
    return {
        "fast_ma": trial.suggest_int("fast_ma", 1, 100),
        "ma_diff": trial.suggest_int("ma_diff", 1, 100),
        "signal_estimator": trial.suggest_categorical("signal_estimator", ["mean", "median"]),
        "window_regime": trial.suggest_int("window_regime", 2, 100),
        "treshold_regime": trial.suggest_float("treshold_regime", 0, 0.6, step=1e-05)
    }

def param_space_vb(trial):
    return {
        "fast_ma": trial.suggest_int("fast_ma", 10, 100),
        "ma_diff": trial.suggest_int("ma_diff", 1, 100),
        "signal_estimator": trial.suggest_categorical("signal_estimator", ["mean", "median"]),
        "window_regime": trial.suggest_int("window_regime", 2, 100),
        "treshold_regime": trial.suggest_float("treshold_regime", 0, 0.6, step=1e-05),
        "volat_param": trial.suggest_int("volat_param", 2, 100),
        "m_": trial.suggest_float("treshold_vol", 0.5, 3, step=1e-05)
    }

def param_space_2vb(trial):
    return {
        "fast_ma": trial.suggest_int("fast_ma", 10, 100),
        "ma_diff": trial.suggest_int("ma_diff", 1, 100),
        "signal_estimator": trial.suggest_categorical("signal_estimator", ["mean", "median"]),
        "window_regime": trial.suggest_int("window_regime", 2, 100),
        "treshold_regime": trial.suggest_float("treshold_regime", 0, 0.6, step=1e-05),
        "volat_param": trial.suggest_int("volat_param", 1, 100),
        "m_exit": trial.suggest_float("m_exit", 0.5, 3, step=1e-05),
        "m_diff": trial.suggest_float("m_diff", 0.1, 3, step=1e-05),

    }

In [75]:
n_trials = 3

study_dict = {
    'Moving_Average_Strategy': ["scripts/run_strat_ma.R", param_space_ma],
    'Volatility_Breakout_Strategy': ["scripts/run_strat_vb.R", param_space_vb],
    'Volatility_Breakout_Double_Strategy': ["scripts/run_strat_2vb.R", param_space_2vb],
}

for study_name, [r_script_path, param_space_func] in study_dict.items():
    for group in ASSET_GROUPS.keys():
        for asset in ASSET_GROUPS[group].keys():
            best_params = optimize_strategy(n_trials=n_trials,
                                            param_space_func=param_space_func,
                                            study_name=study_name,
                                            r_script_path=r_script_path,
                                            instrument_name="NQ",
                                            group_data=1,
                                            )

Moving_Average_Strategy scripts/run_strat_ma.R <function param_space_ma at 0x169c8c860>
Volatility_Breakout_Strategy scripts/run_strat_vb.R <function param_space_vb at 0x169c8e840>
Volatility_Breakout_Double_Strategy scripts/run_strat_2vb.R <function param_space_2vb at 0x169c8f420>


In [98]:
results = pd.read_csv("results/Volatility_Breakout_Double_Strategy/results.csv")

In [99]:
results.sort_values("stats_mm",ascending=False).head(10)

Unnamed: 0,selected_quarter,fast_ma,ma_diff,signal_estimator,window_regime,treshold_regime,volat_param,m_exit,m_diff,instrument_name,group_data,p_val,tr_cost,stats_mm,stats_mr,stats_fl
287,2022_Q1,91,57,mean,30,0.31998,59,2.45504,2.30502,SP,1,50,12,inf,-3.275983,inf
498,2022_Q3,83,36,mean,87,0.31273,53,2.44697,1.96947,NQ,1,20,12,inf,-6.53836,inf
499,2022_Q4,83,36,mean,87,0.31273,53,2.44697,1.96947,NQ,1,20,12,inf,-0.509347,inf
218,2022_Q3,90,56,mean,30,0.30741,62,2.47328,2.67091,SP,1,50,12,inf,-3.896617,inf
288,2022_Q3,91,57,mean,30,0.31998,59,2.45504,2.30502,SP,1,50,12,inf,-4.022893,inf
221,2023_Q4,90,56,mean,30,0.30741,62,2.47328,2.67091,SP,1,50,12,inf,-0.846684,inf
175,2022_Q1,93,56,mean,34,0.30345,50,2.71998,2.69663,SP,1,50,12,inf,-1.391649,inf
252,2022_Q1,96,74,mean,30,0.31056,68,2.08855,2.2235,SP,1,50,12,inf,-0.039179,inf
154,2022_Q1,84,36,mean,6,0.28793,61,2.7202,0.44376,SP,1,50,12,inf,-0.412514,
295,2022_Q3,81,48,mean,28,0.29265,56,2.30912,2.57936,SP,1,50,12,inf,-4.161402,inf


In [102]:
results["stats_mm"].mean().max()

inf

In [100]:
results[["stats_mm","stats_mr","stats_fl"]].mean().max()

inf

In [61]:
results[results['stats_mm'].isna()]

Unnamed: 0,selected_quarter,fast_ma,ma_diff,signal_estimator,window_regime,treshold_regime,instrument_name,group_data,p_val,tr_cost,volat_param,m_,stats_mm,stats_mr,stats_fl
35,2022_Q1,39,18,median,22,0.2158,NQ,1,20,12,33,2.3768,,,
36,2022_Q3,39,18,median,22,0.2158,NQ,1,20,12,33,2.3768,,,
37,2022_Q4,39,18,median,22,0.2158,NQ,1,20,12,33,2.3768,,,
39,2023_Q4,39,18,median,22,0.2158,NQ,1,20,12,33,2.3768,,,
40,2024_Q1,39,18,median,22,0.2158,NQ,1,20,12,33,2.3768,,,
41,2024_Q2,39,18,median,22,0.2158,NQ,1,20,12,33,2.3768,,,
63,2022_Q1,33,30,median,69,0.55395,AUD,2,100000,10,88,2.19372,,,
64,2022_Q3,33,30,median,69,0.55395,AUD,2,100000,10,88,2.19372,,,
67,2023_Q4,33,30,median,69,0.55395,AUD,2,100000,10,88,2.19372,,,
68,2024_Q1,33,30,median,69,0.55395,AUD,2,100000,10,88,2.19372,,,


In [105]:
import sqlite3
conn = sqlite3.connect("results/optuna_results.db")
print(pd.read_sql_query("SELECT name FROM sqlite_master WHERE type='table'", conn))
data = pd.read_sql_query("SELECT * FROM trial_values val LEFT JOIN trial_params par on val.trial_id = par.trial_id", conn)

                         name
0                     studies
1                version_info
2            study_directions
3       study_user_attributes
4     study_system_attributes
5                      trials
6       trial_user_attributes
7     trial_system_attributes
8                trial_params
9                trial_values
10  trial_intermediate_values
11           trial_heartbeats
12            alembic_version


In [106]:
data

Unnamed: 0,trial_value_id,trial_id,objective,value,value_type,param_id,trial_id.1,param_name,param_value,distribution_json
0,1,1,0,-86.722274,FINITE,1,1,fast_ma,33.00000,"{""name"": ""IntDistribution"", ""attributes"": {""lo..."
1,1,1,0,-86.722274,FINITE,2,1,ma_diff,77.00000,"{""name"": ""IntDistribution"", ""attributes"": {""lo..."
2,1,1,0,-86.722274,FINITE,3,1,signal_estimator,1.00000,"{""name"": ""CategoricalDistribution"", ""attribute..."
3,1,1,0,-86.722274,FINITE,5,1,treshold_regime,0.15954,"{""name"": ""FloatDistribution"", ""attributes"": {""..."
4,1,1,0,-86.722274,FINITE,4,1,window_regime,9.00000,"{""name"": ""IntDistribution"", ""attributes"": {""lo..."
...,...,...,...,...,...,...,...,...,...,...
5228,799,803,0,3.696711,FINITE,5251,803,signal_estimator,1.00000,"{""name"": ""CategoricalDistribution"", ""attribute..."
5229,799,803,0,3.696711,FINITE,5253,803,treshold_regime,0.37770,"{""name"": ""FloatDistribution"", ""attributes"": {""..."
5230,799,803,0,3.696711,FINITE,5255,803,treshold_vol,1.06833,"{""name"": ""FloatDistribution"", ""attributes"": {""..."
5231,799,803,0,3.696711,FINITE,5254,803,volat_param,62.00000,"{""name"": ""IntDistribution"", ""attributes"": {""lo..."


In [35]:
import logging
import subprocess
import time
import os
import csv
import optuna
import pandas as pd

ASSET_GROUPS = {
    1: {
        "SP": {"p_val": 50, "tr_cost": 12},
        "NQ": {"p_val": 20, "tr_cost": 12},
    },
    2: {
        "CAD": {"p_val": 100000, "tr_cost": 10},
        "AUD": {"p_val": 100000, "tr_cost": 10},
        "XAU": {"p_val": 100, "tr_cost": 15},
        "XAG": {"p_val": 5000, "tr_cost": 10},
    },
}
logger = logging.getLogger()

def ensure_directory(path):
    """
    Ensure that a directory exists at the specified path.
    If the directory does not exist, it will be created.
    """
    if not os.path.isdir(path):
        os.makedirs(path, exist_ok=True)
        print(f"Directory created: {path}")


def save_params(params, path, file="last_params"):
    """
    Save hyperparameters to a CSV file.
    """
    ensure_directory(path)
    with open(path+"/"+file+".csv", mode='w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(params.keys())
        writer.writerow(params.values())


def load_scores(params, results_path, stat_name = 'stats_fl'):
    """
    Load and return the mean of 'stats_fl' scores from a CSV file based on given parameters.

    Args:
        params (dict): A dictionary of parameters to filter the CSV file. The keys are the column names,
                       and the values are the values to filter by. The values can be of type int, float, or str.
        results_path (str): The path to the directory containing the 'results.csv' file.

    Returns:
        float: The mean of the 'stats_fl' column for the filtered rows.

    Raises:
        FileNotFoundError: If the 'results.csv' file does not exist in the specified directory.
        pd.errors.EmptyDataError: If the CSV file is empty or only contains headers.
        KeyError: If the specified columns in params do not exist in the CSV file.
    """
    query_conditions = []

    for key, value in params.items():
        if isinstance(value, int):
            query_conditions.append(f"{key} == {value}")
        elif isinstance(value, float):
            query_conditions.append(f"{key} == {round(value, 5)}")
        elif isinstance(value, str):
            query_conditions.append(f"{key} == '{value}'")

    query = " and ".join(query_conditions)
    file_path = results_path + "/results.csv"

    while True:
        tmp = pd.read_csv(file_path).query(query)
        if not tmp.empty:
            if not tmp[stat_name].empty:
                return tmp[stat_name].mean()
            else:
                logger.error("Empty %s column in %s", stat_name, file_path)
        time.sleep(0.1)

def constraints(trial):
    return [trial.user_attrs["constraint"]]


def optimize_strategy(n_trials,
                      param_space_func,
                      constraints_func,
                      study_name,
                      r_script_path,
                      instrument_name,
                      group_data,
                      db_path="sqlite:///results/optuna_results.db"):

    logger.info("Optimizing %s for %s", study_name, instrument_name)
    params_path = f"results/{study_name}"
    results_path = f"results/{study_name}"

    params_dataset = {
        "instrument_name": instrument_name,
        "group_data": group_data,
        **ASSET_GROUPS[group_data][instrument_name]}

    def objective(trial):

        params_strat = param_space_func(trial)
        # Make sure that fast_ma is always greater than slow_ma
        if constraints_func:
            cons = constraints_func(params_strat)
            trial.set_user_attr("constraint", cons)

        logger.info("Params: %s", params_strat)
        params_strat.update(params_dataset)
        save_params(params_strat, params_path)
        subprocess.check_call(["Rscript", r_script_path], shell=False,
                              stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
                              )
        score = load_scores(params_strat, results_path)
        return score

    sampler = optuna.samplers.NSGAIISampler(constraints_func=constraints)
    study = optuna.create_study(direction='maximize', sampler=sampler,
                                study_name=study_name + "_" +
                                params_dataset['instrument_name'],
                                storage=db_path, load_if_exists=True)
    study.optimize(objective, n_trials=n_trials)
    best_param = study.best_params
    best_param.update(params_dataset)
    save_params(best_param, results_path, "best_params_" +
                params_dataset["instrument_name"])
    return best_param

def constraints_ma(params):
    return params["fast_ma"] - params["slow_ma"]

def param_space_ma(trial):
    return {
        "slow_ma": trial.suggest_int("slow_ma", 10, 100),
        "fast_ma": trial.suggest_int("fast_ma", 1, 100),
        "signal_estimator": trial.suggest_categorical("signal_estimator", ["mean", "median"]),
        "window_regime": trial.suggest_int("window_regime", 2, 100),
        "treshold_regime": trial.suggest_float("treshold_regime", 0, 0.6, step=1e-05)
    }


In [36]:
study_name = 'Moving_Average_Strategy'
r_script_path = "helpers/run_strat_ma.R"
best_params = optimize_strategy(n_trials=3,
                                param_space_func=param_space_ma,
                                constraints_func=constraints_ma,
                                study_name=study_name,
                                r_script_path=r_script_path,
                                instrument_name="NQ",
                                group_data=1,
                                )

[I 2025-01-18 19:56:25,329] Using an existing study with name 'Moving_Average_Strategy_NQ' instead of creating a new one.


Directory created: results/Moving_Average_Strategy


[W 2025-01-18 19:56:26,064] Trial 2 failed with parameters: {'slow_ma': 92, 'fast_ma': 73, 'signal_estimator': 'median', 'window_regime': 84, 'treshold_regime': 0.31177000000000005} because of the following error: CalledProcessError(1, ['Rscript', 'helpers/run_strat_ma.R']).
Traceback (most recent call last):
  File "/Users/anfrejter/anaconda3/lib/python3.11/site-packages/optuna/study/_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
                      ^^^^^^^^^^^
  File "/var/folders/m4/j_mwks0x04ncx2w9srg4t8qh0000gn/T/ipykernel_9483/1094373641.py", line 116, in objective
    subprocess.check_call(["Rscript", r_script_path], shell=False,
  File "/Users/anfrejter/anaconda3/lib/python3.11/subprocess.py", line 413, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['Rscript', 'helpers/run_strat_ma.R']' returned non-zero exit status 1.
[W 2025-01-18 19:56:26,065] Trial 2 failed with value None.


CalledProcessError: Command '['Rscript', 'helpers/run_strat_ma.R']' returned non-zero exit status 1.