In [None]:
from examples.radio_interferometry.functions import (
    imaging,
    dp3,
)
from examples.radio_interferometry.utils import filter_io_params
from examples.radio_interferometry.utils import FlexInput, FlexOutput
from flexecutor.storage.storage import StrategyEnum
from flexecutor.utils.utils import flexorchestrator
from flexecutor.workflow.stage import Stage

In [None]:
# Initialization of the FlexOrchestrator
# TODO: replace by more elegant solution
@flexorchestrator(bucket="test-bucket")
def init_flex():
    pass


init_flex()

In [None]:
rebinning_parameters = {
    "msin": FlexInput(prefix="partitions", custom_data_id="partitions"),
    "steps": "[aoflag, avg, count]",
    "aoflag.type": "aoflagger",
    "aoflag.strategy": FlexInput(
        prefix="parameters/rebinning",
        custom_data_id="lua",
        read_strategy=StrategyEnum.BROADCAST,
    ),
    "avg.type": "averager",
    "avg.freqstep": 4,
    "avg.timestep": 8,
    "numthreads": 4,
    "msout": FlexOutput(
        prefix="rebinning_out/ms",
        custom_data_id="rebinning_ms",
        suffix=".ms.zip",
    ),
    "log_output": FlexOutput(
        prefix="rebinning_out/logs",
        suffix=".log",
    ),
}

rebinning_stage = Stage(
    stage_id="rebinning",
    func=dp3,
    inputs=filter_io_params(rebinning_parameters, FlexInput),
    outputs=filter_io_params(rebinning_parameters, FlexOutput),
    params={"parameters": rebinning_parameters, "dp3_types": "rebinning"},
)

print(rebinning_stage.execute().get_timings())

In [None]:
calibration_parameters = {
    "msin": FlexInput(
        prefix="rebinning_out/ms", custom_data_id="rebinning_ms"
    ),
    "msin.datacolumn": "DATA",
    "steps": "[cal]",
    "cal.type": "ddecal",
    "cal.mode": "diagonal",
    "cal.solint": 4,
    "cal.nchan": 4,
    "cal.maxiter": 50,
    "cal.uvlambdamin": 5,
    "cal.smoothnessconstraint": 2e6,
    "numthreads": 4,
    "cal.sourcedb": FlexInput(
        prefix="parameters/calibration/step2a",
        custom_data_id="step2a",
        read_strategy=StrategyEnum.BROADCAST,
    ),
    "log_output": FlexOutput(
        prefix="applycal_out/cal/logs",
        suffix=".log",
    ),
}

subtraction_parameters = {
    # "msin" is the output of the calibration stage
    "msin.datacolumn": "DATA",
    "msout.datacolumn": "SUBTRACTED_DATA",
    "steps": "[sub]",
    "sub.type": "h5parmpredict",
    "sub.directions": "[[CygA],[CasA]]",
    "sub.operation": "subtract",
    "sub.applycal.steps": "[sub_apply_amp,sub_apply_phase]",
    "sub.applycal.correction": "fulljones",
    "sub.applycal.sub_apply_amp.correction": "amplitude000",
    "sub.applycal.sub_apply_phase.correction": "phase000",
    "msout": ".",
    "sub.sourcedb": FlexInput(
        prefix="parameters/calibration/step2a",
        custom_data_id="step2a",
        read_strategy=StrategyEnum.BROADCAST,
    ),
    "log_output": FlexOutput(
        prefix="applycal_out/sub/logs",
        suffix=".log",
    ),
}

apply_calibration_parameters = {
    # "msin" is the output of the subtraction stage
    "msin.datacolumn": "SUBTRACTED_DATA",
    "msout.datacolumn": "CORRECTED_DATA",
    "steps": "[apply]",
    "apply.type": "applycal",
    "apply.steps": "[apply_amp,apply_phase]",
    "apply.apply_amp.correction": "amplitude000",
    "apply.apply_phase.correction": "phase000",
    "apply.direction": "[Main]",
    "msout": FlexOutput(
        prefix="applycal_out/apply/ms",
        suffix=".ms.zip",
    ),
    "log_output": FlexOutput(
        prefix="applycal_out/apply/logs",
        suffix=".log",
    ),
}

full_calibration_parameters = [
    calibration_parameters,
    subtraction_parameters,
    apply_calibration_parameters,
]

full_calibration_stage = Stage(
    stage_id="full_calibration",
    func=dp3,
    inputs=filter_io_params(full_calibration_parameters, FlexInput),
    outputs=filter_io_params(full_calibration_parameters, FlexOutput),
    params={
        "parameters": full_calibration_parameters,
        "dp3_types": ["calibration", "subtraction", "apply_calibration"],
    },
)

print(full_calibration_stage.execute().get_timings())

In [None]:
imaging_parameters = [
    "-size",
    "1024",
    "1024",
    "-pol",
    "I",
    "-scale",
    "2arcmin",
    "-niter",
    "100000",
    "-gain",
    "0.1",
    "-mgain",
    "0.6",
    "-auto-mask",
    "5",
    "-local-rms",
    "-multiscale",
    "-no-update-model-required",
    "-make-psf",
    "-auto-threshold",
    "3",
    "-parallel-deconvolution",
    "4096",
    "-weight",
    "briggs",
    "0",
    "-data-column",
    "CORRECTED_DATA",
    "-nmiter",
    "0",
    "-j",
    str(5),
    "-name",
]

imaging_stage = Stage(
    stage_id="imaging",
    func=imaging,
    max_concurrency=1,
    inputs=[
        FlexInput(
            prefix="applycal_out/apply/ms", custom_data_id="imaging_input"
        )
    ],
    outputs=[
        FlexOutput(prefix="image_out", suffix="-image.fits"),
        FlexOutput(prefix="image_out/logs", suffix=".log"),
    ],
    params={
        "parameters": imaging_parameters,
    },
)

print(imaging_stage.execute().get_timings())