-
Notifications
You must be signed in to change notification settings - Fork 1
/
uvdata.py
4378 lines (3942 loc) · 239 KB
/
uvdata.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Primary container for radio interferometer datasets."""
from astropy import constants as const
from astropy.time import Time
import os
import numpy as np
import warnings
import ephem
from uvbase import UVBase
import parameter as uvp
import telescopes as uvtel
import utils as uvutils
import copy
import collections
import re
try:
import HERA_MapMaking_VisibilitySimulation as mmvs
except:
print('No HERA_Mapmaking_VisibilitySimulation module detected.')
from collections import OrderedDict as odict
def read_miriad_fromUVData(filepath, correct_lat_lon=True, run_check=True,
check_extra=True,
run_check_acceptability=True, phase_type=None, Parallel_Files=False, Time_Average=1, Frequency_Average=1, Dred=True, inplace=True, tol=5.e-4, Select_freq=False, Select_time=False, Badants=[]):
uv2 = UVData()
uv2.read_miriad(filepath=filepath, correct_lat_lon=correct_lat_lon, run_check=run_check, check_extra=check_extra, run_check_acceptability=run_check_acceptability, phase_type=phase_type)
if Dred is True or Time_Average != 1. or Frequency_Average != 1. or Badants != []:
print('select_average starts.')
uv2.select_average(Time_Average=Time_Average, Frequency_Average=Frequency_Average, Dred=Dred, inplace=inplace, tol=tol, Select_freq=Select_freq, Select_time=Select_time, Badants=Badants, run_check=run_check)
return uv2
def read_uvh5_fromUVData(filename, antenna_nums=None, antenna_names=None,
ant_str=None, bls=None, frequencies=None, freq_chans=None,
times=None, polarizations=None, blt_inds=None, read_data=True,
run_check=True, check_extra=True, run_check_acceptability=True,
Parallel_Files=False, Time_Average=1, Frequency_Average=1, Dred=True, inplace=True, tol=5.e-4, Select_freq=False, Select_time=False, Badants=[]):
uv2 = UVData()
uv2.read_uvh5(filename, antenna_nums=antenna_nums,
antenna_names=antenna_names, ant_str=ant_str, bls=bls,
frequencies=frequencies, freq_chans=freq_chans, times=times,
polarizations=polarizations, blt_inds=blt_inds,
read_data=read_data, run_check=run_check,
check_extra=check_extra,
run_check_acceptability=run_check_acceptability, Parallel_Files=False, Time_Average=1, Frequency_Average=1, Dred=True, inplace=True, tol=5.e-4, Select_freq=False, Select_time=False, Badants=[])
if Dred is True or Time_Average != 1. or Frequency_Average != 1. or Badants != []:
print('select_average starts.')
uv2.select_average(Time_Average=Time_Average, Frequency_Average=Frequency_Average, Dred=Dred, inplace=inplace, tol=tol, Select_freq=Select_freq, Select_time=Select_time, Badants=Badants, run_check=run_check)
return uv2
class UVData(UVBase):
"""
A class for defining a radio interferometer dataset.
Currently supported file types: uvfits, miriad, fhd.
Provides phasing functions.
Attributes:
UVParameter objects: For full list see UVData Parameters
(http://pyuvdata.readthedocs.io/en/latest/uvdata_parameters.html).
Some are always required, some are required for certain phase_types
and others are always optional.
"""
def __init__(self):
# """Create a new UVData object."""
# # add the UVParameters to the class
#
# # standard angle tolerance: 10 mas in radians.
# # Should perhaps be decreased to 1 mas in the future
# radian_tol = 10 * 2 * np.pi * 1e-3 / (60.0 * 60.0 * 360.0)
#
# self._Ntimes = uvp.UVParameter('Ntimes', description='Number of times',
# expected_type=int)
# self._Nbls = uvp.UVParameter('Nbls', description='Number of baselines',
# expected_type=int)
# self._Nblts = uvp.UVParameter('Nblts', description='Number of baseline-times '
# '(i.e. number of spectra). Not necessarily '
# 'equal to Nbls * Ntimes', expected_type=int)
# self._Nfreqs = uvp.UVParameter('Nfreqs', description='Number of frequency channels',
# expected_type=int)
# self._Npols = uvp.UVParameter('Npols', description='Number of polarizations',
# expected_type=int)
#
# desc = ('Array of the visibility data, shape: (Nblts, Nspws, Nfreqs, '
# 'Npols), type = complex float, in units of self.vis_units')
# self._data_array = uvp.UVParameter('data_array', description=desc,
# form=('Nblts', 'Nspws',
# 'Nfreqs', 'Npols'),
# expected_type=np.complex)
#
# desc = 'Visibility units, options are: "uncalib", "Jy" or "K str"'
# self._vis_units = uvp.UVParameter('vis_units', description=desc,
# form='str', expected_type=str,
# acceptable_vals=["uncalib", "Jy", "K str"])
#
# desc = ('Number of data points averaged into each data element, '
# 'NOT required to be an integer, type = float, same shape as data_array.'
# 'The product of the integration_time and the nsample_array '
# 'value for a visibility reflects the total amount of time '
# 'that went into the visibility. Best practice is for the '
# 'nsample_array to be used to track flagging within an integration_time '
# '(leading to a decrease of the nsample array value below 1) and '
# 'LST averaging (leading to an increase in the nsample array '
# 'value). So datasets that have not been LST averaged should '
# 'have nsample array values less than or equal to 1.'
# 'Note that many files do not follow this convention, but it is '
# 'safe to assume that the product of the integration_time and '
# 'the nsample_array is the total amount of time included in a visibility.')
# self._nsample_array = uvp.UVParameter('nsample_array', description=desc,
# form=('Nblts', 'Nspws',
# 'Nfreqs', 'Npols'),
# expected_type=(np.float))
#
# desc = 'Boolean flag, True is flagged, same shape as data_array.'
# self._flag_array = uvp.UVParameter('flag_array', description=desc,
# form=('Nblts', 'Nspws',
# 'Nfreqs', 'Npols'),
# expected_type=np.bool)
#
# self._Nspws = uvp.UVParameter('Nspws', description='Number of spectral windows '
# '(ie non-contiguous spectral chunks). '
# 'More than one spectral window is not '
# 'currently supported.', expected_type=int)
#
# self._spw_array = uvp.UVParameter('spw_array',
# description='Array of spectral window '
# 'Numbers, shape (Nspws)', form=('Nspws',),
# expected_type=int)
#
# desc = ('Projected baseline vectors relative to phase center, '
# 'shape (Nblts, 3), units meters. Convention is: uvw = xyz(ant2) - xyz(ant1).'
# 'Note that this is the Miriad convention but it is different '
# 'from the AIPS/FITS convention (where uvw = xyz(ant1) - xyz(ant2)).')
# self._uvw_array = uvp.UVParameter('uvw_array', description=desc,
# form=('Nblts', 3),
# expected_type=np.float,
# acceptable_range=(0, 1e8), tols=1e-3)
#
# desc = ('Array of times, center of integration, shape (Nblts), '
# 'units Julian Date')
# self._time_array = uvp.UVParameter('time_array', description=desc,
# form=('Nblts',),
# expected_type=np.float,
# tols=1e-3 / (60.0 * 60.0 * 24.0)) # 1 ms in days
#
# desc = ('Array of lsts, center of integration, shape (Nblts), '
# 'units radians')
# self._lst_array = uvp.UVParameter('lst_array', description=desc,
# form=('Nblts',),
# expected_type=np.float,
# tols=radian_tol)
#
# desc = ('Array of first antenna indices, shape (Nblts), '
# 'type = int, 0 indexed')
# self._ant_1_array = uvp.UVParameter('ant_1_array', description=desc,
# expected_type=int, form=('Nblts',))
# desc = ('Array of second antenna indices, shape (Nblts), '
# 'type = int, 0 indexed')
# self._ant_2_array = uvp.UVParameter('ant_2_array', description=desc,
# expected_type=int, form=('Nblts',))
#
# desc = ('Array of baseline indices, shape (Nblts), '
# 'type = int; baseline = 2048 * (ant1+1) + (ant2+1) + 2^16')
# self._baseline_array = uvp.UVParameter('baseline_array',
# description=desc,
# expected_type=int, form=('Nblts',))
#
# # this dimensionality of freq_array does not allow for different spws
# # to have different dimensions
# desc = 'Array of frequencies, shape (Nspws, Nfreqs), units Hz'
# self._freq_array = uvp.UVParameter('freq_array', description=desc,
# form=('Nspws', 'Nfreqs'),
# expected_type=np.float,
# tols=1e-3) # mHz
#
# desc = ('Array of polarization integers, shape (Npols). '
# 'AIPS Memo 117 says: pseudo-stokes 1:4 (pI, pQ, pU, pV); '
# 'circular -1:-4 (RR, LL, RL, LR); linear -5:-8 (XX, YY, XY, YX). '
# 'NOTE: AIPS Memo 117 actually calls the pseudo-Stokes polarizations '
# '"Stokes", but this is inaccurate as visibilities cannot be in '
# 'true Stokes polarizations for physical antennas. We adopt the '
# 'term pseudo-Stokes to refer to linear combinations of instrumental '
# 'visibility polarizations (e.g. pI = xx + yy).')
# self._polarization_array = uvp.UVParameter('polarization_array',
# description=desc,
# expected_type=int,
# acceptable_vals=list(
# np.arange(-8, 0)) + list(np.arange(1, 5)),
# form=('Npols',))
#
# desc = ('Length of the integration in seconds, shape (NBlts). '
# 'The product of the integration_time and the nsample_array '
# 'value for a visibility reflects the total amount of time '
# 'that went into the visibility. Best practice is for the '
# 'integration_time to reflect the length of time a visibility '
# 'was integrated over (so it should vary in the case of '
# 'baseline-dependent averaging and be a way to do selections '
# 'for differently integrated baselines).'
# 'Note that many files do not follow this convention, but it is '
# 'safe to assume that the product of the integration_time and '
# 'the nsample_array is the total amount of time included in a visibility.')
# self._integration_time = uvp.UVParameter('integration_time',
# description=desc,
# form=('Nblts',),
# expected_type=np.float, tols=1e-3) # 1 ms
#
# # self._integration_time = uvp.UVParameter('integration_time',
# # description='Length of the integration (s)',
# # expected_type=np.float, tols=1e-3) # 1 ms
#
# self._channel_width = uvp.UVParameter('channel_width',
# description='Width of frequency channels (Hz)',
# expected_type=np.float,
# tols=1e-3) # 1 mHz
#
# # --- observation information ---
# self._object_name = uvp.UVParameter('object_name',
# description='Source or field '
# 'observed (string)', form='str',
# expected_type=str)
# self._telescope_name = uvp.UVParameter('telescope_name',
# description='Name of telescope '
# '(string)', form='str',
# expected_type=str)
# self._instrument = uvp.UVParameter('instrument', description='Receiver or backend. '
# 'Sometimes identical to telescope_name',
# form='str', expected_type=str)
#
# desc = ('Telescope location: xyz in ITRF (earth-centered frame). '
# 'Can also be accessed using telescope_location_lat_lon_alt or '
# 'telescope_location_lat_lon_alt_degrees properties')
# self._telescope_location = uvp.LocationParameter('telescope_location',
# description=desc,
# acceptable_range=(
# 6.35e6, 6.39e6),
# tols=1e-3)
#
# self._history = uvp.UVParameter('history', description='String of history, units English',
# form='str', expected_type=str)
#
# # --- phasing information ---
# desc = ('String indicating phasing type. Allowed values are "drift", '
# '"phased" and "unknown"')
# self._phase_type = uvp.UVParameter('phase_type', form='str', expected_type=str,
# description=desc, value='unknown',
# acceptable_vals=['drift', 'phased', 'unknown'])
#
# desc = ('Required if phase_type = "phased". Epoch year of the phase '
# 'applied to the data (eg 2000.)')
# self._phase_center_epoch = uvp.UVParameter('phase_center_epoch',
# required=False,
# description=desc,
# expected_type=np.float)
#
# desc = ('Required if phase_type = "phased". Right ascension of phase '
# 'center (see uvw_array), units radians. Can also be accessed using phase_center_ra_degrees.')
# self._phase_center_ra = uvp.AngleParameter('phase_center_ra',
# required=False,
# description=desc,
# expected_type=np.float,
# tols=radian_tol)
#
# desc = ('Required if phase_type = "phased". Declination of phase center '
# '(see uvw_array), units radians. Can also be accessed using phase_center_dec_degrees.')
# self._phase_center_dec = uvp.AngleParameter('phase_center_dec',
# required=False,
# description=desc,
# expected_type=np.float,
# tols=radian_tol)
#
# desc = ('Only relevant if phase_type = "phased". Specifies the frame the'
# ' data and uvw_array are phased to. Options are "gcrs" and "icrs",'
# ' default is "icrs"')
# self._phase_center_frame = uvp.UVParameter('phase_center_frame',
# required=False,
# description=desc,
# expected_type=str,
# acceptable_vals=['icrs', 'gcrs'])
#
# # --- antenna information ----
# desc = ('Number of antennas with data present (i.e. number of unique '
# 'entries in ant_1_array and ant_2_array). May be smaller '
# 'than the number of antennas in the array')
# self._Nants_data = uvp.UVParameter('Nants_data', description=desc,
# expected_type=int)
#
# desc = ('Number of antennas in the array. May be larger '
# 'than the number of antennas with data')
# self._Nants_telescope = uvp.UVParameter('Nants_telescope',
# description=desc, expected_type=int)
#
# desc = ('List of antenna names, shape (Nants_telescope), '
# 'with numbers given by antenna_numbers (which can be matched '
# 'to ant_1_array and ant_2_array). There must be one entry '
# 'here for each unique entry in ant_1_array and '
# 'ant_2_array, but there may be extras as well.')
# self._antenna_names = uvp.UVParameter('antenna_names', description=desc,
# form=('Nants_telescope',),
# expected_type=str)
#
# desc = ('List of integer antenna numbers corresponding to antenna_names, '
# 'shape (Nants_telescope). There must be one '
# 'entry here for each unique entry in ant_1_array and '
# 'ant_2_array, but there may be extras as well.')
# self._antenna_numbers = uvp.UVParameter('antenna_numbers', description=desc,
# form=('Nants_telescope',),
# expected_type=int)
#
# # -------- extra, non-required parameters ----------
# desc = ('Orientation of the physical dipole corresponding to what is '
# 'labelled as the x polarization. Examples include "east" '
# '(indicating east/west orientation) and "north" (indicating '
# 'north/south orientation)')
# self._x_orientation = uvp.UVParameter('x_orientation', description=desc,
# required=False, expected_type=str)
#
# desc = ('Any user supplied extra keywords, type=dict. Keys should be '
# '8 character or less strings if writing to uvfits or miriad files. '
# 'Use the special key "comment" for long multi-line string comments.')
# self._extra_keywords = uvp.UVParameter('extra_keywords', required=False,
# description=desc, value={},
# spoof_val={}, expected_type=dict)
#
# desc = ('Array giving coordinates of antennas relative to '
# 'telescope_location (ITRF frame), shape (Nants_telescope, 3), '
# 'units meters. See the tutorial page in the documentation '
# 'for an example of how to convert this to topocentric frame.'
# 'Will be a required parameter in a future version.')
# self._antenna_positions = uvp.AntPositionParameter('antenna_positions',
# required=False,
# description=desc,
# form=(
# 'Nants_telescope', 3),
# expected_type=np.float,
# tols=1e-3) # 1 mm
#
# desc = ('Array of antenna diameters in meters. Used by CASA to '
# 'construct a default beam if no beam is supplied.')
# self._antenna_diameters = uvp.UVParameter('antenna_diameters',
# required=False,
# description=desc,
# form=('Nants_telescope',),
# expected_type=np.float,
# tols=1e-3) # 1 mm
#
# # --- other stuff ---
# # the below are copied from AIPS memo 117, but could be revised to
# # merge with other sources of data.
# self._gst0 = uvp.UVParameter('gst0', required=False,
# description='Greenwich sidereal time at '
# 'midnight on reference date',
# spoof_val=0.0, expected_type=np.float)
# self._rdate = uvp.UVParameter('rdate', required=False,
# description='Date for which the GST0 or '
# 'whatever... applies',
# spoof_val='', form='str')
# self._earth_omega = uvp.UVParameter('earth_omega', required=False,
# description='Earth\'s rotation rate '
# 'in degrees per day',
# spoof_val=360.985, expected_type=np.float)
# self._dut1 = uvp.UVParameter('dut1', required=False,
# description='DUT1 (google it) AIPS 117 '
# 'calls it UT1UTC',
# spoof_val=0.0, expected_type=np.float)
# self._timesys = uvp.UVParameter('timesys', required=False,
# description='We only support UTC',
# spoof_val='UTC', form='str')
#
# desc = ('FHD thing we do not understand, something about the time '
# 'at which the phase center is normal to the chosen UV plane '
# 'for phasing')
# self._uvplane_reference_time = uvp.UVParameter('uvplane_reference_time',
# required=False,
# description=desc,
# spoof_val=0)
#
# # desc = ('Required if phase_type = "drift". Right ascension of zenith. '
# # 'units: radians, shape (Nblts). Can also be accessed using zenith_ra_degrees.')
# # self._zenith_ra = uvp.AngleParameter('zenith_ra', required=False,
# # description=desc,
# # expected_type=np.float,
# # form=('Nblts',),
# # tols=radian_tol)
# #
# # desc = ('Required if phase_type = "drift". Declination of zenith. '
# # 'units: radians, shape (Nblts). Can also be accessed using zenith_dec_degrees.')
# # # in practice, dec of zenith will never change; does not need to be shape Nblts
# # self._zenith_dec = uvp.AngleParameter('zenith_dec', required=False,
# # description=desc,
# # expected_type=np.float,
# # form=('Nblts',),
# # tols=radian_tol)
#
# super(UVData, self).__init__()
"""Create a new UVData object."""
# add the UVParameters to the class
# standard angle tolerance: 10 mas in radians.
# Should perhaps be decreased to 1 mas in the future
radian_tol = 10 * 2 * np.pi * 1e-3 / (60.0 * 60.0 * 360.0)
self._Ntimes = uvp.UVParameter('Ntimes', description='Number of times',
expected_type=int)
self._Nbls = uvp.UVParameter('Nbls', description='Number of baselines',
expected_type=int)
self._Nblts = uvp.UVParameter('Nblts', description='Number of baseline-times '
'(i.e. number of spectra). Not necessarily '
'equal to Nbls * Ntimes', expected_type=int)
self._Nfreqs = uvp.UVParameter('Nfreqs', description='Number of frequency channels',
expected_type=int)
self._Npols = uvp.UVParameter('Npols', description='Number of polarizations',
expected_type=int)
desc = ('Array of the visibility data, shape: (Nblts, Nspws, Nfreqs, '
'Npols), type = complex float, in units of self.vis_units')
self._data_array = uvp.UVParameter('data_array', description=desc,
form=('Nblts', 'Nspws',
'Nfreqs', 'Npols'),
expected_type=np.complex)
desc = 'Visibility units, options are: "uncalib", "Jy" or "K str"'
self._vis_units = uvp.UVParameter('vis_units', description=desc,
form='str', expected_type=str,
acceptable_vals=["uncalib", "Jy", "K str"])
desc = ('Number of data points averaged into each data element, '
'NOT required to be an integer. type = float, same shape as data_array')
self._nsample_array = uvp.UVParameter('nsample_array', description=desc,
form=('Nblts', 'Nspws',
'Nfreqs', 'Npols'),
expected_type=(np.float))
desc = 'Boolean flag, True is flagged, same shape as data_array.'
self._flag_array = uvp.UVParameter('flag_array', description=desc,
form=('Nblts', 'Nspws',
'Nfreqs', 'Npols'),
expected_type=np.bool)
self._Nspws = uvp.UVParameter('Nspws', description='Number of spectral windows '
'(ie non-contiguous spectral chunks). '
'More than one spectral window is not '
'currently supported.', expected_type=int)
self._spw_array = uvp.UVParameter('spw_array',
description='Array of spectral window '
'Numbers, shape (Nspws)', form=('Nspws',),
expected_type=int)
desc = ('Projected baseline vectors relative to phase center, ' +
'shape (Nblts, 3), units meters')
self._uvw_array = uvp.UVParameter('uvw_array', description=desc,
form=('Nblts', 3),
expected_type=np.float,
acceptable_range=(1e-3, 1e8), tols=.001)
desc = ('Array of times, center of integration, shape (Nblts), ' +
'units Julian Date')
self._time_array = uvp.UVParameter('time_array', description=desc,
form=('Nblts',),
expected_type=np.float,
tols=1e-3 / (60.0 * 60.0 * 24.0)) # 1 ms in days
desc = ('Array of lsts, center of integration, shape (Nblts), ' +
'units radians')
self._lst_array = uvp.UVParameter('lst_array', description=desc,
form=('Nblts',),
expected_type=np.float,
tols=radian_tol)
desc = ('Array of first antenna indices, shape (Nblts), '
'type = int, 0 indexed')
self._ant_1_array = uvp.UVParameter('ant_1_array', description=desc,
expected_type=int, form=('Nblts',))
desc = ('Array of second antenna indices, shape (Nblts), '
'type = int, 0 indexed')
self._ant_2_array = uvp.UVParameter('ant_2_array', description=desc,
expected_type=int, form=('Nblts',))
desc = ('Array of baseline indices, shape (Nblts), '
'type = int; baseline = 2048 * (ant2+1) + (ant1+1) + 2^16')
self._baseline_array = uvp.UVParameter('baseline_array',
description=desc,
expected_type=int, form=('Nblts',))
# this dimensionality of freq_array does not allow for different spws
# to have different dimensions
desc = 'Array of frequencies, shape (Nspws, Nfreqs), units Hz'
self._freq_array = uvp.UVParameter('freq_array', description=desc,
form=('Nspws', 'Nfreqs'),
expected_type=np.float,
tols=1e-3) # mHz
desc = ('Array of polarization integers, shape (Npols). '
'AIPS Memo 117 says: stokes 1:4 (I,Q,U,V); '
'circular -1:-4 (RR,LL,RL,LR); linear -5:-8 (XX,YY,XY,YX)')
self._polarization_array = uvp.UVParameter('polarization_array',
description=desc,
expected_type=int,
acceptable_vals=list(
np.arange(-8, 0)) + list(np.arange(1, 5)),
form=('Npols',))
# self._integration_time = uvp.UVParameter('integration_time',
# description='Length of the integration (s)',
# expected_type=np.float, tols=1e-3) # 1 ms
self._channel_width = uvp.UVParameter('channel_width',
description='Width of frequency channels (Hz)',
expected_type=np.float,
tols=1e-3) # 1 mHz
# --- observation information ---
self._object_name = uvp.UVParameter('object_name',
description='Source or field '
'observed (string)', form='str',
expected_type=str)
self._telescope_name = uvp.UVParameter('telescope_name',
description='Name of telescope '
'(string)', form='str',
expected_type=str)
self._instrument = uvp.UVParameter('instrument', description='Receiver or backend. '
'Sometimes identical to telescope_name',
form='str', expected_type=str)
desc = ('Telescope location: xyz in ITRF (earth-centered frame). '
'Can also be accessed using telescope_location_lat_lon_alt or '
'telescope_location_lat_lon_alt_degrees properties')
self._telescope_location = uvp.LocationParameter('telescope_location',
description=desc,
acceptable_range=(
6.35e6, 6.39e6),
tols=1e-3)
self._history = uvp.UVParameter('history', description='String of history, units English',
form='str', expected_type=str)
# --- phasing information ---
desc = ('String indicating phasing type. Allowed values are "drift", '
'"phased" and "unknown"')
self._phase_type = uvp.UVParameter('phase_type', form='str', expected_type=str,
description=desc, value='unknown',
acceptable_vals=['drift', 'phased', 'unknown'])
# desc = ('Required if phase_type = "drift". Right ascension of zenith. '
# 'units: radians, shape (Nblts). Can also be accessed using zenith_ra_degrees.')
# self._zenith_ra = uvp.AngleParameter('zenith_ra', required=False,
# description=desc,
# expected_type=np.float,
# form=('Nblts',),
# tols=radian_tol)
#
# desc = ('Required if phase_type = "drift". Declination of zenith. '
# 'units: radians, shape (Nblts). Can also be accessed using zenith_dec_degrees.')
# # in practice, dec of zenith will never change; does not need to be shape Nblts
# self._zenith_dec = uvp.AngleParameter('zenith_dec', required=False,
# description=desc,
# expected_type=np.float,
# form=('Nblts',),
# tols=radian_tol)
desc = ('Required if phase_type = "phased". Epoch year of the phase '
'applied to the data (eg 2000.)')
self._phase_center_epoch = uvp.UVParameter('phase_center_epoch',
required=False,
description=desc,
expected_type=np.float)
desc = ('Required if phase_type = "phased". Right ascension of phase '
'center (see uvw_array), units radians. Can also be accessed using phase_center_ra_degrees.')
self._phase_center_ra = uvp.AngleParameter('phase_center_ra',
required=False,
description=desc,
expected_type=np.float,
tols=radian_tol)
desc = ('Required if phase_type = "phased". Declination of phase center '
'(see uvw_array), units radians. Can also be accessed using phase_center_dec_degrees.')
self._phase_center_dec = uvp.AngleParameter('phase_center_dec',
required=False,
description=desc,
expected_type=np.float,
tols=radian_tol)
# --- antenna information ----
desc = ('Number of antennas with data present (i.e. number of unique '
'entries in ant_1_array and ant_2_array). May be smaller ' +
'than the number of antennas in the array')
self._Nants_data = uvp.UVParameter('Nants_data', description=desc,
expected_type=int)
desc = ('Number of antennas in the array. May be larger ' +
'than the number of antennas with data')
self._Nants_telescope = uvp.UVParameter('Nants_telescope',
description=desc, expected_type=int)
desc = ('List of antenna names, shape (Nants_telescope), '
'with numbers given by antenna_numbers (which can be matched '
'to ant_1_array and ant_2_array). There must be one entry '
'here for each unique entry in ant_1_array and '
'ant_2_array, but there may be extras as well.')
self._antenna_names = uvp.UVParameter('antenna_names', description=desc,
form=('Nants_telescope',),
expected_type=str)
desc = ('List of integer antenna numbers corresponding to antenna_names, '
'shape (Nants_telescope). There must be one '
'entry here for each unique entry in ant_1_array and '
'ant_2_array, but there may be extras as well.')
self._antenna_numbers = uvp.UVParameter('antenna_numbers', description=desc,
form=('Nants_telescope',),
expected_type=int)
# -------- extra, non-required parameters ----------
desc = ('Orientation of the physical dipole corresponding to what is '
'labelled as the x polarization. Examples include "east" '
'(indicating east/west orientation) and "north" (indicating '
'north/south orientation)')
self._x_orientation = uvp.UVParameter('x_orientation', description=desc,
required=False, expected_type=str)
desc = ('Any user supplied extra keywords, type=dict. Keys should be '
'8 character or less strings if writing to uvfits or miriad files. '
'Use the special key "comment" for long multi-line string comments.')
self._extra_keywords = uvp.UVParameter('extra_keywords', required=False,
description=desc, value={},
spoof_val={}, expected_type=dict)
desc = ('Array giving coordinates of antennas relative to '
'telescope_location (ITRF frame), shape (Nants_telescope, 3), '
'units meters. See the tutorial page in the documentation '
'for an example of how to convert this to topocentric frame.')
self._antenna_positions = uvp.AntPositionParameter('antenna_positions',
required=False,
description=desc,
form=(
'Nants_telescope', 3),
expected_type=np.float,
tols=1e-3) # 1 mm
desc = ('Array of antenna diameters in meters. Used by CASA to '
'construct a default beam if no beam is supplied.')
self._antenna_diameters = uvp.UVParameter('antenna_diameters',
required=False,
description=desc,
form=('Nants_telescope',),
expected_type=np.float,
tols=1e-3) # 1 mm
# --- other stuff ---
# the below are copied from AIPS memo 117, but could be revised to
# merge with other sources of data.
self._gst0 = uvp.UVParameter('gst0', required=False,
description='Greenwich sidereal time at '
'midnight on reference date',
spoof_val=0.0, expected_type=np.float)
self._rdate = uvp.UVParameter('rdate', required=False,
description='Date for which the GST0 or '
'whatever... applies',
spoof_val='', form='str')
self._earth_omega = uvp.UVParameter('earth_omega', required=False,
description='Earth\'s rotation rate '
'in degrees per day',
spoof_val=360.985, expected_type=np.float)
self._dut1 = uvp.UVParameter('dut1', required=False,
description='DUT1 (google it) AIPS 117 '
'calls it UT1UTC',
spoof_val=0.0, expected_type=np.float)
self._timesys = uvp.UVParameter('timesys', required=False,
description='We only support UTC',
spoof_val='UTC', form='str')
desc = ('FHD thing we do not understand, something about the time '
'at which the phase center is normal to the chosen UV plane '
'for phasing')
self._uvplane_reference_time = uvp.UVParameter('uvplane_reference_time',
required=False,
description=desc,
spoof_val=0)
super(UVData, self).__init__()
def check(self, check_extra=True, run_check_acceptability=True):
"""
Add some extra checks on top of checks on UVBase class.
Check that required parameters exist. Check that parameters have
appropriate shapes and optionally that the values are acceptable.
Args:
check_extra: If true, check all parameters, otherwise only check
required parameters.
run_check_acceptability: Option to check if values in parameters
are acceptable. Default is True.
"""
# first run the basic check from UVBase
# set the phase type based on object's value
if self.phase_type == 'phased':
self.set_phased()
elif self.phase_type == 'drift':
self.set_drift()
else:
self.set_unknown_phase_type()
super(UVData, self).check(check_extra=check_extra,
run_check_acceptability=run_check_acceptability)
# Check internal consistency of numbers which don't explicitly correspond
# to the shape of another array.
nants_data_calc = int(len(np.unique(self.ant_1_array.tolist()
+ self.ant_2_array.tolist())))
if self.Nants_data != nants_data_calc:
raise ValueError('Nants_data must be equal to the number of unique '
'values in ant_1_array and ant_2_array')
if self.Nbls != len(np.unique(self.baseline_array)):
raise ValueError('Nbls must be equal to the number of unique '
'baselines in the data_array')
if self.Ntimes != len(np.unique(self.time_array)):
raise ValueError('Ntimes must be equal to the number of unique '
'times in the time_array')
# issue warning if extra_keywords keys are longer than 8 characters
for key in self.extra_keywords.keys():
if len(key) > 8:
warnings.warn('key {key} in extra_keywords is longer than 8 '
'characters. It will be truncated to 8 if written '
'to uvfits or miriad file formats.'.format(key=key))
# issue warning if extra_keywords values are lists, arrays or dicts
for key, value in self.extra_keywords.items():
if isinstance(value, (list, dict, np.ndarray)):
warnings.warn('{key} in extra_keywords is a list, array or dict, '
'which will raise an error when writing uvfits or '
'miriad file types'.format(key=key))
# issue deprecation warning if antenna positions are not set
if self.antenna_positions is None:
warnings.warn('antenna_positions are not defined. '
'antenna_positions will be a required parameter in '
'future versions.', PendingDeprecationWarning)
# check auto and cross-corrs have sensible uvws
autos = np.isclose(self.ant_1_array - self.ant_2_array, 0.0)
if not np.all(np.isclose(self.uvw_array[autos], 0.0,
rtol=self._uvw_array.tols[0],
atol=self._uvw_array.tols[1])):
raise ValueError("Some auto-correlations have non-zero "
"uvw_array coordinates.")
if np.any(np.isclose([np.linalg.norm(uvw) for uvw in self.uvw_array[~autos]], 0.0,
rtol=self._uvw_array.tols[0],
atol=self._uvw_array.tols[1])):
raise ValueError("Some cross-correlations have near-zero "
"uvw_array magnitudes.")
return True
def set_drift(self):
"""Set phase_type to 'drift' and adjust required parameters."""
self.phase_type = 'drift'
self._phase_center_epoch.required = False
self._phase_center_ra.required = False
self._phase_center_dec.required = False
def set_phased(self):
"""Set phase_type to 'phased' and adjust required parameters."""
self.phase_type = 'phased'
self._phase_center_epoch.required = True
self._phase_center_ra.required = True
self._phase_center_dec.required = True
def set_unknown_phase_type(self):
"""Set phase_type to 'unknown' and adjust required parameters."""
self.phase_type = 'unknown'
self._phase_center_epoch.required = False
self._phase_center_ra.required = False
self._phase_center_dec.required = False
def known_telescopes(self):
"""
Retun a list of telescopes known to pyuvdata.
This is just a shortcut to uvdata.telescopes.known_telescopes()
"""
return uvtel.known_telescopes()
def set_telescope_params(self, overwrite=False):
"""
Set telescope related parameters.
If the telescope_name is in the known_telescopes, set any missing
telescope-associated parameters (e.g. telescope location) to the value
for the known telescope.
Args:
overwrite: Option to overwrite existing telescope-associated
parameters with the values from the known telescope.
Default is False.
"""
telescope_obj = uvtel.get_telescope(self.telescope_name)
if telescope_obj is not False:
params_set = []
for p in telescope_obj:
telescope_param = getattr(telescope_obj, p)
self_param = getattr(self, p)
if telescope_param.value is not None and (overwrite is True
or self_param.value is None):
telescope_shape = telescope_param.expected_shape(telescope_obj)
self_shape = self_param.expected_shape(self)
if telescope_shape == self_shape:
params_set.append(self_param.name)
prop_name = self_param.name
setattr(self, prop_name, getattr(telescope_obj, prop_name))
else:
# expected shapes aren't equal. This can happen e.g. with diameters,
# which is a single value on the telescope object but is
# an array of length Nants_telescope on the UVData object
if telescope_shape == () and self_shape != 'str':
array_val = np.zeros(self_shape,
dtype=telescope_param.expected_type) + telescope_param.value
params_set.append(self_param.name)
prop_name = self_param.name
setattr(self, prop_name, array_val)
else:
raise ValueError('parameter {p} on the telescope '
'object does not have a compatible '
'expected shape.')
if len(params_set) > 0:
params_set_str = ', '.join(params_set)
warnings.warn('{params} is not set. Using known values '
'for {telescope_name}.'.format(params=params_set_str,
telescope_name=telescope_obj.telescope_name))
else:
raise ValueError('Telescope {telescope_name} is not in '
'known_telescopes.'.format(telescope_name=self.telescope_name))
def baseline_to_antnums(self, baseline):
"""
Get the antenna numbers corresponding to a given baseline number.
Args:
baseline: integer baseline number
Returns:
tuple with the two antenna numbers corresponding to the baseline.
"""
return uvutils.baseline_to_antnums(baseline, self.Nants_telescope)
def antnums_to_baseline(self, ant1, ant2, attempt256=False):
"""
Get the baseline number corresponding to two given antenna numbers.
Args:
ant1: first antenna number (integer)
ant2: second antenna number (integer)
attempt256: Option to try to use the older 256 standard used in
many uvfits files (will use 2048 standard if there are more
than 256 antennas). Default is False.
Returns:
integer baseline number corresponding to the two antenna numbers.
"""
return uvutils.antnums_to_baseline(ant1, ant2, self.Nants_telescope, attempt256=attempt256)
def order_pols(self, order='AIPS'):
'''
Arranges polarizations in orders corresponding to either AIPS or CASA convention.
CASA stokes types are ordered with cross-pols in between (e.g. XX,XY,YX,YY) while
AIPS orders pols with auto-pols followed by cross-pols (e.g. XX,YY,XY,YX)
Args:
order: string, either 'CASA' or 'AIPS'. Default='AIPS'
'''
if(order == 'AIPS'):
pol_inds = np.argsort(self.polarization_array)
pol_inds = pol_inds[::-1]
elif(order == 'CASA'): # sandwich
casa_order = np.array([1, 2, 3, 4, -1, -3, -4, -2, -5, -7, -8, -6])
pol_inds = []
for pol in self.polarization_array:
pol_inds.append(np.where(casa_order == pol)[0][0])
pol_inds = np.argsort(pol_inds)
else:
warnings.warn('Invalid order supplied. No sorting performed.')
pol_inds = list(range(self.Npols))
# Generate a map from original indices to new indices
if not np.array_equal(pol_inds, self.Npols):
self.reorder_pols(order=pol_inds)
def set_lsts_from_time_array(self):
"""Set the lst_array based from the time_array."""
latitude, longitude, altitude = self.telescope_location_lat_lon_alt_degrees
self.lst_array = uvutils.get_lst_for_time(self.time_array, latitude, longitude, altitude)
def unphase_to_drift(self, phase_frame=None, use_ant_pos=False):
"""
Convert from a phased dataset to a drift dataset.
See the phasing memo under docs/references for more documentation.
Args:
phase_frame: the astropy frame to phase from. Either 'icrs' or 'gcrs'.
'gcrs' accounts for precession & nutation, 'icrs' also includes abberation.
Defaults to using the 'phase_center_frame' attribute or 'icrs'
if that attribute is None
use_ant_pos: If True, calculate the uvws directly from the
antenna positions rather than from the existing uvws.
"""
if self.phase_type == 'phased':
pass
elif self.phase_type == 'drift':
raise ValueError('The data is already drift scanning; can only '
'unphase phased data.')
else:
raise ValueError('The phasing type of the data is unknown. '
'Set the phase_type to drift or phased to '
'reflect the phasing status of the data')
if phase_frame is None:
if self.phase_center_frame is not None:
phase_frame = self.phase_center_frame
else:
phase_frame = 'icrs'
icrs_coord = SkyCoord(ra=self.phase_center_ra, dec=self.phase_center_dec,
unit='radian', frame='icrs')
if phase_frame == 'icrs':
frame_phase_center = icrs_coord
else:
# use center of observation for obstime for gcrs
center_time = np.mean([np.max(self.time_array), np.min(self.time_array)])
icrs_coord.obstime = Time(center_time, format='jd')
frame_phase_center = icrs_coord.transform_to('gcrs')
# This promotion is REQUIRED to get the right answer when we
# add in the telescope location for ICRS
# In some cases, the uvws are already float64, but sometimes they're not
self.uvw_array = np.float64(self.uvw_array)
# apply -w phasor
w_lambda = (self.uvw_array[:, 2].reshape(self.Nblts, 1)
/ const.c.to('m/s').value * self.freq_array.reshape(1, self.Nfreqs))
phs = np.exp(-1j * 2 * np.pi * (-1) * w_lambda[:, None, :, None])
self.data_array *= phs
unique_times, unique_inds = np.unique(self.time_array, return_index=True)
for ind, jd in enumerate(unique_times):
inds = np.where(self.time_array == jd)[0]
obs_time = Time(jd, format='jd')
itrs_telescope_location = SkyCoord(x=self.telescope_location[0] * units.m,
y=self.telescope_location[1] * units.m,
z=self.telescope_location[2] * units.m,
representation='cartesian',
frame='itrs', obstime=obs_time)
frame_telescope_location = itrs_telescope_location.transform_to(phase_frame)
itrs_lat_lon_alt = self.telescope_location_lat_lon_alt
if use_ant_pos:
ant_uvw = uvutils.phase_uvw(self.telescope_location_lat_lon_alt[1],
self.telescope_location_lat_lon_alt[0],
self.antenna_positions)
for bl_ind in inds:
ant1_index = np.where(self.antenna_numbers == self.ant_1_array[bl_ind])[0][0]
ant2_index = np.where(self.antenna_numbers == self.ant_2_array[bl_ind])[0][0]
self.uvw_array[bl_ind, :] = ant_uvw[ant2_index, :] - ant_uvw[ant1_index, :]
else:
uvws_use = self.uvw_array[inds, :]
uvw_rel_positions = uvutils.unphase_uvw(frame_phase_center.ra.rad,
frame_phase_center.dec.rad,
uvws_use)
frame_telescope_location.representation = 'cartesian'
frame_uvw_coord = SkyCoord(x=uvw_rel_positions[:, 0] * units.m + frame_telescope_location.x,
y=uvw_rel_positions[:, 1] * units.m + frame_telescope_location.y,
z=uvw_rel_positions[:, 2] * units.m + frame_telescope_location.z,
representation='cartesian',
frame=phase_frame, obstime=obs_time)
itrs_uvw_coord = frame_uvw_coord.transform_to('itrs')
# now convert them to ENU, which is the space uvws are in
self.uvw_array[inds, :] = uvutils.ENU_from_ECEF(itrs_uvw_coord.cartesian.get_xyz().value.T,
*itrs_lat_lon_alt)
# remove phase center
self.phase_center_frame = None
self.phase_center_ra = None
self.phase_center_dec = None
self.phase_center_epoch = None
self.set_drift()
def phase(self, ra, dec, epoch='J2000', phase_frame='icrs', use_ant_pos=False):
""""
Phase a drift scan dataset to a single ra/dec at a particular epoch.
See the phasing memo under docs/references for more documentation.