In [1]:
import sys
sys.path.insert(0, "/app")

In [2]:
import os
from datetime import datetime, date, timezone
import pandas as pd
from sqlmodel import Session, select, create_engine
from condorgame_backend.infrastructure.db.db_tables import PredictionRow, ModelScoreSnapshotRow

In [3]:
DATABASE_URL = f"postgresql+psycopg2://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@" \
               f"{os.getenv('POSTGRES_HOST')}:{os.getenv('POSTGRES_PORT')}/{os.getenv('POSTGRES_DB')}"
engine = create_engine(DATABASE_URL)
DATABASE_URL

'postgresql+psycopg2://condorgame:condorgame@postgres:5432/condorgame'

In [4]:
#############
# Query database "predictions": get all predictions from a prediction round
#############
# prediction round params:
asset = "BTC"
horizon = 180
status = "SUCCESS"
resolvable_at = datetime.fromisoformat("2026-01-15 11:30:45.07684").replace(tzinfo=timezone.utc)

# Query the predictions with exact prediction round params
with Session(engine) as session:
    stmt = select(PredictionRow).where(
        PredictionRow.resolvable_at == resolvable_at,
        PredictionRow.asset == asset,
        PredictionRow.horizon == horizon,
        PredictionRow.status == status
    )
    results = session.exec(stmt).all()

if results:
    performed_at = results[0].performed_at
else:
    performed_at = None

