In [None]:
import numpy as np
import functools
import matplotlib.pyplot as plt
from typing import Tuple, Any, List

def create_shared_parameterization(q0, q1, I0, I1):

    if np.array_equal(I0, I1):
        return I0, q0, q1

    #create array of shared interpolation points
    I = np.unique(np.concatenate((I0, I1)))
    i,j = 0,0
    q0_new = np.zeros(I.shape[0]-1)
    q1_new = np.zeros(I.shape[0]-1)

    #interpolate to previous when creating diff
    for k in range(I.shape[0]-1):
        q0_new[k] = q0[i]
        q1_new[k] = q1[j]

        if I0[i+1] <= I[k+1]:
            i +=1
        if I1[j+1] <= I[k+1]:
            j +=1

    return I, q0_new, q1_new

def L2_metric(q0, q1, I0, I1):
    if not np.array_equal(I0, I1):
        I0, q0, q1 = create_shared_parameterization(q0, q1, I0, I1)
    return np.sqrt(np.dot(np.diff(I0), (q0 - q1)**2))

def local_cost(k, l, i, j, q0, q1, I):
    L2 = L2_metric(
            q0[k:i],
            np.sqrt((I[j]-I[l])/(I[i]-I[k]))*q1[l:j],
            I[k:i+1],
            np.linspace(I[k], I[i], (j-l+1))
        )**2
    assert isinstance(L2, (float, np.float64)), f"L2 is not a number: {L2}"
    return L2

def dynamic(local_cost, M, depth):
    A = np.zeros((M, M))
    pointers = dict()

    for i in range(M):
        for j in range(M):
            if i == 0 and j == 0:
                A[i, j] = 0
                continue
            min_cost = np.inf
            best_pred = None

            # Calculates cost of getting to (i,j), where it uses the cost (i,j) to (k,l) and (k,l) to (0,0)
            for pred in predecessors(i, j, depth):
                k, l = pred
                # Local cost = (i,j) -> (k,l)  
                cost = local_cost(k, l, i, j) + A[k, l]
                if min_cost > cost:
                    min_cost = cost
                    best_pred = pred
            A[i, j] = min_cost
            pointers[(i, j)] = best_pred

    return pointers, A


def predecessors(i, j, m):
    return ((k, l) for k in range(np.maximum(0, i - m), i) for l in range(np.maximum(0, j - m), j) if np.gcd(i-k, j-l) == 1)
    # for k in range(max(0, i - m), i):
    #     for l in range(max(0, j - m), j):
    #         yield (k,l)
    
def find_optimal_diffeomorphism(q0, q1, I0, I1, depth, reg = None):
    I, q0_new, q1_new = create_shared_parameterization(q0, q1, I0, I1)
    M = I.shape[0]

    if reg is not None:
        local_cost_partial = functools.partial(local_cost_regulated, q0 = q0_new, q1 = q1_new, I = I, lambda_reg = reg[0], gamma_reg = reg[1])
    else:
        local_cost_partial = functools.partial(local_cost, q0 = q0_new, q1 = q1_new, I = I)
    pointers, A = dynamic(local_cost_partial, M, depth)

    path = reconstruct(pointers, M-1, M-1)
    path = np.array(path) / float(M-1)

    #Construct reparametrization
    x = path[:,0]
    y = path[:,1]

    I_new = np.interp(I1, x, y)
    return I_new
    

def reconstruct(pointers, M, N):
    path = [(M,N)]
    try:
        while True:
            pred = path[-1]
            path.append(pointers[pred])
    except:
        pass

    path.reverse()
    return path

def reparameterize_multiple_rotations(I_new, I, c):
    """
    Creates the new movement
    Input:
        I_new: new parameterization
        I: old parameterization
        c: old movement
    Output:
        c_new: new movement
    """
    c_new = np.zeros(c.shape)
    for i in range(c.shape[0]):
        c_new[i] = reparameterize_rotation(I_new, I, c[i])
    return c_new


