In [None]:
import numpy as np
import math
import matplotlib.pyplot as plt
from math import log2
import pandas as pd
import itertools
from matplotlib.collections import LineCollection
import matplotlib.patches as mpatches
from itertools import product


# Random seed shared by the notebook; it is passed to the data generators
# below, which feed it to np.random.seed.
s=1985

# PREPARE DATA TO BE DISCRETE

In [None]:
def divide_lists_into_bins(n_bins, *args):
    """Discretises continuous samples into ``n_bins`` equal-width bins.

    The bin edges are shared by all inputs: they span the minimum and the
    maximum of the concatenated values.

    Args:
        n_bins: The number of bins to divide the values into.
        *args: Variable number of lists containing continuous values.

    Returns:
        - A list of tuples, where each tuple contains the bin indices
          (0 .. n_bins - 1) for corresponding elements across the input lists.
        - A NumPy array of bin indices if a single list is provided.
    """

    all_values = np.concatenate(args)
    min_value = all_values.min()
    max_value = all_values.max()

    bin_edges = np.linspace(min_value, max_value, n_bins + 1)

    # np.digitize uses half-open intervals, so the maximum value would land in
    # an extra bin with index n_bins; clamp it into the last real bin.
    bin_indices = [
        np.clip(np.digitize(list_values, bin_edges) - 1, 0, n_bins - 1)
        for list_values in args
    ]

    # A single input is returned as-is instead of as 1-tuples
    if len(args) == 1:
        return bin_indices[0]
    return list(zip(*bin_indices))

In [None]:
def Lists2Tuple(*args):
  """
  Zips the input lists element-wise into a list of tuples.

  Args:
      *args: A variable number of lists, each containing discrete values.

  Returns:
      A list whose i-th entry is the tuple of the i-th elements of every
      input list. The result is as long as the shortest input.
  """

  rows = []
  for row in zip(*args):
    rows.append(row)
  return rows

In [None]:
def create_subsets(list_of_lists):
    """Returns every non-empty combination of the items, smallest first.

    Args:
        list_of_lists: A sequence of items (here: lists of samples).

    Returns:
        A list of lists: all combinations of size 1, then size 2, and so on
        up to the full input, each in the order produced by
        itertools.combinations.
    """
    sizes = range(1, len(list_of_lists) + 1)
    return [
        list(combo)
        for size in sizes
        for combo in itertools.combinations(list_of_lists, size)
    ]

In [None]:
def list_of_lists_to_tuples(list_of_lists):
  """Combines several sample lists into one list of joint observations.

  Args:
      list_of_lists: A list of lists of values.

  Returns:
      If exactly one sublist is given, a plain list copy of its elements
      (not wrapped in tuples). Otherwise a list of tuples holding the
      corresponding elements of every sublist.
  """

  if len(list_of_lists) != 1:
    return list(zip(*list_of_lists))
  return list(list_of_lists[0])



# CREATING METRICS

## Probabilities

In [None]:
def probability(X):
  """Empirical probability of each distinct value in the sample X.

  Args:
      X: A sequence of hashable observations.

  Returns:
      A dictionary mapping each observed value to its relative frequency.
  """
  total = len(X)
  tally = {}
  for value in X:
    if value in tally:
      tally[value] += 1
    else:
      tally[value] = 1

  return {value: n_seen / total for value, n_seen in tally.items()}



In [None]:
def joint_probability_List(*args):
  """
  Empirical joint distribution of several variables observed together.

  Args:
      *args: A variable number of equally long sample lists, one per variable.

  Returns:
      A dictionary keyed by tuples (one value per input list, in argument
      order) whose values are the relative frequencies of those tuples.
      The denominator is the length of the first list.
  """

  n_samples = len(args[0])

  # Count how often each joint observation occurs
  tally = {}
  for combo in zip(*args):
      if combo not in tally:
          tally[combo] = 0
      tally[combo] += 1

  # Turn the counts into relative frequencies
  return {combo: hits / n_samples for combo, hits in tally.items()}

In [None]:
def joint_probability_local(*args, local_values):
    """Calculates the joint probability of a specific combination of values.

    Args:
        *args: Sample lists, one per variable (see joint_probability_List).
        local_values: The combination to look up, one value per variable in
            argument order. A tuple is expected; a list is accepted and
            converted, because the joint distribution is keyed by tuples and
            a list cannot be used as a dictionary key.

    Returns:
        The empirical joint probability of the combination, or 0.0 if it was
        never observed.
    """

    if isinstance(local_values, list):
        local_values = tuple(local_values)

    # Calculate the joint probability distribution directly
    joint_probs = joint_probability_List(*args)

    # Find the probability of the specific combination
    return joint_probs.get(local_values, 0.0)

In [None]:
def probability_of_x_given_y(X, Y):
    """Empirical conditional distribution P(X | Y) for every observed y.

    Args:
        X: Samples of the variable of interest.
        Y: Samples of the conditioning variable, aligned with X.

    Returns:
        A nested dictionary ``{y: {x: P(X = x | Y = y)}}`` covering only the
        (x, y) pairs that occur in the data.
    """
    # tally[y][x] = number of times x was seen together with y
    tally = {}
    for x, y in zip(X, Y):
        per_y = tally.setdefault(y, {})
        per_y[x] = per_y.get(x, 0) + 1

    # Normalise each inner tally by the number of samples with that y
    conditional = {}
    for y, per_y in tally.items():
        n_y = sum(per_y.values())
        conditional[y] = {x: hits / n_y for x, hits in per_y.items()}

    return conditional



