In [1]:
#---------------------------------------------------------------------------------
#                                 _             _      
#                                | |_  ___ _ __(_)__ _ 
#                                | ' \/ -_) '_ \ / _` |
#                                |_||_\___| .__/_\__,_|
#                                         |_|          
#
#---------------------------------------------------------------------------------
#
# Company: HEPIA // HES-SO
# Engineer: Hugo Varenne <hugo.varenne@master.hes-so.ch>
# 
# Project Name: Unleashing the Full Potential of 
#               High-Performance Cherenkov Telescopes
#               with Fully-Digital Solid-State Sensors Camera
#
# File: 5.0_Custom_Model_Usage.ipynb
# Description: Notebook for creating models using CTLearn
#
# Last update: 2025-10-02
#
#--------------------------------------------------------------------------------

### Imports

In [2]:
import sys
import os
import importlib
import glob
import shutil
import hdf5plugin, h5py
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from ctapipe.io import EventSource
from sklearn import metrics
import importlib
import numpy as np
import time
import json
from ctlearn.tools.predict_model import MonoPredictCTLearnModel
from ctlearn.utils import validate_trait_dict
import keras
from tensorflow.keras import Input, Model
from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2_as_graph
from ctlearn.tools.train_model import TrainCTLearnModel
from ctlearn.core.model import SingleCNN
from ctapipe.core.traits import ComponentName
from traitlets.config import Config
import yaml


# Custom tools: expose the project's helper directory to the import system.
# The guard keeps repeated executions of this cell from stacking duplicate
# entries on sys.path.
tools_path = os.path.join("../tools")
if sys.path.count(tools_path) == 0:
    sys.path.append(tools_path)

2025-12-19 11:33:55.474086: I tensorflow/core/util/port.cc:111] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-12-19 11:33:55.614081: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-12-19 11:33:55.614128: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-12-19 11:33:55.614197: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-12-19 11:33:55.642441: I tensorflow/core/platform/cpu_feature_g

2025-12-19 11:34:01,120 | INFO | Logging initialized. All stdout/stderr will go to SLURM log.


In [3]:
# Set paths shortcuts (configurable in a yaml file)
# The manager module is imported and reloaded right away: the reload re-executes
# the module, so re-running this cell picks up edits made to it on disk without
# restarting the kernel.
import tools.CTLearnMgrConfig as CTLearnMgrConfig
importlib.reload(CTLearnMgrConfig)

# Instantiate the manager configuration and load its settings from the YAML file.
# The path is relative, so it is resolved against the kernel's working directory.
ctlearn_mgr_config = CTLearnMgrConfig.CTLearnMgrConfig()
ctlearn_mgr_config.load_config('../config/ctlearnmgr_config.yml')

### Verify data format

In [4]:
import re
import os

# Scratch check: how a DL1 file name is broken down.
string = "gamma_runs_102400-102437.dl1.h5"

# Second-to-last integer found in the file name (directory part stripped).
# NOTE(review): the digits of the ".dl1.h5" suffix are matched as well, so for
# this sample the value is 1 rather than a run number -- confirm this is intended.
split = list(map(int, re.findall(r'\d+', string.rsplit("/", 1)[-1])))[-2]

# File name with everything from the first dot onwards removed
path = os.path.basename(string).partition(".")[0]
print(path)

gamma_runs_102400-102437


### Variables

In [5]:
# Reconstruction task the model is created for.
# Values listed so far: "type", "energy", "direction"
RECO = "energy"

# Whether a custom model is loaded
custom = False

# Model name; it has to match the name used in the configuration file.
# Kinds of models: "ResNet", "SimpleCNN", "LoadedModel"
loading = "ResNet"

# File name of the model configuration: "<RECO>_<loading>_loading.yaml"
config_filename = "{}_{}_loading.yaml".format(RECO, loading)


In [6]:
# Locate the YAML configuration of the selected model inside the workspace
config_path = os.path.join(
    ctlearn_mgr_config.workspace_path,
    "models",
    "configs",
    config_filename,
)

# Parse the YAML content and wrap it into a traitlets Config for the tool
with open(config_path, "r") as config_file:
    yaml_config = yaml.safe_load(config_file)
c = Config(yaml_config)

### Training (testing loading)

In [9]:
"""
Tool to train a ``CTLearnModel`` on R1/DL1a data using the ``DLDataReader`` and ``DLDataLoader``.
"""

import atexit
import keras
import pandas as pd
import numpy as np
import shutil

import tensorflow as tf

import math
import tensorflow_model_optimization as tfmot
from tensorflow_model_optimization.sparsity.keras import prune_low_magnitude, UpdatePruningStep, PolynomialDecay


