# Convergencia Tipo X

In [99]:
import numpy as np
import pandas as pd

import sys, os

from matplotlib.patches import Ellipse
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.preprocessing import MinMaxScaler
import sys, os; sys.path.append(os.path.dirname(os.getcwd()))
from pyfrechet.metric_spaces import MetricData, LogCholesky, spd_to_log_chol, log_chol_to_spd
from pyfrechet.regression.bagged_regressor import BaggedRegressor
from pyfrechet.regression.trees import Tree
from sklearn.model_selection import train_test_split
from pyfrechet.metric_spaces import MetricData, LogEuclidean, CustomAffineInvariant, CustomLogEuclidean, AffineInvariant, LogCholesky, log_chol_to_spd, spd_to_log_chol

from typing import Union
import random

from scipy.stats import wishart
from scipy.special import digamma

## Functions

In [100]:
def generate_random_spd_matrix(q_array, limits_unif = 30, seed = 1):
    """Generate a random q x q symmetric positive definite (SPD) matrix."""
    np.random.seed(seed)
    
    q_array = np.array(q_array, dtype = int)
    # Ensure the matrices are symmetric positive definite
    mat = [(np.random.rand(q_array[i], q_array[i])-1/2)*limits_unif for i in range(len(q_array))]
    return [np.dot(mat[i], mat[i].T) for i in range(len(q_array))]

def Sigma_t(t_array, Sigma_array):
    """Provides an array with the matrices given by a regression model that interpolates between four matrices."""  
    """The regression starts with Sigma_1 and then goes to Sigma_2 and Sigma_3 and ends in Sigma_4."""
    
    # Define time intervals for interpolation
    t_array = np.array(t_array)
    t_array = t_array[:, None, None]

    # Return the interpolated matrices
    return np.where(t_array < 0.5, np.cos(np.pi*t_array)**2 * Sigma_array[0] + (1 - np.cos(np.pi*(1-t_array))**2) * Sigma_array[1], 0) + np.where(t_array >= 0.5, (1 - np.cos(np.pi*t_array)**2) * Sigma_array[1] + np.cos(np.pi*(1-t_array))**2 * Sigma_array[2], 0)

# Define the matrices to interpolate
Sigma_1=np.array([[1, -0.6],
                  [-0.6, 0.5]])
Sigma_2=np.array([[1, 0],
                  [0, 1]])
Sigma_3=np.array([[0.5, 0.4],
                  [0.4, 1]])

Sigmas = (Sigma_1, Sigma_2, Sigma_3)

def sim_regression_matrices(Sigmas: tuple,
                            t: np.array,
                            df: int=2):
    t = np.array(t)
    
    #Simulate the time for regression (sample_t) and the true time (true_t)
    q = Sigmas[0].shape[0]

    c_dq = 2 * np.exp((1 / q) * sum( digamma((df - np.arange(1, q + 1) + 1 ) / 2) ))
    sigma_t = Sigma_t(t, Sigmas)
    sample_Y = [wishart( df=df, scale = sigma_t[k] / c_dq ).rvs( size=1 ) for k in range(t.shape[0])]
    return {'t': t, 'y': sample_Y}

def plot_ellipse(mat: np.ndarray, ax, 
                 xy: tuple=(0,0),
                 scale_factor=1,
                 edgecolor='red',
                 facecolor='None',
                 linewidth=2,
                 alpha=1):
    eigenvalues, eigenvectors = np.linalg.eig(mat)
    theta = np.degrees(np.arctan2(*eigenvectors[:, 0][::-1]))
    ellipse = Ellipse(xy=xy,
                  width=scale_factor*np.sqrt(eigenvalues[0]),
                  height=scale_factor*np.sqrt(eigenvalues[1]),
                  angle=theta,
                  edgecolor=edgecolor,
                  facecolor=facecolor,
                  lw=linewidth,
                  alpha=alpha)
    ax.add_patch(ellipse)


