In [None]:
import pandas as pd
import numpy as np
import scipy
from numpy.linalg import norm
from numpy.random import default_rng
from random import choice

import scipy.stats as st
from sklearn.metrics.pairwise import pairwise_distances
import pickle
import importlib
import os, sys
import ot


%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from matplotlib import rc
from matplotlib.patches import Ellipse
from matplotlib.patches import Patch
from matplotlib.cm import ScalarMappable
from matplotlib.colors import Normalize
from matplotlib.ticker import StrMethodFormatter



from scipy.optimize import linear_sum_assignment

from utils import * 


import re

In [None]:
def cost_matrix(X, Y):
    x = np.shape(X)[0]
    y = np.shape(Y)[0]
    cost  = [[norm(X[i] - Y[j])**2 for i in range(x)] for j in range(y)]
    return(cost)

def gen_random_ball(dimension, num_points, rs = None, radius = 1):
    #from numpy import random, linalg
    # First generate random directions by normalizing the length of a
    # vector of random-normal values (these distribute evenly on ball)
    #rs - random state, None by default
    rng = default_rng(rs)
    random_directions = rng.standard_normal(size=(dimension, num_points))
    random_directions /= norm(random_directions, axis=0)
    # Second generate a random radius with probability proportional to
    # the surface area of a ball with a given radius.
    random_radii = rng.random(num_points) ** (1/dimension)
    # Return the list of random (direction & length) points.
    ball = radius * (random_directions * random_radii).T
    return ball

def split_data_randomly(d):
    #function randomly splits 2n points into two groups of n points each 
    #d - data
    ind = np.arange(d.shape[0])
    np.random.shuffle(ind)
    n = int(len(d)/2)
    ind1 = ind[:n]
    ind2 = ind[n:]

    s1 = d[ind1]
    s2 = d[ind2]
    return s1, s2

#see Algorithm 3 (Section 2.2)
def compute_critical_level(ball, N, alpha):
    #N - num of iterations for resampling
    n = int(len(ball)/2)
    stat = []
    m0     = [1/n] * n
    m1     = [1/n] * n
    
    for i in range(0, N):
        s1, s2 = split_data_randomly(d = ball)
        M = cost_matrix(s1, s2)
        M = np.array(M)
        plan = ot.emd2(m0, m1, M) #ot distance
        stat.append(plan)

    q = (1 - alpha)*100
    cr_level = np.percentile(stat, q)
    return cr_level

#returns OT plan (if mode = plan) and OT distance (if mode = distance)
def computeOT(s, t, mode):
    #returns OT plan (if mode = plan) and OT distance (if mode = distance)
    n = len(s)
    k = len(t)
    m0     = [1/n] * n
    m1     = [1/k] * k
    M = cost_matrix(s, t)
    M = np.array(M)
    if mode == 'plan':
        plan = ot.emd(m0, m1, M) #transport plan
    elif mode == 'distance':
        plan = ot.emd2(m0, m1, M)
    return plan


# #returns OT distance
# def computeOT2(s, t):
#     n = len(s)
#     k = len(t)
#     m0     = [1/n] * n
#     m1     = [1/k] * k
#     M = cost_matrix(s, t)
#     M = np.array(M)
#     plan = ot.emd2(m0, m1, M) #transport plan
#     return plan

#OT Map



#s1 - first sample, s2 -second sample, t - target distribution
def compute_distance(s1, s2, t):
    ##Step 2 in Alogorithm 3: OT distance between ball partitions 
    ## induced by the transport of two data distributions
    L = len(s1)
    s = np.concatenate((s1, s2), axis = 0)
    plan = computeOT(s = t, t = s, mode = 'plan')
    ind = [np.nonzero(plan[i])[0][0] for i in range(2*L)] #indices of assigned distributions
    
    #split target t
    ind_t1 = []
    ind_t2 = []
    for i in range(0, len(s)):
        if i < L:
            ind_t1.append(ind[i])
        else:
            ind_t2.append(ind[i])
   
    t1 = t[ind_t1]
    t2 = t[ind_t2]   
    dst_fin = computeOT(s = t1, t = t2, mode = 'distance')
    
    return dst_fin

#Aux functions to plot Img. 7

def decompose(X, method = 'numpy.eigh'):
    """Eigenvalue decomposition of X"""
    if method == "tf.eig":
        import tensorflow as tf
        A_tf = tf.convert_to_tensor(X)
        eigvals, eigvects = tf.linalg.eig(A_tf)
        eigvals, eigvects = eigvals.numpy(), eigvects.numpy()
    elif method == "tf.eigh":
        import tensorflow as tf
        A_tf = tf.convert_to_tensor(X)
        eigvals, eigvects = tf.linalg.eigh(A_tf)
        eigvals, eigvects = eigvals.numpy(), eigvects.numpy()
    elif method == "numpy.eig":
        eigvals, eigvects = np.linalg.eig(X)
    elif method == "numpy.eigh":
        eigvals, eigvects = np.linalg.eigh(X)
    else:
        raise NotImplementedError('Unknown method: ' + method)
    return eigvals, eigvects