In [None]:
def conditional_probability(X,Y,Z, x_value, y_value, z_value):
  """
  Empirical conditional probability P(X = x_value | Y = y_value, Z = z_value).

  Args:
      X, Y, Z: Aligned sample sequences of the three variables.
      x_value: The value of X for which to calculate the probability.
      y_value: The value of Y to condition on.
      z_value: The value of Z to condition on.

  Returns:
      The fraction of samples with the given Y and Z values whose X equals
      x_value, or 0 if no sample matches the condition.
  """
  frame = pd.DataFrame({'X':X, 'Y': Y, 'Z': Z})

  # Rows that satisfy the conditioning event
  matches_condition = (frame['Y'] == y_value) & (frame['Z'] == z_value)
  if not matches_condition.any():
    return 0

  # Share of the matching rows in which X takes the requested value
  hits = frame.loc[matches_condition, 'X'] == x_value
  return hits.mean()

In [None]:
def prob_Xx_given_Yy(X, Y, x, y):
    """
    Empirical conditional probability P(X = x | Y = y) from aligned samples.

    Args:
      X: A list of observations (single values or tuples).
      Y: A list of corresponding conditioning observations.
      x: The specific value of X we are interested in.
      y: The conditioning value of Y.

    Returns:
      The conditional probability P(x | y), or 0 when y never occurs in Y.
    """

    # Flag the samples where both events occur, and those where y occurs
    joint_hits = [a == x and b == y for a, b in zip(X, Y)]
    cond_hits = [b == y for b in Y]

    n_xy = sum(joint_hits)
    n_y = sum(cond_hits)
    if not n_y:
        return 0  # conditioning event never observed
    return n_xy / n_y

## Entropy

In [None]:
def entropy(X):
  """Shannon entropy (in bits) of the empirical distribution of X.

  Args:
      X: A sequence of discrete observations.
         NOTE(review): np.unique is applied directly, so this looks meant
         for flat (1-D) samples - confirm before passing tuples.

  Returns:
      The entropy -sum(p * log2(p)) over the distinct values in X.
  """
  # Occurrence count of every distinct value
  _, occurrences = np.unique(X, return_counts=True)
  freqs = occurrences / len(X)
  # Guard against log2(0) by substituting machine epsilon for zero entries
  freqs = np.where(freqs == 0, np.finfo(float).eps, freqs)
  return -(freqs * np.log2(freqs)).sum()



In [None]:

def conditional_entropy(X, Y):
    """Conditional entropy H(X | Y) in bits, from aligned samples X and Y.

    Computed as -sum over observed (x, y) of p(x, y) * log2(p(x, y) / p(y)).
    """
    p_xy = joint_probability_List(X, Y)
    p_y = probability(Y)
    h_x_given_y = 0.0
    for (_, y_val), p_joint in p_xy.items():
      h_x_given_y -= p_joint * math.log(p_joint / p_y[y_val], 2)
    return h_x_given_y


In [None]:
def conditional_entropy_List(X, *args):
    """Conditional entropy H(X | args...) in bits.

    Args:
        X: Samples of the variable of interest.
        *args: One or more aligned sample lists to condition on.

    Returns:
        -sum over observed joint outcomes of p(x, rest) * log2(p(x, rest) / p(rest)).
    """
    p_full = joint_probability_List(X, *args)
    p_condition = joint_probability_List(*args)
    h_conditional = 0.0
    for outcome, p_joint in p_full.items():
      # outcome = (x, *conditioning values); drop x to index the condition
      conditioning = outcome[1:]
      h_conditional -= p_joint * math.log(p_joint / p_condition[conditioning], 2)
    return h_conditional

In [None]:
def joint_entropy(X, Y):
    """Joint entropy H(X, Y) in bits of two aligned sample lists."""
    h_joint = 0.0
    for p_joint in joint_probability_List(X, Y).values():
      h_joint -= p_joint * math.log(p_joint, 2)
    return h_joint


In [None]:
def joint_entropy_List(*args):
  """Joint entropy (in bits) of any number of aligned sample lists."""
  h_joint = 0.0
  for p_joint in joint_probability_List(*args).values():
    h_joint -= p_joint * math.log2(p_joint)
  return h_joint

## Mutual Information

In [None]:
def mutual_information(X,Y):
  """Mutual information I(X; Y) = H(X) - H(X | Y), in bits."""
  h_x = entropy(X)
  h_x_given_y = conditional_entropy_List(X, Y)
  return h_x - h_x_given_y


In [None]:
def co_information(X,Y,Z):
  """Co-information of three variables, in bits.

  H(X) + H(Y) + H(Z) - H(X,Y) - H(X,Z) - H(Y,Z) + H(X,Y,Z).
  """
  h_x = entropy(X)
  h_y = entropy(Y)
  h_z = entropy(Z)
  h_xy = joint_entropy_List(X,Y)
  h_xz = joint_entropy_List(X,Z)
  h_yz = joint_entropy_List(Y,Z)
  h_xyz = joint_entropy_List(X,Y,Z)
  return h_x + h_y + h_z - h_xy - h_xz - h_yz + h_xyz

In [None]:
def local_co_information(X,Y,Z,x,y,z):
  """Local (pointwise) co-information of the outcome (x, y, z), in bits.

  Mirrors co_information with every entropy replaced by the surprisal
  -log2(p) of the corresponding outcome:
  -log2 p(x) - log2 p(y) - log2 p(z) + log2 p(x,y) + log2 p(x,z)
  + log2 p(y,z) - log2 p(x,y,z).

  Args:
      X, Y, Z: Aligned sample lists.
      x, y, z: The specific outcome to evaluate.

  Returns:
      The local co-information as a float.

  Raises:
      ValueError: If any of the required probabilities is zero (math.log2
          of 0), i.e. the outcome was not observed in the samples.
  """
  px=probability(X).get(x, 0)
  py=probability(Y).get(y, 0)
  pz=probability(Z).get(z, 0)
  pxy=joint_probability_local(X,Y, local_values=(x,y))
  pxz=joint_probability_local(X,Z, local_values=(x,z))
  pyz=joint_probability_local(Y,Z, local_values=(y,z))
  # Kept separate from pxy: the pairwise and the triple probability are
  # both needed in the formula below.
  pxyz=joint_probability_local(X,Y,Z, local_values=(x,y,z))
  result=-log2(px)-log2(py)-log2(pz)+log2(pxy)+log2(pxz)+log2(pyz)-log2(pxyz)
  return result