def reparameterize_rotation(I_new, I, c):
    assert I.shape[0] == c.shape[0], "I and c must have the same length"
    assert I_new[0] == 0, "I_new must start at 0"
    assert I_new[-1] == 1, "I_new must end at 1"
    assert np.all(np.diff(I_new) > 0), "I_new must be in ascending order"

    c_new = np.interp(I_new, I, c)
    return c_new

def srvf(I, c):
    TOL = 1e-10
    dc = np.diff(c) / np.diff(I)
    norms = np.abs(dc)
    sqrt_norms = np.sqrt(norms)
    scale_factors = np.where(sqrt_norms < TOL, 1, sqrt_norms)
    q = (dc.T / scale_factors).T  # Normalize each segment individually
    return q

def curve_distance(c0, c1): 
    """
    Computes the distance between two curves
    Input:
        c0: first curve
        c1: second curve
    Output:
        d: distance
    """
    d = np.linalg.norm(c0 - c1)
    return d

def local_cost_regulated(k, l, i, j, q0, q1, I, lambda_reg, gamma_reg):
    cost_l2 = local_cost(k, l, i, j, q0, q1, I)
    new_q1 = np.sqrt((I[j]-I[l])/(I[i]-I[k]))*q1[l:j] 

    # This regulates how much the curve can change
    def warp(k, l, i, j, t): 
        return l + (t-k) * (j-l) / (i-k)
    s_m = np.arange(k+1, i+1) # k < s_m <= i 
    reg_cost_1 = lambda_reg * np.mean(np.abs(warp(k, l, i, j, s_m) - s_m)**gamma_reg) 

    # This tries to smooth the curve
    reg_cost_2 = np.mean(np.abs(new_q1) ** gamma_reg)

    return cost_l2 + lambda_reg * reg_cost_2

### Introducing perturbation to the parameterization of the model

I.e. I = I +- dI 

In [None]:
def perturb_parameterization(I: np.ndarray, scalar: float) -> np.ndarray:
    
    assert scalar >= 1, "scalar must be greater than or equal to 1"

    I_perturbed = I.copy()
    max_perturbation = (I[1] - I[0]) / 2
    perturbation = max_perturbation / scalar
    print(f"Perturbation: {perturbation:.4f}")

    # Set seed for reproducibility
    np.random.seed(0)
    # I_perturbed[1:-1] += np.random.uniform(-perturbation, perturbation, I.shape[0] - 2)
    I_perturbed[1:-1] += np.random.normal(0, perturbation, I.shape[0] - 2)  
    
    assert np.all(np.diff(I_perturbed) > 0), "I_new must be in ascending order"
    assert I_perturbed[0] == 0, "I_new must start at 0"
    assert I_perturbed[-1] == 1, "I_new must end at 1"

    return I_perturbed

def create_perturbed_data(f: callable, num_points: int, pertubation_scalar: float) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: 
    I = np.linspace(0, 1, num_points)
    I_perturbed = perturb_parameterization(I, pertubation_scalar)
    c = f(I)
    c_perturbed = f(I_perturbed)
    return I, I_perturbed, c, c_perturbed

def plot_perturbation(I: np.ndarray, I_perturbed: np.ndarray, c: np.ndarray, c_perturbed: np.ndarray) -> None:
    _, axs = plt.subplots(1, 2, figsize=(8, 4))
    axs[0].plot(I, c, label="Original Curve", marker = ".")
    axs[0].plot(I, c_perturbed, label="Perturbed Curve", marker = ".")
    axs[0].set_xlabel("I")
    axs[0].set_ylabel("f(I)")
    axs[0].legend()

    axs[1].plot(I, I, label="Original Parameterization", marker = ".")
    axs[1].plot(I, I_perturbed, label="Perturbed Parameterization", marker = ".")
    axs[1].set_xlabel("I")
    axs[1].set_ylabel("I perturbed")
    axs[1].legend()
    plt.show()

