# Comparing the Divergence Between Data Distributions
The goal is to show that the data distribution shifts much more rapidly with a change in depth than with a change in saturation. If that is the case, it becomes very difficult for any transformation to exist that can reliably distinguish changes in saturation independently of depth. As the distribution metric, I want to use the Kullback-Leibler divergence (KLD) for now, but another metric might also work. KLD formula used, assuming P and Q are the two normalized distributions:
$$
KLD(P \| Q) = \sum_{i:\, P_i \neq 0,\; Q_i \neq 0} P_i \log\left(\frac{P_i}{Q_i}\right)
$$
For now, bins where either distribution is zero are left out of the sum.

## Defining the KLD function

In [32]:
# Find distribution ranges per dimension
from typing import List, Tuple, Callable
import numpy as np
from scipy.ndimage import gaussian_filter


def find_ranges(dist1: np.ndarray, dist2: np.ndarray) -> List[Tuple[float, float]]:
    """
    Find, for every column, the value range that covers both data sets.

    :param dist1: The first data set, indexed as (samples, dimensions)
    :param dist2: The second data set, with the same number of columns as dist1
    :return: One (min, max) tuple per column, taken over both data sets combined
    """
    dim_count = dist1.shape[1]
    assert dim_count == dist2.shape[1], "The two distributions must have the same number of dimensions"
    return [
        (
            min(np.min(dist1[:, axis]), np.min(dist2[:, axis])),
            max(np.max(dist1[:, axis]), np.max(dist2[:, axis])),
        )
        for axis in range(dim_count)
    ]


# Signature shared by every distance metric in this notebook:
# (hist1, hist2, non_zero_mask) -> float
DistanceFunctionOutputType = Callable[[np.ndarray, np.ndarray, np.ndarray], float]


# Registry of metrics, filled in definition order by @register_distance_function
distance_functions: List[DistanceFunctionOutputType] = []


def register_distance_function(func: DistanceFunctionOutputType) -> DistanceFunctionOutputType:
    """
    Decorator that appends a distance function to the ``distance_functions`` registry

    :param func: The distance function to register
    :return: The same function, unchanged, so the decorated name stays directly callable
    """
    distance_functions.append(func)
    return func


@register_distance_function
def custom_kld(hist1: np.ndarray, hist2: np.ndarray, non_zero_mask: np.ndarray) -> float:
    """
    Kullback-Leibler Divergence restricted to the bins selected by the mask, so that bins which are zero in
    either distribution do not contribute to the sum
    :param hist1: The first distribution
    :param hist2: The second distribution
    :param non_zero_mask: A mask that is True where both distributions are non-zero
    """
    # All three arrays must line up bin for bin
    assert hist1.shape == hist2.shape == non_zero_mask.shape, "All inputs must have the same shape"
    p = hist1[non_zero_mask]
    q = hist2[non_zero_mask]
    return float(np.sum(p * np.log(p / q)))


@register_distance_function
def true_distance(hist1: np.ndarray, hist2: np.ndarray, non_zero_mask: np.ndarray) -> float:
    """
    Calculates the true distance between two distributions. This distance is defined as the sum of the absolute
    differences between each of the bins of the two distributions (i.e. the L1 distance between the histograms)
    :param hist1: The first distribution
    :param hist2: The second distribution
    :param non_zero_mask: A mask that is True where both distributions are non-zero. Unused here; accepted only
        so that every distance function shares the same signature
    :return: The summed absolute bin-wise difference as a plain Python float
    """
    # Wrap in float() so the return type matches the annotation and the other registered metrics
    return float(np.sum(np.abs(hist1 - hist2)))


@register_distance_function
def total_variation_distance(hist1: np.ndarray, hist2: np.ndarray, non_zero_mask: np.ndarray) -> float:
    """
    Calculates the total variation distance between two distributions. This is the largest difference in
    probability that the two distributions assign to any *set* of bins, which for discrete distributions equals
    half the sum of the absolute bin-wise differences.

    The previous implementation returned the largest single-bin difference (the L-infinity distance between the
    histograms), which is not the total variation distance.
    :param hist1: The first distribution (expected to sum to 1)
    :param hist2: The second distribution (expected to sum to 1)
    :param non_zero_mask: A mask that is True where both distributions are non-zero. Unused here; accepted only
        so that every distance function shares the same signature
    :return: The total variation distance as a plain Python float (between 0 and 1 for normalized inputs)
    """
    return float(0.5 * np.sum(np.abs(hist1 - hist2)))


