# In [7]:
import numpy as np
from hyppo.tools import check_reps, compute_dist, contains_nan
from sklearn.metrics import pairwise_distances, pairwise_kernels

# In [6]:
def check_ndarray_xyz(x, y, z):
    """Raise ``TypeError`` unless x, y, and z are all ``np.ndarray`` instances.

    Only the container type is checked; the element dtype is not inspected.
    """
    if not all(isinstance(arr, np.ndarray) for arr in (x, y, z)):
        raise TypeError("x, y, and z must be ndarrays")

def convert_xyz_float64(x, y, z):
    """Return x, y, and z converted to ``np.float64`` arrays.

    Each input goes through ``np.asarray(...).astype(np.float64)``, so the
    returned arrays are copies even when the input is already float64.
    """
    return tuple(np.asarray(arr).astype(np.float64) for arr in (x, y, z))

                        
class _CheckInputs:
    """Validate and normalize the ``x``, ``y``, ``z`` inputs of a conditional test.

    Calling an instance runs every check in turn and returns the three inputs
    as 2-D ``np.float64`` arrays.

    Parameters
    ----------
    x, y, z : np.ndarray
        Input data; 1-D arrays are promoted to a single column.
    reps : int, optional
        Replication count; when truthy it is passed to ``check_reps``.
    """

    def __init__(self, x, y, z, reps=None):
        self.x = x
        self.y = y
        self.z = z
        self.reps = reps

    def __call__(self):
        """Run all checks and return the validated ``(x, y, z)`` triple.

        Raises
        ------
        TypeError
            If any input is not an ``np.ndarray``.
        ValueError
            If an input is not 1-D/2-D, the sample counts differ, there are
            too few samples, or an input has zero variance. ``contains_nan``
            and ``check_reps`` (from ``hyppo.tools``) apply their own
            validation on top of this.
        """
        check_ndarray_xyz(self.x, self.y, self.z)
        for arr in (self.x, self.y, self.z):
            contains_nan(arr)
        self.x, self.y, self.z = self.check_dim_xyz()
        self.x, self.y, self.z = convert_xyz_float64(self.x, self.y, self.z)
        self._check_same_samples()
        self._check_min_samples()
        self._check_variance()

        if self.reps:
            check_reps(self.reps)

        return self.x, self.y, self.z

    @staticmethod
    def _as_2d(arr, name):
        """Return ``arr`` as a 2-D array, promoting 1-D input to one column."""
        if arr.ndim == 1:
            return arr[:, np.newaxis]
        if arr.ndim != 2:
            raise ValueError(
                "Expected a 2-D array `{}`, found shape {}".format(name, arr.shape)
            )
        return arr

    def _check_same_samples(self):
        """Check that x, y, and z have the same number of rows."""
        # The statistic pairs row i of x, y, and z, so the counts must agree.
        if not (self.x.shape[0] == self.y.shape[0] == self.z.shape[0]):
            raise ValueError(
                "Shape mismatch, x, y, and z must have the same number of samples"
            )

    def _check_min_samples(self):
        """Check that each input has more than 3 samples."""
        nx = self.x.shape[0]
        ny = self.y.shape[0]
        nz = self.z.shape[0]

        if nx <= 3 or ny <= 3 or nz <= 3:
            raise ValueError("Number of samples is too low")

    def check_dim_xyz(self):
        """Convert x, y, and z to 2-D arrays and return them."""
        self.x = self._as_2d(self.x, "x")
        self.y = self._as_2d(self.y, "y")
        self.z = self._as_2d(self.z, "z")
        return self.x, self.y, self.z

    def _check_variance(self):
        """Reject inputs whose overall variance is exactly zero."""
        if np.var(self.x) == 0 or np.var(self.y) == 0 or np.var(self.z) == 0:
            raise ValueError("Test cannot be run, one of the inputs has 0 variance")
                

def conditional_dcorr(x, y, z, kernel_type, seed, reps=1000, num_bootstrap=99):
    """Compute the conditional distance correlation of x and y given z.

    Parameters
    ----------
    x, y, z : np.ndarray
        Input data, validated and converted by ``_CheckInputs``.
    kernel_type : str or callable
        Forwarded as ``metric`` to ``sklearn.metrics.pairwise_kernels`` to
        build the kernel matrix of ``z``.
    seed : int
        Forwarded to ``conduct_cdc_test`` to seed the resampling.
    reps : int, default 1000
        Only validated by ``_CheckInputs``; the number of resamples actually
        drawn is ``num_bootstrap``.
    num_bootstrap : int, default 99
        Number of bootstrap replicates used for the p-value.

    Returns
    -------
    stat : float
        The observed statistic.
    pvalue : float or None
        The value returned by ``conduct_cdc_test``.
    """
    check_input = _CheckInputs(x, y, z, reps=reps)
    x, y, z = check_input()

    distx, disty = compute_dist(x, y, metric="euclidean")
    kernel_density_estimation = pairwise_kernels(z, metric=kernel_type, n_jobs=1)

    stat = Statistic(distx, disty, kernel_density_estimation)
    pvalue = conduct_cdc_test(
        distx, disty, kernel_density_estimation, num_bootstrap, seed, stat
    )
    return stat, pvalue

