In [None]:
import torch
import torch.nn as nn
import numpy as np
import crocoddyl
from sklearn.utils import shuffle
import matplotlib.pyplot as plt
from network import PolicyNetwork
torch.set_default_dtype(torch.double)

if torch.cuda.is_available():
    torch.cuda.empty_cache()

In [None]:
#...... Global Hyperparameters
TRAJECTORIES     = 500
HORIZON          = 20
PRECISION        = 1e-9
MAXITERS         = 1000

FC1_DIMS         =  256
FC2_DIMS         =  256
FC3_DIMS         =  20
ACTIVATION       =  nn.Tanh()
DEVICE           = 'cpu'


BATCHSIZE        = 64
lr               = 1e-3
EPOCHS           = 150
DECAY            = 0

In [None]:
net = PolicyNetwork(policy_dims = 60, fc1_dims=FC1_DIMS, fc2_dims=FC2_DIMS, fc3_dims=FC3_DIMS, activation=ACTIVATION, device=DEVICE)

In [None]:
def griddedData(n_points:int = 500,
            xy_limits:list = [-1.9,1.9],
            theta_limits:list = [-np.pi/2, np.pi/2]
            ):
    """Generate datapoints from a grid"""

    size = int(np.sqrt(n_points)) + 1

    min_x, max_x = [*xy_limits]
    xrange = np.linspace(min_x,max_x,size, endpoint=True)
    trange = np.linspace(*theta_limits, size, endpoint=True)
    points = np.array([ [x1,x2, x3] for x1 in xrange for x2 in xrange for x3 in trange])

    np.round_(points, decimals=6)
    np.random.shuffle(points)
    points = points[0:n_points, : ]
    return points

In [None]:
x, y = [], []
points = griddedData(n_points=TRAJECTORIES)
for x0 in points :
    model = crocoddyl.ActionModelUnicycle()
    model.costWeights = np.array([1., 1]).T
    problem = crocoddyl.ShootingProblem(x0.T, [model]*HORIZON, model)
    ddp = crocoddyl.SolverDDP(problem)
    ddp.solve()
    xs = np.array(ddp.xs)
    for index, value in enumerate(xs):
        #print(index, value, xs[index+1:])
        length_subtrajectories = len(xs[index+1:])
        if length_subtrajectories != 0:
            xdata = [value[0], value[1], value[2], length_subtrajectories*3]
            ydata = xs[index+1:].flatten().tolist()
            while len(ydata) != HORIZON:
                ydata.append(0)
            
            ydata = np.array(ydata)
            xdata = np.array(xdata)
            x.append(xdata)
            y.append(ydata)

    del xdata, ydata


x, y = shuffle(x, y)


x = torch.Tensor(x)
y = torch.Tensor(y)
print(x.shape, y.shape)


    

In [None]:
xtest , ytest = x[0:100, :], y[0:100, :]
xtrain , ytrain = x[100:, :], y[100:, :]

dataset = torch.utils.data.TensorDataset(xtrain, ytrain)
dataloader = torch.utils.data.DataLoader(dataset, batch_size = BATCHSIZE, shuffle=True)



opt = torch.optim.Adam(net.parameters(), lr = lr, betas=[0.5, 0.9], weight_decay=DECAY)

for epoch in range(EPOCHS):
    for data, target in dataloader:
        net.train()
        opt.zero_grad()

        data        = data.to(DEVICE)
        target      = target.to(DEVICE)
        loss_mse = []
        loss_mae = []
        for position, trajectory in zip(data, target):
            pred = net(position)
            #print(int(position[-1]))
            true = trajectory[0:int(position[-1])]
            assert true.shape == pred.shape
            mae = torch.mean(torch.abs(pred - true)).item()
            mse = torch.mean(torch.square(pred - true)).item()
            loss_mae.append(mae)
            loss_mse.append(mae)
            
        sum_loss_mse = torch.sum(loss_mse)
        sum_loss_mae = torch.sum(loss_mae)

        loss = .requires_grad_(True)
        loss.backward()

    # Validation with accuracy, mse, mae:
    _mse=[]
    _mae=[]
    _acc=0.
    for position, target in zip(xtest, ytest):
        pred = net(position)
        #print(int(position[-1]))
        true = trajectory[0:int(position[-1])]
        assert true.shape == pred.shape
        mae = torch.mean(torch.abs(pred - true)).item()
        mse = torch.mean(torch.square(pred - true)).item()
        _mse.append(mse)
        _mae.append(mae)

        if mae < 0.1:
            _acc+=1
    smae = np.round(np.sum(_mae), 4)
    smse = np.round(np.sum(_mse), 4)

    print(f"Epoch : {epoch} || Accuracy: {_acc}/{len(xtest)} || SumMeanAbsError : {smae} || SumMeanSquaredError : {smse}")


    


In [None]:
import matplotlib.pyplot as plt

x = np.array([np.random.uniform(-1.5,1.5), np.random.uniform(-1.5,1.5), np.random.uniform(-np.pi/4, np.pi/4), 60]).reshape(1, 4)
np.round_(x, 4)

x1 = torch.Tensor(x).to(DEVICE)
with torch.no_grad():
    guess = net(x1).detach().numpy().reshape(20, 3)



guess = np.vstack((x[:,:-1], guess))

x0 = x[:, 0:-1]
model               = crocoddyl.ActionModelUnicycle()
model.costWeights   = np.array([1.,1.]).T
problem             = crocoddyl.ShootingProblem(x0.T, [model]*HORIZON, model)
ddp1                 = crocoddyl.SolverDDP(problem)
log1                 = crocoddyl.CallbackLogger()
ddp1.setCallbacks([log1])

ddp1.solve(guess, [], MAXITERS)
stops1 = log1.stops[1:]

ddp2                 = crocoddyl.SolverDDP(problem)
log2                 = crocoddyl.CallbackLogger()
ddp2.setCallbacks([log2])

ddp2.solve([], [], MAXITERS)
stops2 = log2.stops[1:]

xs1 = np.array(ddp1.xs)
xs2 = np.array(ddp2.xs)

plt.clf()
fig, (axs1, axs2) = plt.subplots(2, figsize=(8,6))


axs1.plot(xs1[:,0], xs1[:,1], c = 'blue', label= " Warmstarted")
axs1.plot(xs2[:,0], xs2[:,1], c = 'grey', label= " Coldstarted")
axs1.plot(guess[:,0], guess[:,1], c = 'red', label = "Guess")
axs1.set_xlim([-1.5, 1.5])
axs1.set_ylim([-1.5, 1.5])
#plt.legend(axs[0])

axs2.plot(stops1, '--*', label = "Warmstarted")
axs2.plot(stops2, '--+', label = "Coldstarted")
axs2.set_ylabel("Stopping Criteria")
axs2.set_xlabel("Iterations")
axs1.legend()
axs2.legend()
#plt.savefig("quickresult.png")
plt.show()
