In [1]:
%load_ext autoreload
%autoreload 2
from astropy.table import Table
import numpy as np
import tqdm

# Verify that old and new normalizations are equivalent


Old ones:

In [2]:
def normalize_crossspectrum_oldstingray(unnorm_power, tseg, nbins, nphots1, nphots2, norm="none", power_type="real"):
    """
    Normalize the real part of the cross spectrum to Leahy, absolute rms^2,
    fractional rms^2 normalization, or not at all.

    Parameters
    ----------
    unnorm_power: numpy.ndarray
        The unnormalized cross spectrum.

    tseg: int
        The length of the Fourier segment, in seconds.

    nbins : int
        Number of bins in the light curve

    nphots1 : int
        Number of photons in the light curve no. 1

    nphots2 : int
        Number of photons in the light curve no. 2

    Other parameters
    ----------------
    norm : str
        One of `'leahy'` (Leahy+83), `'frac'` (fractional rms), `'abs'`
        (absolute rms)

    power_type : str
        One of `'real'` (real part), `'all'` (all complex powers), `'abs'`
        (absolute value)

    Returns
    -------
    power: numpy.nd.array
        The normalized co-spectrum (real part of the cross spectrum). For
        'none' normalization, imaginary part is returned as well.
    """

    # The "effective" counts/bin is the geometrical mean of the counts/bin
    # of the two light curves. Same goes for counts/second in meanrate.

    log_nphots1 = np.log(nphots1)
    log_nphots2 = np.log(nphots2)

    actual_nphots = np.float64(np.sqrt(np.exp(log_nphots1 + log_nphots2)))

    if power_type == "all":
        c_num = unnorm_power
    elif power_type == "real":
        c_num = unnorm_power.real
    elif power_type == "absolute":
        c_num = np.absolute(unnorm_power)
    else:
        raise ValueError("`power_type` not recognized!")

    if norm.lower() == 'leahy':
        power = c_num * 2. / actual_nphots

    elif norm.lower() == 'frac':
        meancounts1 = nphots1 / nbins
        meancounts2 = nphots2 / nbins

        actual_mean = np.sqrt(meancounts1 * meancounts2)

        assert actual_mean > 0.0, \
            "Mean count rate is <= 0. Something went wrong."

        c = c_num / float(nbins ** 2.)
        power = c * 2. * tseg / (actual_mean ** 2.0)

    elif norm.lower() == 'abs':
        meanrate = np.sqrt(nphots1 * nphots2) / tseg

        power = c_num * 2. * meanrate / actual_nphots

    elif norm.lower() == 'none':
        power = unnorm_power

    else:
        raise ValueError("Value for `norm` not recognized.")

    return power


