Skip to content

Commit

Permalink
added iraf non-linear loaders
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon Torres committed Aug 1, 2019
1 parent 5d4c3fb commit 1d53c13
Showing 1 changed file with 270 additions and 0 deletions.
270 changes: 270 additions & 0 deletions specutils/io/default_loaders/wcs_fits.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,273 @@ def wcs1d_fits_writer(spectrum, file_name, **kwargs):
tab = Table([disp, flux], names=("dispersion", "flux"), meta=meta)

tab.write(file_name, format="fits")


def identify_iraf_wcs(*args):
print('args ', args)
return True


@data_loader('iraf', identifier=identify_iraf_wcs, dtype=Spectrum1D)
def non_linear_wcs1d_fits(file_name, spectral_axis_unit=None, **kwargs):
"""docstrings
"""
logging.info('Loading 1D non-linear fits solution')

with fits.open(file_name, **kwargs) as hdulist:
header = hdulist[0].header
for wcsdim in range(1, header['WCSDIM'] + 1):
ctypen = header['CTYPE{:d}'.format(wcsdim)]
if ctypen == 'LINEAR':
logging.info("linear Solution: Try using "
"`format='wcs1d-fits'` instead")
wcs = WCS(header)
print(wcs)
spectral_axis = _read_linear_iraf_wcs(wcs=wcs,
dc_flag=header['DC-FLAG'])
elif ctypen == 'MULTISPE':
logging.info("Multi spectral or non-linear solution")
spectral_axis = _read_non_linear_iraf_wcs(header=header,
wcsdim=wcsdim)
# print("Spectral Axis ", spectral_axis)
else:
raise NotImplementedError

if 'BUNIT' in header:
data = u.Quantity(hdulist[0].data, unit=header['BUNIT'])
else:
data = hdulist[0].data

if spectral_axis_unit is not None:
spectral_axis *= spectral_axis_unit

meta = {'header': header}

return Spectrum1D(flux=data, spectral_axis=spectral_axis, meta=meta)


def _read_linear_iraf_wcs(wcs, dc_flag):
"""Linear solution reader
This method read the appropriate keywords and defines a linear
wavelength solution
Returns:
Callable wavelength solution model. Instance of
astropy.modeling.Model
"""
wcs_dict = {'crval': wcs['CRVAL'],
'crpix': wcs['CRPIX'],
'cdelt': wcs['CDELT'],
'dtype': dc_flag,
'pnum': wcs['NAXIS']}

print(wcs_dict['pnum'])

math_model = _set_math_model(wcs_dict=wcs_dict)

spectral_axis = math_model(wcs_dict['pnum'])

return spectral_axis


def _read_non_linear_iraf_wcs(header, wcsdim):
wat_wcs_dict = {}
ctypen = header['CTYPE{:d}'.format(wcsdim)]
logging.info('Attempting to read CTYPE{:d}: {:s}'.format(wcsdim, ctypen))
if ctypen == 'MULTISPE':
# TODO (simon): What is the * (asterisc) doing here?.
wat_head = header['WAT{:d}*'.format(wcsdim)]
if len(wat_head) == 1:
logging.debug('Get units')
wat_array = wat_head[0].split(' ')
for pair in wat_array:
split_pair = pair.split('=')
wat_wcs_dict[split_pair[0]] = split_pair[1]
# print(wat_head[0].split(' '))
elif len(wat_head) > 1:
wat_string = ''
for key in wat_head:
wat_string += header[key]
wat_array = shlex.split(wat_string.replace('=', ' '))
if len(wat_array) % 2 == 0:
for i in range(0, len(wat_array), 2):
# if wat_array[i] not in wcs_dict.keys():
wat_wcs_dict[wat_array[i]] = wat_array[i + 1]
# print(wat_array[i], wat_array[i + 1])

for key in wat_wcs_dict.keys():
logging.info("{:d} -{:s}- {:s}".format(wcsdim,
key,
wat_wcs_dict[key]))

if 'spec1' in wat_wcs_dict.keys():
# print(wat_wcs_dict['spec1'])
spec = wat_wcs_dict['spec1'].split()
aperture = int(spec[0])
beam = int(spec[1])
disp_type = int(spec[2])
disp_start = float(spec[3])
disp_del_av = float(spec[4])
pix_num = int(spec[5])
dopp_fact = float(spec[6])
aper_low = int(float(spec[7]))
aper_high = int(float(spec[8]))
weight = float(spec[9])
zeropoint = float(spec[10])
function_type = int(spec[11])
order = int(float(spec[12]))
min_pix_val = int(float(spec[13]))
max_pix_val = int(float(spec[14]))
# resto = list(spec[9:])
params = [float(i) for i in spec[15:]]
# TODO (simon): Document wcs_dict (what is what).
wcs_dict = {'aperture': aperture,
'beam': beam,
'dtype': disp_type,
'dstart': disp_start,
'avdelt': disp_del_av,
'pnum': pix_num,
'z': dopp_fact,
'alow': aper_low,
'ahigh': aper_high,
'weight': weight,
'zeropoint': zeropoint,
'ftype': function_type,
'order': order,
'pmin': min_pix_val,
'pmax': max_pix_val,
'fpar': params}

