-
Notifications
You must be signed in to change notification settings - Fork 2
/
xrfi.py
3331 lines (3021 loc) · 157 KB
/
xrfi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright (c) 2019 the HERA Project
# Licensed under the MIT License
"""Module for performing RFI identification and excision."""
import numpy as np
import os
from collections.abc import Iterable
from pyuvdata import UVData
from pyuvdata import UVCal
from pyuvdata import UVFlag
from . import utils as qm_utils
from pyuvdata import utils as uvutils
from . import __version__
from .metrics_io import process_ex_ants
from . import metrics_io
import warnings
import glob
import re
import copy
from hera_filters import dspec
from scipy.ndimage import convolve
from copy import deepcopy
# Compatibility shim: newer pyuvdata exposes the generic UVCal.read, older
# versions only have read_calfits. Pick whichever this installation provides.
uvcal_read_method = "read" if hasattr(UVCal(), "read") else "read_calfits"
#############################################################################
# Utility functions
#############################################################################
def flag_xants(uv, xants, inplace=True, run_check=True,
               check_extra=True, run_check_acceptability=True):
    """Flag visibilities containing specified antennas.

    Parameters
    ----------
    uv : UVData or UVCal or UVFlag
        Object containing data to be flagged. Should be a UVData, UVCal, or
        UVFlag object.
    xants : list of ints
        List of antenna numbers to completely flag.
    inplace : bool, optional
        If True, apply flags to the uv object. If False, return a UVFlag object
        with only xants flagged. Default is True.
    run_check : bool
        Option to check for the existence and proper shapes of parameters
        on UVFlag Object.
    check_extra : bool
        Option to check optional parameters as well as required ones.
    run_check_acceptability : bool
        Option to check acceptable range of the values of parameters
        on UVFlag Object.

    Returns
    -------
    uvo : UVData or UVCal or UVFlag
        If inplace is True, uvo is a reference to the input uv object, but with
        the flags specified in xants flagged. If inplace is False, uvo is a new
        UVFlag object with only xants flagged.

    Raises
    ------
    ValueError:
        If uv is not a UVData, UVCal, UVFlag, or subclassed object, a ValueError
        is raised. If a UVFlag of a "waterfall" type is passed in, a ValueError
        is also raised. If a UVFlag object that is not in "flag" mode is passed
        in, a ValueError is raised.
    """
    # check that we got an appropriate object
    if not issubclass(uv.__class__, (UVData, UVCal, UVFlag)):
        # NOTE: message typos fixed ("obejct" -> "object") and the doubled
        # space from the old string concatenation removed.
        raise ValueError('First argument to flag_xants must be a UVData, UVCal, '
                         'or UVFlag object.')
    if isinstance(uv, UVFlag) and uv.type == 'waterfall':
        raise ValueError('Cannot flag antennas on UVFlag object of type "waterfall".')
    if not inplace:
        # Build a separate flag-mode UVFlag to return, leaving uv untouched.
        if isinstance(uv, UVFlag):
            uvo = uv.copy()
            uvo.to_flag(run_check=run_check, check_extra=check_extra,
                        run_check_acceptability=run_check_acceptability)
        else:
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", "Future array shapes are now always used")
                uvo = UVFlag(uv, mode='flag', use_future_array_shapes=True)
    else:
        uvo = uv
    if isinstance(uvo, UVFlag) and uvo.mode != 'flag':
        raise ValueError('Cannot flag antennas on UVFlag object in mode ' + uvo.mode)
    if not isinstance(xants, Iterable):
        xants = [xants]
    if issubclass(uvo.__class__, UVData) or (isinstance(uvo, UVFlag) and uvo.type == 'baseline'):
        # Flag every baseline involving an excluded antenna, in both orderings.
        all_ants = np.unique(np.append(uvo.ant_1_array, uvo.ant_2_array))
        for ant in all_ants:
            for xant in xants:
                blts = uvo.antpair2ind(ant, xant)
                uvo.flag_array[blts] = True
                blts = uvo.antpair2ind(xant, ant)
                uvo.flag_array[blts] = True
    elif issubclass(uvo.__class__, UVCal) or (isinstance(uvo, UVFlag) and uvo.type == 'antenna'):
        # Antenna-indexed objects: flag the excluded antennas directly.
        for xant in xants:
            ai = np.where(uvo.ant_array == xant)[0]
            uvo.flag_array[ai] = True
    if not inplace:
        return uvo
def resolve_xrfi_path(xrfi_path, fname, jd_subdir=False):
    """Determine xrfi_path based on given directory or default to dirname of given file.

    Parameters
    ----------
    xrfi_path : str
        Directory to which to write xrfi outputs.
    fname : str
        Filename to determine backup directory if xrfi_path == ''.
    jd_subdir : bool, optional
        Whether to append the filename directory with a subdirectory with
        {JD}_xrfi (when xrfi_path is ''). Default is False.
        This option assumes the standard HERA naming scheme: zen.{JD}.{JD_decimal}.HH.uvh5

    Returns
    -------
    dirname : str
        If xrfi_path is not '', dirname is xrfi_path. Otherwise it returns the
        directory of the file.
    """
    # An explicitly supplied, existing directory wins; jd_subdir is ignored
    # in that case (it only applies to the fname-derived fallback).
    if xrfi_path != '' and os.path.exists(xrfi_path):
        return xrfi_path

    dirname = os.path.dirname(os.path.abspath(fname))
    if jd_subdir:
        # Derive "zen.{JD}.{JD_decimal}.xrfi" from the first three
        # dot-separated pieces of the filename, and create it if needed.
        stem = '.'.join(os.path.basename(fname).split('.')[0:3])
        dirname = os.path.join(dirname, stem + '.xrfi')
        if not os.path.exists(dirname):
            os.makedirs(dirname)
    return dirname
