forked from astrodbtoolkit/AstrodbKit2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
spectra.py
224 lines (184 loc) · 7.96 KB
/
spectra.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
"""Functions to handle loading of spectrum objects"""
import os
import astropy.units as u
import numpy as np
from astropy.io import fits
from astropy.nddata import StdDevUncertainty
from astropy.units import Unit
from astropy.wcs import WCS
from specutils import Spectrum1D
from specutils.io.parsing_utils import read_fileobj_or_hdulist
from specutils.io.registers import data_loader
# pylint: disable=no-member, unused-argument
def _identify_spex(filename):
"""
Check whether the given file is a SpeX data product.
"""
try:
with fits.open(filename, memmap=False) as hdulist:
return "spex" in hdulist[0].header["INSTRUME"].lower() and "irtf" in hdulist[0].header["TELESCOP"].lower()
except Exception: # pylint: disable=broad-except,
return False
def identify_spex_prism(origin, *args, **kwargs):
"""
Confirm this is a SpeX Prism FITS file.
See FITS keyword reference at http://irtfweb.ifa.hawaii.edu/~spex/observer/
Notes: GRAT has values of: ShortXD, Prism, LXD_long, LXD_short, SO_long, SO_short
"""
is_spex = _identify_spex(args[0])
if is_spex:
with fits.open(args[0], memmap=False) as hdulist:
return (
isinstance(args[0], str)
and os.path.splitext(args[0].lower())[1] == ".fits"
and is_spex
and ("lowres" in hdulist[0].header["GRAT"].lower() or "prism" in hdulist[0].header["GRAT"].lower())
)
else:
return is_spex
@data_loader("Spex Prism", identifier=identify_spex_prism, extensions=["fits"], dtype=Spectrum1D)
def spex_prism_loader(filename, **kwargs):
"""Open a SpeX Prism file and convert it to a Spectrum1D object"""
with fits.open(filename, **kwargs) as hdulist:
header = hdulist[0].header
tab = hdulist[0].data
# Handle missing/incorrect units
try:
flux_unit = header["YUNITS"].replace("ergs", "erg ").strip()
wave_unit = header["XUNITS"].replace("Microns", "um")
except (KeyError, ValueError):
# For now, assume some default units
flux_unit = "erg"
wave_unit = "um"
wave, data = tab[0] * Unit(wave_unit), tab[1] * Unit(flux_unit)
if tab.shape[0] == 3:
uncertainty = StdDevUncertainty(tab[2])
else:
uncertainty = None
meta = {"header": header}
return Spectrum1D(flux=data, spectral_axis=wave, uncertainty=uncertainty, meta=meta)
def identify_wcs1d_multispec(origin, *args, **kwargs):
"""
Identifier for WCS1D multispec
"""
hdu = kwargs.get("hdu", 0)
# Check if number of axes is one and dimension of WCS is greater than one
with read_fileobj_or_hdulist(*args, **kwargs) as hdulist:
return (
hdulist[hdu].header.get("WCSDIM", 1) > 1
and hdulist[hdu].header["NAXIS"] > 1
and "WAT0_001" in hdulist[hdu].header
and hdulist[hdu].header.get("WCSDIM", 1) == hdulist[hdu].header["NAXIS"]
and "LINEAR" in hdulist[hdu].header.get("CTYPE1", "")
)
@data_loader("wcs1d-multispec", identifier=identify_wcs1d_multispec, extensions=["fits"], dtype=Spectrum1D, priority=10)
def wcs1d_multispec_loader(file_obj, flux_unit=None, hdu=0, verbose=False, **kwargs):
"""
Loader for multiextension spectra as wcs1d. Adapted from wcs1d_fits_loader
Parameters
----------
file_obj : str, file-like or HDUList
FITS file name, object (provided from name by Astropy I/O Registry),
or HDUList (as resulting from astropy.io.fits.open()).
flux_unit : :class:`~astropy.units.Unit` or str, optional
Units of the flux for this spectrum. If not given (or None), the unit
will be inferred from the BUNIT keyword in the header. Note that this
unit will attempt to convert from BUNIT if BUNIT is present.
hdu : int
The index of the HDU to load into this spectrum.
verbose : bool
Print extra info.
**kwargs
Extra keywords for :func:`~specutils.io.parsing_utils.read_fileobj_or_hdulist`.
Returns
-------
:class:`~specutils.Spectrum1D`
"""
with read_fileobj_or_hdulist(file_obj, **kwargs) as hdulist:
header = hdulist[hdu].header
wcs = WCS(header)
# Load data, convert units if BUNIT and flux_unit is provided and not the same
if "BUNIT" in header:
data = u.Quantity(hdulist[hdu].data, unit=header["BUNIT"])
if u.A in data.unit.bases:
data = data * u.A / u.AA # convert ampere to Angroms
if flux_unit is not None:
data = data.to(flux_unit)
else:
data = u.Quantity(hdulist[hdu].data, unit=flux_unit)
if wcs.wcs.cunit[0] == "" and "WAT1_001" in header:
# Try to extract from IRAF-style card or use Angstrom as default.
wat_dict = dict((rec.split("=") for rec in header["WAT1_001"].split()))
unit = wat_dict.get("units", "Angstrom")
if hasattr(u, unit):
wcs.wcs.cunit[0] = unit
else: # try with unit name stripped of excess plural 's'...
wcs.wcs.cunit[0] = unit.rstrip("s")
if verbose:
print(f"Extracted spectral axis unit '{unit}' from 'WAT1_001'")
elif wcs.wcs.cunit[0] == "":
wcs.wcs.cunit[0] = "Angstrom"
# Compatibility attribute for lookup_table (gwcs) WCS
wcs.unit = tuple(wcs.wcs.cunit)
# Identify the correct parts of the data to store
if len(data.shape) > 1:
flux_data = data[0]
else:
flux_data = data
uncertainty = None
if "NAXIS3" in header:
for i in range(header["NAXIS3"]):
if "spectrum" in header.get(f"BANDID{i+1}", ""):
flux_data = data[i]
if "sigma" in header.get(f"BANDID{i+1}", ""):
uncertainty = data[i]
# Reshape arrays if needed
if len(flux_data) == 1 and len(flux_data.shape) > 1:
flux_data = np.reshape(flux_data, -1)
if uncertainty is not None:
uncertainty = np.reshape(uncertainty, -1)
# Convert uncertainty to StdDevUncertainty array
if uncertainty is not None:
uncertainty = StdDevUncertainty(uncertainty)
# Manually generate spectral axis
pixels = [[i] + [0] * (wcs.naxis - 1) for i in range(wcs.pixel_shape[0])]
spectral_axis = [i[0] for i in wcs.all_pix2world(pixels, 0)] * wcs.wcs.cunit[0]
# Store header as metadata information
meta = {"header": header}
return Spectrum1D(flux=flux_data, spectral_axis=spectral_axis, uncertainty=uncertainty, meta=meta)
def load_spectrum(filename: str, spectra_format: str = None, raise_error: bool = False):
"""Attempt to load the filename as a spectrum object
Parameters
----------
filename
Name of the file to read
spectra_format
Optional file format, passed to Spectrum1D.read.
In its absense Spectrum1D.read will attempt to determine the format.
raise_error
Boolean to control if a failure to read the spectrum should raise an error.
"""
# Convert filename if using environment variables
if filename.startswith("$"):
partial_path, _ = os.path.split(filename)
while partial_path != "":
partial_path, envvar_name = os.path.split(partial_path)
abs_path = os.getenv(envvar_name[1:])
if abs_path is not None:
filename = filename.replace(envvar_name, abs_path)
else:
print(f"Could not find environment variable {envvar_name}")
try:
if spectra_format is not None:
spec1d = Spectrum1D.read(filename, format=spectra_format)
else:
spec1d = Spectrum1D.read(filename)
except Exception as e: # pylint: disable=broad-except, invalid-name
msg = f"Error loading {filename}: {e}"
# Control whether an error should be explicitly raised if failing to read
if raise_error:
raise TypeError(msg) from e
else:
print(msg)
spec1d = filename
return spec1d