/
Telluric.py
153 lines (120 loc) · 5 KB
/
Telluric.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# Copyright (c) Geoffrey Lentner 2015. All Rights Reserved.
# See LICENSE (GPLv3)
# slipy/SLiPy/Tellucic.Py
"""
Telluric - Corrections for atmospheric absoption lines.
"""
import numpy as np
from astropy import units as u
from .. import SlipyError
from ..Framework.Options import Options, OptionsError
from .Correlate import Xcorr, CorrelateError
from .Spectrum import Spectrum, SpectrumError
class TelluricError(SlipyError):
"""
Exception specific to the Telluric module.
"""
pass
def Correct(spectrum, *calibration, **kwargs):
"""
Correct(spectrum, *calibration, **kwargs):
Perform a telluric correction on `spectrum` with one or more
`calibration` spectra. If more than one calibration spectrum is
provided, the one with the best fit after performing both a
horizontal cross correlation and a vertical amplitude fit is used.
The spectrum and all the calibration spectra must have the same
number of pixels (elements). If a horizontal shift in the calibration
spectra is appropriate, only the corresponding range of the spectrum
is divided out!
kwargs = {
lag : 25 , # pixel shift limit for XCorr()
range:(0.5, 2.0, 151), # numpy.linspace for amplitude trials
}
"""
try:
# default keyword arguments
options = Options( kwargs,
{
'lag' : 25 , # pixel shift limit for XCorr()
'range':(0.5, 2.0, 151) # numpy.linspace for amplitude trials
})
# check arguments
if not calibration:
raise TelluricError('At least one `calibration` spectrum '
'needs to be provided for Telluric.Correction().')
if type(spectrum) is not Spectrum:
raise TelluricError('Telluric.Correct() expects all arguments '
'of type Spectrum.')
for cal in calibration:
if type(cal) is not Spectrum:
raise TelluricError('Telluric.Correct() expects all '
'arguments to be of type Spectrum.')
if spectrum not in cal:
raise TelluricError('Telluric.Correct() expects the '
'`spectrum` domain to be equivalent to or at least '
'contained within each `calibration` spectrum.')
if len(options('range')) != 3:
raise OptionsError('`range` expects a tuple of length 3.')
# assign parameters
lag = options('lag')
amp = np.linspace( *options('range') )
trials = len(amp)
npix = len(spectrum)
except OptionsError as err:
print(' --> OptionsError:', err)
raise TelluricError('Inappropriate keyword arguments in '
'Telluric.Correct().')
# quit if too big of a task
if trials*npix > 1e8:
raise TelluricError('Telluric.Correct() is programmed to quit '
'if it detects a request to operate on matrices with more '
'than 10**8 elements.')
# resample all the calibration spectra
for cal in calibration:
cal.resample(spectrum)
# find best XCorr and amplitude adjustment
best = None
for cal in calibration:
# best horizontal pixel shift
shift = Xcorr( spectrum, cal, lag=lag)
# build matrices with identical rows (len=npix-shift)
if shift < 0:
calmatrix = np.tile(cal.data[:shift], (trials, 1))
objmatrix = np.tile(spectrum.data[-shift:], (trials, 1))
elif shift > 0:
calmatrix = np.tile(cal.data[shift:], (trials, 1))
objmatrix = np.tile(spectrum.data[:-shift], (trials, 1))
else:
calmatrix = np.tile(cal.data, (trials, 1))
objmatrix = np.tile(spectrum.data, (trials, 1))
# amplitude matrix has identical columns
size = np.shape(calmatrix)[1]
ampmatrix = np.tile(amp, (size,1)).T
# remove units for dimensionless operations
calmatrix = calmatrix.value
objmatrix = objmatrix.value
# flip arrays for amplification
diff = objmatrix - (1 - (1 - calmatrix) * ampmatrix)
# compute the RMS for each trial
rmsvector = np.sqrt(( diff**2 ).sum(axis=1) / size)
if not best:
# if first pass, assign to `best`
best = ( rmsvector.min(), rmsvector.argmin(), shift, cal )
elif rmsvector.min() < best[0]:
# this fit is better, save it to `best`
best = ( rmsvector.min(), rmsvector.argmin(), shift, cal )
# results of calibration fitting
index = best[1] # amplitude
shift = best[2] # XCorr
cal = best[3] # which calibration spectrum
# we can't update an attribute...
update = spectrum.data.value
# divide spectrum
if shift < 0:
update[-shift:] /= 1 - (1 - cal.data[:shift].value) * amp[index]
elif shift > 0:
update[:-shift] /= 1 - (1 - cal.data[shift:].value) * amp[index]
else:
update /= 1 - (1 - cal.data.value) * amp[index]
# re-incorporate units
spectrum.data = update * u.Unit(spectrum.yunits)