In [133]:
import numpy as np
import math
import matplotlib.pyplot as plt
import torch
import time
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
import deepxde as dde

In [23]:
coms = np.array([[-0.05117, 0.07908, 0.00086, 1],
                 [0.00269, -0.00529, 0.06845, 1],
                 [-0.07176, 0.08149, 0.00132, 1],
                 [0.00159, -0.01117, 0.02618, 1],
                 [-0.01168, 0.13111, 0.0046, 1],
                 [0.00697, 0.006, 0.06048, 1],
                 [0.005137, 0.0009572, -0.06682, 1]])
# Ixx, Iyy, Izz, Ixy, Iyz, Ixz
inertias = np.array([[0.0470910226, 0.035959884, 0.0376697645, -0.0061487003, -0.0007808689, 0.0001278755],
[0.027885975, 0.020787492, 0.0117520941,-0.0001882199, 0.0020767576, -0.00030096397],
[0.0266173355, 0.012480083, 0.0284435520,-0.0039218988, -0.001083893, 0.0002927063],
[0.0131822787, 0.009268520, 0.0071158268,-0.0001966341, 0.000745949, 0.0003603617],
[0.0166774282, 0.003746311, 0.0167545726,-0.0001865762, 0.0006473235, 0.0001840370],
[0.0070053791, 0.005527552, 0.0038760715,0.0001534806, -0.0002111503, -0.000443847],
[0.0008162135, 0.0008735012, 0.0005494148,0.000128440, 0.0001057726, 0.00018969891]])

# theta, d, a, alpha, m
dh_params = np.array([[0.2703, 0.069, -math.pi/2, 5.70044],
[0, 0, math.pi/2, 3.22698],
[0.3644, 0.069, -math.pi/2, 4.31272],
[0, 0, math.pi/2, 2.07206],
[0.3743, 0.01, -math.pi/2, 2.24665],
[0, 0, math.pi/2, 1.60979],
[0.2295,  0, 0, 0.54218]])

dof = 7
a = np.zeros(7)
d = np.zeros(7)
alpha = np.zeros(7)
m = np.zeros(7)
for i in range(7):
    a[i] = dh_params[i, 1]
    d[i] = dh_params[i, 0]
    alpha[i] = dh_params[i, 2]
    m[i] = dh_params[i, 3]

J = np.zeros((4,4,dof))
for i in range(7):
    J[:,:,i] = [[0.5*(-inertias[i,0]+inertias[i, 1]+inertias[i, 2]),inertias[i, 3],inertias[i,5], m[i]*coms[i, 0]],\
                    [inertias[i, 3], 0.5*(inertias[i,0]-inertias[i, 1]+inertias[i, 2]), inertias[i, 4], m[i]*coms[i, 1]], \
                    [inertias[i, 5], inertias[i, 4], 0.5*(inertias[i,0]+inertias[i, 1]-inertias[i, 2]), m[i]*coms[i, 2]], \
                     [m[i]*coms[i, 0], m[i]*coms[i, 1], m[i]*coms[i, 2], m[i]]]
    
Q = np.zeros((4,4))
Q[0,1] = -1
Q[1,0] = 1

