In [31]:
import numpy as np
import cma
import re
import math

x0 = 0.3
y0 = 0.3 
theta0 = math.pi
delta_t = 0.01

input_seq = [
                [0,1],[0,1],[0,1],[0,1],[0,1],[0,1],[0,1],[1,1],[1,1],[1,1],[1,1],[1,1],[1,1],[1,1],[1,1],[1,1],
                [1,1],[1,1],[1,1],[1,0],[1,0],[1,0],[1,0],[1,0],[1,0],[1,0],[1,0],[1,0],[1,0],[1,0.3],[1,0.3],
                [1,0.3],[1,0.3],[1,0.3],[1,0.3],[1,0.3],[1,0.3],[1,0.3],[1,0.3],[1,0.3],[1,0.3],[1,0.3],[0.3,1],
                [0.3,1],[0.3,1],[0.3,1],[0.3,1],[0.3,1],[0.3,1],[0.3,1],[0.3,1],[0.3,1],[0.3,1]
            ]

In [15]:
def generate_path(L, r):
    x = x0
    y = y0
    theta = thetha0

    path = [[x,y,theta]]

    for wl, wr in input_seq:
        v = (wl + wr) * r / 2      
        w = (wl - wr) * r / L 

        x = x + v * delta_t * math.cos(theta)
        y = y + v * delta_t * math.sin(theta)
        theta = theta + w * delta_t

        if theta > math.pi:
            theta = theta - 2*math.pi
        elif theta < -math.pi:
            theta = (2*math.pi + theta)
        
        path.append([x,y,theta])
    
    return path

In [16]:
def loss(params):
    real = generate_path(real_params[0], real_params[1])
    pred = generate_path(params[0], params[1])
    return np.sqrt(np.mean((np.array(pred) - np.array(real)) ** 2))

In [17]:
def dist2target(path,target=[0,0]):
    return [np.linalg.norm(target)-np.array(step[0:-1])) for step in path]

In [18]:
L = 0.8
r = 0.69
real_params = [L,r]

L_fake = 0.6
r_fake = 0.13
fake_params = [L_fake,r_fake]

test_loss = loss(real_params)


In [19]:
es = cma.CMAEvolutionStrategy(len(real_params) * [0], 0.5, {'verbose': -3})
es.optimize(loss)

Iterat #Fevals   function value  axis ratio  sigma  min&max std  t[m:s]
    1      6 7.188720072169344e-02 1.0e+00 4.39e-01  3e-01  5e-01 0:00.0
    2     12 2.755491942895244e-02 1.4e+00 7.14e-01  6e-01  7e-01 0:00.0
    3     18 1.811749335266345e-02 1.3e+00 7.32e-01  6e-01  6e-01 0:00.0
  100    600 2.623307939999283e-13 4.7e+00 5.82e-08  2e-12  7e-12 0:00.3


<cma.evolution_strategy.CMAEvolutionStrategy at 0x125f941f408>

In [20]:
print(f'Best loss value: {loss(es.result.xbest):.2}')

Best loss value: 1.1e-13


In [21]:
print('The parameters set is: L = {}, r = {}'.format(es.result.xbest[0],es.result.xbest[1]))


The parameters set is: L = 0.800000000001304, r = 0.6900000000009019


In [1]:
# Alternative with full control 
import numpy as np
import cma
import re
import math

x0 = 0.3
y0 = 0.3 
theta0 = math.pi
delta_t = 0.01

input_seq = [
                [0,1],[0,1],[0,1],[0,1],[0,1],[0,1],[0,1],[1,1],[1,1],[1,1],[1,1],[1,1],[1,1],[1,1],[1,1],[1,1],
                [1,1],[1,1],[1,1],[1,0],[1,0],[1,0],[1,0],[1,0],[1,0],[1,0],[1,0],[1,0],[1,0],[1,0.3],[1,0.3],
                [1,0.3],[1,0.3],[1,0.3],[1,0.3],[1,0.3],[1,0.3],[1,0.3],[1,0.3],[1,0.3],[1,0.3],[1,0.3],[0.3,1],
                [0.3,1],[0.3,1],[0.3,1],[0.3,1],[0.3,1],[0.3,1],[0.3,1],[0.3,1],[0.3,1],[0.3,1]
            ]

def generate_mpc_history(r, input_seq):
    x = x0
    y = y0
    theta = theta0

    hist = []
    elem = {'init_state':[x,y,theta]}
    for wl, wr in input_seq:
        v = (wl + wr) * r / 2      
        w = (wl - wr) * r / L 

        x = x + v * delta_t * math.cos(theta)
        y = y + v * delta_t * math.sin(theta)
        theta = theta + w * delta_t

        if theta > math.pi:
            theta = theta - 2*math.pi
        elif theta < -math.pi:
            theta = (2*math.pi + theta)
        
        elem['input'] = [wl, wr]
        elem['end_state'] = [x,y,theta]
        hist.append(elem)
        elem = {'init_state':[x,y,theta]}
    return hist