In [None]:
def mutual_informationX1X2Y(X1, X2, Y):
    """Mutual information I((X1, X2); Y) in bits.

    Sums p(x1, x2, y) * log2(p(x1, x2, y) / (p(y) * p(x1, x2))) over the
    observed triples. Unobserved triples have zero joint probability and
    contribute nothing, so only the keys of the joint distribution are
    visited instead of the full Cartesian product of the three samples.

    Args:
        X1, X2: Aligned sample lists of the two sources.
        Y: Aligned sample list of the target.

    Returns:
        The mutual information as a float (0 for empty input).
    """
    res = 0
    PJ = joint_probability_List(X1, X2, Y)
    PX1X2 = joint_probability_List(X1, X2)
    PY = probability(Y)

    for (x1, x2, y), pj in PJ.items():
        py = PY.get(y, 0.0)
        px1x2 = PX1X2.get((x1, x2), 0.0)
        if pj != 0 and py != 0 and px1x2 != 0:
            res += pj*log2(pj/(py * px1x2))
    return res

In [None]:
def local_mutual_informationX1X2Y(X1,X2,Y, x1,x2,y):
  """Local mutual information between the pair (x1, x2) and y, in bits.

  Returns log2(p(x1, x2, y) / (p(y) * p(x1, x2))), or 0 when any of the
  three probabilities is zero.
  """
  p_joint = joint_probability_List(X1,X2,Y).get((x1,x2,y), 0.0)
  p_sources = joint_probability_List(X1,X2).get((x1,x2), 0.0)
  p_target = probability(Y).get(y, 0.0)
  if p_joint == 0 or p_target == 0 or p_sources == 0:
    return 0
  return log2(p_joint/(p_target*p_sources))


In [None]:
def joint_mutual_information(X,Y,Z):
  """Mutual information I((X, Y); Z) = H(X, Y) + H(Z) - H(X, Y, Z), in bits."""
  h_xy = joint_entropy_List(X,Y)
  h_z = entropy(Z)
  h_xyz = joint_entropy_List(X,Y,Z)
  return h_xy + h_z - h_xyz


In [None]:
def conditional_mutual_information(X,Y,Z):
  """Conditional mutual information I(X; Y | Z), in bits.

  H(X, Z) + H(Y, Z) - H(X, Y, Z) - H(Z).
  """
  h_xz = joint_entropy_List(X,Z)
  h_yz = joint_entropy_List(Y,Z)
  h_xyz = joint_entropy_List(X,Y,Z)
  h_z = entropy(Z)
  return h_xz + h_yz - h_xyz - h_z

In [None]:
def local_conditional_mutual_information(X, Y, Z, x, y, z):
  """
  Calculates the local conditional mutual information (LCMI) between X and Y
  given Z for one specific outcome (x, y, z), in bits:

      log2( p(x, y, z) * p(z) / (p(x, z) * p(y, z)) )

  This is the pointwise counterpart of conditional_mutual_information, which
  combines H(X, Z), H(Y, Z), H(X, Y, Z) and H(Z) the same way.

  Args:
      X, Y, Z:  Aligned sample lists of the three variables.
      x, y, z:  Specific values taken by variables X, Y, and Z respectively.

  Returns:
      The Local Conditional Mutual Information value (float). If a required
      probability is zero, np.log2 yields -inf and the result is inf or nan.
  """

  # The joint distributions are keyed by tuples, so the outcomes are passed
  # as tuples (a list cannot be used as a dictionary key).
  p_xz = joint_probability_local(X, Z, local_values=(x, z))
  p_yz = joint_probability_local(Y, Z, local_values=(y, z))
  p_xyz = joint_probability_local(X, Y, Z, local_values=(x, y, z))
  p_z = joint_probability_local(Z, local_values=(z,))

  # Calculate and return the LCMI
  return -np.log2(p_xz) - np.log2(p_yz) + np.log2(p_xyz) + np.log2(p_z)

In [None]:
def local_mutual_information(X,Y,x,y):
  """Local mutual information log2(p(x | y) / p(x)) of the outcome (x, y)."""
  p_x = probability(X).get(x, 0)
  p_x_given_y = prob_Xx_given_Yy(X, Y, x, y)
  ratio = p_x_given_y / p_x
  return log2(ratio)


## Total Correlation and Dual Total Correlation

In [None]:
def TC_List(*args):
  """Total correlation: sum of the marginal entropies minus the joint entropy."""
  marginal_total = sum(map(entropy, args))
  return marginal_total - joint_entropy_List(*args)

In [None]:
def DTC_List(*args):
  """Dual total correlation of the given variables.

  Joint entropy minus the sum, over every variable, of its entropy
  conditioned on all the other variables.
  """
  residual_total = 0
  for position in range(len(args)):
        # Every variable except the one at `position`
        rest = args[:position] + args[position+1:]
        residual_total += conditional_entropy_List(args[position], *rest)
  return joint_entropy_List(*args) - residual_total

In [None]:
def O_information(*args):
  """O-information: total correlation minus dual total correlation."""
  total_corr = TC_List(*args)
  dual_total_corr = DTC_List(*args)
  return total_corr - dual_total_corr