def _check_convolve_dims(data, K1=None, K2=None):
"""Check the kernel sizes to be used in various convolution-like operations.
If the kernel sizes are too big, replace them with the largest allowable size
and issue a warning to the user.
Parameters
----------
data : array
1- or 2-D array that will undergo convolution-like operations.
K1 : int, optional
Integer representing box dimension in first dimension to apply statistic.
Defaults to None (see Returns)
K2 : int, optional
Integer representing box dimension in second dimension to apply statistic.
Only used if data is two dimensional
Returns
-------
K1 : int
Input K1 or data.shape[0] if K1 is larger than first dim of arr.
If K1 is not provided, will return data.shape[0].
K2 : int (only if data is two dimensional)
Input K2 or data.shape[1] if K2 is larger than second dim of arr.
If data is 2D but K2 is not provided, will return data.shape[1].
Raises
------
ValueError:
If the number of dimensions of the arr array is not 1 or 2, a ValueError is raised;
If K1 < 1, or if data is 2D and K2 < 1.
"""
if data.ndim not in (1, 2):
raise ValueError('Input to filter must be 1- or 2-D array.')
if K1 is None:
warnings.warn("No K1 input provided. Using the size of the data for the "
"kernel size.")
K1 = data.shape[0]
elif data.shape[0] < K1:
warnings.warn("K1 value {:d} is larger than the data of dimension {:d}; "
"using the size of the data for the kernel size".format(K1, data.shape[0]))
K1 = data.shape[0]
elif K1 < 1:
raise ValueError('K1 must be greater than or equal to 1.')
if (data.ndim == 2) and (K2 is None):
warnings.warn("No K2 input provided. Using the size of the data for the "
"kernel size.")
K2 = data.shape[1]
elif (data.ndim == 2) and (data.shape[1] < K2):
warnings.warn("K2 value {:d} is larger than the data of dimension {:d}; "
"using the size of the data for the kernel size".format(K2, data.shape[1]))
K2 = data.shape[1]
elif (data.ndim == 2) and (K2 < 1):
raise ValueError('K2 must be greater than or equal to 1.')
if data.ndim == 1:
return K1
else:
return K1, K2
def robust_divide(num, den):
    """Divide two arrays, mapping near-zero denominators to +inf.

    Elements of the result where ``abs(den)`` is at or below the machine
    epsilon of den's dtype are set to infinity instead of triggering a
    floating-point warning that could mask genuine data problems.

    Parameters
    ----------
    num : array
        The numerator.
    den : array
        The denominator.

    Returns
    -------
    out : array
        The result of dividing num / den, with +inf wherever den is tiny.
    """
    safe = np.abs(den) > np.finfo(den.dtype).eps
    # Only perform the division where the denominator is safely non-zero.
    quotient = np.true_divide(num, den, where=safe)
    return np.where(safe, quotient, np.inf)
# ############################################################################
# Functions for flagging auto-correlations
# ############################################################################
def dpss_flagger(data, noise, freqs, filter_centers, filter_half_widths, flags=None,
                 nsig=6, mode="dpss_solve", eigenval_cutoff=[1e-9],
                 suppression_factors=[1e-9], cache=None):
    """
    Identify RFI in visibilities by filtering data with discrete prolate spheroidal sequences.

    A smooth DPSS model of the data is fit; channels whose residual exceeds
    ``nsig`` times the model-scaled noise estimate are flagged.

    Parameters:
    ----------
    data: np.ndarray
        2D data array of the shape (time, frequency)
    noise: np.ndarray, default=None
        2D array for containing an estimate of the noise standard deviation of the data.
        Must be the same shape as the data.
    freqs: np.ndarray
        1D array of frequencies present in the data in units of Hz
    filter_centers: array-like
        list of floats of centers of delay filter windows in nanosec
    filter_half_widths: array-like
        list of floats of half-widths of delay filter windows in nanosec
    flags: np.ndarray
        2D array of boolean flags to be interpretted as mask for data.
        Must be the same shape as data.
    nsig: float, default=6
        The number of sigma in the metric above which to flag pixels.
    mode: str, default='dpss_solve'
        Method used to solve for DPSS model components. Options are 'dpss_matrix', 'dpss_solve', and 'dpss_leastsq'.
    eigenval_cutoff: array-like, default=[1e-9]
        List of sinc_matrix eigenvalue cutoffs to use for included DPSS modes.
    suppression_factors: array-like, default=[1e-9]
        Specifies the fractional residuals of model to leave in the data. For example, 1e-6 means that the filter
        will leave in 1e-6 of data fitted by the model.
    cache: dictionary, default=None
        Dictionary for caching fitting matrices. By default this value is None to prevent the size of the cached
        matrices from getting too large. By passing in a cache dictionary, this function could be much faster, but
        the memory requirement will also increase.

    Returns:
    -------
    flags: np.ndarray
        Array of boolean flags that has the same shape as the data, where values of True
        indicate flagged channels
    """
    # Broadcast single-entry cutoff lists across all filter windows.
    n_windows = len(filter_centers)
    if len(suppression_factors) == 1 and n_windows > 1:
        suppression_factors = n_windows * suppression_factors
    if len(eigenval_cutoff) == 1 and n_windows > 1:
        eigenval_cutoff = n_windows * eigenval_cutoff

    # Convert flags (if any) into float weights: flagged pixels get weight 0.
    if flags is None:
        wgts = np.ones_like(data)
    else:
        if flags.dtype != bool:
            raise TypeError('Input flag array must be type bool')
        wgts = np.logical_not(flags).astype(np.float64)

    # Fit the smooth DPSS model to the weighted data.
    model, _, _ = dspec.fourier_filter(
        freqs, data, wgts, filter_centers, filter_half_widths, mode=mode,
        suppression_factors=suppression_factors, eigenval_cutoff=eigenval_cutoff, cache=cache,
    )

    # Residual against the smooth model; scale the noise estimate by the
    # model so sigma reflects the RFI-free expectation.
    residual = data - model
    sigma = np.abs(model) * (noise / data)
    return residual > sigma * nsig