def normalize_crossspectrum_gauss(
        unnorm_power, mean_flux, var, dt, N, norm="none", power_type="real"):
    """
    Normalize the real part of the cross spectrum to Leahy, absolute rms^2,
    fractional rms^2 normalization, or not at all.

    Parameters
    ----------
    unnorm_power: numpy.ndarray
        The unnormalized cross spectrum.

    mean_flux: float
        The mean flux of the light curve (if a cross spectrum, the geometrical
        mean of the flux in the two channels)

    var: float
        The variance of the light curve (if a cross spectrum, the geometrical
        mean of the variance in the two channels)

    dt: float
        The sampling time of the light curve

    N: int
        The number of bins in the light curve

    Other parameters
    ----------------
    norm : str
        One of `'leahy'` (Leahy+83), `'frac'` (fractional rms), `'abs'`
        (absolute rms)

    power_type : str
        One of `'real'` (real part), `'all'` (all complex powers), `'abs'`
        (absolute value)

    Returns
    -------
    power: numpy.nd.array
        The normalized co-spectrum (real part of the cross spectrum). For
        'none' normalization, imaginary part is returned as well.

    Examples
    --------
    >>> lc_c = np.random.poisson(10000, 10000)
    >>> lc_c_var = 10000
    >>> lc = lc_c / 17.3453
    >>> lc_var = (100 / 17.3453)**2
    >>> pds_c = np.absolute(np.fft.fft(lc_c))**2
    >>> pds = np.absolute(np.fft.fft(lc))**2
    >>> norm_c = normalize_crossspectrum_gauss(pds_c, np.mean(lc_c), lc_c_var, 0.1, len(lc_c), norm='leahy')
    >>> norm = normalize_crossspectrum_gauss(pds, np.mean(lc), lc_var, 0.1, len(lc), norm='leahy')
    >>> np.allclose(norm, norm_c)
    True
    >>> np.isclose(np.mean(norm[1:]), 2, atol=0.1)
    True
    >>> norm_c = normalize_crossspectrum_gauss(pds_c, np.mean(lc_c), np.mean(lc_c), 0.1, len(lc_c), norm='frac')
    >>> norm = normalize_crossspectrum_gauss(pds, np.mean(lc), lc_var, 0.1, len(lc), norm='frac')
    >>> np.allclose(norm, norm_c)
    True
    >>> norm_c = normalize_crossspectrum_gauss(pds_c, np.mean(lc_c), np.mean(lc_c), 0.1, len(lc_c), norm='abs')
    >>> norm = normalize_crossspectrum_gauss(pds, np.mean(lc), lc_var, 0.1, len(lc), norm='abs')
    >>> np.allclose(norm / np.mean(lc)**2, norm_c / np.mean(lc_c)**2)
    True
    >>> np.isclose(np.mean(norm_c[2:]), 2 * np.mean(lc_c * 0.1), rtol=0.1)
    True
    """

    # The "effective" counts/bin is the geometrical mean of the counts/bin
    # of the two light curves. Same goes for counts/second in meanrate.
    if power_type == "all":
        c_num = unnorm_power
    elif power_type == "real":
        c_num = unnorm_power.real
    elif power_type == "absolute":
        c_num = np.absolute(unnorm_power)
    else:
        raise ValueError("`power_type` not recognized!")

    common_factor = 2 * dt / N
    rate_mean = mean_flux * dt
    if norm.lower() == 'leahy':
        norm = 2 / var / N

    elif norm.lower() == 'frac':
        norm = common_factor / rate_mean**2

    elif norm.lower() == 'abs':
        norm = common_factor

    elif norm.lower() == 'none':
        norm = 1

    else:
        raise ValueError("Value for `norm` not recognized.")

    return norm * c_num




New ones:

In [3]:
def normalize_frac(power, dt, N, mean):
    """Fractional rms normalization, from the variance of the lc.

    Examples
    --------
    >>> mean = var = 1000000
    >>> N = 1000000
    >>> dt = 0.2
    >>> meanrate = mean / dt
    >>> lc = np.random.poisson(mean, N)
    >>> pds = np.abs(fft(lc))**2
    >>> pdsnorm = normalize_frac(pds, dt, lc.size, mean)
    >>> np.isclose(pdsnorm[1:N//2].mean(), poisson_level(norm="frac", meanrate=meanrate), rtol=0.01)
    True
    """
    #     (mean * N) / (mean /dt) = N * dt
    #     It's Leahy / meanrate;
    #     Nph = mean * N
    #     meanrate = mean / dt
    #     norm = 2 / (Nph * meanrate) = 2 * dt / (mean**2 * N)

    return power * 2. * dt / (mean ** 2 * N)


def normalize_abs(power, dt, N):
    """Absolute rms normalization, from the variance of the lc.

    Examples
    --------
    >>> mean = var = 100000
    >>> N = 1000000
    >>> dt = 0.2
    >>> meanrate = mean / dt
    >>> lc = np.random.poisson(mean, N)
    >>> pds = np.abs(fft(lc))**2
    >>> pdsnorm = normalize_abs(pds, dt, lc.size)
    >>> np.isclose(pdsnorm[1:N//2].mean(), poisson_level(norm="abs", meanrate=meanrate), rtol=0.01)
    True
    """
    #     It's frac * meanrate**2; Leahy / meanrate * meanrate**2
    #     Nph = mean * N
    #     meanrate = mean / dt
    #     norm = 2 / (Nph * meanrate) * meanrate**2 = 2 * dt / (mean**2 * N) * mean**2 / dt**2

    return power * 2. / N / dt


def normalize_leahy_from_variance(power, variance, N):
    """Leahy+83 normalization, from the variance of the lc.

    Examples
    --------
    >>> mean = var = 100000.
    >>> N = 1000000
    >>> lc = np.random.poisson(mean, N).astype(float)
    >>> pds = np.abs(fft(lc))**2
    >>> pdsnorm = normalize_leahy_from_variance(pds, var, lc.size)
    >>> np.isclose(pdsnorm[0], 2 * np.sum(lc), rtol=0.01)
    True
    >>> np.isclose(pdsnorm[1:N//2].mean(), poisson_level(norm="leahy"), rtol=0.01)
    True
    """
    return power * 2. / (variance * N)