@register_distance_function
def kld_smoothened(hist1: np.ndarray, hist2: np.ndarray, non_zero_mask: np.ndarray) -> float:
    """
    Calculates the Kullback-Leibler Divergence between two distributions. The distributions are smoothened before
    calculation to avoid collapse in KLD formula
    :param hist1: The first distribution
    :param hist2: The second distribution
    :param non_zero_mask: A mask that is True where both distributions are non-zero. Unused here; accepted only
        so that every distance function shares the same signature
    :return: The KLD of the smoothened distributions; ``inf`` if the smoothened hist2 is still zero in a bin
        where the smoothened hist1 is not
    """
    # Smoothen
    hist1 = gaussian_filter(hist1, sigma=1)
    hist2 = gaussian_filter(hist2, sigma=1)
    # Renormalize the histograms
    hist1 = hist1 / np.sum(hist1)
    hist2 = hist2 / np.sum(hist2)
    # The Gaussian kernel is truncated, so bins far away from any data are still exactly zero after smoothing.
    # Summing over every bin would then evaluate 0 * log(0 / 0) = NaN and poison the result. Use the
    # convention 0 * log(0 / q) = 0 instead by summing over the support of hist1 only.
    support = hist1 > 0
    p = hist1[support]
    q = hist2[support]
    # p > 0 with q == 0 legitimately gives +inf; silence only the divide-by-zero warning for that case
    with np.errstate(divide="ignore"):
        terms = p * np.log(p / q)
    return float(np.sum(terms))


@register_distance_function
def kld_biased(hist1: np.ndarray, hist2: np.ndarray, non_zero_mask: np.ndarray) -> float:
    """
    Kullback-Leibler Divergence in which every zero bin is first replaced by a tiny epsilon, so that neither
    the ratio nor the logarithm collapses
    :param hist1: The first distribution
    :param hist2: The second distribution
    :param non_zero_mask: A mask that is True where both distributions are non-zero (not used by this metric)
    """
    epsilon = 1e-30
    # Only the exactly-zero bins receive the bias; every other bin keeps its value
    biased1 = np.where(hist1 == 0, epsilon, hist1)
    biased2 = np.where(hist2 == 0, epsilon, hist2)
    # Renormalize so both histograms sum to 1 again after the bias was added
    p = biased1 / np.sum(biased1)
    q = biased2 / np.sum(biased2)
    return float(np.sum(p * np.log(p / q)))

In [33]:
# Normalization Functions
def minmax_normalization(data: np.ndarray) -> np.ndarray:
    """
    Takes the absolute value of the data and then rescales each column to the range [-1 1]

    A column whose absolute values are all identical has no spread to rescale; it is mapped to -1 (the lower
    end of the range) instead of producing NaN from a 0 / 0 division.
    :param data: The data to normalize; the minimum and maximum are taken along axis 0
    :return: The normalized data, same shape as the input
    """
    data = np.abs(data)
    col_min = np.min(data, axis=0)
    col_span = np.max(data, axis=0) - col_min
    # Dividing by a span of 1 leaves the (all-zero) numerator of a constant column at 0, which becomes -1 below
    safe_span = np.where(col_span == 0, 1, col_span)
    return (data - col_min) / safe_span * 2 - 1

In [34]:
def distribution_distance(
    dist1: np.ndarray,
    dist2: np.ndarray,
    bin_count: int = 20,
    distance_function: DistanceFunctionOutputType = custom_kld,
    normalization_func: Callable[[np.ndarray], np.ndarray] = lambda x: x,
    verbose: bool = False,
) -> float:
    """
    Bin two data sets onto a shared grid and measure the distance between the resulting histograms

    NOTE(review): normalization_func is applied to each data set on its own. With a data-dependent
    normalization this may remove some of the shift between the two sets before it is measured - confirm
    that this is intended.

    :param dist1: The first set of D dimensional data (N x D)
    :param dist2: The second set of D dimensional data (N x D)
    :param bin_count: The number of bins to use for the histogram
    :param distance_function: The distance function to use on the normalized histograms
    :param normalization_func: The normalization function to use on the data before calculating the histogram
    :param verbose: Whether to print out results from the intermediate steps
    :return: Whatever distance_function returns for the two normalized histograms
    """
    normalized1, normalized2 = normalization_func(dist1), normalization_func(dist2)

    # Both histograms share one grid spanning the combined value range of the two data sets
    value_ranges = find_ranges(normalized1, normalized2)
    if verbose:
        print(f"Value Ranges [(x_min, x_max), .. ]: {value_ranges}")

    def _probability_histogram(samples: np.ndarray) -> np.ndarray:
        # Bin counts on the shared grid, scaled to sum up to 1
        counts, _ = np.histogramdd(samples, bins=bin_count, range=value_ranges)
        return counts / np.sum(counts)

    hist1 = _probability_histogram(normalized1)
    hist2 = _probability_histogram(normalized2)

    # Bins that are populated in both histograms
    non_zero_mask = (hist1 > 0) & (hist2 > 0)
    if verbose:
        print("Non-Zero Mask Length (AND condition):", np.sum(non_zero_mask))

    return distance_function(hist1, hist2, non_zero_mask)


# Test Case: smoke-test distribution_distance on two random 3-D data sets (1000 samples each).
# The inputs are unseeded, so the printed numbers change from run to run.
# Alternative first data set:
# dist1 = np.exp(np.random.rand(1000, 3))
# dist1 is scaled by 4, so it covers a wider value range than dist2
dist1 = np.random.rand(1000, 3) * 4
dist2 = np.random.rand(1000, 3)
# 10 bins per dimension, scored with true_distance
print(distribution_distance(dist1, dist2, 10, distance_function=true_distance, verbose=True))