In [None]:
def local_TC(X,Y,Z):
  """Local total correlation of every sample (x, y, z).

  For each aligned triple returns
  log2 p(x, y, z) - (log2 p(x) + log2 p(y) + log2 p(z)).

  Returns:
      A list with one value per sample, in sample order.
  """
  p_x=probability(X)
  p_y=probability(Y)
  p_z=probability(Z)
  p_xyz=joint_probability_List(X,Y,Z)
  values=[]
  for triple in zip(X,Y,Z):
    x, y, z = triple
    marginal_bits = np.log2(p_x.get(x))+np.log2(p_y.get(y))+np.log2(p_z.get(z))
    values.append(-(marginal_bits)+np.log2(p_xyz.get(triple)))
  return values


In [None]:
def local_DTC(X,Y,Z):
  """Local dual total correlation of every sample (x, y, z).

  For each aligned triple returns
  -log2 p(x, y, z) + log2 p(x | y, z) + log2 p(y | z, x) + log2 p(z | x, y).

  The value depends only on the triple, so it is computed once per distinct
  triple and reused; conditional_probability builds a DataFrame on every
  call, which made the per-sample evaluation needlessly expensive.

  Returns:
      A list with one value per sample, in sample order.
  """
  pXYZ=joint_probability_List(X,Y,Z)
  cache={}
  result=[]
  for x,y,z in zip(X,Y,Z):
    key=(x,y,z)
    if key not in cache:
      cache[key]=-np.log2(pXYZ.get(key))+np.log2(conditional_probability(X,Y,Z,x,y,z))+np.log2(conditional_probability(Y,Z,X,y,z,x))+np.log2(conditional_probability(Z,X,Y,z,x,y))
    result.append(cache[key])
  return result

# Measuring Redundancy

In [None]:
def MinMI(X1,X2,Y):
  """Redundancy estimate: the smaller of I(X1; Y) and I(X2; Y)."""
  return min(mutual_information(X1,Y), mutual_information(X2,Y))

In [None]:
def BROJA(X1,X2,Y):
  """Average of I(X1; Y | X2) and I(X2; Y | X1)."""
  given_x2 = conditional_mutual_information(X1,Y,X2)
  given_x1 = conditional_mutual_information(X2,Y,X1)
  return (given_x2+given_x1)/2

In [None]:
def local_BROJA(X1,X2,Y,x1,x2,y):
  """Average of the two local conditional mutual informations of (x1, x2, y)."""
  given_x2 = local_conditional_mutual_information(X1,Y,X2,x1,y,x2)
  given_x1 = local_conditional_mutual_information(X2,Y,X1,x2,y,x1)
  return (given_x2+given_x1)/2

In [None]:
def local_Specific_information(S,s,A,a):
    """Local specific information log2(1 / p(s)) - log2(1 / p(s | a)).

    Args:
        S: Samples of the target.
        s: The target outcome.
        A: Samples of the source (single values or tuples), aligned with S.
        a: The source outcome.

    Returns:
        The value in bits, or 0 when p(s | a) is zero.
    """
    p_s = probability(S).get(s, 0)
    p_s_given_a = prob_Xx_given_Yy(S, A, s, a)
    if not p_s_given_a:
        return 0
    return log2(1/p_s)-log2(1/p_s_given_a)



In [None]:
def Specific_information(S,s,A):
  """Specific information that source A carries about the outcome S = s.

  Sums p(a | s) * local_Specific_information(S, s, A, a) over the distinct
  values a of A.

  NOTE: a later cell in this notebook defines Specific_information again;
  after running all cells that later definition is the one in effect.
  """
  return sum(
      prob_Xx_given_Yy(A, S, a, s) * local_Specific_information(S, s, A, a)
      for a in set(A)
  )

In [None]:
def Specific_information(S,s,A):
  """Specific information that source A carries about the outcome S = s.

  Sums p(a | s) * (log2(1 / p(s)) - log2(1 / p(s | a))) over the distinct
  values a of A, skipping values with p(s | a) == 0.

  Args:
      S: Samples of the target.
      s: The target outcome.
      A: Samples of the source (single values or tuples), aligned with S.

  Returns:
      The specific information in bits.
  """
  p_s = probability(S).get(s, 0)
  total = 0
  for a in set(A):
    p_a_given_s = prob_Xx_given_Yy(A, S, a, s)
    p_s_given_a = prob_Xx_given_Yy(S, A, s, a)
    if p_s_given_a == 0:
      continue
    total = total + p_a_given_s*(log2(1/p_s)-log2(1/p_s_given_a))
  return total

In [None]:
def local_Imin(S,s,A,a):
    """Minimum local specific information over all non-empty source subsets.

    Args:
        S: Samples of the target.
        s: The target outcome.
        A: A list of source sample lists.
        a: The source outcomes, one per entry of A and in the same order.

    Returns:
        The smallest local_Specific_information value found when pairing
        each subset of sources with the matching subset of outcomes.
    """
    source_subsets = create_subsets(A)
    value_subsets = create_subsets(a)
    smallest = float('inf')
    for sources, values in zip(source_subsets, value_subsets):
      joint_samples = list_of_lists_to_tuples(sources)
      # A single source is compared against a bare value, several against a tuple
      observed = values[0] if len(values) == 1 else tuple(values)
      info = local_Specific_information(S, s, joint_samples, observed)
      if info < smallest:
        smallest = info
    return smallest


In [None]:
def Imin(S,*args):
  """Expected minimum specific information about S over the source subsets.

  For every distinct outcome s of S, takes the minimum of
  Specific_information(S, s, A) over all non-empty subsets A of the sources
  and averages these minima weighted by p(s).

  Args:
      S: Samples of the target.
      *args: Source sample lists, aligned with S.

  Returns:
      The weighted sum in bits.
  """
  source_subsets = create_subsets(args)
  p_target = probability(S)
  expected = 0
  for s in set(S):
    weakest = float('inf')
    for sources in source_subsets:
      info = Specific_information(S, s, list_of_lists_to_tuples(sources))
      if info < weakest:
        weakest = info
    expected = expected + p_target.get(s, 0)*weakest
  return expected