# This section of the code only shows a plot to see if the code
# above actually worked which means this methods has not been fully
# developed neither tested
logging.info('Retrieving model')
math_model = _set_math_model(wcs_dict=wcs_dict)

spectral_axis = math_model(range(wcs_dict['pnum']))
return spectral_axis


def _set_math_model(wcs_dict):
if wcs_dict['dtype'] == -1:
return _none()
elif wcs_dict['dtype'] == 0:
return _linear_solution(wcs_dict=wcs_dict)
elif wcs_dict['dtype'] == 1:
return _log_linear(wcs_dict=wcs_dict)
elif wcs_dict['dtype'] == 2:
if wcs_dict['ftype'] == 1:
return _chebyshev(wcs_dict=wcs_dict)
elif wcs_dict['ftype'] == 2:
return _non_linear_legendre(wcs_dict=wcs_dict)
elif wcs_dict['ftype'] == 3:
return _non_linear_cspline(wcs_dict=wcs_dict)
elif wcs_dict['ftype'] == 4:
return _non_linear_lspline(wcs_dict=wcs_dict)
elif wcs_dict['ftype'] == 5:
# pixel coordinates
raise NotImplementedError
elif wcs_dict['ftype'] == 6:
# sampled coordinate array
raise NotImplementedError
else:
raise SyntaxError('ftype {:d} is not defined in the '
'standard'.format(wcs_dict['ftype']))
else:
raise SyntaxError('dtype {:d} is not defined in the '
'standard'.format(wcs_dict['dtype']))


def _none():
"""Required to handle No-wavelength solution
No wavelength solution is considered in the FITS standard (dtype = -1)
This method is placed here for completness even though is not
implemented.
Raises:
NotImplementedError
"""
raise NotImplementedError


def _linear_solution(wcs_dict):
"""Constructs a linear 1D model based on the WCS information obtained
from the header.
"""
intercept = wcs_dict['crval'] - \
(wcs_dict['crpix'] - 1) * \
wcs_dict['cdelt']
model = models.Linear1D(slope=wcs_dict['cdelt'],
intercept=intercept)

return model


def _log_linear(wcs_dict):
"""Not implemented
Raises:
NotImplementedError
"""
# intercept = np.power(10, wcs_dict['crval'] +
# wcs_dict['cdelt'] *
# (wcs_dict['crpix'] - 1))
#
# slope = np.power(10, wcs_dict['cdelt'])
#
# model = models.Linear1D(slope=slope,
# intercept=intercept)
# print(model)
raise NotImplementedError


def _chebyshev(wcs_dict):
"""Returns a chebyshev model"""
model = models.Chebyshev1D(degree=wcs_dict['order'] - 1,
domain=[wcs_dict['pmin'],
wcs_dict['pmax']], )
# model.parameters[0] = wcs_dict['pmin']
for param_index in range(wcs_dict['order']):
model.parameters[param_index] = wcs_dict['fpar'][
param_index]

return model


def _non_linear_legendre(wcs_dict):
"""Set model to Legendre1D
"""
"""Returns a legendre model"""
model = models.Legendre1D(degree=wcs_dict['order'] - 1,
domain=[wcs_dict['pmin'],
wcs_dict['pmax']], )
# model.parameters[0] = wcs_dict['pmin']
for param_index in range(wcs_dict['order']):
model.parameters[param_index] = wcs_dict['fpar'][
param_index]

return model


def _non_linear_lspline(wcs_dict):
"""Not implemented
Raises:
NotImplementedError
"""
raise NotImplementedError('Linear spline is not implemented')


def _non_linear_cspline(wcs_dict):
"""Not implemented
Raises:
NotImplementedError
"""
raise NotImplementedError('Cubic spline is not implemented')

# def get_model(wcs_dict):
# """Returns the wavelength solution model if exists."""
# if model is not None:
# return model
# else:
# log.error("The solution hasn't been found")
#

0 comments on commit 1d53c13

Please sign in to comment.