In [10]:
from examples.radio_interferometry.functions import (
    dp3, imaging,
)
from examples.radio_interferometry.utils import filter_io_params
from flexecutor.storage.storage import FlexInput, FlexOutput, StrategyEnum
from flexecutor.utils.utils import flexorchestrator
from flexecutor.workflow.stage import Stage

In [2]:
# Initialization of the FlexOrchestrator
# TODO: replace by more elegant solution
# The function body is empty: it is defined and called once only so that the
# wrapper installed by `flexorchestrator(bucket="test-bucket")` runs.
# NOTE(review): what the decorator sets up is not visible in this notebook —
# confirm in flexecutor.utils.utils before relying on it.
@flexorchestrator(bucket="test-bucket")
def init_flex():
    """No-op placeholder; any initialization work happens in the decorator."""
    pass


init_flex()

In [3]:
# CAL rebinning: one `dp3` stage with steps [aoflag, avg, count], reading from
# the "partitions-CAL" prefix and writing under "CAL/rebinning_out/".
# Entries whose value is a FlexInput / FlexOutput are the ones that
# `filter_io_params` is asked for further down; the rest are plain settings.
cal_rebinning_parameters = {
    "msin": FlexInput(prefix="partitions-CAL"),
    "steps": "[aoflag, avg, count]",
    # -- aoflag step --
    "aoflag.type": "aoflagger",
    "aoflag.strategy": FlexInput(
        prefix="parameters/rebinning", custom_input_id="lua",
        strategy=StrategyEnum.BROADCAST,
    ),
    # -- avg step --
    "avg.type": "averager",
    "avg.freqstep": 5,
    "avg.timestep": 2,
    # -- outputs and runtime settings --
    "msout": FlexOutput(prefix="CAL/rebinning_out/ms", suffix=".ms.zip"),
    "numthreads": 4,
    "log_output": FlexOutput(prefix="CAL/rebinning_out/logs", suffix=".log"),
}

# `filter_io_params` is called once per handle type (inputs first), then the
# stage is assembled from the two results.
cal_rebinning_inputs = filter_io_params(cal_rebinning_parameters, FlexInput)
cal_rebinning_outputs = filter_io_params(cal_rebinning_parameters, FlexOutput)
cal_rebinning_stage = Stage(
    stage_id="CAL-rebinning",
    func=dp3,
    inputs=cal_rebinning_inputs,
    outputs=cal_rebinning_outputs,
    params={
        "parameters": cal_rebinning_parameters,
        "dp3_types": "rebinning",
    },
)

# Run the stage and print whatever `get_timings()` reports for it.
cal_rebinning_run = cal_rebinning_stage.execute()
print(cal_rebinning_run.get_timings())

INFO:lithops.config:Lithops v3.4.1 - Python3.10
INFO:lithops.storage.backends.minio.minio:MinIO client created - Endpoint: http://172.17.0.1:9000
INFO:lithops.localhost.v2.localhost:Localhost compute v2 client created
INFO:flexecutor.workflow.executor:Executing DAG single-stage-dag
INFO:flexecutor.workflow.executor:DAG single-stage-dag has 1 final stages
2024-07-16 17:45:00 [INFO] Submitting stage CAL-rebinning
INFO:lithops.storage.backends.minio.minio:MinIO client created - Endpoint: http://172.17.0.1:9000
INFO:lithops.storage.backends.minio.minio:MinIO client created - Endpoint: http://172.17.0.1:9000
INFO:lithops.invokers:ExecutorID 4f0cbf-0 | JobID M000 - Selected Runtime: manriurv/astronomics-k8s:310 
INFO:lithops.invokers:ExecutorID 4f0cbf-0 | JobID M000 - Starting function invocation: dp3() - Total: 1 activations
INFO:lithops.invokers:ExecutorID 4f0cbf-0 | JobID M000 - View execution logs at /tmp/lithops-manri-urv/logs/4f0cbf-0-M000.log
INFO:lithops.wait:ExecutorID 4f0cbf-0 - Wa

    0%|          | 0/1  

ERROR:lithops.localhost.v2.localhost:Task process 4f0cbf-0-M000-00000 failed with return code 137
INFO:lithops.executors:ExecutorID 4f0cbf-0 - Cleaning temporary data
INFO:lithops.executors:ExecutorID 4f0cbf-0 - Getting results from 1 function activations


[FunctionTimes(read=0.2468562126159668, compute=0.8746955394744873, write=0.07173609733581543, cold_start=0.9145104885101318, total=0.39776261647542316)]


