In [43]:
import numpy as np
from sklearn.neighbors import KernelDensity
import torch
import sys
import os
import pickle
from get_data import get_dataloader # noqa
sys.path.append(os.path.join(os.getcwd(), '..'))
from unimodals.common_models import MLP # noqa
from training_structures.unimodal import train, test # noqa


In [2]:
def estimate_density(data, bandwidth=1.0, kernel='gaussian'):
    kde = KernelDensity(kernel=kernel, bandwidth=bandwidth)
    kde.fit(data)
    return kde

def get_log_density(kde_model, input_data):
    """
    Get the probability density of an arbitrary 64-dimensional input.

    Parameters:
    kde_model: Trained KernelDensity model.
    input_data (np.ndarray): A single 64-dimensional input, or multiple inputs (n_samples, 64).

    Returns:
    density (np.ndarray): The estimated probability density for the input.
    """
    
    # Ensure input data has the correct number of dimensions
    if input_data.ndim == 1:
        input_data = input_data.reshape(1, -1)

    # Estimate log density for the input
    log_density = kde_model.score_samples(input_data)

    # # Convert log density to regular density
    # density = np.exp(log_density)

    return log_density

In [71]:
kdes = {}
label_pdf = {}
setting = 'redundancy'
with open(f'experiments/DATA_{setting}.pickle', 'rb') as f:
    data = pickle.load(f)['train']
    for modality in {'0', '1'}:
        data_mod = data[modality]
        kde = estimate_density(data_mod)
        kdes[modality] = kde
    

    label_count = np.bincount(data['label'].flatten())
    # convert label_count to log distribution
    label_probs = np.log(label_count / np.sum(label_count))
    label_pdf = {i: label_probs[i] for i in range(len(label_probs))}

print(label_pdf)


{0: -3.4645936982209498, 1: -3.4645936982209498, 2: -3.4645936982209498, 3: -3.4645936982209498, 4: -3.4668794135018053, 5: -3.4645936982209498, 6: -3.4668794135018053, 7: -3.4668794135018053, 8: -3.4668794135018053, 9: -3.4668794135018053, 10: -3.4668794135018053, 11: -3.4645936982209498, 12: -3.4668794135018053, 13: -3.4668794135018053, 14: -3.4645936982209498, 15: -3.4645936982209498, 16: -3.4645936982209498, 17: -3.4668794135018053, 18: -3.4645936982209498, 19: -3.4668794135018053, 20: -3.4668794135018053, 21: -3.4668794135018053, 22: -3.4668794135018053, 23: -3.4645936982209498, 24: -3.4645936982209498, 25: -3.4645936982209498, 26: -3.4645936982209498, 27: -3.4668794135018053, 28: -3.4668794135018053, 29: -3.4645936982209498, 30: -3.4645936982209498, 31: -3.4668794135018053}


In [73]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
encoders = {}
heads = {}

saved_model = f'experiments/{setting}/{setting}_unimodal'

for modality in {'0', '1'}:
    saved_encoder = saved_model + '{}_encoder.pt'.format(modality)
    saved_head = saved_model + '{}_head.pt'.format(modality)
    encoders[modality] = torch.load(saved_encoder).to(device)
    heads[modality] = torch.load(saved_head).to(device)

print(encoders, heads)



{'0': MLP(
  (fc): Linear(in_features=64, out_features=512, bias=True)
  (fc2): Linear(in_features=512, out_features=600, bias=True)
  (dropout_layer): Dropout(p=0.1, inplace=False)
  (lklu): LeakyReLU(negative_slope=0.2)
), '1': MLP(
  (fc): Linear(in_features=64, out_features=512, bias=True)
  (fc2): Linear(in_features=512, out_features=600, bias=True)
  (dropout_layer): Dropout(p=0.1, inplace=False)
  (lklu): LeakyReLU(negative_slope=0.2)
)} {'0': MLP(
  (fc): Linear(in_features=600, out_features=512, bias=True)
  (fc2): Linear(in_features=512, out_features=32, bias=True)
  (dropout_layer): Dropout(p=0.1, inplace=False)
  (lklu): LeakyReLU(negative_slope=0.2)
), '1': MLP(
  (fc): Linear(in_features=600, out_features=512, bias=True)
  (fc2): Linear(in_features=512, out_features=32, bias=True)
  (dropout_layer): Dropout(p=0.1, inplace=False)
  (lklu): LeakyReLU(negative_slope=0.2)
)}


In [75]:
from torch import nn
softmax = nn.Softmax(dim=-1)