from ctapipe.core import Tool
from ctapipe.core.tool import ToolConfigurationError
from ctapipe.core.traits import (
    Bool,
    CaselessStrEnum,
    Path,
    Float,
    Int,
    List,
    Dict,
    classes_with_traits,
    ComponentName,
    Unicode,
)
from tools.reader import DLDataReader
from ctlearn.core.loader import DLDataLoader
from ctlearn.core.model import CTLearnModel
from ctlearn.utils import validate_trait_dict


class LoadCTLearnModel(Tool):
    """
    Tool that prepares the data side of a CTLearn training without training a model.

    ``setup`` creates a ``tf.distribute.MirroredStrategy``, collects the input
    files, instantiates the configured ``DLDataReader`` and builds the training
    and validation ``DLDataLoader`` objects. This class defines no ``start`` or
    ``finish`` step of its own.
    """

    # Directory that is searched for signal files (mandatory, must exist).
    input_dir_signal = Path(
        help="Input directory for signal events",
        allow_none=False,
        exists=True,
        directory_ok=True,
        file_ok=False,
    ).tag(config=True)

    # Glob patterns evaluated inside ``input_dir_signal``.
    file_pattern_signal = List(
        trait=Unicode(),
        default_value=["*.h5"],
        help="List of specific file pattern for matching files in ``input_dir_signal``",
    ).tag(config=True)

    # Optional directory that is searched for background files.
    input_dir_background = Path(
        default_value=None,
        help="Input directory for background events",
        allow_none=True,
        exists=True,
        directory_ok=True,
        file_ok=False,
    ).tag(config=True)

    # Glob patterns evaluated inside ``input_dir_background``.
    file_pattern_background = List(
        trait=Unicode(),
        default_value=["*.h5"],
        help="List of specific file pattern for matching files in ``input_dir_background``",
    ).tag(config=True)

    # Name of the ``DLDataReader`` subclass to instantiate in ``setup``.
    dl1dh_reader_type = ComponentName(
        DLDataReader, default_value="DLImageReader"
    ).tag(config=True)

    # Name of the ``CTLearnModel`` subclass; not used by ``setup`` in this class.
    model_type = ComponentName(
        CTLearnModel, default_value="ResNet"
    ).tag(config=True)

    # NOTE: this option used to be spelled ``exits``, so it never reached the
    # ``exists`` option of ``Path`` and no existence check was applied.
    # ``exists=None`` spells out that effective behaviour: the directory may or
    # may not exist. ``exists=False`` would start rejecting existing directories.
    output_dir = Path(
        exists=None,
        default_value=None,
        allow_none=False,
        directory_ok=True,
        file_ok=False,
        help="Output directory for the trained reconstructor.",
    ).tag(config=True)

    # Reconstruction tasks forwarded to the data loaders.
    reco_tasks = List(
        trait=CaselessStrEnum(["type", "energy", "cameradirection", "skydirection"]),
        allow_none=False, 
        help=(
            "List of reconstruction tasks to perform. "
            "'type': classification of the primary particle type "
            "'energy': regression of the primary particle energy "
            "'cameradirection': regression of the primary particle arrival direction in camera coordinates "
            "'skydirection': regression of the primary particle arrival direction in sky coordinates"
        )
    ).tag(config=True)

    # Per-replica batch size; ``setup`` multiplies it by the number of replicas.
    batch_size = Int(
        default_value=64,
        allow_none=False,
        help="Size of the batch to train the neural network.",
    ).tag(config=True)
    
    # Forwarded to the data loaders; rejected in ``setup`` for mono mode,
    # for more than one selected telescope type, or with ``sort_by_intensity``.
    stack_telescope_images = Bool(
        default_value=False,
        allow_none=False,
        help=(
            "Set whether to stack the telescope images in the data loader. "
            "Requires DLDataReader mode to be ``stereo``."
        ),
    ).tag(config=True)

    # Forwarded to the data loaders.
    sort_by_intensity = Bool(
        default_value=False,
        allow_none=True,
        help=(
            "Set whether to sort the telescope images by intensity in the data loader. "
            "Requires DLDataReader mode to be ``stereo``."
        ),
    ).tag(config=True)

    # Seeds the index shuffle in ``setup`` and is forwarded to both loaders.
    random_seed = Int(
        default_value=0,
        help=(
            "Random seed for shuffling the data "
            "before the training/validation split "
            "and after the end of an epoch."
        )
    ).tag(config=True)

    # Fraction of the shuffled events assigned to the validation loader.
    validation_split = Float(
        default_value=0.1,
        help="Fraction of the data to use for validation",
        min=0.01,
        max=0.99,
    ).tag(config=True)


    # Configurable classes exposed by the tool: the models and the readers.
    classes = classes_with_traits(CTLearnModel) + classes_with_traits(DLDataReader)

    def setup(self):
        """
        Create the distribution strategy, the data reader and both data loaders.

        Sets ``strategy``, ``input_url_signal``, ``input_url_background``,
        ``dl1dh_reader``, ``training_loader`` and ``validation_loader`` on the tool.

        Raises
        ------
        NotImplementedError
            If ``dl1dh_reader_type`` is ``"DLFeatureVectorReader"``.
        ValueError
            If fewer events than ``batch_size`` are available, or if the
            ``"type"`` task is requested while the reader has no class weights.
        ToolConfigurationError
            If ``stack_telescope_images`` is combined with mono mode, with more
            than one selected telescope type, or with ``sort_by_intensity``.
        """
        # NOTE: the existence of ``output_dir`` is not checked here.
        # Create a MirroredStrategy.
        self.strategy = tf.distribute.MirroredStrategy()
        # NOTE(review): registers the ``locked`` method of the strategy's private
        # collective-ops lock as an exit handler; this looks like a shutdown
        # workaround for MirroredStrategy -- confirm before changing it.
        atexit.register(self.strategy._extended._collective_ops._lock.locked)  # type: ignore
        self.log.info("Number of devices: %s", self.strategy.num_replicas_in_sync)
        # Get signal input files
        self.input_url_signal = []
        for signal_pattern in self.file_pattern_signal:
            self.input_url_signal.extend(self.input_dir_signal.glob(signal_pattern))
        # Get bkg input files
        self.input_url_background = []
        if self.input_dir_background is not None:
            for background_pattern in self.file_pattern_background:
                self.input_url_background.extend(self.input_dir_background.glob(background_pattern))

        # Set up the data reader
        self.log.info("Loading data:")
        self.log.info("For a large dataset, this may take a while...")
        if self.dl1dh_reader_type == "DLFeatureVectorReader":
            raise NotImplementedError(
                "'DLFeatureVectorReader' is not supported in CTLearn yet. "
                "Missing stereo CTLearnModel implementation."
            )
        self.dl1dh_reader = DLDataReader.from_name(
            self.dl1dh_reader_type,
            input_url_signal=sorted(self.input_url_signal),
            input_url_background=sorted(self.input_url_background),
            parent=self,
        )
        # Query the event count once; nothing below modifies the reader.
        n_events = self.dl1dh_reader._get_n_events()
        self.log.info("Number of events loaded: %s", n_events)
        if "type" in self.reco_tasks:
            self.log.info("Number of signal events: %d", self.dl1dh_reader.n_signal_events)
            self.log.info("Number of background events: %d", self.dl1dh_reader.n_bkg_events)
        # Check if the number of events is enough to form a batch
        if n_events < self.batch_size:
            raise ValueError(
                f"{n_events} events are not enough "
                f"to form a batch of size {self.batch_size}. Reduce the batch size."
            )
        # Check if there are at least two classes in the reader for the particle classification
        if self.dl1dh_reader.class_weight is None and "type" in self.reco_tasks:
            raise ValueError(
                "Classification task selected but less than two classes are present in the data."
            )
        # Check if stereo mode is selected for stacking telescope images
        if self.stack_telescope_images and self.dl1dh_reader.mode == "mono":
            raise ToolConfigurationError(
                "Cannot stack telescope images in mono mode. Use stereo mode for stacking."
            )
        # Check if only one telescope type is selected for stacking telescope images
        if self.stack_telescope_images and len(list(self.dl1dh_reader.selected_telescopes)) > 1:
            raise ToolConfigurationError(
                "Cannot stack telescope images from multiple telescope types. Use only one telescope type."
            )
        # Check if sorting by intensity is disabled for stacking telescope images
        if self.stack_telescope_images and self.sort_by_intensity:
            raise ToolConfigurationError(
                "Cannot stack telescope images when sorting by intensity. Disable sorting by intensity."
            )

        # Set up the data loaders for training and validation
        indices = list(range(n_events))
        # Shuffle the indices before the training/validation split
        np.random.seed(self.random_seed)
        np.random.shuffle(indices)
        # The first ``n_validation_examples`` shuffled indices go to validation,
        # the remaining ones to training.
        n_validation_examples = int(self.validation_split * n_events)
        training_indices = indices[n_validation_examples:]
        validation_indices = indices[:n_validation_examples]
        # Both loaders share the same options; the batch size is scaled by the
        # number of replicas of the distribution strategy.
        loader_options = dict(
            tasks=self.reco_tasks,
            batch_size=self.batch_size*self.strategy.num_replicas_in_sync,
            random_seed=self.random_seed,
            sort_by_intensity=self.sort_by_intensity,
            stack_telescope_images=self.stack_telescope_images,
        )
        self.training_loader = DLDataLoader(
            self.dl1dh_reader,
            training_indices,
            **loader_options,
        )
        self.validation_loader = DLDataLoader(
            self.dl1dh_reader,
            validation_indices,
            **loader_options,
        )

