# Comparison between $\texttt{LiePCA}$ and $\texttt{LieDetect}$ for density estimation 

Henrique Hennes https://github.com/HLovisiEnnes \
Raphaël Tinarrage https://raphaeltinarrage.github.io/

See the repo at https://github.com/HLovisiEnnes/LieDetect and the article at https://arxiv.org/abs/2309.03086

- $\mathrm{SO}(2)$
- $T^2$
- $T^3$
- $\mathrm{SU}(2)$

***New functions in this notebook (in addition to the import)***:
- ResamplingDistance
- GetLiePCA
- SilvermanFactor
- SampleLieAlgebra
- EstimateDensitiesOnOrbit
- ComputeHausdorffOnOrbit
- PrintResults

In [2]:
# Import LieDetect functions

from functions_20240715 import*

In [None]:
import IPython.display    # for IPython.display.clear_output()

def ResamplingDistance(X):
    '''
    Maximum distance for resampling. We choose it to be the maximum distance between two closest neighbors 
    of the dataset. Faster version with KDtree.
    '''
    
    tree = scipy.spatial.KDTree(X)       # build the KDTREE
    distances, _ = tree.query(X, k=2)    # k=2 includes the point itself and its nearest neighbor
    nearest_distances = distances[:, 1]  # nearest neighbor distances
    return max(nearest_distances)

def GetLiePCA(X, n_neighbors = 10, ambient_dim = 4, dim = 1, correction = True, verbose = False):
    '''
    Part of the code from FindClosestLieAlgebra in functions
    '''
    
    Sigma = GetLiePCAOperator(X,n_neighbors,dim,method='localPCA', correction = correction, verbose=verbose)
    
    # Get and sort eigenvectors of Sigma
    vals, vecs = np.linalg.eig(Sigma) # finds eigenvalues and vectors of sigma as a matrix
    vecs = [vecs[:,i] for i in range(len(vecs))]
    indices = np.argsort(vals)        # argsort eigenvalues
    vals, vecs = [np.real(vals[i]) for i in indices], [np.real(vecs[i]) for i in indices]

    # Select the bottom eigenvectors of Sigma
    eigenmatrices = [vecs[i].reshape((ambient_dim,ambient_dim)) for i in range(dim)]
#     eigenmatrices = [(A-A.T)/2 for A in eigenmatrices] # not implemented in the original LiePCA article
    return Sigma, eigenmatrices

def SilvermanFactor(n, dim):
    '''
    Implement Silverman's factor for normal distribution
    https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.gaussian_kde.silverman_factor.html#scipy.stats.gaussian_kde.silverman_factor
    '''
    
    return (n*(dim + 2) / 4.)**(-1. / (dim + 4))


def SampleLieAlgebra(n_points,LieAlgebra):
    '''
    Sample Lie algebra element using normal distribution with Silverman factor
    n_points: number of points in dataset, to compute SilvermanFactor
    '''
    
    # Sample the coeficients according to normal distribution using Silverman's rule of thumb
    dim = len(LieAlgebra)
    coefs = scipy.stats.multivariate_normal.rvs(mean = [0]*dim, 
                                    cov = SilvermanFactor(n_points, dim)*np.identity(dim)) 
    if dim == 1: coefs = [coefs] # make a list out of coefs in case dim = 1
        
    return sum([coefs[i]*LieAlgebra[i] for i in range(len(LieAlgebra))]) # the random Lie algebra element

