|
"""
Author: Jonathan Schwartz (github.com/SchwartzCode)

This code provides a simple implementation of Dynamic Movement
Primitives, which is an approach to learning trajectories by modelling
them as a weighted sum of Gaussian basis functions. This approach
can be used to dampen noise in a curve, and can also be used to
stretch a curve by adjusting its start and end points.

More information on Dynamic Movement Primitives available at:
https://arxiv.org/abs/2102.03861
https://www.frontiersin.org/articles/10.3389/fncom.2013.00138/full

"""
| 15 | + |
| 16 | + |
| 17 | +from matplotlib import pyplot as plt |
| 18 | +import numpy as np |
| 19 | + |
| 20 | + |
class DMP(object):
    """Dynamic Movement Primitive trajectory learner.

    Learns a demonstrated trajectory by fitting the forcing term of a
    virtual spring-damper system as a weighted sum of Gaussian basis
    functions, then replays the trajectory — optionally with a new
    start position, goal position, or duration.
    """

    def __init__(self, training_data, data_period, K=156.25, B=25):
        """
        Arguments:
            training_data - input data of form [N, dim]
            data_period - amount of time training data covers
            K and B - spring and damper constants to define
                      DMP behavior
        """

        self.K = K  # virtual spring constant
        self.B = B  # virtual damper coefficient

        # number of demonstration samples and the implied integration step
        self.timesteps = training_data.shape[0]
        self.dt = data_period / self.timesteps

        self.weights = None  # weights used to generate DMP trajectories

        # original demonstration period, kept so the demo can stretch it
        self.T_orig = data_period

        self.training_data = training_data
        # fit basis-function weights immediately on construction
        self.find_basis_functions_weights(training_data, data_period)

    def find_basis_functions_weights(self, training_data, data_period,
                                     num_weights=10):
        """Fit per-dimension basis-function weights to the demonstration.

        Populates self.weights with one row of `num_weights` weights per
        spatial dimension, found by least-squares regression of the
        forcing term required to reproduce the training data.

        Arguments:
            training_data [(steps x spatial dim) np array] - data to
                replicate with DMP
            data_period [float] - time duration of data
            num_weights [int] - number of Gaussian basis functions
        """

        # sanity checks only warn; execution continues either way
        if not isinstance(training_data, np.ndarray):
            print("Warning: you should input training data as an np.ndarray")
        elif training_data.shape[0] < training_data.shape[1]:
            print("Warning: you probably need to transpose your training data")

        dt = data_period / len(training_data)

        init_state = training_data[0]
        goal_state = training_data[-1]

        # means (C) and std devs (H) of gaussian basis functions,
        # spread evenly over normalized time [0, 1]
        C = np.linspace(0, 1, num_weights)
        H = (0.65*(1./(num_weights-1))**2)

        # fit each spatial dimension independently
        for dim, _ in enumerate(training_data[0]):

            dimension_data = training_data[:, dim]

            q0 = init_state[dim]
            g = goal_state[dim]

            # integrate a copy of the demonstrated motion to get the
            # position q used inside the forcing-term equation
            q = q0
            qd_last = 0

            phi_vals = []  # basis activations per timestep (rows of A)
            f_vals = []    # required forcing term per timestep (b)

            for i, _ in enumerate(dimension_data):
                # forward-difference velocity; zero at the final sample
                if i + 1 == len(dimension_data):
                    qd = 0
                else:
                    qd = (dimension_data[i+1] - dimension_data[i]) / dt

                # normalized basis activations at this (normalized) time
                phi = [np.exp(-0.5 * ((i * dt / data_period) - c)**2 / H)
                       for c in C]
                phi = phi/np.sum(phi)

                qdd = (qd - qd_last)/dt

                # forcing term that makes the spring-damper system
                # reproduce (qdd, qd, q) at this step, scaled by the
                # overall displacement (g - q0)
                # NOTE(review): divides by (g - q0) — this blows up if a
                # dimension starts and ends at the same value; confirm
                # inputs always have distinct start/goal per dimension.
                f = (qdd * data_period**2 - self.K * (g - q) + self.B * qd
                     * data_period) / (g - q0)

                phi_vals.append(phi)
                f_vals.append(f)

                qd_last = qd
                q += qd * dt

            phi_vals = np.asarray(phi_vals)
            f_vals = np.asarray(f_vals)

            # least-squares solve: phi_vals @ w ~= f_vals
            w = np.linalg.lstsq(phi_vals, f_vals, rcond=None)

            # stack one weight row per dimension
            if self.weights is None:
                self.weights = np.asarray(w[0])
            else:
                self.weights = np.vstack([self.weights, w[0]])

    def recreate_trajectory(self, init_state, goal_state, T):
        """Roll out the learned DMP between new endpoints.

        Arguments:
            init_state - initial state/position
            goal_state - goal state/position
            T - amount of time to travel q0 -> g

        Returns:
            (t, positions) - time stamps and the [timesteps x dim]
            integrated trajectory
        """

        nrBasis = len(self.weights[0])  # number of gaussian basis functions

        # means (C) and std devs (H) of gaussian basis functions
        # (must match the layout used during fitting)
        C = np.linspace(0, 1, nrBasis)
        H = (0.65*(1./(nrBasis-1))**2)

        # initialize virtual system
        time = 0

        q = init_state
        dimensions = self.weights.shape[0]
        qd = np.zeros(dimensions)

        positions = np.array([])
        for k in range(self.timesteps):
            time = time + self.dt

            qdd = np.zeros(dimensions)

            for dim in range(dimensions):

                # forcing term is active only within the commanded
                # period T; afterwards the plain spring-damper pulls
                # the state to the goal
                if time <= T:
                    phi = [np.exp(-0.5 * ((time / T) - c)**2 / H) for c in C]
                    phi = phi / np.sum(phi)
                    f = np.dot(phi, self.weights[dim])
                else:
                    f = 0

                # simulate dynamics
                qdd[dim] = (self.K*(goal_state[dim] - q[dim])/T**2
                            - self.B*qd[dim]/T
                            + (goal_state[dim] - init_state[dim])*f/T**2)

            # explicit Euler integration step
            qd = qd + qdd * self.dt
            q = q + qd * self.dt

            # first step seeds the array; later steps stack rows
            if positions.size == 0:
                positions = q
            else:
                positions = np.vstack([positions, q])

        # NOTE(review): float np.arange can yield timesteps+1 stamps for
        # some dt values, making t one element longer than positions —
        # verify callers that zip these together.
        t = np.arange(0, self.timesteps * self.dt, self.dt)
        return t, positions

    @staticmethod
    def dist_between(p1, p2):
        # Euclidean distance between two points
        return np.linalg.norm(p1 - p2)

    def view_trajectory(self, path, title=None, demo=False):
        """Plot a DMP rollout against the stored training data.

        Arguments:
            path - [N x 2] trajectory to display
            title - optional plot title
            demo - if True, draw non-blocking with fixed axes (for
                   animation); otherwise block on plt.show()
        """

        path = np.asarray(path)

        plt.cla()
        plt.plot(self.training_data[:, 0], self.training_data[:, 1],
                 label="Training Data")
        plt.plot(path[:, 0], path[:, 1],
                 linewidth=2, label="DMP Approximation")

        plt.xlabel("X Position")
        plt.ylabel("Y Position")
        plt.legend()

        if title is not None:
            plt.title(title)

        if demo:
            # fixed window + short pause gives a simple animation effect
            plt.xlim([-0.5, 5])
            plt.ylim([-2, 2])
            plt.draw()
            plt.pause(0.02)
        else:
            plt.show()

    def show_DMP_purpose(self):
        """
        This function conveys the purpose of DMPs:
        to capture a trajectory and be able to stretch
        and squeeze it in terms of start and stop position
        or time
        """

        q0_orig = self.training_data[0]
        g_orig = self.training_data[-1]
        T_orig = self.T_orig

        # offset magnitude scaled to a quarter of the data's x-extent
        data_range = (np.amax(self.training_data[:, 0])
                      - np.amin(self.training_data[:, 0])) / 4

        q0_right = q0_orig + np.array([data_range, 0])
        q0_up = q0_orig + np.array([0, data_range/2])
        g_left = g_orig - np.array([data_range, 0])
        g_down = g_orig - np.array([0, data_range/2])

        # 20-step sweeps of the start point (right then up), goal point
        # (left then down), and the period (1x to 2x)
        q0_vals = np.vstack([np.linspace(q0_orig, q0_right, 20),
                             np.linspace(q0_orig, q0_up, 20)])
        g_vals = np.vstack([np.linspace(g_orig, g_left, 20),
                            np.linspace(g_orig, g_down, 20)])
        T_vals = np.linspace(T_orig, 2*T_orig, 20)

        for new_q0_value in q0_vals:
            plot_title = "Initial Position = [%s, %s]" % \
                (round(new_q0_value[0], 2), round(new_q0_value[1], 2))

            _, path = self.recreate_trajectory(new_q0_value, g_orig, T_orig)
            self.view_trajectory(path, title=plot_title, demo=True)

        for new_g_value in g_vals:
            plot_title = "Goal Position = [%s, %s]" % \
                (round(new_g_value[0], 2), round(new_g_value[1], 2))

            _, path = self.recreate_trajectory(q0_orig, new_g_value, T_orig)
            self.view_trajectory(path, title=plot_title, demo=True)

        for new_T_value in T_vals:
            plot_title = "Period = %s [sec]" % round(new_T_value, 2)

            _, path = self.recreate_trajectory(q0_orig, g_orig, new_T_value)
            self.view_trajectory(path, title=plot_title, demo=True)
