In [1]:
import numpy as np

In [2]:
# Naive uncorrealted chi2
def naive(data):
    return np.sum(data**2, axis=-1)

In [3]:
# Naive uncorrealted chi2 with inflated errors
def inflated(data, alpha=1.0):
    return np.sum(data**2, axis=-1) / alpha

In [4]:
# Calculate "chi2" w/ fitted correlation
def fitted(data, k=None):
    if k is None:
        return np.max(data**2, axis=-1)
    else:
        y = []
        for i, ni in enumerate(k):
            bi = int(np.sum(k[:i]))
            d2 = data**2
            y.append(np.sum(d2[..., bi : bi + ni], axis=-1))
        return np.max(y, axis=0)
        

In [5]:
# Define expected distributions for fitted "chi2"
from scipy.special import erf, erfinv
from scipy.stats import chi, chi2, norm, rv_continuous


class Bee(rv_continuous):
    def __init__(self, df=1):
        super().__init__(a=0)
        self.df = df
        self.chi = chi(df=1)

    def _cdf(self, x):
        p = self.chi.cdf(x) ** self.df
        return p


class Bee2(rv_continuous):
    def __init__(self, df=1):
        super().__init__(a=0)
        self.df = df
        self.chi2 = chi2(df=1)

    def _cdf(self, x):
        p = self.chi2.cdf(x) ** self.df
        return p


class Cee(rv_continuous):
    def __init__(self, k=[1]):
        super().__init__(a=0)
        self.k = np.atleast_1d(k)
        self.chi = [chi(df=_df) for _df in self.k]

    def _cdf(self, x):
        p = [c.cdf(x) for c in self.chi]
        return np.prod(p, axis=0)


class Cee2(rv_continuous):
    def __init__(self, k=[1]):
        super().__init__(a=0)
        self.k = np.atleast_1d(k)
        self.chi2 = [chi2(df=_df) for _df in self.k]

    def _cdf(self, x):
        p = [c.cdf(x) for c in self.chi2]
        return np.prod(p, axis=0)


print(Bee(df=5).cdf((2, 5)))
print(Bee(df=25).cdf((2, 5)))
print(Bee2(df=5).cdf((4, 25)))
print(Bee2(df=25).cdf((4, 25)))
print(Cee(k=5).cdf((2, 5)))
print(Cee(k=(5, 20)).cdf((2, 5)))
print(Cee2(k=5).cdf((4, 25)))
print(Cee2(k=[5, 20]).cdf((4, 25)))

[0.79228068 0.99999713]
[0.31217298 0.99998567]
[0.79228068 0.99999713]
[0.31217298 0.99998567]
[0.45058405 0.99986067]
[2.09512909e-05 7.98457627e-01]
[0.45058405 0.99986067]
[2.09512909e-05 7.98457627e-01]


In [6]:
# Transorm uncorrelated fitted "chi2" to chi2(ndof=1)
def scaled(data, N_dim):
    return 2 * erfinv(erf(np.sqrt(fitted(data)) / np.sqrt(2)) ** N_dim) ** 2
    # return (2/np.pi)**(N_dim-1) * x**N_dim

In [7]:
# Approximately invariant measures
from functools import lru_cache

# from numpy.linalg import norm as nrm
from scipy.optimize import root_scalar
from scipy.special import logsumexp


def p_mean(x, p):
    N = x.shape[-1]
    return np.exp(1 / p * (logsumexp(p * np.log(x), axis=-1) - np.log(N)))


def p_norm(x, p):
    return np.exp(1 / p * (logsumexp(p * np.log(x), axis=-1)))


def smooth_min(x, p):
    return p_mean(x, -p)


def smooth_max(x, p):
    return p_mean(x, p)


def invariant1(x):
    """Triangular wings"""
    cdf = chi2.cdf(x**2, df=1)
    # a = smooth_min(cdf, p[0])
    a = np.min(cdf, axis=-1)
    # b = smooth_max(cdf, p[1])
    b = np.max(cdf, axis=-1)

    x = a / (1 - b + a)
    return chi2.ppf(x, df=1)

In [8]:
@np.vectorize
@lru_cache(500000)
def _minfrommax(b, df=2):
    """Position of sides of missing corner from coordinates of hypercube sides."""
    # b**df - (b-x)**df = x
    def f(x):
        return b**df - (b - x) ** df - x

    if df > 1:

        def fp(x):
            return df * (b - x) ** (df - 1) * (-1)

    else:

        def fp(x):
            return -df * np.ones_like(x)

    if df > 2:

        def fpp(x):
            return df * (df - 1) * (b - x) ** (df - 2)

    else:

        def fpp(x):
            return df * (df - 1) * np.ones_like(x)

    # return root_scalar(f, fprime=fp, fprime2=fpp, x0=0.001, x1=0.9).root
    # return root_scalar(f, fprime=fp, x0=b).root
    if b == 0:
        return 0
    else:
        return root_scalar(f, x0=b, x1=b / 2).root
    # return root_scalar(f, bracket=(0.0,b)).root


