In [1]:
# @title Import Libraries
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.lines as mlines # Needed for custom legend handles
import seaborn as sns             # Needed for plotting
import os
from joblib import Parallel, delayed # <<< ADD FOR PARALLELISM
from autogluon.timeseries import TimeSeriesDataFrame, TimeSeriesPredictor
from autogluon.common import space
import json
import numpy as np
import math
from IPython.display import display
import time
import shutil
import gc
import pyarrow
import warnings

# Ignore the specific FutureWarning related to TimeSeriesScorer prediction_length
warnings.filterwarnings(
    "ignore",
    message="Passing `prediction_length` to `TimeSeriesScorer.__call__` is deprecated.*", # Match start of message
    category=FutureWarning,
    module="autogluon.timeseries.metrics.abstract" # Be specific about the source module
)
print("FutureWarning for TimeSeriesScorer prediction_length suppressed.")

# Also ignore the "path already exists" warning from Predictor init
warnings.filterwarnings(
    "ignore",
    message="path already exists!.*", # Match the start of the warning message
    category=UserWarning # This is often a UserWarning, adjust if necessary
)
print("Predictor path already exists warning suppressed.")



In [2]:
# @title Base Configuration (User Parameters, Paths)

# ----- Define LOCAL Paths -----
# The base path can be overridden with the BTC_FORECAST_BASE_PATH environment
# variable so the notebook runs on another machine without editing this cell.
# When the variable is unset, the original Windows location below is used.
# Use raw strings (r"...") for Windows paths.
local_base_path = os.environ.get(
    "BTC_FORECAST_BASE_PATH",
    r"G:\My Drive\code_projects\btc_forecast",  # <<< UPDATE TO YOUR LOCAL BASE PATH
)

# --- Directory Paths ---
datasets_dir = os.path.join(local_base_path, 'datasets')     # Directory containing .feather files
results_base_dir = os.path.join(local_base_path, 'results')  # Top-level results directory

# --- Parallelism Control ---
num_concurrent_runs = 15  # <<< SET MAX NUMBER OF PARALLEL ASSET RUNS
print(f"Setting max concurrent runs: {num_concurrent_runs}")

# --- Frequency & Date for Results ---
frequency = '15m'  # Explicitly set frequency for filtering files and naming
date_today = pd.Timestamp.now().strftime("%Y%m%d")  # YYYYMMDD format
results_dir_timedated = os.path.join(results_base_dir, f"{frequency}_{date_today}")  # e.g., results/15m_20250506

try:
    os.makedirs(results_dir_timedated, exist_ok=True)
    print(f"Base results directory for this run: {results_dir_timedated}")
except OSError as e:
    # Best effort: report and continue; later writes will surface the problem.
    print(f"Warning: Could not create results directory {results_dir_timedated}: {e}")

print(f"Looking for datasets in: '{datasets_dir}'")
print(f"Script frequency set to: '{frequency}'")

# --- Column Names ---
timestamp_col = 'date'
potential_target_cols = ['volume', 'open', 'high', 'low', 'close']
all_value_cols = [timestamp_col] + potential_target_cols

# --- User-defined Experiment Parameters ---
prediction_length = 1
max_train_dur = 2 * (60*60)  # Fit time limit in seconds (2 hours); adjust as needed.
desired_windows = 10
backtest_strategy = "rolling"
error_metric = "WQL"
desired_quantiles = [0.05, 0.5, 0.95]

# --- Model Selection ---
full_hp = {"AutoETS": {}}
selected_algorithms = ["AutoETS"]
# Keep only algorithms that have a hyperparameter entry in full_hp.
selected_hp = {alg: full_hp[alg] for alg in selected_algorithms if alg in full_hp}
print("\nSelected Models:", selected_algorithms)

# === Experiment Setup ===
candle_counts_to_test = [100, 250, 500, 750, 1000, 1500, 2000]
# Copy rather than alias: both lists are shipped in config_params, and editing
# one (e.g. narrowing the targets) must not silently change the other.
targets_to_process = list(potential_target_cols)

# ----- Healthspan Evaluation Parameters -----
enable_healthspan_evaluation = True
health_evaluation_horizons = [50, 100, 250, 500, 750, 1000]  # <<< YOUR DEFINED HORIZONS
healthspan_threshold_std_devs = 1.5

# --- Pandas Display Options ---
pd.options.display.float_format = '{:.4f}'.format
# NOTE(review): float_format above already controls float display; confirm precision=0 is still wanted.
pd.set_option('display.precision', 0)
pd.set_option('display.max_columns', None)

# --- Store config in a dictionary for passing to parallel function ---
config_params = {
    "datasets_dir": datasets_dir,
    "results_dir_timedated": results_dir_timedated,
    "frequency": frequency,
    "date_today": date_today,
    "timestamp_col": timestamp_col,
    "potential_target_cols": potential_target_cols,
    "all_value_cols": all_value_cols,
    "prediction_length": prediction_length,
    "max_train_dur": max_train_dur,
    "desired_windows": desired_windows,
    "backtest_strategy": backtest_strategy,
    "error_metric": error_metric,
    "desired_quantiles": desired_quantiles,
    "selected_hp": selected_hp,
    "candle_counts_to_test": candle_counts_to_test,
    "targets_to_process": targets_to_process,
    "enable_healthspan_evaluation": enable_healthspan_evaluation,
    "health_evaluation_horizons": health_evaluation_horizons,
    "healthspan_threshold_std_devs": healthspan_threshold_std_devs,
}

print("\nConfiguration set and stored in config_params.")

Setting max concurrent runs: 14
Base results directory for this run: G:\My Drive\code_projects\btc_forecast\results\15m_20250508
Looking for datasets in: 'G:\My Drive\code_projects\btc_forecast\datasets'
Script frequency set to: '15m'

Selected Models: ['AutoETS']

Configuration set and stored in config_params.


In [3]:
# @title Helper Functions (Serializer & Healthspan Calc)