def EstimateDensitiesOnOrbit(group, dim, ambient_dim, n_points, N, frequencies, n_neighbors, frequency_max, 
                             noise, correction = True, verbose=True):
    '''
    Sample using Lie PCA only in accordance to [18], Lie Detect using normal distribution for the 
    picking an element of the Lie algebra and Lie Detect with a single point source
    '''
    
    # Sample train data and test data, with the total being the sum of both
    # The test data is to compute Hausdorff distance, whereas the other is used as true data
    if verbose: start_time = ChronometerStart('Sample on orbit... ')
    if group=='torus':
        X_train, LieAlgebra, Frequencies = SampleOnTorus(dim=dim, ambient_dim=ambient_dim, n_points=n_points, 
                                                         frequencies=frequencies, conjugate=True, 
                                                         right_multiply=True, verbose=False)
        X_test = GenerateOrbitTorus(LieAlgebra, Frequencies, n_points=int(np.ceil(N**(1/dim))),
                                    x=random.choice(X_train),method='uniform')

    if group=='SU(2)':
        X_train, LieAlgebra, Frequencies = SampleOnSU2(ambient_dim=ambient_dim, n_points=n_points, 
                                                       frequencies=frequencies, conjugate=False, 
                                                       right_multiply=False, verbose=False)
        X_test = GenerateOrbitSU2(LieAlgebra, Frequencies, n_points=int(np.ceil(N**(1/dim))),
                                    x=random.choice(X_train))
    
    #add noise if necessary
    if noise > 0:
        X_train = X_train+np.random.multivariate_normal(np.zeros(ambient_dim), noise*np.eye(ambient_dim), n_points)
    
    if verbose: 
        ChronometerStop(start_time, 's')
        print('Shape of train and test:', np.shape(X_train), np.shape(X_test))
        
        d1 = scipy.spatial.distance.directed_hausdorff(X_train, X_test)[0]
        d2 = scipy.spatial.distance.directed_hausdorff(X_test, X_train)[0]
        print('Hausdorff distance X_train to X_test:', d1, '- X_test to X_train:', d2)   
        print('Resampling distance X_train =', ResamplingDistance(X_train))
        print('Resampling distance X_test =', ResamplingDistance(X_test))
        
    # Lie PCA
    dim_orbit = dim    
    Sigma, LieAlgebraPCA = GetLiePCA(X_train, n_neighbors=n_neighbors, ambient_dim=ambient_dim, dim=dim_orbit, 
                                     correction = correction)

    # Lie Detect
    if verbose: start_time = ChronometerStart('FindClosestLieAlgebra... ')
    if group=='torus':
        OptimalFrequencies, OptimalLieAlgebra = FindClosestLieAlgebra('torus',Sigma,dim,ambient_dim,
                                                                      frequency_max=frequency_max,
                                                                      method='NormalForm',verbosity=0)
    if group=='SU(2)':
        # Only test ambient space-spanning representations
        FrequenciesToTest = [tuple(f) for f in partition_su2(ambient_dim)]
        FrequenciesToTestMaximal = [f for f in FrequenciesToTest if len(np.unique(f))==len(f) and 1 not in f]
        if verbose: print('Partitions generating an orbit that spans the whole ambient space:', 
                          len(FrequenciesToTestMaximal), 'out of', len(FrequenciesToTest), 
                          '-', FrequenciesToTestMaximal)
        OptimalFrequencies, OptimalLieAlgebra = FindClosestLieAlgebra('SU(2)',Sigma,dim,ambient_dim,
                                                                      FrequenciesToTest=FrequenciesToTestMaximal,
                                                                      method='Stiefel',verbosity=verbose*2)
    if verbose: ChronometerStop(start_time, 's')
    
    # Check if frequencies are equivalent
    if verbose: AreFrequencesEquivalent(frequencies,OptimalFrequencies,verbose=True)
    
    # Compute resampling distance for Lie PCA's density estimation
    resampling_distance = ResamplingDistance(X_train)
    
    # Sample accordingly
    new_points_pca = []
    new_points_normal = []
    if verbose: start_time = ChronometerStart('Sample points. Now looping... \n')
    for index_N in range(N):        
        if verbose: print('Loop '+str(index_N), end= '\r')

        # Sample Lie PCA
        old_point = random.choice(X_train)                           #randomly choose a point in the dataset
        distance = np.inf
        while distance > resampling_distance:                        #only add the new point if it is close to the orbit
            A =  SampleLieAlgebra(len(X_train),LieAlgebraPCA)        #randomly choose an element of the Lie algebra using Silverman's rule of thumb
            new_point_pca =  scipy.linalg.expm(A) @ old_point 
            distance = np.min(np.linalg.norm(new_point_pca-X_train,axis=1))
        new_points_pca.append(new_point_pca.reshape(1,-1))
        
        # Sample Lie Detect
        old_point = random.choice(X_train)                           #randomly choose a point in the dataset
        distance = np.inf
        while distance > resampling_distance:                        #only add the new point if it is close to the orbit
            A =  SampleLieAlgebra(len(X_train),OptimalLieAlgebra)    #randomly choose an element of the Lie algebra using Silverman's rule of thumb
            new_point_normal =  scipy.linalg.expm(A) @ old_point 
            distance = np.min(np.linalg.norm(new_point_pca-X_train,axis=1))
        new_points_normal.append(new_point_normal.reshape(1,-1))
    
    if verbose: ChronometerStop(start_time, 's')
    
    #reshape datasets
    new_points_pca = np.array(new_points_pca).reshape(N,-1)
    new_points_normal = np.array(new_points_normal).reshape(N,-1)

    # Sample Lie Detect with single point source
    if verbose: start_time = ChronometerStart('Generate orbit single source... ')
    if group=='torus':
        new_points_detect = GenerateOrbitTorus(OptimalLieAlgebra, OptimalFrequencies, n_points=int(np.ceil(N**(1/dim))),
                                               x=random.choice(X_train),method='uniform')
    if group=='SU(2)':
        new_points_detect = GenerateOrbitSU2(OptimalLieAlgebra, OptimalFrequencies, n_points=int(np.ceil(N**(1/dim))), 
                                             x=random.choice(X_train))
    if verbose: 
        ChronometerStop(start_time, 's')

    # Collect results
    samples = {'lie pca':new_points_pca, 'normal':new_points_normal, 'detect':new_points_detect}

    return samples, X_test, X_train

