In [1009]:
import custom_utils
import cv2
import matplotlib
import matplotlib.colors as col
import matplotlib.pyplot as plt
import numpy
import numpy as np
import pandas as pd
import torch
import torch.backends.cudnn as cudnn
import torch.nn as nn
import torch.nn.functional as F
from custom_utils import plot_graph
from matplotlib import pyplot as plt
from sklearn.manifold import TSNE
from sklearn.metrics import ConfusionMatrixDisplay
from torch.optim import SGD
from torch.optim.lr_scheduler import LambdaLR
from torch.utils.data import DataLoader, Dataset
from torchsummary import summary

In [1010]:
from warnings import warn
from numpy import mean, transpose, cov, cos, sin, shape, exp, newaxis, concatenate
from numpy.linalg import linalg, LinAlgError, solve
from scipy.stats import chi2

In [1011]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### Create 2 tensor

In [1012]:
import torch

# Create two random tensors of size (16, 256)
A = torch.randn(16, 256)
B = torch.randn(16, 256)

# Print the resulting tensors
print(A.size())
print(B.size())


torch.Size([16, 256])
torch.Size([16, 256])


In [1013]:
# points = numpy.random.randn(2, 256)
# points = torch.randn(2, 256)

### Original code

In [1014]:
def mahalanobis_distance(difference, num_random_features):
    num_samples, _ = shape(difference)
    sigma = cov(transpose(difference))

    try:
        linalg.inv(sigma)
    except LinAlgError:
        warn('covariance matrix is singular. Pvalue returned is 1.1')
        raise

    mu = mean(difference, 0)

    if num_random_features == 1:
        stat = float(num_samples * mu ** 2) / float(sigma)
    else:
        print('Original')
        solve_function = solve(sigma, transpose(mu))
        print(f'solve_function:\n{solve_function}')
        right_side = mu.dot(solve_function)
        print(f'right_side:\n{right_side} **')
        stat = num_samples * right_side

    return chi2.sf(stat, num_random_features)

In [1015]:
class MeanEmbeddingTest:

    def __init__(self, data_x, data_y, scale=1, number_of_random_frequencies=5):
        self.data_x = scale*data_x
        self.data_y = scale*data_y
        self.number_of_frequencies = number_of_random_frequencies
        self.scale = scale

    def get_estimate(self, data, point):
        z = data - self.scale * point
        z2 = numpy.linalg.norm(z, axis=1)**2
        return numpy.exp(-z2/2.0)


    def get_difference(self, point):
        return self.get_estimate(self.data_x, point) - self.get_estimate(self.data_y, point)


    def vector_of_differences(self, dim):
        points = numpy.random.randn(self.number_of_frequencies, dim)
        a = [self.get_difference(point) for point in points]
        return numpy.array(a).T

    def compute_pvalue(self):

        _, dimension = numpy.shape(self.data_x)
        obs = self.vector_of_differences(dimension)

        return mahalanobis_distance(obs, self.number_of_frequencies)

In [1016]:
mkme_loss = MeanEmbeddingTest(A, B, scale=1, number_of_random_frequencies=2)
transfer_loss = mkme_loss.compute_pvalue()
print(f'transfer_loss: {transfer_loss}')

Original
solve_function:
[-1.64265500e+98 -1.18128473e+98]
right_side:
0.16125854497196118 **
transfer_loss: 0.27525196628393006


In [1017]:
def smooth(data):
    w = linalg.norm(data, axis=1)
    print('after norm')
    print(w)
    beta = -w ** 2 / 2.0
    print('beta')
    print(beta)
    w = exp(beta)
    print('after exp')
    print(w)
    return w[:, newaxis]


def smooth_cf(data, w, random_frequencies):
    n, _ = data.shape
    _, d = random_frequencies.shape
    mat = data.dot(random_frequencies)
    arr = concatenate((sin(mat) * w, cos(mat) * w), 1)
    n1, d1 = arr.shape
    assert n1 == n and d1 == 2 * d and w.shape == (n, 1)
    return arr


def smooth_difference(random_frequencies, X, Y):
    x_smooth = smooth(X)
    y_smooth = smooth(Y)
    characteristic_function_x = smooth_cf(X, x_smooth, random_frequencies)
    characteristic_function_y = smooth_cf(Y, y_smooth, random_frequencies)
    return characteristic_function_x - characteristic_function_y