In [4]:
# CAL calibration: one `dp3` stage with steps [cal] (type "gaincal").
# "msin" reads from "CAL/rebinning_out/ms", the prefix the CAL-rebinning
# stage uses for its "msout"; the solutions go to "CAL/calibration_out/h5".
cal_calibration_params = {
    "msin": FlexInput(prefix="CAL/rebinning_out/ms"),
    "msin.datacolumn": "DATA",
    "msout": ".",
    "steps": "[cal]",
    # -- cal step --
    "cal.type": "gaincal",
    "cal.caltype": "diagonal",
    "cal.sourcedb": FlexInput(
        prefix="parameters/calibration/step2a", custom_input_id="step2a",
        strategy=StrategyEnum.BROADCAST,
    ),
    "cal.parmdb": FlexOutput(prefix="CAL/calibration_out/h5", suffix=".h5"),
    "cal.solint": 0,  # 0 => a single solution covering all time steps
    "cal.nchan": 1,  # 1 => one solution per channel
    "cal.maxiter": 50,
    "cal.uvlambdamin": 5,
    "cal.smoothnessconstraint": 2e6,
    # -- runtime settings and log destination --
    "numthreads": 4,
    "log_output": FlexOutput(prefix="CAL/calibration_out/logs", suffix=".log"),
}

# `filter_io_params` is called once per handle type (inputs first), then the
# stage is assembled from the two results.
cal_calibration_inputs = filter_io_params(cal_calibration_params, FlexInput)
cal_calibration_outputs = filter_io_params(cal_calibration_params, FlexOutput)
cal_calibration_stage = Stage(
    stage_id="CAL-calibration",
    func=dp3,
    inputs=cal_calibration_inputs,
    outputs=cal_calibration_outputs,
    params={"parameters": cal_calibration_params, "dp3_types": "calibration"},
)

# Run the stage and print whatever `get_timings()` reports for it.
cal_calibration_run = cal_calibration_stage.execute()
print(cal_calibration_run.get_timings())

INFO:lithops.config:Lithops v3.4.1 - Python3.10
INFO:lithops.storage.backends.minio.minio:MinIO client created - Endpoint: http://172.17.0.1:9000
INFO:lithops.localhost.v2.localhost:Localhost compute v2 client created
INFO:flexecutor.workflow.executor:Executing DAG single-stage-dag
INFO:flexecutor.workflow.executor:DAG single-stage-dag has 1 final stages
2024-07-16 17:45:06 [INFO] Submitting stage CAL-calibration
INFO:lithops.storage.backends.minio.minio:MinIO client created - Endpoint: http://172.17.0.1:9000
INFO:lithops.storage.backends.minio.minio:MinIO client created - Endpoint: http://172.17.0.1:9000
INFO:lithops.invokers:ExecutorID 4f0cbf-1 | JobID M000 - Selected Runtime: manriurv/astronomics-k8s:310 
INFO:lithops.invokers:ExecutorID 4f0cbf-1 | JobID M000 - Starting function invocation: dp3() - Total: 1 activations
INFO:lithops.invokers:ExecutorID 4f0cbf-1 | JobID M000 - View execution logs at /tmp/lithops-manri-urv/logs/4f0cbf-1-M000.log
INFO:lithops.wait:ExecutorID 4f0cbf-1 - 

    0%|          | 0/1  

ERROR:lithops.localhost.v2.localhost:Task process 4f0cbf-1-M000-00000 failed with return code 137
INFO:lithops.executors:ExecutorID 4f0cbf-1 - Cleaning temporary data
INFO:lithops.executors:ExecutorID 4f0cbf-1 - Getting results from 1 function activations


[FunctionTimes(read=0.09463763236999512, compute=0.4650692939758301, write=0.013876914978027344, cold_start=0.7987880706787109, total=0.19119461377461752)]


In [5]:
# TARGET rebinning: one `dp3` stage with steps [aoflag, avg, count], reading
# from the "partitions-TAR" prefix and writing under "TAR/rebinning_out/".
# Entries whose value is a FlexInput / FlexOutput are the ones that
# `filter_io_params` is asked for further down; the rest are plain settings.
target_rebinning_params = {
    "msin": FlexInput(prefix="partitions-TAR"),
    "steps": "[aoflag, avg, count]",
    # -- aoflag step --
    "aoflag.type": "aoflagger",
    "aoflag.strategy": FlexInput(
        prefix="parameters/rebinning", custom_input_id="lua",
        strategy=StrategyEnum.BROADCAST,
    ),
    # -- avg step --
    "avg.type": "averager",
    "avg.freqstep": 5,  # average over 5 channels
    "avg.timestep": 2,  # average over 2 time samples
    # -- outputs and runtime settings --
    "msout": FlexOutput(prefix="TAR/rebinning_out/ms", suffix=".ms.zip"),
    "numthreads": 4,
    "log_output": FlexOutput(prefix="TAR/rebinning_out/logs", suffix=".log"),
}

