Skip to content

Commit

Permalink
Issue/136/build pipeline (#137)
Browse files Browse the repository at this point in the history
* added pipeline stuff

* WIP, playing with pipeline building cli

* added scripts.build_pipeline_yaml to testing

* Added build_pipe and run_stage to rail cli

* Added name_utils and catalog_utils to src/rail/utils

* fix up RailPipeline.build_and_write

* fix up TrainZPipeline

* Added build_and_read_pipelines to testing_utils

* Added test for train_z

* added build_pipeline to scripts

* tweaks to name_utils

* fixes for unit tests

* WIP sid's changes for RailProject

* WIP more of Sid's changes

* Working version of name_utils, train_z_pipeline and testing of same

* remove old version of test_pipeline

* WIP on name_utils

* simplify naming

* WIP, improvements to making rail pipelines

* WIP, cleaning up interfaces

* WIP, cleaning up interfaces

* Tweaking RailPipeline

* minor changes for pylint

* keep track of active catalog tag

* Fix up build_pipe command

* add hdf5_groupname to catalog utils

* Fix imports in stage

* Fix up coverage
  • Loading branch information
eacharles committed Jul 29, 2024
1 parent bb064dd commit e7bb3d4
Show file tree
Hide file tree
Showing 12 changed files with 398 additions and 7 deletions.
39 changes: 39 additions & 0 deletions src/rail/cli/commands.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import click

from rail.core import __version__
Expand Down Expand Up @@ -100,3 +101,41 @@ def estimate(stage_name, stage_class, stage_module, model_file, dry_run, input_f
data_path=input_file,
)


@options.pipeline_class()
@options.output_yaml()
@options.catalog_tag()
@options.stages_config()
@options.outdir()
@options.inputs()
def build_pipe(pipeline_class, output_yaml, catalog_tag, stages_config, outdir, inputs):
"""Build a pipeline yaml file"""
input_dict = {}
for input_ in inputs:
tokens = input_.split('=')
assert len(tokens) == 2
input_dict[tokens[0]] = tokens[1]
scripts.build_pipeline(pipeline_class, output_yaml, catalog_tag, input_dict, stages_config, outdir)
return 0


@cli.command()
@options.pipeline_yaml()
@options.stage_name()
@options.dry_run()
@options.inputs()
def run_stage(pipeline_yaml, stage_name, dry_run, inputs):
"""Run a pipeline stage"""
pipe = ceci.Pipeline.read(pipeline_yaml)
input_dict = {}
for input_ in inputs:
tokens = input_.split('=')
assert len(tokens) == 2
input_dict[tokens[0]] = tokens[1]
com = pipe.generate_stage_command(stage_name, **input_dict)
if dry_run:
print(com)
else:
os.system(com)
return 0

21 changes: 19 additions & 2 deletions src/rail/cli/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@
__all__ = [
"clear_output",
"bpz_demo_data",
"catalog_tag",
"dry_run",
"outdir",
"output_yaml",
"from_source",
"model_file",
"git_mode",
"pipeline_class",
"pipeline_yaml",
"print_all",
"print_packages",
"print_namespaces",
Expand Down Expand Up @@ -90,6 +94,14 @@ def __call__(self, *args: Any, **kwargs: Any) -> Any: # pragma: no cover
is_flag=True,
)


catalog_tag = PartialOption(
"--catalog_tag",
default=None,
help="Type of input catalog, used to determine column names",
)


from_source = PartialOption(
"--from-source",
help="Install from source",
Expand Down Expand Up @@ -177,6 +189,13 @@ def __call__(self, *args: Any, **kwargs: Any) -> Any: # pragma: no cover
is_flag=True,
)

project_yaml = PartialOption(
"--project_yaml",
type=click.Path(),
default=None,
help="File with project description",
)

package_file = PartialOption(
"--package-file",
type=click.Path(),
Expand Down Expand Up @@ -216,8 +235,6 @@ def __call__(self, *args: Any, **kwargs: Any) -> Any: # pragma: no cover
default=None,
)



inputs = PartialArgument("inputs", nargs=-1)

verbose_download = PartialOption(
Expand Down
26 changes: 26 additions & 0 deletions src/rail/cli/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

import rail.stages
from rail.core import RailEnv
from rail.core.stage import RailPipeline
from rail.cli.options import GitMode
from rail.utils.path_utils import RAILDIR
from rail.utils import catalog_utils


def render_nb(outdir, clear_output, dry_run, inputs, skip, **_kwargs):
Expand Down Expand Up @@ -179,3 +181,27 @@ def get_data(verbose, **kwargs): # pragma: no cover
os.system(
f'curl -o {local_abs_path} {data_file["remote_path"]} --create-dirs'
)


def build_pipeline(
pipeline_class,
output_yaml,
catalog_tag=None,
input_dict=None,
stages_config=None,
output_dir='.',
log_dir=None,
**kwargs
):
tokens = pipeline_class.split('.')
module = '.'.join(tokens[:-1])
class_name = tokens[-1]

if catalog_tag:
catalog_utils.apply_defaults(catalog_tag)

if log_dir is None:
log_dir = os.path.join(output_dir, 'logs', class_name)

__import__(module)
RailPipeline.build_and_write(class_name, output_yaml, input_dict, stages_config, output_dir, log_dir, **kwargs)
67 changes: 64 additions & 3 deletions src/rail/core/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import os
import sys
import yaml
from math import ceil

from ceci import PipelineStage, MiniPipeline
from ceci.stage import PipelineStage
from ceci.pipeline import MiniPipeline
from ceci.config import StageParameter as Param
from rail.core.data import DATA_STORE, DataHandle

Expand Down Expand Up @@ -48,6 +50,13 @@ class RailStageBuild:
def __init__(self, stage_class, **kwargs):
self.stage_class = stage_class
self._kwargs = kwargs
self._stage = None

@property
def io(self): # pragma: no cover
if self._stage:
return self._stage.io
return None

def build(self, name):
"""Actually build the stage, this is called by the pipeline the stage
Expand All @@ -63,8 +72,8 @@ def build(self, name):
stage : `RailStage`
The newly built stage
"""
stage = self.stage_class.make_and_connect(name=name, **self._kwargs)
return stage
self._stage = self.stage_class.make_and_connect(name=name, **self._kwargs)
return self._stage


class RailPipeline(MiniPipeline):
Expand All @@ -77,6 +86,58 @@ class RailPipeline(MiniPipeline):
And end up with a fully specified pipeline.
"""
pipeline_classes = {}

def __init_subclass__(cls):
cls.pipeline_classes[cls.__name__] = cls

@classmethod
def print_classes(cls):
for key, val in cls.pipeline_classes.items():
print(f"{key} {val}")

@classmethod
def get_pipeline_class(cls, name):
try:
return cls.pipeline_classes[name]
except KeyError as msg:
raise KeyError(f"Could not find pipeline class {name} in {list(cls.pipeline_classes.keys())}") from msg

@staticmethod
def load_pipeline_class(class_name):
tokens = class_name.split('.')
module = '.'.join(tokens[:-1])
class_name = tokens[-1]
__import__(module)
pipe_class = RailPipeline.get_pipeline_class(class_name)
return pipe_class

@staticmethod
def build_and_write(
class_name,
output_yaml,
input_dict=None,
stages_config=None,
output_dir='.',
log_dir='.',
**kwargs,
):
pipe_class = RailPipeline.get_pipeline_class(class_name)
pipe = pipe_class(**kwargs)

full_input_dict = pipe_class.default_input_dict.copy()
if input_dict is not None:
full_input_dict.update(**input_dict)
pipe.initialize(
full_input_dict,
dict(
output_dir=output_dir,
log_dir=log_dir,
resume=False,
),
stages_config,
)
pipe.save(output_yaml)

def __init__(self):
MiniPipeline.__init__(self, [], dict(name="mini"))
Expand Down
54 changes: 54 additions & 0 deletions src/rail/pipelines/estimation/train_z_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/usr/bin/env python
# coding: utf-8

# Prerquisites, os, and numpy
import os
import numpy as np

# Various rail modules
from rail.estimation.algos.train_z import TrainZInformer, TrainZEstimator
from rail.evaluation.single_evaluator import SingleEvaluator

from rail.core.stage import RailStage, RailPipeline

import ceci


class TrainZPipeline(RailPipeline):

default_input_dict = dict(
input_train='dummy.in',
input_test='dummy.in',
)

def __init__(self):
RailPipeline.__init__(self)

DS = RailStage.data_store
DS.__class__.allow_overwrite = True

self.inform_trainz = TrainZInformer.build(
aliases=dict(input='input_train'),
hdf5_groupname='',
)

self.estimate_trainz = TrainZEstimator.build(
aliases=dict(input='input_test'),
connections=dict(
model=self.inform_trainz.io.model,
),
hdf5_groupname='',
)

self.evalute_trainz = SingleEvaluator.build(
aliases=dict(truth='input_test'),
connections=dict(
input=self.estimate_trainz.io.output,
),
point_estimates=['mode'],
truth_point_estimates=["redshift"],
metrics=["all"],
metric_config=dict(brier=dict(limits=[0., 3.5])),
exclude_metrics=['rmse', 'ks', 'kld', 'cvm', 'ad', 'rbpe', 'outlier'],
hdf5_groupname='',
)
Loading

0 comments on commit e7bb3d4

Please sign in to comment.