In [2]:
import torch
from torch import Tensor, nn

# `os` and `sys` are imported here because no visible cell imports them;
# without this the cell only works when an earlier kernel state leaked them.
import os
import sys

# Parent of the current working directory (presumably the project root,
# with the notebook living one level below it -- confirm for your layout).
project_dir = os.path.split(os.getcwd())[0]
if project_dir not in sys.path:
    sys.path.append(project_dir)

# The `ipdl/` folder under that directory is also made importable, so the
# modules it contains can be imported by their bare names.
ipdl_dir = os.path.join(project_dir, "ipdl/")
if ipdl_dir not in sys.path:
    sys.path.append(ipdl_dir)

from IPAE import MatrixEstimator, SDAE

In [30]:
import math 
import torch
from functools import reduce
from torch import Tensor
from torch import nn

from IPDL import TensorKernel

class MatrixEstimator(nn.Module):
    '''
        Pass-through layer that remembers the last batch it saw in eval mode.

        The forward pass returns its input unchanged. While the module is in
        evaluation mode, a detached CPU copy of the input is kept in `self.x`
        so that `get_matrix` can later build a matrix from it.

        @param sigma: width handed to TensorKernel.RBF in `get_matrix`
    '''

    def __init__(self, sigma: float):
        super().__init__()

        # Random (10, 1) placeholder; replaced by the first eval-mode forward.
        self.x = torch.rand((10, 1))
        self.sigma = sigma

    def set_sigma(self, sigma: float) -> None:
        '''
            Replace the kernel width used by `get_matrix`.
        '''
        self.sigma = sigma

    def forward(self, x: Tensor) -> Tensor:
        '''
            Return `x` untouched; in eval mode also store a detached CPU copy.
        '''
        if self.training:
            # Nothing is recorded during training.
            return x

        self.x = x.detach().cpu()
        return x

    def get_matrix(self) -> Tensor:
        '''
            Return matrix A: TensorKernel.RBF of the stored batch (flattened
            from dim 1 onwards) divided by the number of stored rows.

            The kernel is evaluated on CUDA when available and the result is
            always moved back to the CPU.
        '''
        if torch.cuda.is_available():
            target = 'cuda'
        else:
            target = 'cpu'

        samples = self.x.flatten(1).to(target)
        n_rows = self.x.size(0)
        kernel = TensorKernel.RBF(samples, self.sigma)
        return (kernel / n_rows).cpu()

    def __repr__(self) -> str:
        template = "MatrixEstimator(sigma={})"
        return template.format(self.sigma)



# Keep a handle on the first estimator so its stored batch can be inspected
# after a forward pass.
matrix_estimator = MatrixEstimator(0.8)

# nn.Identity stands in for a disabled nn.Linear(10, 10) input layer; one
# estimator sits before the linear projection and a second one after it.
model = nn.Sequential(nn.Identity(), matrix_estimator, nn.Linear(10, 2), MatrixEstimator(0.8))

In [31]:
# Show the layer stack to confirm both MatrixEstimator instances are registered.
print(model)

Sequential(
  (0): Identity()
  (1): MatrixEstimator(sigma=0.8)
  (2): Linear(in_features=10, out_features=2, bias=True)
  (3): MatrixEstimator(sigma=0.8)
)


In [32]:
# One sample with 10 features, matching the nn.Linear(10, 2) layer in `model`.
x = torch.rand(1, 10)
print(x)
# In training mode MatrixEstimator.forward does not store its input ...
model.train()
model(x)

# ... so this still prints the random (10, 1) placeholder created in __init__.
print(matrix_estimator.x)
# In eval mode the forward pass stores a detached CPU copy of its input.
model.eval()
model(x)
# Now equal to `x` above, because the only layer before the estimator is nn.Identity.
print(matrix_estimator.x)

tensor([[0.2078, 0.5078, 0.4116, 0.0355, 0.8846, 0.0701, 0.7639, 0.6777, 0.0431,
         0.7622]])
tensor([[0.8907],
        [0.4822],
        [0.2004],
        [0.2244],
        [0.8916],
        [0.9460],
        [0.8299],
        [0.5061],
        [0.6178],
        [0.1399]])
tensor([[0.2078, 0.5078, 0.4116, 0.0355, 0.8846, 0.0701, 0.7639, 0.6777, 0.0431,
         0.7622]])


