Skip to content

Commit

Permalink
Merge pull request #217 from HERA-Team/refactoring_exact_norm
Browse files Browse the repository at this point in the history
Refactoring exact norm
  • Loading branch information
philbull authored Aug 9, 2019
2 parents 7708f92 + 69c0d78 commit 2c248d1
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 45 deletions.
117 changes: 78 additions & 39 deletions hera_pspec/pspecdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -1100,10 +1100,12 @@ def q_hat(self, key1, key2, allow_fft=False, exact_norm = False, pol=False):
q = []
del_tau = np.median(np.diff(self.delays()))*1e-9 #Get del_eta in Eq.11(a) (HERA memo #44) (seconds)
Q_matrix_all_delays = np.zeros((self.spw_Ndlys,self.spw_Nfreqs,self.spw_Nfreqs), dtype='complex128')
integral_beam = self.get_integral_beam(pol) # This result does not depend on delay modes. We can remove it from the for loop to avoid its repeated computation

for i in range(self.spw_Ndlys):
# Ideally, del_tau should be part of get_Q. We use it here to
# avoid its repeated computation
Q = del_tau * self.get_Q(i, pol)
Q = del_tau * self.get_Q(i, pol) * integral_beam
Q_matrix_all_delays[i] = Q
QRx2 = np.dot(Q, Rx2)

Expand Down Expand Up @@ -1604,52 +1606,80 @@ def get_Q_alt(self, mode, allow_fft=True):

Q_alt = np.einsum('i,j', m.conj(), m) # dot it with its conjugate
return Q_alt

def get_integral_beam(self, pol=False):
"""
Computes the integral containing the spectral beam and tapering
function in Q_alpha(i,j).
Parameters
----------
pol : str/int/bool, optional
Which beam polarization to use. If the specified polarization
doesn't exist, a uniform isotropic beam (with integral 4pi for all
frequencies) is assumed. Default: False (uniform beam).
Return
-------
integral_beam : array_like
integral containing the spectral beam and tapering.
"""
nu = self.freqs[self.spw_range[0]:self.spw_range[1]] # in Hz

try:
# Get beam response in (frequency, pixel), beam area(freq) and
# Nside, used in computing dtheta
beam_res, beam_omega, N = \
self.primary_beam.beam_normalized_response(pol, nu)
prod = 1. / beam_omega
beam_prod = beam_res * prod[:, np.newaxis]

# beam_prod has omega subsumed, but taper is still part of R matrix
# The nside term is dtheta^2, where dtheta is the resolution in
# healpix map
integral_beam = np.pi/(3.*N*N) * np.dot(beam_prod, beam_prod.T)

except(AttributeError):
warnings.warn("The beam response could not be calculated. "
"PS will not be normalized!")
integral_beam = np.ones((len(nu), len(nu)))

return integral_beam


def get_Q(self, mode, pol=False):
'''
Computes Q_alpha(i,j), which is the response of the data covariance to the bandpower (dC/dP_alpha).
This includes contributions from primary beam.
"""
Computes Q_alpha(i,j), which is the response of the data covariance to
the bandpower (dC/dP_alpha). This includes contributions from primary
beam.
Parameters
----------
mode : int
Central wavenumber (index) of the bandpower, p_alpha.
pol : str/int/bool, optional
Which beam polarization to use. If the specified polarization doesn't exist,
a uniform isotropic beam (with integral 4pi for all frequencies) is assumed.
Default: False (uniform beam).
Which beam polarization to use. If the specified polarization
doesn't exist, a uniform isotropic beam (with integral 4pi for all
frequencies) is assumed. Default: False (uniform beam).
Return
-------
Q : array_like
Response matrix for bandpower p_alpha.
'''