def generate_step_history(r, mpc_history):
    hist = []
    for elem in mpc_history:
        x0, y0, theta0 = elem['init_state']
        wl, wr = elem['input']

        v = (wl + wr) * r / 2      
        w = (wl - wr) * r / L 

        x = x0 + v * delta_t * math.cos(theta0)
        y = y0 + v * delta_t * math.sin(theta0)
        theta = theta0 + w * delta_t

        if theta > math.pi:
            theta = theta - 2*math.pi
        elif theta < -math.pi:
            theta = (2*math.pi + theta)
        
        hist.append([x,y,theta])
    return hist

def get_mean_sigma(param_set):
    min_param = min(param_set)
    max_param = max(param_set)

    mean = (min_param+max_param)/2
    sigma = (max_param-min_param)/4

    return mean, sigma

def loss(param_val):
    step_hist = generate_step_history(param_val[0], mpc_history)
    return np.sqrt(np.mean((np.array(step_mpc_history) - np.array(step_hist)) ** 2))

In [2]:
PARAMS_NUM = 1
SCENARIOS_NUM = 3

L = 0.5
r_set = [0.12, 0.16, 0.17]

true_r = 0.15

mean, sigma = get_mean_sigma(r_set)
print(mean, sigma)

0.14500000000000002 0.012500000000000004


In [3]:
mpc_history = generate_mpc_history(true_r,input_seq)
step_mpc_history = [elem['end_state'] for elem in mpc_history]

es = cma.CMAEvolutionStrategy((PARAMS_NUM+1)*[mean], sigma, {'verbose': -3,'popsize': SCENARIOS_NUM})
while not es.stop() and es.best.f > 1e-5:
    solutions = es.ask()
    es.tell(solutions, [loss(param_val) for param_val in solutions])
    es.disp(20)
print('terminated on ' + str(es.stop()))


Iterat #Fevals   function value  axis ratio  sigma  min&max std  t[m:s]
    1      3 6.153430754856776e-05 1.0e+00 1.14e-02  1e-02  1e-02 0:00.0
    2      6 1.452835660685141e-05 1.1e+00 9.96e-03  9e-03  1e-02 0:00.0
    3      9 1.630205031616936e-05 1.1e+00 7.52e-03  6e-03  7e-03 0:00.0
terminated on {}


In [25]:
print('The parameters set is: r = {}, _ = {}'.format(es.result.xbest[0],es.result.xbest[1]))
print(es.result)

The parameters set is: r = 0.14900799736561005, _ = 0.14741830104565612
CMAEvolutionStrategyResult(xbest=array([0.149008 , 0.1474183]), fbest=9.355888426736481e-06, evals_best=31, evaluations=33, iterations=11, xfavorite=array([0.149008 , 0.1474183]), stds=array([0.00601246, 0.01011005]), stop={})


In [26]:
new_param_set = [sample[0] for sample in es.ask()]
print(new_param_set)

[0.15658645718077466, 0.15345220385878075, 0.14449769793239053]


In [6]:
# With the wrapper
from cma_es_wrapper import CMA_ES_Wrapper

true_r = 0.15
r_set = [0.12, 0.16, 0.17]

mpc_history = generate_mpc_history(true_r,input_seq)

es_wrapper = CMA_ES_Wrapper()

# r_set = es_wrapper.optimize_step(r_set, mpc_history)
# print(r_set)

for _ in range(100):
    r_set = es_wrapper.optimize_step(r_set, mpc_history)
    print(r_set)



[0.15218443840685525, 0.15227185888958572, 0.1538617368789623]
[-0.021292689365684375, 0.07557704231889084, 0.09536875359374977]
[0.148001117210121, 0.14819136741304975, 0.14943845657947774]
[0.14967197307306876, 0.14994753192390947, 0.15027692733080297]
[0.1495155341303793, 0.1496551160968137, 0.14965618343268566]
[0.1495172701864073, 0.14953716361984723, 0.14957064731019792]
[0.1495307271764757, 0.14953699407221913, 0.1495462986663896]
[0.14953830245128322, 0.14953886558051957, 0.14954014036656121]
[0.14954037880034388, 0.1495405893810501, 0.14954093156689605]
[0.14954068796250836, 0.1495409019884142, 0.14954093231954732]
[0.14954096514046325, 0.14954098541473507, 0.14954098901561175]
[0.14954096842819298, 0.14954096975519038, 0.14954097545232964]
[0.14954097220501655, 0.14954097327046365, 0.14954097682324577]
[0.1495409726713256, 0.14954097322957918, 0.1495409750390309]
[0.1495409740534299, 0.14954097482595965, 0.14954097545331888]
[0.14954097446279505, 0.14954097504346678, 0.149540

AssertionError: 