In [None]:
# This block of code was taken from:
# Farhad Nawaz, Tianyu Li, Nikolai Matni, Nadia Figueroa,
# "Learning Complex Motion Plans using Neural ODEs
# with Safety and Stability Guarantees", ICRA 2024, arXiv:2308.00186.

!pip install gmr
import gmr
import numpy as np
import matplotlib.pyplot as plt
from scipy.integrate import solve_ivp
from scipy import interpolate
from scipy.interpolate import interp1d

import jax
import jax.nn as jnn
import jax.numpy as jnp
from sklearn.preprocessing import MinMaxScaler
import torch
# Reading data from text file

flag = None
traj_all = []
traj_c = 0

file_name = '/content/trajs_loop_4.txt'

f = open(file_name, 'r')
data_text = f.readlines()
for i in data_text:
    if i == 'New trajectory\n':
        flag = 'new'
        if traj_c > 0:
            traj_all.append(traj)
        traj_c += 1
        continue
    if flag == 'new':
        traj = np.fromstring(i, dtype=float, sep=' ').reshape((1, 2))
        flag = 'old'
        continue
    if flag == 'old':
        traj = np.concatenate((traj, np.fromstring(i, dtype=float, sep=' ').reshape((1, 2))))
traj_all.append(traj)

traj_all_norm = []
ts_norm = []
scaler = MinMaxScaler(feature_range=(-0.5,0.5))
scaler_all = []
for i in range(len(traj_all)):
    scaler.fit(traj_all[i])
    scaler_all.append(scaler)
    ysti = scaler.transform(traj_all[i])
    traj_all_norm.append(ysti)
    ts_norm.append(jnp.linspace(0,1,num=ysti.shape[0]))

dim = traj.shape[1]

nsamples = 300

data_aug = 0

traj_all_process = jnp.zeros((traj_c + data_aug, nsamples, dim))

seed = 1385

key = jax.random.PRNGKey(seed)

key_trajs = jax.random.split(key, num=traj_c + data_aug)

for i in range(traj_c + data_aug):
  key_dim = jax.random.split(key_trajs[i], num=dim)
  for j in range(dim):
    f = interpolate.interp1d(ts_norm[i], traj_all_norm[i][:, j])
    ts_new = np.linspace(0, 1, nsamples)
    range_traj = max(traj_all_norm[i][:, j]) - min(traj_all_norm[i][:, j])
    scale = 0
    traj_new = f(ts_new) + jax.random.uniform(key_dim[j], shape=ts_new.shape, minval=-scale*range_traj, maxval=scale*range_traj)
    traj_all_process = traj_all_process.at[i, :, j].set(traj_new)

traj_d = jnp.diff(traj_all_process, axis=1)
traj_d = jnp.concatenate((traj_d, jnp.zeros((traj_c + data_aug, 1, dim))), axis=1)
traj_d_all = jnp.concatenate((traj_all_process, traj_d), axis=2)

for i in range(traj_c + data_aug):
    plt.plot(traj_all_process[i][0, 0], traj_all_process[i][0, 1], 'ro')
    plt.plot(traj_all_process[i][:, 0], traj_all_process[i][:, 1])
    plt.plot(traj_all_process[i][-1, 0], traj_all_process[i][-1, 1], 'go')

In [None]:
drawing = np.array(traj_all_process[0, :, :])

def normalize_drawing(d):
    d = d - np.mean(d, axis=0)
    max_range = np.max(np.abs(d))
    return d / max_range

def rotate_and_scale_drawing(d, angle_degrees=0.0, scale_x=1.0, scale_y=1.0):
    theta = np.radians(angle_degrees)
    rotation_matrix = np.array([
        [np.cos(theta), -np.sin(theta)],
        [np.sin(theta),  np.cos(theta)]
    ])
    scaling_matrix = np.diag([scale_x, scale_y])
    transform = rotation_matrix @ scaling_matrix
    return d @ transform.T

def add_noise(drawing, noise_scale=0.13):
    return drawing + np.random.normal(scale=noise_scale, size=drawing.shape)