In [None]:
# Small hand-written toy samples (target Y and three sources).
# The next cell rebinds the same names to random data.
Y = [2, 5, 5, 3, 2]
X1 = [1, 2, 2, 2, 4]
X2 = [0, 1, 1, 0, 1]
X3 = [4, 9, 1, 9, 4]

In [None]:
# Random integer samples in [0, r] for a quick smoke test of the measures.
r=10
n=1000

# Seed the generator so the values printed by the following cells can be
# reproduced after Restart & Run All.
np.random.seed(s)

Y = np.random.randint(0, r + 1, size=n)
X1 = np.random.randint(0, r + 1, size=n)
X2 = np.random.randint(0, r + 1, size=n)
X3 = np.random.randint(0, r + 1, size=n)

In [None]:
# Specific information that the single source X3 carries about the outcome Y == 3.
Specific_information(Y,3,list_of_lists_to_tuples([X3]))

0.0468790128967521

In [None]:
# Imin of target Y with sources X1 and X2.
Imin(Y,X1,X2)

0.06454250979214894

In [None]:
# Local Imin for the outcome Y == 1 with X1 == 1 and X2 == 0.
local_Imin(Y,1,[X1,X2],(1,0))

-0.1381591452294706



# GENERATE TEST DATA-SET

In [None]:
def create_correlated_datasets(n, r, s, c):
    """
    Creates two integer datasets with (approximately) a given correlation.

    Args:
        n (int): Number of elements in each dataset.
        r (int): Range of the numbers in the dataset (0 to r).
        s (int): Seed for the random number generator.
        c (float): The desired correlation coefficient (-1 to 1).

    Returns:
        tuple: A tuple containing the two correlated datasets as NumPy arrays.
            Rounding and clipping to [0, r] mean the achieved correlation
            only approximates ``c``.

    Raises:
        ValueError: If ``c`` is outside [-1, 1].
    """

    np.random.seed(s)

    # Ensure correlation coefficient is within valid range
    if not -1 <= c <= 1:
        raise ValueError("Correlation coefficient 'c' must be between -1 and 1.")

    # Generate a base dataset
    x = np.random.randint(0, r + 1, size=n)

    # Create a correlated dataset based on the desired correlation
    if c == 0:
        # No correlation, just generate another independent dataset
        y = np.random.randint(0, r + 1, size=n)
    elif c == 1:
        # Return a copy so that mutating one result does not change the other
        y = x.copy()
    else:
        # Noise scale that gives y the same variance as x
        sigma_y = np.std(x) * np.sqrt(1 - c**2)

        # Generate the independent noise component
        noise = np.random.normal(scale=sigma_y, size=n)

        # Mix x and noise around the mean of x. Without centring, c * x + noise
        # is shifted towards (or, for negative c, below) zero, and the clip
        # to [0, r] then flattens most values and destroys the correlation.
        x_mean = x.mean()
        y = x_mean + c * (x - x_mean) + noise
        y = np.round(y).astype(int)
        y = np.clip(y, 0, r)

    return x, y

In [None]:
def create_correlated_binary_datasets(n, s, c):
    """Creates two binary (0/1) datasets of length n.

    Args:
        n: Number of samples.
        s: Seed for the random number generator.
        c: Off-diagonal entry of the covariance matrix of the latent
           bivariate normal used when c != 0.

    Returns:
        A tuple (x, y) of 0/1 NumPy arrays. For c == 0, x is a fair coin
        sample and y equals x or 1 - x depending on a second fair coin
        sample. For c != 0 both arrays come from thresholding a bivariate
        normal sample at zero; the two coin samples are still drawn first
        but are then discarded.
    """
    np.random.seed(s)

    # Two fair-coin samples are always drawn, in this order
    base = np.random.binomial(1, 0.5, n)
    keep_mask = np.random.binomial(1, 0.5, n)

    if c == 0:
        # Keep base where the mask is 1, flip it where the mask is 0
        return base, np.where(keep_mask, base, 1 - base)

    # Threshold a latent bivariate normal sample at zero
    covariance = np.array([[1, c], [c, 1]])
    latent = np.random.multivariate_normal([0, 0], covariance, size=n).T
    first, second = np.where(latent > 0, 1, 0)
    return first, second

In [None]:
def generate_correlation_table_TEST(c, n, s,corr):
    """Builds a small test table with binary X1, X2 and Y = X1 XOR X2.

    NOTE: a later cell in this notebook defines
    generate_correlation_table_TEST again (with Y = X1 AND X2); after running
    all cells that later definition is the one in effect.

    Args:
        c: Not used by this function.
        n: Number of rows.
        s: Seed passed to create_correlated_datasets.
        corr: Sequence whose first element is the requested X1-X2 correlation.

    Returns:
        A DataFrame with the columns 'X1', 'X2' and 'Y'.
    """

    # Binary sources (range 0..1) with the requested correlation
    X1, X2 = create_correlated_datasets(n, 1, s, corr[0])
    Y = np.clip(np.logical_xor(X1,X2), 0, 1)

    # Assign data to DataFrame
    df = pd.DataFrame(index=range(n))
    for column, samples in (('X1', X1), ('X2', X2), ('Y', Y)):
        df[column] = samples
    return df

# PARTIAL INFORMATION DECOMPOSITION

