# CMB-ML Framework: Stage Code

## Introduction

CMB-ML manages a complex pipeline that processes data across multiple stages. Each stage produces outputs that need to be tracked, reused, and processed in later stages. Without a clear framework, this can lead to disorganized code, redundant logic, and errors.

The CMB-ML library provides a set of tools to manage the pipeline in a modular and scalable way.

Each tool focuses on a specific task, such as handling data files, managing file paths, or defining pipeline stages. Together, they simplify building and maintaining complex workflows.

This notebook will focus on the elements that occur within a single stage of the pipeline.

I’ll start with the simplest components, like Assets, and build up to what's needed for a stage. Each section includes an explanation of the concept and minimal examples showing how to use it.

In my initial work, I found myself hitting some stumbling blocks repeatedly:

- Keeping track of inputs and outputs across stages.
- Avoiding repetition when handling file paths and formats.
- Scaling workflows to many simulations.
- Supporting flexibility for different datasets and stages.

By organizing these concepts into clear components, the library reduces complexity and improves reliability.

## Contents

View this notebook with [nbviewer](https://nbviewer.org/github/CMB-ML/cmb-ml/tree/main/demonstrations/E_CMB_ML_framework.ipynb#Introduction) (or in your IDE) to enable these links.

This notebook first describes the running [Example](#Example) that will be used.

Then I introduce these components of the CMB-ML library:

- [Pipeline Config](#Pipeline-Stage-Configuration): The YAML configuration
- [Assets](#Assets): Represent data files with methods for reading and writing.
- [AssetHandlers](#AssetHandlers): Handle specific file formats (e.g., .fits, .npy).
- [Namers](#Namers): Dynamically generate file paths based on the current pipeline state.
- [Executors](#Executors): Define the logic for a single stage of the pipeline.

A short [Conclusion](#Conclusion) wraps up and provides a segue to the next notebook.

## Example

I consider a very simple task: converting a power spectrum to a map. For simplicity, I work on just one "simulation."

The example will create a "Dataset" called DemoNotebook in your local_system's Dataset folder. That whole folder can be deleted when done.

The following cell gives access to the configuration directory. You may need to change the `CMB_ML_DATA` path for your local_system.

In [1]:
import os

# Set the location of the data directory
os.environ["CMB_ML_DATA"] = "/data/jim/CMB_Data"

I've put all the nitty-gritty code into an external module so that the notebook can be a bit cleaner. I recommend **against** looking to it for explanation. To demonstrate concepts, I've had to expose simpler classes.

In [2]:
from omegaconf import OmegaConf
from helpers.E_helper import cfg

# Pipeline Stage Configuration

I've illustrated how Hydra configs work [previously](./A_hydra_tutorial.ipynb). In that example, the configuration had only very simple parameters. I can consider a file path to be a parameter that points to richer data somewhere on disk. The **pipeline** portion of the configuration has the information I need to work with that data.

Here's the pipeline for this simple example. I'll be "working on" the `ps2map` stage and treating the `ps_setup` stage as already done.

In [3]:
print(OmegaConf.to_yaml(cfg.pipeline))

ps_setup:
  assets_out:
    cmb_ps:
      handler: TextPowerSpectrum
      path_template: '{root}/{dataset}/{stage}/cmb_dummy_ps.fits'
  dir_name: A_PS_Setup
ps2map:
  assets_out:
    cmb_map:
      handler: HealpyMap
      path_template: '{root}/{dataset}/{stage}/cmb_dummy_map.fits'
  assets_in:
    cmb_ps:
      stage: ps_setup
  dir_name: B_CMB_Map



The rest of the notebook will use this information, but there's a lot of fiddly detail. For now, ignore the details (the "things" and "stuff" below) and focus on this structure:

```yaml
stage1:
    assets_out:
        thing:
            stuff
    other: stuff
stage2:
    assets_out:
        thing:
            stuff
    assets_in:
        first_thing: stuff
        other_thing: stuff
    other: stuff
```

For each stage, I list (1) **what comes out**, (2) **what goes in**, and (3) **other stage details**. (The details include the output directory, the relevant splits, logging information, and (rarely) flags for particular stages. These will be described more later.)
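
To make the three-part structure concrete, here is a minimal sketch that splits each stage's configuration into outputs, inputs, and other details. It uses a plain dictionary as a stand-in for the Hydra config, and `summarize_stage` is a hypothetical helper written for this illustration, not part of the CMB-ML library.

```python
# A plain-dict stand-in for the pipeline config (mirrors the YAML printed above).
pipeline = {
    "ps_setup": {
        "assets_out": {
            "cmb_ps": {
                "handler": "TextPowerSpectrum",
                "path_template": "{root}/{dataset}/{stage}/cmb_dummy_ps.fits",
            },
        },
        "dir_name": "A_PS_Setup",
    },
    "ps2map": {
        "assets_out": {
            "cmb_map": {
                "handler": "HealpyMap",
                "path_template": "{root}/{dataset}/{stage}/cmb_dummy_map.fits",
            },
        },
        "assets_in": {"cmb_ps": {"stage": "ps_setup"}},
        "dir_name": "B_CMB_Map",
    },
}


def summarize_stage(stage_cfg):
    """Hypothetical helper: split one stage into outputs, inputs, and the rest."""
    outputs = sorted(stage_cfg.get("assets_out", {}))
    inputs = sorted(stage_cfg.get("assets_in", {}))
    other = {key: val for key, val in stage_cfg.items()
             if key not in ("assets_out", "assets_in")}
    return outputs, inputs, other


for stage_name, stage_cfg in pipeline.items():
    outputs, inputs, other = summarize_stage(stage_cfg)
    print(f"{stage_name}: out={outputs} in={inputs} other={other}")
```

The first stage has no `assets_in` key at all, so the helper treats a missing key as an empty mapping.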

# Assets 

## Overview

At its core, an `Asset` represents a piece of data used or created during a pipeline stage. Assets provide a consistent way to manage data, allowing you to:

- Read data from files
- Write data to files
- Reference a file’s location

Before diving too deep, let's see how `Asset`s are used. Suppose we're working on an executor that makes CMB maps from power spectra. The following cell sets up the state we'd be in:

In [4]:
from helpers.E_helper import assets_in, assets_out, make_map_from_ps

Using automatically generated assets, I load the data, operate on it, and write the output succinctly:

In [5]:
ps = assets_in["cmb_ps"].read()
cmb = make_map_from_ps(ps, nside=256)
assets_out["cmb_map"].write(data=cmb)

Assets let me focus on the process instead of worrying about details of file paths and formatting.

## Defining Assets

Assets are defined as parameters in the pipeline configuration YAMLs. For the assets used above, that looks like:

In [6]:
print(OmegaConf.to_yaml(cfg.pipeline))

ps_setup:
  assets_out:
    cmb_ps:
      handler: TextPowerSpectrum
      path_template: '{root}/{dataset}/{stage}/cmb_dummy_ps.fits'
  dir_name: A_PS_Setup
ps2map:
  assets_out:
    cmb_map:
      handler: HealpyMap
      path_template: '{root}/{dataset}/{stage}/cmb_dummy_map.fits'
  assets_in:
    cmb_ps:
      stage: ps_setup
  dir_name: B_CMB_Map



- *Output Assets* (`assets_out`) are fully defined with both a `handler` and a `path_template`. 
- *Input Assets* (`assets_in`) reference earlier stages instead for those values.

All assets have a `handler` and a `path_template`. The `handler` (an `AssetHandler`) reads or writes files and will be described in the next section. The `path_template` is a string with placeholder tags (like {root} and {stage}) used to dynamically generate the filenames.

Compare the YAML to the values in the Python objects:

In [7]:
def report_asset(asset_str, asset):
    msg = f"Asset: {asset_str}\n"\
          f"  Handler:       {asset.handler.__class__.__name__}\n"\
          f"  Path template: {asset.path_template}\n"\
          f"  Path:          {asset.path}"
    print(msg)

report_asset("cmb_map", assets_out["cmb_map"])
report_asset("cmb_ps", assets_in["cmb_ps"])

Asset: cmb_map
  Handler:       HealpyMap
  Path template: {root}/{dataset}/{stage}/cmb_dummy_map.fits
  Path:          /data/jim/CMB_Data/Datasets/DemoNotebook/B_CMB_Map/cmb_dummy_map.fits
Asset: cmb_ps
  Handler:       TextPowerSpectrum
  Path template: {root}/{dataset}/{stage}/cmb_dummy_ps.fits
  Path:          /data/jim/CMB_Data/Datasets/DemoNotebook/A_PS_Setup/cmb_dummy_ps.fits


I'm currently working on the stage `ps2map`, which is where the two assets used above are defined. When I need to check what's going on, here's how I look at the **YAML**:
- The output asset is straightforward. I start at `ps2map`, then look to `assets_out` and then `cmb_map`. I see that HealpyMap is defined as the handler, and the end of the path_template specifies "cmb_dummy_map.fits". This is confirmed in the Python output.
- The input asset requires a bit more looking. All inputs are considered as outputs of previous stages. I start at `ps2map`, then look to `assets_in` and then `cmb_ps`. When I see `cmb_ps: {stage: ps_setup}`, I know I need to look at the `ps_setup` stage and the `assets_out`. From there I see the values reflected in the Python output.
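
The lookup described for the input asset can be sketched in a few lines. This assumes the config behaves like nested dictionaries and that an input asset keeps the same name in the stage that produced it, as in this example. `resolve_input` is a hypothetical helper for illustration and may differ from how the library resolves references internally.

```python
# Trimmed stand-in for the pipeline config shown above
pipeline = {
    "ps_setup": {
        "assets_out": {
            "cmb_ps": {
                "handler": "TextPowerSpectrum",
                "path_template": "{root}/{dataset}/{stage}/cmb_dummy_ps.fits",
            },
        },
        "dir_name": "A_PS_Setup",
    },
    "ps2map": {
        "assets_in": {"cmb_ps": {"stage": "ps_setup"}},
        "dir_name": "B_CMB_Map",
    },
}


def resolve_input(pipeline, stage_name, asset_name):
    """Follow an assets_in entry back to the assets_out entry that defines it."""
    # Step 1: the input entry only names the stage that produced the asset
    source_stage = pipeline[stage_name]["assets_in"][asset_name]["stage"]
    # Step 2: the producing stage holds the handler and path_template
    definition = pipeline[source_stage]["assets_out"][asset_name]
    return source_stage, definition


source_stage, definition = resolve_input(pipeline, "ps2map", "cmb_ps")
print(source_stage)
print(definition["handler"])
print(definition["path_template"])
```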

Note that the path_template is automatically filled in to form the path. This will be explained later.

## Safety with Assets

One reason I'm clear about input versus output assets is so that I know how my data was generated. If I have an asset that handles access to a source map, I can manipulate that data all I want in RAM. However, when I try to save it I cannot accidentally overwrite the source data on disk. For instance:

In [8]:
try:
    assets_in["cmb_ps"].write(data=ps)
except AttributeError:
    print("Do not write to an input asset!")

Do not write to an input asset!


Similarly, I cannot accidentally read an output asset. It is my hope that your code works perfectly from the very start. However, if it is anything like mine has been, you may appreciate that you cannot accidentally re-use leftovers from a previous run.
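
One way to get this kind of protection is to give input and output assets different methods, so the wrong call simply does not exist. The sketch below uses two hypothetical classes and an in-memory dictionary in place of files. It illustrates the idea under those assumptions and is not a description of the library's own Asset classes.

```python
class InputAsset:
    """Hypothetical read-only asset: it has no write() method at all."""
    def __init__(self, store, key):
        self._store = store
        self._key = key

    def read(self):
        return self._store[self._key]


class OutputAsset:
    """Hypothetical write-only asset: it has no read() method at all."""
    def __init__(self, store, key):
        self._store = store
        self._key = key

    def write(self, data):
        self._store[self._key] = data


store = {"cmb_ps": [1.0, 2.0, 3.0]}   # stands in for files on disk
ps_asset = InputAsset(store, "cmb_ps")
map_asset = OutputAsset(store, "cmb_map")

# Normal use: read the input, transform it in memory, write the output
map_asset.write(data=[2 * x for x in ps_asset.read()])

try:
    ps_asset.write(data=[])   # no such method on an input asset
except AttributeError as err:
    print("Blocked:", err)
```

Because the source data can only be reached through `read()`, it stays untouched no matter what happens to the in-memory copy.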

# AssetHandlers

## Overview

An Asset provides a uniform interface for interacting with data. Because there are far fewer file *types* than different files, I define `AssetHandlers` to do the reading or writing for a type of data. This way, the Asset can focus on where the data will be, while the AssetHandler can focus on I/O.

In some cases, especially with plain text, there are conventions within the file. In other cases, we will want the data as read to be in a particular configuration. We define different AssetHandlers for these cases as well.

We can use AssetHandlers implicitly (as above in [Assets](#Assets)), and this is generally the preferred way to go about it. However, I sometimes use an AssetHandler on its own. In the following example, I show how this is done:

In [9]:
from cmbml.core.asset_handlers import TextPowerSpectrum


# Loading using the handler directly
ps_path = assets_in["cmb_ps"].path
ps_handler = TextPowerSpectrum()
ps_from_file = ps_handler.read(ps_path)
print(ps_from_file[:3])

# Loading using the asset
ps_from_asset = assets_in["cmb_ps"].read()
print(ps_from_asset[:3])

# Confirm that the data is the same:
assert (ps_from_file == ps_from_asset).all()

print("This is the same data!")

[1048.93659    1006.92305844  970.47279622]
[1048.93659    1006.92305844  970.47279622]
This is the same data!


## Defining AssetHandlers

AssetHandlers are simple objects. To show this, I can extend the system by creating custom AssetHandlers for new file types. For example, suppose I want to read the power spectrum files as text:

In [10]:
from cmbml.core import GenericHandler, make_directories

class TextHandler(GenericHandler):
    def read(self, path):
        with open(path, 'r') as f:
            res = f.read()
        return res

    def write(self, path, data):
        # Ensure the directory exists
        make_directories(path)
        with open(path, 'w') as f:
            f.write(data)

This will now work as a standalone handler.

In [11]:
txt_handler = TextHandler()

text_ps = txt_handler.read(ps_path)
print(text_ps[:40])  # The first 40 characters of the file

1.048936590000000024e+03
1.0069230584423


To use the new handler within an Executor, I also need to register it. In the module with the new asset handler, I have:

In [12]:
from cmbml.core.asset_handlers.asset_handler_registration import register_handler

# After the class definition
register_handler("TextHandler", TextHandler)

Adding AssetHandlers is not a common occurrence, but this helps show that they're fairly simple. It's a convenient way to keep file-handling code out of the way.
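
Registration is what lets a string such as `handler: HealpyMap` in the YAML be turned into a handler object. A registry of this kind can be as small as a dictionary from names to classes. The sketch below assumes exactly that. The names `_HANDLERS`, `register_handler_sketch`, `make_handler`, and `UpperTextHandler` are hypothetical, and the real `register_handler` may work differently.

```python
_HANDLERS = {}  # hypothetical registry: handler name -> handler class


def register_handler_sketch(name, handler_class):
    """Record a handler class under the name used in the pipeline YAML."""
    if name in _HANDLERS:
        raise ValueError(f"Handler {name!r} is already registered")
    _HANDLERS[name] = handler_class


def make_handler(name):
    """Look up a handler by its YAML name and instantiate it."""
    try:
        return _HANDLERS[name]()
    except KeyError:
        raise KeyError(f"No handler registered under {name!r}") from None


class UpperTextHandler:
    """Toy handler, used only to exercise the registry."""
    def read(self, path):
        with open(path, "r") as f:
            return f.read().upper()


register_handler_sketch("UpperTextHandler", UpperTextHandler)
handler = make_handler("UpperTextHandler")
print(type(handler).__name__)
```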

# Namers

## Overview

The `Namer` handles dynamic file path generation in the pipeline. Instead of hardcoding file paths, the Namer takes a `path_template`, finds its placeholder tags (`{dataset}`, `{stage}`, `{sim}`, or others), and fills them in based on the pipeline’s current state. This ensures that file paths remain consistent and adaptable, even across many simulations and stages.

Every Asset relies on the Namer to construct its path property, making it a central piece of the pipeline’s infrastructure.

You probably **do not need to create a Namer**, ever. These are handled by the framework.

Recall the definition of this pipeline stage from earlier:

In [13]:
print(OmegaConf.to_yaml(cfg.pipeline.ps2map))

assets_out:
  cmb_map:
    handler: HealpyMap
    path_template: '{root}/{dataset}/{stage}/cmb_dummy_map.fits'
assets_in:
  cmb_ps:
    stage: ps_setup
dir_name: B_CMB_Map



The Namer object will take the template

```yaml
        path_template: "{root}/{dataset}/{stage}/cmb_dummy_map.fits"
```

and convert it into an actual path:

In [14]:
print(assets_out["cmb_map"].path_template)
print(assets_out["cmb_map"].path)

{root}/{dataset}/{stage}/cmb_dummy_map.fits
/data/jim/CMB_Data/Datasets/DemoNotebook/B_CMB_Map/cmb_dummy_map.fits


## Purpose

The Namer may not seem very important right now, but it's a critical part of the CMB-ML framework. The example above was for a generic single file produced in a stage. What if we're producing noise? I'll want to produce a different noise map for each detector frequency and save each of them immediately. However, I don't want to keep track of (and pass around variables for) the `{root}`, `{dataset}`, and `{stage}`, even though they're needed for the filename.

The structure of the Namer lets us define these values ahead of time. Then, when iterating through each channel, I can simply tell the Namer what channel is being used, and it can handle the details.

We're producing multiple simulations across multiple datasplits. I can tell the Namer to keep track of these individually, nesting the settings.
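
As a rough sketch of that nesting, the toy class below keeps a context dictionary and fills templates with `str.format`. `ToyNamer`, the template, and the split, simulation, and frequency values are all made up for this illustration. The library's Namer is richer and may be implemented differently.

```python
from contextlib import contextmanager


class ToyNamer:
    """Hypothetical stand-in for the Namer; not the cmbml implementation."""
    def __init__(self, **context):
        self.context = dict(context)

    @contextmanager
    def set_context(self, key, value):
        # Remember whether the key existed so the old state can be restored
        missing = object()
        previous = self.context.get(key, missing)
        self.context[key] = value
        try:
            yield
        finally:
            if previous is missing:
                del self.context[key]
            else:
                self.context[key] = previous

    def path(self, template):
        return template.format(**self.context)


# root, dataset, and stage are set once, ahead of time
namer = ToyNamer(root="/data", dataset="Demo", stage="Noise")
template = "{root}/{dataset}/{stage}/{split}/sim{sim:04d}/noise_{freq}.fits"

paths = []
for split in ("Train", "Test"):
    with namer.set_context("split", split):
        for sim in range(2):
            with namer.set_context("sim", sim):
                for freq in (100, 143):
                    with namer.set_context("freq", freq):
                        paths.append(namer.path(template))

print(paths[0])
print(len(paths), "paths; context afterwards:", sorted(namer.context))
```

The innermost loop only names the frequency. Everything else is already in the context, and each level is cleaned up when its `with` block ends.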

## Using Namer

The Namer is stateful, meaning it tracks the current context of the stage being run (e.g., for the current split or simulation number).

In [15]:
from helpers.E_helper import name_tracker

name_tracker.context

{'dataset': 'DemoNotebook',
 'working': '',
 'root': '/data/jim/CMB_Data/Datasets',
 'src_root': '/data/jim/CMB_Data/Assets/'}

Within the asset, when the path property is used, it uses the Namer's `path()` method to fill in the template.

In [16]:
path_template = assets_out["cmb_map"].path_template
try:
    name_tracker.path(path_template)
except KeyError as e:
    print('KeyError:', e)

KeyError: 'Key stage not found in the context. Ensure that the path_template {root}/{dataset}/{stage}/cmb_dummy_map.fits is correct in the pipeline yaml.'


Compare the template to the current context of the Namer.

In [17]:
print(path_template)
name_tracker.context
# Notice "stage" is in the path_template, but there's no matching key in the context

{root}/{dataset}/{stage}/cmb_dummy_map.fits


{'dataset': 'DemoNotebook',
 'working': '',
 'root': '/data/jim/CMB_Data/Datasets',
 'src_root': '/data/jim/CMB_Data/Assets/'}
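
The same comparison can be done programmatically. The sketch below assumes the template follows Python's `str.format` syntax and uses the standard library's `string.Formatter` to list the placeholders. `template_fields` is a hypothetical helper, and the context is copied by hand from the output above.

```python
from string import Formatter


def template_fields(path_template):
    """Return the placeholder names that appear in a format-style template."""
    return [field for _, field, _, _ in Formatter().parse(path_template) if field]


path_template = "{root}/{dataset}/{stage}/cmb_dummy_map.fits"
context = {
    "dataset": "DemoNotebook",
    "working": "",
    "root": "/data/jim/CMB_Data/Datasets",
    "src_root": "/data/jim/CMB_Data/Assets/",
}

needed = template_fields(path_template)
missing = [name for name in needed if name not in context]
print("Needed: ", needed)
print("Missing:", missing)
```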

Indeed, we don't have the stage set. Normally, the asset within an Executor will handle this. In this case, I set it temporarily. I try to always use the context manager to do this. Outside the `with` block, the Namer's context reverts. This ensures that I can't forget to change the context (e.g., a simulation number or current split).

In [18]:
with name_tracker.set_context("stage", "B_CMB_Map"):
    print(name_tracker.path(path_template))

/data/jim/CMB_Data/Datasets/DemoNotebook/B_CMB_Map/cmb_dummy_map.fits


At times, I'll want to set multiple things in the context simultaneously. This is especially common when setting up for parallelized runs. Rather than nesting multiple `with` blocks, I use the `set_contexts()` method (note set_context***s***(), not set_context()), with the following pattern:

In [19]:
# Show the initial context
print(name_tracker.context)  # no "freq" or "epoch"

# Show the context within the context manager
new_context = dict(freq=100, epoch=50)
with name_tracker.set_contexts(new_context):
    print(name_tracker.context)  # includes "freq" and "epoch"

# Show the context after the context manager
print(name_tracker.context)  # no "freq" or "epoch"

{'dataset': 'DemoNotebook', 'working': '', 'root': '/data/jim/CMB_Data/Datasets', 'src_root': '/data/jim/CMB_Data/Assets/'}
{'dataset': 'DemoNotebook', 'working': '', 'root': '/data/jim/CMB_Data/Datasets', 'src_root': '/data/jim/CMB_Data/Assets/', 'freq': 100, 'epoch': 50}
{'dataset': 'DemoNotebook', 'working': '', 'root': '/data/jim/CMB_Data/Datasets', 'src_root': '/data/jim/CMB_Data/Assets/'}


All assets have access to the same Namer.

In [20]:
# We know name_tracker is the same for all assets; it was created in the same context
assert name_tracker is assets_out["cmb_map"].name_tracker
assert name_tracker is assets_in["cmb_ps"].name_tracker
assert assets_out["cmb_map"].name_tracker is assets_in["cmb_ps"].name_tracker

# Testing identity with `is` shows they're the *same* object, not just equivalent
print("No assertion errors: all objects are the same")

No assertion errors: all objects are the same
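
Identity and equality are distinct in Python. The toy classes below (not part of the library) show that `==` only implies identity when a class leaves `__eq__` undefined, whereas `is` always tests identity.

```python
class Plain:
    """No __eq__ defined, so == falls back to identity."""


class ByValue:
    """Defines __eq__, so distinct objects can compare equal."""
    def __init__(self, value):
        self.value = value

    def __eq__(self, other):
        return isinstance(other, ByValue) and self.value == other.value


a, b = Plain(), Plain()
print(a == a, a == b)   # True False

x, y = ByValue(1), ByValue(1)
print(x == y, x is y)   # True False
```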


Knowing that the Namer will take care of the paths, and that the AssetHandler will take care of reading and writing data, I can now move on to defining the stage itself.

# Executors

## Overview

I've now got the essential components of an `Executor` and can describe it as a whole.

An Executor represents a single stage in the pipeline. Its primary role is to process input data (via Assets) and produce output data. Executors are modular, meaning each stage is implemented as its own class, making the pipeline easy to extend, debug, and maintain.

At a high level, an Executor:

- Initializes input and output Assets.
- Defines the logic for processing data.
- Runs the process for all relevant simulations and splits.

We'll again start with an example, set up elsewhere:

In [21]:
from helpers.E_helper import helper, InitPS2MapExecutor

Recall that, within the config for the pipeline, the `ps2map` stage has been set up as follows:

In [22]:
print(OmegaConf.to_yaml(cfg.pipeline.ps2map))

assets_out:
  cmb_map:
    handler: HealpyMap
    path_template: '{root}/{dataset}/{stage}/cmb_dummy_map.fits'
assets_in:
  cmb_ps:
    stage: ps_setup
dir_name: B_CMB_Map



This executor has the correct assets:

In [23]:
def display_asset_list(asset_list):
    for asset_key, asset_val in asset_list.items():
        print(f"  {asset_key}")
        print(f"    Handler:  {asset_val.handler.__class__.__name__}")
        print(f"    Template: {asset_val.path_template}")
        print(f"    Path:     {asset_val.path}")

print("Outputs:")
display_asset_list(helper.assets_out)
print("Inputs:")
display_asset_list(helper.assets_in)

Outputs:
  cmb_map
    Handler:  HealpyMap
    Template: {root}/{dataset}/{stage}/cmb_dummy_map.fits
    Path:     /data/jim/CMB_Data/Datasets/DemoNotebook/B_CMB_Map/cmb_dummy_map.fits
Inputs:
  cmb_ps
    Handler:  TextPowerSpectrum
    Template: {root}/{dataset}/{stage}/cmb_dummy_ps.fits
    Path:     /data/jim/CMB_Data/Datasets/DemoNotebook/A_PS_Setup/cmb_dummy_ps.fits


And it has the following `.execute()` method:

```python
    def execute(self):
        print("This is an example executor.")
        return
```

In [24]:
helper.execute()

This is an example executor.


This trivial method runs, but it isn't a good example of using all the tools described so far.

## Defining Executors

I've got the pipeline defined in the YAML. I know what Assets I need, I have the AssetHandlers working, and I know how I can use the Namer to manage paths for me. Now I'll rebuild it. I usually have to review my pipeline configuration YAML at this point.

In [25]:
print(OmegaConf.to_yaml(cfg.pipeline))

ps_setup:
  assets_out:
    cmb_ps:
      handler: TextPowerSpectrum
      path_template: '{root}/{dataset}/{stage}/cmb_dummy_ps.fits'
  dir_name: A_PS_Setup
ps2map:
  assets_out:
    cmb_map:
      handler: HealpyMap
      path_template: '{root}/{dataset}/{stage}/cmb_dummy_map.fits'
  assets_in:
    cmb_ps:
      stage: ps_setup
  dir_name: B_CMB_Map



First, I write out the `__init__()` method. I specify the "stage_str", which is the name of the stage as it appears in the YAML file. Then I define instance variables for each of my assets. I could also define other variables, based on parameters in the `cfg`. I take great pains to avoid defining `self.cfg = cfg`, should the inclination arise.

Last, I define the `execute()` method. It must have this name, and it must not take any parameters. In this case, I can use the logic from [Assets Overview](#Overview) (repeated here):

```python
ps = assets_in["cmb_ps"].read()
cmb = make_map_from_ps(ps, nside=256)
assets_out["cmb_map"].write(data=cmb)
```

Other things may be included as well, but this suffices for the current example:

In [26]:
import healpy as hp
from cmbml.core import BaseStageExecutor

class PS2MapExecutor(BaseStageExecutor):
    def __init__(self, cfg) -> None:
        # I call the base class's __init__ method
        # The stage_str matches the key in the pipeline config
        super().__init__(cfg, stage_str="ps2map")

        # The base class will have generated assets_in and assets_out
        # I give myself more descriptive names.
        # I use "in_" or "out_" prefixes and "_asset" suffix for clarity
        self.out_map_asset = self.assets_out["cmb_map"]
        self.in_ps_asset = self.assets_in["cmb_ps"]

    def execute(self) -> None:
        # Use the input asset to read the power spectrum
        ps = self.in_ps_asset.read()
        print(f"Power spectrum read from {self.in_ps_asset.path}")
        
        # Perform the operation
        cmb = hp.synfast(ps, nside=256)

        # Save the resulting map using the output asset
        self.out_map_asset.write(data=cmb)
        print(f"Map written to {self.out_map_asset.path}")
        return

In [27]:
ps2map_executor = PS2MapExecutor(cfg)
ps2map_executor.execute()

Power spectrum read from /data/jim/CMB_Data/Datasets/DemoNotebook/A_PS_Setup/cmb_dummy_ps.fits
Map written to /data/jim/CMB_Data/Datasets/DemoNotebook/B_CMB_Map/cmb_dummy_map.fits


Because of all the work in the other classes above, this is simple.
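
To see why the subclass can stay so small, here is a stripped-down sketch of the pattern with no dependence on the library. `ToyAsset`, `ToyBaseExecutor`, and `DoubleExecutor` are hypothetical, the "files" are entries in a dictionary, and the read/write safety checks are left out. It shows only the division of labor: the base class builds assets from the config, and the subclass supplies the stage logic.

```python
class ToyAsset:
    """Hypothetical asset backed by an in-memory dict instead of files."""
    def __init__(self, store, path):
        self.store = store
        self.path = path

    def read(self):
        return self.store[self.path]

    def write(self, data):
        self.store[self.path] = data


class ToyBaseExecutor:
    """Hypothetical base class: builds assets for one stage from the config."""
    def __init__(self, pipeline, store, stage_str):
        stage_cfg = pipeline[stage_str]
        # Outputs live in this stage's own directory
        self.assets_out = {
            name: ToyAsset(store, f"{stage_cfg['dir_name']}/{name}")
            for name in stage_cfg.get("assets_out", {})
        }
        # Inputs live in the directory of the stage that produced them
        self.assets_in = {}
        for name, ref in stage_cfg.get("assets_in", {}).items():
            source_dir = pipeline[ref["stage"]]["dir_name"]
            self.assets_in[name] = ToyAsset(store, f"{source_dir}/{name}")

    def execute(self):
        raise NotImplementedError("Subclasses define the stage logic")


class DoubleExecutor(ToyBaseExecutor):
    def __init__(self, pipeline, store):
        super().__init__(pipeline, store, stage_str="double")
        self.in_values = self.assets_in["values"]
        self.out_doubled = self.assets_out["doubled"]

    def execute(self):
        self.out_doubled.write(data=[2 * v for v in self.in_values.read()])


pipeline = {
    "setup": {"assets_out": {"values": {}}, "dir_name": "A_Setup"},
    "double": {
        "assets_out": {"doubled": {}},
        "assets_in": {"values": {"stage": "setup"}},
        "dir_name": "B_Double",
    },
}
store = {"A_Setup/values": [1, 2, 3]}   # pretend the setup stage already ran
DoubleExecutor(pipeline, store).execute()
print(store["B_Double/doubled"])
```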

Of course, there's a lot more that can be done. This is a very simple example that doesn't go into processing across multiple splits or simulations. In those cases I have a few different structures I can use. Often, there are also several steps of processing to apply within a stage. Those will be addressed [elsewhere](./G_CMB_ML_executors.ipynb). First, though, I'll look at a pipeline as a whole.

# Conclusion

I've set the groundwork for the main design patterns used as portions of a pipeline stage. You've seen how these different classes interact. 

- The pipeline configuration YAML defines *how* and from *where* data is read
- `Asset`s are an easy interface to that data
- `AssetHandler`s do the actual reading or writing of the data
- The `Namer` simplifies working with the paths to data
- `Executor`s automatically set up those classes and then include the business logic for a stage

In the [next notebook](F_CMB_ML_pipeline.ipynb), we'll look at how stages are put together to form pipelines.