In [10]:
# Instantiate the loading tool with the YAML-derived configuration and run it,
# taking a wall-clock timestamp right before and right after the run.
model = LoadCTLearnModel(config=c)
start = time.time()
try:
    model.run()
except SystemExit as exit_request:
    # The run ends with a SystemExit (see the captured output); intercept it so
    # that the following cells can still be executed.
    print(f"Caught SystemExit ({exit_request.code}, continuing...)")
end = time.time()

--- Logging error ---
Traceback (most recent call last):
  File "/home/hugo/miniforge3/envs/ctlearn/lib/python3.10/logging/__init__.py", line 440, in format
    return self._format(record)
  File "/home/hugo/miniforge3/envs/ctlearn/lib/python3.10/logging/__init__.py", line 436, in _format
    return self._fmt % values
KeyError: 'highlevel'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/hugo/miniforge3/envs/ctlearn/lib/python3.10/logging/__init__.py", line 1100, in emit
    msg = self.format(record)
  File "/home/hugo/miniforge3/envs/ctlearn/lib/python3.10/logging/__init__.py", line 943, in format
    return fmt.format(record)
  File "/home/hugo/miniforge3/envs/ctlearn/lib/python3.10/site-packages/ctapipe/core/logging.py", line 52, in format
    s = super().format(record)
  File "/home/hugo/miniforge3/envs/ctlearn/lib/python3.10/logging/__init__.py", line 681, in format
    s = self.formatMessage(record)
  File "/ho

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)
2025-12-19 11:36:38,360 | INFO | Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:CPU:0',)