In [None]:
def generate_correlation_table_TEST(c, n, s,corr):
    """Builds a small test table with binary X1, X2 and Y = X1 AND X2.

    Args:
        c: Not used by this function.
        n: Number of rows.
        s: Seed passed to create_correlated_datasets.
        corr: Sequence whose first element is the requested X1-X2 correlation.

    Returns:
        A DataFrame with the columns 'X1', 'X2' and 'Y'.
    """

    # Binary sources (range 0..1) with the requested correlation
    X1, X2 = create_correlated_datasets(n, 1, s, corr[0])
    # Alternative targets that were tried here: np.logical_xor(X1,X2), or X1 itself
    Y = np.clip(np.logical_and(X1,X2), 0, 1)

    # Assign data to DataFrame
    df = pd.DataFrame(index=range(n))
    for column, samples in (('X1', X1), ('X2', X2), ('Y', Y)):
        df[column] = samples
    return df

In [None]:
# Build a 10-row test table and unpack its columns.
# c is accepted by generate_correlation_table_TEST but not used by it.
c = 3
n = 10
# Only corr[0] is read; 0 selects the independent-draw branch of
# create_correlated_datasets.
corr = [0,0]
test_data=generate_correlation_table_TEST(c, n, s,corr)
# Columns are, in order: X1, X2, Y
X1 = test_data.iloc[:, 0]
X2 = test_data.iloc[:, 1]
Y=  test_data.iloc[:, 2]
test_data

Unnamed: 0,X1,X2,Y
0,1,0,0
1,1,0,0
2,0,0,0
3,0,1,0
4,1,1,1
5,0,1,0
6,0,0,0
7,0,1,0
8,0,0,0
9,1,1,1


In [None]:
# Empirical probability of the joint outcome X1 == 1, X2 == 0, Y == 0.
joint_probability_local(X1,X2,Y, local_values=(1,0,0))

0.2

In [None]:
def PID(X1,X2,Y, measure="imin"):
  """Partial information decomposition of I((X1, X2); Y) into four atoms.

  Given a redundancy R, the remaining atoms follow as
  U1 = I(X1; Y) - R, U2 = I(X2; Y) - R and
  S = I((X1, X2); Y) - R - U1 - U2. The four values are printed with two
  decimals and returned.

  Args:
      X1, X2: Aligned sample lists of the two sources.
      Y: Aligned sample list of the target.
      measure: Which redundancy function to use: "imin" (default, Imin),
          "minmi" (MinMI) or "broja" (BROJA). Previously all three were
          evaluated and only the Imin result was kept.

  Returns:
      The list [R, U1, U2, S].

  Raises:
      ValueError: If ``measure`` is not one of the supported names.
  """
  if measure == "imin":
    R=Imin(Y,X1,X2)
  elif measure == "minmi":
    R=MinMI(X1,X2,Y)
  elif measure == "broja":
    R=BROJA(X1,X2,Y)
  else:
    raise ValueError("measure must be 'imin', 'minmi' or 'broja'")

  U1=mutual_information(X1,Y)-R
  U2=mutual_information(X2,Y)-R
  S=mutual_informationX1X2Y(X1,X2,Y)-R-U1-U2
  print("Redundant information R:", f"{R:.2f}")
  print("Unique information U1:", f"{U1:.2f}")
  print("Unique information U2:", f"{U2:.2f}")
  print("Synergetic information S:", f"{S:.2f}")
  return [R,U1,U2,S]

In [None]:
def PID_local(X1,X2,Y, x1,x2,y):
  """Local partial information decomposition for one outcome (x1, x2, y).

  Uses local_Imin as the redundancy and derives the unique and synergetic
  parts from the local mutual informations. Prints the redundancy once on
  its own and then all four atoms with labels.

  Returns:
      The list [R, U1, U2, S].
  """
  redundancy = local_Imin(Y,y,[X1,X2],(x1,x2))
  print(redundancy)
  unique_1 = local_mutual_information(X1,Y,x1,y)-redundancy
  unique_2 = local_mutual_information(X2,Y,x2,y)-redundancy
  synergy = local_mutual_informationX1X2Y(X1,X2,Y, x1,x2,y)-redundancy-unique_1-unique_2
  labelled_atoms = (
      ("Redundant information R:", redundancy),
      ("Unique information U1:", unique_1),
      ("Unique information U2:", unique_2),
      ("Synergetic information S:", synergy),
  )
  for label, atom in labelled_atoms:
    print(label, atom)
  return [redundancy, unique_1, unique_2, synergy]

In [None]:
def PID_local_all(X1,X2,Y):
  """Local partial information decomposition of every sample.

  The local atoms depend only on the triple (x1, x2, y), so they are
  computed once per distinct triple and reused for repeated samples instead
  of being recomputed for every row.

  Args:
      X1, X2: Aligned sample sequences of the two sources.
      Y: Aligned sample sequence of the target.

  Returns:
      A DataFrame with one row per sample and the columns 'X1_X2_Y' (the
      triple), 'Red', 'UX1', 'UX2' and 'SYN'.
  """
  atoms_by_triple={}
  result=[]
  for x1,x2,y in zip(X1,X2,Y):
     key=(x1,x2,y)
     if key not in atoms_by_triple:
       R=local_Imin(Y,y,[X1,X2],(x1,x2))
       U1=local_mutual_information(X1,Y,x1,y)-R
       U2=local_mutual_information(X2,Y,x2,y)-R
       S=local_mutual_informationX1X2Y(X1,X2,Y, x1,x2,y)-R-U1-U2
       atoms_by_triple[key]=(R,U1,U2,S)
     R,U1,U2,S=atoms_by_triple[key]
     result.append({'X1_X2_Y': key,'Red':R,'UX1':U1,'UX2':U2,'SYN':S})
  return pd.DataFrame(result)