def simplify_and_interpolate(drawing, keep_ratio=0.05, noise_scale=0.15):
    num_points = int(len(drawing) * keep_ratio)
    idx = np.sort(np.random.choice(len(drawing), num_points, replace=False))
    simplified = drawing[idx]
    simplified += np.random.normal(scale=noise_scale, size=simplified.shape)
    f_interp = interp1d(np.linspace(0, 1, num_points), simplified, axis=0, kind='cubic')
    return f_interp(np.linspace(0, 1, len(drawing)))

def distort_shape(drawing):
    scales = np.random.uniform(0.9, 1, size=(len(drawing), 1))
    return drawing * scales

def exaggerate_parts(drawing, start=100, end=250, factor=0.8):
    modified = drawing.copy()
    modified[start:end] *= factor
    return modified

def create_childlike_versions(drawing):
    drawing = normalize_drawing(drawing)

    version1 = add_noise(drawing)
    version2 = simplify_and_interpolate(drawing)
    version3 = distort_shape(drawing)
    version4 = exaggerate_parts(drawing)

    version1 = (rotate_and_scale_drawing(version1,5,1,1))
    version2 = (rotate_and_scale_drawing(version2,-5,1.1,1.1))
    version3 = (rotate_and_scale_drawing(version3,10,1.2,1.2))
    version4 = (rotate_and_scale_drawing(version4,-10,1.3,1.3))

    return version1, version2, version3, version4


drawing = normalize_drawing(drawing)
child1, child2, child3, child4 = create_childlike_versions(drawing)


fig, axs = plt.subplots(1, 5, figsize=(22, 4))
titles = ['Expert (Original)', 'Child 1: Noisy', 'Child 2: Simplified',
          'Child 3: Distorted', 'Child 4: Exaggerated']

for ax, data, title in zip(axs, [drawing, child1, child2, child3, child4], titles):
    ax.plot(data[:, 0], data[:, 1], lw=2)
    ax.set_title(title)
    ax.axis('equal')
    ax.axis('off')

plt.tight_layout()
plt.show()


In [11]:
noisy_d = np.array([child1, child2, child3, child4])

In [None]:
fig, axes = plt.subplots(1, 4, figsize=(16, 4))

for i in range(4):
    x = noisy_d[i, :, 0]
    y = noisy_d[i, :, 1]

    ax = axes[i]
    ax.plot(x, y, lw=2)
    ax.set_title(f"Trajectory {i+1}")
    ax.set_xlabel("X")
    ax.set_ylabel("Y")
    ax.set_aspect('equal')
    ax.grid(True)

plt.tight_layout()
plt.show()


In [None]:
def smoothness_losst(pred):
    dx = torch.diff(pred[:, 0], dim=0)
    dy = torch.diff(pred[:, 1], dim=0)
    return torch.sum(dx**2 + dy**2).item()

def closed_shape_loss(pred):
    return torch.norm(pred[0] - pred[-1], dim=-1).item()

def symmetry_loss(pred):
    centroid = torch.mean(pred, dim=0)
    reflected_x = 2 * centroid[0] - pred[:, 0]
    reflected_points = torch.stack((reflected_x, pred[:, 1]), dim=1)
    diff = reflected_points.unsqueeze(1) - pred.unsqueeze(0)
    dists = torch.norm(diff, dim=2)
    min_dists, _ = torch.min(dists, dim=1)
    return torch.mean(min_dists).item()

def compute_scores(pred):
    return {
        'Smoothness': smoothness_losst(pred),
        'Closed Shape': closed_shape_loss(pred),
        'Symmetry': symmetry_loss(pred)
    }

weights = {
    'Symmetry': 1.0,
    'Closed Shape': 0.5,
    'Smoothness': 0.1
}

drawing = torch.tensor(drawing, dtype=torch.float32)
chil1 = torch.tensor(child1, dtype=torch.float32)
chil2 = torch.tensor(child2, dtype=torch.float32)
chil3 = torch.tensor(child3, dtype=torch.float32)
chil4 = torch.tensor(child4, dtype=torch.float32)

drawings = [drawing, chil1, chil2, chil3, chil4]
score_dicts = [compute_scores(d) for d in drawings]

all_metrics = {key: [d[key] for d in score_dicts] for key in weights}