def ComputeHausdorffOnOrbit(group, dim, ambient_dim, n_points, N, frequencies, n_neighbors, frequency_max, noise, 
                            correction = True, verbose = True, plot = False, plot_names = None):
    distances = {}
    samples, X_test, X_train = EstimateDensitiesOnOrbit(group, dim, ambient_dim, n_points, N, frequencies, n_neighbors,
                                          frequency_max, noise, correction = correction, verbose=verbose)
    
    for i_sample, sample in enumerate(samples.keys()):
        
        # Two-sided Hausdorff
        d1 = scipy.spatial.distance.directed_hausdorff(X_test, samples[sample])[0]
        d2 = scipy.spatial.distance.directed_hausdorff(samples[sample], X_test)[0]
        d = max(d1,d2)
        distances[sample] = d
        
        if verbose: print(sample+':', 'distance test to orb:', d1, '& orb to test:', d2)     

        if plot:
            fig = plt.figure(figsize=(4,4)); 
            ax = fig.add_subplot(projection='3d')
            ax.set_title('Hausdorff distance: '+'{:.2f}'.format(distances[sample]), pad=0)
            ticks = [-1, 0, 1]
            ax.set_xticks(ticks)
            ax.set_yticks(ticks)
            ax.set_zticks(ticks)
            ax.set_xlim(-1,1)
            ax.set_ylim(-1,1)
            ax.set_zlim(-1,1)

            # PCA test orbit
            pca = sklearn.decomposition.PCA(n_components=3)
            Xpca = pca.fit_transform(X_train)

            # Plot estimated orbit
            Orbitpca = pca.transform(samples[sample])
            ax.scatter(Orbitpca[:,0], Orbitpca[:,1], Orbitpca[:,2],c='magenta',lw=0.5, alpha=0.3); 

            # Plot test orbit
            ax.scatter(Xpca[:,0], Xpca[:,1], Xpca[:,2], c='black', lw=1);

            # Save figure
            plt.savefig('Figures/'+plot_names[i_sample]+'.png', bbox_inches='tight');
            plt.show();

    return distances