def normalize_leahy_poisson(power, Nph):
    """Leahy+83 normalization, from the variance of the lc.

    Examples
    --------
    >>> mean = var = 100000.
    >>> N = 1000000
    >>> lc = np.random.poisson(mean, N).astype(float)
    >>> pds = np.abs(fft(lc))**2
    >>> pdsnorm = normalize_leahy_poisson(pds, np.sum(lc))
    >>> np.isclose(pdsnorm[0], 2 * np.sum(lc), rtol=0.01)
    True
    >>> np.isclose(pdsnorm[1:N//2].mean(), poisson_level(norm="leahy"), rtol=0.01)
    True
    """
    return power * 2. / Nph


def normalize_crossspectrum(unnorm_power, dt, N, mean, variance=None, norm="abs", power_type="all"):
    """Wrapper around all the normalize_NORM methods."""

    if norm == "leahy" and variance is not None:
        pds = normalize_leahy_from_variance(unnorm_power, variance, N)
    elif norm == "leahy":
        pds = normalize_leahy_poisson(unnorm_power, N * mean)
    elif norm == "frac":
        pds = normalize_frac(unnorm_power, dt, N, mean)
    elif norm == "abs":
        pds = normalize_abs(unnorm_power, dt, N)
    elif norm == "none":
        pds = unnorm_power
    else:
        raise ValueError("Unknown value for the norm")

    if power_type == "real":
        pds = pds.real
    elif power_type in ["abs", "absolute"]:
        pds = np.abs(pds)

    return pds


In [5]:
import tqdm
ntrial = 100

rows = []
for i in tqdm.tqdm(range(ntrial)):
    M = np.random.randint(1, 1000)
    N = np.random.randint(1, 10000)
    
    dt = np.random.uniform(0.0001, 1)
    nphots1 = np.random.randint(0, 100_000_000)
    nphots2 = nphots1 # np.random.randint(0, 100_000_000)
    nphots = np.sqrt(nphots1 * nphots2)

    mean1 = nphots1 / N
    mean2 = nphots2 / N
    mean = np.sqrt(mean1 * mean2)
    
    unnorm_power = 1

    variance = np.random.choice([None, np.random.uniform(0, 1000)])
    norm = np.random.choice(["abs", "frac", "leahy", "none"])
    power_type = np.random.choice(["all", "real", "absolute"])
    
    tseg = dt * N
    
    if variance is not None:
        old_power = normalize_crossspectrum_gauss(
            unnorm_power, mean / dt, variance, dt, N, norm=norm, power_type=power_type)
    else:
        old_power = normalize_crossspectrum_oldstingray(
            unnorm_power, tseg, N, nphots1, nphots2, norm=norm, power_type=power_type)

    new_power = normalize_crossspectrum(unnorm_power, dt, N, mean, variance=variance, norm=norm, power_type=power_type)
    
    msg =(f"power={unnorm_power:g}, dt={dt:g}, M={M}, N={N}, mean1={mean1:g}, mean2={mean2:g}, " 
          f"variance={variance}, norm={norm}, power_type={power_type}\n"
          f"old_power={old_power}, new_power={new_power}")
    is_close = np.isclose(new_power, old_power, rtol=0.00001)
    if is_close:
        msg = f"GOOD: {msg}"
    else:
        msg = f"BAD: {msg}"
        print(msg)
    variance = 0 if variance is None else variance
    rows.append([old_power, new_power, dt, M, N, mean1, mean2, mean, variance, norm, power_type, is_close])
table = Table(rows=rows, names=["old", "new", "dt", "M", "N", "mean1", "mean2", "mean", "variance", "norm", "power_type", "good?"])

  0%|          | 0/100 [00:00<?, ?it/s]

100%|██████████| 100/100 [00:00<00:00, 2131.76it/s]

BAD: power=1, dt=0.110371, M=185, N=436, mean1=108.842, mean2=108.842, variance=650.1415023475095, norm=abs, power_type=real
old_power=0.0005062877958892786, new_power=0.04156134120259484
BAD: power=1, dt=0.074888, M=24, N=7550, mean1=12803, mean2=12803, variance=921.8946936076566, norm=abs, power_type=absolute
old_power=1.983787598908775e-05, new_power=0.0035372920417470876
BAD: power=1, dt=0.365039, M=235, N=7864, mean1=4309.95, mean2=4309.95, variance=111.9180098679673, norm=abs, power_type=real
old_power=9.283788201097274e-05, new_power=0.0006967031236869825
BAD: power=1, dt=0.666194, M=336, N=1204, mean1=54093, mean2=54093, variance=958.030612366853, norm=abs, power_type=all
old_power=0.001106635054927272, new_power=0.0024934610825412614
BAD: power=1, dt=0.0729246, M=889, N=8213, mean1=8052.65, mean2=8052.65, variance=978.2295246630366, norm=abs, power_type=all
old_power=1.7758345595830987e-05, new_power=0.003339287733316632
BAD: power=1, dt=0.843437, M=602, N=9708, mean1=9473, me