# --- Define serializer function ---
def default_serializer(obj):
    """Fallback encoder for ``json.dump(..., default=default_serializer)``.

    ``json`` calls this only for objects it cannot encode natively. Conversions:

    * ``pd.NaT`` -> ``None`` (emitted as JSON ``null``).
    * numpy integer / floating / bool scalars -> the equivalent Python scalar.
    * ``np.ndarray`` -> (nested) list via ``tolist()``.
    * ``pd.Timestamp`` -> ISO-8601 string.
    * ``pd.Timedelta`` -> total seconds (float).
    * objects exposing ``get_hyperparameters()`` -> that call's result, or
      ``str(obj)`` if the call raises.
    * JSON-native values are returned unchanged; anything else -> ``str(obj)``.
    """
    # NaT is not an instance of pd.Timestamp or pd.Timedelta, so it has to be
    # caught explicitly; otherwise it falls through to str() and becomes "NaT".
    if obj is pd.NaT:
        return None
    if isinstance(obj, (np.integer, np.floating, np.bool_)):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, pd.Timestamp):
        return obj.isoformat()
    if isinstance(obj, pd.Timedelta):
        return obj.total_seconds()
    if hasattr(obj, 'get_hyperparameters'):
        try:
            return obj.get_hyperparameters()
        except Exception:
            return str(obj)
    if isinstance(obj, (dict, list, str, int, float, bool, type(None))):
        return obj
    return str(obj)


def calculate_healthspan(wql_scores, horizons, initial_wql, threshold_std_devs):
    """Estimate model "healthspan": the first horizon at which WQL degrades past a threshold.

    threshold = |initial_wql| + |threshold_std_devs * std(valid scores)|

    The std is the population std (``np.std``) of the valid scores. It is 0
    when fewer than two valid scores exist.

    WQL is non-negative by definition. AutoGluon reports error metrics with
    their sign flipped (higher-is-better), both in the leaderboard
    ``score_val`` and in ``TimeSeriesPredictor.evaluate`` output. Every score
    and the baseline are therefore normalized with ``abs()``. Raw WQL,
    negated WQL, or a mix of the two all give the same result.

    Args:
        wql_scores: Sequence of WQL scores (raw or negated), aligned 1:1 with
            ``horizons``. NaN and non-numeric entries are ignored.
        horizons: Sequence of horizon lengths. It need not be sorted; scores
            are scanned in ascending horizon order.
        initial_wql: Baseline score, e.g. the validation ``score_val``
            (raw or negated).
        threshold_std_devs: Multiplier applied to the score std dev.

    Returns:
        ``(healthspan_steps, threshold)``.

        ``healthspan_steps`` is the smallest horizon whose WQL exceeds
        ``threshold``. If no horizon does, it is the largest validly evaluated
        horizon.

        Returns ``(None, None)`` when inputs are missing, misaligned, the
        baseline is missing/NaN, or no score is valid.
    """
    # Explicit length checks: `not wql_scores` would raise for numpy arrays.
    if wql_scores is None or horizons is None:
        return None, None
    if len(wql_scores) == 0 or len(wql_scores) != len(horizons):
        return None, None
    if initial_wql is None or pd.isna(initial_wql):
        return None, None

    # (horizon, |WQL|) pairs for real-number scores, in ascending horizon order.
    valid_pairs = sorted(
        (
            (horizon, abs(float(score)))
            for horizon, score in zip(horizons, wql_scores)
            if isinstance(score, (int, float, np.number)) and pd.notna(score)
        ),
        key=lambda pair: pair[0],
    )
    if not valid_pairs:
        return None, None

    valid_scores = np.array([score for _, score in valid_pairs])
    wql_std_dev = float(np.std(valid_scores)) if len(valid_scores) > 1 else 0.0

    actual_wql_baseline = abs(float(initial_wql))
    # WQL degrades when it RISES above this value.
    threshold = actual_wql_baseline + abs(threshold_std_devs * wql_std_dev)

    for horizon, score in valid_pairs:
        if score > threshold:
            return horizon, threshold

    # Threshold never crossed: healthy through the longest evaluated horizon.
    return valid_pairs[-1][0], threshold

In [4]:
# @title File Discovery and Main Processing Loop

# --- Base temporary model directory (module-level name read by process_asset) ---
# Every full run starts from an empty directory: a leftover from a previous run
# is deleted, then the directory is recreated. OSErrors are reported, not raised.
local_temp_models_base = "ag_temp_models_" + frequency
try:
    stale_dir_present = os.path.exists(local_temp_models_base)
    if stale_dir_present:
        print(f"Removing existing base temporary model directory: {local_temp_models_base}")
        shutil.rmtree(local_temp_models_base)
    os.makedirs(local_temp_models_base, exist_ok=True)
    print(f"Ensured base temporary model directory exists: {local_temp_models_base}")
except OSError as cleanup_error:
    print(f"Warning: Could not create/clean base temporary model directory {local_temp_models_base}: {cleanup_error}")


