In [1]:
import sys, os
# Put the parent of the current working directory on the import path
# (presumably so the local IPDL package imported below can be found).
project_dir = os.path.dirname(os.getcwd())
if sys.path.count(project_dir) == 0:
    sys.path.append(project_dir)

import torch
from torch import Tensor, nn
from IPDL import MatrixEstimator, ClassificationInformationPlane, AutoEncoderInformationPlane
from IPDL.optim import AligmentOptimizer, SilvermanOptimizer

import torchvision
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision.transforms import Compose, ToTensor, Lambda
from torch.nn.functional import one_hot
from tqdm import tqdm

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# RBF Kernel

In [2]:
from IPDL import TensorKernel

def RBF(x: Tensor, sigma: float) -> Tensor:
    '''
        Gaussian (RBF) Gram matrix computed with pure tensor operations.

        Every pair of rows (i, j) of ``x`` is mapped to
        ``exp(-||x_i - x_j||^2 / (2 * sigma^2))``.

        @param x: Tensor shape (n, features) or (batch, n, features)
        @param sigma: kernel bandwidth
        @return Tensor shape (n, n) or (batch, n, n)
    '''
    assert 1 < x.ndim < 4, "The dimension of X must be 2 or 3"
    # Broadcast the rows against each other:
    # (..., n, 1, f) - (..., 1, n, f) -> (..., n, n, f)
    as_rows = x.unsqueeze(-2)
    as_cols = x.unsqueeze(-3)
    squared_distance = (as_rows - as_cols).pow(2).sum(dim=-1)
    return torch.exp(-squared_distance / (2 * (sigma ** 2)))

In [3]:
sigma = 5  # RBF bandwidth used by the checks below
# Random batch: 4 sample sets of 32 points with 128 features each.
# Moved to `device` (chosen in the first cell) rather than calling .cuda(),
# so the notebook also runs on machines without a GPU.
a = torch.rand(4, 32, 128).to(device)

In [4]:
# Batched RBF Gram matrices, then divided by the matrix size; the RBF diagonal
# is all ones, so every scaled matrix has unit trace.
rbf_result = RBF(a, sigma)
A = rbf_result.div(rbf_result.shape[-1])

In [5]:
# Every slice of the batched kernel must agree with the library kernel
# evaluated on the matching sample set.
print("Test RBF")
for sample, batched_kernel in zip(a, rbf_result):
    _result = TensorKernel.RBF(sample, sigma)
    print(torch.isclose(_result, batched_kernel).all())

# Same comparison after dividing each kernel by its size.
print("Test A")
for sample, batched_A in zip(a, A):
    _result = TensorKernel.RBF(sample, sigma)
    _A = _result / _result.size(-1)
    print(torch.isclose(_A, batched_A).all())

Test RBF
tensor(True, device='cuda:0')
tensor(True, device='cuda:0')
tensor(True, device='cuda:0')
tensor(True, device='cuda:0')
Test A
tensor(True, device='cuda:0')
tensor(True, device='cuda:0')
tensor(True, device='cuda:0')
tensor(True, device='cuda:0')


# Entropy

In [6]:
from IPDL import MatrixBasedRenyisEntropy as MRE

def entropy(A: Tensor) -> Tensor:
    '''
        Matrix-based entropy of one symmetric matrix or a batch of them.

        Computes ``-sum_i(l_i * log2(l_i))`` over the eigenvalues ``l_i`` of ``A``.

        @param A: Tensor shape (n, n) or (batch, n, n), assumed symmetric
        @return Tensor: 0-dim for a single matrix, shape (batch,) for a batch
    '''
    # Only the eigenvalues are used, so the eigenvectors are not computed.
    eigval = torch.linalg.eigvalsh(A)
    # abs() guards against slightly negative eigenvalues (presumably numerical
    # round-off); epsilon keeps log2 away from zero.
    epsilon = 1e-8
    eigval = eigval.abs() + epsilon
    return -torch.sum(eigval * torch.log2(eigval), dim=-1)

In [7]:
# Batched entropy vs. the library's single-matrix entropy applied slice by slice.
entropy_result = entropy(A)
_entropy_result = [MRE.entropy(A[i]) for i in range(len(a))]

print(torch.isclose(torch.hstack(_entropy_result), entropy_result))

tensor([True, True, True, True], device='cuda:0')


In [8]:
# Library values first, batched values second, for a side-by-side look.
print(torch.hstack(_entropy_result), entropy_result, sep="\n")

tensor([2.5319, 2.5847, 2.5233, 2.5393], device='cuda:0')
tensor([2.5319, 2.5847, 2.5233, 2.5393], device='cuda:0')


# Joint entropy

In [9]:
sigma = 5
# `a` is a single sample set (12 points), `b` a batch of 4 sample sets, so the
# joint entropy below has to broadcast a 2-D matrix against a batch.
# Tensors go to `device` (chosen in the first cell) instead of .cuda(), so the
# notebook also runs on machines without a GPU.
a = torch.rand(12, 128).to(device)
b = torch.rand(4, 12, 128).to(device)

