In [1]:
import time
import logging
import lithops
from radiointerferometry.utils import setup_logging, get_executor_id_lithops
from radiointerferometry.steps.imaging import ImagingStep
from radiointerferometry.steps.pipelinestep import DP3Step
from radiointerferometry.datasource import InputS3, OutputS3
from radiointerferometry.partitioning import StaticPartitioner


# --- Logging -------------------------------------------------------------
# LOG_LEVEL is passed to the logger, the partitioner and the pipeline steps
# created in the cells below.
LOG_LEVEL = logging.INFO
logger = setup_logging(LOG_LEVEL)
partitioner = StaticPartitioner(log_level=LOG_LEVEL)

# --- Object storage ------------------------------------------------------
# Bucket holding input measurement sets, parameter files and all outputs.
# RACK_BUCKET is not referenced by the cells shown; kept for compatibility.
BUCKET = RACK_BUCKET = "os-10gb"

def prepend_hash_to_key(key: str, prefix: str = "440531") -> str:
    """Root an object-storage key under a run-specific directory (the "hash").

    All outputs of this notebook are written below ``<prefix>/...``.

    Args:
        key: Object key relative to the run root, e.g. ``"CAL/rebinning_out/ms"``.
            Leading slashes are stripped so the result never contains ``"//"``.
        prefix: Run identifier used as the root directory. Defaults to the
            fixed id ``"440531"`` used by this notebook; pass another value
            (for example a fresh executor id) to write a separate run.
            Trailing slashes are ignored.

    Returns:
        ``prefix`` and ``key`` joined by exactly one ``"/"``.
    """
    return f"{prefix.rstrip('/')}/{key.lstrip('/')}"


# Lithops executor: 4 CPUs and 2048 MB of memory per worker.
# NOTE(review): fexec is not used by any cell shown; each pipeline step logs
# its own Lithops/backend startup, so it presumably creates its own executor.
fexec = lithops.FunctionExecutor(
    runtime_cpu=4,
    runtime_memory=2048,
    log_level=LOG_LEVEL,
)

# Calibrator input: every measurement set stored under this prefix.
inputs = InputS3(
    bucket=BUCKET,
    key="CYGLOOP2024/20240312_081800_20240312_084100_CYGLOOP_CYGA/",
)

# Sanity check of the run-rooted key format.
print(prepend_hash_to_key("dummy_key"))


2024-06-21 11:10:04 [INFO] Started StaticPartitioner
2024-06-21 11:10:04,428 [INFO] config.py:139 -- Lithops v3.4.1 - Python3.10
2024-06-21 11:10:06,414 [INFO] minio.py:62 -- MinIO client created - Endpoint: http://192.168.5.24:9000
2024-06-21 11:10:06,449 [INFO] k8s.py:111 -- Kubernetes client created - Namespace: default


440531/dummy_key


In [4]:
# Workflow:
#   CALIBRATOR: [FLAG & REBIN] -> [CALIBRATION] -> (caltables, HDF5 files)
#   TARGET:     [FLAG & REBIN] -> [CALIBRATION (APPLYCAL ONLY)] -> [IMAGING]
#
# Every output key is rooted at the run hash via prepend_hash_to_key(), so
# each step can locate the previous step's outputs by key.

# CALIBRATOR REBINNING PARAMS
CAL_rebinning_params = {
    # Every MS found under the calibrator input prefix.
    "msin": inputs,
    # Flag RFI, average, then report flag statistics (DP3 counter step).
    "steps": "[aoflag, avg, count]",
    "aoflag.type": "aoflagger",
    # AOFlagger strategy file stored in the bucket.
    "aoflag.strategy": InputS3(
        bucket=BUCKET,
        key="parameters/rebinning/STEP1-NenuFAR64C1S.lua",
    ),
    "avg.type": "averager",
    "avg.freqstep": 5,  # average 5 channels
    "avg.timestep": 2,  # average 2 time samples
    # Rebinned MS, one per input, under <hash>/CAL/rebinning_out/ms.
    "msout": OutputS3(
        bucket=BUCKET,
        key=prepend_hash_to_key("CAL/rebinning_out/ms"),
        file_ext="ms",
    ),
    "numthreads": 4,
    "log_output": OutputS3(
        bucket=BUCKET,
        key=prepend_hash_to_key("CAL/rebinning_out/logs"),
        file_ext="log",
    ),
}