# --- Asset Processing Function (for Parallelism) ---
def process_asset(asset_file_path, config):
    """
    Loads, processes, trains, evaluates, and plots results for a single asset file.
    Args:
        asset_file_path (str): Path to the .feather asset file.
        config (dict): Dictionary containing all configuration parameters.
    Returns:
        list: A list of result dictionaries for all targets/candle counts processed
              for this asset, or an empty list if processing fails early.
    """
    # Extract config parameters
    frequency = config["frequency"]
    timestamp_col = config["timestamp_col"]
    potential_target_cols = config["potential_target_cols"]
    all_value_cols = config["all_value_cols"]
    prediction_length = config["prediction_length"]
    max_train_dur = config["max_train_dur"]
    desired_windows = config["desired_windows"]
    backtest_strategy = config["backtest_strategy"]
    error_metric = config["error_metric"] # Should be "WQL" for healthspan logic
    desired_quantiles = config["desired_quantiles"]
    selected_hp = config["selected_hp"]
    candle_counts_to_test = config["candle_counts_to_test"]
    global_targets_to_process = config["targets_to_process"]
    
    # Healthspan related parameters
    enable_healthspan_evaluation = config["enable_healthspan_evaluation"]
    health_evaluation_horizons = config.get("health_evaluation_horizons", []) # NEW
    healthspan_threshold_std_devs = config["healthspan_threshold_std_devs"]
    
    results_dir_timedated = config["results_dir_timedated"]
    date_today = config["date_today"]
    # local_temp_models_base is globally accessible from Cell [4]'s scope

    asset_filename = os.path.basename(asset_file_path)
    asset_results = []

    print(f"--- Starting Processing: {asset_filename} (PID: {os.getpid()}) ---")

    try:
        asset_name_part = asset_filename.split(f'-{frequency}-futures')[0]
        current_item_id = f"{asset_name_part}_FUT_{frequency}"
    except Exception as e:
        print(f"[{asset_filename}] Warning: Could not derive asset name. Skipping. Error: {e}")
        return asset_results

    print(f"[{asset_name_part}] Loading asset data...")
    try:
        df_full_asset_loaded = pd.read_feather(asset_file_path)
    except Exception as e:
        print(f"[{asset_name_part}] Error loading asset file: {e}. Skipping.")
        return asset_results

    print(f"[{asset_name_part}] Cleaning and preprocessing...")
    cols_to_keep_asset = [col for col in all_value_cols if col in df_full_asset_loaded.columns]
    df_full_asset_cleaned = df_full_asset_loaded[cols_to_keep_asset].copy()

    if timestamp_col not in df_full_asset_cleaned.columns: print(f"[{asset_name_part}] Error: Timestamp column missing. Skipping."); return asset_results
    asset_targets_available = [col for col in potential_target_cols if col in df_full_asset_cleaned.columns]
    if not asset_targets_available: print(f"[{asset_name_part}] Error: No target columns found. Skipping."); return asset_results

    df_full_asset_cleaned[timestamp_col] = pd.to_datetime(df_full_asset_cleaned[timestamp_col], errors='coerce')
    numeric_cols_asset = asset_targets_available
    for col in numeric_cols_asset: df_full_asset_cleaned[col] = pd.to_numeric(df_full_asset_cleaned[col], errors='coerce')
    cols_to_check_na_asset = [timestamp_col] + numeric_cols_asset
    df_full_asset_cleaned = df_full_asset_cleaned.dropna(subset=cols_to_check_na_asset).copy()

    if not df_full_asset_cleaned.empty:
        asset_timestamp_dtype = df_full_asset_cleaned[timestamp_col].dtype
        is_tz_aware_asset = pd.api.types.is_datetime64_ns_dtype(asset_timestamp_dtype) and df_full_asset_cleaned[timestamp_col].dt.tz is not None
        if is_tz_aware_asset: current_time_compare = pd.Timestamp.now(tz=df_full_asset_cleaned[timestamp_col].dt.tz)
        else: current_time_compare = pd.Timestamp.now().tz_localize(None)
        df_full_asset_cleaned = df_full_asset_cleaned.sort_values(timestamp_col).reset_index(drop=True)
        df_full_asset_cleaned = df_full_asset_cleaned[df_full_asset_cleaned[timestamp_col] < current_time_compare]

    if df_full_asset_cleaned.empty: print(f"[{asset_name_part}] DataFrame empty after filtering. Skipping."); return asset_results

    print(f"[{asset_name_part}] Starting Target Loop...")
    asset_loop_targets = [t for t in global_targets_to_process if t in asset_targets_available]
    
    min_data_for_any_eval_slice = prediction_length + 1 # Min length for any slice passed to predictor.evaluate

    for current_target_raw in asset_loop_targets:
        print(f"\n\n[{asset_name_part}] {'#'*60}")
        print(f"[{asset_name_part}] ### Processing Target: {current_target_raw} ###")
        print(f"[{asset_name_part}] {'#'*60}\n")

        df_experiment_base = df_full_asset_cleaned.copy()
        # results_summary_target_for_asset = [] # This was defined but not used; asset_results is used directly.

        ag_target_col_name = current_target_raw
        if current_target_raw == 'volume':
            ag_target_col_name = f"{current_target_raw}_log1p"
            print(f"[{asset_name_part}/{current_target_raw}] Applying log1p transformation. Training target will be: '{ag_target_col_name}'")
            if (df_experiment_base[current_target_raw] < 0).any():
                print(f"[{asset_name_part}/{current_target_raw}] Warning: Clipping negative values in '{current_target_raw}' before log1p.")
                df_experiment_base[current_target_raw] = df_experiment_base[current_target_raw].clip(lower=0)
            df_experiment_base[ag_target_col_name] = np.log1p(df_experiment_base[current_target_raw])
        else:
            print(f"[{asset_name_part}/{current_target_raw}] Using raw column as training target: '{ag_target_col_name}'")

        valid_candle_counts_for_target = []
        if enable_healthspan_evaluation and health_evaluation_horizons:
            # Max horizon that is valid (long enough for an evaluation slice)
            valid_horizons = [h for h in health_evaluation_horizons if h >= min_data_for_any_eval_slice]
            if not valid_horizons:
                print(f"[{asset_name_part}/{current_target_raw}] No valid health evaluation horizons >= {min_data_for_any_eval_slice}. Healthspan eval might be skipped.")
                max_eval_points_needed = 0
            else:
                max_eval_points_needed = max(valid_horizons)
            
            # A candle_count 'c' is valid if data has length for 'c' AND 'max_eval_points_needed' after it.
            # And 'c' itself must be long enough for training.
            valid_candle_counts_for_target = [
                c for c in candle_counts_to_test 
                if c >= (prediction_length + 1) and (c + max_eval_points_needed) <= len(df_experiment_base)
            ]
            if not valid_candle_counts_for_target and candle_counts_to_test:
                print(f"[{asset_name_part}/{current_target_raw}] Skipping target: Not enough data for any candle_count with specified healthspan horizons. Need {prediction_length+1} for train and up to {max_eval_points_needed} for eval.")
                continue
        else: # Healthspan not enabled or no horizons
            max_eval_points_needed = 0 # Ensure this is defined
            valid_candle_counts_for_target = [c for c in candle_counts_to_test if c >= (prediction_length + 1)]
            if not valid_candle_counts_for_target and candle_counts_to_test:
                print(f"[{asset_name_part}/{current_target_raw}] Skipping target: Not enough data for training (min {prediction_length+1} candles).")
                continue
        
        if not valid_candle_counts_for_target: # If still empty after all checks
             print(f"[{asset_name_part}/{current_target_raw}] No valid candle counts to test for this target. Skipping.")
             continue


        print(f"\n[{asset_name_part}/{current_target_raw}] {'='*10} Starting Candle Count Loop {'='*10}")
        print(f"[{asset_name_part}/{current_target_raw}] Testing candle_counts: {valid_candle_counts_for_target}")

        for candle_count in valid_candle_counts_for_target:
            print(f"\n[{asset_name_part}/{current_target_raw}] --- Running Candle Count: {candle_count} ---")
            iteration_start_time = time.time()
            fit_successful = False; best_score_val_internal = None; error_message = None; fit_num_val_windows_used = 0
            train_data_tsdf = None; predictor = None; external_eval_scores = []; healthspan_steps = None; wql_threshold = None
            training_data_end_time = None # Initialize

            # Uses global 'local_temp_models_base'
            path_suffix = f"{asset_name_part}_{frequency}_{current_target_raw}_{backtest_strategy}_{candle_count}cndl_{date_today}" # Added date_today for more uniqueness within a day
            models_dir = os.path.join(local_temp_models_base, path_suffix)
            try:
                if os.path.exists(models_dir): shutil.rmtree(models_dir)
                os.makedirs(models_dir, exist_ok=True)
            except OSError as e:
                print(f"[{asset_name_part}/{current_target_raw}/{candle_count}] Error creating model dir: {e}. Skipping.")
                asset_results.append({'target': current_target_raw,'asset': asset_name_part,'candle_count': candle_count,'best_score_val_internal': None,'error': f'Local model dir error: {e}','time_taken_seconds': time.time() - iteration_start_time,'num_data_points_train': 0,'num_val_windows_used': 0,'external_eval_wql_scores': [],'healthspan_steps': None,'wql_threshold': None}); continue

            df_train_filtered = pd.DataFrame()
            df_eval_slice = pd.DataFrame()

            current_max_eval_points_needed = 0
            if enable_healthspan_evaluation and health_evaluation_horizons:
                valid_horizons_for_setup = [h for h in health_evaluation_horizons if h >= min_data_for_any_eval_slice]
                if valid_horizons_for_setup:
                    current_max_eval_points_needed = max(valid_horizons_for_setup)
            
            if current_max_eval_points_needed > 0 :
                # Calculate available points for evaluation after taking 'candle_count' for training
                max_possible_eval_points_after_train = max(0, len(df_experiment_base) - candle_count)
                actual_eval_points_for_slice = min(current_max_eval_points_needed, max_possible_eval_points_after_train)

                if actual_eval_points_for_slice >= min_data_for_any_eval_slice:
                    df_eval_slice = df_experiment_base.tail(actual_eval_points_for_slice).copy()
                    train_end_idx = len(df_experiment_base) - actual_eval_points_for_slice
                    train_start_idx = max(0, train_end_idx - candle_count)
                    df_train_filtered = df_experiment_base.iloc[train_start_idx:train_end_idx].copy()
                else: # Not enough data for any meaningful healthspan eval for this candle_count
                    print(f"[{asset_name_part}/{current_target_raw}/{candle_count}] Not enough data ({actual_eval_points_for_slice}) for any specified health horizon. Training on tail data without healthspan eval.")
                    df_train_filtered = df_experiment_base.tail(candle_count).copy()
                    # df_eval_slice remains empty
            else: # Healthspan not enabled, no horizons, or no valid horizons
                df_train_filtered = df_experiment_base.tail(candle_count).copy()
                # df_eval_slice remains empty

            num_points_in_train = len(df_train_filtered)
            if df_train_filtered.empty or num_points_in_train < (prediction_length + 1):
                error_msg_detail = 'Empty training slice' if df_train_filtered.empty else f'Insufficient training data ({num_points_in_train}, need {prediction_length+1})'
                print(f"[{asset_name_part}/{current_target_raw}/{candle_count}] Error: {error_msg_detail}. Skipping.")
                asset_results.append({'target': current_target_raw,'asset': asset_name_part,'candle_count': candle_count,'error': error_msg_detail,'best_score_val_internal': None,'time_taken_seconds': time.time()-iteration_start_time,'num_data_points_train':0,'num_val_windows_used': 0,'external_eval_wql_scores': [],'healthspan_steps': None,'wql_threshold': None})
                if os.path.exists(models_dir): shutil.rmtree(models_dir, ignore_errors=True);
                continue
            
            training_data_end_time = df_train_filtered[timestamp_col].iloc[-1] # Set this after df_train_filtered is confirmed non-empty

            df_train_filtered["item_id"] = current_item_id
            id_column_name = "item_id"
            if df_train_filtered[timestamp_col].dt.tz is not None:
                try:
                    df_train_filtered[timestamp_col] = df_train_filtered[timestamp_col].dt.tz_localize(None)
                except Exception as tz_e:
                    print(f"[{asset_name_part}/{current_target_raw}/{candle_count}] TZ Error: {tz_e}. Skipping.")
                    asset_results.append({'target': current_target_raw,'asset': asset_name_part,'candle_count': candle_count, 'error': f'TZ conversion failed: {tz_e}', 'best_score_val_internal': None, 'time_taken_seconds': time.time()-iteration_start_time, 'num_data_points_train':num_points_in_train, 'num_val_windows_used': 0, 'external_eval_wql_scores': [], 'healthspan_steps': None, 'wql_threshold': None})
                    if os.path.exists(models_dir): shutil.rmtree(models_dir, ignore_errors=True);
                    continue
            try:
                required_cols_tsdf = [id_column_name, timestamp_col, ag_target_col_name]
                covariate_cols = [ptc for ptc in potential_target_cols if ptc in df_train_filtered.columns and ptc != current_target_raw]
                cols_for_tsdf = list(set(required_cols_tsdf + covariate_cols))

                missing_cols = [col for col in required_cols_tsdf if col not in df_train_filtered.columns] # Check against actual df_train_filtered
                if missing_cols: raise ValueError(f"Missing required cols for TSDF: {missing_cols} from {df_train_filtered.columns.tolist()}")

                train_data_tsdf = TimeSeriesDataFrame.from_data_frame(
                    df_train_filtered[cols_for_tsdf], # Use cols_for_tsdf which should be present
                    id_column=id_column_name, timestamp_column=timestamp_col
                )
            except Exception as e:
                print(f"[{asset_name_part}/{current_target_raw}/{candle_count}] Error creating Training TSDF: {e}")
                error_message = f"Train TSDF creation failed: {e}."
                asset_results.append({'target': current_target_raw,'asset': asset_name_part,'candle_count': candle_count,'best_score_val_internal': None,'error': error_message,'time_taken_seconds': time.time() - iteration_start_time,'num_data_points_train': num_points_in_train,'num_val_windows_used': 0,'external_eval_wql_scores': [],'healthspan_steps': None,'wql_threshold': None})
                if os.path.exists(models_dir): shutil.rmtree(models_dir, ignore_errors=True);
                continue

            fit_num_val_windows_used = 0; fit_val_step_size = 1
            if desired_windows > 0:
                total_data_len_val = len(train_data_tsdf); min_train_for_val = prediction_length; 
                # required_for_val = prediction_length * desired_windows * prediction_length # This seems too large
                required_for_val = prediction_length * desired_windows # A window needs prediction_length, and step size matters
                if total_data_len_val >= min_train_for_val + required_for_val: 
                    fit_num_val_windows_used = desired_windows
                    fit_val_step_size = prediction_length # Common step size
                else: 
                    max_possible = (total_data_len_val - min_train_for_val) // prediction_length # How many blocks of prediction_length fit
                    fit_num_val_windows_used = max(0, min(desired_windows, max_possible))
                    if fit_num_val_windows_used > 0: fit_val_step_size = prediction_length
            
            try:
                predictor = TimeSeriesPredictor(prediction_length=prediction_length, target=ag_target_col_name, eval_metric=error_metric, path=models_dir, quantile_levels=desired_quantiles)
            except Exception as e:
                error_message = f'Predictor init failed: {e}'; print(f"[{asset_name_part}/{current_target_raw}/{candle_count}] Predictor Init Error: {e}");
                asset_results.append({'target': current_target_raw,'asset': asset_name_part,'candle_count': candle_count,'best_score_val_internal': None,'error': error_message,'time_taken_seconds': time.time() - iteration_start_time,'num_data_points_train': num_points_in_train,'num_val_windows_used': fit_num_val_windows_used,'external_eval_wql_scores': [],'healthspan_steps': None,'wql_threshold': None})
                if os.path.exists(models_dir): shutil.rmtree(models_dir, ignore_errors=True);
                continue
            
            print(f"[{asset_name_part}/{current_target_raw}/{candle_count}] Fitting model on target '{ag_target_col_name}' with {fit_num_val_windows_used} val windows...")
            hpo_tune_kwargs = None
            try:
                predictor.fit(train_data_tsdf, presets="best_quality", time_limit=max_train_dur, num_val_windows=fit_num_val_windows_used, val_step_size=fit_val_step_size, refit_full=True, hyperparameters=selected_hp, hyperparameter_tune_kwargs=hpo_tune_kwargs, enable_ensemble=False, verbosity=0)
                fit_successful = True
            except Exception as fit_e: error_message = f"Fitting failed: {fit_e}"; print(f"[{asset_name_part}/{current_target_raw}/{candle_count}] ERROR during fitting: {fit_e}")

            best_model_name = "N/A"; leaderboard_df = None;
            if fit_successful:
                try:
                    if fit_num_val_windows_used > 0: leaderboard_df = predictor.leaderboard(silent=True)
                    if leaderboard_df is not None and not leaderboard_df.empty and 'score_val' in leaderboard_df.columns and pd.api.types.is_numeric_dtype(leaderboard_df['score_val']):
                        ets_lb = leaderboard_df[leaderboard_df['model'].str.contains("ETS", na=False)] # Check specific model if needed
                        if not ets_lb.empty: 
                            score = ets_lb["score_val"].iloc[0]
                            if pd.notna(score): 
                                best_score_val_internal = score
                                best_model_name = ets_lb["model"].iloc[0]
                        elif not leaderboard_df.empty: # Fallback to best model if ETS not found but LB exists
                             best_score_val_internal = leaderboard_df["score_val"].iloc[0]
                             best_model_name = leaderboard_df["model"].iloc[0]
                except Exception as lb_e: print(f"[{asset_name_part}/{current_target_raw}/{candle_count}] Leaderboard Error: {lb_e}"); best_score_val_internal = None

            # --- Healthspan Evaluation with Horizons ---
            if fit_successful and enable_healthspan_evaluation and not df_eval_slice.empty:
                external_eval_scores = []
                future_tsdf_full = None # Initialize
                try:
                    future_data_for_eval = df_eval_slice.copy()
                    future_data_for_eval["item_id"] = current_item_id
                    if future_data_for_eval[timestamp_col].dt.tz is not None:
                        future_data_for_eval[timestamp_col] = future_data_for_eval[timestamp_col].dt.tz_localize(None)

                    if ag_target_col_name not in future_data_for_eval.columns:
                        print(f"[{asset_name_part}/{current_target_raw}/{candle_count}] CRITICAL Error: Training target '{ag_target_col_name}' not in evaluation slice. Skipping external eval.")
                        if error_message is None: error_message = f"Target '{ag_target_col_name}' missing in eval slice for healthspan."
                    else:
                        eval_required_cols = [id_column_name, timestamp_col, ag_target_col_name]
                        eval_covariate_cols = [ptc for ptc in potential_target_cols if ptc in future_data_for_eval.columns and ptc != current_target_raw]
                        eval_cols_for_tsdf = list(set(eval_required_cols + eval_covariate_cols))
                        
                        missing_eval_cols = [col for col in eval_required_cols if col not in future_data_for_eval.columns]
                        if missing_eval_cols:
                             print(f"[{asset_name_part}/{current_target_raw}/{candle_count}] Error: Missing required columns {missing_eval_cols} for eval TSDF. Skipping external eval.")
                             if error_message is None: error_message = "Missing cols for eval TSDF."
                        else:
                            future_tsdf_full = TimeSeriesDataFrame.from_data_frame(future_data_for_eval[eval_cols_for_tsdf], id_column=id_column_name, timestamp_column=timestamp_col)
                    
                    if future_tsdf_full is not None and not future_tsdf_full.empty:
                        feasible_horizons = sorted(list(set(
                            h for h in health_evaluation_horizons
                            if h <= len(future_tsdf_full) and h >= min_data_for_any_eval_slice
                        )))

                        if not feasible_horizons:
                            print(f"[{asset_name_part}/{current_target_raw}/{candle_count}] No feasible health evaluation horizons with available data ({len(future_tsdf_full)} points).")
                        else:
                            print(f"[{asset_name_part}/{current_target_raw}/{candle_count}] Evaluating at feasible horizons: {feasible_horizons}")

                        for horizon_length in feasible_horizons:
                            current_evaluation_data_slice = future_tsdf_full.iloc[0:horizon_length]
                            time_diff_eval = pd.NaT
                            try:
                                horizon_end_timestamp_obj = current_evaluation_data_slice.index[-1][1]
                                horizon_end_time_eval = pd.to_datetime(horizon_end_timestamp_obj)
                                training_data_end_time_ts = pd.to_datetime(training_data_end_time)
                                if pd.notna(horizon_end_time_eval) and pd.notna(training_data_end_time_ts): time_diff_eval = horizon_end_time_eval - training_data_end_time_ts
                            except Exception: time_diff_eval = pd.NaT
                            
                            try:
                                eval_metrics = predictor.evaluate(current_evaluation_data_slice, model=best_model_name, metrics=[error_metric]) # Use best_model_name or 'AutoETS_FULL' if always ETS
                                wql_score_chunk = eval_metrics.get(error_metric, np.nan)
                                external_eval_scores.append({'time_since_train': time_diff_eval, 'WQL': wql_score_chunk, 'horizon': horizon_length})
                            except Exception as eval_e:
                                print(f"[{asset_name_part}/{current_target_raw}/{candle_count}] ERROR evaluating horizon {horizon_length}: {eval_e}")
                                external_eval_scores.append({'time_since_train': time_diff_eval, 'WQL': np.nan, 'horizon': horizon_length})
                except Exception as ext_eval_setup_e:
                    print(f"[{asset_name_part}/{current_target_raw}/{candle_count}] Healthspan evaluation setup error: {ext_eval_setup_e}")
                    if error_message is None: error_message = f"Healthspan eval setup failed: {ext_eval_setup_e}"

                if external_eval_scores:
                    valid_eval_items = [item for item in external_eval_scores if pd.notna(item['WQL']) and item.get('horizon') is not None] # Ensure horizon is present
                    wql_values_for_calc = [item['WQL'] for item in valid_eval_items]
                    horizons_for_calc = [item['horizon'] for item in valid_eval_items]

                    if wql_values_for_calc and horizons_for_calc and best_score_val_internal is not None and pd.notna(best_score_val_internal):
                        baseline_wql_score_for_healthspan = best_score_val_internal # This is already -WQL
                        healthspan_steps, wql_threshold = calculate_healthspan(
                            wql_scores=wql_values_for_calc,
                            horizons=horizons_for_calc,
                            initial_wql=baseline_wql_score_for_healthspan, # Pass the -WQL score
                            threshold_std_devs=healthspan_threshold_std_devs
                        )
            # --- End Healthspan Evaluation ---

            iteration_end_time = time.time()
            asset_results.append({
                'target': current_target_raw, 'asset': asset_name_part, 'candle_count': candle_count,
                'best_score_val_internal': best_score_val_internal, 'error': error_message,
                'time_taken_seconds': iteration_end_time - iteration_start_time,
                'num_data_points_train': num_points_in_train, 'num_val_windows_used': fit_num_val_windows_used,
                'external_eval_wql_scores': external_eval_scores, # Contains 'horizon'
                'healthspan_steps': healthspan_steps,
                'wql_threshold': wql_threshold,
                'best_model_name': best_model_name
            })
            print(f"--- Finished Iteration: Asset={asset_name_part}, Target={current_target_raw}, Candles={candle_count}, Time={asset_results[-1]['time_taken_seconds']:.1f}s, Score={best_score_val_internal if best_score_val_internal is not None else 'N/A'}, Healthspan={healthspan_steps if healthspan_steps is not None else 'N/A'} ---")

            if predictor is not None: del predictor
            if train_data_tsdf is not None: del train_data_tsdf
            if leaderboard_df is not None: del leaderboard_df
            if 'future_data_for_eval' in locals(): del future_data_for_eval # Explicitly delete
            if 'future_tsdf_full' in locals() and future_tsdf_full is not None: del future_tsdf_full
            if 'current_evaluation_data_slice' in locals(): del current_evaluation_data_slice
            # Aggressively clean the specific models_dir for this iteration if desired by user (they said they'll handle it)
            # However, for robustness if running this function standalone:
            if os.path.exists(models_dir):
                 try: shutil.rmtree(models_dir); # print(f"Cleaned up {models_dir}")
                 except Exception as e_rm: print(f"Could not clean up {models_dir}: {e_rm}")
            gc.collect()
        # === END CANDLE COUNT LOOP ===
        print(f"\n[{asset_name_part}/{current_target_raw}] Finished Candle Count Loop.")

        current_target_plot_results = [res for res in asset_results if res['target'] == current_target_raw and res['asset'] == asset_name_part]
        results_df_target_plot = pd.DataFrame(current_target_plot_results)

        if not results_df_target_plot.empty:
            summary_filename = f'{date_today}_{asset_name_part}_{frequency}_{current_target_raw}_{backtest_strategy}_healthspan_summary.json'
            summary_save_path = os.path.join(results_dir_timedated, summary_filename)
            try:
                serializable_current_target_results = []
                for record in current_target_plot_results:
                    new_record = record.copy(); temp_scores_list = []
                    if 'external_eval_wql_scores' in new_record and new_record['external_eval_wql_scores']:
                        for score_item in new_record['external_eval_wql_scores']:
                            new_score_item = score_item.copy()
                            time_delta = new_score_item.get('time_since_train')
                            if isinstance(time_delta, pd.Timedelta): new_score_item['time_since_train'] = None if pd.isna(time_delta) else time_delta.total_seconds()
                            temp_scores_list.append(new_score_item)
                        new_record['external_eval_wql_scores'] = temp_scores_list
                    serializable_current_target_results.append(new_record)
                with open(summary_save_path, 'w') as f: json.dump(serializable_current_target_results, f, default=default_serializer, indent=4)
                print(f"[{asset_name_part}/{current_target_raw}] Detailed summary saved: {summary_save_path}")
            except Exception as e: print(f"[{asset_name_part}/{current_target_raw}] Error saving JSON summary: {e}")

            plot_data_list = []
            for idx_plot, row_plot in results_df_target_plot.iterrows():
                cc_plot = row_plot['candle_count']
                oos_scores_plot = row_plot.get('external_eval_wql_scores', [])
                if oos_scores_plot:
                    for score_item_plot in oos_scores_plot:
                        wql_plot_val = score_item_plot.get('WQL')
                        steps_plot_val = score_item_plot.get('horizon') # Use 'horizon'
                        if steps_plot_val is not None and pd.notna(wql_plot_val):
                            plot_data_list.append({'candle_count': cc_plot, 'steps_since_train': steps_plot_val, 'WQL': wql_plot_val, 'WQL_neg': -wql_plot_val if pd.notna(wql_plot_val) else np.nan})
            plot_df_target_external = pd.DataFrame(plot_data_list) if plot_data_list else pd.DataFrame()
            if not plot_df_target_external.empty: plot_df_target_external.dropna(subset=['steps_since_train', 'WQL_neg'], inplace=True)

            results_df_filtered_internal = results_df_target_plot.dropna(subset=['best_score_val_internal']).copy()
            if not results_df_filtered_internal.empty:
                plt.figure(figsize=(12, 6)); plt.plot(results_df_filtered_internal['candle_count'], results_df_filtered_internal['best_score_val_internal'], marker='o', linestyle='-')
                plt.title(f'Internal Score (-WQL) vs. Candle Count (Asset: {asset_name_part}, Target: {current_target_raw})'); plt.xlabel('Number of Candles (`candle_count`)'); plt.ylabel(f'Internal Fit Score ({error_metric}, Higher is Better)')
                plt.grid(True); plt.xticks(rotation=45); plt.tight_layout()
                plot_filename_internal = f'{date_today}_{asset_name_part}_{frequency}_{current_target_raw}_{backtest_strategy}_internal_score_plot.png'
                plot_save_path_internal = os.path.join(results_dir_timedated, plot_filename_internal)
                try: plt.savefig(plot_save_path_internal, dpi=150); print(f"[{asset_name_part}/{current_target_raw}] Internal score plot saved: {plot_save_path_internal}")
                except Exception as e: print(f"[{asset_name_part}/{current_target_raw}] Error saving internal plot: {e}")
                plt.close()

            if not plot_df_target_external.empty:
                print(f"[{asset_name_part}/{current_target_raw}] Generating Enhanced Healthspan Plot...")
                plt.style.use('seaborn-v0_8-whitegrid'); fig, ax = plt.subplots(figsize=(16, 9))
                candle_counts_plot = sorted(plot_df_target_external['candle_count'].unique())
                palette = sns.color_palette("viridis", n_colors=len(candle_counts_plot)); color_map = dict(zip(candle_counts_plot, palette))
                sns.lineplot(data=plot_df_target_external, x='steps_since_train', y='WQL_neg', hue='candle_count', hue_order=candle_counts_plot, palette=color_map, marker='.', linewidth=1.5, legend=False, ax=ax)
                
                is_legend_handles = [];
                for cc_plot_iter in candle_counts_plot:
                    is_score_row = results_df_target_plot[results_df_target_plot['candle_count'] == cc_plot_iter] # Use full results_df_target_plot
                    if not is_score_row.empty:
                        is_score = is_score_row['best_score_val_internal'].iloc[0]
                        if pd.notna(is_score): 
                            ax.axhline(y=is_score, color=color_map.get(cc_plot_iter, 'grey'), linestyle=':', linewidth=1.5, alpha=0.9)
                            is_legend_handles.append(mlines.Line2D([], [], color=color_map.get(cc_plot_iter, 'grey'), linestyle=':', linewidth=1.5, label=f'CC {cc_plot_iter} (IS Ref: {is_score:.4f})'))
                
                median_wql_neg = plot_df_target_external.groupby('steps_since_train')['WQL_neg'].median(); median_handle = []
                if not median_wql_neg.empty: 
                    ax.plot(median_wql_neg.index, median_wql_neg.values, color='black', linestyle='--', linewidth=2.5, label='Median OOS')
                    median_handle = [mlines.Line2D([], [], color='black', linestyle='--', linewidth=2.5, label='Median OOS')]
                
                threshold_handle = []; 
                # Get wql_threshold per candle_count, then average, or take the one from the best CC?
                # For now, let's use the mean of available thresholds if they vary by CC.
                avg_actual_wql_threshold = results_df_target_plot['wql_threshold'].mean() 
                if pd.notna(avg_actual_wql_threshold): 
                    avg_neg_threshold_plot = -avg_actual_wql_threshold # Since WQL_neg is plotted
                    ax.axhline(y=avg_neg_threshold_plot, color='red', linestyle='-.', linewidth=2.0, label=f'Avg Degrad. Threshold (Score < {avg_neg_threshold_plot:.4f})')
                    threshold_handle = [mlines.Line2D([], [], color='red', linestyle='-.', linewidth=2.0, label=f'Avg Degrad. Threshold (Score < {avg_neg_threshold_plot:.4f})')]
                
                oos_legend_handles = [mlines.Line2D([], [], color=color_map.get(cc_plot_iter_oos, 'grey'), marker='.', linewidth=1.5, linestyle='-', label=f'CC {cc_plot_iter_oos} (OOS)') for cc_plot_iter_oos in candle_counts_plot]
                all_handles = oos_legend_handles + is_legend_handles + median_handle + threshold_handle
                valid_handles = [h for h in all_handles if h.get_label() and not h.get_label().startswith('_')]; 
                ax.legend(handles=valid_handles, title=f'Performance (-{error_metric}, Higher is Better)', loc='best', fontsize='small')
                ax.set_title(f"Model Healthspan: OOS vs IS (Asset: {asset_name_part}, Target: {current_target_raw})", fontsize=16)
                ax.set_xlabel(f"Evaluation Horizon (Steps Since Training)", fontsize=12) # UPDATED LABEL
                ax.set_ylabel(f"Performance Score (-{error_metric}, Higher is Better)", fontsize=12)
                ax.grid(True, which='major', linestyle='--', linewidth='0.5', color='grey'); ax.grid(True, which='minor', linestyle=':', linewidth='0.5', color='lightgrey'); ax.minorticks_on()
                try:
                    plot_filename = f'{date_today}_{asset_name_part}_{frequency}_{current_target_raw}_{backtest_strategy}_healthspan_plot_v2.png'
                    plot_save_path = os.path.join(results_dir_timedated, plot_filename)
                    plt.savefig(plot_save_path, bbox_inches='tight', dpi=150)
                    print(f"[{asset_name_part}/{current_target_raw}] Enhanced Healthspan plot saved: {plot_save_path}")
                except Exception as e: print(f"[{asset_name_part}/{current_target_raw}] Error saving plot: {e}")
                plt.close(fig)

        # Cleanup (Target specific DFs)
        if 'df_experiment_base' in locals(): del df_experiment_base
        if 'results_df_target_plot' in locals(): del results_df_target_plot
        if 'results_df_filtered_internal' in locals(): del results_df_filtered_internal
        if 'plot_df_target_external' in locals(): del plot_df_target_external
        gc.collect()

    # --- END TARGET LOOP (for current asset) ---
    print(f"\n[{asset_name_part}] Finished Target Loop.")
    return asset_results