def sqrtmInv(X, method='numpy.eigh'):
    """Inversion of a square root of X"""
    eigval, eigvects = decompose(X, method)
    Y = (eigvects / np.sqrt(np.maximum(eigval, 0))[np.newaxis,:]).dot(eigvects.T)
    return(Y)

def sqrtm(X, method='numpy.eigh'):
    """Square root of a symmetric matrix"""
    eigval, eigvects = decompose(X, method)
    Y = (eigvects * np.sqrt(np.maximum(eigval, 0))[np.newaxis,:]).dot(eigvects.T)
    return(Y)


#2-Wasserstein distance between two Gaussian distributions
def BW(K1, K2, method='numpy.eigh'):
    """Compute the Bures-Wasserstein distance between covariance matrices"""
    Q = sqrtm(K1, method)
    d = np.sqrt(np.maximum(0, K1.trace() + K2.trace() - 2 * sqrtm(Q.dot(K2).dot(Q), method).trace()))
    return d

def OT_map(V, U):
    #map from V to U
    sqU = sqrtm(U,method='numpy.eigh')
    Cn  =  (sqU @ V) @ sqU
    Z = sqrtmInv(Cn, method='numpy.eigh')
#     pinvCn = pinvsq(Cn)
    T = (sqU @ Z) @ sqU
    return T

def OT_geod(V, T, t):
    E = np.array([[1,0],[0, 1]])
    Z = E * (1-t) + t*T
    V = Z @ V 
    W = V @ Z
    return W

# Experiments on synthetic data

In [None]:
#Section 2.2.2

#We consider two 2D point clouds coming from centred Gaussian distributions


#num of points in each sample
#ATTENTION: by now we assume the num of points in each sample are eqaual

m = [0, 0]

#sample X
C1=np.array([[5, 2], [2, 2]])

#sample Y
C2=np.array([[12.0, 4], [4, 15.0]])
m = [0, 0]
 

## Test performance of different sample sizes (Img. 6)

In [None]:
#different sample sizes
grd = [10, 20, 30, 50, 70]

#store distribution of test values
test_values  = []

#num of iterations to estimate the distribution of the test
Z = 100

#store critical level for each sample size
critical_levels = []
for g in grd:
    #target
    b = gen_random_ball(dimension=2, num_points=2*g, rs=None, radius = 1) #(see Algorithm 2, steps 2) 
    cr_l = compute_critical_level(ball = b, N=1500, alpha=.05) #(see Algorithm 3) 
    critical_levels.append(cr_l)
    
    #resampling for a given sample size: store test values 
    dummy = []
    for j in range(0, Z):
        #generate two samples of g points each, X = s1, Y = s2
        s1 = np.random.multivariate_normal(m, C1, g)
        s2 = np.random.multivariate_normal(m, C2, g)
        
        #compute test statistics (see Algorithm 2, steps 1, 3, 4, 5) 
        dst = compute_distance(s1 = s1, s2 =s2, t=b)
        dummy.append(dst)
    test_values.append(dummy)  

In [None]:
# Plot violin plots (see Img. 6)
data = test_values

# Create four separate subplots for violin plots in a row
fig, axs = plt.subplots(1, len(grd), figsize=(20, 6))

# Plot violin plots and add red horizontal line for the 0.95 quantile for each set of data
for i, ax in enumerate(axs):
    parts = ax.violinplot(data[i], showmeans=False, showmedians=True)
    quantile_95 = np.percentile(data[i], 95)  # Calculate 0.95 quantile
    ax.axhline(y=quantile_95, color='blue', linestyle='-', linewidth=2)  # Add red horizontal line at 0.95 quantile
    ax.axhline(y=critical_levels[i],  color='red', linestyle='--', linewidth=2)  # Add red horizontal line at y=3.5


    ax.set_title(f'n = {grd[i]}')
    ax.set_xticks([1])
    ax.set_xticklabels([''])
    ax.set_ylabel(r'Distribution of $d_{W}(U_{C_1}, U_{C_2})$')

    # Customize the appearance of the violin plots
    for pc in parts['bodies']:
        pc.set_facecolor('skyblue')  # Set violin color
        pc.set_edgecolor('black')
        pc.set_alpha(1)  # Set transparency
        pc.set_linewidth(1.5)  # Set edge linewidth

# Adjust layout and display the plots
plt.tight_layout()
plt.show()


## Test performance depending on the size of a change point (Img. 7 and Img.8)