In [24]:
def compute_matrices(q, qdot):
    T01 = np.array([[math.cos(q[0]),  -math.cos(alpha[0])*math.sin(q[0]),  math.sin(alpha[0])*math.sin(q[0]), a[0]*math.cos(q[0])],
           [math.sin(q[0]),   math.cos(alpha[0])*math.cos(q[0]), -math.sin(alpha[0])*math.cos(q[0]), a[0]*math.sin(q[0])],
           [0,           math.sin(alpha[0]),            math.cos(alpha[0]),           d[0]],
           [0,           0,                        0,                       1]])   

    
    T12 = np.array([[math.cos(q[1]),  -math.cos(alpha[1])*math.sin(q[1]),  math.sin(alpha[1])*math.sin(q[1]), a[1]*math.cos(q[1])],
           [math.sin(q[1]),   math.cos(alpha[1])*math.cos(q[1]), -math.sin(alpha[1])*math.cos(q[1]), a[1]*math.sin(q[1])],
           [0,           math.sin(alpha[1]),            math.cos(alpha[1]),           d[1]],
           [0,           0,                        0,                       1]])

    T23 = np.array([[math.cos(q[2]),  -math.cos(alpha[2])*math.sin(q[2]),  math.sin(alpha[2])*math.sin(q[2]), a[2]*math.cos(q[2])],
           [math.sin(q[2]),   math.cos(alpha[2])*math.cos(q[2]), -math.sin(alpha[2])*math.cos(q[2]), a[2]*math.sin(q[2])],
           [0,           math.sin(alpha[2]),            math.cos(alpha[2]),           d[2]],
           [0,           0,                        0,                       1]])

    
    T34 = np.array([[math.cos(q[3]),  -math.cos(alpha[3])*math.sin(q[3]),  math.sin(alpha[3])*math.sin(q[3]), a[3]*math.cos(q[3])],
           [math.sin(q[3]),     math.cos(alpha[3])*math.cos(q[3]), -math.sin(alpha[3])*math.cos(q[3]), a[3]*math.sin(q[3])],
           [0,                 math.sin(alpha[3]),                 math.cos(alpha[3]),               d[3]],
           [0,           0,                        0,                       1]])

        
    T45 = np.array([[math.cos(q[4]),  -math.cos(alpha[4])*math.sin(q[4]),  math.sin(alpha[4])*math.sin(q[4]), a[4]*math.cos(q[4])],
           [math.sin(q[4]),     math.cos(alpha[4])*math.cos(q[4]), -math.sin(alpha[4])*math.cos(q[4]), a[4]*math.sin(q[4])],
           [0,                 math.sin(alpha[4]),                 math.cos(alpha[4]),               d[4]],
           [0,           0,                        0,                       1]])

        
    T56 = np.array([[math.cos(q[5]),  -math.cos(alpha[5])*math.sin(q[5]),  math.sin(alpha[5])*math.sin(q[5]), a[5]*math.cos(q[5])],
           [math.sin(q[5]),     math.cos(alpha[5])*math.cos(q[5]), -math.sin(alpha[5])*math.cos(q[5]), a[5]*math.sin(q[5])],
           [0,                 math.sin(alpha[5]),                 math.cos(alpha[5]),               d[5]],
           [0,           0,                        0,                       1]]) 

        
    T67 = np.array([[math.cos(q[6]),  -math.cos(alpha[6])*math.sin(q[6]),  math.sin(alpha[6])*math.sin(q[6]), a[6]*math.cos(q[6])],
           [math.sin(q[6]),     math.cos(alpha[6])*math.cos(q[6]), -math.sin(alpha[6])*math.cos(q[6]), a[6]*math.sin(q[6])],
           [0,                 math.sin(alpha[6]),                 math.cos(alpha[6]),               d[6]],
           [0,           0,                        0,                       1]])  

    T00 = np.identity(4);
    T11 = np.identity(4);
    T22 = np.identity(4);
    T33 = np.identity(4);
    T44 = np.identity(4);
    T55 = np.identity(4);
    T66 = np.identity(4);
    T77 = np.identity(4);
    
    T02 = (T01@T12);
    T03 = (T02@T23);
    T04 = (T03@T34);
    T05 = (T04@T45);
    T06 = (T05@T56);
    T07 = (T06@T67);
    
    T13 = (T12@T23);
    T14 = (T13@T34);
    T15 = (T14@T45);
    T16 = (T15@T56);
    T17 = (T16@T67);
    
    T24 = (T23@T34);
    T25 = (T24@T45);
    T26 = (T25@T56);
    T27 = (T26@T67);
    
    T35 = (T34@T45);
    T36 = (T35@T56);
    T37 = (T36@T67);
    
    T46 = (T45@T56);
    T47 = (T46@T67);
    
    T57 = (T56@T67);


    U = np.zeros((4,4,dof,dof));
    for i in range(1,dof+1):
        for j in range(1, dof+1):
            if j <= i:
                U[:,:,i-1,j-1] = eval('T'+str(0)+str(j-1)) @ Q @ eval('T' + str(j-1) + str(i))
            else:
                U[:,:,i-1,j-1] = np.zeros((4, 4)); 
    
    U3 = np.zeros((4,4,dof, dof, dof));
    for i in range(1, dof+1):
        for j in range(1, dof+1):
            for k in range(1, dof+1):
                if (i >= k) and (k >= j):
                    U3[:,:,i-1,j-1,k-1] = eval('T'+str(0)+str(j-1)) @ Q @ eval('T'+str(j-1)+str(k-1)) @ Q @ eval('T'+str(k-1)+str(i))
                elif (i >= j) and (j >= k):
                    U3[:,:,i-1,j-1,k-1] = eval('T'+str(0)+str(k-1)) @ Q @ eval('T'+str(k-1)+str(j-1)) @ Q @ eval('T'+str(j-1)+str(i))
                else:
                    U3[:,:,i-1,j-1,k-1] = np.zeros((4, 4))

    
    # Inertia
    D = np.zeros((dof, dof))
    for i in range(0, dof):
        for k in range(0, dof):
            res = 0;
            for j in range(max(i,k), dof):
                res = res + np.trace(U[:,:,j,k]@J[:,:,j]@(U[:,:,j,i].T))
            D[i,k] = res;

    # Coriolis
    h3 = np.zeros((dof, dof, dof))
    for i in range(0, dof):
        for k in range(0, dof):
            for m_idx in range(0, dof):
                res = 0;
                for j in range(max([i, k, m_idx]), dof):
                    res = res + np.trace(U3[:,:,j,k,m_idx] @ J[:,:,j] @ (U[:,:,j,i].T))
                h3[i,k,m_idx] = res;
    h = np.zeros(dof);
    for i in range(dof):
        sum_k = 0
        for k in range(dof):
            sum_m = 0
            for m_idx in range(dof):
                sum_m = sum_m + h3[i,k,m_idx] * qdot[k] * qdot[m_idx]
            sum_k = sum_k + sum_m
        h[i] = sum_k;

    # Gavity
    g = 9.81
    g_vec = np.array([0, 0, -g, 0]);
    c = np.zeros(dof);
    for i in range(dof):
        res = 0;
        for j in range(dof):
            res = res + (-m[j] * g_vec @ U[:,:,j,i] @ coms[j]);
        c[i] = res;

    return D, h, c