def minfrommax(b, df=2):
    """Position of sides of missing corner from coordinates of hypercube sides."""
    step = 0.0001
    b_ = np.floor(b / step, dtype=float) * step
    b__ = b_ + step
    delta = (b - b_) / step
    x_ = _minfrommax(b_, df)
    x__ = _minfrommax(b__, df)
    return x_ + (x__ - x_) * delta


def invariant2(x):
    """Hypercube minus corner"""
    cdf = chi2.cdf(x**2, df=1)
    # a = smooth_min(cdf, p[0])
    a = np.min(cdf, axis=-1)
    # b = smooth_max(cdf, p[1])
    b = np.max(cdf, axis=-1)

    df = x.shape[-1]
    mfm = minfrommax(b, df=df)
    x = np.maximum(a, mfm)
    # Cap value in case of rounding errors or stray root finding
    x = np.minimum(x, 1.0 - 1e-9)
    return chi2.ppf(x, df=1)

In [9]:
@np.vectorize
@lru_cache(10000)
def _yfrommax(b, df=2, alpha=0.5):
    """(1 - diagonal coordinate) from (1 - max) of accepted region"""
    # A = (1-b)**df - ((y-b)**df)/((1-alpha+alpha*y)**(df-1)) = 1 - y
    beta = 1 - alpha
    q = (1.0 - b) ** df - 1
    dfm = df - 1

    def f(y):
        return q - ((y - b) ** df) / ((beta + alpha * y) ** (dfm)) + y

    if b <= 0:
        return 0.0
    if b >= 1:
        return 1.0
    else:
        return root_scalar(f, x0=b, x1=b * 1.001).root


def yfrommax(b, df=2, alpha=0.5):
    """Buffer and interpolate values to speed things up."""
    step = 0.0001
    b_ = np.floor(b / step, dtype=float) * step
    b__ = b_ + step
    delta = (b - b_) / step
    x_ = _yfrommax(b_, df=df, alpha=alpha)
    x__ = _yfrommax(b__, df=df, alpha=alpha)
    return x_ + (x__ - x_) * delta


def invariant3(x, alpha=0.5, fast=False):
    """Return test statistic given vector of normalised values."""
    if fast:
        sf = 1 - chi2.cdf(x**2, df=1)  # Faster, but less accurate
    else:
        sf = chi2.sf(x**2, df=1)

    # Get possible diagonal coordinate from maximum CDF value (= minimum SF)
    a = np.min(sf, axis=-1)
    b = np.max(sf, axis=-1)
    yfm = yfrommax(a, df=x.shape[-1], alpha=alpha)

    # Get possible diagonal coordinate from centre surface
    yfc = (alpha * (a - b) + b) / (1.0 + alpha * (a - b))

    y = np.minimum(yfc, yfm)
    y = np.maximum(y, 0)  # Cap in case of rounding or root finding errors
    return chi2.isf(y, df=1)

In [10]:
from numba import njit

@njit()
def _fix(cov):
    changed = True
    while changed:
        changed = False
        for k in range(0, len(cov)):
            for l in range(k + 1, len(cov)):
                # pivot point k, l
                pivot = np.sign(cov[k, l])
                if not np.isfinite(pivot):
                    continue
                if np.all(np.isfinite(cov[k, :])) and np.all(np.isfinite(cov[:, l])):
                    continue
                for m in range(len(cov)):
                    if np.isfinite(cov[k, m]) and not np.isfinite(cov[m, l]):
                        if cov[k, m] != 0 or pivot != 0:
                            cov[l, m] = cov[m, l] = np.sign(cov[k, m] * pivot)
                            changed = True
                    elif not np.isfinite(cov[k, m]) and np.isfinite(cov[m, l]):
                        if cov[m, l] != 0 or pivot != 0:
                            cov[m, k] = cov[k, m] = np.sign(cov[m, l] * pivot)
                            changed = True
    return cov


def fill_max_correlation(cor, target):
    """Fill the correlation matrix with elements to achieve maximum correlation.

    Try to match the signs in "target".

    Only replaces elements in "cor" that are NaN
    """

    priority = np.unravel_index(
        np.argsort(np.abs(target), axis=None)[::-1], target.shape
    )

    for i, j in zip(*priority):
        if np.isfinite(cor[i, j]):
            continue

        # Set the new element
        if target[i, j] == 0:
            t = 1
        else:
            t = np.sign(target[i, j])

        cor[i, j] = cor[j, i] = t

        # Check and fix connections to other elements
        cor = _fix(cor)

    return cor