In [None]:
# Ensure that no cached data is present if a previous run has been interrupted.
# TODO: properly implement an option to overwrite an existing cache directory.
def clean_up(root="run"):
    """Reset the directory tree used by this notebook.

    Removes the cache directories ``<root>/test_ref`` and ``<root>/test_unk``
    left over from a previous (possibly interrupted) run, and makes sure that
    ``root`` and its ``input``, ``output``, ``log`` and ``pipeline``
    subdirectories exist.

    Parameters
    ----------
    root : str, optional
        Base directory for all files written by the notebook (``"run"`` by
        default).

    Raises
    ------
    OSError
        If a directory cannot be created or removed for any reason other than
        "already exists" / "already absent", e.g. ``root`` is a regular file
        or permissions are missing.
    """
    import os
    from contextlib import suppress
    from shutil import rmtree

    # exist_ok only tolerates an already existing *directory*; unlike a blanket
    # ``except OSError`` it still reports permission errors or a file in the way.
    os.makedirs(root, exist_ok=True)

    for key in ("ref", "unk"):
        # A missing cache directory simply means there is nothing to clean up.
        with suppress(FileNotFoundError):
            rmtree(os.path.join(root, f"test_{key}"))

    for path in ("input", "output", "log", "pipeline"):
        os.makedirs(os.path.join(root, path), exist_ok=True)

# Reset the run/ tree once, before any later cell writes input files or caches.
clean_up()

In [None]:
# step 1
from yaw import UniformRandoms
from rail.yaw_rail.utils import get_dc2_test_data

from rail.yaw_rail import (
    YawCacheCreate,     # step 2
    YawAutoCorrelate,   # step 3
    YawCrossCorrelate,  # step 4
    YawSummarize,       # step 5
    YawCacheDrop,       # step 6
)  # equivalent: from rail.yaw_rail import *
from rail.yaw_rail.cache import stage_helper  # utility for YawCacheCreate

In [None]:
from rail.core.stage import RailStage

# Data store exposed as a class attribute of RailStage.
DS = RailStage.data_store
# The flag is set on the store's *class*, so it applies to every instance of it;
# presumably this lets re-run cells overwrite existing entries (TODO confirm).
type(DS).allow_overwrite = True

In [None]:
# Verbosity level of the built-in logger; use "error" to disable the regular output.
VERBOSE: str = "debug"

## Create the mock data and randoms and write them to `run/input`

In [None]:
# Download the DC2 test sample (cached for future calls) and store it as the
# pipeline's input data catalogue.
mock_data = get_dc2_test_data()
mock_data.to_parquet("run/input/data.pqt")

n_data = len(mock_data)
assert n_data > 0, "DC2 test data is empty"

# Redshift range of the sample; reused for the randoms and correlation binning.
redshifts = mock_data["z"].to_numpy()
zmin = redshifts.min()
zmax = redshifts.max()

# Keep the summary as the cell's last expression: Jupyter only displays the value
# of the final expression, so in the middle of the cell it was silently discarded.
f"N={n_data}, {zmin:.1f}<z<{zmax:.1f}"

In [None]:
RAND_OVERSAMPLING = 10  # number of random points generated per data object
RAND_SEED = 12345  # fixed seed so the random catalogue is reproducible

# Random generator bounded by the RA/Dec extent of the data (arguments presumably
# ra_min, ra_max, dec_min, dec_max — TODO confirm against UniformRandoms).
angular_rng = UniformRandoms(
    mock_data["ra"].min(),
    mock_data["ra"].max(),
    mock_data["dec"].min(),
    mock_data["dec"].max(),
    seed=RAND_SEED,
)
# draw_from presumably assigns each random a `z` value sampled from the data
# redshifts, so randoms and data share the same redshift distribution.
mock_rand = angular_rng.generate(
    n_data * RAND_OVERSAMPLING, draw_from=dict(z=redshifts)
)

