# Installation
CNGI documentation is located here:
[https://cngi-prototype.readthedocs.io/en/latest/index.html](https://cngi-prototype.readthedocs.io/en/latest/index.html)

Google Colab requires specific older versions of some packages such as Pandas and Dask, so we will install CNGI without its normal dependencies and then manually install each dependency afterwards.

Normally, you would leave out the --no-dependencies option and let pip resolve the dependencies for you.

For this demonstration we will use the data from the ALMA First Look at Imaging CASAguide.

In [0]:
import os
import logging
logging.getLogger('distributed.utils_perf').setLevel(logging.ERROR) # dask is noisy

print("installing cngi (takes a few minutes)...")
os.system("apt-get install -y libgfortran3")  # -y avoids an interactive confirmation prompt
os.system("pip install --extra-index-url https://casa-pip.nrao.edu/repository/pypi-group/simple casatools")
os.system("pip install cngi-prototype==0.0.15 --no-dependencies")
os.system("pip install --upgrade dask")

print("downloading MeasurementSet from CASAguide First Look at Imaging...")
os.system("wget https://bulk.cv.nrao.edu/almadata/public/working/sis14_twhya_calibrated_flagged.ms.tar")
os.system("tar -xvf sis14_twhya_calibrated_flagged.ms.tar")

print('complete')

installing cngi (takes a few minutes)...
downloading MeasurementSet from CASAguide First Look at Imaging...
complete


# Initialize the Processing Environment
The processing environment is somewhat limited on Colab and produces some warnings, but it still works.

InitializeFramework instantiates a client object; the caller does not need to keep the returned object. Once this object exists, all Dask objects automatically use it for parallel execution.

Omitting this step causes the data conversion routines to run serially and the subsequent Dask dataframe operations to use Dask's default built-in scheduler for parallel execution (which can actually be faster on a local machine).
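To illustrate what "the default built-in scheduler" means, here is a minimal sketch in plain Dask (it does not use CNGI, and the names `square` and `total` are made up for illustration). It builds a small task graph and runs it with two of Dask's local schedulers. Creating a `dask.distributed.Client` registers that client as the default scheduler, which is presumably what InitializeFramework relies on.

```python
from dask import delayed

@delayed
def square(x):
    # Each call becomes one node in the task graph
    return x * x

# Build the graph: nothing is executed at this point
total = delayed(sum)([square(i) for i in range(10)])

# With no distributed client, a local scheduler runs the graph.
# "threads" is the local thread pool, "synchronous" runs serially.
threaded = total.compute(scheduler="threads")
serial = total.compute(scheduler="synchronous")

print(threaded, serial)
```

Both calls return the same value; only the way the tasks are executed differs.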

In [0]:
from cngi.direct import InitializeFramework
# 2 workers with 6 GB of memory each (see the cluster summary in the output)
client = InitializeFramework(2, '6GB', False)
client

Failed to start diagnostics server on port 8787. [Errno 99] Cannot assign requested address
Could not launch service 'bokeh' on port 8787. Got the following message:

[Errno 99] Cannot assign requested address
  self.scheduler.start(scheduler_address)


Client: Scheduler: inproc://172.28.0.2/126/1
Cluster: Workers: 2, Cores: 2, Memory: 12.00 GB


# Convert an MS to Apache Parquet
Convert the MS from its current custom format to the off-the-shelf Apache Parquet format. The new format can be read natively by a variety of parallel processing frameworks, including Dask.


In [0]:
from cngi.conversion import ms_to_pq

ms_to_pq('sis14_twhya_calibrated_flagged.ms')

processing ddi 0: chunks=13, size=5812
completed ddi 0
Complete.


# Open an Apache Parquet based MS

Retrieve a summary of the Parquet MS file, then create a new Dask Dataframe from it.

This Dataframe is the common data structure passed around to most other CNGI functions.

In [0]:
from cngi.ms import summarizeFile
from cngi.dio import read_ms

# returns summary as a pandas dataframe
mssummary = summarizeFile('sis14_twhya_calibrated_flagged.pq')
print(mssummary[['ddi','row_count_estimate','col_count','size_GB']])

# there is only one ddi in the MS, but pretend there are more and one is chosen
ddi = mssummary.ddi.values[0]

# here we create the dask dataframe for use in other CNGI functions
ddf = read_ms('sis14_twhya_calibrated_flagged.pq',ddi=ddi)

# examine the start of the dataframe 
ddf.head()


   ddi  row_count_estimate  col_count  size_GB
0    0            31245312         30     0.62


| | SAMPLE | U | V | W | CHAN | FLAG0 | FLAG1 | WEIGHT0 | WEIGHT1 | SIGMA0 | SIGMA1 | ANTENNA1 | ANTENNA2 | ARRAY_ID | EXPOSURE | FEED1 | FEED2 | FIELD_ID | FLAG_ROW | INTERVAL | OBSERVATION_ID | PROCESSOR_ID | SCAN_NUMBER | STATE_ID | TIME | TIME_CENTROID | RDATA0 | RDATA1 | IDATA0 | IDATA1 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 17436 | 72.554947 | 123.86137 | 80.785859 | 0 | False | False | 11.595811 | 21.028986 | 0.293663 | 0.218067 | 3 | 20 | 0 | 6.048 | 0 | 0 | 5 | False | 6.048 | 0 | 2 | 12 | 18 | 4860029000.0 | 4860029000.0 | -4.244297 | -5.035166 | -4.653774 | -0.859822 |
| 1 | 17437 | 113.20123 | 42.83082 | 124.777299 | 0 | False | False | 10.664979 | 17.576269 | 0.30621 | 0.238527 | 3 | 22 | 0 | 6.048 | 0 | 0 | 5 | False | 6.048 | 0 | 2 | 12 | 18 | 4860029000.0 | 4860029000.0 | -5.746907 | -7.581162 | -0.424164 | 4.133389 |
| 2 | 17438 | 50.866897 | 8.626822 | 54.836695 | 0 | False | False | 10.246797 | 21.456364 | 0.312396 | 0.215885 | 3 | 24 | 0 | 6.048 | 0 | 0 | 5 | False | 6.048 | 0 | 2 | 12 | 18 | 4860029000.0 | 4860029000.0 | 6.160158 | 0.486019 | 6.502325 | 3.045249 |
| 3 | 17439 | 51.611428 | 112.809274 | 57.904984 | 0 | False | False | 12.562062 | 24.207361 | 0.282143 | 0.203248 | 4 | 5 | 0 | 6.048 | 0 | 0 | 5 | False | 6.048 | 0 | 2 | 12 | 18 | 4860029000.0 | 4860029000.0 | 15.389114 | -5.577965 | -2.725766 | 6.142738 |
| 4 | 17440 | 105.619718 | 34.460985 | 116.495976 | 0 | False | False | 10.014101 | 22.142685 | 0.316005 | 0.212513 | 4 | 6 | 0 | 6.048 | 0 | 0 | 5 | False | 6.048 | 0 | 2 | 12 | 18 | 4860029000.0 | 4860029000.0 | -3.638794 | 3.095149 | -14.711627 | 2.282497 |


# Now we have a Dask Dataframe (ddf) MeasurementSet object

We can pass around the ddf object to the various CNGI MS functions.  These functions all return new Dataframe objects and build up an execution graph without actually processing any data.  

We can also use any of the standard Dataframe operations from Dask, or slice it to Dask arrays and use any of those operations.  See [https://docs.dask.org/en/latest/dataframe-api.html](https://docs.dask.org/en/latest/dataframe-api.html)

Processing does not begin until it is explicitly requested (e.g. ddf.compute()) or until an operation needs the results of the previous operations (e.g. len(ddf), which must count the rows).

In [0]:
# todo: need new sorted/indexed parquet conversion first

# example: time average each antenna baseline
#ddf['DATA0'] = ddf.RDATA0 + 1j*ddf.IDATA0  # create complex vis from real and imaginary parts
#newdf = ddf[['ANTENNA1','ANTENNA2','DATA0']].set_index('ANTENNA1', sorted=True)  # drop stuff we don't need and index
#timeGroups = newdf.groupby(['ANTENNA1','ANTENNA2'])['DATA0'].mean()  # group rows by baselines
#time_averages = timeGroups.compute() # this triggers the execution of the first three lines
#print(time_averages)
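
Until the sorted/indexed conversion is available, the idea behind the commented-out example can be tried on a small in-memory table. The sketch below uses plain pandas rather than CNGI or Dask, and the values in `rows` are invented for illustration. It averages the real and imaginary parts per baseline and then combines them, which gives the same result as averaging the complex visibilities directly because the mean is linear.

```python
import pandas as pd

# Invented sample rows; column names follow the MS dataframe
rows = pd.DataFrame({
    "ANTENNA1": [3, 3, 3, 3],
    "ANTENNA2": [20, 20, 22, 22],
    "RDATA0": [1.0, 3.0, -2.0, 4.0],
    "IDATA0": [2.0, 4.0, 1.0, -1.0],
})

# Group rows by baseline (antenna pair) and average each part over time
means = rows.groupby(["ANTENNA1", "ANTENNA2"])[["RDATA0", "IDATA0"]].mean()

# Combine the averaged parts into one complex visibility per baseline
time_averages = means["RDATA0"] + 1j * means["IDATA0"]

print(time_averages)
```

The result is a Series indexed by (ANTENNA1, ANTENNA2), with one averaged complex value per baseline.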