2025-12-19 11:36:38,362 [1;32mINFO[0m [__main__.application] (1063718094.setup): Number of devices: 1
2025-12-19 11:36:38,364 [1;32mINFO[0m [__main__.application] (1063718094.setup): Loading data:
2025-12-19 11:36:38,364 [1;32mINFO[0m [__main__.application] (1063718094.setup): For a large dataset, this may take a while...
2025-12-19 11:36:38,365 [1;32mINFO[0m [__main__.application.DLImageReader] (reader.__init__): Loading 12 files with multiprocessing...
2025-12-19 11:36:38,366 [1;32mINFO[0m [__main__.application.DLImageReader] (reader.__init__): CPU available : 16
2025-12-19 11:36:38,383 [1;32mINFO[0m [__main__.application.DLImageReader] (reader.__init__): All files loaded.
2025-12-19 11:36:38,404 [1;32mINFO[0m [__main__.application.DLImageReader] (reader.__init__): Get metadata
2025-12-19 11:36:38,614 [1;32mINFO[0m [__main__.application.DLImageReader] (reader.__init__): Finish metadata
2025-12-19 11:36:38,872 [1;32mINFO[0m [__main__.application.DLImageReader] (reade







2025-12-19 11:36:40,474 [1;32mINFO[0m [__main__.application.DLImageReader] (reader.__init__): End mono example
2025-12-19 11:36:40,477 [1;32mINFO[0m [__main__.application] (1063718094.setup): Number of events loaded: 3236
2025-12-19 11:36:40,479 [1;34mDEBUG[0m [__main__.application] (tool.run): CONFIG: {'LoadCTLearnModel': {'batch_size': 32, 'config_files': [], 'dl1dh_reader_type': 'DLImageReader', 'file_pattern_background': ['*.h5'], 'file_pattern_signal': ['gamma_*.h5'], 'input_dir_background': None, 'input_dir_signal': PosixPath('/home/hugo/TM/data/samples/gamma/train'), 'log_config': {}, 'log_datefmt': '%Y-%m-%d %H:%M:%S', 'log_file': None, 'log_file_level': 'INFO', 'log_format': '[%(name)s]%(highlevel)s %(message)s', 'log_level': 10, 'logging_config': {}, 'model_type': 'ResNet', 'output_dir': PosixPath('/home/hugo/TM/ml/models/energy/loader'), 'overwrite': True, 'provenance_log': PosixPath('/home/hugo/TM/ml/application.provenance.log'), 'quiet': False, 'random_seed': 0, 'rec

Caught SystemExit (0, continuing...)