class SmoothCFTest:

    def _gen_random(self, dimension):
        return numpy.random.randn(dimension, self.num_random_features)


    def __init__(self, data_x, data_y, scale, num_random_features, frequency_generator=None):
        self.data_x = scale*data_x.detach().cpu().numpy()
        self.data_y = scale*data_y.detach().cpu().numpy()
        self.num_random_features = num_random_features

        _, dimension_x = numpy.shape(self.data_x)
        _, dimension_y = numpy.shape(self.data_y)
        assert dimension_x == dimension_y
        self.random_frequencies = self._gen_random(dimension_x)


    def compute_pvalue(self):

        difference = smooth_difference(self.random_frequencies, self.data_x, self.data_y)
        return mahalanobis_distance(difference, 2 * self.num_random_features)

In [1018]:
mkme_loss = SmoothCFTest(A, B, scale=1, num_random_features=2)
transfer_loss = mkme_loss.compute_pvalue()
print(f'transfer_loss: {transfer_loss}')

after norm
[17.299675  15.755465  15.693022  15.9108715 16.26306   17.860605
 14.877518  15.766319  16.149252  16.292368  16.076368  15.546218
 15.941503  16.281757  16.616148  16.971794 ]
beta
[-149.63937  -124.11733  -123.13547  -126.57792  -132.24356  -159.50061
 -110.670265 -124.288414 -130.39917  -132.72063  -129.22481  -120.842445
 -127.06575  -132.5478   -138.04819  -144.0209  ]
after exp
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
after norm
[16.238874 16.428133 16.914322 15.943955 15.773848 16.411478 16.847824
 15.950004 15.305755 15.590869 14.610736 15.231061 15.188093 14.33715
 15.91739  15.268224]
beta
[-131.85052  -134.94177  -143.04715  -127.10486  -124.407135 -134.6683
 -141.92459  -127.20131  -117.133064 -121.5376   -106.7368   -115.99261
 -115.33909  -102.77693  -126.68165  -116.559326]
after exp
[0.e+00 0.e+00 0.e+00 0.e+00 0.e+00 0.e+00 0.e+00 0.e+00 0.e+00 0.e+00
 0.e+00 0.e+00 0.e+00 3.e-45 0.e+00 0.e+00]
