dataset.py
from pathlib import Path
from textwrap import dedent
from importlib import resources

from jsonschema.exceptions import ValidationError

import asdf
import astropy.table
import gwcs
from astropy.wcs.wcsapi.wrappers import SlicedLowLevelWCS
from ndcube.ndcube import NDCube

from dkist.io import DaskFITSArrayContainer
from dkist.utils.globus import (DKIST_DATA_CENTRE_DATASET_PATH, DKIST_DATA_CENTRE_ENDPOINT_ID,
                                start_transfer_from_file_list, watch_transfer_progress)
from dkist.utils.globus.endpoints import get_local_endpoint_id, get_transfer_client

from .utils import dataset_info_str
__all__ = ['Dataset']


class Dataset(NDCube):
    """
    The base class for DKIST datasets.

    This class is backed by `dask.array.Array` and `gwcs.wcs.WCS` objects.

    Parameters
    ----------
    data : `numpy.ndarray`
        The array holding the actual data in this object.
    wcs : `gwcs.wcs.WCS`
        The WCS object containing the axes' information.
    uncertainty : any type, optional
        Uncertainty in the dataset. Should have an attribute ``uncertainty_type``
        that defines what kind of uncertainty is stored, for example ``"std"``
        for standard deviation or ``"var"`` for variance. A metaclass defining
        such an interface is `NDUncertainty`, but it isn't mandatory. If the
        uncertainty has no such attribute it is stored as `UnknownUncertainty`.
        Defaults to `None`.
    mask : any type, optional
        Mask for the dataset. Masks should follow the numpy convention
        that valid data points are marked by `False` and invalid ones with
        `True`. Defaults to `None`.
    meta : dict-like object, optional
        Additional meta information about the dataset. If no meta is provided
        an empty `collections.OrderedDict` is created. Default is `None`.
    unit : Unit-like or `str`, optional
        Unit for the dataset. Strings that can be converted to a
        `~astropy.units.Unit` are allowed. Default is `None`.
    extra_coords : iterable of `tuple`, each with three entries
        (`str`, `int`, `astropy.units.Quantity` or array-like)
        Gives the name, axis of data, and values of coordinates of a data axis
        not included in the WCS object.
    headers : `astropy.table.Table`
        A Table of all FITS headers for all files comprising this dataset.
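
    Examples
    --------
    A minimal, illustrative sketch of loading and inspecting a dataset. The
    directory path is hypothetical, and the import assumes the top-level
    package re-exports this class::

        >>> from dkist import Dataset  # doctest: +SKIP
        >>> ds = Dataset.from_directory("/data/my_dataset")  # doctest: +SKIP
        >>> ds.data, ds.wcs  # the backing dask array and gWCS  # doctest: +SKIP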
"""
def __init__(self, data, wcs=None, uncertainty=None, mask=None, meta=None,
unit=None, copy=False, headers=None):
# Do some validation
if (not isinstance(wcs, gwcs.WCS) and
(isinstance(wcs, SlicedLowLevelWCS) and not isinstance(wcs._wcs, gwcs.WCS))):
raise ValueError("DKIST Dataset objects expect gWCS objects as the wcs argument.")
if isinstance(wcs, gwcs.WCS):
# Set the array shape to be that of the data.
if wcs.array_shape is None:
wcs.array_shape = data.shape
if wcs.pixel_shape is None:
wcs.pixel_shape = data.shape[::-1]
if (wcs.pixel_shape != data.shape[::-1] or wcs.array_shape != data.shape):
raise ValueError("The pixel and array shape on the WCS object "
"do not match the given array.")
if headers is not None and not isinstance(headers, astropy.table.Table):
raise ValueError("The headers keyword argument must be an Astropy Table instance.")
super().__init__(data, wcs, uncertainty=uncertainty, mask=mask, meta=meta,
unit=unit, copy=copy)
self._header_table = headers
self._array_container = None
"""
Properties.
"""
@property
def headers(self):
return self._header_table
@property
def array_container(self):
"""
A reference to the files containing the data.
"""
return self._array_container
@property
def filenames(self):
"""
The filenames referenced by this dataset.
.. note::
This is not their full file paths.
"""
if self._array_container is None:
return []
else:
return self._array_container.filenames
"""
Dataset loading and saving routines.
"""
@classmethod
def from_directory(cls, directory):
"""
Construct a `~dkist.dataset.Dataset` from a directory containing one
asdf file and a collection of FITS files.
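
        Examples
        --------
        Illustrative only; the directory path is hypothetical::

            >>> ds = Dataset.from_directory("/data/my_dataset")  # doctest: +SKIP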
"""
base_path = Path(directory)
if not base_path.is_dir():
raise ValueError("directory argument must be a directory")
asdf_files = tuple(base_path.glob("*.asdf"))
if not asdf_files:
raise ValueError("No asdf file found in directory.")
elif len(asdf_files) > 1:
raise NotImplementedError("Multiple asdf files found in this "
"directory. Use from_asdf to specify which "
"one to use.") # pragma: no cover
asdf_file = asdf_files[0]
return cls.from_asdf(asdf_file)

    @classmethod
    def from_asdf(cls, filepath):
        """
        Construct a dataset object from a filepath of a suitable asdf file.
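
        The file is validated against the Level 1 dataset schema before the
        tree is returned.

        Examples
        --------
        Illustrative only; the file path is hypothetical::

            >>> ds = Dataset.from_asdf("/data/my_dataset/my_dataset.asdf")  # doctest: +SKIP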
"""
filepath = Path(filepath)
base_path = filepath.parent
try:
with resources.path("dkist.io", "level_1_dataset_schema.yaml") as schema_path:
with asdf.open(filepath, custom_schema=schema_path.as_posix(),
lazy_load=False, copy_arrays=True) as ff:
return ff.tree['dataset']
except ValidationError as e:
err = f"This file is not a valid DKIST Level 1 asdf file, it fails validation with: {e.message}."
raise TypeError(err) from e
"""
Private methods.
"""
def __repr__(self):
"""
Overload the NDData repr because it does not play nice with the dask delayed io.
"""
prefix = object.__repr__(self)
output = dedent(f"{prefix}\n{self.__str__()}")
return output
def __str__(self):
return dataset_info_str(self)
"""
DKIST specific methods.
"""
def download(self, path="/~/", destination_endpoint=None, progress=True):
"""
Start a Globus file transfer for all files in this Dataset.
Parameters
----------
path : `pathlib.Path` or `str`, optional
The path to save the data in, must be accessible by the Globus
endpoint.
destination_endpoint : `str`, optional
A unique specifier for a Globus endpoint. If `None` a local
endpoint will be used if it can be found, otherwise an error will
be raised. See `~dkist.utils.globus.get_endpoint_id` for valid
endpoint specifiers.
progress : `bool`, optional
If `True` status information and a progress bar will be displayed
while waiting for the transfer to complete.
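
        Examples
        --------
        A sketch of starting a transfer to a local Globus endpoint;
        ``/~/dkist_data`` is a hypothetical destination path::

            >>> ds.download()  # doctest: +SKIP
            >>> ds.download(path="/~/dkist_data")  # doctest: +SKIP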
"""
base_path = Path(DKIST_DATA_CENTRE_DATASET_PATH.format(**self.meta))
# TODO: Default path to the config file
destination_path = Path(path) / self.meta['primaryProposalId'] / self.meta['datasetId']
file_list = [base_path / fn for fn in self.filenames]
file_list.append(base_path / self.meta['asdfObjectKey'])
if not destination_endpoint:
destination_endpoint = get_local_endpoint_id()
task_id = start_transfer_from_file_list(DKIST_DATA_CENTRE_ENDPOINT_ID,
destination_endpoint, destination_path,
file_list)
tc = get_transfer_client()
if progress:
watch_transfer_progress(task_id, tc, initial_n=len(file_list))
else:
tc.task_wait(task_id, timeout=1e6)
# TODO: This is a hack to change the base dir of the dataset.
# The real solution to this is to use the database.
local_destination = destination_path.relative_to("/").expanduser()
old_ac = self._array_container
self._array_container = DaskFITSArrayContainer.from_external_array_references(
old_ac.external_array_references,
loader=old_ac._loader,
basepath=local_destination)
self._data = self._array_container.array