Ensured base temporary model directory exists: ag_temp_models_15m


In [None]:
# @title Main Execution: Discover Files and Run in Parallel
if __name__ == "__main__":
    # --- Find Asset Files ---
    asset_files_to_process = []
    try:
        all_files_in_dir = os.listdir(config_params["datasets_dir"])
        for filename_main in all_files_in_dir:
            if filename_main.endswith('.feather') and f'-{config_params["frequency"]}-futures' in filename_main:
                asset_files_to_process.append(os.path.join(config_params["datasets_dir"], filename_main))
    except FileNotFoundError: print(f"Error: Datasets directory not found: {config_params['datasets_dir']}")
    except Exception as e: print(f"Error listing files: {e}")

    if not asset_files_to_process:
        print(f"No '.feather' files found for frequency '{config_params['frequency']}'. Exiting.")
    else:
        print(f"\nFound {len(asset_files_to_process)} asset files to process:")
        # asset_files_to_process = asset_files_to_process[:1] # Limit for testing
        # print(f">>> Limiting to first {len(asset_files_to_process)} file(s) for testing <<<")
        for f_main in asset_files_to_process: print(f" - {os.path.basename(f_main)}")

        print(f"\n{'='*20} Starting Parallel Asset Processing ({num_concurrent_runs} jobs) {'='*20}")
        start_parallel_time = time.time()

        results_list_parallel = Parallel(n_jobs=num_concurrent_runs, backend="loky", verbose=10)(
             delayed(process_asset)(file_path, config_params) for file_path in asset_files_to_process
        )

        end_parallel_time = time.time()
        print(f"\n{'='*20} Parallel Asset Processing Finished {'='*20}")
        print(f"Total time for parallel execution: {(end_parallel_time - start_parallel_time)/60:.2f} minutes")

        print("\nAggregating results from parallel jobs...")
        all_assets_all_targets_results_flat = []
        job_success_count = 0; job_fail_count = 0
        if results_list_parallel:
            for single_asset_results_list in results_list_parallel:
                if isinstance(single_asset_results_list, list) and single_asset_results_list:
                    all_assets_all_targets_results_flat.extend(single_asset_results_list)
                    job_success_count +=1
                elif isinstance(single_asset_results_list, list) and not single_asset_results_list:
                    job_fail_count +=1
                else:
                    print(f"Warning: A parallel job did not return a list of results. Result: {type(single_asset_results_list)}")
                    job_fail_count +=1
        print(f"Aggregation complete. Total results records: {len(all_assets_all_targets_results_flat)}")
        print(f"Assets processed (at least one target run): {job_success_count}")
        print(f"Assets potentially failed or skipped entirely: {job_fail_count}")

        if all_assets_all_targets_results_flat:
             all_results_save_path = os.path.join(config_params["results_dir_timedated"], f'{config_params["date_today"]}_{config_params["frequency"]}_ALL_ASSETS_TARGETS_healthspan_summary.json')
             try:
                  serializable_all_results_flat = []
                  for record in all_assets_all_targets_results_flat:
                      new_record = record.copy(); temp_scores = []
                      if 'external_eval_wql_scores' in new_record and new_record['external_eval_wql_scores']:
                          for score_item in new_record['external_eval_wql_scores']:
                               new_score_item = score_item.copy()
                               time_delta = new_score_item.get('time_since_train')
                               if isinstance(time_delta, pd.Timedelta): new_score_item['time_since_train'] = None if pd.isna(time_delta) else time_delta.total_seconds()
                               temp_scores.append(new_score_item)
                          new_record['external_eval_wql_scores'] = temp_scores
                      serializable_all_results_flat.append(new_record)
                  with open(all_results_save_path, 'w') as f: json.dump(serializable_all_results_flat, f, default=default_serializer, indent=4)
                  print(f"\nCOMBINED detailed results summary saved to JSON: {all_results_save_path}")
             except Exception as e: print(f"\nError saving COMBINED results summary: {e}")
        else: print("\nNo results generated across all assets to save.")

    # if os.path.exists(local_temp_models_base):
    #     print(f"\nCleaning up base temporary model directory: {local_temp_models_base}")
    #     try: shutil.rmtree(local_temp_models_base); print(f"Successfully removed {local_temp_models_base}")
    #     except OSError as e: print(f"Error removing {local_temp_models_base}: {e}")
    # else: print(f"\nBase temporary model directory {local_temp_models_base} not found, no cleanup needed or already cleaned.")