In [25]:
# def compute_dynamics_vals(states, qdes, qdesdot, qdesddot):
#     # x1 = e1 + qdes
#     # x2 = e2 + qdesdot
#     x1 = qdes - states[:dof] 
#     x2 = qdesdot - states[dof:] 
#     e1 = states[:dof]
#     e2 = states[dof:]
#     M, C, G = compute_matrices(x1, x2)
#     MInv = np.linalg.inv(M)
#     h = qdesddot - alpha_mat @ alpha_mat @ e1 + MInv @ (C @ qdesdot + G + C @ alpha_mat @ e1 - C @ e2)
#     return h, MInv, M, G

def step_q(states, torque):
    x1 = states[:dof]
    x2 = states[dof:]
    M, C, G = compute_matrices(x1, x2)
    x1dot = x2
    x2dot = np.linalg.inv(M) @ (torque - C @ x2 - G)
    return np.array([x1dot, x2dot]).reshape(2*dof)

def get_error_states(states, qdes, qdesdot):
    x1 = states[:dof]
    x2 = states[dof:]
    e1 = qdes - x1
    e2 = (qdesdot - x2) + alpha_mat @ e1
    return np.array([e1, e2]).reshape(2*dof)

def compute_control(states, qdes, qdesdot, qdesddot):
    x1 = states[:dof]
    x2 = states[dof:]
    error_states = get_error_states(states, qdes, qdesdot)
    e1 = error_states[:dof]
    e2 = error_states[dof:]
    M, C, G = compute_matrices(x1, x2)
    h = qdesddot - alpha_mat @ alpha_mat @ e1 + np.linalg.inv(M) @ (C@qdesdot + G + C @ alpha_mat @ e1 - C @ e2)
    return saturate_torques(M @ (h + (beta_mat + alpha_mat)@ e2))