with open(f'experiments/DATA_{setting}.pickle', 'rb') as f:
    r_tot = 0
    data = pickle.load(f)['test']
    #compute mutual information between data['0'] and data['label']
    
    for i in range(len(data['0'])):
        #print(get_log_density(kdes['0'], data['0'][i]), get_log_density(kdes['1'], data['1'][i]))
        r_p = -max(get_log_density(kdes['0'], data['0'][i]), get_log_density(kdes['1'], data['1'][i]))

        model = nn.Sequential(encoders['0'], heads['0'])
        model = model.to(device)
        #print(data['label'][i].shape)
        out = model(torch.from_numpy(data['0'][i]).float().to(device))
        #print(torch.log(softmax(out)[data['label'][i][0]]).item(),label_pdf[data['label'][i][0]],get_log_density(kdes['0'], data['0'][i]))
        r_m_0 = -torch.log(softmax(out)[data['label'][i][0]]).item() + label_pdf[data['label'][i][0]] - get_log_density(kdes['0'], data['0'][i])
        
        model = nn.Sequential(encoders['1'], heads['1'])
        model = model.to(device)
        out = model(torch.from_numpy(data['1'][i]).float().to(device))
        r_m_1 = -torch.log(softmax(out)[data['label'][i][0]]).item() + label_pdf[data['label'][i][0]] - get_log_density(kdes['1'], data['1'][i])

        r_m = min(r_m_0, r_m_1)

        #print(r_p[0], r_m[0], r_p[0] - r_m[0])
        r_tot += r_p[0] - r_m[0]
        if i % 100 == 0:
            print(i,r_tot/(i+1))
    print(r_tot/len(data['0']))

0 1.7683794991547899
100 0.8228630903860259
200 0.9016114178574897
300 0.8366690431806286
400 0.8116699778711653
500 0.8160218091528525
600 0.8376742147470094
700 0.8417209950635236
800 0.849901790348339
900 0.8477636928517439
1000 0.8577006669988861
1100 0.8394662120899146
1200 0.8385568759383508
1300 0.817681023963302
1400 0.8204898410844053
1500 0.8036135105683314
1600 0.8071484738401669
1700 0.7992397272156517
1800 0.7999230824637392
1900 0.8085904178745592
2000 0.8100914645130783
2100 0.8054932015250031
2200 0.8050300144242835
2300 0.8022557010103539
2400 0.8167577640181217
2500 0.8225909768265989
2600 0.8200791220069193
2700 0.825026299731818
2800 0.828528653761138
2900 0.8281742129631645
0.8299158114354036


In [62]:
from sklearn.feature_selection import mutual_info_classif

print(mutual_info_classif(data['0'], data['label']))

  y = column_or_1d(y, warn=True)


[0.00000000e+00 0.00000000e+00 5.28368569e-03 2.79933609e-03
 0.00000000e+00 0.00000000e+00 2.02423294e-03 0.00000000e+00
 7.18903370e-03 2.34222098e-03 7.20770169e-04 3.49873650e-03
 0.00000000e+00 9.00163278e-03 0.00000000e+00 1.75211585e-03
 0.00000000e+00 2.70416117e-03 0.00000000e+00 0.00000000e+00
 0.00000000e+00 0.00000000e+00 3.79431900e-03 3.65050921e-03
 0.00000000e+00 7.38596608e-03 1.76865524e-03 3.33311401e-04
 0.00000000e+00 8.51585707e-03 8.30891574e-03 0.00000000e+00
 7.30608258e-01 3.92350783e-01 5.26249432e-01 3.63501193e-01
 5.30752023e-01 6.62302108e-01 3.72255246e-01 5.46606729e-01
 4.02091675e-01 4.88761401e-01 4.59198093e-01 3.55607873e-01
 5.15717008e-01 2.50008520e-01 4.68811705e-01 4.42193052e-01
 5.01751442e-01 6.96141277e-01 4.73771180e-01 4.30744400e-01
 6.15738100e-01 4.31457076e-01 5.53663045e-01 2.67632752e-01
 5.40123196e-01 6.84772192e-01 7.16779286e-01 5.74439618e-01
 2.68479110e-01 5.92062233e-01 6.35010892e-01 5.76291530e-01]