def plot_OOB_balls_SPD( predictions: np.ndarray,
                        indices_to_plot: list[int],
                        Ralpha: float,
                        ax,
                        alpha: float = 0.05,
                        reference: Union[np.ndarray, None]=None,
                        scale_factor: float=1/10,
                        xy_factor: float=50,
                        df: int=5,
                        MC_samples: int=100,
                        edge_color='deepskyblue',
                        dist : str = 'LC',
                        limits_unif : int = 30
                        ) -> None:
    index_to_plot = 1
    if dist == 'LC':
        M = LogCholesky(dim = 2)
        if not reference is None:
            for index_to_plot in indices_to_plot:
                sample = generate_random_spd_matrix(q_array=np.repeat(2, MC_samples), limits_unif = limits_unif, seed=4)
                sample = [spd_to_log_chol(A) for A in sample]
                for A in sample:
                    if M.d(A, predictions[index_to_plot])<=Ralpha:
                        plot_ellipse(log_chol_to_spd(A), ax=ax, xy=(index_to_plot/xy_factor,0), scale_factor=scale_factor, edgecolor=edge_color,
                                    alpha=alpha)
                        

                plot_ellipse(log_chol_to_spd(predictions[index_to_plot]), ax=ax, 
                            xy=(index_to_plot/xy_factor,0), scale_factor=scale_factor, edgecolor='black', alpha=1)

                plot_ellipse(log_chol_to_spd(reference[index_to_plot]), ax=ax, 
                            xy=(index_to_plot/xy_factor,0), scale_factor=scale_factor, edgecolor='red', alpha=1)

        else:
            for index_to_plot in indices_to_plot:
                sample = generate_random_spd_matrix(q_array=np.repeat(df, MC_samples), limits_unif = limits_unif, seed=4)
                for A in sample:
                    if M.d(A, predictions[index_to_plot])<=Ralpha:

                        plot_ellipse(log_chol_to_spd(A), ax=ax, xy=(index_to_plot/xy_factor,0), scale_factor=scale_factor, edgecolor=edge_color,
                                    alpha=alpha)
                                    

                plot_ellipse(log_chol_to_spd(predictions[index_to_plot]), ax=ax, 
                            xy=(index_to_plot/xy_factor,0), scale_factor=scale_factor, edgecolor='black', alpha=1)
            

    elif dist == 'AI':
        M = CustomAffineInvariant(dim = 2)
        if not reference is None:
            for index_to_plot in indices_to_plot:
                sample = generate_random_spd_matrix(q_array=np.repeat(2, MC_samples), limits_unif = limits_unif, seed=4)
                for A in sample:
                    if M.d(A, predictions[index_to_plot])<=Ralpha:
                        plot_ellipse(A, ax=ax, xy=(index_to_plot/xy_factor,0), scale_factor=scale_factor, edgecolor=edge_color,
                                    alpha=alpha)
                        

                plot_ellipse(predictions[index_to_plot], ax=ax, 
                            xy=(index_to_plot/xy_factor,0), scale_factor=scale_factor, edgecolor='black', alpha=1)

                plot_ellipse(reference[index_to_plot], ax=ax, 
                            xy=(index_to_plot/xy_factor,0), scale_factor=scale_factor, edgecolor='red', alpha=1)

        else:
            for index_to_plot in indices_to_plot:
                sample = generate_random_spd_matrix(q_array=np.repeat(df, MC_samples), limits_unif = limits_unif, seed=4)
    
                for A in sample:
                    if M.d(A, predictions[index_to_plot])<=Ralpha:

                        plot_ellipse(A, ax=ax, xy=(index_to_plot/xy_factor,0), scale_factor=scale_factor, edgecolor=edge_color,
                                    alpha=alpha)
                                    

                plot_ellipse(predictions[index_to_plot], ax=ax, 
                            xy=(index_to_plot/xy_factor,0), scale_factor=scale_factor, edgecolor='black', alpha=1)
    else:
        M = LogEuclidean(dim = 2)
        if not reference is None:
            for index_to_plot in indices_to_plot:
                sample = generate_random_spd_matrix(q_array=np.repeat(2, MC_samples), limits_unif = limits_unif, seed=4)
                for A in sample:

                    if M.d(A, predictions[index_to_plot])<=Ralpha:
                        plot_ellipse(A, ax=ax, xy=(index_to_plot/xy_factor,0), scale_factor=scale_factor, edgecolor=edge_color,
                                    alpha=alpha)
                        

                plot_ellipse(predictions[index_to_plot], ax=ax, 
                            xy=(index_to_plot/xy_factor,0), scale_factor=scale_factor, edgecolor='black', alpha=1)

                plot_ellipse(reference[index_to_plot], ax=ax, 
                            xy=(index_to_plot/xy_factor,0), scale_factor=scale_factor, edgecolor='red', alpha=1)

        else:
            for index_to_plot in indices_to_plot:
                sample = generate_random_spd_matrix(q_array=np.repeat(df, MC_samples), limits_unif = limits_unif, seed=4)
                for A in sample:
                    if M.d(A, predictions[index_to_plot])<=Ralpha:

                        plot_ellipse(A, ax=ax, xy=(index_to_plot/xy_factor,0), scale_factor=scale_factor, edgecolor=edge_color,
                                    alpha=alpha)
                                    

                plot_ellipse(predictions[index_to_plot], ax=ax, 
                            xy=(index_to_plot/xy_factor,0), scale_factor=scale_factor, edgecolor='black', alpha=1)