In [None]:
#Change point is simulated as a movement over geodesic in 2-Wasserstein space 
#connecting the data generating distributions N(0, C1) and N(0, C2)


#step at geodesic
grd = [0, .1, .2, .3, .4, .5, .6, .7, .8, .9, 1]

#find optimal map between C1 and C2
mapT = OT_map(C1, C2)

#store change point values
stat_BW = []

#store test statistic values
stat_U  = []

#num of iterations to estimate the distribution of the test
Z = 100

#num of points in each sample
num_points = 80

# #generate points from uniform distributopm on a ball
b = gen_random_ball(dimension=2, num_points=2*num_points, rs=None, radius = 1) #(see Algorithm 2, steps 2) 

#compute critical level
cr_l = compute_critical_level(ball = b, N=1500, alpha=.05) #(see Algorithm 3) 

#move along geodesic
for t in grd:
    if t == 0:
        gC = C1
    elif t == 1:
        gC = C2
    else: 
         gC = OT_geod(C1, mapT, t)
    
    stat_BW.append(BW(C1, gC))#distance between data generating distributions
    dummy = []
    for j in range(0, Z):
        s1 = np.random.multivariate_normal(m, C1, num_points)
        s2 = np.random.multivariate_normal(m, gC, num_points)
        dst = compute_distance(s1 = s1, s2 =s2, t=b)
        dummy.append(dst)
    stat_U.append(dummy)
    

In [None]:
#Plot Img. 7

data_to_plot = np.array(stat_BW)


# Create a color normalization instance
nrm = Normalize(vmin=data_to_plot.min(), vmax=data_to_plot.max())
colors = plt.cm.viridis_r(nrm(data_to_plot))


# Create a figure and axis
fig, ax = plt.subplots(figsize=(12, 8))

# Plot the ellipses with varying colors and sizes
k = -1
for g in grd:
    k+=1
    gC = OT_geod(C1, mapT, g)
    eigenvalues, eigenvectors = np.linalg.eigh(gC)
    theta = np.degrees(np.arctan2(eigenvectors[1, 0], eigenvectors[0, 0]))
    ellipse = Ellipse((0, 0), 2 * np.sqrt(2.5 * eigenvalues[1]), 2 * np.sqrt(2.5 * eigenvalues[0]),
                      angle=theta, color=colors[k], fill = False)
    ax.add_patch(ellipse)

# Set axis limits
ax.set_xlim(-10, 10)
ax.set_ylim(-10, 10)

# Set labels and title
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_title(r'2-Wasserstein geodesic $C_t$ between $C_0$ and $C_1$')

# # Add a colorbar

sm = ScalarMappable(norm=nrm, cmap=plt.cm.viridis_r)
sm.set_array([])  # empty array for the mappable
cbar = plt.colorbar(sm, ax=ax)
cbar.set_label(r'$d_W(\mathcal{N}(0, C_0), \mathcal{N}(0, C_t))$')

# cbar = plt.colorbar(sm)
# cbar.set_label('2-Wasserstein distance')
legend_elements = [
    Patch(facecolor=colors[0], edgecolor='white', label=r'Covariance $C_0$'),
    Patch(facecolor=colors[-1], edgecolor='white', label=r'Covariance $C_1$')
]
# Show the plot
ax.legend(handles=legend_elements)
plt.show()


In [None]:
# Set the figure size for violin plots (width x height in inches)
fig_violin, ax_violin = plt.subplots(figsize=(10, 6))

num_ticks = len(grd)
# Create random data for the violin plots (number of data points = number of ellipses)
data_to_plot = stat_U
quantiles = [np.percentile(dataset, 95) for dataset in data_to_plot]
# Create violin plots with the same colors as the ellipses
violin_parts = ax_violin.violinplot(data_to_plot, showmeans=False, showmedians=False)
for i, pc in enumerate(violin_parts['bodies']):
    pc.set_facecolor(colors[i])
    pc.set_edgecolor('black')
    ax_violin.plot([i + 1], [quantiles[i]], marker='o', markersize=8, color='red', zorder=3)


# Set axis limits for violin plots
ax_violin.set_xticks(np.arange(1, num_ticks + 1))
ax_violin.set_xticklabels([fr'$t = {(i-1)/10}$' for i in range(1, num_ticks + 1)])
ax_violin.set_xlabel(r'$2$-Wasserstein geodesic in original space')
ax_violin.set_ylabel('Distributions')
ax_violin.set_title(r'Distribution of $d_{W}(U_{C_1}, U_{C_t})$, sample sizes $n=80$')

ax_violin.axhline(y=cr_l, color='red', linestyle='--', linewidth=1, label='Critical level')


# Show the plots
plt.tight_layout()
plt.show()