In [None]:
class MatrixOptimizer():
    '''
        Chooses the RBF kernel width (sigma) for a layer output by maximising
        the kernel alignment with a label kernel matrix, and smooths the
        chosen value across successive steps.

        @param model: module whose MatrixEstimator sub-modules are collected
        @param beta: weight of the newest sigma in the running average,
            must lie in [0, 1]
    '''

    def __init__(self, model: nn.Module,  beta=0.5):
        if not (0 <= beta <= 1):
            # ValueError is still an Exception, so existing handlers keep working.
            raise ValueError('beta must be in the range [0, 1]')

        # First element corresponds to the input A matrix and the last element
        # to the output A matrix (order follows model.modules()).
        self.matrix_estimators = [
            module for module in model.modules()
            if isinstance(module, MatrixEstimator)
        ]

        self.beta = beta

        # Smoothed sigma; None until the first call to step().
        self.sigma = None
        # Every sigma selected by optimize(), in call order.
        self.sigma_tmp = []

        # One slot per collected estimator, -1 meaning "no previous sigma".
        self.sigma_prev = [-1] * len(self.matrix_estimators)

    def getSigmaValues(self) -> list:
        '''
            Return the list of sigmas selected by optimize() so far.
        '''
        return self.sigma_tmp

    def getSigma(self):
        '''
            Return the current smoothed sigma (None before the first step).
        '''
        return self.sigma

    def step(self, layer_output: Tensor, Ky: Tensor, sigma_values: list) -> float:
        '''
            Select the best sigma for `layer_output` and fold it into the
            running value: sigma = beta * sigma_t + (1 - beta) * sigma.
            On the first call the selected sigma is used as is.

            @param layer_output: the output of a specific layer
            @param Ky: label kernel matrix
            @param sigma_values: candidate sigma values
            @return the updated smoothed sigma
        '''
        sigma_t = self.optimize(layer_output, Ky, sigma_values)
        if self.sigma is None:
            self.sigma = sigma_t
        else:
            self.sigma = (self.beta * sigma_t) + ((1 - self.beta) * self.sigma)
        return self.getSigma()

    def optimize(self, x: Tensor, Ky: Tensor, sigma_values: list) -> float:
        '''
            Obtain the kernel width for a DNN layer output.

            For every candidate sigma, TensorKernel.RBF(x, sigma) is compared
            with `Ky` through kernelAligmentLoss; the candidate with the
            highest alignment wins (the first one on ties). The winner is
            also appended to the history returned by getSigmaValues().

            @param x: layer output
            @param Ky: label kernel matrix
            @param sigma_values: candidate sigma values (must not be empty)
            @return the selected sigma
            @raise ValueError: if `sigma_values` is empty
        '''
        if len(sigma_values) == 0:
            raise ValueError('sigma_values must contain at least one candidate')

        # One kernel at a time, so only a single candidate matrix is alive.
        alignments = [
            self.kernelAligmentLoss(TensorKernel.RBF(x, sigma).detach(), Ky)
            for sigma in sigma_values
        ]

        # max() returns the first maximal index, i.e. the earliest candidate on ties.
        best_index = max(range(len(alignments)), key=alignments.__getitem__)

        self.sigma_tmp.append(sigma_values[best_index])
        return self.sigma_tmp[-1]

    def kernelAligmentLoss(self, x: Tensor, y: Tensor) -> float:
        '''
            Kernel alignment between two matrices:
            sum(x * y) / (norm(x) * norm(y)).

            Used to pick the sigma of the RBF kernel. The result is not a
            number (nan) when either matrix has zero norm.
        '''
        return (torch.sum(x*y)/(torch.norm(x) * torch.norm(y))).item()

In [33]:
# Scratch check: a list built by repetition can be updated element by element.
A = 10 * [0]
A[0] = 1
A

[1, 0, 0, 0, 0, 0, 0, 0, 0, 0]

In [None]:
# def dist_mat(self, x, y=None):
#         try:
#             x = th.from_numpy(x)
#         except TypeError:
#             x = x

#         dist = th.norm(x[:, None] - x, dim=2, p=2)
#         return dist / dist.max()

In [65]:
# Five 2-D points: rows [0, 1], [2, 3], ..., [8, 9].
a = torch.arange(10, dtype=torch.float32).reshape((5,2))
# NOTE(review): `b` is not used anywhere else in this cell.
b = torch.arange(5, 15)

# Variant 1: broadcast (5, 1, 2) against (5, 2) to get all pairwise
# differences, then take the 2-norm over the feature axis.
dist = torch.norm(a[:, None] - a, dim=2, p=2)
print(dist)

# Variant 2: the same pairwise differences written with explicit unsqueeze
# calls, followed by a manual square / sum / sqrt.
pairwise_difference = (torch.unsqueeze(a,1) - torch.unsqueeze(a,0))
distance = torch.sqrt(torch.sum(pairwise_difference**2, dim=2))
print(distance)
# Variant 3: the built-in pairwise distance; `distance` is overwritten here.
distance = torch.cdist(a,a, p=2)
print(distance)

tensor([[ 0.0000,  2.8284,  5.6569,  8.4853, 11.3137],
        [ 2.8284,  0.0000,  2.8284,  5.6569,  8.4853],
        [ 5.6569,  2.8284,  0.0000,  2.8284,  5.6569],
        [ 8.4853,  5.6569,  2.8284,  0.0000,  2.8284],
        [11.3137,  8.4853,  5.6569,  2.8284,  0.0000]])
tensor([[ 0.0000,  2.8284,  5.6569,  8.4853, 11.3137],
        [ 2.8284,  0.0000,  2.8284,  5.6569,  8.4853],
        [ 5.6569,  2.8284,  0.0000,  2.8284,  5.6569],
        [ 8.4853,  5.6569,  2.8284,  0.0000,  2.8284],
        [11.3137,  8.4853,  5.6569,  2.8284,  0.0000]])
tensor([[ 0.0000,  2.8284,  5.6569,  8.4853, 11.3137],
        [ 2.8284,  0.0000,  2.8284,  5.6569,  8.4853],
        [ 5.6569,  2.8284,  0.0000,  2.8284,  5.6569],
        [ 8.4853,  5.6569,  2.8284,  0.0000,  2.8284],
        [11.3137,  8.4853,  5.6569,  2.8284,  0.0000]])


In [69]:
from numpy import linalg as LA

# Convert the tensor once and hand the NumPy array (not the torch tensor) to
# NumPy's norm. ord='fro' is only defined for 2-D input and raises
# ValueError for vectors, so `a` must be a matrix here
# (presumably the (5, 2) tensor from the distance cell -- confirm).
np_test = a.numpy()
LA.norm(np_test, ord='fro')

ValueError: Invalid norm order 'fro' for vectors