In [101]:
dfs_names = [2, 2.5, 3, 3.5, 4, 5, 6]

# Obtain coverage results dataframe from the results files
def coverage_results(dfs: list, dist: str= 'LC') -> pd.DataFrame:
    coverage_df=pd.DataFrame(columns=['sample_index', 'train_size', 'df', 'y_train_data', 'train_predictions', 'OOB_quantile', 'OOB_errors', 'forest'])
    for file in os.listdir(os.path.join(os.getcwd(), 'results')):
        if file.endswith('.npy') and file.split('_')[0] == dist:
            infile=open(os.path.join(os.getcwd(), 'results/'+file), 'rb')
            result=np.load(infile, allow_pickle=True).item()
            infile.close()
            coverage_df=pd.concat([coverage_df, 
                                    pd.DataFrame({  'distance': dist,
                                                    'sample_index': int(file.split('_')[2][4:]),
                                                    'train_size': int(file.split('_')[3][1:]),
                                                    'df': dfs_names[int(file.split('_')[4][2:])-1],
                                                    'y_train_data': [result['y_train_data']],
                                                    'train_predictions': [result['train_predictions']],
                                                    'OOB_quantile': [result['OOB_quantile']],
                                                    'OOB_errors': [result['OOB_errors']], 
                                                    'forest': [result['forest']],
                                                }, index=pd.RangeIndex(0,1))],
                                    ignore_index=True)
        

    coverage_df['train_size']=coverage_df['train_size'].astype('category')
    coverage_df['sample_index']=coverage_df['sample_index'].astype('category')
    coverage_df['df'] = coverage_df.df.astype('category')
    return coverage_df

coverage_df_AI=coverage_results(dist = 'AI', dfs=dfs_names)
# coverage_df_LC=coverage_results(dist = 'LC', dfs=dfs_names)
# coverage_df_LE=coverage_results(dist = 'LE', dfs=dfs_names)
# 
# coverage_df_combined = pd.concat([coverage_df_AI, coverage_df_LC, coverage_df_LE], ignore_index=True)
# print(coverage_df_AI.info())
# print(coverage_df_LC.info())
# print(coverage_df_LE.info())

In [102]:
m = 20
n_estimations = 25

zeros_init = np.zeros(shape = (n_estimations, 3))
cov = np.zeros(shape = (n_estimations, 3))

diccionario = {'df_2': {'AI': {'50': zeros_init, '100': zeros_init, '200': zeros_init, '500': zeros_init}, 'LC': {'50': zeros_init, '100': zeros_init, '200': zeros_init, '500': zeros_init}, 'LE': {'50': zeros_init, '100': zeros_init, '200': zeros_init, '500': zeros_init}}, 'df_5': {'AI': {'50': zeros_init, '100': zeros_init, '200': zeros_init, '500': zeros_init}, 'LC': {'50': zeros_init, '100': zeros_init, '200': zeros_init, '500': zeros_init}, 'LE': {'50': zeros_init, '100': zeros_init, '200': zeros_init, '500': zeros_init}}}