# Convert to DataFrame
prediction_round = pd.DataFrame([r.model_dump() for r in results])
prediction_round

  PydanticSerializationUnexpectedValue(Expected `tuple[int, ...]` - serialized value may not be as expected [field_name='steps', input_value=[60, 120, 180], input_type=list])
  return self.__pydantic_serializer__.to_python(


Unnamed: 0,horizon,id,status,distributions,resolvable_at,score_final_value,score_failed_reason,model_id,asset,steps,exec_time,performed_at,score_raw_value,score_success,score_scored_at
0,180,PRE_3_20260115_112745.076,SUCCESS,"{'60': [{'step': 60, 'type': 'mixture', 'compo...",2026-01-15 11:30:45.076840,0.69997,,3,BTC,"[60, 120, 180]",51755.0,2026-01-15 11:27:45.076840,0.053223,True,2026-01-15 11:30:51.637365
1,180,PRE_4_20260115_112745.076,SUCCESS,"{'60': [{'step': 60, 'type': 'mixture', 'compo...",2026-01-15 11:30:45.076840,0.0,,4,BTC,"[60, 120, 180]",15752.0,2026-01-15 11:27:45.076840,0.060254,True,2026-01-15 11:30:51.634630
2,180,PRE_5_20260115_112745.076,SUCCESS,"{'60': [{'step': 60, 'type': 'mixture', 'compo...",2026-01-15 11:30:45.076840,0.55804,,5,BTC,"[60, 120, 180]",24990.0,2026-01-15 11:27:45.076840,0.054412,True,2026-01-15 11:30:51.636020
3,180,PRE_1_20260115_112745.076,SUCCESS,"{'60': [{'step': 60, 'type': 'mixture', 'compo...",2026-01-15 11:30:45.076840,1.0,,1,BTC,"[60, 120, 180]",10247.0,2026-01-15 11:27:45.076840,0.05071,True,2026-01-15 11:30:51.638605
4,180,PRE_2_20260115_112745.076,SUCCESS,"{'60': [{'step': 60, 'type': 'mixture', 'compo...",2026-01-15 11:30:45.076840,0.628877,,2,BTC,"[60, 120, 180]",29740.0,2026-01-15 11:27:45.076840,0.053818,True,2026-01-15 11:30:51.639862


In [5]:
# dict[str, dict[str, list[dict]]: {model_id: distributions}
dict_prediction_round = prediction_round[["model_id", "distributions"]].set_index("model_id").to_dict()["distributions"]
# TODO: steps keys are str from json serialization, should be int
dict_prediction_round

{'3': {'60': [{'step': 60,
    'type': 'mixture',
    'components': [{'density': {'type': 'builtin',
       'name': 'norm',
       'params': {'loc': 0.2607780659701388, 'scale': 46.12261199070487}},
      'weight': 1}]},
   {'step': 120,
    'type': 'mixture',
    'components': [{'density': {'type': 'builtin',
       'name': 'norm',
       'params': {'loc': 0.2607780659701388, 'scale': 46.12261199070487}},
      'weight': 1}]},
   {'step': 180,
    'type': 'mixture',
    'components': [{'density': {'type': 'builtin',
       'name': 'norm',
       'params': {'loc': 0.2607780659701388, 'scale': 46.12261199070487}},
      'weight': 1}]}],
  '120': [{'step': 120,
    'type': 'mixture',
    'components': [{'density': {'type': 'builtin',
       'name': 'norm',
       'params': {'loc': 0.5215561319402776, 'scale': 64.68782036423453}},
      'weight': 1}]}],
  '180': [{'step': 180,
    'type': 'mixture',
    'components': [{'density': {'type': 'builtin',
       'name': 'norm',
       'params':

In [6]:
performed_at

datetime.datetime(2026, 1, 15, 11, 27, 45, 76840)

In [7]:
#############
# Query database "model_score_snapshot": get all scores JUST BEFORE the prediction round
#############
with Session(engine) as session:
    # Step 1: find the closest timestamp ≤ performed_at
    stmt_max_ts = (
        select(ModelScoreSnapshotRow.performed_at)
        .where(ModelScoreSnapshotRow.performed_at <= performed_at)
        .order_by(ModelScoreSnapshotRow.performed_at.desc())
        .limit(1)
    )
    closest_ts = session.exec(stmt_max_ts).first()  # the nearest timestamp

    if closest_ts:
        # Step 2: select all rows with that timestamp
        stmt_rows = select(ModelScoreSnapshotRow).where(
            ModelScoreSnapshotRow.performed_at == closest_ts
        )
        snapshots = session.exec(stmt_rows).all()
    else:
        snapshots = []
        
scores_just_before_prediction_round = pd.DataFrame([r.model_dump() for r in snapshots])
scores_just_before_prediction_round

Unnamed: 0,overall_score_anchor,id,model_id,overall_score_steady,scores_by_param,overall_score_recent,performed_at
0,0.613821,SNP_M1_20260115_112644,1,0.525189,"[{'param': {'asset': 'BTC', 'horizon': 180, 's...",0.404408,2026-01-15 11:26:44.732651
1,0.657554,SNP_M2_20260115_112644,2,0.695723,"[{'param': {'asset': 'BTC', 'horizon': 180, 's...",0.645017,2026-01-15 11:26:44.732651
2,0.666982,SNP_M3_20260115_112644,3,0.686857,"[{'param': {'asset': 'BTC', 'horizon': 180, 's...",0.622942,2026-01-15 11:26:44.732651
3,0.871181,SNP_M4_20260115_112644,4,1.0,"[{'param': {'asset': 'BTC', 'horizon': 180, 's...",1.0,2026-01-15 11:26:44.732651
4,0.0,SNP_M5_20260115_112644,5,0.0,"[{'param': {'asset': 'BTC', 'horizon': 180, 's...",0.0,2026-01-15 11:26:44.732651


In [8]:
dict_scores_just_before_prediction_round = scores_just_before_prediction_round[["model_id", "overall_score_anchor"]].set_index("model_id").to_dict()["overall_score_anchor"]
dict_scores_just_before_prediction_round

{'1': 0.6138212332018346,
 '2': 0.6575544794547115,
 '3': 0.6669821227969381,
 '4': 0.8711805269986709,
 '5': 0.0}

In [9]:
def to_builtin(dist):
    """
    Convert a distribution dict of type:
    - scipy
    - statistics
    - builtin
    into a canonical builtin format:
        {"type": "builtin", "name": ..., "params": ...}
    Mixtures are returned unchanged.
    """

    dist_type = dist["type"]

    if dist_type == "mixture":
        return dist  # leave mixture untouched

    # --- builtin → already in the right format ---
    if dist_type == "builtin":
        return dist

    # --- scipy → builtin ---
    if dist_type == "scipy":
        return {
            "type": "builtin",
            "name": dist["name"],     # scipy distribution name
            "params": dist["params"]  # loc/scale/etc
        }

    # --- statistics.normal → builtin:norm ---
    if dist_type == "statistics":
        name = dist["name"]
        params = dist["params"]

        if name == "normal":
            mu = params.get("mu", params.get("loc", 0.0))
            sigma = params.get("sigma", params.get("scale", 1.0))

            return {
                "type": "builtin",
                "name": "norm",
                "params": {"loc": mu, "scale": sigma}
            }

        raise NotImplementedError(
            f"statistics distribution '{name}' not supported"
        )

    raise ValueError(f"Unknown distribution type: {dist_type}")

def ensure_mixture(dist):
    """
    Convert non-mixture distribution into mixture with 1 component.
    Assumes dist is already converted to builtin format.
    """
    if dist["type"] == "mixture":
        return dist

    # Wrap builtin (or converted scipy/statistics) into mixture
    return {
        "type": "mixture",
        "components": [
            {
                "density": dist,
                "weight": 1.0
            }
        ]
    }

In [10]:
from typing import List, Dict
import copy
from collections import defaultdict
import numpy as np

def ensemble_tracker_distributions(
    tracker_distributions: Dict[str, Dict[str, List[Dict]]],
    tracker_scores: Dict[str, float] = None,
    config: dict = {"strategy": "uniform"},
) -> Dict[str, List[Dict]]:
    """
    Combine multiple tracker distributions into a single ensemble distribution.

    Each tracker provides predicted distributions at multiple time resolutions and steps.
    This function merges them using a specified weighting strategy.

    Parameters
    ----------
    tracker_distributions : Dict[str, Dict[str, List[Dict]]]
        A dictionary mapping tracker names to their predicted distributions.
        Structure:
            {
                tracker_name: {
                    resolution_1: [dist_step_0, dist_step_1, ...],
                    resolution_2: [dist_step_0, dist_step_1, ...],
                    ...
                },
                ...
            }
        Each `dist_step_*` should be a dictionary representing a probability distribution

    tracker_scores : Dict[str, float], optional
        Dictionary mapping tracker names to their current performance scores.
        Required for weighted strategies (e.g., 'score_weighted', 'softmax', 'winner', 'rank_weighted').
        If not provided or empty, the 'uniform' strategy is used.

    config : dict, optional
        Configuration dictionary. Supported keys:
            - strategy : str
                Weighting strategy. Options:
                    - "uniform": all trackers equally weighted
                    - "score_weighted": weight proportional to tracker_scores
                    - "softmax": softmax of tracker_scores, controlled by 'temperature'
                    - "winner": only the highest-scoring tracker is used
                    - "rank_weighted": weight inversely proportional to tracker rank
            - temperature : float
                Temperature parameter for 'softmax' strategy (default: 1.0)
            - top_k : int
                Optional: consider only the top-K trackers by score before applying weights

    Returns
    -------
    Dict[str, List[Dict]]
        Ensemble distributions per resolution. Structure:
            {
                resolution_1: [step_0_ensemble, step_1_ensemble, ...],
                resolution_2: [step_0_ensemble, step_1_ensemble, ...],
                ...
            }

    Notes
    -----
    - If 'top_k' is specified, trackers not in the top-K by score are ignored.
    - Each tracker distribution is wrapped as a mixture component, and its weight
      is multiplied by the tracker weight.
    - Component weights are normalized to sum to 1 per step.
    - Errors during distribution conversion or weighting are caught and logged; 
      processing continues with remaining trackers/components.
    - Designed to work with predictions from multiple trackers at multiple resolutions.
    """

    # Extract tracker names and a sample distribution for structure reference
    tracker_names = list(tracker_distributions.keys())
    example_dists = next(iter(tracker_distributions.values()))

    # ---------------------------------------------------------
    # Optional TOP-K filtering BEFORE strategy weighting
    # ---------------------------------------------------------
    if config.get("top_k", None) is not None:
        K = config["top_k"]
        if tracker_scores is None:
            raise ValueError("top_k requires tracker_scores")

        # Sort trackers by score descending and pick top-K
        ranked = sorted(tracker_scores.items(), key=lambda x: x[1], reverse=True)
        top_k_names = [t for t, _ in ranked[:K]]

        # Keep only trackers in top-K
        tracker_names = [t for t in tracker_names if t in top_k_names]

        # Safety: ensure at least one tracker remains
        if len(tracker_names) == 0:
            raise ValueError("top_k filtered out all trackers!")

    # ---------------------------------------------------------
    # Compute tracker weights based on selected strategy
    # ---------------------------------------------------------
    if config["strategy"] == "uniform" or tracker_scores is None or len(tracker_scores) == 0:
        # All trackers equally weighted
        weights = {t: 1.0 / len(tracker_names) for t in tracker_names}

    elif config["strategy"] == "score_weighted":
        # Weight proportional to score
        total = sum(tracker_scores.values())
        weights = {t: tracker_scores[t] / total if total > 0 else 1.0 / len(tracker_names) for t in tracker_names}

    elif config["strategy"] == "softmax":
        # Softmax weighting
        temperature = config.get("temperature", 1.0)
        s = np.array([tracker_scores[t] for t in tracker_names])
        w = np.exp(s / temperature)
        w /= w.sum()
        weights = {t: float(w[i]) for i, t in enumerate(tracker_names)}

    elif config["strategy"] == "winner":
        # Only the top-scoring tracker gets weight 1
        best = max(tracker_scores, key=lambda t: tracker_scores[t])
        weights = {t: 1.0 if t == best else 0.0 for t in tracker_names}
    
    elif config["strategy"] == "rank_weighted":
        # Weight inversely proportional to rank (1st rank gets highest)
        ranked = sorted(tracker_scores.items(), key=lambda x: x[1], reverse=True)
        weights = {t: 1.0 / (rank + 1) for rank, (t, _) in enumerate(ranked)}
        # Normalize weights
        total_weight = sum(weights[t] for t in tracker_names)
        weights = {t: weights[t] / total_weight for t in tracker_names}

    else:
        raise ValueError(f"Unknown strategy: {config["strategy"]}")

    # ---------------------------------------------------------
    # Build ensemble distributions for each resolution and step
    # ---------------------------------------------------------
    ensemble_distributions = {}
    resolutions = example_dists.keys()

    for resolution in resolutions:
        ensemble_distributions_resolution = []
        n_steps = len(example_dists[resolution])
        
        for step_idx in range(n_steps):
            step_ensemble = {
                "step": example_dists[resolution][step_idx]["step"],
                "type": "mixture",
                "components": []
            }
    
            # Collect all components from all trackers for this step
            for t in tracker_names:
                try:
                    raw_dist = tracker_distributions[t][resolution][step_idx]
                except Exception as e:
                    print(f"[Ensemble] ERROR: Cannot read distribution for tracker '{t}' at resolution {resolution} and step {step_idx}: {e}")
                    continue
    
                # try:
                #     # 1) Convert to builtin
                #     builtin_dist = to_builtin(raw_dist)
                # except Exception as e:
                #     print(f"[Ensemble] ERROR: Failed to convert tracker '{t}' distribution to builtin at resolution {resolution} and step {step_idx}: {e}")
                #     # Optionally log raw_dist for debugging
                #     # print("Raw distribution:", raw_dist)
                #     continue
                builtin_dist = raw_dist
    
                try:
                    # 2) Wrap as single-component mixture
                    dist = ensure_mixture(builtin_dist)
                except Exception as e:
                    print(f"[Ensemble] ERROR: ensure_mixture failed for tracker '{t}' at resolution {resolution} and step {step_idx}: {e}")
                    continue
    
                try:
                    # 3) Add weighted components
                    for comp in dist.get("components", []):
                        try:
                            new_comp = copy.deepcopy(comp)
                            new_comp["weight"] *= weights[t]
    
                            if new_comp["weight"] > 0:
                                step_ensemble["components"].append(new_comp)
    
                        except Exception as e:
                            print(f"[Ensemble] ERROR: Failed applying weight to component of tracker '{t}' at resolution {resolution} and step {step_idx}: {e}")
                            continue
    
                except Exception as e:
                    print(f"[Ensemble] ERROR: Failed processing mixture for tracker '{t}' at resolution {resolution} and step {step_idx}: {e}")
                    continue
    
    
            # Normalize mixture weights to sum to 1
            total_weight = sum(c["weight"] for c in step_ensemble["components"])
            if total_weight > 0:
                for c in step_ensemble["components"]:
                    c["weight"] /= total_weight
    
            ensemble_distributions_resolution.append(step_ensemble)

        ensemble_distributions[resolution] = ensemble_distributions_resolution

    return ensemble_distributions, weights

In [11]:
config_ensemble = {
    "strategy": "uniform",   # "uniform", "score_weighted", "softmax", "winner", "rank_weighted"
    "temperature": 1.0,      # only used for "softmax" weighting
    "top_k": 2,              # Optional TOP-K filtering BEFORE strategy weighting
}

# Naming
ensemble_name = "Ensemble_" + config_ensemble["strategy"]
if config_ensemble["top_k"] is not None:
    ensemble_name += f"_top_{config_ensemble['top_k']}"
print("Name:", ensemble_name)

ensemble_distributions, weights = ensemble_tracker_distributions(
    tracker_distributions=dict_prediction_round,
    tracker_scores=dict_scores_just_before_prediction_round,
    config = config_ensemble,
) 

print("ensemble_distributions:", ensemble_distributions)
print("Model weights:", weights)

Name: Ensemble_uniform_top_2
ensemble_distributions: {'60': [{'step': 60, 'type': 'mixture', 'components': [{'density': {'type': 'builtin', 'name': 'norm', 'params': {'loc': 0.2607780659701388, 'scale': 46.12261199070487}}, 'weight': 0.5}, {'density': {'type': 'builtin', 'name': 'norm', 'params': {'loc': -0.28500000000000003, 'scale': 29.358968892780677}}, 'weight': 0.5}]}, {'step': 120, 'type': 'mixture', 'components': [{'density': {'type': 'builtin', 'name': 'norm', 'params': {'loc': 0.2607780659701388, 'scale': 46.12261199070487}}, 'weight': 0.5}, {'density': {'type': 'builtin', 'name': 'norm', 'params': {'loc': -0.28500000000000003, 'scale': 29.358968892780677}}, 'weight': 0.5}]}, {'step': 180, 'type': 'mixture', 'components': [{'density': {'type': 'builtin', 'name': 'norm', 'params': {'loc': 0.2607780659701388, 'scale': 46.12261199070487}}, 'weight': 0.5}, {'density': {'type': 'builtin', 'name': 'norm', 'params': {'loc': -0.28500000000000003, 'scale': 29.358968892780677}}, 'weight