# Kolmogorov-Smirnov test for normality of high-delay signals

This notebook performs a Kolmogorov-Smirnov (KS) test to determine whether high-delay signals follow a Gaussian distribution.

In [None]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import hera_pspec as hp
from pyuvdata import UVData
from scipy import stats
from scipy.stats import norm
from scipy.optimize import curve_fit
import pyuvdata.utils as uvutils

In [None]:
# Load beam model
# `uvb` is reused below for the Jy -> mK conversion and as the beam handed to PSpecData.
beamfile = 'HERA_NF_dipole_power.beamfits'
cosmo = hp.conversions.Cosmo_Conversions()  # built with no arguments, i.e. the library defaults
uvb = hp.pspecbeam.PSpecBeamUV(beamfile, cosmo=cosmo)

In [None]:
# Load the visibility data file into a single UVData object
dfile = 'zen.2458101.clean-002.uvh5'
uvd = UVData()
uvd.read(dfile)

In [None]:
# find conversion factor from Jy to mK, evaluated at each unique frequency for pol 'xx'
Jy_to_mK = uvb.Jy_to_mK(np.unique(uvd.freq_array), pol='xx')
# apply it in place: the factor is broadcast along the third axis of the 4-D data_array
# (presumably the frequency axis -- TODO confirm against the pyuvdata version in use)
uvd.data_array *= Jy_to_mK[None, None, :, None]

### Systematic subtraction

The file 'zen.2458101.xtmdl.uvh5' is a systematics model. Subtracting this model from the 'clean' data loaded above yields data with certain systematics removed.

Run the following cell if you want to use the data after systematic subtraction. Otherwise, **skip the cell below**.

In [None]:
# Systematics model to subtract from the 'clean' data held in `uvd`.
mdfile = 'zen.2458101.xtmdl.uvh5'
# other file names that have been tried here:
# zen.2458116.34176.xx.HH.uvOCRS, zen.2458101.clean-002.uvh5, zen.2458101.xtmdl.uvh5
muvd = UVData()
muvd.read(mdfile)

# baselines (ant1, ant2, pol) whose model visibilities are subtracted
# NOTE(review): the power spectra further down use (138, 139) and (37, 38), so of
# these two only the (37, 38) subtraction affects them -- confirm (65, 67) is intended.
bl1 = (65, 67, 'xx')
bl2 = (37, 38, 'xx')

# put the model on the same mK scale as the data before subtracting
Jy_to_mK = uvb.Jy_to_mK(np.unique(muvd.freq_array), pol='xx')
muvd.data_array *= Jy_to_mK[None, None, :, None]

# Subtract the model in place from the rows of `uvd` belonging to each baseline.
# The row indices come from `uvd` (the object being modified); the original code
# referred to an undefined name `cuvd` here. Only the antenna pair is passed to
# antpair2ind, the polarization is selected by the trailing index 0.
blt_inds = uvd.antpair2ind(bl1[:2])
uvd.data_array[blt_inds, 0, :, 0] -= muvd.get_data(bl1)

blt_inds = uvd.antpair2ind(bl2[:2])
uvd.data_array[blt_inds, 0, :, 0] -= muvd.get_data(bl2)

In [None]:
# slide the time axis by one integration:
# uvd1 keeps the unique times at indices 16, 18, ..., 42 and uvd2 those at
# 17, 19, ..., 43, so every time in uvd2 is the unique time immediately after
# the corresponding one in uvd1 (14 integrations in each selection)
uvd1 = uvd.select(times=np.unique(uvd.time_array)[16:44:2], inplace=False)
uvd2 = uvd.select(times=np.unique(uvd.time_array)[17:45:2], inplace=False)

In [None]:
# Create a new PSpecData object from the two time-interleaved datasets
# (no weights supplied; the beam loaded above is attached)
ds = hp.PSpecData(dsets=[uvd1, uvd2], wgts=[None, None], beam=uvb)
ds.rephase_to_dset(0) # Phase to the zeroth dataset

# Specify which baselines to include
baselines = [(138, 139), (37, 38)]

# Define uvp for 'clean' data
# The same baseline list is given for both sides of the pair; (0, 1) presumably
# selects dataset 0 (uvd1) x dataset 1 (uvd2) -- TODO confirm against hera_pspec docs.
# A single spectral window is requested via the channel range (520, 690).
uvp = ds.pspec(baselines, baselines, (0, 1), [('xx', 'xx')], spw_ranges=[(520, 690)], 
                input_data_weight='identity',
                norm='I', taper='blackman-harris', verbose=False)