def plot_reparameterization(I, I_per, I_new, c, c_per, c_new) -> None: 
    _, axs = plt.subplots(1, 2, figsize=(8, 4))
    
    axs[0].plot(I, c, label="C", marker = ".", linestyle='-', color='blue')
    axs[0].plot(I, c_per, label="C Pert", marker = ".", linestyle='--', color='red')
    axs[0].plot(I, c_new, label="C New", marker = ".", linestyle=':', color='green')
    axs[0].set_xlabel("I")
    axs[0].set_ylabel("f(I)")
    axs[0].legend()

    axs[1].plot(I, I, label="Orig I", marker = ".", linestyle='-', color='blue')
    axs[1].plot(I, I_per, label="I Pert", marker = ".", linestyle='--', color='red')
    axs[1].plot(I, I_new, label="I New", marker = ".", linestyle=':', color='green')
    axs[1].set_xlabel("I")
    axs[1].set_ylabel("I_new")
    axs[1].legend()
    
    plt.show()

def perform_reparameterization(I: np.ndarray, c0: np.ndarray, c1: np.ndarray, depth: float, reg: Any = None) -> Tuple[np.ndarray, np.ndarray, np.ndarray, float]:
    q0 = srvf(I, c0)
    q1 = srvf(I, c1)
    I_new = find_optimal_diffeomorphism(q0, q1, I, I, depth, reg)
    c_new = reparameterize_rotation(I_new, I, c1)
    q1_new = srvf(I, c_new)
    distance = L2_metric(q0, q1_new, I, I)**2
    return I_new, c_new, q1_new, distance

def plot_error(pertubation_lst: np.ndarray, error_lst: np.ndarray, order: int) -> None:
    plt.loglog(pertubation_lst, error_lst, marker = ".", label = "Error")
    plt.xlabel('Perturbation')
    plt.ylabel('Error')

    x_ref = np.linspace(min(pertubation_lst), max(pertubation_lst), 100)
    y_ref = x_ref**(order)
    y_ref = y_ref * (error_lst[0] + y_ref[0] * 1) / y_ref[0]
    plt.loglog(x_ref, y_ref, label=f'Reference Line $h^{{-{order}}}$', linestyle='--', color='red')    
    plt.legend()
    plt.show()

### Perturbation plot for std. reparametrization

- c_0 = f(I)
- c_1 = f(I + error)
- Want to see if E -> 0 as error -> 0

In [None]:
# Test
# pertubation_scalar = 1
num_points = 30
depth = num_points
lambda_reg = .1
gamma_reg = 1
# reg = (lambda_reg, gamma_reg)
# f = lambda x: np.sin(5*np.pi*x) * np.cos(2*np.pi*x)
# f = lambda x: (x - x**2) / (x**2 + 1) 
f = lambda x: np.sin(10 * np.cos(3 * x)) * np.cos(2 * x)

# List that starts at 10 and doubles for each iteration
pertubation_lst = [2**i*10 for i in range(5)]
std_err_lst = np.zeros(len(pertubation_lst))
reg_err_lst = np.zeros(len(pertubation_lst))
for index, pertubation_scalar in enumerate(pertubation_lst):

    I, I_perturbed, c, c_perturbed = create_perturbed_data(f, num_points, pertubation_scalar)
    I_new, c_new, q1_new, std_err_lst[index] =  perform_reparameterization(I, c, c_perturbed, depth)
    # _, _, _, reg_err_lst[index] = perform_reparameterization(I, c, c_perturbed, depth, reg = (lambda_reg / (2**index), gamma_reg))
    print(f"Distance std: {std_err_lst[index]}")  
    # print(f"Distance reg: {reg_err_lst[index]}")

order = 2
pertubation_lst = (I[1] - I[0]) / pertubation_lst 
plot_error(pertubation_lst, std_err_lst, order)

plot_perturbation(I, I_perturbed, c, c_perturbed)
plot_reparameterization(I, I_perturbed, I_new, c, c_perturbed, c_new)

### Want to check if regularizing helps for perturbed curves

- There is a risiko that the curve is to simple and that the regularization will not help.
- It doesn't help on perturbed parameterization


In [None]:
def create_perturbed_curve(c: np.ndarray, scalar: float) -> np.ndarray:
    c_perturbed = c.copy()
    np.random.seed(0)
    # c_perturbed += np.random.uniform(-scalar, scalar, c.shape[0])
    c_perturbed += np.random.normal(0, scalar, c.shape[0])
    return c_perturbed

