load_processing_set.py
import xarray as xr
import zarr
import copy
import os
from ._processing_set import processing_set
from typing import Dict, Union


def load_processing_set(
    ps_store: str,
    sel_parms: dict,
    data_variables: Union[list, None] = None,
    load_sub_datasets: bool = True,
) -> processing_set:
    """Loads a processing set into memory.

    Parameters
    ----------
    ps_store : str
        Path and name of the processing set. For example '/users/user_1/uid___A002_Xf07bba_Xbe5c_target.lsrk.vis.zarr' for a file stored on a local file system, or 's3://viper-test-data/Antennae_North.cal.lsrk.split.vis.zarr/' for a file in AWS object storage.
    sel_parms : dict
        A dictionary where the keys are the names of the ms_xds's and the values are slice_dicts.
        slice_dicts: A dictionary where the keys are the dimension names and the values are slices.
        For example::

            {
                'ms_v4_name_1': {'frequency': slice(0, 160, None), 'time': slice(0, 100)},
                ...
                'ms_v4_name_n': {'frequency': slice(0, 160, None), 'time': slice(0, 100)},
            }

    data_variables : Union[list, None], optional
        The list of data variables to load into memory, for example ['VISIBILITY', 'WEIGHT', 'FLAGS']. By default None, which loads all data variables into memory.
    load_sub_datasets : bool, optional
        If True, sub-datasets (for example weather_xds, antenna_xds, pointing_xds, ...) are loaded into memory. By default True.

    Returns
    -------
    processing_set
        In-memory representation of the processing set (data is represented by Dask arrays).
    """
    from xradio._utils.zarr.common import _open_dataset
    import s3fs
    from botocore.exceptions import NoCredentialsError

    s3 = None
    ps = processing_set()
    for ms_dir_name, ms_xds_isel in sel_parms.items():
        # Before the _open_dataset call, check whether we are dealing with an S3 bucket URL.
        if ps_store.startswith("s3"):
            if not ps_store.endswith("/"):
                # Just for consistency, as there is no os.path equivalent in s3fs.
                ps_store = ps_store + "/"
            try:
                # First try with credentials (needed for private or requester-pays buckets).
                s3 = s3fs.S3FileSystem(anon=False, requester_pays=False)
                main_xds = ps_store + ms_dir_name + "/MAIN"
                xds = _open_dataset(
                    main_xds, ms_xds_isel, data_variables, load=True, s3=s3
                )
                if load_sub_datasets:
                    from xradio.vis.read_processing_set import _read_sub_xds

                    xds.attrs = {
                        **xds.attrs,
                        **_read_sub_xds(
                            os.path.join(ps_store, ms_dir_name), load=True, s3=s3
                        ),
                    }
            except (NoCredentialsError, PermissionError):
                # Without credentials, only public, read-only buckets are accessible.
                s3 = s3fs.S3FileSystem(anon=True)
                main_xds = ps_store + ms_dir_name + "/MAIN"
                xds = _open_dataset(
                    main_xds, ms_xds_isel, data_variables, load=True, s3=s3
                )
                if load_sub_datasets:
                    from xradio.vis.read_processing_set import _read_sub_xds

                    xds.attrs = {
                        **xds.attrs,
                        **_read_sub_xds(
                            os.path.join(ps_store, ms_dir_name), load=True, s3=s3
                        ),
                    }
        else:
            # Fall back to the default case of assuming the files are on local disk.
            main_xds = os.path.join(ps_store, ms_dir_name, "MAIN")
            xds = _open_dataset(main_xds, ms_xds_isel, data_variables, load=True)
            if load_sub_datasets:
                from xradio.vis.read_processing_set import _read_sub_xds

                xds.attrs = {
                    **xds.attrs,
                    **_read_sub_xds(os.path.join(ps_store, ms_dir_name), load=True),
                }
        ps[ms_dir_name] = xds
    return ps
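

# Example usage (a minimal sketch; the store path and MS v4 name below are
# hypothetical placeholders, and the import path is an assumption based on
# this module's xradio.vis imports):
#
#     from xradio.vis.load_processing_set import load_processing_set
#
#     ps = load_processing_set(
#         ps_store="/users/user_1/uid___A002_Xf07bba_Xbe5c_target.lsrk.vis.zarr",
#         sel_parms={
#             "ms_v4_name_1": {"frequency": slice(0, 160), "time": slice(0, 100)},
#         },
#         data_variables=["VISIBILITY", "WEIGHT"],
#     )
#     ms_xds = ps["ms_v4_name_1"]  # An xarray Dataset with only the selected slices loaded.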


class processing_set_iterator:
    def __init__(
        self,
        sel_parms: dict,
        input_data_store: str,
        input_data: Union[Dict, processing_set, None] = None,
        data_variables: Union[list, None] = None,
        load_sub_datasets: bool = True,
    ):
        """An iterator that goes through a processing set one MS v4 at a time.

        Parameters
        ----------
        sel_parms : dict
            A dictionary where the keys are the names of the ms_xds's and the values are slice_dicts.
            slice_dicts: A dictionary where the keys are the dimension names and the values are slices.
            For example::

                {
                    'ms_v4_name_1': {'frequency': slice(0, 160, None), 'time': slice(0, 100)},
                    ...
                    'ms_v4_name_n': {'frequency': slice(0, 160, None), 'time': slice(0, 100)},
                }

        input_data_store : str
            Path and name of the processing set. For example '/users/user_1/uid___A002_Xf07bba_Xbe5c_target.lsrk.vis.zarr'.
        input_data : Union[Dict, processing_set, None], optional
            If the processing set is already in memory it can be supplied here. By default None, which makes the iterator load data using the supplied input_data_store.
        data_variables : Union[list, None], optional
            The list of data variables to load into memory, for example ['VISIBILITY', 'WEIGHT', 'FLAGS']. By default None, which loads all data variables into memory.
        load_sub_datasets : bool, optional
            If True, sub-datasets (for example weather_xds, antenna_xds, pointing_xds, ...) are loaded into memory. By default True.
        """
        self.input_data = input_data
        self.input_data_store = input_data_store
        self.sel_parms = sel_parms
        self.xds_name_iter = iter(sel_parms.keys())
        self.data_variables = data_variables
        self.load_sub_datasets = load_sub_datasets

    def __iter__(self):
        return self

    def __next__(self):
        try:
            xds_name = next(self.xds_name_iter)
        except StopIteration:
            # All MS v4 names in sel_parms have been consumed.
            raise

        if self.input_data is None:
            # Load only the current MS v4 (with its slice selection) from the store.
            slice_description = self.sel_parms[xds_name]
            ps = load_processing_set(
                ps_store=self.input_data_store,
                sel_parms={xds_name: slice_description},
                data_variables=self.data_variables,
                load_sub_datasets=self.load_sub_datasets,
            )
            xds = ps.get(0)  # The loaded processing set holds a single MS v4 here.
        else:
            xds = self.input_data[xds_name]  # Already in memory.

        return xds
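

# Example usage (a minimal sketch; the MS v4 names and the store path are
# hypothetical placeholders): iterating loads one MS v4 at a time, so peak
# memory stays bounded by a single selection rather than the whole set.
#
#     sel_parms = {
#         "ms_v4_name_1": {"frequency": slice(0, 160), "time": slice(0, 100)},
#         "ms_v4_name_2": {"frequency": slice(0, 160), "time": slice(0, 100)},
#     }
#     for ms_xds in processing_set_iterator(
#         sel_parms, input_data_store="/users/user_1/example.vis.zarr"
#     ):
#         ...  # Process each in-memory MS v4 here.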