def saturate_torques(torque):
    min_torques = [-50, -50, -50, -50, -15, -15, -15]
    max_torques = [50, 50, 50, 50, 15, 15, 15]
    return np.maximum(min_torques, np.minimum(max_torques, torque))

def saturate_joints(joints):
    joint_lim_min = np.array([-1.7016, -2.147, -3.0541, -0.05, -3.059, -1.571, -3.059, -2, -2, -2, -2, -4, -4, -4])
    joint_lim_max = np.array([1.7016, 1.047, 3.0541, 2.618, 3.059, 2.094, 3.059, 2, 2, 2, 2, 4, 4, 4])
    return np.maximum(joint_lim_min, np.minimum(joint_lim_max, joints))

# def dynamics(states, h, MInv, control):
#     e1 = states[:dof]
#     e2 = states[dof:]
#     e1dot = e2 - alpha_mat @ e1
#     e2dot = alpha_mat @ e2 + h - MInv @ control
#     return np.array([e1dot, e2dot]).reshape(e1dot.shape[0]*2)

def vector_trap(x):
    return dt*np.sum((x[:-1] + x[1:]) / 2)

def compute_predictors(X, controls):
    res = np.copy(X)
    for i in range(len(controls)):
        res = saturate_joints(res + dt*step_q(res, controls[i]))
    return res

def compute_time_varying_predictors(X, controls, tVals):
    res = np.copy(X)
    for i in range(len(controls)):
        res = saturate_joints(res + 1/delay_time_derive(delay_time_inv(tVals[i]))*1/dt*step_q(res, controls[i]))
    return res


def time_varying_delay_func(time):
    return time - (1+time) / (1+2*time)

def delay_time_inv(t):
    return t + (t+1)/(math.sqrt((t+1)**2 + 1) + t)

def delay_time_deriv(t):
    return 1 + 1/((1+2*t)**2)

# LEFT OFF HERE. NEED TO SIMULATE TIME-VARYING NEXT

def simulate_system(x0, qdes, qdesdot, qdesddot, dt, T, D, dof):
    t = np.arange(0, T, dt)
    q_vals = np.zeros((len(t), len(x0)))
    nD = int(round(D/dt))
    controls = np.zeros((len(t)+nD, dof))
    predictors = np.zeros((len(t), len(x0)))
    q_vals[0] = x0
    # Setup initial controllers. This matters - ALOT. 
    controllerStatic = compute_control(q_vals[0], q_vals[0, :dof], np.zeros(7), np.zeros(7))
    controls[0:nD, :] = controllerStatic
    for i in range(1, len(t)):
        if i % 100 == 0:
            print(i, "/", len(t))
        prediction = compute_predictors(q_vals[i-1], controls[i-1:i-1+nD])
        predictors[i] = prediction
        controls[i-1+nD] = compute_control(prediction, qdes[i-1+nD], qdesdot[i-1+nD], qdesddot[i-1+nD])
        q_vals[i] =  saturate_joints(q_vals[i-1] + dt*step_q(q_vals[i-1], controls[i-1]))
    return q_vals, controls, predictors
    