normalized_scores = []
for scores in score_dicts:
    norm_score = {}
    for k in scores:
        vals = np.array(all_metrics[k])
        min_val, max_val = vals.min(), vals.max()
        if max_val - min_val > 1e-8:
            norm_score[k] = (scores[k] - min_val) / (max_val - min_val)
        else:
            norm_score[k] = 0.0
    normalized_scores.append(norm_score)

def compute_weighted_optimal(score, weights):
    weighted_sum = sum(score[k] * weights[k] for k in weights)
    return 1.0 / (weighted_sum + 1e-8)


optimality_scores = [compute_weighted_optimal(s, weights) for s in normalized_scores]
min_score = min(optimality_scores[1:])
max_score = max(optimality_scores[1:])

optimality_scores = [(score - min_score) / (max_score - min_score + 1e-8) for score in optimality_scores]

titles_with_scores = [
    f'Original Expert ',
    f'Child 1: Noisy | Optimality: {optimality_scores[1]:.3f}',
    f'Child 2: Simplified | Optimality: {optimality_scores[2]:.3f}',
    f'Child 3: Distorted | Optimality: {optimality_scores[3]:.3f}',
    f'Child 4: Exaggerated | Optimality: {optimality_scores[4]:.3f}'
]


In [None]:
fig, axs = plt.subplots(1, 5, figsize=(22, 5))
for ax, data, title, score in zip(axs, drawings, titles_with_scores, score_dicts):
    data_np = data.numpy()
    ax.plot(data_np[:, 0], data_np[:, 1], lw=2)
    ax.axis('equal')
    ax.axis('off')

    score_text = '\n'.join([f"{k}: {v:.3f}" for k, v in score.items()])
    ax.text(0.05, 0.95, score_text, transform=ax.transAxes, fontsize=12,
            verticalalignment='top', bbox=dict(facecolor='white', alpha=0.7, edgecolor='none'))
    ax.set_title(title, fontsize=11)

plt.tight_layout()
plt.show()


In [15]:
class PeriodicDMP:
    def __init__(self, n_dof=2, n_basis=20, tau=1.0, alpha=25.0, beta=10.0):
        self.n_dof = n_dof
        self.n_basis = n_basis
        self.tau = tau
        self.alpha = alpha
        self.beta = beta

        self.time = np.linspace(0, tau, 300)
        self.psi = self._compute_basis_functions(self.time)

        self.weights = np.random.randn(self.n_basis, self.n_dof)

    def _compute_basis_functions(self, time):

        centers = np.linspace(0, 1, self.n_basis)
        psi = np.exp(-self.alpha * (time[:, None] - centers[None, :]) ** 2)
        return psi

    def fit(self, trajectory):
        goal_position = trajectory[-1]

        for i in range(self.n_dof):
            self.weights[:, i] = np.linalg.lstsq(self.psi, trajectory[:, i], rcond=None)[0]

    def generate(self, goal_position):
        y = np.zeros((len(self.time), self.n_dof))
        dy = np.zeros((len(self.time), self.n_dof))

        for i in range(len(self.time)):
            for j in range(self.n_dof):
                y[i, j] = np.dot(self.psi[i, :], self.weights[:, j])

            dy[i] = np.gradient(y[i], self.time[i])

        return y, dy

In [None]:
example_trajectory = child4
dmp = PeriodicDMP(n_dof=2)

dmp.fit(example_trajectory)

goal_position = example_trajectory[-1]

target_trajectory, _ = dmp.generate(goal_position)

plt.plot(target_trajectory[:, 0], target_trajectory[:, 1], label='Generated by DMP', color='r')
plt.plot(example_trajectory[:, 0], example_trajectory[:, 1], label='Original Trajectory', color='b', linestyle='dashed')
plt.title("Original vs. Generated Trajectory")
plt.legend()
plt.axis('equal')
plt.show()

In [17]:
import torch
import torch.nn as nn
import torch.optim as optim


class LSTMModel(nn.Module):
    def __init__(self, input_size=2, hidden_size=128, output_size=2, num_layers=2):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        lstm_out, (hn, cn) = self.lstm(x)
        out = self.fc(lstm_out)
        return out

