-
Notifications
You must be signed in to change notification settings - Fork 3
/
nh3_testcubes.py
240 lines (185 loc) · 8.22 KB
/
nh3_testcubes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
import pyspeckit.spectrum.models.ammonia as ammonia
import pyspeckit.spectrum.models.ammonia_constants as nh3con
from pyspeckit.spectrum.units import SpectroscopicAxis as spaxis
from string import ascii_lowercase
import os
import sys
import numpy as np
from astropy.io import fits
from astropy.utils.console import ProgressBar
from astropy import log
from astropy import units as u
from astropy import constants
log.setLevel('ERROR')
def generate_cubes(nCubes=100, nBorder=1, noise_rms=0.1, output_dir='random_cubes', random_seed=42,
linenames=['oneone', 'twotwo']):
xarrList = []
for linename in linenames:
# generate spectral axis for each ammonia lines
xarr = generate_xarr(linename)
xarrList.append(xarr)
# generate random parameters for nCubes
nComps, Temp, Width, Voff, logN = generate_parameters(nCubes, random_seed)
gradX, gradY = generate_gradients(nCubes, random_seed)
for xarr, linename in zip(xarrList, linenames):
# generate cubes for each line specified
print('----------- generating {0} lines ------------'.format(linename))
for i in ProgressBar(range(nCubes)):
make_and_write(nCubes, nComps[i], i, nBorder, xarr, Temp[i], Width[i], Voff[i], logN[i], gradX[i], gradY[i]
, noise_rms, linename, output_dir)
def make_and_write(nCubes, nComp, i, nBorder, xarr, T, W, V, N, grdX, grdY, noise_rms, linename, output_dir):
# wrapper for make_cube() and write_fits_cube()
results = make_cube(nComp, nBorder, xarr, T, W, V, N, grdX, grdY, noise_rms)
write_fits_cube(results['cube'], nCubes, nComp, i, N, V, W, T, noise_rms,
results['Tmax'], results['Tmax_a'], results['Tmax_b'], linename,
output_dir)
def generate_gradients(nCubes, random_seed=None):
# generate random gradient for temp, sigma, voff, and logN in the X & Y directions
if random_seed:
np.random.seed(random_seed)
# scaling for the temp, sigma, voff, and logN parameters
scale = np.array([[0.2, 0.1, 0.1, 0.01]])
# 0.5 km/s/pix is about 39.7 km/s/pc at 260pc away at 10"/pix
gradX1 = np.random.randn(nCubes, 4) * scale
gradY1 = np.random.randn(nCubes, 4) * scale
gradX2 = np.random.randn(nCubes, 4) * scale
gradY2 = np.random.randn(nCubes, 4) * scale
gradX = np.array([gradX1, gradX2])
gradY = np.array([gradY1, gradY2])
return gradX.swapaxes(0, 1), gradY.swapaxes(0, 1)
def generate_parameters(nCubes, random_seed=None, fix_vlsr=True):
# generate random parameters within a pre-defined distributions, for a two velocity slab model
if random_seed:
np.random.seed(random_seed)
nComps = np.random.choice([1, 2], nCubes)
Temp1 = 8 + np.random.rand(nCubes) * 17
Temp2 = 8 + np.random.rand(nCubes) * 17
if fix_vlsr:
Voff1 = np.zeros(nCubes)
else:
Voff1 = np.random.rand(nCubes) * 5 - 2.5
Voff2 = Voff1 + np.random.rand(nCubes) * 5 - 2.5
logN1 = 13 + 1.5 * np.random.rand(nCubes)
logN2 = 13 + 1.5 * np.random.rand(nCubes)
Width1NT = 0.1 * np.exp(1.5 * np.random.randn(nCubes))
Width2NT = 0.1 * np.exp(1.5 * np.random.randn(nCubes))
Width1 = np.sqrt(Width1NT + 0.08 ** 2)
Width2 = np.sqrt(Width2NT + 0.08 ** 2)
Temp = np.array([Temp1, Temp2]).swapaxes(0, 1)
Width = np.array([Width1, Width2]).swapaxes(0, 1)
Voff = np.array([Voff1, Voff2]).swapaxes(0, 1)
logN = np.array([logN1, logN2]).swapaxes(0, 1)
return nComps, Temp, Width, Voff, logN
def generate_xarr(linename):
# generate SpectroscopicAxis objects
# the -1.0 term in the end is to insure the channel is in the increasing order in frequency space, consistent
# with GAS data
channelwidth = (5.72 * u.kHz / (nh3con.freq_dict[linename] * u.Hz)) * constants.c * -1.0
xarr = spaxis(np.arange(-500, 500) * channelwidth,
unit='GHz',
refX=nh3con.freq_dict[linename] / 1e9,
velocity_convention='radio', refX_unit='GHz')
return xarr
def make_cube(nComps, nBorder, xarr, Temp, Width, Voff, logN, gradX, gradY, noise_rms):
# the length of Temp, Width, Voff, logN, gradX, and gradY should match the number of components
xmat, ymat = np.indices((2 * nBorder + 1, 2 * nBorder + 1))
cube = np.zeros((xarr.shape[0], 2 * nBorder + 1, 2 * nBorder + 1))
results = {}
results['Tmax_a'], results['Tmax_b'] = (0,) * 2
for xx, yy in zip(xmat.flatten(), ymat.flatten()):
spec = np.zeros(cube.shape[0])
for j in range(nComps):
# define parameters
T = Temp[j] * (1 + gradX[j][0] * (xx - 1) + gradY[j][0] * (yy - 1)) + 5
if T < 2.74:
T = 2.74
W = np.abs(Width[j] * (1 + gradX[j][1] * (xx - 1) + gradY[j][1] * (yy - 1)))
V = Voff[j] + (gradX[j][2] * (xx - 1) + gradY[j][2] * (yy - 1))
N = logN[j] * (1 + gradX[j][3] * (xx - 1) + gradY[j][3] * (yy - 1))
# generate spectrum
spec_j = ammonia.cold_ammonia(xarr, T, ntot=N, width=W, xoff_v=V)
if (xx == nBorder) and (yy == nBorder):
Tmaxj = np.max(spec_j)
results['Tmax_{}'.format(ascii_lowercase[j])] = Tmaxj
# add each component to the total spectrum
spec = spec + spec_j
cube[:, yy, xx] = spec
if (xx == nBorder) and (yy == nBorder):
Tmax = np.max(spec)
results['Tmax'] = Tmax
cube += np.random.randn(*cube.shape) * noise_rms
results['cube'] = cube
return results
def write_fits_cube(cube, nCubes, nComps, i, logN, Voff, Width, Temp, noise_rms,
Tmax, Tmax_a, Tmax_b, linename, output_dir='random_cubes'):
"""
Function to write a test cube as a fits file
Note: only currently compatible with nComp <= 2
"""
if not os.path.isdir(output_dir):
# This places nCubes random cubes into the specified output directory
os.mkdir(output_dir)
logN1, logN2 = logN[0], logN[1]
Voff1, Voff2 = Voff[0], Voff[1]
Width1, Width2 = Width[0], Width[1]
Temp1, Temp2 = Temp[0], Temp[1]
nDigits = int(np.ceil(np.log10(nCubes)))
hdrkwds = {'BUNIT': 'K',
'INSTRUME': 'KFPA ',
'BMAJ': 0.008554169991270138,
'BMIN': 0.008554169991270138,
'TELESCOP': 'GBT',
'WCSAXES': 3,
'CRPIX1': 2,
'CRPIX2': 2,
'CRPIX3': 501,
'CDELT1': -0.008554169991270138,
'CDELT2': 0.008554169991270138,
'CDELT3': 5720.0,
'CUNIT1': 'deg',
'CUNIT2': 'deg',
'CUNIT3': 'Hz',
'CTYPE1': 'RA---TAN',
'CTYPE2': 'DEC--TAN',
'CTYPE3': 'FREQ',
'CRVAL1': 0.0,
'CRVAL2': 0.0,
'LONPOLE': 180.0,
'LATPOLE': 0.0,
'EQUINOX': 2000.0,
'SPECSYS': 'LSRK',
'RADESYS': 'FK5',
'SSYSOBS': 'TOPOCENT'}
truekwds = ['NCOMP', 'LOGN1', 'LOGN2', 'VLSR1', 'VLSR2',
'SIG1', 'SIG2', 'TKIN1', 'TKIN2']
hdu = fits.PrimaryHDU(cube)
for kk in hdrkwds:
hdu.header[kk] = hdrkwds[kk]
for kk, vv in zip(truekwds, [nComps, logN1, logN2,
Voff1, Voff2, Width1, Width2,
Temp1, Temp2]):
hdu.header[kk] = vv
hdu.header['TMAX'] = Tmax
hdu.header['TMAX-1'] = Tmax_a
hdu.header['TMAX-2'] = Tmax_b
hdu.header['RMS'] = noise_rms
hdu.header['CRVAL3'] = nh3con.freq_dict[linename]
hdu.header['RESTFRQ'] = nh3con.freq_dict[linename]
# specify the ID fore each line to appear in saved fits files
if linename == 'oneone':
lineID = '11'
elif linename == 'twotwo':
lineID = '22'
else:
# use line names at it is for lines above (3,3)
lineID = linename
hdu.writeto(output_dir + '/random_cube_NH3_{0}_'.format(lineID)
+ '{0}'.format(i).zfill(nDigits)
+ '.fits',
overwrite=True)
if __name__ == '__main__':
print(sys.argv)
if len(sys.argv) > 1:
generate_cubes(nCubes=int(sys.argv[1]))
else:
generate_cubes()