-
Notifications
You must be signed in to change notification settings - Fork 8
/
continuum.py
180 lines (135 loc) · 6.44 KB
/
continuum.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Continuum-normalization.
"""
from __future__ import (division, print_function, absolute_import,
unicode_literals)
__all__ = ["sines_and_cosines"]
import logging
import numpy as np
def _continuum_design_matrix(dispersion, L, order):
"""
Build a design matrix for the continuum determination, using sines and
cosines.
:param dispersion:
An array of dispersion points.
:param L:
The length-scale for the sine and cosine functions.
:param order:
The number of sines and cosines to use in the fit.
"""
L, dispersion = float(L), np.array(dispersion)
scale = 2 * (np.pi / L)
return np.vstack([
np.ones_like(dispersion).reshape((1, -1)),
np.array([
[np.cos(o * scale * dispersion), np.sin(o * scale * dispersion)] \
for o in range(1, order + 1)]).reshape((2 * order, dispersion.size))
])
def sines_and_cosines(dispersion, flux, ivar, continuum_pixels, L=1400, order=3,
regions=None, fill_value=1.0, **kwargs):
"""
Fit the flux values of pre-defined continuum pixels using a sum of sine and
cosine functions.
:param dispersion:
The dispersion values.
:param flux:
The flux values for all pixels, as they correspond to the `dispersion`
array.
:param ivar:
The inverse variances for all pixels, as they correspond to the
`dispersion` array.
:param continuum_pixels:
A mask that selects pixels that should be considered as 'continuum'.
:param L: [optional]
The length scale for the sines and cosines.
:param order: [optional]
The number of sine/cosine functions to use in the fit.
:param regions: [optional]
Specify sections of the spectra that should be fitted separately in each
star. This may be due to gaps between CCDs, or some other physically-
motivated reason. These values should be specified in the same units as
the `dispersion`, and should be given as a list of `[(start, end), ...]`
values. For example, APOGEE spectra have gaps near the following
wavelengths which could be used as `regions`:
>> regions = ([15090, 15822], [15823, 16451], [16452, 16971])
:param fill_value: [optional]
The continuum value to use for when no continuum was calculated for that
particular pixel (e.g., the pixel is outside of the `regions`).
:param full_output: [optional]
If set as True, then a metadata dictionary will also be returned.
:returns:
The continuum values for all pixels, and a dictionary that contains
metadata about the fit.
"""
scalar = kwargs.pop("__magic_scalar", 1e-6) # MAGIC
flux, ivar = np.atleast_2d(flux), np.atleast_2d(ivar)
if regions is None:
regions = [(dispersion[0], dispersion[-1])]
region_masks = []
region_matrices = []
continuum_masks = []
continuum_matrices = []
pixel_included_in_regions = np.zeros_like(flux).astype(int)
for start, end in regions:
# Build the masks for this region.
si, ei = np.searchsorted(dispersion, (start, end))
region_mask = (end >= dispersion) * (dispersion >= start)
region_masks.append(region_mask)
pixel_included_in_regions[:, region_mask] += 1
continuum_masks.append(continuum_pixels[
(ei >= continuum_pixels) * (continuum_pixels >= si)])
# Build the design matrices for this region.
region_matrices.append(
_continuum_design_matrix(dispersion[region_masks[-1]], L, order))
continuum_matrices.append(
_continuum_design_matrix(dispersion[continuum_masks[-1]], L, order))
# TODO: ISSUE: Check for overlapping regions and raise an warning.
# Check for non-zero pixels (e.g. ivar > 0) that are not included in a
# region. We should warn about this very loudly!
warn_on_pixels = (pixel_included_in_regions == 0) * (ivar > 0)
metadata = []
continuum = np.ones_like(flux) * fill_value
for i in range(flux.shape[0]):
warn_indices = np.where(warn_on_pixels[i])[0]
if any(warn_indices):
# Split by deltas so that we give useful warning messages.
segment_indices = np.where(np.diff(warn_indices) > 1)[0]
segment_indices = np.sort(np.hstack(
[0, segment_indices, segment_indices + 1, len(warn_indices)]))
segment_indices = segment_indices.reshape(-1, 2)
segments = ", ".join(["{:.1f} to {:.1f} ({:d} pixels)".format(
dispersion[s], dispersion[e], e-s) for s, e in segment_indices])
logging.warn("Some pixels in spectrum index {0} have measured flux "
"values (e.g., ivar > 0) but are not included in any "
"specified continuum region. These pixels won't be "
"continuum-normalised: {1}".format(i, segments))
# Get the flux and inverse variance for this object.
object_metadata = []
object_flux, object_ivar = (flux[i], ivar[i])
# Normalize each region.
for region_mask, region_matrix, continuum_mask, continuum_matrix in \
zip(region_masks, region_matrices, continuum_masks, continuum_matrices):
if continuum_mask.size == 0:
# Skipping..
object_metadata.append([order, L, fill_value, scalar, [], None])
continue
# We will fit to continuum pixels only.
continuum_disp = dispersion[continuum_mask]
continuum_flux, continuum_ivar \
= (object_flux[continuum_mask], object_ivar[continuum_mask])
# Solve for the amplitudes.
M = continuum_matrix
MTM = np.dot(M, continuum_ivar[:, None] * M.T)
MTy = np.dot(M, (continuum_ivar * continuum_flux).T)
eigenvalues = np.linalg.eigvalsh(MTM)
MTM[np.diag_indices(len(MTM))] += scalar * np.max(eigenvalues)
eigenvalues = np.linalg.eigvalsh(MTM)
condition_number = max(eigenvalues)/min(eigenvalues)
amplitudes = np.linalg.solve(MTM, MTy)
continuum[i, region_mask] = np.dot(region_matrix.T, amplitudes)
object_metadata.append(
(order, L, fill_value, scalar, amplitudes, condition_number))
metadata.append(object_metadata)
return (continuum, metadata)