Original
solve_function:
[-2.36974918e+44 -1.78405962e

### custom_eu

In [1019]:
def mahalanobis_distance(difference, num_random_features):
    num_samples, _ = difference.shape
    sigma = torch.cov(difference.T)
    mu = torch.mean(difference, 0)
    if num_random_features == 1:
        stat = float(num_samples * torch.pow(mu, 2)) / float(sigma)
    else:
        right_side = torch.matmul(mu, mu.T)
        print(f'right_side:\n{right_side}')
        stat = num_samples * right_side
        print(f'stat:\n{stat}')
    return chi2.sf(stat.detach().cpu(), num_random_features)


class MeanEmbeddingTest:

    def __init__(self, data_x, data_y, scale, number_of_random_frequencies, device):
        self.device = device
        self.data_x = scale * data_x.to(device)
        self.data_y = scale * data_y.to(device)
        self.number_of_frequencies = number_of_random_frequencies
        self.scale = scale

    def get_estimate(self, data, point):
        z = data - self.scale * point
        z2 = torch.norm(z, p=2, dim=1)**2
        return torch.exp(-z2/2.0)

    def get_difference(self, point):
        return self.get_estimate(self.data_x, point) - self.get_estimate(self.data_y, point)

    def vector_of_differences(self, dim):
        points = torch.tensor(numpy.random.randn(
            self.number_of_frequencies, dim)).to(self.device)
        a = [self.get_difference(point) for point in points]
        return torch.stack(a).T

    def compute_pvalue(self):

        _, dimension = self.data_x.size()
        obs = self.vector_of_differences(dimension)
        return mahalanobis_distance(obs, self.number_of_frequencies)


In [1020]:
mkme_loss = MeanEmbeddingTest(A, B ,scale=1, number_of_random_frequencies=2, device=device)
transfer_loss = mkme_loss.compute_pvalue()
print(f'transfer_loss: {transfer_loss}')

right_side:
1.842170648771096e-179
stat:
2.947473038033754e-178
transfer_loss: 1.0


## SCF

In [1021]:
def smooth(data):
    print('data')
    print(data)
    w = torch.linalg.norm(data, dim=1)
    print('after norm')
    print(w)
    print(f'-torch.pow(w,2): {-torch.pow(w,2)}')
    w = torch.exp(-w ** 2 / 2.0)
    print('after exp')
    print(w)
    return w[:, newaxis]


def smooth_cf(data, w, random_frequencies):
    n, _ = data.shape
    _, d = random_frequencies.shape
    mat = torch.matmul(data,random_frequencies)
    arr = torch.cat((torch.sin(mat) * w, torch.cos(mat) * w), dim = 1)
    n1, d1 = arr.shape
    assert n1 == n and d1 == 2 * d and w.shape == (n, 1)
    return arr


def smooth_difference(random_frequencies, X, Y):
    x_smooth = smooth(X)
    y_smooth = smooth(Y)
    characteristic_function_x = smooth_cf(X, x_smooth, random_frequencies)
    characteristic_function_y = smooth_cf(Y, y_smooth, random_frequencies)
    return characteristic_function_x - characteristic_function_y

class SmoothCFTest:

    def _gen_random(self, dimension):
        return torch.tensor(numpy.random.randn(dimension, self.num_random_features).astype(np.float32)).to(self.device)


    def __init__(self, data_x, data_y, scale, num_random_features, device, frequency_generator=None):
        self.device = device
        self.data_x = scale*data_x.to(self.device)
        self.data_y = scale*data_y.to(self.device)
        self.num_random_features = num_random_features

        _, dimension_x = numpy.shape(self.data_x)
        _, dimension_y = numpy.shape(self.data_y)
        assert dimension_x == dimension_y
        self.random_frequencies = self._gen_random(dimension_x)


    def compute_pvalue(self):
        difference = smooth_difference(self.random_frequencies, self.data_x, self.data_y)
        return mahalanobis_distance(difference, 2 * self.num_random_features)

In [1022]:
mkme_loss = SmoothCFTest(
    A, B, scale=1, num_random_features=2, device=device)
transfer_loss = mkme_loss.compute_pvalue()
print(f'transfer_loss: {transfer_loss}')

data
tensor([[ 0.8408,  0.6097,  2.6534,  ...,  0.6043, -1.8668, -0.1236],
        [-2.3901, -0.0141, -0.2644,  ..., -0.7488,  1.0507, -0.8017],
        [-0.7533, -0.9772, -0.6795,  ...,  0.6511, -1.3017, -2.2515],
        ...,
        [ 2.0089,  0.5029, -1.0588,  ..., -0.7731,  0.9187, -0.5550],
        [-0.4826, -0.2987,  0.1411,  ...,  0.7256,  1.5195,  2.2034],
        [ 0.9428, -1.2763, -0.9038,  ...,  1.2772, -2.3631,  0.6699]],
       device='cuda:0')
after norm
tensor([17.2997, 15.7555, 15.6930, 15.9109, 16.2631, 17.8606, 14.8775, 15.7663,
        16.1493, 16.2924, 16.0764, 15.5462, 15.9415, 16.2818, 16.6161, 16.9718],
       device='cuda:0')
-torch.pow(w,2): tensor([-299.2788, -248.2347, -246.2709, -253.1558, -264.4871, -319.0012,
        -221.3405, -248.5769, -260.7983, -265.4413, -258.4496, -241.6849,
        -254.1315, -265.0956, -276.0964, -288.0418], device='cuda:0')
after exp
tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       device='cuda:0')

### pinverse

In [1023]:
def mahalanobis_distance(difference, num_random_features):
    num_samples, _ = difference.shape
    sigma = torch.cov(difference.T)
    mu = torch.mean(difference, 0)
    if num_random_features == 1:
        stat = float(num_samples * torch.pow(mu,2)) / float(sigma)
    else:
        sigma = torch.pinverse(sigma)
        mathmul_function = torch.matmul(sigma, mu.T)
        print(f'mathmul_function:\n{mathmul_function}')
        right_side =  torch.matmul(mu, mathmul_function)
        print(f'right_side:\n{right_side}')
        stat = num_samples * right_side
    return chi2.sf(stat.detach().cpu(), num_random_features)

class MeanEmbeddingTest:

    def __init__(self, data_x, data_y, scale, number_of_random_frequencies, device):
        self.device = device
        self.data_x = scale * data_x.to(device)
        self.data_y = scale * data_y.to(device)
        self.number_of_frequencies = number_of_random_frequencies
        self.scale = scale

    def get_estimate(self, data, point):
        z = data - self.scale * point
        z2 = torch.norm(z, p=2, dim=1)**2
        return torch.exp(-z2/2.0)

    def get_difference(self, point):
        return self.get_estimate(self.data_x, point) - self.get_estimate(self.data_y, point)

    def vector_of_differences(self, dim):
        points = torch.tensor(numpy.random.randn(self.number_of_frequencies, dim)).to(device)
        a = [self.get_difference(point) for point in points]
        return torch.stack(a).T

    def compute_pvalue(self):

        _, dimension = self.data_x.size()
        obs = self.vector_of_differences(dimension)
        return mahalanobis_distance(obs, self.number_of_frequencies)

