# Sample Pipeline

This notebook shows an end-to-end radio interferometry pipeline, from the simulation of the sky to the final image. The pipeline consists of three modules:

- Simulation
    - Sky module: OSKAR
    - Telescope module incl. calibration: OSKAR
- Processing
    - Calibration after observation: RASCIL
    - Deconvolution: RASCIL
- Analysis & comparison
    - Quantitative and qualitative analysis of algorithms

In [4]:
import sys
import oskar
import matplotlib
import matplotlib.pyplot as plt
from astropy.visualization import astropy_mpl_style
import numpy as np
from rascil.apps import rascil_imager
from rascil.processing_components.util.performance import (
    performance_store_dict,
    performance_environment,
)
from astropy.utils.data import get_pkg_data_filename
from astropy.io import fits

In [2]:
plt.style.use(astropy_mpl_style)

## Simulation

The sky and telescope simulations are currently provided entirely by OSKAR.

### Sky Module

The OSKAR sky model contains the radiation sources. They are defined as a NumPy array and passed to `oskar.Sky.from_array`.

In [None]:
# Set the numerical precision to use.
precision = "single"

# Create a sky model containing three sources from a numpy array.
sky_data = np.array([
        [20.0, -30.0, 1, 0, 0, 0, 100.0e6, -0.7, 0.0, 0,   0,   0],
        [20.0, -30.5, 3, 2, 2, 0, 100.0e6, -0.7, 0.0, 600, 50,  45],
        [20.5, -30.5, 3, 0, 0, 2, 100.0e6, -0.7, 0.0, 700, 10, -10]])
sky = oskar.Sky.from_array(sky_data, precision)  # Pass precision here.
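
Each row of `sky_data` follows the OSKAR sky-model column layout: RA and Dec in degrees; Stokes I, Q, U and V in Jy; reference frequency in Hz; spectral index; rotation measure in rad/m^2; major and minor FWHM in arcsec; and position angle in degrees. As a sketch, assuming the usual power-law spectrum S(f) = S0 * (f / f0)^alpha that OSKAR applies via the spectral-index column, the Stokes I flux of each source at other frequencies can be checked with plain NumPy (the helper `stokes_i_at` is hypothetical):

```python
import numpy as np

# Same source table as above (OSKAR sky-model column layout)
sky_data = np.array([
    [20.0, -30.0, 1, 0, 0, 0, 100.0e6, -0.7, 0.0, 0,   0,   0],
    [20.0, -30.5, 3, 2, 2, 0, 100.0e6, -0.7, 0.0, 600, 50,  45],
    [20.5, -30.5, 3, 0, 0, 2, 100.0e6, -0.7, 0.0, 700, 10, -10]])

stokes_i = sky_data[:, 2]        # Stokes I at the reference frequency [Jy]
ref_freq_hz = sky_data[:, 6]     # reference frequency [Hz]
spectral_index = sky_data[:, 7]  # power-law spectral index


def stokes_i_at(freq_hz):
    """Stokes I of every source at freq_hz, assuming a pure power-law spectrum."""
    return stokes_i * (freq_hz / ref_freq_hz) ** spectral_index


for freq in (100e6, 200e6):
    print(f"{freq / 1e6:.0f} MHz:", np.round(stokes_i_at(freq), 3))
```

With a spectral index of -0.7, every source is fainter at higher frequencies, which matters for the wide band simulated below.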

### Telescope Module

The observation parameters and metadata in `params` are passed to OSKAR's telescope module, `oskar.Interferometer`, as an `oskar.SettingsTree`.

In [None]:
# Basic settings. (Note that the sky model is set up later.)
params = {
    "simulator": {
        "use_gpus": False
    },
    "observation" : {
        "num_channels": 64,
        "start_frequency_hz": 100e6,
        "frequency_inc_hz": 20e6,
        "phase_centre_ra_deg": 20,
        "phase_centre_dec_deg": -30,
        "num_time_steps": 24,
        "start_time_utc": "01-01-2000 12:00:00.000",
        "length": "12:00:00.000"
    },
    "telescope": {
        "input_directory": "../data/telescope.tm"
    },
    "interferometer": {
        "ms_filename": "visibilities.ms",
        "channel_bandwidth_hz": 1e6,
        "time_average_sec": 10
    }
}
settings = oskar.SettingsTree("oskar_sim_interferometer")
settings.from_dict(params)

if precision == "single":
    settings["simulator/double_precision"] = False

# Create the interferometer simulator from the settings (the sky model is set in the next cell).
sim = oskar.Interferometer(settings=settings)
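
The `observation` settings fix the frequency and time sampling of the simulated data. The arithmetic below is a sketch that derives the channel frequencies and the time-step spacing from the same values, assuming that channel k lies at `start_frequency_hz + k * frequency_inc_hz` and that `length` is split evenly into `num_time_steps`:

```python
import numpy as np

# Values copied from the "observation" block of `params`
num_channels = 64
start_frequency_hz = 100e6
frequency_inc_hz = 20e6
num_time_steps = 24
length_s = 12 * 3600  # "12:00:00.000"

# Assumed channel layout: evenly spaced upward from the start frequency
channel_freqs_hz = start_frequency_hz + frequency_inc_hz * np.arange(num_channels)
# Assumed time sampling: observation length divided evenly across the time steps
time_step_s = length_s / num_time_steps

print(f"Channels: {channel_freqs_hz[0] / 1e6:.0f} to {channel_freqs_hz[-1] / 1e6:.0f} MHz")
print(f"Time step: {time_step_s / 60:.0f} min")
```

With these settings the 64 channels span 100 to 1360 MHz in 20 MHz steps, while `channel_bandwidth_hz` is only 1 MHz, so each channel covers a narrow slice of its 20 MHz step.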

### Observation Simulation

Next, the sky model is passed to the interferometer, and the simulation of the observation is started to generate the measurement set.

In [None]:
sim.set_sky_model(sky)
sim.run()

## Processing

After the observation, the measured data must be calibrated, and then the image is reconstructed.

### Calibration after Observation

TODO

In [None]:
# Code here

### Imaging

Deconvolve the data with the `mmclean` (multi-scale multi-frequency CLEAN) algorithm, using `visibilities.ms` as input.
To use a Dask cluster on which you can follow the progress, first create a Dask cluster in the Dask extension on the left.
Then copy the scheduler address into the variable below. It might already be correct.

If you do not do this, remove the `--dask_scheduler` option from the options in the `start_imager` call.
RASCIL then starts its own scheduler; however, you will not be able to see the dashboard, as the port is probably not forwarded by Docker.

In [None]:
#def start_imager(rawargs):
#    parser = rascil_imager.cli_parser()
#    args = parser.parse_args(rawargs)
#    performance_environment(args.performance_file, mode="w")
#    performance_store_dict(args.performance_file, "cli_args", vars(args), mode="a")
#    image_name = rascil_imager.imager(args)
#
#start_imager(
#    [
#        '--ingest_msname','visibilities.ms',
#        '--ingest_dd', '0', 
#        '--ingest_vis_nchan', '64',
#        '--ingest_chan_per_blockvis', '4' ,
#        '--ingest_average_blockvis', 'True',
#        '--imaging_npixel', '2048', 
#        '--imaging_cellsize', '3.878509448876288e-05',
#        '--imaging_weighting', 'robust',
#        '--imaging_robustness', '-0.5',
#        '--clean_nmajor', '5' ,
#        '--clean_algorithm', 'mmclean',
#        '--clean_scales', '0', '6', '10', '30', '60',
#        '--clean_fractional_threshold', '0.3',
#        '--clean_threshold', '0.12e-3',
#        '--clean_nmoment' ,'5',
#        '--clean_psf_support', '640',
#        '--clean_restored_output', 'integrated'
#    ])
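
The `--imaging_cellsize` value is given in radians. A quick check shows that it corresponds to exactly 8 arcsec, so the 2048-pixel image spans roughly 4.55 degrees on a side:

```python
import math

cellsize_rad = 3.878509448876288e-05  # --imaging_cellsize
npixel = 2048                          # --imaging_npixel

# Convert the cell size from radians to arcseconds
cellsize_arcsec = math.degrees(cellsize_rad) * 3600
# Image width = number of pixels times cell size
fov_deg = npixel * cellsize_arcsec / 3600

print(f"Cell size: {cellsize_arcsec:.3f} arcsec")
print(f"Field of view: {fov_deg:.2f} deg")
```

Conversely, `math.radians(8 / 3600)` reproduces the cell size, which is easier to read than the raw radian value.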

In [None]:
from rascil.workflows import (
    invert_list_rsexecute_workflow,
    deconvolve_list_rsexecute_workflow,
    create_blockvisibility_from_ms_rsexecute,
    weight_list_rsexecute_workflow,
    continuum_imaging_skymodel_list_rsexecute_workflow,
)

from rascil.workflows.rsexecute.execution_support import rsexecute
# Needed for the model images created further down
from rascil.processing_components import create_image_from_visibility
from rascil.processing_components.visibility.operations import convert_blockvisibility_to_stokesI
from rascil.data_models import PolarisationFrame
import dask

In [None]:
# Read the visibilities from the MS locally, so that Dask workers cannot fail to find the MS file
rsexecute.set_client(use_dask=False)

In [1]:
dds = [0]
channels_per_dd = 64
nchan_per_blockvis = 4
nout = channels_per_dd // nchan_per_blockvis

# Create a list of blockvisibilities
bvis_list = create_blockvisibility_from_ms_rsexecute('visibilities.ms/', 
                                                     nchan_per_blockvis=nchan_per_blockvis, 
                                                     dds=dds, 
                                                     nout=nout,
                                                     average_channels=True)
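
The cell above splits the 64 channels of the data descriptor into `nout = 64 // 4 = 16` block visibilities. The sketch below reproduces only that bookkeeping in plain Python. It assumes that each block holds a contiguous range of channels and that `average_channels=True` collapses each block to the mean of its channels; both are assumptions about RASCIL's behaviour, not taken from its source.

```python
# Values from the cell above and from params["observation"]
channels_per_dd = 64
nchan_per_blockvis = 4
nout = channels_per_dd // nchan_per_blockvis
start_frequency_hz = 100e6
frequency_inc_hz = 20e6

# Assumed grouping: block i holds channels i*4 .. i*4 + 3
channel_blocks = [
    range(i * nchan_per_blockvis, (i + 1) * nchan_per_blockvis) for i in range(nout)
]
# Mean frequency of the channels in each block (what averaging would collapse to)
block_centres_hz = [
    start_frequency_hz + frequency_inc_hz * (block.start + block.stop - 1) / 2
    for block in channel_blocks
]

for block, centre in list(zip(channel_blocks, block_centres_hz))[:3]:
    print(f"channels {block.start}-{block.stop - 1}: centre {centre / 1e6:.0f} MHz")
print(f"... {nout} blocks in total")
```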

### Info: Dask Cluster

Create a new local cluster with the dask-labextension on the left.
Then drag the cluster's blue box into the notebook to create a cell like the one below,
or simply change the scheduler address in the cell below.

In [None]:
from dask.distributed import Client

client = Client("tcp://127.0.0.1:43619")
client

In [None]:
# Point rsexecute at the existing Dask cluster (tip: if no cluster exists, you can create one in the dask-labextension tab on the left)
rsexecute.set_client(client=client, use_dask=True)

In [None]:
# Convert the visibilities to Stokes I
bvis_list = [rsexecute.execute(convert_blockvisibility_to_stokesI)(vis) for vis in bvis_list]

In [None]:
# Create a Stokes I model image for each block visibility
modelimage_list = [rsexecute.execute(create_image_from_visibility)(vis,
                                                                   npixel=2048,
                                                                   nchan=1,
                                                                   cellsize=3.878509448876288e-05,
                                                                   polarisation_frame=PolarisationFrame('stokesI'))
                   for vis in bvis_list]

In [None]:
# Apply robust (Briggs) weighting to the visibilities
bvis_list = weight_list_rsexecute_workflow(bvis_list,
                                           modelimage_list,
                                           weighting='robust',
                                           robustness=-0.5)
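
For intuition about `robustness=-0.5`, the sketch below implements the textbook Briggs (robust) weighting on a toy set of visibilities assigned to two uv cells. That RASCIL uses exactly this formula is an assumption; `briggs_weights` and `dense_to_sparse_ratio` are hypothetical helpers. A robustness of -2 approaches uniform weighting and +2 approaches natural weighting.

```python
import numpy as np


def briggs_weights(natural_weights, cell_index, robustness):
    """Textbook Briggs weighting (sketch); cell_index maps each visibility to a uv cell."""
    natural_weights = np.asarray(natural_weights, dtype=float)
    cell_index = np.asarray(cell_index)
    # Gridded weight density W_k: sum of the natural weights in each cell
    density = np.bincount(cell_index, weights=natural_weights)
    # f^2 = (5 * 10^-R)^2 / (sum_k W_k^2 / sum_i w_i)
    f2 = (5.0 * 10.0 ** (-robustness)) ** 2 / (np.sum(density ** 2) / np.sum(natural_weights))
    # Down-weight each visibility according to the density of its cell
    return natural_weights / (1.0 + density[cell_index] * f2)


# Toy data: cell 0 is densely sampled (9 visibilities), cell 1 sparsely (1 visibility)
weights = np.ones(10)
cells = np.array([0] * 9 + [1])


def dense_to_sparse_ratio(robustness):
    """Total weight of the dense cell divided by that of the sparse cell."""
    w = briggs_weights(weights, cells, robustness)
    return w[cells == 0].sum() / w[cells == 1].sum()


for r in (-2.0, -0.5, 2.0):
    print(f"R = {r:+.1f}: dense/sparse total weight = {dense_to_sparse_ratio(r):.2f}")
```

Lower robustness values down-weight densely sampled uv cells more strongly, which typically sharpens the PSF at the cost of sensitivity.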

In [None]:
# Set up the continuum imaging (deconvolution) workflow; it runs when computed below
result = continuum_imaging_skymodel_list_rsexecute_workflow(
    bvis_list,
    modelimage_list,
    context='ng',
    threads=4,
    wstacking=True,
    niter=1000,
    nmajor=5,
    algorithm='mmclean',
    gain=0.1,
    scales= [0, 6, 10, 30, 60],
    fractional_threshold=0.3,
    threshold=0.00012,
    nmoment=5,
    psf_support=640,
    restored_output='integrated',
    deconvolve_facets=1,
    deconvolve_overlap=32,
    deconvolve_taper='tukey',
    restore_facets=1,
    restore_overlap=32,
    restore_taper='tukey',
    dft_compute_kernel=None,
    component_threshold=None,
    component_method='fit',
    flat_sky=False,
    clean_beam=None,
)

# start computation on dask cluster
result = rsexecute.compute(result, sync=True)
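
`mmclean` is RASCIL's multi-scale, multi-frequency extension of CLEAN. To illustrate what `gain`, `threshold`, `fractional_threshold` and `niter` control, the sketch below implements a plain 1-D Högbom CLEAN in NumPy. It is not RASCIL's implementation: the scale (`scales`) and Taylor-term (`nmoment`) handling of mmclean are left out, and the stopping rule (the larger of the absolute threshold and a fraction of the initial peak) is modelled on the parameter names.

```python
import numpy as np


def hogbom_clean_1d(dirty, psf, gain=0.1, threshold=0.0, fractional_threshold=0.3, niter=1000):
    """Minimal 1-D Hogbom CLEAN returning (model, residual); psf must peak at 1."""
    residual = np.array(dirty, dtype=float)
    model = np.zeros_like(residual)
    n, npsf = len(residual), len(psf)
    centre = int(np.argmax(psf))
    # Stop at the larger of the absolute threshold and a fraction of the initial peak
    stop_level = max(threshold, fractional_threshold * np.max(np.abs(residual)))
    for _ in range(niter):
        peak_pos = int(np.argmax(np.abs(residual)))
        peak = residual[peak_pos]
        if abs(peak) <= stop_level:
            break
        # Add a fraction `gain` of the peak to the model
        model[peak_pos] += gain * peak
        # Subtract the scaled PSF centred on the peak, clipped to the array bounds
        lo = max(0, peak_pos - centre)
        hi = min(n, peak_pos - centre + npsf)
        residual[lo:hi] -= gain * peak * psf[lo - peak_pos + centre:hi - peak_pos + centre]
    return model, residual


# Toy data: Gaussian PSF peaking at index 32, unit point source at index 20
x = np.arange(64)
psf = np.exp(-0.5 * ((x - 32) / 2.0) ** 2)
dirty = np.roll(psf, 20 - 32)

model, residual = hogbom_clean_1d(dirty, psf, gain=0.1, threshold=1e-3, fractional_threshold=0.0)
model_frac, _ = hogbom_clean_1d(dirty, psf, gain=0.1, fractional_threshold=0.3)
print(f"threshold only: model[20] = {model[20]:.4f}")
print(f"fractional_threshold=0.3: model[20] = {model_frac[20]:.4f}")
```

A fractional threshold stops each minor cycle early, leaving flux in the residual for the next major cycle (`nmajor`) to pick up after a fresh prediction from the model.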

### Info: Dask Dashboard

To follow the execution on the Dask dashboard, open the panels of the dask-labextension on the left. For example, the Graph and Task Stream panels give a good picture of where the computation currently is.

If you stop the calculation with the stop button at the top, you need to run the client setup cell again; otherwise you will encounter an "IOLoop closed" error.

## Analysis and Comparison

TODO

In [None]:
# Note: rascil_imager.py calls matplotlib.use("Agg"), which stops matplotlib from plotting inline in this notebook
image_file = get_pkg_data_filename('visibilities_nmoment5_cip_deconvolved.fits')
fits.info(image_file)

In [None]:
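image_data = fits.getdata(image_file)
# Collapse the frequency and polarisation axes into a single 2-D plane
image_sum = image_data.sum(axis=(0, 1))
# Display on a log scale; pixels <= 0 have no logarithm and are masked as NaN
image_log = np.log(np.where(image_sum > 0, image_sum, np.nan))
_ = plt.figure(figsize=(8, 6))
_ = plt.imshow(image_log, cmap='gray')
_ = plt.colorbar()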
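
One simple quantitative figure of merit for comparing restored images is the dynamic range, taken here as the peak brightness divided by the RMS in a source-free region. The sketch below is hypothetical: the helper `dynamic_range` and the choice of a corner box as the noise region are assumptions, and it runs on a synthetic image rather than the FITS output. On the real data it could be applied to `fits.getdata(image_file).sum(axis=(0, 1))`, with a noise box chosen away from the sources.

```python
import numpy as np


def dynamic_range(image, noise_box):
    """Peak brightness divided by the RMS inside a source-free box (sketch)."""
    (y0, y1), (x0, x1) = noise_box
    noise = image[y0:y1, x0:x1]
    rms = np.sqrt(np.mean(noise ** 2))
    return np.max(image) / rms


# Synthetic image: Gaussian noise with sigma = 0.01 plus one point source of 1.0
rng = np.random.default_rng(seed=0)
image = rng.normal(0.0, 0.01, size=(256, 256))
image[128, 128] += 1.0

dr = dynamic_range(image, ((0, 64), (0, 64)))
print(f"Dynamic range: {dr:.0f}")
```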