-
Notifications
You must be signed in to change notification settings - Fork 2
/
mdmp.py
143 lines (112 loc) · 4.45 KB
/
mdmp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# -*- coding: utf-8 -*-
'''
Implementation of Dynamical Movement Primitives (DMPs) for multi-dimensional
trajectories.
'''
import numpy as np
from dmp_1 import DMP
class mDMP(object):
    '''
    Implementation of a Multi DMP (mDMP) as a composition of several
    single DMPs, one per coordinate. This type of DMP is used with
    multi-dimensional trajectories.

    Attributes:
        dmps: list holding one ``DMP`` instance per coordinate.
        dim, nbfs: the values given to the constructor.
        ns: sample count read from the first DMP (``dmps[0].ff.ns``) after
            learning; 0 until ``learnFromDemo`` has been called.
        W: (nbfs x dim) weight matrix, only set by ``learnFromDemo``.
        Fd, Fp: (ns x dim) matrices, only set by ``planNewTrajectory``.
            NOTE(review): presumably the demonstrated and the reproduced
            forcing terms -- confirm against ``dmp_1``.
    '''

    def __init__(self, dim=1, nbfs=100):
        '''
        dim int: number of coordinates of the trajectory >= 1.
        nbfs int: number of basis functions per single DMP >= 0.
        '''
        # range() rather than the Python 2-only xrange(): identical result
        # here and it keeps the class usable on Python 3.
        self.dmps = [DMP(nbfs) for _ in range(dim)]
        self.dim = dim
        self.nbfs = nbfs
        self.ns = 0

    def _weights(self):
        '''
        Gather the weights of every DMP into a single matrix.

        Returns an (nbfs x dim) array; column k holds ``dmps[k].ff.weights``.
        '''
        W = np.zeros((self.dim, self.nbfs))
        for sdx in range(self.dim):
            sdmp = self.dmps[sdx]
            W[sdx, :] = np.array(np.squeeze(sdmp.ff.weights))
        # Filled one row per coordinate, returned one column per coordinate.
        return W.T

    def _fs(self):
        '''
        Gather ``ff.Fd`` and the response of ``ff`` over the planned time
        for every DMP.

        Returns the tuple (Fd, Fp), each an (ns x dim) array with one column
        per coordinate.
        '''
        Fd = np.zeros((self.dim, self.ns))
        Fp = np.zeros((self.dim, self.ns))
        for sdx in range(self.dim):
            sdmp = self.dmps[sdx]
            Fd[sdx, :] = np.array(np.squeeze(sdmp.ff.Fd))
            # TODO: Next line is patch as response time has 1 extra sample.
            time = sdmp.responseTime
            Fp[sdx, :] = np.array(np.squeeze(sdmp.ff.responseToTimeArray(time)))
        return Fd.T, Fp.T

    def learnFromDemo(self, trajectory, time):
        '''
        Train every single DMP on its own coordinate of the demonstration.

        trajectory np.array([]): trajectory example, one row per coordinate
                                 (dim x M).
        time np.array([]): time of the trajectory; the same array is handed
                           to every DMP.

        Side effects: updates ``self.ns`` and ``self.W``.
        '''
        for sdx in range(self.dim):
            sdmp = self.dmps[sdx]
            sdmp.learn(trajectory[sdx, :], time)
        # All DMPs were trained on the same time array, so the sample count
        # is read from the first one only.
        self.ns = self.dmps[0].ff.ns
        self.W = self._weights()

    def planNewTrajectory(self, start, goal, time):
        '''
        Plan a new trajectory with every single DMP.

        start sequence of float: start position of the new trajectory, one
                                 entry per coordinate.
        goal sequence of float: end position of the new trajectory, one
                                entry per coordinate.
        time float: time to execute the new trajectory.

        Returns the tuple (pos, vel, acc), each a (dim x ns) array with one
        row per coordinate, where ns = int(time / stepTime of the first DMP).

        Side effects: updates ``self.Fd`` and ``self.Fp``.
        '''
        ns = int(time/self.dmps[0].stepTime)
        pos = np.zeros((self.dim, ns))
        vel = np.zeros((self.dim, ns))
        acc = np.zeros((self.dim, ns))
        for sdx in range(self.dim):
            sdmp = self.dmps[sdx]
            sdmp.setup(start[sdx], goal[sdx])
            sdmp.plan(time)
            # TODO: Next line is patch as response time has 1 extra sample.
            pos[sdx, :] = np.array(np.squeeze(sdmp.responsePos[1:]))
            vel[sdx, :] = np.array(np.squeeze(sdmp.responseVel[1:]))
            acc[sdx, :] = np.array(np.squeeze(sdmp.responseAccel[1:]))
        self.Fd, self.Fp = self._fs()
        return pos, vel, acc
# -----------------------------------------------------------------------------
from simple_cs import CanonicalSystem
from transformation_system import TransfSystem
class sDMP(object):
    '''
    A Single Dynamical Movement Primitive (sDMP), used with one-dimensional
    trajectories. It is composed of a transformation system and a canonical
    system.
    '''

    def __init__(self, tf):
        '''
        tf TransfSystem: the transformation system of a DMP; it is
                         configured here with a freshly created canonical
                         system.
        '''
        # NOTE(review): -2 is presumably the decay constant of the canonical
        # system -- confirm in simple_cs.
        canonical = CanonicalSystem(-2)
        self.cs = canonical
        self.ts = tf
        tf.configure(canonical)

        # Nothing learnt or planned yet: empty demonstration, neutral
        # start/goal and unit time scaling.
        self.exampleTraj = np.array([])
        self.exampleTime = np.array([])
        self.start, self.goal = 0.0, 0.0
        self.tau = 1.0

    def learn(self, trajectory, time):
        '''
        Store the demonstration and train the transformation system on it.

        trajectory np.array([]): trajectory example (1xM).
        time np.array([]): time of the trajectory (1xM).
        '''
        self.exampleTraj, self.exampleTime = trajectory, time
        self.ts.train(trajectory, time)

    def plan(self, start, goal, time):
        '''
        Store the new end points and return whatever the transformation
        system predicts for them.

        start float: start position of the new trajectory.
        goal float: end position of the new trajectory.
        time float: time to execute the new trajectory.
        '''
        self.start, self.goal = start, goal
        return self.ts.predict(start, goal, time)
# -----------------------------------------------------------------------------
class mDmpUsing1stFormulation:
    '''
    Empty placeholder: no attributes or methods are defined yet.
    NOTE(review): the name suggests an mDMP variant based on a first DMP
    formulation -- confirm the intent before filling it in.
    '''
class mDmpUsing2ndFormulation:
    '''
    Empty placeholder: no attributes or methods are defined yet.
    NOTE(review): the name suggests an mDMP variant based on a second DMP
    formulation -- confirm the intent before filling it in.
    '''
class mDmpUsing3rdFormulation:
    '''
    Empty placeholder: no attributes or methods are defined yet.
    NOTE(review): the name suggests an mDMP variant based on a third DMP
    formulation -- confirm the intent before filling it in.
    '''