| 236 | + |
| 237 | + |
def example_DMP():
    """Demonstrate DMP fitting on a noisy synthetic 2-D curve.

    Builds a noisy piecewise-trig trajectory, fits DMP weights to it,
    then replays it with shifted start/goal positions and a stretched
    period via show_DMP_purpose().
    """
    step = 0.01
    # parameter ranges: x coordinate plus three y-curve segments
    tx = np.arange(0, 3*np.pi/2, step)
    seg_a = np.arange(3*np.pi/2, 2*np.pi, step)[:-1]
    seg_b = np.arange(0, np.pi/2, step)[:-1]
    seg_c = np.arange(np.pi, 3*np.pi/2, step)

    # add uniform noise to each piece (same RNG call order as before)
    xs = tx + 0.02*np.random.rand(tx.shape[0])
    pieces = [np.cos(seg_a) + 0.1*np.random.rand(seg_a.shape[0]),
              np.cos(seg_b) + 0.1*np.random.rand(seg_b.shape[0]),
              np.sin(seg_c) + 0.1*np.random.rand(seg_c.shape[0])]
    ys = np.concatenate(pieces)

    # assemble into an [N, 2] training array
    training_data = np.column_stack((xs, ys))

    period = 3*np.pi/2
    controller = DMP(training_data, period)

    controller.show_DMP_purpose()
| 257 | + |
| 258 | + |
# Run the interactive demonstration when executed as a script.
if __name__ == '__main__':

    example_DMP()
0 commit comments