In [1024]:
mkme_loss = MeanEmbeddingTest(
    A, B, scale=1, number_of_random_frequencies=2, device=device)
transfer_loss = mkme_loss.compute_pvalue()
print(f'transfer_loss: {transfer_loss}')

mathmul_function:
tensor([-3.2029e+95, -4.4704e+97], device='cuda:0', dtype=torch.float64)
right_side:
0.13254002980381024
transfer_loss: 0.3463448796282109


In [1025]:
def smooth(data):
    print('data')
    print(data)
    w = torch.linalg.norm(data, dim=1)
    print('after norm')
    print(w)
    print(f'-torch.pow(w,2): {-torch.pow(w,2)}')
    w = torch.exp(-w ** 2 / 2.0)
    print('after exp')
    print(w)
    return w[:, newaxis]


def smooth_cf(data, w, random_frequencies):
    n, _ = data.shape
    _, d = random_frequencies.shape
    mat = torch.matmul(data,random_frequencies)
    arr = torch.cat((torch.sin(mat) * w, torch.cos(mat) * w), dim = 1)
    n1, d1 = arr.shape
    assert n1 == n and d1 == 2 * d and w.shape == (n, 1)
    return arr


def smooth_difference(random_frequencies, X, Y):
    x_smooth = smooth(X)
    y_smooth = smooth(Y)
    characteristic_function_x = smooth_cf(X, x_smooth, random_frequencies)
    characteristic_function_y = smooth_cf(Y, y_smooth, random_frequencies)
    return characteristic_function_x - characteristic_function_y

class SmoothCFTest:

    def _gen_random(self, dimension):
        return torch.tensor(numpy.random.randn(dimension, self.num_random_features).astype(np.float32)).to(self.device)


    def __init__(self, data_x, data_y, scale, num_random_features, device, frequency_generator=None):
        self.device = device
        self.data_x = scale*data_x.to(self.device)
        self.data_y = scale*data_y.to(self.device)
        self.num_random_features = num_random_features

        _, dimension_x = numpy.shape(self.data_x)
        _, dimension_y = numpy.shape(self.data_y)
        assert dimension_x == dimension_y
        self.random_frequencies = self._gen_random(dimension_x)


    def compute_pvalue(self):
        difference = smooth_difference(self.random_frequencies, self.data_x, self.data_y)
        return mahalanobis_distance(difference, 2 * self.num_random_features)

In [1026]:
mkme_loss = SmoothCFTest(
    A, B, scale=1, num_random_features=2, device=device)
transfer_loss = mkme_loss.compute_pvalue()
print(f'transfer_loss: {transfer_loss}')

data
tensor([[ 0.8408,  0.6097,  2.6534,  ...,  0.6043, -1.8668, -0.1236],
        [-2.3901, -0.0141, -0.2644,  ..., -0.7488,  1.0507, -0.8017],
        [-0.7533, -0.9772, -0.6795,  ...,  0.6511, -1.3017, -2.2515],
        ...,
        [ 2.0089,  0.5029, -1.0588,  ..., -0.7731,  0.9187, -0.5550],
        [-0.4826, -0.2987,  0.1411,  ...,  0.7256,  1.5195,  2.2034],
        [ 0.9428, -1.2763, -0.9038,  ...,  1.2772, -2.3631,  0.6699]],
       device='cuda:0')
after norm
tensor([17.2997, 15.7555, 15.6930, 15.9109, 16.2631, 17.8606, 14.8775, 15.7663,
        16.1493, 16.2924, 16.0764, 15.5462, 15.9415, 16.2818, 16.6161, 16.9718],
       device='cuda:0')
-torch.pow(w,2): tensor([-299.2788, -248.2347, -246.2709, -253.1558, -264.4871, -319.0012,
        -221.3405, -248.5769, -260.7983, -265.4413, -258.4496, -241.6849,
        -254.1315, -265.0956, -276.0964, -288.0418], device='cuda:0')
after exp
tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       device='cuda:0')