def Statistic(distx, disty, kernel_density_estimation):
    """Return the conditional distance correlation statistic.

    Thin wrapper that forwards the two distance matrices and the kernel
    matrix to ``condition_distance_correlation_stats``.
    """
    return condition_distance_correlation_stats(distx, disty, kernel_density_estimation)
                
def condition_distance_correlation_stats(distance_x, distance_y, kernel_density_estimation):
    """Return the mean of the per-sample conditional distance correlations.

    The per-sample values come from ``compute_condition_distance_correlation``.
    """
    return np.mean(
        compute_condition_distance_correlation(
            distance_x, distance_y, kernel_density_estimation
        )
    )

def compute_condition_distance_correlation(distance_x, distance_y, kernel_density_estimation):
    """Compute one conditional distance correlation per sample.

    For sample ``i`` the kernel row ``kernel_density_estimation[i]`` is used
    as a weight vector: both distance matrices are weight-centered with
    ``weight_distance_anova`` and the weighted covariance of the centered
    matrices is normalized by the two weighted variances.

    Parameters
    ----------
    distance_x, distance_y : array-like, shape (n, n)
        Pairwise distance matrices.
    kernel_density_estimation : array-like, shape (n, n)
        Kernel matrix; row ``i`` holds the weights for sample ``i``.

    Returns
    -------
    np.ndarray, shape (n,)
        Per-sample correlation; 0.0 wherever the normalizer is not positive.
    """
    distance_x = np.asarray(distance_x, dtype=float)
    distance_y = np.asarray(distance_y, dtype=float)
    kernel = np.asarray(kernel_density_estimation, dtype=float)

    # Number of samples is the number of rows, not the element count.
    num = distance_x.shape[0]
    covariance_xy = np.zeros(num)
    covariance_xx = np.zeros(num)
    covariance_yy = np.zeros(num)

    for i in range(num):
        weight = kernel[i]
        anova_x = weight_distance_anova(distance_x, weight)
        anova_y = weight_distance_anova(distance_y, weight)

        # pair_weight[k, j] = weight[k] * weight[j]
        pair_weight = np.outer(weight, weight)
        covariance_xy[i] = np.sum(anova_x * anova_y * pair_weight)
        covariance_xx[i] = np.sum(anova_x * anova_x * pair_weight)
        covariance_yy[i] = np.sum(anova_y * anova_y * pair_weight)

    denominator = covariance_xx * covariance_yy
    correlation = np.zeros(num)
    positive = denominator > 0.0
    correlation[positive] = covariance_xy[positive] / np.sqrt(denominator[positive])
    return correlation

def weight_distance_anova(distance_matrix, weight):
    """Weight-center a distance matrix.

    Entry ``[k, j]`` of the result is
    ``d[k, j] - m[k] - m[j] + g`` where ``m`` is the weighted row mean of the
    distance matrix and ``g`` is its weighted grand mean.

    Parameters
    ----------
    distance_matrix : array-like, shape (n, n)
        Pairwise distance matrix. NOTE(review): assumed symmetric, as a
        distance matrix normally is - confirm for any other caller.
    weight : array-like, shape (n,)
        Weights applied to the samples.

    Returns
    -------
    np.ndarray, shape (n, n)
        The weight-centered matrix.
    """
    distance_matrix = np.asarray(distance_matrix, dtype=float)
    weight = np.asarray(weight, dtype=float)
    weight_sum = weight.sum()

    # Weighted row sums, then the weighted sum of those row sums.
    marginal = distance_matrix @ weight
    grand_mean = (marginal @ weight) / (weight_sum * weight_sum)
    marginal = marginal / weight_sum

    return (
        distance_matrix
        - marginal[:, np.newaxis]
        - marginal[np.newaxis, :]
        + grand_mean
    )

def vector_weight_sum(vector1, weight):
    """Return the weighted sum ``sum(vector1[i] * weight[i])`` as a float.

    Both arguments must be 1-D and of equal length; ``np.dot`` raises
    ``ValueError`` otherwise.
    """
    values = np.asarray(vector1, dtype=float)
    weights = np.asarray(weight, dtype=float)
    return float(np.dot(values, weights))

