In [3]:
import numpy as np
import random
from keras import regularizers
from keras import optimizers
from keras.models import Sequential
from keras.layers import Dense, Activation, Dropout
import crocoddyl
from tensorflow.python.client import device_lib
print(device_lib.list_local_devices())
crocoddyl.switchToNumpyMatrix()


        
class unicycle_warmstart():
    """
    A self contained class to generate trajectories to train the neural net and warmstart the solver.
    """
    def __init__(self,
                 n_trajectories: int = 10, 
                 state_weight: float = 1., 
                 control_weight: float = 0.3, 
                 nodes: int = 20,
                 n_hidden: int = 5,
                 neurons: int = 256,
                 optimizer: str = 'sgd',
                 save_trajectories: bool = False,
                 save_model: bool = False,
                 plot: bool = False)
    
        """
        @ Args:
             n_trajectories : number of trajectories to generate from crocoddyll
             
             state_weight   : the weight of state in unicycle
             
             control_weight : control weight of the unicycle
             
             nodes          : number of knots, e.g T = 10 or 30   
             
             n_hidden       : number of hidden layers in the neural network
             
             neurons        : number of neurons in the hidden layer
             
             optmizer       : the optimizer to be used to train the net
             
             save_trajectories : save the trajectories with pickle
             
             save_model     : save the net
             
             plot           : plot results  
             
        """
        self.__n_trajectories = n_trajectories
        self.__state_weight = state_weight
        self.__control_weight = control_weight
        self.__nodes = nodes
        self.__n_hidden = n_hidden
        self.__neurons = neurons
        self.optimizer = optimizer
        self.save_trajectories = save_trajectories
        self.save_model = save_model
        self.plot = plot
        
    def _generate_trajectories(self):
        """
        This could be done better with pool. But since we are generating a maximum of 10K trajectories, 
        there' no need for pool.
        
        @ Description: generate 10K trajectories, each trajectory with same state and control weight.
        """

        starting_configurations = []
        optimal_trajectories = []
        feasible_trajectories = 0
        for _ in range(self.__n_trajectories):
            initial_config = np.matrix([random.uniform(-2.1, 2.),
                                        random.uniform(-2.1, 2.), 
                                        random.uniform(0, 1)]).T
            
            model = crocoddyl.ActionModelUnicycle()
            model.costWeights = np.matrix([self.__state_weight, self.__control_weight]).T
            problem = crocoddyl.ShootingProblem(initial_config, [ model ] * self.__nodes, model)
            ddp = crocoddyl.SolverDDP(problem)
            ddp.solve()
            if ddp.isFeasible:
                state = np.squeeze(np.asarray(ddp.xs[1:]))
                control = np.squeeze(np.asarray(ddp.us))
                feasible_trajectories += 1

                optimal_trajectory = np.hstack((state, control))
                starting_configurations.append(np.squeeze(np.asarray(initial_config)))
                optimal_trajectories.append(optimal_trajectory)

        starting_configurations = np.asarray(starting_configurations)
        optimal_trajectories = np.asarray(optimal_trajectories)
        if self.save_trajectories:
            f = open('x_data.pkl', 'wb')
            cPickle.dump(starting_configurations, f, protocol=cPickle.HIGHEST_PROTOCOL)
            g = open("y_data.pkl", "wb")
            cPickle.dump(optimal_trajectories, g, protocol=cPickle.HIGHEST_PROTOCOL)
            f.close(), g.close()


        else:
            
            return starting_configurations, optimal_trajectories, feasible_trajectories
        
        
    def _train_net(self):
        starting_configurations, optimal_trajectories, feasible_trajectories = self._generate_trajectories()
        x_train = starting_configurations[0 : 9000, :]
        y_train = optimal_trajectories[0 : 9000, :]
        x_test = starting_configurations[9000 :, :]
        y_test = optimal_trajectories[9000 :, :]
        model = Sequential()
        model.add(Dense(256, input_dim=(starting_configurations.shape[1])))
        model.add(Activation('relu'))
        for _ in range(self.__n_hidden):
            model.add(Dense(256,
                            activation = "tanh",
                            kernel_initializer='random_uniform',
                            kernel_regularizer=regularizers.l2(0.01),
                            activity_regularizer=regularizers.l1(0.01)))            
            model.add(Dropout(0.25))
            
        model.add(Dense(optimal_trajectories.shape[1], 
                        activation = 'linear'))        
        
     
        
        sgd = optimizers.SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True)
        model.compile(loss='mean_squared_error',
              optimizer=sgd,
              metrics=['mean_squared_error', "mean_absolute_error"])
        
        print('Train...')
        
        model.fit(x_train, 
                  y_train,
                  batch_size = 32,
                  epochs = 200,
                  verbose = 0
                  )
        
        score = model.evaluate(x_test, y_test, batch_size = 16, use_multiprocessing=True)
        
        print(score)
        
        return model
    
    def warmstart(self, test_trajectories: int = 1):
        neural_net = self._train_net()
        trajectory_shape = neural_net.output_shape[1]
        
        for _ in range(test_trajectories):
            initial_config = np.matrix([random.uniform(-2.1, 2.),
                                        random.uniform(-2.1, 2.), 
                                        random.uniform(0, 1)]).T
            
            result = neural_net.predict(np.asarray(initial_config).T)
            result.reshape(self.__nodes, 5)
            
            states, controls = [], []
            states.append(np.asarray(initial_config).T)
            for i in range(result.shape[1] ):
                states.append(result[0:3,i])
                control.append(result[3:5,i])
                

            
            
            model = crocoddyl.ActionModelUnicycle()
            model.costWeights = np.matrix([self.__state_weight, self.__control_weight]).T
            problem = crocoddyl.ShootingProblem(initial_config, [ model ] * self.__nodes, model)
            ddp = crocoddyl.SolverDDP(problem)
            ddp.solve(states, control)
            print(ddp.iter)

SyntaxError: invalid syntax (<ipython-input-3-4f0cee68be73>, line 28)