def create_perturbed_curve_data(f: callable, num_points: int, pertubation_scalar: float) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    I = np.linspace(0, 1, num_points)
    c = f(I)
    c_perturbed = create_perturbed_curve(c, pertubation_scalar)
    return I, c, c_perturbed

def plot_perturbation_curve(I: np.ndarray, c: np.ndarray, c_perturbed: np.ndarray) -> None:
    fig = plt.figure(figsize=(8, 4))
    plt.plot(I, c, label="Original Curve", marker = ".")
    plt.plot(I, c_perturbed, label="Perturbed Curve", marker = ".")
    plt.xlabel("I")
    plt.ylabel("f(I)")
    plt.legend()
    

def plot_reparameterization_curve(I, I_new, c, c_per, c_new) -> None: 
    _, axs = plt.subplots(1, 2, figsize=(8, 4))
    
    axs[0].plot(I, c, label="C", marker = ".", linestyle='-', color='blue')
    axs[0].plot(I, c_per, label="C Pert", marker = ".", linestyle='--', color='red')
    # axs[0].plot(I, c_new, label="C New", marker = ".", linestyle=':', color='green')
    axs[0].set_xlabel("I")
    axs[0].set_ylabel("f(I)")
    axs[0].legend()

    axs[1].plot(I, c, label="C", marker = ".", linestyle='-', color='blue')
    axs[1].plot(I, c_new, label="C New", marker = ".", linestyle=':', color='green')
    axs[1].set_xlabel("I")
    axs[1].set_ylabel("f(I)")
    axs[1].legend()


    # axs[1].plot(I, I, label="Orig I", marker = ".", linestyle='-', color='blue')
    # axs[1].plot(I, I_new, label="I New", marker = ".", linestyle=':', color='green')
    # axs[1].set_xlabel("I")
    # axs[1].set_ylabel("I_new")
    # axs[1].legend()
    
    plt.show()

### Perturbation of curve

- c_0 = f(I)
- c_1 = f(I) + error
- Want to see if E -> 0 as error -> 0

In [None]:
# pertubation_scalar = 0.002
num_points = 25
depth = 25
lambda_reg = .05
gamma_reg = 1

# Function to test
# f = lambda x: np.sin(5*np.pi*x) * np.cos(2*np.pi*x)
# f = lambda x: (x - x**2) / (x**2 + 1) 
f = lambda x: np.sin(10 * np.cos(3 * x)) * np.cos(2 * x)

# List that starts at 10 and doubles for each iteration
pertubation_lst = [2**i*0.001 for i in range(5)]
std_err_lst = np.zeros(len(pertubation_lst))
reg_err_lst = np.zeros(len(pertubation_lst))
for index, pertubation_scalar in enumerate(pertubation_lst):
    I, c, c_perturbed = create_perturbed_curve_data(f, num_points, pertubation_scalar)
    I_new, c_new, q1_new, std_err_lst[index] =  perform_reparameterization(I, c, c_perturbed, depth)
    print(f"Distance std: {std_err_lst[index]}")


order = 2
plot_error(pertubation_lst, std_err_lst, order)

plot_perturbation_curve(I, c, c_perturbed)
plot_reparameterization_curve(I, I_new, c, c_perturbed, c_new)


### Want to check if regularizing helps for perturbed curves

- If the curve have order 1, then the regularization will not help, as the noise isn't big enough. 
- Assume that the curve at max can have order 1, but will probably have lower order.
- Should start by finding an alg. to find the correct parameterization of the curve.


In [None]:
pertubation_scalar = .1
num_points = 40
depth = 10
#O(n^2, d^2)

# Function to test
# f = lambda x: np.sin(5*np.pi*x) * np.cos(2*np.pi*x)
# f = lambda x: (x - x**2) / (x**2 + 1) 
f = lambda x: np.sin(10 * np.cos(3 * x)) * np.cos(2 * x)

I, c, c_perturbed = create_perturbed_curve_data(f, num_points, pertubation_scalar)

