In [None]:
import os 
import sys
import json
sys.path.append(os.path.abspath(".."))

from velopix.hyperParameterFramework._optimizers import BaseOptimizer, pMap
from velopix.hyperParameterFramework import TrackFollowingPipeline, GraphDFSPipeline, SearchByTripletTriePipeline

In [2]:
# pyright: strict
from collections.abc import Generator
from itertools import product
from typing import Any, TypeAlias, cast, Literal
from math import inf

import numpy as np

# Scalar value types that a single grid-axis entry may take.
_Types: TypeAlias = bool | int | float
# Generator that yields one parameter map (pMap) per grid point and returns nothing.
HSpace: TypeAlias = Generator[pMap, None, None] # NOTE: Stands for "hypothesis space"

class GridSearch(BaseOptimizer):
    """
    Exhaustive grid search over the parameter space of the attached algorithm.

    Each `float`/`int` parameter is discretised into `resolution` evenly spaced
    values between its bounds (both ends included; integer axes are truncated to
    `int`, de-duplicated and sorted), and each `bool` parameter contributes
    `(False, True)`. The Cartesian product of all axes is visited one point per
    `next()` call while the best-scoring point seen so far is remembered.
    """
    _resolution: int
    _stopped: bool = False
    _spgen: HSpace # NOTE: Stands for "space generator"
    _options: dict[str, Any]
    _total_hypotheses: int
    _last_config: pMap
    # NOTE(review): the methods below read/write `best_config` / `best_score`
    # (no leading underscore); presumably those are provided by BaseOptimizer,
    # which would leave these two annotations unused - confirm.
    _best_config: pMap
    _best_score: float

    def __init__(self, resolution: int = 10, objective: Literal["min", "max"] = "min", options: dict[str, Any] | None = None):
        """
        Args:
            resolution: Number of evenly spaced samples taken per numeric parameter.
            objective: Whether a lower ("min") or higher ("max") score is better.
            options: Extra options; stored on the instance but not read by any
                method of this class. Defaults to
                `{"w": [1., 1., 1.], "nested": False}`.
        """
        super().__init__(objective=objective)
        self._resolution = resolution
        self._options = options if options is not None else {"w": [1., 1., 1.], "nested": False}

    def init(self) -> pMap:
        """
        Builds the grid and returns its first point as the initial parameter map.

        Raises:
            NotImplementedError: If a parameter has a type other than bool, int or float.
            RuntimeError: If the grid is empty (e.g. a numeric axis has no samples).
        """
        self._stopped = False

        num_hypotheses = 1
        axis: dict[str, tuple[_Types, ...]] = {}

        for param, (dtype, _) in self._algorithm.get_config().items():
            if dtype == bool:
                axis[param] = (False, True)
            elif dtype in (float, int):
                low, high = cast(tuple[_Types, _Types], self._algorithm.get_bounds().get(param))
                samples = np.linspace(low, high, num=self._resolution, endpoint=True)
                if dtype == int:
                    # Truncation can map several samples onto the same integer, so
                    # de-duplicate; sort because set iteration order is arbitrary
                    # and would otherwise scramble the axis.
                    axis[param] = tuple(sorted({int(value) for value in samples}))
                else:
                    axis[param] = tuple(samples)
            else:
                raise NotImplementedError(f"Unsupported type: {dtype}")
            num_hypotheses *= len(axis[param])

        # The grid size is the product of the axis lengths, so it is known up
        # front without materialising the whole Cartesian product.
        self._total_hypotheses = num_hypotheses
        names = tuple(axis)

        def spgen() -> HSpace:
            # Walk the grid lazily: only one point is alive at a time.
            for point in product(*(axis[name] for name in names)):
                config = dict(zip(names, point))
                # Remember what was handed out so `next()` can attribute the score to it.
                self._last_config = config
                yield config

        self._spgen = spgen()

        # Start from the worst possible score so the first evaluated point always wins.
        self.best_score = inf if self.objective == "min" else -inf
        try:
            self.best_config = next(self._spgen)
        except StopIteration:
            # The StopIteration is an implementation detail; do not chain it.
            raise RuntimeError("No hypotheses generated.") from None
        return self.best_config

    def next(self) -> pMap:
        """
        Scores the most recently issued grid point, updates the best point seen
        so far, and returns the next grid point.

        Once the grid is exhausted the optimizer is marked as finished and the
        best configuration is returned instead.
        """
        # NOTE(review): the objective weights are hard-coded here; `self._options["w"]`
        # is not consulted - confirm whether that is intended.
        last_score = self.objective_func([1., 1., 1., -10.])

        if self.objective == "min":
            if last_score < self.best_score:
                self.best_score = last_score
                self.best_config = self._last_config
        elif self.objective == "max":
            if self.best_score < last_score:
                self.best_score = last_score
                self.best_config = self._last_config
        try:
            return next(self._spgen)
        except StopIteration:
            self._stopped = True
            return self.best_config

    def is_finished(self) -> bool:
        """
        Determines if the optimization process is finished.
        The search is finished once `next()` has run out of grid points.
        """
        return self._stopped