In [None]:
# baseline pair with cable reflection (#1)
spw = 0
blp = ((138, 139), (138, 139))
key1 = (spw, blp, 'xx')
# delays are scaled by 1e9; the axis labels below treat the result as ns
dlys = uvp.get_dlys(spw) * 1e9
# keep only the real part of the power spectrum; this is what the KS tests use later
power1 = np.real(uvp.get_data(key1))

# plot power spectrum of the first spectral window (spw index 0)
# the absolute value is plotted so that negative values can be shown on a log axis
fig, ax = plt.subplots(figsize=(8,4))
p1 = ax.plot(dlys, np.abs(power1).T)
ax.set_yscale('log')
ax.grid()
ax.set_xlabel("delay [ns]", fontsize=14)
ax.set_ylabel(r"$P(k)\ \rm [mK^2\ h^{-3}\ Mpc^3]$", fontsize=14)
ax.set_title("spw : {}, blpair : {}, pol : {}".format(*key1), fontsize=14)

In [None]:
# baseline pair without cable reflection (#1)
spw = 0
blp = ((37, 38), (37, 38))
key2 = (spw, blp, 'xx')
# delays are scaled by 1e9; the axis labels below treat the result as ns
dlys = uvp.get_dlys(spw) * 1e9
# keep only the real part of the power spectrum; this is what the KS tests use later
power2 = np.real(uvp.get_data(key2))

# plot power spectrum of the first spectral window (spw index 0)
# the absolute value is plotted so that negative values can be shown on a log axis
plt.figure(figsize=(8,4))
plt.semilogy(dlys, np.abs(power2).T)
plt.grid()
plt.xlabel("delay [ns]", fontsize=14)
plt.ylabel(r"$P(k)\ \rm [mK^2\ h^{-3}\ Mpc^3]$", fontsize=14)
plt.title("spw : {}, blpair : {}, pol : {}".format(*key2), fontsize=14)
plt.show()

In [None]:
# define KS test
def ks_test(data, pdf, guess_args):
    """
    KS-style comparison of `data` against a model PDF fitted to its histogram.

    H0: the data follow the given distribution.
    H0 is accepted only when d < cv (strictly); d >= cv, or a NaN statistic,
    is reported as a rejection.

    Both CDFs are discrete approximations evaluated on a histogram grid with
    ``len(data)`` equal-width bins: the empirical CDF is the normalised
    cumulative bin count, and the model CDF is the cumulative sum of the
    fitted PDF at the right bin edges, normalised to end at 1.

    Parameters
    ----------
    data: 1d array
        empirical data
    pdf: callable
        probability density function, called as ``pdf(x, *params)``
    guess_args: float, 1d array
        initial guess of the pdf parameters, passed to ``curve_fit`` as ``p0``

    Returns
    -------
    d: float
        KS-test stats (largest absolute gap between the two CDFs)
    cv: float
        critical value, 1.36/sqrt(len(data)) (Significance level:  α = 0.05)
    m: Boolean
        KS-test result: Accept H0 (True); reject H0 (False)
    """
    n_samples = len(data)

    # get CDF of empirical data: one histogram bin per sample, accumulated
    counts, bin_edges = np.histogram(data, bins=n_samples, density=True)
    cum_counts = np.cumsum(counts)
    ecdf = cum_counts / cum_counts[-1]

    # fit the model PDF to a coarser ('auto'-binned) density histogram,
    # using the bin centres as abscissae
    density, edges = np.histogram(data, bins='auto', density=True)
    centers = (edges[:-1] + edges[1:]) / 2.0
    popt, _ = curve_fit(pdf, centers, density, p0=guess_args)

    # compute CDF from the fitted PDF on the same grid as the empirical CDF
    cdf_scaled = np.cumsum(pdf(bin_edges[1:], *popt))
    cdf = cdf_scaled / cdf_scaled[-1]

    # perform KS-test
    d = np.max(np.abs(cdf - ecdf))
    cv = 1.36 / np.sqrt(n_samples)

    # A single comparison always defines the verdict; the previous pair of
    # `if d < cv` / `if d > cv` left it unbound when d == cv or d was NaN.
    m = bool(d < cv)

    return [d, cv, m]