I_new, c_new, q1_new, _ =  perform_reparameterization(I, c, c_perturbed, depth)
err_ = L2_metric(srvf(I, c), q1_new, I, I)**2
print(f"Distance std: {err_}")
# plot_perturbation_curve(I, c, c_perturbed)
plot_reparameterization_curve(I, I_new, c, c_perturbed, c_new)

lam = pertubation_scalar * 0.025
I_new, c_new, q1_new, _  = perform_reparameterization(I, c, c_perturbed, depth, reg=(lam, 1))
err_ = L2_metric(srvf(I, c), q1_new, I, I)**2
print(f"Distance std: {err_}")
# plot_perturbation_curve(I, c, c_perturbed)
plot_reparameterization_curve(I, I_new, c, c_perturbed, c_new)

In [None]:
num_points = 40
depth = 5

# Function to test
f = lambda x: np.sin(5*np.pi*x) * np.cos(2*np.pi*x)
# f = lambda x: (x - x**2) / (x**2 + 1) 
# f = lambda x: np.sin(10 * np.cos(3 * x)) * np.cos(2 * x)

pertubation_scalar = .1

# Create perturbed data
np.random.seed(0)
I = np.linspace(0, 1, num_points)
c = f(I)
c_perturbed = c.copy()
c_perturbed[1:-1] += np.random.normal(0, pertubation_scalar, c.shape[0] - 2)

plt.plot(I, c, label="Original Curve", marker = ".")
plt.plot(I, c_perturbed, label="Perturbed Curve", marker = ".")
plt.show()

I_new, c_new, q1_new, err_ =  perform_reparameterization(I, c, c_perturbed, depth)
plt.plot(I, c, label="Original Curve", marker = ".")
plt.plot(I, c_new, label="New Curve", marker = ".")
plt.show()
print(f"Distance std: {err_} \n")

# Need an alg that needs to calculate some start lambda
# Then it should check if we get the same result as without regularization
    # If same, then increase lambda
    # If higher, then decrease lambda
    # If lower, then we are in the correct range, but there may be a better lambda

perturbation_scalar = 1
lam = perturbation_scalar * 1
direction = None
lam_min = 0
lam_max = 10000
for i in range(10):
    print(f"Iteration: {i}")
    I_new, c_new, q1_new, err_p = perform_reparameterization(I, c, c_perturbed, depth, reg=(lam, 1))

    if err_p < err_:
        print(f"Done: {err_p} < {err_}")
        break 

    elif err_p > err_:
        print(f"Too high lambda: {err_p} > {err_}")
        lam_max = lam  
        print(f"Lambda: {lam}")
        lam = (lam_max + lam_min) / 2 

    else: 
        print(f"Too low lambda: {err_p} == {err_}")
        lam_min = lam
        print(f"Lambda: {lam}")
        lam = (lam_max + lam_min) / 2



    

    

I_new, c_new, q1_new, err_p  = perform_reparameterization(I, c, c_perturbed, depth, reg=(lam, 1))
plt.plot(I, c, label="Original Curve", marker = ".")
plt.plot(I, c_new, label="New Curve", marker = ".")
plt.show()



In [None]:
# f = lambda x: (x - x**2) / (x**2 + 1) 
# f = lambda x: np.sin(5*np.pi*x) * np.cos(2*np.pi*x)
f = lambda x: np.sin(10 * np.cos(3 * x)) * np.cos(2 * x)
# phi = lambda I: np.sin(I * np.pi / 2) 
# phi = lambda I: I**2
phi = lambda I: (np.exp(I) - 1) / (np.exp(1) - 1)

num_points = 50
I = np.linspace(0, 1, num_points)
c0 = f(I)
c1 = f(phi(I))

noise_level = np.min([(np.max(c0) - np.min(c0)), (np.max(c1) - np.min(c1))]) * 0.01
c0_n = c0.copy()
c1_n = c1.copy()
np.random.seed(1) 
c0_n[1:-1] = c0[1:-1] + np.random.normal(0, noise_level, c0.shape[0] - 2)
np.random.seed(2)
c1_n[1:-1] = c1[1:-1] + np.random.normal(0, noise_level, c1.shape[0] - 2)