def smoothness_loss(pred):

    dx = torch.diff(pred[:, :, 0], dim=0)
    dy = torch.diff(pred[:, :, 1], dim=0)
    d1 = dx**2 + dy**2
    loss_d1 = torch.sum(d1)

    ddx = torch.diff(dx, dim=0)
    ddy = torch.diff(dy, dim=0)
    d2 = ddx**2 + ddy**2
    loss_d2 = torch.sum(d2)

    dddx = torch.diff(ddx, dim=0)
    dddy = torch.diff(ddy, dim=0)
    d3 = dddx**2 + dddy**2
    loss_d3 = torch.sum(d3)

    loss = 0.2 * loss_d1 + 0.3 * loss_d2 + 0.5 * loss_d3

    return loss

def closed_shape_loss(pred):
    return torch.norm(pred[:,:,0] - pred[:,:,-1])

def symmetry_loss(pred):
    pred = pred.squeeze(0)

    centroid = torch.mean(pred, dim=0)

    reflected_x = 2 * centroid[0] - pred[:, 0]
    reflected_points = torch.stack((reflected_x, pred[:, 1]), dim=1)

    diff = reflected_points[:, None, :] - pred[None, :, :]
    dists = torch.norm(diff, dim=2)  # [T, T]

    min_dists, _ = torch.min(dists, dim=1)

    return torch.mean(min_dists)


def shape_preservation_loss(pred, dmp_trajectory, weight_edge=12):
    dmp_tensor = torch.tensor(dmp_trajectory, dtype=pred.dtype, device=pred.device)

    diffs = torch.norm(pred - dmp_tensor, dim=-1)


    weights = 1*torch.ones_like(diffs)
    weights[0] = weight_edge
    weights[-1] = weight_edge


    loss = torch.sum(weights * diffs)
    return loss

model = LSTMModel(input_size=2, hidden_size=128, output_size=2, num_layers=2)

mse_loss_function = nn.MSELoss()

optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
expert_trajectory = np.array(target_trajectory)

target1 = rotate_and_scale_drawing(expert_trajectory, angle_degrees=15, scale_x=1, scale_y=1)
target2 = rotate_and_scale_drawing(expert_trajectory, angle_degrees=5, scale_x=1.1, scale_y=1.1)
target3 = rotate_and_scale_drawing(expert_trajectory, angle_degrees=20, scale_x=1.2, scale_y=1.2)
target4 = rotate_and_scale_drawing(expert_trajectory, angle_degrees=0, scale_x=1.3, scale_y=1.3)

target_shapes = [target1, target2, target3, target4]

child_drawings_np = np.stack([child1, child2, child3, child4])
inputs = torch.tensor(child_drawings_np, dtype=torch.float32)
targets = inputs.clone()

target_shapes_torch = [torch.tensor(t, dtype=torch.float32) for t in target_shapes]

model = LSTMModel(input_size=2, hidden_size=128, output_size=2, num_layers=2)
mse_loss_function = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

epochs = 500
for epoch in range(epochs):
    model.train()
    total_epoch_loss = 0.0

    for i in range(inputs.shape[0]):
        input_i = inputs[i].unsqueeze(0)
        target_i = targets[i].unsqueeze(0)
        target_shape_i = target_shapes_torch[i]

        output_i = model(input_i)

        mse_loss = mse_loss_function(output_i, target_i)
        sm_loss = smoothness_loss(output_i)
        cs_loss = closed_shape_loss(output_i)
        sy_loss = symmetry_loss(output_i)
        shape_loss = shape_preservation_loss(output_i.squeeze(0), target_shape_i)

        total_loss = 0.9*mse_loss + 0.1 * sm_loss + 0.1 * cs_loss + 0.1 * sy_loss + 0.1 * shape_loss

        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()

        total_epoch_loss += total_loss.item()

    if epoch % 50 == 0:
        avg_loss = total_epoch_loss / inputs.shape[0]
        print(f"Epoch [{epoch}/{epochs}], Avg Loss per Trajectory: {avg_loss:.4f}")

model.eval()
with torch.no_grad():
    cleaned_drawings = model(inputs)  # (4, 300, 2)
    cleaned_drawings_np = cleaned_drawings.numpy()


In [None]:
import matplotlib.pyplot as plt
import numpy as np

child_titles = ['Teacher 1', 'Teacher 2', 'Teacher 3', 'Teacher 4']
cleaned_titles = ['Cleaned 1', 'Cleaned 2', 'Cleaned 3', 'Cleaned 4']