# Obtain 25 estimations of Type I coverage error for each distance and N, to calculate the mean of the estimations and the sample variance
for df in [2, 5]:
    for dist in ['AI']:
        # Select the distance analyzed
        #if dist == 'AI':
        coverage_df = coverage_df_AI[coverage_df_AI['df'] == df]
        M = CustomAffineInvariant(dim = 2)
        # elif dist == 'LC':
        #     coverage_df = coverage_df_LC[coverage_df_LC['df'] == df]
        #     M = LogCholesky(dim = 2)
        # else:
        #     coverage_df = coverage_df_LE[coverage_df_LE['df'] == df]
        #     M = LogEuclidean(dim = 2)

        for N in [50, 100, 200, 500]:
            # Select the size of the training set
            coverage_df_N = coverage_df[coverage_df['train_size'] == N]

            for estimation in range(n_estimations):
                yesno = np.zeros(3)
                new_ys = sim_regression_matrices(Sigmas = (Sigma_1, Sigma_2, Sigma_3), 
                                                t = np.repeat(0, m),  
                                               df = df)
                lns = coverage_df_N.sample(n=m, replace=False)
                i = 0
                for _, ln in lns.iterrows():
                    # Randomly select a row from the dataframe
                    
                    # Retrieve the data           
                    new_y = new_ys['y'][i]
                    new_pred = ln['forest'].predict_matrix(np.array([0]).reshape(-1,1))

                    # Store the selected values
                    yesno = np.vstack((yesno, M.d(new_pred, new_y) <= ln['OOB_quantile']))
                    i += 1
                cov[estimation, :] = yesno[1:,:].sum(axis=0) / m
                
            diccionario['df_'+str(df)][dist][str(N)] = np.copy(cov)

In [103]:
# Obtain 25 estimations of Type I coverage error for each distance and N, to calculate the mean of the estimations and the sample variance
#dist = 'LC'
#
#for df in [2, 5]:
#        
#    coverage_df = coverage_df_LC[coverage_df_LC['df'] == df]
#    M = LogCholesky(dim = 2)
#
#    for N in [50, 100, 200, 500]:
#        # Select the size of the training set
#        coverage_df_N = coverage_df[coverage_df['train_size'] == N]
#
#        for estimation in range(n_estimations):
#            yesno = np.zeros(3)
#
#            for _ in range(m):
#                # Randomly select a row from the dataframe
#                random_row = coverage_df_N.sample(n=1)
#                
#                # Retrieve the data
#                c_dq = 2 * np.exp((1 / 2) * sum([digamma((df - i + 1) / 2) for i in range(1, 2 + 1)]))
#                sigma_t = Sigma_t(0, (Sigma_1, Sigma_2, Sigma_3))[0]
#                sample_Y = np.array([spd_to_log_chol(1/c_dq*wishart( df=df, scale = sigma_t ).rvs( size=1 ))])
#                y_test_datum = MetricData(M, sample_Y)
#                test_pred = MetricData(M, np.array([random_row['forest'].values[0].predict_matrix(np.array([0]).reshape(-1,1))]))
#                
#                # Store the selected values
#                yesno = np.vstack((yesno, M.d(test_pred, y_test_datum) <= random_row['OOB_quantile'].values[0]))
#            cov[estimation, :] = yesno[1:,:].sum(axis=0) / m
#            
#        diccionario['df_'+str(df)][dist][str(N)] = np.copy(cov)

In [107]:
for df in [2, 5]:    
    for dist in ['AI']:
        for N in [50, 100, 200, 500]:
            print(f"{df} degrees of freedom, N = {N}, {dist} distance, mean of Type I coverage estimates: ", np.mean(diccionario['df_'+str(df)][dist][str(N)], axis = 0)) 
            print(f"{df} degrees of freedom, N = {N}, {dist} distance, standard deviation of Type I coverage estimates: ", np.sqrt(np.var(diccionario['df_'+str(df)][dist][str(N)], axis = 0))  )    