# def simulate_system(x0, qdes, qdesdot, qdesddot, dt, T,D, dof):
#     t = np.arange(0, T, dt)
#     res = np.zeros((len(t), len(x0)))
#     res[0, :dof] = qdes[0, 0] - x0[:dof]
#     res[0, dof:] = qdesdot[0, 0] - x0[dof:]
#     nD = int(round(D/dt))
#     controls = np.zeros((len(t)+nD, dof))
#     states = np.zeros((len(t), len(x0)))
#     states[0] = np.array([res[0, :dof]+qdes[0, 0], res[0, dof:]+qdesdot[0, 0]]).reshape(dof*2)
#     controls_cur = []
#     predictions_arr = []
#     predictions = np.zeros(14)
#     hTrue, MInvTrue, MTrue, GTrue = compute_dynamics_vals(res[0], res[0, :dof], np.zeros(dof), np.zeros(dof))
#     controllerStatic = compute_controller(res[0], MTrue, hTrue)
#     # Set the controller to be static for the first D steps
#     controls[0:nD, :] = controllerStatic
#     for i in range(1, len(t)):
#         if i % 100 == 0:
#             print(i, "/", len(t))
#         predictions, hPred, MPred = compute_predictors(res[i-1], qdes[i-1:i-1+nD], qdesdot[i-1:i-1+nD], qdesddot[i-1:i-1+nD], controls[i-1:i-1+nD])
#         # hTrue, MInvTrue, MTrue, GTrue = compute_dynamics_vals(res[i-1], qdes[i-1], qdesdot[i-1], qdesddot[i-1])
#         hTrue, MInvTrue, MTrue, GTrue = compute_dynamics_vals(res[i-1], qdes[i-1], qdesdot[i-1], qdesddot[i-1])
#         control = compute_controller(predictions, MPred, hPred)
#         controls[i-1+nD] = control
#         res[i] = res[i-1] + dt*dynamics(res[i-1], hTrue, MInvTrue, controls[i-1])
#         controls_cur.append(controls[i-1])
#         states[i] = np.array([res[i, :dof]+qdes[i, 0], res[i, dof:]+qdesdot[i, 0]]).reshape(dof*2)
#         predictions_arr.append(predictions)
#     return res, np.array(states), np.array(controls), np.array(controls_cur), np.array(predictions_arr)

In [98]:
joint_lim_min = np.array([-1.7016, -2.147, -3.0541, -0.05, -3.059, -1.571, -3.059])
joint_lim_max = np.array([1.7016, 1.047, 3.0541, 2.618, 3.059, 2.094, 3.059])

# Generate a trajecotry:
def generate_trajectory(joint_lim_min, joint_lim_max, scaling, dt, T, D):
    t = np.arange(0, T+D, dt)
    q = np.zeros((len(t), len(joint_lim_min)))
    qd= np.zeros((len(t), len(joint_lim_min)))
    qdd = np.zeros((len(t), len(joint_lim_min)))
    for i in range(len(t)):
        # ensures positive q so we are in the joints
        q[i] = [math.sin(t[i]*period)*scaling + scaling]*len(joint_lim_min)
        qd[i] = [math.cos(t[i]*period)*scaling*period]*len(joint_lim_min)
        qdd[i] = [-math.sin(t[i]*period)*scaling*period**2] * len(joint_lim_min)
        if (np.maximum(np.minimum(q[i], joint_lim_max), joint_lim_min) != q[i]).any():
            raise Exception("Trajecotry not feasible")
    return q, qd, qdd

T = 10
period = 1
dt = 0.1
scaling = 0.1
dx = 0.2
t = np.arange(0, T, dt)
# Handles delays up to any time essentially
D = 1
qdes, qd_des, qdd_des = generate_trajectory(joint_lim_min, joint_lim_max, 0.1, dt, T, D)
nD = int(round(D/dt))