# CALIBRATOR REBINNING
# perf_counter is monotonic, so the measured duration is immune to
# wall-clock adjustments during long remote runs.
# NOTE(review): func_limit=1 presumably caps the number of function
# invocations — confirm against DP3Step.run.
start_time = time.perf_counter()
finished_job = DP3Step(
    parameters=CAL_rebinning_params,
    log_level=LOG_LEVEL,
).run(func_limit=1)
end_time = time.perf_counter()
logger.info(f"CAL Rebinning completed in {end_time - start_time} seconds.")


2024-06-21 11:14:57,234 [INFO] config.py:139 -- Lithops v3.4.1 - Python3.10


2024-06-21 11:14:57,338 [INFO] minio.py:62 -- MinIO client created - Endpoint: http://192.168.5.24:9000
2024-06-21 11:14:57,369 [INFO] k8s.py:111 -- Kubernetes client created - Namespace: default
2024-06-21 11:14:57,383 [INFO] minio.py:62 -- MinIO client created - Endpoint: http://192.168.5.24:9000
2024-06-21 11:14:57 [INFO] keys : ['CYGLOOP2024/20240312_081800_20240312_084100_CYGLOOP_CYGA/SB220.MS.zip']
2024-06-21 11:14:57,411 [INFO] minio.py:62 -- MinIO client created - Endpoint: http://192.168.5.24:9000
2024-06-21 11:14:57 [INFO] New output path: 440531/CAL/rebinning_out/ms/SB220.ms
2024-06-21 11:14:57 [INFO] New output path: 440531/CAL/rebinning_out/logs/SB220.log
2024-06-21 11:14:57 [INFO] Function params: [[{'msin': /os-10gb/CYGLOOP2024/20240312_081800_20240312_084100_CYGLOOP_CYGA/SB220.MS.zip, 'steps': '[aoflag, avg, count]', 'aoflag.type': 'aoflagger', 'aoflag.strategy': /os-10gb/parameters/rebinning/STEP1-NenuFAR64C1S.lua, 'avg.type': 'averager', 'avg.freqstep': 5, 'avg.timest

In [3]:
# CALIBRATOR CALIBRATION PARAMS
# Solve diagonal gains on the rebinned calibrator MS. The solutions go to an
# HDF5 parmdb under <hash>/CAL/calibration_out/h5, which the target
# apply-calibration step reads from the same key.
CAL_calibration_params = {
    "msin": InputS3(
        bucket=BUCKET,
        key=prepend_hash_to_key("CAL/rebinning_out/ms"),
    ),
    "msin.datacolumn": "DATA",
    # "." = DP3 writes to the input MS instead of creating a new one.
    "msout": ".",
    "steps": "[cal]",
    "cal.type": "gaincal",
    "cal.caltype": "diagonal",
    # Sky model of the calibrator source.
    "cal.sourcedb": InputS3(
        bucket=BUCKET,
        key="parameters/calibration/CAL.sourcedb",
    ),
    "cal.parmdb": OutputS3(
        bucket=BUCKET,
        key=prepend_hash_to_key("CAL/calibration_out/h5"),
        file_ext="h5",
    ),
    "cal.solint": 0,  # one solution for all time steps
    "cal.nchan": 1,  # one solution per channel
    "cal.maxiter": 50,
    "cal.uvlambdamin": 5,
    # NOTE(review): smoothnessconstraint is documented as a DDECal option;
    # confirm gaincal honours it rather than ignoring the key.
    "cal.smoothnessconstraint": 2e6,
    "numthreads": 4,
    "log_output": OutputS3(
        bucket=BUCKET,
        key=prepend_hash_to_key("CAL/calibration_out/logs"),
        file_ext="log",
    ),
}

# CALIBRATOR CALIBRATION
# perf_counter is monotonic and suited to measuring elapsed time.
start_time = time.perf_counter()
finished_job = DP3Step(
    parameters=CAL_calibration_params,
    log_level=LOG_LEVEL,
).run(func_limit=1)
end_time = time.perf_counter()
logger.info(f"CAL Calibration completed in {end_time - start_time} seconds.")

2024-06-21 11:13:17,482 [INFO] config.py:139 -- Lithops v3.4.1 - Python3.10
2024-06-21 11:13:17,499 [INFO] minio.py:62 -- MinIO client created - Endpoint: http://192.168.5.24:9000


2024-06-21 11:13:17,534 [INFO] k8s.py:111 -- Kubernetes client created - Namespace: default
2024-06-21 11:13:17,549 [INFO] minio.py:62 -- MinIO client created - Endpoint: http://192.168.5.24:9000
2024-06-21 11:13:17 [INFO] keys : ['440531/CAL/rebinning_out/ms/SB220.ms.zip']
2024-06-21 11:13:17,577 [INFO] minio.py:62 -- MinIO client created - Endpoint: http://192.168.5.24:9000
2024-06-21 11:13:17 [INFO] New output path: 440531/CAL/calibration_out/h5/SB220.h5
2024-06-21 11:13:17 [INFO] New output path: 440531/CAL/calibration_out/logs/SB220.log
2024-06-21 11:13:17 [INFO] Function params: [[{'msin': /os-10gb/440531/CAL/rebinning_out/ms/SB220.ms.zip, 'msin.datacolumn': 'DATA', 'msout': '.', 'steps': '[cal]', 'cal.type': 'gaincal', 'cal.caltype': 'diagonal', 'cal.sourcedb': /os-10gb/parameters/calibration/CAL.sourcedb, 'cal.parmdb': /os-10gb/440531/CAL/calibration_out/h5/SB220.h5, 'cal.solint': 0, 'cal.nchan': 1, 'cal.maxiter': 50, 'cal.uvlambdamin': 5, 'cal.smoothnessconstraint': 2000000.0,

In [4]:
# Target input: every measurement set stored under this prefix.
inputs_tar = InputS3(
    bucket=BUCKET,
    key="CYGLOOP2024/20240312_084100_20240312_100000_CYGLOOP_TARGET/",
)

# TARGET REBINNING PARAMS
# Same flagging/averaging settings as the calibrator rebinning step.
TARGET_rebinning_params = {
    "msin": inputs_tar,
    # Flag RFI, average, then report flag statistics (DP3 counter step).
    "steps": "[aoflag, avg, count]",
    "aoflag.type": "aoflagger",
    "aoflag.strategy": InputS3(
        bucket=BUCKET,
        key="parameters/rebinning/STEP1-NenuFAR64C1S.lua",
    ),
    "avg.type": "averager",
    "avg.freqstep": 5,  # average 5 channels
    "avg.timestep": 2,  # average 2 time samples
    # Rebinned MS, one per input, under <hash>/TAR/rebinning_out/ms.
    "msout": OutputS3(
        bucket=BUCKET,
        key=prepend_hash_to_key("TAR/rebinning_out/ms"),
        file_ext="ms",
    ),
    "numthreads": 4,
    "log_output": OutputS3(
        bucket=BUCKET,
        key=prepend_hash_to_key("TAR/rebinning_out/logs"),
        file_ext="log",
    ),
}

# TARGET REBINNING
# perf_counter is monotonic and suited to measuring elapsed time.
start_time = time.perf_counter()
finished_job = DP3Step(
    parameters=TARGET_rebinning_params,
    log_level=LOG_LEVEL,
).run(func_limit=1)
end_time = time.perf_counter()
logger.info(f"TARGET Rebinning completed in {end_time - start_time} seconds.")

2024-06-20 11:40:39,050 [INFO] config.py:139 -- Lithops v3.4.1 - Python3.10
2024-06-20 11:40:39,062 [INFO] ceph.py:64 -- Ceph client created - Endpoint: https://s3.gra.perf.cloud.ovh.net/


Executor ID: 440531
Executor ID: 440531


2024-06-20 11:40:39,311 [INFO] k8s.py:111 -- Kubernetes client created - Namespace: taska
2024-06-20 11:40:39,321 [INFO] ceph.py:64 -- Ceph client created - Endpoint: https://s3.gra.perf.cloud.ovh.net/
2024-06-20 11:40:39 [INFO] keys : ['CYGLOOP2024/20240312_084100_20240312_100000_CYGLOOP_TARGET/SB220.MS.zip']
2024-06-20 11:40:39,579 [INFO] ceph.py:64 -- Ceph client created - Endpoint: https://s3.gra.perf.cloud.ovh.net/
2024-06-20 11:40:39 [INFO] New output path: 440531/TAR/rebinning_out/ms/SB220.ms
2024-06-20 11:40:39 [INFO] New output path: 440531/TAR/rebinning_out/logs/SB220.log
2024-06-20 11:40:39 [INFO] Function params: [[{'msin': /os-10gb/CYGLOOP2024/20240312_084100_20240312_100000_CYGLOOP_TARGET/SB220.MS.zip, 'steps': '[aoflag, avg, count]', 'aoflag.type': 'aoflagger', 'aoflag.strategy': /os-10gb/parameters/rebinning/STEP1-NenuFAR64C1S.lua, 'avg.type': 'averager', 'avg.freqstep': 5, 'avg.timestep': 2, 'msout': /os-10gb/440531/TAR/rebinning_out/ms/SB220.ms, 'numthreads': 4, 'log_

In [3]:
# Verbose logging for this step only. The notebook-wide LOG_LEVEL is left
# untouched so later cells (e.g. imaging) keep their configured level.
APPLYCAL_LOG_LEVEL = logging.DEBUG

# TARGET APPLY-CALIBRATION PARAMS
TARGET_apply_calibration = {
    "msin": InputS3(bucket=BUCKET, key=prepend_hash_to_key("TAR/rebinning_out/ms")),
    "msin.datacolumn": "DATA",
    # Update the rebinned MS in place (CORRECTED_DATA column) instead of
    # duplicating it: msout uses the same key as msin. remote_key_ow
    # presumably redirects the upload of the updated MS to
    # <hash>/TAR/applycal_out/ms — confirm against OutputS3.
    "msout": OutputS3(
        bucket=BUCKET,
        key=prepend_hash_to_key("TAR/rebinning_out/ms"),
        file_ext="ms",
        remote_key_ow=prepend_hash_to_key("TAR/applycal_out/ms"),
    ),
    "msout.datacolumn": "CORRECTED_DATA",
    "steps": "[apply]",
    "apply.type": "applycal",
    # Apply the amplitude000 and phase000 solution tables from the parmdb.
    "apply.steps": "[apply_amp,apply_phase]",
    "apply.apply_amp.correction": "amplitude000",
    "apply.apply_phase.correction": "phase000",
    "apply.direction": "[Main]",
    # Solutions written by the CALIBRATOR CALIBRATION step (same key).
    "apply.parmdb": InputS3(
        bucket=BUCKET,
        key=prepend_hash_to_key("CAL/calibration_out/h5"),
        dynamic=True,
        file_ext="h5",
    ),
    "log_output": OutputS3(
        bucket=BUCKET,
        key=prepend_hash_to_key("TAR/applycal_out/logs"),
        file_ext="log",
    ),
}

# TARGET CALIBRATION (APPLY)
# perf_counter is monotonic and suited to measuring elapsed time.
start_time = time.perf_counter()
finished_job = DP3Step(
    parameters=TARGET_apply_calibration,
    log_level=APPLYCAL_LOG_LEVEL,
).run(func_limit=1)
end_time = time.perf_counter()
logger.info(f"target Calibration completed in {end_time - start_time} seconds.")

2024-06-20 13:11:12 [DEBUG] pipelinestep.py:42 -- DP3 Step initialized
2024-06-20 13:11:12,985 [INFO] config.py:139 -- Lithops v3.4.1 - Python3.10
2024-06-20 13:11:12,986 [DEBUG] config.py:101 -- Loading configuration from /home/ayman/.lithops/config
2024-06-20 13:11:12,988 [DEBUG] config.py:179 -- Loading Serverless backend module: k8s
2024-06-20 13:11:12,989 [DEBUG] config.py:220 -- Loading Storage backend module: ceph
2024-06-20 13:11:12,989 [DEBUG] ceph.py:38 -- Creating Ceph client
2024-06-20 13:11:12,989 [DEBUG] ceph.py:43 -- Setting Ceph endpoint to https://s3.gra.perf.cloud.ovh.net/
2024-06-20 13:11:12,993 [INFO] ceph.py:64 -- Ceph client created - Endpoint: https://s3.gra.perf.cloud.ovh.net/
2024-06-20 13:11:13,171 [DEBUG] k8s.py:51 -- Creating Kubernetes client
2024-06-20 13:11:13,173 [DEBUG] k8s.py:67 -- Loading kubeconfig file: /home/ayman/.kube/config


Executor ID: 0a6804
Executor ID: 0a6804
Executor ID: 0a6804
Executor ID: 0a6804
Executor ID: 0a6804


2024-06-20 13:11:13,190 [DEBUG] k8s.py:78 -- Using kubeconfig conetxt: kubernetes-admin@extract-test - cluster: extract-test
2024-06-20 13:11:13,191 [INFO] k8s.py:111 -- Kubernetes client created - Namespace: taska
2024-06-20 13:11:13,193 [DEBUG] invokers.py:93 -- ExecutorID 0a6804-1 - Invoker initialized. Max workers: 100
2024-06-20 13:11:13,194 [DEBUG] executors.py:148 -- Function executor for k8s created with ID: 0a6804-1
2024-06-20 13:11:13,195 [DEBUG] config.py:101 -- Loading configuration from /home/ayman/.lithops/config
2024-06-20 13:11:13,199 [DEBUG] config.py:220 -- Loading Storage backend module: ceph
2024-06-20 13:11:13,200 [DEBUG] ceph.py:38 -- Creating Ceph client
2024-06-20 13:11:13,200 [DEBUG] ceph.py:43 -- Setting Ceph endpoint to https://s3.gra.perf.cloud.ovh.net/
2024-06-20 13:11:13,205 [INFO] ceph.py:64 -- Ceph client created - Endpoint: https://s3.gra.perf.cloud.ovh.net/
2024-06-20 13:11:13 [INFO] pipelinestep.py:278 -- keys : ['fed0f0/TAR/rebinning_out/ms/SB220.ms.

In [11]:
# TARGET IMAGING PARAMS
# Imager command-line arguments (WSClean-style flags), one option per line.
# The "-name" value is an OutputS3 that the step resolves to a local path
# prefix for the image products.
TARGET_imaging_params = [
    "-size", "1024", "1024",
    "-pol", "I",
    "-scale", "5arcmin",
    "-niter", "100000",
    "-gain", "0.1",
    "-mgain", "0.6",
    "-auto-mask", "5",
    "-local-rms",
    "-multiscale",
    "-no-update-model-required",
    "-make-psf",
    "-auto-threshold", "3",
    "-weight", "briggs", "0",
    # Column written by the TARGET apply-calibration step.
    "-data-column", "CORRECTED_DATA",
    "-nmiter", "0",
    # NOTE(review): the last run failed with FileNotFoundError for
    # '/tmp/os-10gb/440531/TAR/imag_out' — the parent directory of the
    # -name prefix is presumably not created before the imager runs;
    # confirm and fix in ImagingStep.
    "-name",
    OutputS3(
        bucket=BUCKET,
        key=prepend_hash_to_key("TAR/imag_out/img"),
    ),
]

# TARGET IMAGING
# Image the apply-calibrated MS. "-data-column CORRECTED_DATA" needs the
# column written by the apply step, whose msout uploads the updated MS to
# <hash>/TAR/applycal_out/ms (remote_key_ow). The rebinning output under
# TAR/rebinning_out/ms presumably only carries DATA — confirm against OutputS3.
start_time = time.perf_counter()
finished_job = ImagingStep(
    input_data_path=InputS3(
        bucket=BUCKET, key=prepend_hash_to_key("TAR/applycal_out/ms")
    ),
    parameters=TARGET_imaging_params,
    log_level=LOG_LEVEL,
).run()
end_time = time.perf_counter()
logger.info(f"TARGET Imaging completed in {end_time - start_time} seconds.")

2024-06-20 13:58:15,120 [INFO] config.py:139 -- Lithops v3.4.1 - Python3.10
2024-06-20 13:58:15,128 [INFO] ceph.py:64 -- Ceph client created - Endpoint: https://s3.gra.perf.cloud.ovh.net/


Executor ID: 0a6804
Executor ID: 0a6804


2024-06-20 13:58:15,334 [INFO] k8s.py:111 -- Kubernetes client created - Namespace: taska
2024-06-20 13:58:15,346 [INFO] ceph.py:64 -- Ceph client created - Endpoint: https://s3.gra.perf.cloud.ovh.net/
2024-06-20 13:58:15,571 [INFO] invokers.py:107 -- ExecutorID 0a6804-9 | JobID A000 - Selected Runtime: docker.io/ayman321/extractdysco3 - 8000MB
2024-06-20 13:58:16,949 [INFO] invokers.py:174 -- ExecutorID 0a6804-9 | JobID A000 - Starting function invocation: _execute_step() - Total: 1 activations
2024-06-20 13:58:18,002 [INFO] invokers.py:213 -- ExecutorID 0a6804-9 | JobID A000 - View execution logs at /tmp/lithops-ayman/logs/0a6804-9-A000.log
2024-06-20 13:58:18 [INFO] parameters: ['-size', '1024', '1024', '-pol', 'I', '-scale', '5arcmin', '-niter', '100000', '-gain', '0.1', '-mgain', '0.6', '-auto-mask', '5', '-local-rms', '-multiscale', '-no-update-model-required', '-make-psf', '-auto-threshold', '3', '-weight', 'briggs', '0', '-data-column', 'CORRECTED_DATA', '-nmiter', '0', '-name'

FileNotFoundError: [Errno 2] No such file or directory: '/tmp/os-10gb/440531/TAR/imag_out'