Found 512 asset files to process:
 - 1INCH_USDT_USDT-15m-futures.feather
 - 1000APU_USDT_USDT-15m-futures.feather
 - 1000BONK_USDT_USDT-15m-futures.feather
 - 1000BTT_USDT_USDT-15m-futures.feather
 - 1000CAT_USDT_USDT-15m-futures.feather
 - 1000CATS_USDT_USDT-15m-futures.feather
 - 1000FLOKI_USDT_USDT-15m-futures.feather
 - 1000LUNC_USDT_USDT-15m-futures.feather
 - 1000MUMU_USDT_USDT-15m-futures.feather
 - 1000NEIROCTO_USDT_USDT-15m-futures.feather
 - 1000PEPE_USDT_USDT-15m-futures.feather
 - 1000RATS_USDT_USDT-15m-futures.feather
 - 1000TOSHI_USDT_USDT-15m-futures.feather
 - 1000TURBO_USDT_USDT-15m-futures.feather
 - 1000X_USDT_USDT-15m-futures.feather
 - 1000XEC_USDT_USDT-15m-futures.feather
 - 10000COQ_USDT_USDT-15m-futures.feather
 - 10000ELON_USDT_USDT-15m-futures.feather
 - 10000LADYS_USDT_USDT-15m-futures.feather
 - 10000QUBIC_USDT_USDT-15m-futures.feather
 - 10000SATS_USDT_USDT-15m-futures.feather
 - 10000WEN_USDT_USDT-15m-futures.feather
 - 10000WHY_USDT_USDT-15m-futures.feat

[Parallel(n_jobs=14)]: Using backend LokyBackend with 14 concurrent workers.