mock_rand.to_parquet("run/input/rand.pqt")

## Instantiate the stages

In [None]:
# Cache of the reference sample, written to run/test_ref and split into 5 patches.
# NOTE: name/aliases are disabled for now, so the stage presumably reads its
# default input tags — the "data"/"rand" keys passed to pipe.initialize.
stage_cache_ref = YawCacheCreate.make_stage(
    # name="ref",
    # aliases=stage_helper("ref"),
    path="run/test_ref",
    overwrite=True,
    ra_name="ra",
    dec_name="dec",
    redshift_name="z",
    n_patches=5,
    verbose=VERBOSE,
)

# Cache of the unknown sample — disabled until the pipeline wiring works.
# `patches` presumably takes the patch layout from an existing cache; it must
# point at the reference cache path used above (run/test_ref, not ./test_ref).
# stage_cache_unk = YawCacheCreate.make_stage(
#     name="unk",
#     aliases=stage_helper("unk"),
#     path="run/test_unk",
#     overwrite=True,
#     ra_name="ra",
#     dec_name="dec",
#     patches="run/test_ref",
#     verbose=VERBOSE,
# )

stage_cache_drop = YawCacheDrop.make_stage()

In [None]:
# Settings shared by the auto- and cross-correlation stages.
corr_config = {
    "rmin": 100,  # scale cuts; units not visible here — TODO confirm (kpc?)
    "rmax": 1000,
    "zmin": zmin,  # redshift binning spans the full range of the mock data
    "zmax": zmax,
    "zbin_num": 8,
    "verbose": VERBOSE,
}

# Presumably w_ss = reference autocorrelation, w_sp = reference x unknown.
stage_w_ss = YawAutoCorrelate.make_stage(**corr_config)
stage_w_sp = YawCrossCorrelate.make_stage(**corr_config)
stage_estimate = YawSummarize.make_stage(verbose=VERBOSE)

## Build the pipeline

In [None]:
import ceci

# Interactive pipeline assembled from the stage objects created above.
pipe = ceci.Pipeline.interactive()
pipe.add_stage(stage_cache_ref)
# Only the reference cache stage is enabled for now. stage_cache_unk is not
# defined while its definition is disabled, and the remaining stages need
# their inputs connected first (see the disabled connect_input calls below).
#pipe.add_stage(stage_cache_unk)
#pipe.add_stage(stage_w_ss)
#pipe.add_stage(stage_w_sp)
#pipe.add_stage(stage_estimate)

In [None]:
# Register the overall inputs (parquet files written earlier) and the run
# directories; the third argument (presumably the stages config) is left empty.
pipe.initialize(
    {
        "data": "run/input/data.pqt",
        "rand": "run/input/rand.pqt",
        # "data_unk": "run/input/data.pqt",
        # "rand_unk": "run/input/rand.pqt",
    },
    {
        "output_dir": "run/output",
        "log_dir": "run/log",
    },
    None,
)

# Stage wiring, disabled while only the reference cache stage is active.
# (Input names "reference"/"unknown" are assumed — verify against YawCrossCorrelate.)
# stage_w_ss.connect_input(stage_cache_ref, inputTag="sample", outputTag="cache_ref")
# stage_w_ss.connect_input(stage_cache_ref)
# stage_w_sp.connect_input(dict(reference=stage_cache_ref, unknown=stage_cache_unk))
# stage_estimate.connect_input([stage_w_sp, stage_w_ss, None])

pipe.save("run/pipeline/stages.yml")

In [None]:
# Show the pipeline definition written by pipe.save(...) above.
%cat run/pipeline/stages.yml

In [None]:
# Show the stage configuration file (presumably written next to stages.yml by pipe.save).
%cat run/pipeline/stages_config.yml

In [None]:
# Re-load the saved pipeline definition from disk and execute it.
ceci.Pipeline.read("run/pipeline/stages.yml").run()