# KS test for Gaussian only
def ks_norm_fit(data):
    """
    Perform a KS test for fitting a Gaussian distribution to the input data.

    The Gaussian parameters are estimated from the data with ``norm.fit`` and
    the two-sided KS statistic is computed against that fitted CDF. The
    empirical CDF is a step function, so both sides of every step are checked:
    ``i/n - F(x_i)`` (top of the step) and ``F(x_i) - (i-1)/n`` (bottom of the
    step) for the sorted samples ``x_1 <= ... <= x_n``.

    Parameter
    ---------
    data : array_like
        Input data. Flattened before use.

    Return
    ------
    d : float
        The KS test stats.
    cv : float
        Critical value at a 5% level of significance, 1.36/sqrt(n).
        NOTE(review): this is the large-sample value for a fully specified
        distribution; with mean and std estimated from the same data it is
        conservative -- confirm this is the intended threshold.

    Raises
    ------
    ValueError
        If `data` is empty.
    """
    values = np.asarray(data).ravel()
    n_samples = values.size
    if n_samples == 0:
        raise ValueError("ks_norm_fit requires at least one data point")

    # fit the data with a Gaussian distribution
    mu, std = norm.fit(values)

    # get fitted Gaussian CDF at the sorted sample positions
    fit_cdf = norm.cdf(np.sort(values), mu, std)

    # empirical CDF just after (i/n) and just before ((i-1)/n) each sample
    ranks = np.arange(1, n_samples + 1)
    ecdf_upper = ranks / float(n_samples)
    ecdf_lower = (ranks - 1) / float(n_samples)

    # compute the max. abs. dist. between data CDF and the fitted model CDF
    d_plus = np.max(ecdf_upper - fit_cdf)
    d_minus = np.max(fit_cdf - ecdf_lower)
    d = max(d_plus, d_minus)

    # compute the critical value
    cv = 1.36 / np.sqrt(n_samples)

    return [d, cv]

In [None]:
# define the PDF of the CNN distribution
def real_pdf(z, s):
    """
    Evaluate the CNN-distribution PDF, p(z) = (1/s) * exp(-|2 z| / s).

    This has the form of a Laplace density centred on zero with scale s/2.

    Parameters
    ----------
    z : float, array_like
        Point(s) at which the PDF is evaluated.
    s : float
        Width parameter of the distribution.

    Return
    -------
    p : float, array_like
        PDF value(s), with the same shape as `z`.
    """
    prefactor = 1/(s)
    exponent = (-np.abs(2*z))/(s)
    return prefactor*np.exp(exponent)

In [None]:
# delay window to analyse
dly_modes = [2000, 4000]  # between ~2000 ns and ~4000 ns

# index of the entry of dlys closest to each edge of the window
dly_idx = [(np.abs(dlys - edge)).argmin() for edge in dly_modes]

In [None]:
# cut the selected delay window out of each power spectrum and flatten it to 1-D
# data1: pspec with clear systematic features
# data2: pspec without clear systematic features
data1, data2 = (
    pspec[:, dly_idx[0]:dly_idx[1]].flatten() for pspec in (power1, power2)
)

In [None]:
def cdfs(data):
    """
    Build the empirical and fitted CDF curves used for the comparison plots.

    Parameters
    ----------
    data : 1d array
        Input data.

    Returns
    -------
    list of four [x, y] pairs, in this order:
        0. empirical CDF on the sorted samples (y = i/n for i = 0..n-1)
        1. CDF of the Gaussian fitted with ``norm.fit``, on the sorted samples
        2. empirical CDF from an 'auto'-binned density histogram, on the bin
           centres (normalised cumulative sum)
        3. CDF of the CNN PDF (``real_pdf``) fitted to that histogram with
           ``curve_fit``, on the same bin centres (normalised cumulative sum)
    """
    n_samples = len(data)

    # sample-based curves (used for the Gaussian comparison)
    sorted_for_ecdf = np.sort(data)
    ecdf_sorted = np.arange(n_samples)/float(n_samples)

    sorted_for_norm = np.sort(data)
    norm_params = norm.fit(data)
    norm_cdf = norm.cdf(sorted_for_norm, *norm_params)

    # histogram-based curves (used for the CNN comparison)
    density, edges = np.histogram(data, bins='auto', density=True)
    centres = (edges + np.roll(edges, -1))[:-1] / 2.0
    cum_density = np.cumsum(density)
    ecdf_binned = cum_density/cum_density[-1]

    cnn_params = curve_fit(real_pdf, centres, density, p0=np.std(data))[0]
    cum_model = np.cumsum(real_pdf(centres, *cnn_params))
    cnn_cdf = cum_model/cum_model[-1]

    return [
        [sorted_for_ecdf, ecdf_sorted],
        [sorted_for_norm, norm_cdf],
        [centres, ecdf_binned],
        [centres, cnn_cdf],
    ]

In [None]:
# build the CDF curves for both data selections (data1 first, then data2)
cdfs1, cdfs2 = (cdfs(sample) for sample in (data1, data2))

In [None]:
# plot fittings

# lower and upper power levels of the box that marks the selected delay range
box = [3e6, 3e12]

plt.figure(figsize=(12, 10))
plt.subplots_adjust(hspace=.5, wspace=.3)