Value Ranges [(x_min, x_max), .. ]: [(0.0006827445785481112, 3.997011158927622), (0.0007516782886578532, 3.998653870547394), (0.0005716123686845265, 3.9976904488410834)]
Non-Zero Mask Length (AND condition): 20
1.9320000000000004


# Load Simulation Data

In [35]:
from pathlib import Path
import json
import pandas as pd

# Load the pulsation ratio data
# NOTE: pd.read_pickle can execute arbitrary code while loading; only point it at trusted files
pulsation_ratio_path = Path().resolve().parent.parent / "data" / "processed_data" / "pulsation_ratio.pkl"
data = pd.read_pickle(pulsation_ratio_path)
# The JSON file next to the pickle lists the feature columns under the "features" key
pulsation_ratio_config_path = pulsation_ratio_path.with_suffix('.json')
# Decode explicitly as UTF-8 so the result does not depend on the machine's locale encoding
with open(pulsation_ratio_config_path, 'r', encoding='utf-8') as file:
    pulsation_ratio_config = json.load(file)
pr_columns = pulsation_ratio_config['features']

In [36]:
# Way too many pulsation ratio features. Keep only 5 per wavelength - 10 in total
chosen_sdds = ['10', '33', '50', '72', '94']
# A column is kept when the text after its last underscore is one of the chosen SDDs;
# only the first 5 matches are retained (WV1)
filtered_columns = [col for col in pr_columns if col.rsplit('_', 1)[-1] in chosen_sdds][:5]
# filtered_columns += ["Fetal Saturation"]
print("Length of filtered columns:", len(filtered_columns))

Length of filtered columns: 5


In [37]:
# Distance metric used by all comparisons below. Alternative:
# metric_to_use = kld_biased
metric_to_use = true_distance
# Normalization applied to each data set before it is binned
normalization_func = minmax_normalization
# normalization_func = lambda x: x        # No normalization

# Calculate Distribution Shift For Changing Depth

In [38]:
all_depths = data['Maternal Wall Thickness'].unique()
all_depths.sort()

# Feature rows belonging to the two smallest wall-thickness values
depth0_data = data.loc[data['Maternal Wall Thickness'] == all_depths[0], filtered_columns].to_numpy()
depth1_data = data.loc[data['Maternal Wall Thickness'] == all_depths[1], filtered_columns].to_numpy()

distribution_distance(
    np.abs(depth0_data),
    np.abs(depth1_data),
    bin_count=20,
    distance_function=metric_to_use,
    normalization_func=normalization_func,
    verbose=True,
)

Value Ranges [(x_min, x_max), .. ]: [(-1.0, 1.0), (-1.0, 1.0), (-1.0, 1.0), (-1.0, 1.0), (-1.0, 1.0)]
Non-Zero Mask Length (AND condition): 51


1.9865319865319875

# Calculate Distribution Shift For Changing Saturation

In [39]:
all_fetal_sat = data['Fetal Saturation'].unique()
all_fetal_sat.sort()

# NOTE(review): this compares all_fetal_sat[0] with all_fetal_sat[2], whereas the depth comparison and the
# fixed-depth comparison both use indices 0 and 1 - confirm that skipping index 1 is intended
fetal_sat0_data = data.loc[data['Fetal Saturation'] == all_fetal_sat[0], filtered_columns].to_numpy()
fetal_sat1_data = data.loc[data['Fetal Saturation'] == all_fetal_sat[2], filtered_columns].to_numpy()

distribution_distance(
    np.abs(fetal_sat0_data),
    np.abs(fetal_sat1_data),
    bin_count=20,
    distance_function=metric_to_use,
    normalization_func=normalization_func,
    verbose=True,
)

Value Ranges [(x_min, x_max), .. ]: [(-1.0, 1.0), (-1.0, 1.0), (-1.0, 1.0), (-1.0, 1.0), (-1.0, 1.0)]
Non-Zero Mask Length (AND condition): 744


0.14182144020853693

# Calculate Distribution Shift For Changing Saturation in a Fixed Depth

In [40]:
chosen_depth = all_depths[4]

# Same saturation comparison, restricted to rows recorded at the chosen wall thickness
fetal_sat0_data = data.loc[
    (data['Fetal Saturation'] == all_fetal_sat[0]) & (data["Maternal Wall Thickness"] == chosen_depth),
    filtered_columns,
].to_numpy()
fetal_sat1_data = data.loc[
    (data['Fetal Saturation'] == all_fetal_sat[1]) & (data["Maternal Wall Thickness"] == chosen_depth),
    filtered_columns,
].to_numpy()

# distribution_distance(np.abs(fetal_sat0_data), np.abs(fetal_sat1_data), 20, distance_function=metric_to_use, verbose=True)
distribution_distance(
    fetal_sat0_data,
    fetal_sat1_data,
    bin_count=20,
    distance_function=metric_to_use,
    normalization_func=normalization_func,
    verbose=True,
)

Value Ranges [(x_min, x_max), .. ]: [(-1.0, 1.0), (-1.0, 1.0), (-1.0, 1.0), (-1.0, 1.0), (-1.0, 1.0)]
Non-Zero Mask Length (AND condition): 218


1.4040404040404084