In [123]:
# Build dataset
def gen_dataset(num_data,sample_rate, dt, T, dof, D, joint_lim_min, joint_lim_max, deviation):
    t = np.arange(0, T, dt)
    qdes, qd_des, qdd_des = generate_trajectory(joint_lim_min, joint_lim_max, 0.1, dt, T, D)
    if(num_data % sample_rate != 0):
        raise Exception("Please ensure num_data % sample_rate = 0")
    if(len(t) % sample_rate != 0):
        raise Exception("Please ensure int(T/dt)+1 % sample_rate = 0")
    nD = int(round(D/dt))
    inputs = np.zeros((num_data, nD+2, dof))
    outputs = np.zeros((num_data, 2*dof))
    sample_locs = np.linspace(len(t)/sample_rate-1, len(t)-1,sample_rate)
    index = 0
    start_time = time.time()
    print("Generating", num_data, "values with sample rate", sample_rate)
    print("This will require simulating", int(num_data/sample_rate), "PDES")
    for i in range(int(num_data/sample_rate)):
        sample_locs = np.random.choice(np.arange(int(len(t)/sample_rate)-1, len(t)-1)[1:], size=sample_rate, replace=False)
        if i % 5 == 0:
            print("Completed:", i, "/", int(num_data/sample_rate))
        init_cond = np.random.uniform(joint_lim_min + deviation*np.ones(len(joint_lim_min)), joint_lim_max - np.ones(len(joint_lim_max)))
        init_cond = np.array([init_cond, np.zeros(7)]).reshape(14)
        states, controls, predictors = simulate_system(init_cond, qdes, qd_des, qdd_des, dt, T, D, dof)
        for i in range(len(sample_locs)):
            val = int(sample_locs[i])
            inputs[index, 0, 0:dof] =  states[val-1, :dof]
            inputs[index, 1, 0:dof] = states[val-1, dof:]
            inputs[index, 2:, :] = controls[val-1:val-1+nD]
            outputs[index] = predictors[val]
            index += 1
    end_time = time.time()
    inputs = inputs.reshape((num_data, (nD+2)*dof))
    outputs = outputs.reshape((num_data, 2*dof))
    np.save("inputs.npy", inputs)
    np.save("outputs.npy", outputs)
    print("Generated successfully. Total time:", end_time-start_time)


In [124]:
gen_dataset(1000, 10, dt, T, dof, D, joint_lim_min, joint_lim_max, 2)

Generating 1000 values with sample rate 10
This will require simulating 100 PDES
Completed: 0 / 100
Completed: 5 / 100
Completed: 10 / 100
Completed: 15 / 100
Completed: 20 / 100
Completed: 25 / 100
Completed: 30 / 100
Completed: 35 / 100
Completed: 40 / 100
Completed: 45 / 100
Completed: 50 / 100
Completed: 55 / 100
Completed: 60 / 100
Completed: 65 / 100
Completed: 70 / 100
Completed: 75 / 100
Completed: 80 / 100
Completed: 85 / 100
Completed: 90 / 100
Completed: 95 / 100
Generated successfully. Total time: 530.6146428585052


In [126]:
# Loading float32 does introduce some small numerical error, but not large. 
inputs = np.load("inputs.npy").astype(np.float32)
outputs = np.load("outputs.npy").astype(np.float32)

In [127]:
# Parametersdetach
epochs =500
ntrain = 900
ntest = 100
batch_size = 20
gamma = 0.5
learning_rate = 0.001
step_size= 50

In [134]:
x_train, x_test, y_train, y_test = train_test_split(inputs, outputs, test_size=0.1, random_state=1)
x_train = torch.from_numpy(x_train)
y_train = torch.from_numpy(y_train)
x_test = torch.from_numpy(x_test)
y_test = torch.from_numpy(y_test)

trainData = DataLoader(TensorDataset(x_train, y_train), batch_size=batch_size, shuffle=True)
testData = DataLoader(TensorDataset(x_test, y_test), batch_size=batch_size, shuffle=False)

In [134]:
x_train, x_test, y_train, y_test = train_test_split(inputs, outputs, test_size=0.1, random_state=1)
x_train = torch.from_numpy(x_train)
y_train = torch.from_numpy(y_train)
x_test = torch.from_numpy(x_test)
y_test = torch.from_numpy(y_test)

trainData = DataLoader(TensorDataset(x_train, y_train), batch_size=batch_size, shuffle=True)
testData = DataLoader(TensorDataset(x_test, y_test), batch_size=batch_size, shuffle=False)

In [135]:
class DeepONetProjected(nn.Module):
    def __init__(self, m, dim_x, proj1, proj2):
        super().__init__()
        self.deeponet = dde.nn.DeepONetCartesianProd([m, 128, 128, 128], [dim_x, 128, 128, 128], "relu", "Glorot normal").cuda()
        self.linear1 = torch.nn.Linear(m, proj1)
        self.linear2 = torch.nn.Linear(proj1, proj2)

    def forward(self, x):
        y = self.deeponet(x)
        y = self.linear1(y)
        return self.linear2(y)

NameError: name 'nn' is not defined