In [3]:
# Event indices that must be skipped. Event 51: module_prefix_sum contains the
# value 79 twice, resulting in an indexing error when loading the event.
SKIPPED_EVENT_IDS = frozenset({51})

events = []
n_files = 7

for i in range(n_files):
    if i in SKIPPED_EVENT_IDS:
        print(f"Skipping problematic file: velo_event_{i}.json")
        continue

    print(f"Loading file: velo_event_{i}.json")
    # The context manager closes the handle even if JSON parsing raises.
    with open(os.path.join("../DB/raw", f"velo_event_{i}.json"), encoding="utf-8") as event_file:
        json_data = json.load(event_file)
    events.append(json_data) # type: ignore

Loading file: velo_event_0.json
Loading file: velo_event_1.json
Loading file: velo_event_2.json
Loading file: velo_event_3.json
Loading file: velo_event_4.json
Loading file: velo_event_5.json
Loading file: velo_event_6.json


In [4]:
# Build the track-following pipeline from the events loaded in the previous cell.
# NOTE(review): the meaning of `intra_node` is defined by TrackFollowingPipeline - confirm before changing it.
pipeline = TrackFollowingPipeline(events=events, intra_node=True) # type: ignore 

In [None]:
# Grid search sampling 3 evenly spaced values per numeric parameter.
Optimiser = GridSearch(resolution=3)
# NOTE(review): 243 == 3**5, presumably the full grid for the five parameters shown in the
# printed result below - confirm and update max_runs whenever the resolution changes.
optimal_parameters = pipeline.optimise_parameters(Optimiser, max_runs=243) # DO NOT remove max_runs, chances are that this will run forever

Optimising:   1%|          | 2/243 [00:00<00:09, 24.83it/s]

=== Objective Function Debug Info ===
run_data: {'total_tracks': 0, 'total_ghosts': 0, 'overall_ghost_rate': 0.0, 'event_avg_ghost_rate': 0.0, 'categories': [{'label': 'long_strange>5GeV', 'n_reco': 0, 'n_particles': 9, 'recoeffT': 0.0, 'avg_recoeff': 0.0, 'n_clones': 0, 'clone_percentage': 0.0, 'purityT': 0.0, 'avg_purity': 0.0, 'avg_hiteff': 0.0, 'hit_eff_percentage': 0.0}, {'label': 'velo', 'n_reco': 0, 'n_particles': 1765, 'recoeffT': 0.0, 'avg_recoeff': 0.0, 'n_clones': 0, 'clone_percentage': 0.0, 'purityT': 0.0, 'avg_purity': 0.0, 'avg_hiteff': 0.0, 'hit_eff_percentage': 0.0}, {'label': 'long_fromb', 'n_reco': 0, 'n_particles': 26, 'recoeffT': 0.0, 'avg_recoeff': 0.0, 'n_clones': 0, 'clone_percentage': 0.0, 'purityT': 0.0, 'avg_purity': 0.0, 'avg_hiteff': 0.0, 'hit_eff_percentage': 0.0}, {'label': 'long_strange', 'n_reco': 0, 'n_particles': 21, 'recoeffT': 0.0, 'avg_recoeff': 0.0, 'n_clones': 0, 'clone_percentage': 0.0, 'purityT': 0.0, 'avg_purity': 0.0, 'avg_hiteff': 0.0, 'hit_e




In [6]:
print(optimal_parameters) # NOTE: the values printed below are only an example run, not a tuned result.

{'x_slope': np.float64(0.0), 'y_slope': np.float64(0.0), 'x_tol': np.float64(0.4), 'y_tol': np.float64(0.4), 'scatter': np.float64(0.0)}