Q_alt : array_like
Exponential part of Q (HERA memo #44, Eq. 11).
"""
if self.spw_Ndlys == None:
self.set_Ndlys()
if mode >= self.spw_Ndlys:
raise IndexError("Cannot compute Q matrix for a mode outside"
"of allowed range of delay modes.")
tau = self.delays()[int(mode)] * 1.0e-9 # delay in seconds
nu = self.freqs[self.spw_range[0]:self.spw_range[1]] # in Hz

try:
beam_res, beam_omega, N = self.primary_beam.beam_normalized_response(pol, nu)
#Get beam response in (frequency, pixel), beam area(freq) and Nside, used in computing dtheta.
prod = (1.0/beam_omega)
beam_prod = beam_res * prod[:, np.newaxis]
integral_beam = (np.pi/(3.0*(N)**2))* \
np.dot(beam_prod, beam_prod.T) #beam_prod has omega subsumed, but taper is still part of R matrix
# the nside terms is dtheta^2, where dtheta is the resolution in healpix map
except(AttributeError):
warnings.warn('The beam response could not be calculated. PS will not be normalized!')
integral_beam = np.ones((len(nu), len(nu)))

eta_int = np.exp(-2j * np.pi * tau * nu) #exponential part of the expression
Q_alt = np.einsum('i,j', eta_int.conj(), eta_int) # dot it with its conjugate
Q = Q_alt * integral_beam
return Q

eta_int = np.exp(-2j * np.pi * tau * nu) # exponential part
Q_alt = np.einsum('i,j', eta_int.conj(), eta_int) # dot with conjugate
return Q_alt

def p_hat(self, M, q):
"""
Expand All @@ -1673,7 +1703,8 @@ def p_hat(self, M, q):
def cov_p_hat(self, M, q_cov):
"""
Covariance estimate between two different band powers p_alpha and p_beta
given by M_{alpha i} M^*_{beta,j} C_q^{ij} where C_q^{ij} is the q-covariance
given by M_{alpha i} M^*_{beta,j} C_q^{ij} where C_q^{ij} is the
q-covariance.
Parameters
----------
Expand All @@ -1688,7 +1719,8 @@ def cov_p_hat(self, M, q_cov):
p_cov[tnum] = np.einsum('ab,cd,bd->ac', M, M, q_cov[tnum])
return p_cov

def broadcast_dset_flags(self, spw_ranges=None, time_thresh=0.2, unflag=False):
def broadcast_dset_flags(self, spw_ranges=None, time_thresh=0.2,
unflag=False):
"""
For each dataset in self.dset, update the flag_array such that
the flagging patterns are time-independent for each baseline given
Expand All @@ -1701,8 +1733,9 @@ def broadcast_dset_flags(self, spw_ranges=None, time_thresh=0.2, unflag=False):
Additionally, one can also unflag the flag_array entirely if desired.
Note: although technically allowed, this function may give unexpected results
if multiple spectral windows in spw_ranges have frequency overlap.
Note: although technically allowed, this function may give unexpected
results if multiple spectral windows in spw_ranges have frequency
overlap.
Note: it is generally not recommended to set time_thresh > 0.5, which
could lead to substantial amounts of data being flagged.
Expand All @@ -1712,11 +1745,12 @@ def broadcast_dset_flags(self, spw_ranges=None, time_thresh=0.2, unflag=False):
spw_ranges : list of tuples
list of len-2 spectral window tuples, specifying the start (inclusive)
and stop (exclusive) index of the frequency array for each spw.
Default is to use the whole band
Default is to use the whole band.
time_thresh : float
Fractional threshold of flagged pixels across time needed to flag all times
per freq channel. It is not recommend to set this greater than 0.5
Fractional threshold of flagged pixels across time needed to flag
all times per freq channel. It is not recommend to set this greater
than 0.5.
unflag : bool
If True, unflag all data in the spectral window.
Expand All @@ -1730,7 +1764,8 @@ def broadcast_dset_flags(self, spw_ranges=None, time_thresh=0.2, unflag=False):
# spw type check
if spw_ranges is None:
spw_ranges = [(0, self.Nfreqs)]
assert isinstance(spw_ranges, list), "spw_ranges must be fed as a list of tuples"
assert isinstance(spw_ranges, list), \
"spw_ranges must be fed as a list of tuples"

# iterate over datasets
for dset in self.dsets:
Expand All @@ -1740,7 +1775,7 @@ def broadcast_dset_flags(self, spw_ranges=None, time_thresh=0.2, unflag=False):
# unflag
if unflag:
# unflag for all times
dset.flag_array[:, :, self.spw_range[0]:self.spw_range[1], :] = False
dset.flag_array[:,:,self.spw_range[0]:self.spw_range[1],:] = False
continue
# enact time threshold on flag waterfalls
# iterate over polarizations
Expand Down Expand Up @@ -2123,8 +2158,12 @@ def pspec(self, bls1, bls2, dsets, pols, n_dlys=None,
exact_norm : bool, optional
If True, estimates power spectrum using Q instead of Q_alt
(HERA memo #44). The default options is False. Beware that
turning this True would take ~ 7 sec for computing
power spectrum for 100 channels per time sample per baseline.
turning this True would take ~ 0.2 sec for computing
power spectrum for 100 channels per time sample per baseline.
If False, computing a power spectrum for 100 channels would
take ~ 0.04 sec per time sample per baseline. This means
that computing a power spectrum when exact_norm is set to
False runs five times faster than setting it to True.
history : str, optional
history string to attach to UVPSpec object
Expand Down
35 changes: 29 additions & 6 deletions hera_pspec/tests/test_pspecdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,8 @@ def test_get_Q_alt(self):

# Check for error handling
nt.assert_raises(ValueError, self.ds.set_Ndlys, vect_length+100)



def test_get_Q(self):
"""
Test the Q = dC_ij/dp function.
Expand All @@ -361,9 +362,6 @@ def test_get_Q(self):
key2 = (1, 24, 38)
uvd = copy.deepcopy(self.uvd)
ds_t = pspecdata.PSpecData(dsets=[uvd, uvd])
with warnings.catch_warnings(record=True) as w:
ds_t.get_Q(0, pol)
assert len(w) > 0

for i in range(vect_length):
try:
Expand All @@ -373,7 +371,7 @@ def test_get_Q(self):
self.assertEqual(self.ds.spw_Ndlys, self.ds.spw_Nfreqs)
except IndexError:
Q_matrix = np.ones((vect_length, vect_length))

xQy = np.dot(np.conjugate(x_vect), np.dot(Q_matrix, y_vect))
yQx = np.dot(np.conjugate(y_vect), np.dot(Q_matrix, x_vect))
xQx = np.dot(np.conjugate(x_vect), np.dot(Q_matrix, x_vect))
Expand Down Expand Up @@ -428,6 +426,31 @@ def test_get_Q(self):
# of the range of delay bins
nt.assert_raises(IndexError, self.ds.get_Q, vect_length-1, pol)

def test_get_integral_beam(self):
"""
Test the integral of the beam and tapering function in Q.
"""
pol = 'xx'
#Test if there is a warning if user does not pass the beam
uvd = copy.deepcopy(self.uvd)
ds_t = pspecdata.PSpecData(dsets=[uvd, uvd])
ds = pspecdata.PSpecData(dsets=[uvd, uvd], beam=self.bm)

with warnings.catch_warnings(record=True) as w:
ds_t.get_integral_beam(pol)
assert len(w) > 0

try:
integral_matrix = ds.get_integral_beam(pol)
# Test that if the number of delay bins hasn't been set
# the code defaults to putting that equal to Nfreqs
self.assertEqual(ds.spw_Ndlys, ds.spw_Nfreqs)
except IndexError:
integral_matrix = np.ones((ds.spw_Ndlys, ds.spw_Ndlys))

# Test that integral matrix has the right shape
self.assertEqual(integral_matrix.shape, (ds.spw_Nfreqs, ds.spw_Nfreqs))

def test_get_unnormed_E(self):
"""
Test the E function
Expand Down Expand Up @@ -1180,7 +1203,7 @@ def test_pspec(self):
bls_Q = [(24, 25)]
uvp = ds_Q.pspec(bls_Q, bls_Q, (0, 1), [('xx', 'xx')], input_data_weight='identity',
norm='I', taper='none', verbose=True, exact_norm=False)
Q_sample = ds_Q.get_Q((ds_Q.spw_range[1] - ds_Q.spw_range[0])/2, 'xx') #Get Q matrix for 0th delay mode
Q_sample = ds_Q.get_integral_beam('xx') #Get integral beam for pol 'xx'

nt.assert_equal(np.shape(Q_sample), (ds_Q.spw_range[1] - ds_Q.spw_range[0],\
ds_Q.spw_range[1] - ds_Q.spw_range[0])) #Check for the right shape
Expand Down

0 comments on commit 2c248d1

Please sign in to comment.