# `filter_io_params` is called once per handle type (inputs first), then the
# stage is assembled from the two results.
target_rebinning_inputs = filter_io_params(target_rebinning_params, FlexInput)
target_rebinning_outputs = filter_io_params(target_rebinning_params, FlexOutput)
target_rebinning_stage = Stage(
    stage_id="TARGET-rebinning",
    func=dp3,
    inputs=target_rebinning_inputs,
    outputs=target_rebinning_outputs,
    params={"parameters": target_rebinning_params, "dp3_types": "rebinning"},
)

# Run the stage and print whatever `get_timings()` reports for it.
target_rebinning_run = target_rebinning_stage.execute()
print(target_rebinning_run.get_timings())

INFO:lithops.config:Lithops v3.4.1 - Python3.10
INFO:lithops.storage.backends.minio.minio:MinIO client created - Endpoint: http://172.17.0.1:9000
INFO:lithops.localhost.v2.localhost:Localhost compute v2 client created
INFO:flexecutor.workflow.executor:Executing DAG single-stage-dag
INFO:flexecutor.workflow.executor:DAG single-stage-dag has 1 final stages
2024-07-16 17:45:13 [INFO] Submitting stage TARGET-rebinning
INFO:lithops.storage.backends.minio.minio:MinIO client created - Endpoint: http://172.17.0.1:9000
INFO:lithops.storage.backends.minio.minio:MinIO client created - Endpoint: http://172.17.0.1:9000
INFO:lithops.invokers:ExecutorID 4f0cbf-2 | JobID M000 - Selected Runtime: manriurv/astronomics-k8s:310 
INFO:lithops.invokers:ExecutorID 4f0cbf-2 | JobID M000 - Starting function invocation: dp3() - Total: 1 activations
INFO:lithops.invokers:ExecutorID 4f0cbf-2 | JobID M000 - View execution logs at /tmp/lithops-manri-urv/logs/4f0cbf-2-M000.log
INFO:lithops.wait:ExecutorID 4f0cbf-2 -

    0%|          | 0/1  

ERROR:lithops.localhost.v2.localhost:Task process 4f0cbf-2-M000-00000 failed with return code 137
INFO:lithops.executors:ExecutorID 4f0cbf-2 - Cleaning temporary data
INFO:lithops.executors:ExecutorID 4f0cbf-2 - Getting results from 1 function activations


[FunctionTimes(read=0.1855311393737793, compute=0.9087080955505371, write=0.07149243354797363, cold_start=0.756340742111206, total=0.3885772228240967)]


In [7]:
# TARGET apply-calibration: one `dp3` stage with steps [apply] (type
# "applycal"). It reads from "TAR/rebinning_out/ms" and from
# "CAL/calibration_out/h5" — the prefixes used as outputs by the
# TARGET-rebinning and CAL-calibration stages — and writes under
# "TAR/applycal_out/".
target_apply_calibration_params = {
    "msin": FlexInput(prefix="TAR/rebinning_out/ms"),
    "msin.datacolumn": "DATA",
    "msout": FlexOutput(prefix="TAR/applycal_out/ms", suffix=".ms.zip"),
    "msout.datacolumn": "CORRECTED_DATA",
    "steps": "[apply]",
    # -- apply step --
    "apply.type": "applycal",
    "apply.steps": "[apply_amp,apply_phase]",
    "apply.apply_amp.correction": "amplitude000",
    "apply.apply_phase.correction": "phase000",
    "apply.direction": "[Main]",
    "apply.parmdb": FlexInput(prefix="CAL/calibration_out/h5"),
    # -- log destination --
    "log_output": FlexOutput(prefix="TAR/applycal_out/logs", suffix=".log"),
}


# `filter_io_params` is called once per handle type (inputs first), then the
# stage is assembled from the two results.
target_apply_calibration_inputs = filter_io_params(
    target_apply_calibration_params, FlexInput
)
target_apply_calibration_outputs = filter_io_params(
    target_apply_calibration_params, FlexOutput
)
target_apply_calibration_stage = Stage(
    stage_id="TARGET-apply-calibration",
    func=dp3,
    inputs=target_apply_calibration_inputs,
    outputs=target_apply_calibration_outputs,
    params={
        "parameters": target_apply_calibration_params,
        "dp3_types": "apply_calibration",
    },
)

# Run the stage and print whatever `get_timings()` reports for it.
target_apply_calibration_run = target_apply_calibration_stage.execute()
print(target_apply_calibration_run.get_timings())