def conduct_cdc_test(distance_x, distance_y, kernel, num_bootstrap, seed, stat):
    """Estimate the p-value of ``stat`` by resampling the x distance matrix.

    Index vectors are drawn with ``generate_random_sample_index``; for each
    one the x distance matrix is rearranged and the statistic recomputed
    against the unchanged y distance matrix and kernel.

    Parameters
    ----------
    distance_x, distance_y : np.ndarray
        Pairwise distance matrices.
    kernel : np.ndarray
        Kernel matrix passed to the sampler and to ``Statistic``.
    num_bootstrap : int
        Number of resamples; 0 disables the test.
    seed : int
        Seed for ``np.random.default_rng``; 0 requests an unseeded generator.
    stat : float
        Observed statistic.

    Returns
    -------
    float or None
        The p-value, or ``None`` when ``num_bootstrap`` is 0.
    """
    if num_bootstrap == 0:
        return None

    rng = np.random.default_rng() if seed == 0 else np.random.default_rng(seed)
    sample_indices = generate_random_sample_index(num_bootstrap, kernel, rng)

    resampled_stats = np.zeros(num_bootstrap)
    for b in range(num_bootstrap):
        resampled_x = rearrange_matrix(distance_x, sample_indices[b])
        resampled_stats[b] = Statistic(resampled_x, distance_y, kernel)

    return compute_p_value(resampled_stats, stat)

def compute_p_value(permuted_statistic, statistic):
    """Return the resampling p-value ``(1 + #{perm >= stat}) / (1 + n)``.

    Parameters
    ----------
    permuted_statistic : array-like
        Statistics computed on the resampled data.
    statistic : float
        Observed statistic.

    Returns
    -------
    float
        The p-value; 1.0 when ``permuted_statistic`` is empty.
    """
    permuted_statistic = np.asarray(permuted_statistic, dtype=float)
    larger_num = np.count_nonzero(permuted_statistic >= statistic)
    # The +1 terms count the observed statistic as one of the resamples.
    return float((1.0 + larger_num) / (1.0 + permuted_statistic.size))
    
def rearrange_matrix(dist_matrix, rearrange_index):
    """Return a copy of ``dist_matrix`` with rows and columns re-indexed.

    Entry ``[i, k]`` of the result is
    ``dist_matrix[rearrange_index[i], rearrange_index[k]]``. Indices may
    repeat, and the input matrix is left unmodified.

    Parameters
    ----------
    dist_matrix : array-like, shape (n, n)
        Matrix to rearrange.
    rearrange_index : array-like of int
        Row/column indices to select; integral floats are cast to integers.

    Returns
    -------
    np.ndarray
        The rearranged matrix.
    """
    dist_matrix = np.asarray(dist_matrix)
    index = np.asarray(rearrange_index).astype(np.intp).ravel()
    # np.ix_ applies the same index vector to rows and columns at once.
    return dist_matrix[np.ix_(index, index)]

def generate_random_sample_index(replication_number, probability_matrix, random_number_generator):
    """Draw index vectors for the resampling step.

    Entry ``[j, i]`` of the result is an index drawn with probability
    proportional to row ``i`` of ``probability_matrix``. Rows are normalized
    to sum to 1 before sampling.

    Parameters
    ----------
    replication_number : int
        Number of index vectors (rows of the result).
    probability_matrix : array-like, shape (n, n)
        Non-negative sampling weights; row ``i`` is the weight vector used
        for position ``i``.
    random_number_generator : np.random.Generator
        Source of randomness.

    Returns
    -------
    np.ndarray of int, shape (replication_number, n)
        Sampled indices, usable directly for array indexing.

    Raises
    ------
    ValueError
        If a row of ``probability_matrix`` does not have a positive sum.
    """
    probability_matrix = np.asarray(probability_matrix, dtype=float)
    num = probability_matrix.shape[0]
    # Integer dtype so the rows can be used as index arrays.
    random_sample_index = np.zeros((replication_number, num), dtype=np.intp)

    for i in range(num):
        row = probability_matrix[i]
        total = row.sum()
        if not total > 0:
            raise ValueError(
                "Row {} of probability_matrix must have a positive sum".format(i)
            )
        # One vectorized draw per row fills the whole column of replicates.
        random_sample_index[:, i] = random_number_generator.choice(
            row.size, size=replication_number, p=row / total
        )
    return random_sample_index

def sample_mulitnomial_distribution(prob_array, random_number_generator):
    """Draw one index with probability proportional to ``prob_array``.

    Parameters
    ----------
    prob_array : array-like
        Non-negative weights; they are normalized to sum to 1 before the draw.
    random_number_generator : np.random.Generator
        Source of randomness.

    Returns
    -------
    int
        The sampled index.

    Raises
    ------
    ValueError
        If the weights do not have a positive sum.
    """
    weights = np.asarray(prob_array, dtype=float)
    total = weights.sum()
    if not total > 0:
        raise ValueError("prob_array must have a positive sum")
    # A single multinomial trial yields a one-hot vector; argmax locates the hit.
    counts = random_number_generator.multinomial(1, weights / total)
    return int(np.argmax(counts))


# Correctly spelled alias; the misspelled name above is kept for compatibility.
sample_multinomial_distribution = sample_mulitnomial_distribution