2 degrees of freedom, N = 50, AI distance, mean of Type I coverage estimates:  [0.986 0.95  0.888]
2 degrees of freedom, N = 50, AI distance, standard deviation of Type I coverage estimates:  [0.026533   0.05656854 0.07652451]
2 degrees of freedom, N = 100, AI distance, mean of Type I coverage estimates:  [0.992 0.952 0.9  ]
2 degrees of freedom, N = 100, AI distance, standard deviation of Type I coverage estimates:  [0.0183303  0.04118252 0.06480741]
2 degrees of freedom, N = 200, AI distance, mean of Type I coverage estimates:  [0.98  0.956 0.91 ]
2 degrees of freedom, N = 200, AI distance, standard deviation of Type I coverage estimates:  [0.03741657 0.06053098 0.07071068]
2 degrees of freedom, N = 500, AI distance, mean of Type I coverage estimates:  [0.99  0.958 0.912]
2 degrees of freedom, N = 500, AI distance, standard deviation of Type I coverage estimates:  [0.02       0.03655133 0.05706137]
5 degrees of freedom, N = 50, AI distance, mean of Type I coverage estimates:  [0.98  

In [105]:
# Prepare data for the DataFrame
rows = []
index = []

for df in [2, 5]:
    for N in [50, 100, 200, 500]:
        row = []
        for dist in ['AI']:
            means = np.mean(diccionario[f'df_{df}'][dist][str(N)], axis=0)
            stds = np.sqrt(np.var(diccionario[f'df_{df}'][dist][str(N)], axis=0))
            # Format as "mean (std)"
            formatted_values = [f"{means[i]:.3f} ({stds[i]:.3f})" for i in range(3)]
            row.extend(formatted_values)
        rows.append(row)
        index.append((f"df={df}", f"N={N}"))

# MultiIndex for rows and columns
row_index = pd.MultiIndex.from_tuples(index, names=["df", "N"])
col_index = pd.MultiIndex.from_product(
    [["AI"], ["0.01", "0.05", "0.1"]],
    names=["Distance", "Significance Level"]
)

# Create the DataFrame
df = pd.DataFrame(rows, index=row_index, columns=col_index)

# Display the DataFrame
df


Unnamed: 0_level_0,Distance,AI,AI,AI
Unnamed: 0_level_1,Significance Level,0.01,0.05,0.1
df,N,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2
df=2,N=50,0.986 (0.027),0.950 (0.057),0.888 (0.077)
df=2,N=100,0.992 (0.018),0.952 (0.041),0.900 (0.065)
df=2,N=200,0.980 (0.037),0.956 (0.061),0.910 (0.071)
df=2,N=500,0.990 (0.020),0.958 (0.037),0.912 (0.057)
df=5,N=50,0.980 (0.032),0.950 (0.042),0.896 (0.053)
df=5,N=100,0.982 (0.028),0.940 (0.060),0.896 (0.072)
df=5,N=200,0.984 (0.023),0.948 (0.041),0.924 (0.047)
df=5,N=500,0.982 (0.028),0.944 (0.059),0.882 (0.068)


In [106]:
def format_cell(value):
    mean, std = value.split(" ")
    mean = f"{float(mean):.3f}"
    std = std.strip("()")
    std = f"({float(std):.3f})"
    return f"{mean} {std}"

# Apply formatting to all cells
df = df.applymap(format_cell)

latex = df.to_latex(index=True, multirow=True, multicolumn=True, multicolumn_format='c', bold_rows=True, float_format= "%.3f" , caption='Type I error coverage for different distances, degrees of freedom and sample sizes', label='tab:typeIerrorcoverage')
print(latex)

\begin{table}
\caption{Type I error coverage for different distances, degrees of freedom and sample sizes}
\label{tab:typeIerrorcoverage}
\begin{tabular}{lllll}
\toprule
 & Distance & \multicolumn{3}{c}{AI} \\
 & Significance Level & 0.01 & 0.05 & 0.1 \\
df & N &  &  &  \\
\midrule
\multirow[t]{4}{*}{\textbf{df=2}} & \textbf{N=50} & 0.986 (0.027) & 0.950 (0.057) & 0.888 (0.077) \\
\textbf{} & \textbf{N=100} & 0.992 (0.018) & 0.952 (0.041) & 0.900 (0.065) \\
\textbf{} & \textbf{N=200} & 0.980 (0.037) & 0.956 (0.061) & 0.910 (0.071) \\
\textbf{} & \textbf{N=500} & 0.990 (0.020) & 0.958 (0.037) & 0.912 (0.057) \\
\cline{1-5}
\multirow[t]{4}{*}{\textbf{df=5}} & \textbf{N=50} & 0.980 (0.032) & 0.950 (0.042) & 0.896 (0.053) \\
\textbf{} & \textbf{N=100} & 0.982 (0.028) & 0.940 (0.060) & 0.896 (0.072) \\
\textbf{} & \textbf{N=200} & 0.984 (0.023) & 0.948 (0.041) & 0.924 (0.047) \\
\textbf{} & \textbf{N=500} & 0.982 (0.028) & 0.944 (0.059) & 0.882 (0.068) \\
\cline{1-5}
\bottomrule
\end{tabula

  df = df.applymap(format_cell)