In [6]:
table

old,new,dt,M,N,mean1,mean2,mean,variance,norm,power_type,good?
float64,float64,float64,int64,int64,float64,float64,float64,float64,str5,str8,bool
4.644672929118666e-09,4.644672929118666e-09,0.49214109927748084,933,6273,183.7996174079388,183.7996174079388,183.7996174079388,411.39082574664434,frac,all,True
5.05571308117597e-06,5.05571308117597e-06,0.23268052655247934,735,482,27458.273858921162,27458.273858921162,27458.273858921162,820.7304344090352,leahy,absolute,True
1.0,1.0,0.8951171166113507,753,2493,11399.404332129963,11399.404332129963,11399.404332129963,0.0,none,all,True
2.736622772149608e-07,2.736622772149609e-07,0.166826932019413,129,6790,1076.3296023564064,1076.3296023564064,1076.3296023564064,0.0,leahy,all,True
0.0005062877958892786,0.04156134120259484,0.11037073950386274,185,436,108.84174311926606,108.84174311926606,108.84174311926606,650.1415023475095,abs,real,False
1.4810890477389813e-07,1.4810890477389805e-07,0.5037104982414005,82,5160,2616.9722868217054,2616.9722868217054,2616.9722868217054,0.0,leahy,all,True
1.6262782087928895e-09,1.6262782087928897e-09,0.46198789654039774,90,2028,529.2963510848126,529.2963510848126,529.2963510848126,645.9302570271362,frac,absolute,True
4.1067299392908086e-07,4.106729939290809e-07,0.8964585204007969,690,7430,1720.363526244953,1720.363526244953,1720.363526244953,655.4582551492772,leahy,absolute,True
1.0,1.0,0.579645265209173,985,8088,871.6397131552918,871.6397131552918,871.6397131552918,0.0,none,absolute,True
...,...,...,...,...,...,...,...,...,...,...,...


In [7]:
table_bad = table[~table["good?"]]
table_bad

old,new,dt,M,N,mean1,mean2,mean,variance,norm,power_type,good?
float64,float64,float64,int64,int64,float64,float64,float64,float64,str5,str8,bool
0.0005062877958892,0.0415613412025948,0.1103707395038627,185,436,108.84174311926606,108.84174311926606,108.84174311926606,650.1415023475095,abs,real,False
1.983787598908775e-05,0.003537292041747,0.0748879818588062,24,7550,12803.02238410596,12803.02238410596,12803.02238410596,921.8946936076566,abs,absolute,False
9.283788201097274e-05,0.0006967031236869,0.3650385520671448,235,7864,4309.950534079349,4309.950534079349,4309.950534079349,111.9180098679673,abs,real,False
0.0011066350549272,0.0024934610825412,0.6661943030662177,336,1204,54093.02740863787,54093.02740863787,54093.02740863787,958.030612366853,abs,all,False
1.7758345595830987e-05,0.0033392877333166,0.0729246461892799,889,8213,8052.645683672227,8052.645683672227,8052.645683672227,978.2295246630366,abs,all,False
0.0001737612893049,0.0002442572288521,0.8434372982863915,602,9708,9472.9956736712,9472.9956736712,9472.9956736712,561.5256149222473,abs,all,False
0.0001769213164839,0.0010424790911128,0.4119612854329713,75,4657,11360.81232553146,11360.81232553146,11360.81232553146,974.5772510739674,abs,real,False
0.0001750856454658,0.0002656288650312,0.8118721380249846,577,9274,5555.559305585508,5555.559305585508,5555.559305585508,567.0222379871162,abs,all,False
8.623624822811473e-05,0.0006072216842896,0.3768524047568614,545,8740,3675.1972540045767,3675.1972540045767,3675.1972540045767,720.3757129088073,abs,real,False


Looking at the actual normalizations, I realized that the _old_ stingray formula was wrong, in the Gaussian case. 
The normalization factor should have been

$$\frac{2}{dt\,N}$$

instead of 

$$\frac{2 dt}{N}$$