INFO:lithops.config:Lithops v3.4.1 - Python3.10
INFO:lithops.storage.backends.minio.minio:MinIO client created - Endpoint: http://172.17.0.1:9000
INFO:lithops.localhost.v2.localhost:Localhost compute v2 client created
INFO:flexecutor.workflow.executor:Executing DAG single-stage-dag
INFO:flexecutor.workflow.executor:DAG single-stage-dag has 1 final stages
2024-07-16 17:45:25 [INFO] Submitting stage TARGET-apply-calibration
INFO:lithops.storage.backends.minio.minio:MinIO client created - Endpoint: http://172.17.0.1:9000
INFO:lithops.storage.backends.minio.minio:MinIO client created - Endpoint: http://172.17.0.1:9000
INFO:lithops.invokers:ExecutorID 4f0cbf-3 | JobID M000 - Selected Runtime: manriurv/astronomics-k8s:310 
INFO:lithops.invokers:ExecutorID 4f0cbf-3 | JobID M000 - Starting function invocation: dp3() - Total: 1 activations
INFO:lithops.invokers:ExecutorID 4f0cbf-3 | JobID M000 - View execution logs at /tmp/lithops-manri-urv/logs/4f0cbf-3-M000.log
INFO:lithops.wait:ExecutorID 4f

    0%|          | 0/1  

ERROR:lithops.localhost.v2.localhost:Task process 4f0cbf-3-M000-00000 failed with return code 137
INFO:lithops.executors:ExecutorID 4f0cbf-3 - Cleaning temporary data
INFO:lithops.executors:ExecutorID 4f0cbf-3 - Getting results from 1 function activations


[FunctionTimes(read=0.0978548526763916, compute=0.2704446315765381, write=0.0932319164276123, cold_start=0.7850160598754883, total=0.15384380022684732)]


In [11]:
# Imaging: one stage running `imaging` over "TAR/applycal_out/ms", the prefix
# the TARGET-apply-calibration stage uses for its "msout".
# The argument list is laid out one flag per line, followed by its value(s).
# NOTE(review): these look like WSClean-style command-line flags — confirm
# against the `imaging` function.
# NOTE(review): the list ends with a bare "-name"; presumably `imaging`
# appends the output name itself — verify.
imaging_parameters = [
    "-size", "1024", "1024",
    "-pol", "I",
    "-scale", "5arcmin",
    "-niter", "100000",
    "-gain", "0.1",
    "-mgain", "0.6",
    "-auto-mask", "5",
    "-local-rms",
    "-multiscale",
    "-no-update-model-required",
    "-make-psf",
    "-auto-threshold", "3",
    "-weight", "briggs", "0",
    "-data-column", "CORRECTED_DATA",
    "-nmiter", "0",
    "-j", "5",
    "-name",
]


# Stage I/O handles, created in the order input -> image output -> log output.
imaging_inputs = [
    FlexInput(prefix="TAR/applycal_out/ms", custom_input_id="imaging_input"),
]
imaging_outputs = [
    FlexOutput(prefix="image_out", suffix="-image.fits"),
    FlexOutput(prefix="image_out/logs", suffix=".log"),
]
imaging_stage = Stage(
    stage_id="imaging",
    func=imaging,
    max_concurrency=1,
    inputs=imaging_inputs,
    outputs=imaging_outputs,
    params={"parameters": imaging_parameters},
)

# Run the stage and print whatever `get_timings()` reports for it.
imaging_run = imaging_stage.execute()
print(imaging_run.get_timings())

INFO:lithops.config:Lithops v3.4.1 - Python3.10
INFO:lithops.storage.backends.minio.minio:MinIO client created - Endpoint: http://172.17.0.1:9000
INFO:lithops.localhost.v2.localhost:Localhost compute v2 client created
INFO:flexecutor.workflow.executor:Executing DAG single-stage-dag
INFO:flexecutor.workflow.executor:DAG single-stage-dag has 1 final stages
2024-07-16 17:45:48 [INFO] Submitting stage imaging
INFO:lithops.storage.backends.minio.minio:MinIO client created - Endpoint: http://172.17.0.1:9000
INFO:lithops.invokers:ExecutorID 4f0cbf-4 | JobID M000 - Selected Runtime: manriurv/astronomics-k8s:310 
INFO:lithops.invokers:ExecutorID 4f0cbf-4 | JobID M000 - Starting function invocation: imaging() - Total: 1 activations
INFO:lithops.invokers:ExecutorID 4f0cbf-4 | JobID M000 - View execution logs at /tmp/lithops-manri-urv/logs/4f0cbf-4-M000.log
INFO:lithops.wait:ExecutorID 4f0cbf-4 - Waiting for 1 function activations to complete


    0%|          | 0/1  

ERROR:lithops.localhost.v2.localhost:Task process 4f0cbf-4-M000-00000 failed with return code 137
INFO:lithops.executors:ExecutorID 4f0cbf-4 - Cleaning temporary data
INFO:lithops.executors:ExecutorID 4f0cbf-4 - Getting results from 1 function activations


[FunctionTimes(read=0.13083195686340332, compute=10.972547769546509, write=0.036598920822143555, cold_start=0.7566609382629395, total=3.7133262157440186)]