rbf_result = RBF(a, sigma)
A = rbf_result / rbf_result.size(-1)

rbf_result = RBF(b, sigma)
B = rbf_result / rbf_result.size(-1)

In [10]:
def jointEntropy(Kx: Tensor, *args: Tensor) -> Tensor:
    '''
        Joint matrix-based entropy of one or more Gram matrices.

        The matrices are combined with an element-wise product, the product is
        rescaled to unit trace and the result is passed to ``entropy``.

        Parameters
        ----------
            Kx: Tensor
                Matrix of shape (n, n) or (..., n, n).

            args: Tensor
                Further matrices, each broadcastable against ``Kx``.

        Returns
        -------
            Tensor
                Whatever ``entropy`` returns for the normalized product.
    '''
    A = Kx
    for val in args:
        # Out-of-place product: neither Kx nor the extra matrices are modified.
        A = A * val

    if A.ndim == 2:
        trace = A.trace()
    else:
        # Per-matrix trace over the last two axes, shaped (..., 1, 1) so it
        # broadcasts in the division for any number of leading batch axes.
        trace = A.diagonal(offset=0, dim1=-1, dim2=-2).sum(dim=-1)[..., None, None]
    return entropy(A / trace)

In [11]:
from IPDL import MatrixBasedRenyisEntropy as MRE

# Broadcast joint entropy (2-D A against batched B) vs. the library evaluated
# once per batch element.
je_result = jointEntropy(A, B)
_je_result = [MRE.jointEntropy(A, B_i) for B_i in B]

stacked_reference = torch.hstack(_je_result)
print(torch.isclose(stacked_reference, je_result))
print(stacked_reference == je_result)

tensor([True, True, True, True], device='cuda:0')
tensor([True, True, True, True], device='cuda:0')


In [12]:
# Library values first, batched values second.
print(torch.hstack(_je_result), je_result, sep="\n")

tensor([2.8086, 2.7817, 2.7773, 2.7952], device='cuda:0')
tensor([2.8086, 2.7817, 2.7773, 2.7952], device='cuda:0')


# Mutual Information

In [13]:
from IPDL import MatrixBasedRenyisEntropy as MRE

def mutualInformation(Ax: Tensor, Ay: Tensor) -> Tensor:
    '''
        Matrix-based mutual information: H(Ax) + H(Ay) - H(Ax, Ay).

        Each term comes from ``entropy`` / ``jointEntropy``; a single matrix
        and a batch can be mixed because the terms broadcast when summed.

        @param Ax: Tensor, first matrix (or batch of matrices)
        @param Ay: Tensor, second matrix (or batch of matrices)
        @return Tensor with the broadcast shape of the three entropy terms
    '''
    marginal_entropies = entropy(Ax) + entropy(Ay)
    return marginal_entropies - jointEntropy(Ax, Ay)

In [14]:
sigma = 5
# Fresh random inputs for the mutual-information check: `a` is one sample set
# (12 points), `b` a batch of 4 sample sets.
# Tensors go to `device` (chosen in the first cell) instead of .cuda(), so the
# notebook also runs on machines without a GPU.
a = torch.rand(12, 128).to(device)
b = torch.rand(4, 12, 128).to(device)

rbf_result = RBF(a, sigma)
A = rbf_result / rbf_result.size(-1)

rbf_result = RBF(b, sigma)
B = rbf_result / rbf_result.size(-1)

In [15]:
# The three terms of the mutual information and their shapes: A is a single
# matrix and B a batch, so the terms have to broadcast when combined.
entropy_Ax = entropy(A)
entropy_Ay = entropy(B)
joint_entropy = jointEntropy(A, B)

for term in (entropy_Ax, entropy_Ay, joint_entropy):
    print(term.shape)

torch.Size([])
torch.Size([4])
torch.Size([4])


In [16]:
# Mutual information assembled by hand from the terms computed above; the
# 0-dim entropy of A broadcasts against the two batched terms.
entropy_Ax + entropy_Ay - joint_entropy

tensor([1.1677, 1.1848, 1.1587, 1.1712], device='cuda:0')

In [17]:
# Broadcast mutual information vs. the library evaluated once per batch element.
mi_estimation = mutualInformation(A, B)
_mi_estimation = [MRE.mutualInformation(A, B_i) for B_i in B]

print(torch.isclose(torch.stack(_mi_estimation), mi_estimation))

tensor([True, True, True, True], device='cuda:0')


In [18]:
# Library values first, batched values second.
print(torch.stack(_mi_estimation), mi_estimation, sep="\n")

tensor([1.1677, 1.1848, 1.1587, 1.1712], device='cuda:0')
tensor([1.1677, 1.1848, 1.1587, 1.1712], device='cuda:0')