In [89]:
def compute_pw_redundancy(setting):
    kdes = {}
    label_pdf = {}
    with open(f'experiments/DATA_{setting}.pickle', 'rb') as f:
        data = pickle.load(f)['train']
        for modality in {'0', '1'}:
            data_mod = data[modality]
            kde = estimate_density(data_mod)
            kdes[modality] = kde
        

        label_count = np.bincount(data['label'].flatten())
        # convert label_count to log distribution
        label_probs = np.log(label_count / np.sum(label_count))
        label_pdf = {i: label_probs[i] for i in range(len(label_probs))}

    device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
    encoders = {}
    heads = {}

    saved_model = f'experiments/{setting}/{setting}_unimodal'

    for modality in {'0', '1'}:
        saved_encoder = saved_model + '{}_encoder.pt'.format(modality)
        saved_head = saved_model + '{}_head.pt'.format(modality)
        encoders[modality] = torch.load(saved_encoder).to(device)
        heads[modality] = torch.load(saved_head).to(device)

    softmax = nn.Softmax(dim=-1)

    with open(f'experiments/DATA_{setting}.pickle', 'rb') as f:
        r_tot = 0
        data = pickle.load(f)['test']
        #compute mutual information between data['0'] and data['label']
        
        for i in range(len(data['0'])):
            #print(get_log_density(kdes['0'], data['0'][i]), get_log_density(kdes['1'], data['1'][i]))
            r_p = -max(get_log_density(kdes['0'], data['0'][i]), get_log_density(kdes['1'], data['1'][i]))

            model = nn.Sequential(encoders['0'], heads['0'])
            model = model.to(device)
            #print(data['label'][i].shape)
            out = model(torch.from_numpy(data['0'][i]).float().to(device))
            #print(torch.log(softmax(out)[data['label'][i][0]]).item(),label_pdf[data['label'][i][0]],get_log_density(kdes['0'], data['0'][i]))
            r_m_0 = -torch.log(softmax(out)[data['label'][i][0]]).item() + label_pdf[data['label'][i][0]] - get_log_density(kdes['0'], data['0'][i])
            
            model = nn.Sequential(encoders['1'], heads['1'])
            model = model.to(device)
            out = model(torch.from_numpy(data['1'][i]).float().to(device))
            r_m_1 = -torch.log(softmax(out)[data['label'][i][0]]).item() + label_pdf[data['label'][i][0]] - get_log_density(kdes['1'], data['1'][i])

            r_m = min(r_m_0, r_m_1)

            #print(r_p[0], r_m[0], r_p[0] - r_m[0])
            r_tot += r_p[0] - r_m[0]
            # if i % 100 == 0:
            #     print(i,r_tot/(i+1))
        return r_tot/len(data['0'])/np.log(2) * 0.2
        # print(r_tot/len(data['0']))

In [90]:
compute_pw_redundancy('redundancy')

0.23946308510263936

In [91]:
compute_pw_redundancy('uniqueness0')

0.14801657174130795

In [92]:
compute_pw_redundancy('uniqueness1')

0.1708824815495391

In [93]:
compute_pw_redundancy('synergy')


0.05331318649827324

In [99]:
from sklearn.feature_selection import mutual_info_classif
from sklearn.preprocessing import KBinsDiscretizer

with open(f'experiments/DATA_redundancy.pickle', 'rb') as f:
    data = pickle.load(f)['train']
    X = data['0']
    y = data['label'].flatten()

    # Discretize the continuous feature data into bins
    discretizer = KBinsDiscretizer(n_bins=5, encode='ordinal', strategy='uniform')
    X_binned = discretizer.fit_transform(X)

    # Convert multidimensional features to tuples (joint representation)
    X_tuples = np.apply_along_axis(lambda row: tuple(row), 1, X_binned)

    # Compute total mutual information between joint X (as tuples) and y
    total_mi_joint = mutual_info_classif(X, y)

    print(f"Total Mutual Information (joint) between X and y: {total_mi_joint}")

Total Mutual Information (joint) between X and y: [0.00000000e+00 0.00000000e+00 5.28368569e-03 2.79933609e-03
 0.00000000e+00 0.00000000e+00 2.02423294e-03 0.00000000e+00
 7.18903370e-03 2.34222098e-03 7.20770169e-04 3.49873650e-03
 0.00000000e+00 9.00163278e-03 0.00000000e+00 1.75211585e-03
 0.00000000e+00 2.70416117e-03 0.00000000e+00 0.00000000e+00
 0.00000000e+00 0.00000000e+00 3.79431900e-03 3.65050921e-03
 0.00000000e+00 7.38596608e-03 1.76865524e-03 3.33311401e-04
 0.00000000e+00 8.51585707e-03 8.30891574e-03 0.00000000e+00
 7.30608258e-01 3.92350783e-01 5.26249432e-01 3.63501193e-01
 5.30752023e-01 6.62302108e-01 3.72255246e-01 5.46606729e-01
 4.02091675e-01 4.88761401e-01 4.59198093e-01 3.55607873e-01
 5.15717008e-01 2.50008520e-01 4.68811705e-01 4.42193052e-01
 5.01751442e-01 6.96141277e-01 4.73771180e-01 4.30744400e-01
 6.15738100e-01 4.31457076e-01 5.53663045e-01 2.67632752e-01
 5.40123196e-01 6.84772192e-01 7.16779286e-01 5.74439618e-01
 2.68479110e-01 5.92062233e-01 6.35