target_trajectories = [target1, target2, target3, target4]

all_x = np.concatenate([child[:, 0] for child in [child1, child2, child3, child4]] +
                       [target[:, 0] for target in target_trajectories])
all_y = np.concatenate([child[:, 1] for child in [child1, child2, child3, child4]] +
                       [target[:, 1] for target in target_trajectories])

x_min, x_max = all_x.min(), all_x.max()
y_min, y_max = all_y.min(), all_y.max()

x_margin = 0.1 * (x_max - x_min)
y_margin = 0.1 * (y_max - y_min)

x_limits = (x_min - x_margin, x_max + x_margin)
y_limits = (y_min - y_margin, y_max + y_margin)

fig, axs = plt.subplots(1, 4, figsize=(18, 4))
for ax, child, target, title in zip(axs, [child1, child2, child3, child4], target_trajectories, child_titles):
    ax.plot(target[:, 0], target[:, 1], color='red', lw=3, linestyle='--', label='Target Trajectory')
    ax.plot(child[:, 0], child[:, 1], lw=2, label='Child Drawing')
    ax.set_title(title)

    ax.set_xlim(x_limits)
    ax.set_ylim(y_limits)
    ax.set_aspect('equal')

    ax.tick_params(axis='both', which='both', length=5, labelsize=8)
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.legend()

fig, axs = plt.subplots(1, 4, figsize=(18, 4))
for ax, cleaned, target, title in zip(axs, cleaned_drawings_np, target_trajectories, cleaned_titles):
    ax.plot(target[:, 0], target[:, 1], color='red', lw=3, linestyle='--', label='Target Trajectory')
    ax.plot(cleaned[:, 0], cleaned[:, 1], lw=2, color='blue', label='Cleaned Drawing')
    ax.set_title(title)

    ax.set_xlim(x_limits)
    ax.set_ylim(y_limits)
    ax.set_aspect('equal')

    ax.tick_params(axis='both', which='both', length=5, labelsize=8)
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)
    ax.legend()

plt.tight_layout()
plt.show()


In [None]:
np.random.seed(0)
x = np.linspace(0, 4 * np.pi, 300)
y = np.sin(x)
trajectory = drawing

indices_uniform = np.linspace(0, 299, 60, dtype=int)
trajectory_uniform = trajectory[indices_uniform]

trajectory_avg = trajectory.reshape(60, 5, 2).mean(axis=1)

plt.figure(figsize=(12, 5))

plt.subplot(1, 3, 1)
plt.plot(trajectory[:, 0], trajectory[:, 1], label='Original', color='gray')
plt.title('Original (300 points)')
plt.xlabel('x')
plt.ylabel('y')

plt.subplot(1, 3, 2)
plt.plot(trajectory[:, 0], trajectory[:, 1], color='gray', alpha=0.3)
plt.plot(2*trajectory_uniform[:, 0], 2*trajectory_uniform[:, 1], 'o-', label='Uniform Sampled', color='blue')
plt.title('Uniform Sampling (60 points)')

plt.subplot(1, 3, 3)
plt.plot(trajectory[:, 0], trajectory[:, 1], color='gray', alpha=0.3)
plt.plot(2*trajectory_avg[:, 0], 2*trajectory_avg[:, 1], 'o-', label='Block Averaged', color='green')
plt.title('Block Averaging (60 points)')

plt.tight_layout()
plt.show()

In [21]:
T = 6.0
ts_new = np.linspace(0, T, 60)
dt = ts_new[1] - ts_new[0]

vel_standard = np.gradient(drawing, dt, axis=1)

traj_standard = np.concatenate([drawing, vel_standard], axis=-1)

In [22]:
import os
curr_path = os.getcwd()
save_dir = os.path.join(curr_path, 'config', 'Data_Trajs')
os.makedirs(save_dir, exist_ok=True)
data_file = os.path.join(save_dir, 'drawing_ref.npy')
with open(data_file, 'wb') as f:
    np.save(f, traj_standard)

In [None]:
trajectory_uniform.shape

In [24]:
data_file = os.path.join(save_dir, 'draw_ref.npy')
with open(data_file, 'wb') as f:
    np.save(f, 2*trajectory_uniform)