Merge pull request #6 from tnakazato/import_autocorr_as_float_array
Import autocorr as float array
ryanraba committed May 18, 2021
2 parents 005cda2 + 85a6174 commit c27a1af
Showing 2 changed files with 44 additions and 17 deletions.
18 changes: 14 additions & 4 deletions cngi/_utils/_table_conversion.py
@@ -205,7 +205,7 @@ def convert_simple_table(infile, outfile, subtable='', dimnames=None, timecols=[
 # timecols is a list of column names to convert to datetimes
 # dimnames is a dictionary mapping old to new dimension names for remaining dims (not in keys)
 # ignore is a list of column names to ignore
-def convert_expanded_table(infile, outfile, keys, subtable='', subsel=None, timecols=[], dimnames={}, ignore=[], compressor=None, chunks=(100, 20, 1), nofile=False):
+def convert_expanded_table(infile, outfile, keys, subtable='', subsel=None, timecols=[], dimnames={}, ignore=[], compressor=None, chunks=(100, 20, 1), nofile=False, extraselstr=''):
 
     if compressor is None:
         compressor = Blosc(cname='zstd', clevel=2, shuffle=0)
@@ -230,11 +230,16 @@ def convert_expanded_table(infile, outfile, keys, subtable='', subsel=None, time
     # 3. row_idxs = row numbers where each unique value occurs
     # then compute 1 and 3 for each additional key/dimension and store in midxs dictionary
     ordering = ','.join([np.atleast_1d(key)[ii] for key in keys.keys() for ii in range(len(np.atleast_1d(key)))])
-    if subsel is None:
+    if subsel is None or len(subsel) == 0:
+        selstr = str(extraselstr)
+    else:
+        selstr = '&&'.join([f'{k} = {v}' for k, v in subsel.items()])
+        if len(extraselstr) > 0:
+            selstr = f'({selstr}) && ({extraselstr})'
+    if len(selstr) == 0:
         sorted_table = tb_tool.taql('select * from %s ORDERBY %s' % (os.path.join(infile,subtable), ordering))
     else:
-        tsel = [list(subsel.keys())[0], list(subsel.values())[0]]
-        sorted_table = tb_tool.taql('select * from %s where %s = %s ORDERBY %s' % (os.path.join(infile,subtable), tsel[0], tsel[1], ordering))
+        sorted_table = tb_tool.taql('select * from %s where %s ORDERBY %s' % (os.path.join(infile,subtable), selstr, ordering))
     if sorted_table.nrows() == 0:
         sorted_table.close()
         tb_tool.close()
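The new selection logic above is self-contained: `subsel` contributes equality constraints, the new `extraselstr` argument is ANDed on top of them, and an empty combined string falls back to the unfiltered TaQL query. A minimal standalone sketch of that logic (the function name and example values are illustrative, not part of the commit):

def build_selstr(subsel=None, extraselstr=''):
    # equality constraints come from the subsel dict, e.g. {'DATA_DESC_ID': 0}
    if subsel is None or len(subsel) == 0:
        return str(extraselstr)
    selstr = '&&'.join([f'{k} = {v}' for k, v in subsel.items()])
    if len(extraselstr) > 0:
        # parenthesize both sides so TaQL operator precedence stays unambiguous
        selstr = f'({selstr}) && ({extraselstr})'
    return selstr

# build_selstr({'DATA_DESC_ID': 0}, 'ANTENNA1 == ANTENNA2 && FEED1 == FEED2')
# returns '(DATA_DESC_ID = 0) && (ANTENNA1 == ANTENNA2 && FEED1 == FEED2)'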
@@ -291,6 +296,11 @@ def convert_expanded_table(infile, outfile, keys, subtable='', subsel=None, time
                                      for kk in np.arange(start_idx + 1, start_idx + len(data) + 1)])
                else:
                    data = sorted_table.getcol(col, idx_range[0], len(idx_range)).transpose()
+               if np.iscomplexobj(data) is True and np.all(data.imag == 0):
+                   # generate C-contiguous float array from real part of complex data
+                   tmp = np.empty_like(data.real)
+                   tmp[:] = data.real
+                   data = tmp
            except Exception:
                bad_cols += [col]
                continue
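This block is what the PR title refers to: autocorrelation columns come out of the MS as complex arrays whose imaginary part is identically zero, and `data.real` alone is only a strided view into the complex buffer, so the new code copies the real part into a fresh C-contiguous float array. A small sketch of the same idea with a made-up array (`np.ascontiguousarray(data.real)` would be an equivalent one-liner):

import numpy as np

# stand-in for an autocorrelation column: complex dtype, zero imaginary part
data = np.array([[1.0 + 0j, 2.0 + 0j],
                 [3.0 + 0j, 4.0 + 0j]])

if np.iscomplexobj(data) and np.all(data.imag == 0):
    # data.real is a non-contiguous view (it strides over complex128 elements)
    tmp = np.empty_like(data.real)  # fresh C-contiguous float64 array
    tmp[:] = data.real              # copy only the real part
    data = tmp

assert data.dtype == np.float64 and data.flags['C_CONTIGUOUS']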
43 changes: 30 additions & 13 deletions cngi/conversion/convert_ms.py
@@ -57,6 +57,7 @@ def convert_ms(infile, outfile=None, ddis=None, ignore=['HISTORY'], compressor=N
     xarray.core.dataset.Dataset
         Master xarray dataset of datasets for this visibility set
     """
+    import itertools
     import os
     import xarray
     import dask.array as da
@@ -97,43 +98,59 @@ def convert_ms(infile, outfile=None, ddis=None, ignore=['HISTORY'], compressor=N
     else: ddis = np.atleast_1d(ddis)
     xds_list = []
 
+    # extra data selection to split autocorr and crosscorr into separate xds
+    # extrasels[0] is for autocorrelation
+    # extrasels[1] is for others (crosscorrelations, correlations between feeds)
+    extrasels = [
+        'ANTENNA1 == ANTENNA2 && FEED1 == FEED2',
+        'ANTENNA1 != ANTENNA2 || FEED1 != FEED2'
+    ]
+
     ####################################################################
     # process each selected DDI from the input MS, assume a fixed shape within the ddi (should always be true)
     # each DDI is written to its own subdirectory under the parent folder
-    for ddi in ddis:
+    for extrasel, ddi in itertools.product(extrasels, ddis):
         if ddi == 'global': continue  # handled afterwards
+
+        extra_sel_index = extrasels.index(extrasel)
+        if extra_sel_index == 0:
+            xds_prefix = 'xdsa'
+        else:
+            xds_prefix = 'xds'
+        xds_name = f'{xds_prefix}{ddi}'
+
         ddi = int(ddi)
-        print('Processing ddi', ddi, end='\r')
+        print('Processing ddi', ddi, f'xds name is {xds_name}', end='\r')
         start_ddi = time.time()
 
         # these columns are different / absent in MSv3 or need to be handled as special cases
         msv2 = ['WEIGHT', 'WEIGHT_SPECTRUM', 'SIGMA', 'SIGMA_SPECTRUM', 'ANTENNA1', 'ANTENNA2', 'UVW']
 
         # convert columns that are common to MSv2 and MSv3
-        xds = tblconv.convert_expanded_table(infile, os.path.join(outfile,'xds'+str(ddi)), keys={'TIME': 'time', ('ANTENNA1', 'ANTENNA2'): 'baseline'},
+        xds = tblconv.convert_expanded_table(infile, os.path.join(outfile,xds_name), keys={'TIME': 'time', ('ANTENNA1', 'ANTENNA2'): 'baseline'},
                                              subsel={'DATA_DESC_ID':ddi}, timecols=['time'], dimnames={'d2':'chan', 'd3':'pol'},
-                                             ignore=ignorecols + msv2, compressor=compressor, chunks=chunks, nofile=False)
+                                             ignore=ignorecols + msv2, compressor=compressor, chunks=chunks, nofile=False, extraselstr=extrasel)
         if len(xds.dims) == 0: continue
 
         # convert and append UVW separately so we can handle its special dimension
         uvw_chunks = (chunks[0],chunks[1],3)  # No chunking over uvw_index
         uvw_xds = tblconv.convert_expanded_table(infile, os.path.join(outfile,'tmp'), keys={'TIME': 'time', ('ANTENNA1', 'ANTENNA2'): 'baseline'},
                                                  subsel={'DATA_DESC_ID': ddi}, timecols=['time'], dimnames={'d2': 'uvw_index'},
-                                                 ignore=ignorecols + list(xds.data_vars) + msv2[:-1], compressor=compressor, chunks=uvw_chunks, nofile=False)
-        uvw_xds.to_zarr(os.path.join(outfile, 'xds'+str(ddi)), mode='a', compute=True, consolidated=True)
+                                                 ignore=ignorecols + list(xds.data_vars) + msv2[:-1], compressor=compressor, chunks=uvw_chunks, nofile=False, extraselstr=extrasel)
+        uvw_xds.to_zarr(os.path.join(outfile, xds_name), mode='a', compute=True, consolidated=True)
 
         # convert and append the ANTENNA1 and ANTENNA2 columns separately so we can squash the unnecessary time dimension
         ant_xds = tblconv.convert_expanded_table(infile, os.path.join(outfile, 'tmp'), keys={'TIME': 'time', ('ANTENNA1', 'ANTENNA2'): 'baseline'},
                                                  subsel={'DATA_DESC_ID': ddi}, timecols=['time'], ignore=ignorecols+list(xds.data_vars)+msv2[:4]+['UVW'],
-                                                 compressor=compressor, chunks=chunks[:2], nofile=False)
+                                                 compressor=compressor, chunks=chunks[:2], nofile=False, extraselstr=extrasel)
         ant_xds = ant_xds.assign({'ANTENNA1': ant_xds.ANTENNA1.max(axis=0), 'ANTENNA2': ant_xds.ANTENNA2.max(axis=0)}).drop_dims('time')
-        ant_xds.to_zarr(os.path.join(outfile, 'xds' + str(ddi)), mode='a', compute=True, consolidated=True)
+        ant_xds.to_zarr(os.path.join(outfile, xds_name), mode='a', compute=True, consolidated=True)
 
         # now convert just the WEIGHT and WEIGHT_SPECTRUM (if present)
         # WEIGHT needs to be expanded to full dimensionality (time, baseline, chan, pol)
         wt_xds = tblconv.convert_expanded_table(infile, os.path.join(outfile,'tmp'), keys={'TIME': 'time', ('ANTENNA1', 'ANTENNA2'): 'baseline'},
                                                 subsel={'DATA_DESC_ID':ddi}, timecols=['time'], dimnames={},
-                                                ignore=ignorecols + list(xds.data_vars) + msv2[-3:], compressor=compressor, chunks=chunks, nofile=False)
+                                                ignore=ignorecols + list(xds.data_vars) + msv2[-3:], compressor=compressor, chunks=chunks, nofile=False, extraselstr=extrasel)
 
         # MSv3 changes to weight/sigma column handling
         # 1. DATA_WEIGHT = 1/sqrt(SIGMA)
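The loop rewrite above is the other half of the feature: every DDI is now converted twice, once per entry in `extrasels`, so autocorrelation rows land in an `xdsa<ddi>` dataset while cross-correlations keep the usual `xds<ddi>` name. A sketch of how the `itertools.product` loop pairs selection strings with dataset names (the DDI values are illustrative):

import itertools

extrasels = ['ANTENNA1 == ANTENNA2 && FEED1 == FEED2',   # autocorrelations
             'ANTENNA1 != ANTENNA2 || FEED1 != FEED2']   # everything else
ddis = [0, 1]

for extrasel, ddi in itertools.product(extrasels, ddis):
    xds_prefix = 'xdsa' if extrasels.index(extrasel) == 0 else 'xds'
    print(f'{xds_prefix}{ddi}  <-  rows where {extrasel}')

# xdsa0  <-  rows where ANTENNA1 == ANTENNA2 && FEED1 == FEED2
# xdsa1  <-  rows where ANTENNA1 == ANTENNA2 && FEED1 == FEED2
# xds0   <-  rows where ANTENNA1 != ANTENNA2 || FEED1 != FEED2
# xds1   <-  rows where ANTENNA1 != ANTENNA2 || FEED1 != FEED2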
@@ -157,7 +174,7 @@ def convert_ms(infile, outfile=None, ddis=None, ignore=['HISTORY'], compressor=N
             wt_xds = wt_xds.assign({'CORRECTED_DATA_WEIGHT': xarray.DataArray(wt_da, dims=dimorder)})
 
         wt_xds = wt_xds.drop([cc for cc in msv2 if cc in wt_xds.data_vars])
-        wt_xds.to_zarr(os.path.join(outfile, 'xds' + str(ddi)), mode='a', compute=True, consolidated=True)
+        wt_xds.to_zarr(os.path.join(outfile, xds_name), mode='a', compute=True, consolidated=True)
 
         # add in relevant data grouping, spw and polarization attributes
         attrs = {'data_groups':[{}]}
@@ -186,10 +203,10 @@ def convert_ms(infile, outfile=None, ddis=None, ignore=['HISTORY'], compressor=N
                   'chan_width':chan_width, 'effective_bw':effective_bw, 'resolution':resolution}
         aux_xds = xarray.Dataset(coords=coords, attrs=attrs)
 
-        aux_xds.to_zarr(os.path.join(outfile, 'xds'+str(ddi)), mode='a', compute=True, consolidated=True)
-        xds = xarray.open_zarr(os.path.join(outfile,'xds'+str(ddi)))
+        aux_xds.to_zarr(os.path.join(outfile, xds_name), mode='a', compute=True, consolidated=True)
+        xds = xarray.open_zarr(os.path.join(outfile,xds_name))
 
-        xds_list += [('xds'+str(ddi), xds)]
+        xds_list += [(xds_name, xds)]
         print('Completed ddi %i process time {:0.2f} s'.format(time.time()-start_ddi) % ddi)
 
     # clean up the tmp directory created by the weight conversion to MSv3
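After this change a converted MS carries two zarr groups per DDI, one for each selection. A hypothetical usage sketch (the input path and DDI number are made up; the `convert_ms` call and the zarr group layout follow the diff above):

import xarray
from cngi.conversion import convert_ms

convert_ms('myobs.ms', outfile='myobs.vis.zarr')

# cross-correlations for DDI 0, as before
xds0 = xarray.open_zarr('myobs.vis.zarr/xds0')
# autocorrelations for DDI 0, new with this commit
xdsa0 = xarray.open_zarr('myobs.vis.zarr/xdsa0')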