In [None]:
def Redundancy_local_spread(X1,X2,Y):
  """Prints the local redundancy of every distinct (x1, x2, y) outcome.

  For each distinct triple prints its joint probability, the triple itself
  and its local_Imin value; finally prints the probability-weighted sum of
  the local redundancies. Returns None.
  """
  weighted_total = 0
  # A set removes repeated triples
  for triple in set(zip(X1, X2, Y)):
    x1, x2, y = triple
    R = local_Imin(Y,y,[X1,X2],(x1,x2))
    p = joint_probability_local(X1,X2,Y, local_values=triple)
    weighted_total = weighted_total + p*R
    print(p)
    print(x1, x2, y)
    print("R=",R)
  print(weighted_total)
  return

# EXPERIMENT

In [None]:
def generate_correlation_table_function(n, t_c, r,s, f):
    """
    Runs one experiment per X1-X2 correlation level and tabulates TC, DTC,
    O-information and the partial information decomposition (global and local).

    For each of ``t_c`` evenly spaced correlation parameters in [0, 1] a pair
    of binary sources is generated, the target Y is derived from them
    according to ``f``, and the measures are computed. Progress, a per-outcome
    overview and the PID atoms are printed along the way.

    Args:
        n (int): Number of samples in X1 and X2.
        t_c (int): Number of correlation values
        r (int): Upper bound of the random target drawn when f == 3
            (values 0 to r, afterwards clipped to [0, 1]).
        s (int): Seed passed to create_correlated_binary_datasets.
        f (int): How Y is built: 1 = X1 AND X2, 2 = X1 XOR X2,
            3 = random integers, 4 = copy of X1.

    Returns:
        pandas.DataFrame: One row per correlation level holding the samples,
            the measured correlations and all global / local measures.

    Raises:
        ValueError: If ``f`` is not 1, 2, 3 or 4.
    """

    # Fail early with a clear message; previously an unsupported f surfaced
    # as an unbound-variable error on Y inside the loop.
    if f not in (1, 2, 3, 4):
        raise ValueError("f must be 1 (AND), 2 (XOR), 3 (random) or 4 (copy of X1)")

    print(f)
    corrs = np.linspace(0, 1, t_c)  # Uniformly spaced correlation coefficients
    results = []
    for corr in corrs:
        print("NEXT EXPERIMENT")
        print(f"corr X1_X2={corr:.2f}")
        # The generator reseeds on every call, so one call is enough; a second
        # call with the same arguments returned the very same arrays.
        X1, X2 = create_correlated_binary_datasets(n, s, corr)
        true_corr = np.corrcoef(X1, X2)[0, 1]  # True correlation between X1 and X2
        print(f"true_corr X1_X2={true_corr:.2f}")
        if f==1:
          Y=np.logical_and(X1,X2)
        elif f==2:
          Y=np.logical_xor(X1,X2)
        elif f==3:
          Y=np.random.randint(0, r + 1, size=n)
        else:
          Y=X1
        Y=np.clip(Y, 0, 1).round()

        X1_Y_corr=np.corrcoef(X1, Y)[0, 1]
        X2_Y_corr=np.corrcoef(X2, Y)[0, 1]
        dtc = DTC_List(X1, X2, Y)
        tc= TC_List(X1,X2,Y)
        R=Imin(Y,X1,X2)
        U1=mutual_information(X1,Y)-R
        U2=mutual_information(X2,Y)-R
        total_mi=mutual_informationX1X2Y(X1,X2,Y)
        S=total_mi-R-U1-U2
        tc_local=local_TC(X1,X2,Y)
        dtc_local=local_DTC(X1,X2,Y)
        o_local=[a - b for a, b in zip(tc_local, dtc_local)]
        PID_loc= PID_local_all(X1,X2,Y)

        results.append({'X1': X1, 'X2': X2, 'Y':Y, 'X1-X2 Correlation': true_corr, 'X1-Y Correlation': X1_Y_corr,'X2-Y Correlation': X2_Y_corr,'DTC': dtc, 'DTC_AVG': np.mean(dtc_local),'DTC_MED': np.median(dtc_local),
                        'DTC_Q1': np.percentile(dtc_local,25), 'DTC_Q3': np.percentile(dtc_local,75), 'TC': tc, 'TC_AVG': np.mean(tc_local),'TC_MED': np.median(tc_local),'TC_Q1': np.percentile(tc_local,25), 'TC_Q3': np.percentile(tc_local,75),
                        'OINF': tc-dtc, 'OINF_LOCAL': o_local, 'TC_STDV': np.std(tc_local), 'DTC_STDV': np.std(dtc_local), 'DTC_LOCAL': dtc_local, 'TC_LOCAL': tc_local, 'TC_SET': set(tc_local), 'DTC_SET':set(dtc_local), 'OINF_SET':set(o_local),'R':R, 'U1':U1, 'U2':U2,'S':S,
                        'R_LOCAL': PID_loc['Red'], 'UX1_LOCAL': PID_loc['UX1'],'UX2_LOCAL': PID_loc['UX2'], 'S_LOCAL': PID_loc['SYN'], 'R_SET': set(PID_loc['Red']), 'UX1_SET': set(PID_loc['UX1']),'UX2_SET': set(PID_loc['UX2']), 'S_SET': set(PID_loc['SYN']) })
        data = {
          'X1': X1,
          'X2': X2,
          'Y': Y,
          'R_LOCAL': PID_loc['Red'],
          'UX1_LOCAL': PID_loc['UX1'],
          'UX2_LOCAL': PID_loc['UX2'],
          'S_LOCAL': PID_loc['SYN']
        }
        # Called for its printed per-outcome overview
        analyze_dataframe(pd.DataFrame(data))

        # Called for its printed summary of the global atoms
        PID(X1,X2,Y)

        print("mutual information", f"{total_mi:.2f}")
        print("\n\n")

    return pd.DataFrame(results)

