# Python implementation of https://arxiv.org/pdf/1909.10140.pdf

In [None]:
from itertools import product
from joblib import Parallel, delayed
import sys

import numpy as np
from scipy.stats import rankdata
import pandas as pd

In [None]:
def XI_coef(
    xvec: np.ndarray, yvec: np.ndarray, simple: bool = True, seed: int = 42
) -> float:
    np.random.seed(seed)

    n = len(xvec)

    # TODO: Fix this clearly inefficient way of computing the r code line:
    # PI <- rank(xvec, ties.method = "random"
    # This is a shameless stackoverflow copy
    pandas_series = pd.Series(xvec)
    PI = (
        pandas_series.sample(frac=1)
        .rank(method="first")
        .reindex_like(pandas_series)
        .values
    )

    fr = rankdata(yvec, method="max") / n
    gr = rankdata(-yvec, method="max") / n

    ord = np.argsort(PI)
    fr = fr[ord]
    A1 = np.sum(np.abs(fr[0 : n - 1] - fr[1:n])) / (2.0 * n)
    CU = np.mean(gr * (1.0 - gr))

    xi = 1.0 - A1 / (CU + sys.float_info.epsilon)
    if simple:
        return xi

    return [xi, fr, CU]


def XI_coef_matrix(np_array: np.ndarray) -> np.ndarray:
    assert len(np_array.shape) == 2
    _, w = np_array.shape

    return np.array(
        [
            XI_coef(np_array[:, i], np_array[:, j]) if i != j else 1.0
            for i, j in product(range(w), repeat=2)
        ]
    ).reshape((w, w))

In [None]:
def XI_coef_matrix_parallel(np_array: np.ndarray) -> np.ndarray:
    assert len(np_array.shape) == 2
    _, w = np_array.shape

    def process_index(i, j):
        if i == j:
            return 1.0
        return XI_coef(np_array[:, i], np_array[:, j])

    results = Parallel(n_jobs=-1)(
        delayed(process_index)(i, j) for i, j in product(range(w), repeat=2)
    )
    return np.array(results).reshape((w, w))

In [None]:
# A reasonably sized dataframe for Data Science cases
np_array = np.random.randint(0, 100, size=(int(1e4), 70))

In [None]:
%time _ = XI_coef_matrix(np_array)

In [None]:
%time _ = XI_coef_matrix_parallel(np_array)

In [None]:
# TEST: a straight line with noise
size = int(1e6)
list_1 = np.array(list(range(0, size)))
list_2 = np.array(list(range(size, 0, -1))) + np.random.random_sample(len(list_1)) * 100

print("x -> y", XI_coef(list_1, list_2))
print("y -> x", XI_coef(list_2, list_1))
print("std coeff matrix", np.corrcoef(list_1, list_2))

In [None]:
# Odd cases:
## All Zeros
np_array = np.zeros(shape=(10, 10))
print("XI coeff matrix", XI_coef_matrix(np_array))

## Purely random
np_array = np.random.randint(0, 100, (1, 10))
print("XI coeff matrix", XI_coef_matrix(np_array))

## Negatively correlated
np_array = np.random.randint(0, 100, (1000, 2))
np_array[:, 1] = -np_array[:, 0]
print("XI coeff matrix", XI_coef_matrix(np_array))

In [None]:
np.arange(0, 1e6)

In [None]:
# TEST: a straight line with itself
size = 1e6

print("x -> y", XI_coef(np.arange(0, size), np.arange(0, size)))
print("std coeff matrix", np.corrcoef(list_1, list_1))

In [None]:
# TEST: a sin(x) function
# The std correlation cooeff is close to 0, but this improved correlation function
# should show that there is a relationship from x -> y, but a bad one from y -> x
# That is to say, if you know x, you can easily determine y, but not the other way around.
list_1 = np.arange(0, 1000, np.pi / 8)
list_2 = np.sin(list_1) + np.random.random_sample(len(list_1)) / 10

print("x -> y", XI_coef(list_1, list_2))
print("y -> x", XI_coef(list_2, list_1))
print("std coeff matrix", np.corrcoef(list_1, list_2))