def PrintResults(parameters, results_statistics_allparameters):
    assert len(parameters)%4==0, 'should be a multiple of 4'
    for i in range(len(parameters)//4):
        parameters_list_4 = parameters[4*i:(4*i+4)]
        spacedim = str(parameters_list_4[0][2])
        points1, points2, points3, points4 = [str(param[3]) for param in parameters_list_4]
        if parameters_list_4[0][0]=='SU(2)':
            groupname = '$\SU(2)$'
            freq = str(parameters_list_4[0][5])
        elif parameters_list_4[0][1]==1:  # Torus of dim 1
            groupname = '$\SO(2)$'
            freq = str(parameters_list_4[0][5][0])
        elif parameters_list_4[0][1]==2:  # Torus of dim 2
            groupname = '$T^2$'
            freq = '\\begin{tabular}{@{}c@{}} ('+str(parameters_list_4[0][5][0])+',\\\\'+\
                    str(parameters_list_4[0][5][1])+')\\end{tabular}'
        elif parameters_list_4[0][1]==3:  # Torus of dim 3
            groupname = '$T^3$'
            freq = '\\begin{tabular}{@{}c@{}} ('+str(parameters_list_4[0][5][0])+',\\\\'+\
                    str(parameters_list_4[0][5][1])+',\\\\'+\
                    str(parameters_list_4[0][5][1])+')\\end{tabular}'
        std1, std2 = str(0), str(0.1)

        liepca_list = [res['lie pca'] for res in results_statistics_allparameters[4*i:(4*i+4)]]
        normal_list = [res['normal'] for res in results_statistics_allparameters[4*i:(4*i+4)]]
        detect_list = [res['detect'] for res in results_statistics_allparameters[4*i:(4*i+4)]]

        best_scores_column = []
        for k in range(4):
            scores_row = [liepca_list[k][0], normal_list[k][0], detect_list[k][0]]
            best_scores_column.append(np.argmin(scores_row))

        liepca_list = ['{:.2f}'.format(score[0])+'$\pm$'+'{:.2f}'.format(score[1]) for score in liepca_list]
        normal_list = ['{:.2f}'.format(score[0])+'$\pm$'+'{:.2f}'.format(score[1]) for score in normal_list]
        detect_list = ['{:.2f}'.format(score[0])+'$\pm$'+'{:.2f}'.format(score[1]) for score in detect_list]
        scores_list = [liepca_list, normal_list, detect_list]

        for k in range(4):
            score = scores_list[best_scores_column[k]][k]
            score = '\\textbf{'+score+'}'
            scores_list[best_scores_column[k]][k] = score        

        liepca1, liepca2, liepca3, liepca4 = scores_list[0]
        normal1, normal2, normal3, normal4 = scores_list[1]
        detect1, detect2, detect3, detect4 = scores_list[2]

        string = '\multicolumn{1}{||c|}{\multirow{4}{*}{'+groupname+'}} & \
        \multicolumn{1}{c|}{\multirow{4}{*}{'+spacedim+'}} & \
        \multirow{4}{*}{'+freq+'} & \multicolumn{1}{r|}{'+points1+'} & \
        \multirow{2}{*}{'+std1+'} & \multicolumn{1}{c|}{'+liepca1+'} & \
        \multicolumn{1}{c|}{'+normal1+'} & '+detect1+' \\\\ \cline{4-4} \cline{6-8} \
        \multicolumn{1}{||c|}{} & \multicolumn{1}{c|}{} & & \
        \multicolumn{1}{r|}{'+points2+'} & & \multicolumn{1}{c|}{'+liepca2+'} & \
        \multicolumn{1}{c|}{'+normal2+'} & '+detect2+' \\\\ \cline{4-8} \
        \multicolumn{1}{||c|}{} & \multicolumn{1}{c|}{} & & \
        \multicolumn{1}{r|}{'+points3+'} & \multirow{2}{*}{'+std2+'} & \
        \multicolumn{1}{c|}{'+liepca3+'} & \multicolumn{1}{c|}{'+normal3+'} & \
        '+detect3+' \\\\ \cline{4-4} \cline{6-8} \
        \multicolumn{1}{||c|}{} & \multicolumn{1}{l|}{} & & \
        \multicolumn{1}{r|}{'+points4+'} & & \multicolumn{1}{c|}{'+liepca4+'} & \
        \multicolumn{1}{c|}{'+normal4+'} & '+detect4+' \\\\ \hline\hline'

        print(string, '\n')

# $\mathrm{SO}(2)$

In [None]:
' Scores SO(2) '

method        = 'localPCA'
correction    = False
verbose       = False
plot          = False
n_repetitions = 100

parameters = [
              ('torus', 1, 4, 30,  500, ((1,2),), 10, 2, 0),
              ('torus', 1, 4, 100, 500, ((1,2),), 10, 2, 0),
              ('torus', 1, 4, 30,  500, ((1,2),), 10, 2, 0.1**2),
              ('torus', 1, 4, 100, 500, ((1,2),), 10, 2, 0.1**2),
    
              ('torus', 1, 6, 30,  500, ((1,2,3),), 10, 3, 0),
              ('torus', 1, 6, 200, 500, ((1,2,3),), 10, 3, 0),
              ('torus', 1, 6, 30,  500, ((1,2,3),), 10, 3, 0.1**2),
              ('torus', 1, 6, 200, 500, ((1,2,3),), 10, 3, 0.1**2),

              ('torus', 1, 6, 50,  500, ((1,3,10),), 10, 10, 0),
              ('torus', 1, 6, 500, 500, ((1,3,10),), 10, 10, 0),
              ('torus', 1, 6, 50,  500, ((1,3,10),), 10, 10, 0.1**2),
              ('torus', 1, 6, 500, 500, ((1,3,10),), 10, 10, 0.1**2),
             ]

results_statistics_allparameters = []
for parameter in parameters:
    # Set parameter
    print('Parameters:', parameter)
    group, dim, ambient_dim, n_points, N, frequencies, n_neighbors, frequency_max, noise = parameter

    # Estimate orbits
    results = {}
    msg = 'Estimate orbits... '
    start_time = ChronometerStart(msg)
    for i in range(n_repetitions):
        results[i] = ComputeHausdorffOnOrbit(group, dim, ambient_dim, n_points, N, frequencies, n_neighbors, 
                                             frequency_max, noise, correction = correction, 
                                             verbose=verbose, plot=plot)            
        ChronometerTick(start_time, i, n_repetitions, msg)
    
    # Reshape dict and get statistics
    results = {key:list([results[i][key] for i in range(len(results))]) for key in results[0].keys()}
    results_statistics = {key:[np.mean(results[key]), np.std(results[key])] for key in results.keys()}
    results_statistics_allparameters.append(results_statistics)
    
# Print results
IPython.display.clear_output()
PrintResults(parameters, results_statistics_allparameters)

# $T^2$

In [None]:
' Scores T^2 '

method        = 'localPCA'
correction    = False
verbose       = False
plot          = False
n_repetitions = 100

parameters = [
              ('torus', 2, 6, 100,  5000, ((1,2,1),(2,1,1)), 15, 2, 0),
              ('torus', 2, 6, 1000, 5000, ((1,2,1),(2,1,1)), 15, 2, 0),
              ('torus', 2, 6, 100,  5000, ((1,2,1),(2,1,1)), 15, 2, 0.1**2),
              ('torus', 2, 6, 1000, 5000, ((1,2,1),(2,1,1)), 15, 2, 0.1**2),
             ]

results_statistics_allparameters = []
for parameter in parameters:
    # Set parameter
    print('Parameters:', parameter)
    group, dim, ambient_dim, n_points, N, frequencies, n_neighbors, frequency_max, noise = parameter

    # Estimate orbits
    results = {}
    msg = 'Estimate orbits... '
    start_time = ChronometerStart(msg)
    for i in range(n_repetitions):
        results[i] = ComputeHausdorffOnOrbit(group, dim, ambient_dim, n_points, N, frequencies, n_neighbors, 
                                             frequency_max, noise, correction = correction, 
                                             verbose=verbose, plot=plot)            
        ChronometerTick(start_time, i, n_repetitions, msg)
    
    # Reshape dict and get statistics
    results = {key:list([results[i][key] for i in range(len(results))]) for key in results[0].keys()}
    results_statistics = {key:[np.mean(results[key]), np.std(results[key])] for key in results.keys()}
    results_statistics_allparameters.append(results_statistics)
    
# Print results
IPython.display.clear_output()
PrintResults(parameters, results_statistics_allparameters)

In [None]:
# Number of representations to test

GetFrequenciesToTest(dim=2, ambient_dim=6, frequency_max=2, verbose=True);
GetFrequenciesToTest(dim=3, ambient_dim=8, frequency_max=1, verbose=True);

# $T^3$

In [None]:
' Scores T^3 '

method        = 'localPCA'
correction    = False
verbose       = False
plot          = False
n_repetitions = 10

parameters = [
              ('torus', 3, 8, 500,  20**3, ((0, 1, -1, 0), (1, 0, -1, -1), (1, 0, 0, 0)), 30, 1,0),
              ('torus', 3, 8, 2000, 20**3, ((0, 1, -1, 0), (1, 0, -1, -1), (1, 0, 0, 0)), 30, 1,0),
              ('torus', 3, 8, 500,  20**3, ((0, 1, -1, 0), (1, 0, -1, -1), (1, 0, 0, 0)), 30, 1,0.1**2),
              ('torus', 3, 8, 2000, 20**3, ((0, 1, -1, 0), (1, 0, -1, -1), (1, 0, 0, 0)), 30, 1,0.1**2),
             ]

results_statistics_allparameters = []
for parameter in parameters:
    # Set parameter
    print('Parameters:', parameter)
    group, dim, ambient_dim, n_points, N, frequencies, n_neighbors, frequency_max, noise = parameter

    # Estimate orbits
    results = {}
    msg = 'Estimate orbits... '
    start_time = ChronometerStart(msg)
    for i in range(n_repetitions):
        results[i] = ComputeHausdorffOnOrbit(group, dim, ambient_dim, n_points, N, frequencies, n_neighbors, 
                                             frequency_max, noise, correction = correction, 
                                             verbose=verbose, plot=plot)            
        ChronometerTick(start_time, i, n_repetitions, msg)

    # Reshape dict and get statistics
    results = {key:list([results[i][key] for i in range(len(results))]) for key in results[0].keys()}
    results_statistics = {key:[np.mean(results[key]), np.std(results[key])] for key in results.keys()}
    results_statistics_allparameters.append(results_statistics)

# Print results
IPython.display.clear_output()
PrintResults(parameters, results_statistics_allparameters)

# $\mathrm{SU}(2)$

In [None]:
' Scores SU(2) '

method        = 'localPCA'
correction    = False
verbose       = False
plot          = False
n_repetitions = 10

parameters = [
              ('SU(2)', 3, 5, 500,  20**3, [5], 30, None, 0),
              ('SU(2)', 3, 5, 2000, 20**3, [5], 30, None, 0),
              ('SU(2)', 3, 5, 500,  20**3, [5], 30, None, 0.1**2),
              ('SU(2)', 3, 5, 2000, 20**3, [5], 30, None, 0.1**2),

              ('SU(2)', 3, 7, 500,  20**3, [7], 30, None, 0),
              ('SU(2)', 3, 7, 2000, 20**3, [7], 30, None, 0),
              ('SU(2)', 3, 7, 500,  20**3, [7], 30, None, 0.1**2),
              ('SU(2)', 3, 7, 2000, 20**3, [7], 30, None, 0.1**2),

             ('SU(2)', 3, 7, 500,  20**3, [3,4], 30, None, 0),
             ('SU(2)', 3, 7, 2000, 20**3, [3,4], 30, None, 0),
             ('SU(2)', 3, 7, 500,  20**3, [3,4], 30, None, 0.1**2),
             ('SU(2)', 3, 7, 2000, 20**3, [3,4], 30, None, 0.1**2),

              ('SU(2)', 3, 11, 500,  20**3, [4,7], 30, None, 0),
              ('SU(2)', 3, 11, 5000, 20**3, [4,7], 30, None, 0),
              ('SU(2)', 3, 11, 500,  20**3, [4,7], 30, None, 0.1**2),
              ('SU(2)', 3, 11, 5000, 20**3, [4,7], 30, None, 0.1**2),
]

results_statistics_allparameters = []
for parameter in parameters:
    # Set parameter
    print('Parameters:', parameter)
    group, dim, ambient_dim, n_points, N, frequencies, n_neighbors, frequency_max, noise = parameter

    # Estimate orbits
    results = {}
    msg = 'Estimate orbits... '
    start_time = ChronometerStart(msg)
    for i in range(n_repetitions):
        results[i] = ComputeHausdorffOnOrbit(group, dim, ambient_dim, n_points, N, frequencies, n_neighbors, 
                                             frequency_max, noise, correction = correction, 
                                             verbose=verbose, plot=plot)            
        ChronometerTick(start_time, i, n_repetitions, msg)
    
    # Reshape dict and get statistics
    results = {key:list([results[i][key] for i in range(len(results))]) for key in results[0].keys()}
    results_statistics = {key:[np.mean(results[key]), np.std(results[key])] for key in results.keys()}
    results_statistics_allparameters.append(results_statistics)
    print(results)
    
# Print results
# IPython.display.clear_output()
PrintResults(parameters, results_statistics_allparameters)