In [None]:
def plot_PID_spread_function(df, figsize=(25, 4)):
  """
  Plots the four PID atoms against the X1-X2 correlation, one subplot each
  (redundant, unique X1, unique X2, synergetic).

  For every row of ``df`` the distinct local values of an atom are drawn as
  blue markers and the global value as a red marker at that row's
  'X1-X2 Correlation'.

  Args:
      df (pandas.DataFrame): The DataFrame containing results, with the
          columns 'X1-X2 Correlation', 'R_SET', 'UX1_SET', 'UX2_SET',
          'S_SET', 'R', 'U1', 'U2' and 'S'.
      figsize (tuple, optional): The size of the figure. Defaults to (25, 4).
  """
  # Create figure and subplots
  fig, axes = plt.subplots(1, 4, figsize=figsize)

  # (title, column with the set of local values, column with the global value, y label)
  panels = [
      ("Redundant", 'R_SET', 'R', 'R'),
      ("Unique_X1", 'UX1_SET', 'U1', 'UX1'),
      ("Unique_X2", 'UX2_SET', 'U2', 'UX2'),
      ("Synergetic", 'S_SET', 'S', 'S'),
  ]

  for ax, (title, local_col, global_col, y_label) in zip(axes, panels):
    # Titles and labels are set once per subplot, independent of the data
    ax.set_title(title)
    ax.set_xlabel('X1-X2 Correlation')
    ax.set_ylabel(y_label)

    for index, row in df.iterrows():
      x_value = row['X1-X2 Correlation']
      for y in row[local_col]:
        ax.scatter(x_value, y, color='blue')  # local values
      ax.scatter(x_value, row[global_col], color='red')  # global value

  plt.show()

In [None]:
def analyze_dataframe(df_data):
  """
  Summarises the local PID atoms per distinct (X1, X2, Y) outcome, prints the
  summary and returns it.

  Args:
      df_data (pandas.DataFrame): Per-sample data with the columns 'X1',
          'X2', 'Y', 'R_LOCAL', 'UX1_LOCAL', 'UX2_LOCAL' and 'S_LOCAL'.

  Returns:
      pandas.DataFrame: One row per distinct outcome with the columns:
          - X1, X2, Y: The outcome.
          - Count (%): Share of samples with that outcome, in percent.
          - R_local, UX1_local, UX2_local, S_local: Mean of the respective
            local atom over the outcome's samples, as a string with two
            decimals.
  """

  # Group by X1, X2, and Y for efficient calculations
  grouped_data = df_data.groupby(['X1', 'X2', 'Y'])

  # Calculate descriptive statistics with appropriate functions
  results = grouped_data.agg(
      Count=('X1', 'size'),  # Number of samples in the group
      R_local=('R_LOCAL', 'mean'),
      UX1_local=('UX1_LOCAL', 'mean'),
      UX2_local=('UX2_LOCAL', 'mean'),
      S_local=('S_LOCAL', 'mean'),

  ).reset_index()

  # Normalize Count to percentage
  results['Count (%)'] = (results['Count'] / len(df_data)) * 100

  # Custom formatting function for string representation (2 decimals)
  def format_two_decimals(value):
      return f"{value:.2f}"

  # Format the averaged atoms column by column. Series.map is used instead of
  # DataFrame.applymap, which newer pandas releases deprecate.
  local_columns = ['R_local', 'UX1_local', 'UX2_local', 'S_local']
  results[local_columns] = results[local_columns].apply(
      lambda column: column.map(format_two_decimals)
  )

  # Reorder columns as requested
  results = results[['X1', 'X2', 'Y', 'Count (%)', 'R_local', 'UX1_local', 'UX2_local', 'S_local']]

  # Print the DataFrame with clear column titles for readability
  print("Analysis Results:")
  print(results)

  return results

In [None]:
# Experiment settings passed to generate_correlation_table_function below:
# n   - number of samples per dataset
# t_c - number of correlation levels between 0 and 1
# r   - upper bound of the random target used when f == 3
n= 400
t_c=20
r=1

In [None]:
# Experiment with f=1: the target is Y = X1 AND X2.
df=generate_correlation_table_function(n, t_c, r,s,f=1)
plot_PID_spread_function(df)

1
NEXT EXPERIMENT
corr X1_X2=0.00
true_corr X1_X2=0.02
Analysis Results:
   X1  X2  Y  Count (%) R_local UX1_local UX2_local S_local
0   0   0  0      28.00    0.38     -0.00     -0.00    0.00
1   0   1  0      22.50   -0.64      1.02     -0.00    0.00
2   1   0  0      26.25   -0.53      0.00      0.92    0.00
3   1   1  1      23.25    1.01      0.00      0.11    0.98
Redundant information R: 0.29
Unique information U1: -0.00
Unique information U2: 0.04
Synergetic information S: 0.46
mutual information 0.78



NEXT EXPERIMENT
corr X1_X2=0.05
true_corr X1_X2=0.11
Analysis Results:
   X1  X2  Y  Count (%) R_local UX1_local UX2_local S_local
0   0   0  0       27.0    0.48     -0.00     -0.00    0.00
1   0   1  0       24.5   -0.63      1.11     -0.00    0.00
2   1   0  0       20.0   -0.79      0.00      1.28    0.00
3   1   1  1       28.5    0.92      0.13     -0.00    0.77
Redundant information R: 0.33
Unique information U1: 0.05
Unique information U2: -0.00
Synergetic information S

In [None]:
# Experiment with f=2: the target is Y = X1 XOR X2.
df=generate_correlation_table_function(n, t_c, r,s,f=2)
plot_PID_spread_function(df)

In [None]:
# Experiment with f=3: the target is drawn at random (integers 0..r).
df=generate_correlation_table_function(n, t_c, r,s,f=3)
plot_PID_spread_function(df)

In [None]:
# Experiment with f=4: the target is a copy of X1.
df=generate_correlation_table_function(n, t_c, r,s,f=4)
plot_PID_spread_function(df)