def _plot_pspec_panel(position, power, key):
    """Plot |power| against delay on a log axis and outline the selected delay range."""
    plt.subplot(position)
    plt.plot(dlys, np.abs(power).T)
    plt.yscale('log')
    # box sides at the two delay edges, then bottom and top of the box
    for dly in dly_modes:
        plt.vlines(dly, box[0], box[1])
    for level in box:
        plt.hlines(level, dly_modes[0], dly_modes[1])
    plt.grid()
    plt.xlabel("$τ$ [ns]", fontsize=14)
    plt.ylabel(r"$P(k)\ \rm [mK^2\ h^{-3}\ Mpc^3]$", fontsize=14)
    plt.title("spw : {}, blpair : {}, pol : {}".format(*key), fontsize=14)


def _plot_cdf_panel(position, data_curve, fit_curve, fit_label, ylabel, key):
    """Plot an empirical CDF ([x, y] pair) together with the CDF of its fitted model."""
    plt.subplot(position)
    plt.plot(data_curve[0], data_curve[1], label='data')
    plt.plot(fit_curve[0], fit_curve[1], '--', label=fit_label)
    plt.legend(loc=2, fontsize=12)
    plt.grid()
    plt.xlabel(r"Re{$P(k)\}\ \rm [mK^2\ h^{-3}\ Mpc^3]$", fontsize=14)
    plt.ylabel(ylabel, fontsize=14)
    plt.title("blpair : {}, $τ$ : {}-{}ns".format(key[1], *dly_modes), fontsize=14)


# Raw strings are used for the mathtext labels: '\m' is not a valid Python
# escape sequence, so the non-raw form triggers an invalid-escape warning.
norm_label = r'$\mathcal{N}$ fit'
cnn_label = r'$\mathcal{CNN}$ fit'

# top row: the pspec with (data1) and without (data2) clear reflection systematics
_plot_pspec_panel(321, power1, key1)
_plot_pspec_panel(322, power2, key2)

# middle row: CDF of data1 / data2 and its Gaussian fit
_plot_cdf_panel(323, cdfs1[0], cdfs1[1], norm_label, "$P_N$ (Re{$P(k)$})", key1)
_plot_cdf_panel(324, cdfs2[0], cdfs2[1], norm_label, "$P_N$ (Re{$P(k)$})", key2)

# bottom row: CDF of data1 / data2 and its CNN fit
_plot_cdf_panel(325, cdfs1[2], cdfs1[3], cnn_label, "$P_{CNN}$ (Re{$P(k)$})", key1)
_plot_cdf_panel(326, cdfs2[2], cdfs2[3], cnn_label, "$P_{CNN}$ (Re{$P(k)$})", key2)

plt.show()

In [None]:
# KS statistic and critical value for the Gaussian fit of each data selection
(norm_d1, norm_cv1), (norm_d2, norm_cv2) = (
    ks_norm_fit(sample) for sample in (data1, data2)
)

# same for the CNN fit; ks_test also returns the accept/reject flag, which is dropped here
(cnn_d1, cnn_cv1), (cnn_d2, cnn_cv2) = (
    ks_test(sample, real_pdf, np.std(sample))[:2] for sample in (data1, data2)
)

In [None]:
# Summary figure: KS statistic of both baseline pairs against the critical value,
# for the Gaussian fit (left) and the CNN fit (right).
plt.figure(figsize=(12, 2))
plt.subplots_adjust(hspace=.5, wspace=.3)


def _plot_ks_panel(position, cv, d_first, d_second, pad_scale, title):
    """Draw one panel: critical-value line plus the KS statistic of key1 and key2."""
    plt.subplot(position)
    plt.hlines(cv, 0, 4, 'r', label='critical value ($α$=5%)')
    plt.plot(2, d_first, '.', label='blpair :\n{}'.format(key1[1]))
    plt.plot(2, d_second, '.', label='blpair :\n{}'.format(key2[1]))
    # fully transparent point; presumably there to stretch the axis limits so
    # the legend on the right does not cover the markers
    plt.plot(11, cv*pad_scale, alpha=0)
    plt.tick_params(
        axis='x',          # changes apply to the x-axis
        which='both',      # both major and minor ticks are affected
        bottom=False,      # ticks along the bottom edge are off
        top=False,         # ticks along the top edge are off
        labelbottom=False)
    plt.legend(loc='center right', fontsize=12)
    plt.ylabel('KS test statistic', fontsize=14)
    plt.title(title, fontsize=14)


# Raw strings for the titles: '\m' is not a valid Python escape sequence.
_plot_ks_panel(121, norm_cv1, norm_d1, norm_d2, 0.4, r'KS test for $\mathcal{N}$ fit')
_plot_ks_panel(122, cnn_cv1, cnn_d1, cnn_d2, 1.1, r'KS test for $\mathcal{CNN}$ fit')

plt.show()