def channel_diff_flagger(data, noise, nsig=6, kernel_widths=[3, 4, 5], flags=None):
    """
    Identify RFI in data using channel differencing kernels.

    For each kernel width, every channel is modeled from its neighbors (the
    center channel is excluded from the kernel); channels whose residual
    exceeds ``nsig`` times the model-scaled noise are down-weighted before
    the next, wider pass.

    Parameters:
    ----------
    data: np.ndarray
        2D data array of the shape (time, frequency)
    noise: np.ndarray
        2D array for containing an estimate of the noise standard deviation. Must be the same shape as the data
    nsig: float, default=6
        The number of sigma in the metric above which to flag pixels.
    kernel_widths: list, default=[3, 4, 5]
        Half-width of the convolution kernels used to produce model. True kernel width is (2 * kernel_width + 1)
    flags: np.ndarray, default=None
        2D array of boolean flags to be interpretted as mask for data.
        Must be the same shape as data.

    Returns:
    -------
    flags: np.ndarray
        Array of boolean flags that has the same shape as the data, where values of True
        indicate flagged channels
    """
    # Convert input flags (if any) into float weights.
    if flags is None:
        wgts = np.ones_like(data)
    else:
        if flags.dtype != bool:
            raise TypeError('Input flag array must be type bool')
        wgts = np.logical_not(flags).astype(np.float64)

    for half_width in kernel_widths:
        # Kernel spanning (2 * half_width + 1) channels with the center zeroed,
        # so a channel never contributes to its own model.
        full_width = 2 * half_width + 1
        kernel = np.ones((1, full_width))
        kernel[0, full_width // 2] = 0

        # Weighted local mean of neighboring channels.
        model = robust_divide(convolve(data * wgts, kernel), convolve(wgts, kernel))
        # Noise level expected in the absence of RFI, scaled by the model.
        sigma = np.abs(model) * (noise / data)
        # Zero the weights of outliers for the next iteration.
        wgts = np.where((data - model) > sigma * nsig, 0., 1.)

    return np.isclose(wgts, 0)
# Registry mapping method-name strings (as accepted by flag_autos) to the
# auto-correlation flagging functions defined above.
auto_flaggers = {'dpss_flagger': dpss_flagger, 'channel_diff_flagger': channel_diff_flagger}
def flag_autos(data, flag_method="channel_diff_flagger", nsig=6, int_count=None, antenna_class=None, flags=None,
               flag_broadcast_thresh=0.1, **flag_kwargs):
    """
    Driver function for identifying RFI in auto-correlations. Flags auto-correlations on a per antenna basis using
    a given function and consolidates the weights into a single array-wide weights array. Returns a dictionary of
    boolean flag arrays and an array averaged flag array where channels flagged in "flag_threshold" fraction of antennas
    will be flagged in the array averaged flag array.

    Parameters:
    ----------
    data: DataContainer
        DataContainer containing numpy arrays of auto-correlation data with shape (ntimes, nfreqs)
    flag_method: str, default='channel_diff_flagger'
        Method used for flagging auto-correlations. Current options are 'dpss_flagger' and 'channel_diff_flagger'
    nsig: float, default=6
        The number of sigma in the metric above which to flag pixels.
    int_count: float, default=None
        Number of samples per integration in correlator (dt * dnu)
    antenna_class: AntennaClassification, default=None
        Optional AntennaClassification object. If provided, the flagging method chosen will skip antennas marked "bad"
    flags: dictionary or DataContainer, default=None
        Dictionary or DataContainer containing boolean flag arrays. If None (or empty), no data will be masked.
    flag_broadcast_thresh: float, default=0.1
        The fraction of flags required to trigger a broadcast across all auto-correlations for
        a given (time, frequency) pixel in the combined flag array.
    flag_kwargs: dict
        Additional keyword arguments for the function chosen

    Returns:
    -------
    antenna_flags: dictionary
        Dictionary containing boolean flag arrays for each antenna with values of either False or True with
        True identifying channels found to have RFI
    array_flags: np.ndarray
        Contains boolean array of flags for the entire array averaged over all antennas. If antenna_class is provided,
        only antennas labeled 'good' or 'suspect' are included in the average
    """
    from hera_cal.utils import split_bl
    # Check to see if flagging method is valid
    if flag_method not in auto_flaggers:
        raise ValueError("Flagging method must be a valid flagging function.")
    alg_func = auto_flaggers[flag_method]
    # Use None as the default rather than a mutable {} (avoids the shared
    # mutable-default pitfall); deep-copy provided flags so callers' arrays
    # are never modified.
    flags = {} if flags is None else deepcopy(flags)
    antenna_flags = {}
    # Infer int_count from data's metadata if not provided
    if int_count is None:
        int_time = 24 * 3600 * np.median(np.diff(data.times))
        chan_res = np.median(np.diff(data.freqs))
        int_count = int(int_time * chan_res)
    array_flags = []
    # Iterate through data
    for k, v in data.items():
        ant1, ant2 = split_bl(k)
        # Only flag auto-correlations
        if ant1 == ant2:
            # If antenna is labeled "bad", fully flag it and exclude it from
            # the array-wide average.
            if antenna_class is not None and antenna_class.is_bad(ant1):
                antenna_flags[ant1] = np.ones_like(v, dtype=bool)
                continue
            # Estimate the noise standard deviation from data
            noise = np.abs(v) / np.sqrt(int_count / 2)
            # Run flagging algorithm
            antenna_flags[ant1] = alg_func(
                v, noise=noise, nsig=nsig, flags=flags.get(ant1, np.zeros_like(v, dtype=bool)), **flag_kwargs
            )
            array_flags.append(antenna_flags[ant1])
    # Broadcast across antennas: flag a pixel array-wide when at least
    # flag_broadcast_thresh of antennas flagged it.
    array_flags = np.mean(array_flags, axis=0)
    array_flags = np.where(array_flags < flag_broadcast_thresh, False, True)
    return antenna_flags, array_flags
#############################################################################
# Functions for preprocessing data prior to RFI flagging
#############################################################################
def medmin(data, flags=None):
    """Calculate the median minus minimum statistic of array.

    Note
    ----
    The minimum over the time axis is taken to get one value per frequency
    channel; the statistic returned is twice the median of those minima minus
    their overall minimum — a proxy for the "noise" level of the 2D array.

    Parameters
    ----------
    data : array
        2D data array of the shape (time,frequency).
    flags : array, optional
        2D flag array to be interpretted as mask for d. NOT USED in this function,
        but kept for symmetry with other preprocessing functions.

    Returns
    -------
    medmin : array
        The result of the medmin statistic.
    """
    # Validate dimensionality only; kernel sizes are irrelevant here.
    _ = _check_convolve_dims(data, 1, 1)
    channel_minima = np.min(data, axis=0)
    return 2 * np.median(channel_minima) - np.min(channel_minima)
def medminfilt(data, flags=None, Kt=8, Kf=8):
    """Filter an array on scales of Kt,Kf indexes with medmin.

    Parameters
    ----------
    data : array
        2D data array of the shape (time, frequency).
    flags : array, optional
        2D flag array to be interpretted as mask for d. NOT USED in this function,
        but kept for symmetry with other preprocessing functions.
    Kt : int, optional
        An integer representing box dimension in time to apply statistic. Default
        is 8 pixels.
    Kf : int, optional
        An integer representing box dimension in frequency to apply statistic.
        Default is 8 pixels.

    Returns
    -------
    d_sm : array
        The filtered array with the same shape as input array.
    """
    Kt, Kf = _check_convolve_dims(data, Kt, Kf)
    n_times, n_freqs = data.shape
    filtered = np.empty_like(data)
    # Slide a (clipped) Kt x Kf box over every pixel and take medmin of it.
    for t_idx in range(n_times):
        t_lo, t_hi = max(0, t_idx - Kt), min(n_times, t_idx + Kt)
        for f_idx in range(n_freqs):
            f_lo, f_hi = max(0, f_idx - Kf), min(n_freqs, f_idx + Kf)
            filtered[t_idx, f_idx] = medmin(data[t_lo:t_hi, f_lo:f_hi])
    return filtered
def detrend_deriv(data, flags=None, dt=True, df=True):
    """Detrend array by taking the derivative in either time, frequency, or both.

    Note
    ----
    When taking the derivative of both, the derivative in frequency is performed
    first, then in time.

    Parameters
    ----------
    data : array
        2D data array of the shape (time,frequency).
    flags : array, optional
        2D flag array to be interpretted as mask for d. NOT USED in this function,
        but kept for symmetry with other preprocessing functions.
    dt : bool, optional
        The derivative across time bins. Default is True.
    df : bool, optional
        The derivative across frequency bins. Default is True.

    Returns
    -------
    out : array
        A detrended array with same shape as input array.
    """
    _ = _check_convolve_dims(data, 1, 1)  # Just check data dims
    if not (dt or df):
        raise ValueError("dt and df cannot both be False when calling detrend_deriv")
    # Frequency derivative first, then time, per the docstring.
    deriv = data
    if df:
        deriv = np.gradient(deriv, axis=1)
    if dt:
        deriv = np.gradient(deriv, axis=0)
    mag_sq = np.abs(deriv)**2
    # Model the noise as a separable function of the two axes.
    sig_f = np.median(mag_sq, axis=0).reshape(1, -1)
    sig_t = np.median(mag_sq, axis=1).reshape(-1, 1)
    sig = np.sqrt(sig_f * sig_t / np.median(sig_t))
    # Map divisions by (near-)zero noise estimates to +inf.
    return robust_divide(deriv, sig)
def detrend_medminfilt(data, flags=None, Kt=8, Kf=8):
    """Detrend array using medminfilt statistic. See medminfilt.

    Parameters
    ----------
    data : array
        2D data array of the shape (time, frequency) to detrend.
    flags : array, optional
        2D flag array to be interpretted as mask for d. NOT USED in this function,
        but kept for symmetry with other preprocessing functions.
    Kt : int, optional
        An integer representing box dimension in time to apply statistic. Default
        is 8 pixels.
    Kf : int, optional
        An integer representing box dimension in frequency to apply statistic.
        Default is 8 pixels.

    Returns
    -------
    out : array
        An array of outlier significance metric.
    """
    _ = _check_convolve_dims(data, 1, 1)  # Just check data dimensions
    win_t, win_f = 2 * Kt + 1, 2 * Kf + 1
    smooth = medminfilt(np.abs(data), Kt=win_t, Kf=win_f)
    residual = data - smooth
    # Scale factor (sqrt(Kt^2 + Kf^2) / .64) puts the minmed-based estimate
    # on the same scale as an average.
    noise = np.sqrt(medminfilt(np.abs(residual)**2, Kt=win_t, Kf=win_f)) * (np.sqrt(Kt**2 + Kf**2) / .64)
    # Map divisions by (near-)zero noise estimates to +inf.
    return robust_divide(residual, noise)
def detrend_medfilt(data, flags=None, Kt=8, Kf=8):
    """Detrend array using a median filter of surrounding pixels (but not the center one).

    Parameters
    ----------
    data : array
        2D data array to detrend.
    flags : array, optional
        2D flag array to be interpretted as mask for d. NOT USED in this function,
        but kept for symmetry with other preprocessing functions.
    Kt : int, optional
        The box size in time (first) dimension to apply medfilt over. Default is
        8 pixels.
    Kf : int, optional
        The box size in frequency (second) dimension to apply medfilt over. Default
        is 8 pixels.

    Returns
    -------
    out : array
        An array containing the outlier significance metric. Same type and size as d.
    """
    # Delay import so scipy is not required for any use of hera_qm
    from scipy.ndimage import median_filter
    Kt, Kf = _check_convolve_dims(data, Kt, Kf)
    footprint = np.ones((2 * Kt + 1, 2 * Kf + 1))
    footprint[Kt, Kf] = 0  # exclude the center pixel from its own model

    def _medfilt(arr):
        # Median filter with mirror boundary handling over the footprint.
        return median_filter(arr, footprint=footprint, mode='reflect')

    if np.iscomplexobj(data):
        # Filter real and imaginary parts independently.
        smooth = _medfilt(data.real) + 1j * _medfilt(data.imag)
    else:
        smooth = _medfilt(data)
    residual = data - smooth
    # Factor of .456 is to put mod-z scores on same scale as standard deviation.
    noise = np.sqrt(_medfilt(np.abs(residual)**2) / .456)
    # Map divisions by (near-)zero noise estimates to +inf.
    return robust_divide(residual, noise)
def detrend_meanfilt(data, flags=None, Kt=8, Kf=8):
    """Detrend array using a mean filter.

    Parameters
    ----------
    data : array
        2D data array to detrend.
    flags : array, optional
        2D flag array to be interpretted as mask for d.
    Kt : int, optional
        The box size in time (first) dimension to apply medfilt over. Default is
        8 pixels.
    Kf : int, optional
        The box size in frequency (second) dimension to apply medfilt over.
        Default is 8 pixels.

    Returns
    -------
    out : array
        An array containing the outlier significance metric. Same type and size as d.
    """
    # Delay import so astropy is not required for any use of hera_qm
    # Using astropy instead of scipy for treatement of Nan: http://docs.astropy.org/en/stable/convolution/
    from astropy.convolution import convolve
    Kt, Kf = _check_convolve_dims(data, Kt, Kf)
    kernel = np.ones((2 * Kt + 1, 2 * Kf + 1))
    # do a mirror extend, like in scipy's convolve, which astropy doesn't support
    # data[Kt - 1::-1] is the first Kt rows reversed; data[:-Kt - 1:-1] is the
    # last Kt rows reversed. Prepending/appending them mirror-pads the array.
    data = np.concatenate([data[Kt - 1::-1], data, data[:-Kt - 1:-1]], axis=0)
    data = np.concatenate([data[:, Kf - 1::-1], data, data[:, :-Kf - 1:-1]], axis=1)
    if flags is not None:
        # Pad the mask the same way so it stays aligned with the padded data.
        flags = np.concatenate([flags[Kt - 1::-1], flags, flags[:-Kt - 1:-1]], axis=0)
        flags = np.concatenate([flags[:, Kf - 1::-1], flags, flags[:, :-Kf - 1:-1]], axis=1)
    # Smooth model: boxcar mean ignoring masked pixels.
    d_sm = convolve(data, kernel, mask=flags, boundary='extend')
    d_rs = data - d_sm
    d_sq = np.abs(d_rs)**2
    # Noise estimate: mean of squared residuals over the same kernel.
    sig = np.sqrt(convolve(d_sq, kernel, mask=flags))
    # don't divide by zero, instead turn those entries into +inf
    out = robust_divide(d_rs, sig)
    # Trim the mirror padding to recover the original array shape.
    return out[Kt:-Kt, Kf:-Kf]
def zscore_full_array(data, flags=None, modified=False):
    """Calculate the z-score for full array, rather than a defined kernel size.

    This is a special case of detrend_medfilt/detrend_meanfilt, but is a
    separate function so it only takes the median/mean once for efficiency.
    It also doesn't introduce edge effects that would be very drastic if one
    were to call detrend_medfilt with large kernel size.

    Parameters
    ----------
    data : array
        2D data array to process.
    flags : array, optional
        2D flag array to be interpretted as mask for d. ONLY used for the regular
        zscore (not modified).
    modified : bool, optional
        Whether to calculate the modified z-scores. Default is False.

    Returns
    -------
    out : array
        An array containing the outlier significance metric. Same type and size as d.
    """
    # Work on a copy so masking flagged pixels never touches the caller's array.
    data = np.array(data)
    if flags is not None:
        data[flags] = np.nan
    if modified:
        # Modified z-score: residual over 1.486 * median absolute deviation.
        if np.any(np.iscomplex(data)):
            # Handle real and imaginary parts separately, then combine MADs
            # in quadrature.
            center = np.nanmedian(data)
            mad_real = np.nanmedian(np.abs(data.real - center.real))
            mad_imag = np.nanmedian(np.abs(data.imag - center.imag))
            mad = np.sqrt(mad_real**2 + mad_imag**2)
            residual = data - center.real - 1j * center.imag
        else:
            center = np.nanmedian(data)
            mad = np.nanmedian(np.abs(data - center))
            residual = data - center
        # don't divide by zero, instead turn those entries into +inf
        out = robust_divide(residual, np.array([1.486 * mad]))
    else:
        out = robust_divide(data - np.nanmean(data), np.array([np.nanstd(data)]))
    out[np.isnan(out)] = np.inf  # turn all nans into infs
    return out
def modzscore_1d(data, flags=None, kern=8, detrend=True):
    """Calculate modified zscores in 1d.

    Parameters
    ----------
    data : array
        1D data array to detrend.
    flags : array, optional
        1D flag array to be interpretted as mask for d. NOT USED in this function,
        but kept for symmetry with other preprocessing functions.
    kern : int, optional
        The box size to apply medfilt over. Default is 8 pixels. Center pixel excluded.
    detrend : bool, optional
        Whether to detrend the data before calculating zscores. Default is True.
        Setting to False is equivalent to an infinite kernel, but the function
        does it more efficiently.

    Returns
    -------
    zscore : array
        An array containing the outlier significance metric. Same type and size as data.
    """
    if detrend:
        # Delay import so scipy is not required for use of hera_qm
        from scipy.ndimage import median_filter
        # Footprint of 2*kern+1 samples with the center excluded, so a sample
        # never contributes to its own local median.
        footprint = np.ones(2 * kern + 1)
        footprint[kern] = 0
        # Mirror-extend the data by kern samples on each side to avoid edge
        # effects: data[kern - 1::-1] is the first kern samples reversed,
        # data[:-kern - 1:-1] the last kern samples reversed.
        data = np.concatenate([data[kern - 1::-1], data, data[:-kern - 1:-1]])
        # detrend in 1D. Do real/imag regardless of whether data are complex because it's cheap.
        d_sm = median_filter(data.real, footprint=footprint)
        if np.iscomplexobj(data):
            d_sm_i = median_filter(data.imag, footprint=footprint)
            d_sm = d_sm + 1j * d_sm_i
        d_rs = data - d_sm
        d_sq = np.abs(d_rs)**2
        # Factor of .456 is to put mod-z scores on same scale as standard deviation.
        sig = np.sqrt(median_filter(d_sq, footprint=footprint) / .456)
        # Divide safely and trim the mirror padding back off.
        zscore = robust_divide(d_rs, sig)[kern:-kern]
    else:
        # Global version: one median (per component) for the whole array.
        if np.iscomplexobj(data):
            d_rs = data - np.nanmedian(data.real) - 1j * np.nanmedian(data.imag)
        else:
            d_rs = data - np.nanmedian(data)
        d_sq = np.abs(d_rs)**2
        # Factor of .456 is to put mod-z scores on same scale as standard deviation.
        sig = np.sqrt(np.nanmedian(d_sq) / .456)
        zscore = robust_divide(d_rs, np.array([sig]))
    # Cast back so the output dtype matches the input, per the docstring.
    return zscore.astype(data.dtype)
# Update algorithm_dict whenever new metric algorithm is created.
# Maps metric-algorithm names (as referenced by configuration/option strings)
# to the preprocessing functions defined above.
algorithm_dict = {'medmin': medmin, 'medminfilt': medminfilt, 'detrend_deriv': detrend_deriv,
                  'detrend_medminfilt': detrend_medminfilt, 'detrend_medfilt': detrend_medfilt,
                  'detrend_meanfilt': detrend_meanfilt, 'zscore_full_array': zscore_full_array,
                  'modzscore_1d': modzscore_1d}
#############################################################################
# RFI flagging algorithms
#############################################################################
def watershed_flag(uvf_m, uvf_f, nsig_p=2., nsig_f=None, nsig_t=None, avg_method='quadmean',
                   inplace=True, run_check=True, check_extra=True,
                   run_check_acceptability=True):
    """Expand a set of flags using a watershed algorithm.

    This function uses a UVFlag object in 'metric' mode (i.e. how many sigma the data
    point is from the center) and a set of flags to grow the flags using defined
    thresholds.

    Parameters
    ----------
    uvf_m : UVFlag object
        A UVFlag object in 'metric' mode.
    uvf_f : UVFlag object
        A UVFlag object in 'flag' mode.
    nsig_p : float, optional
        The Number of sigma above which to flag pixels which are near previously
        flagged pixels. Default is 2.0.
    nsig_f : float, optional
        The Number of sigma above which to flag channels which are near fully
        flagged frequency channels. Bypassed if None (Default).
    nsig_t : float, optional
        Number of sigma above which to flag integrations which are near fully
        flagged integrations. Bypassed if None (Default)
    avg_method : {"mean", "absmean", "quadmean"}, optional
        Method to average metric data for frequency and time watershedding.
        Default is "quadmean".
    inplace : bool, optional
        If True, update uvf_f. If False, create a new flag object. Default is True.
    run_check : bool
        Option to check for the existence and proper shapes of parameters
        on UVFlag Object.
    check_extra : bool
        Option to check optional parameters as well as required ones.
    run_check_acceptability : bool
        Option to check acceptable range of the values of parameters
        on UVFlag Object.

    Returns
    -------
    uvf : UVFlag object
        A UVFlag object in 'flag' mode with flags after watershed.

    Raises
    ------
    ValueError:
        If uvf_m is not in "metric" mode, if uvf_f is not in "flag" mode, if
        uvf_m and uvf_f do not have the same shape, or if uvf_m has an unknown
        type, then a ValueError is raised.
    """
    # Check inputs
    if (not isinstance(uvf_m, UVFlag)) or (uvf_m.mode != 'metric'):
        raise ValueError('uvf_m must be UVFlag instance with mode == "metric."')
    if (not isinstance(uvf_f, UVFlag)) or (uvf_f.mode != 'flag'):
        raise ValueError('uvf_f must be UVFlag instance with mode == "flag."')
    if uvf_m.metric_array.shape != uvf_f.flag_array.shape:
        raise ValueError('uvf_m and uvf_f must have data of same shape. Shapes '
                         'are: ' + str(uvf_m.metric_array.shape) + ' and '
                         + str(uvf_f.flag_array.shape))
    # Handle in place
    uvf = uvf_f if inplace else uvf_f.copy()
    # Convenience aliases into the flag/metric/weights arrays; farr is a view,
    # so in-place updates below modify uvf directly.
    farr = uvf.flag_array
    marr = uvf_m.metric_array
    warr = uvf_m.weights_array
    # NOTE: "+=" on boolean arrays below is an in-place logical OR, so new
    # watershedded flags are merged into (never clear) the existing flags.
    if uvf_m.type == 'baseline':
        # Pixel watershed: run the 2D watershed separately per baseline and
        # per polarization on each (time, freq) waterfall.
        # TODO: bypass pixel-based if none
        for bl in np.unique(uvf.baseline_array):
            ind = np.where(uvf.baseline_array == bl)[0]
            for pi in range(uvf.polarization_array.size):
                farr[ind, :, pi] += _ws_flag_waterfall(marr[ind, :, pi],
                                                       farr[ind, :, pi], nsig_p)
        if nsig_f is not None:
            # Channel watershed: collapse metric over (blt, pol) axes; seed
            # flags are channels already flagged for every blt and pol.
            tempd = uvutils.collapse(marr, avg_method, axis=(0, 2), weights=warr)
            tempf = np.all(farr, axis=(0, 2))
            farr[:, :, :] += _ws_flag_waterfall(tempd, tempf, nsig_f).reshape(1, -1, 1)
        if nsig_t is not None:
            # Time watershed: times repeat across baselines in a baseline-type
            # object, so collapse per unique time rather than along an axis.
            ts = np.unique(uvf.time_array)
            tempd = np.zeros(ts.size)
            tempf = np.zeros(ts.size, dtype=np.bool_)
            for ti, time in enumerate(ts):
                tempd[ti] = uvutils.collapse(marr[uvf.time_array == time, :, :], avg_method,
                                             weights=warr[uvf.time_array == time, :, :])
                tempf[ti] = np.all(farr[uvf.time_array == time, :, :])
            tempf = _ws_flag_waterfall(tempd, tempf, nsig_t)
            # Broadcast watershedded per-time flags back to every matching blt row.
            for ti, time in enumerate(ts):
                farr[uvf.time_array == time, :, :] += tempf[ti]
    elif uvf_m.type == 'antenna':
        # Pixel watershed: antenna arrays are (ant, freq, time, pol); transpose
        # so _ws_flag_waterfall sees (time, freq), then transpose back.
        for ai in range(uvf.ant_array.size):
            for pi in range(uvf.polarization_array.size):
                farr[ai, :, :, pi] += _ws_flag_waterfall(marr[ai, :, :, pi].T,
                                                         farr[ai, :, :, pi].T, nsig_p).T
        if nsig_f is not None:
            # Channel watershed: collapse over (ant, time, pol).
            tempd = uvutils.collapse(marr, avg_method, axis=(0, 2, 3), weights=warr)
            tempf = np.all(farr, axis=(0, 2, 3))
            farr[:, :, :, :] += _ws_flag_waterfall(tempd, tempf, nsig_f).reshape(1, -1, 1, 1)
        if nsig_t is not None:
            # Time watershed: collapse over (ant, freq, pol).
            tempd = uvutils.collapse(marr, avg_method, axis=(0, 1, 3), weights=warr)
            tempf = np.all(farr, axis=(0, 1, 3))
            farr[:, :, :, :] += _ws_flag_waterfall(tempd, tempf, nsig_t).reshape(1, 1, -1, 1)
    elif uvf_m.type == 'waterfall':
        # Pixel watershed: waterfall arrays are (time, freq, pol).
        for pi in range(uvf.polarization_array.size):
            farr[:, :, pi] += _ws_flag_waterfall(marr[:, :, pi], farr[:, :, pi], nsig_p)
        if nsig_f is not None:
            # Channel watershed: collapse over (time, pol).
            tempd = uvutils.collapse(marr, avg_method, axis=(0, 2), weights=warr)
            tempf = np.all(farr, axis=(0, 2))
            farr[:, :, :] += _ws_flag_waterfall(tempd, tempf, nsig_f).reshape(1, -1, 1)
        if nsig_t is not None:
            # Time watershed: collapse over (freq, pol).
            tempd = uvutils.collapse(marr, avg_method, axis=(1, 2), weights=warr)
            tempf = np.all(farr, axis=(1, 2))
            farr[:, :, :] += _ws_flag_waterfall(tempd, tempf, nsig_t).reshape(-1, 1, 1)
    else:
        raise ValueError('Unknown UVFlag type: ' + uvf_m.type)
    if run_check:
        uvf.check(check_extra=check_extra,
                  run_check_acceptability=run_check_acceptability)
    return uvf
def _ws_flag_waterfall(metric, fin, nsig=2.):
"""Perform the watershed algorithm on 1D or 2D arrays of metric and input flags.
This is a helper function for watershed_flag, but not usually called
by end users.
Parameters
----------
metric : array
A 2D or 1D array. Should be in units of standard deviations.
fin : array
The input (boolean) flags used as the seed of the watershed. Same size as metric.
nsig : float, optional
The number of sigma to flag above for points near flagged points. Default is 2.
Returns
-------
fout : array
A boolean array matching size of metric and fin, with watershedded flags.
Raises
------
ValueError:
If the shapes of metric and fin do not match, or if the number of dimensions is not
equal to 1 or 2, a ValueError is raised.
"""
# Delay import so scipy is not required for use of hera_qm
from scipy.signal import convolve
if metric.shape != fin.shape:
raise ValueError('metric and fin must match in shape. Shapes are: ' + str(metric.shape)
+ ' and ' + str(fin.shape))
fout = fin.copy()
while True:
nflags = np.sum(fout)
try:
kernel = {1: [1, 0, 1], 2: [[0, 1, 0], [1, 0, 1], [0, 1, 0]]}[metric.ndim]
except KeyError:
raise ValueError('Data must be 1D or 2D.')
is_neighbor_flagged = convolve(fout, kernel, mode='same').astype(bool)
fout |= (is_neighbor_flagged & (metric >= nsig))
if np.sum(fout) == nflags:
break
return fout
def xrfi_waterfall(data, flags=None, Kt=8, Kf=8, nsig_init=6., nsig_adj=2.,
algorithm='detrend_medfilt'):
"""Compute metrics, flag, and then watershed on a single waterfall.
Parameters
----------
data : array
2D data array (Ntimes, Nfreqs) to use in flagging.
flags : array, optional
2D flag array to be interpretted as mask for data. Ignored for many algorithms
for calculating metrics of "outlierness" (e.g. detrend_medfilt) but always
ORed with the intial flags from the metrics before the watershed is applied.
Kt : int, optional
The box size in time (first) dimension to apply medfilt over. Default is
8 pixels.
Kf : int, optional
The box size in frequency (second) dimension to apply medfilt over. Default
is 8 pixels.
nsig_init : float, optional
The number of sigma in the metric above which to flag pixels. Default is 6.
nsig_adj : float, optional
The number of sigma to flag above for points near flagged points. Default is 2.
algorithm : str
The metric algorithm name. Must be defined in algorithm_dict.