fig, axs = plt.subplots(1,2, figsize=(10, 5))

# First subplot
axs[0].plot(I, c0, label = "c0", linewidth=0.5)
axs[0].scatter(I, c0, s=10)  # Add points
axs[0].plot(I, c1, label = "c1", linewidth=0.5)
axs[0].scatter(I, c1, s=10)  # Add points
axs[0].title.set_text('Original curves')
axs[0].legend()

# Second subplot
axs[1].plot(I, c0_n, label = "c0 noisy", linewidth=0.5)
axs[1].scatter(I, c0_n, s=10)  # Add points
axs[1].plot(I, c1_n, label = "c1 noisy", linewidth=0.5)
axs[1].scatter(I, c1_n, s=10)  # Add points
axs[1].title.set_text('Noisy curves')
axs[1].legend()

plt.show()

q0 = srvf(I, c0_n)
q1 = srvf(I, c1_n)

depth = 5
I1_new = find_optimal_diffeomorphism(q0, q1, I, I, depth)
c1_new = reparameterize_rotation(I1_new, I, c1_n)

lambda_reg = .1
gamma_reg = 1
I1_new_reg = find_optimal_diffeomorphism(q0, q1, I, I, depth, reg = (lambda_reg, gamma_reg))
c1_new_reg = reparameterize_rotation(I1_new_reg, I, c1_n)

# It is the distance between the new c1s and the original c0 that is interesting
q0 = srvf(I, c0)
q1_std = srvf(I, c1_new)
q1_reg = srvf(I, c1_new_reg)

print(f"Distance std: {L2_metric(q0, q1_std, I, I)}")
print(f"Distance reg: {L2_metric(q0, q1_reg, I, I)}")

fig = plt.figure()
plt.plot(I, I1_new, label = "I_new", linewidth=0.5)
plt.scatter(I, I1_new, s=10)  # Add points
plt.plot(I, I1_new_reg, label = "I1_new_reg", linewidth=0.5)
plt.scatter(I, I1_new_reg, s=10)  # Add points
plt.title('Standard reparameterization')
plt.legend()
plt.show()

# Plot the curves with the new parameterization of c1 against the original c0
fig, axs = plt.subplots(1,2, figsize=(10, 5))

# First subplot
axs[0].plot(I, c0, label = "c0", linewidth=0.5)
axs[0].scatter(I, c0, s=10)  # Add points
axs[0].plot(I, c1_new, label = "c1 new", linewidth=0.5)
axs[0].scatter(I, c1_new, s=10)  # Add points
axs[0].title.set_text('Standard reparameterization')
axs[0].legend()

# Second subplot
axs[1].plot(I, c0, label = "c0", linewidth=0.5)
axs[1].scatter(I, c0, s=10)  # Add points
axs[1].plot(I, c1_new_reg, label = "c1 new reg", linewidth=0.5)
axs[1].scatter(I, c1_new_reg, s=10)  # Add points
axs[1].title.set_text('Regulated reparameterization')
axs[1].legend()

plt.show()

# Plot the srvf of the curves
fig, axs = plt.subplots(1,2, figsize=(10, 5))

# First subplot
axs[0].plot(I[:-1], q0, label = "q0", linewidth=0.5)
axs[0].scatter(I[:-1], q0, s=10)  # Add points
axs[0].plot(I[:-1], q1_std, label = "q1 std", linewidth=0.5)
axs[0].scatter(I[:-1], q1_std, s=10)  # Add points
axs[0].title.set_text('Standard reparameterization')
axs[0].legend()

# Second subplot
axs[1].plot(I[:-1], q0, label = "q0", linewidth=0.5)
axs[1].scatter(I[:-1], q0, s=10)  # Add points
axs[1].plot(I[:-1], q1_reg, label = "q1 reg", linewidth=0.5)
axs[1].scatter(I[:-1], q1_reg, s=10)  # Add points
axs[1].title.set_text('Regulated reparameterization')
axs[1].legend()

plt.show()

### Loop through several lambda values