In [None]:
# ==============================================================================
# IMPORTS AND GLOBAL VARIABLES
# ==============================================================================

from pathlib import Path

import numpy as np, torch, yaml
from imitation.algorithms import bc
from imitation.data import serialize
from imitation.data.types import Trajectory
from pynput import keyboard
from stable_baselines3 import PPO
from stable_baselines3.common.policies import ActorCriticPolicy

from donkey_env_lidar import DonkeyEnvLidar

# MAIN VARIABLES YOU MIGHT WANT TO EDIT
# Location of the simulator binary, and where the recorded demonstrations and the
# trained behavioural-cloning policy are written to / read from.
PATH_TO_SIMULATOR_EXECUTABLE = "/home/animated/Projects/donkeycar/DonkeySimLinux/donkey_sim.x86_64"
IMITATION_LEARNING_DATASET_PATH = "imitation_learning_datasets/imitation_training_data4"
IMITATION_LEARNING_MODEL_PATH = "imitation_learning_models/imitation_learning4.zip"

# Keyword arguments handed to the stable-baselines3 policy via `policy_kwargs`.
# Supported keys include "net_arch", "activation_fn", "features_extractor_class",
# "features_extractor_kwargs" and more; see
# https://stable-baselines3.readthedocs.io/en/master/guide/custom_policy.html
POLICY_KEYWORD_ARGUMENTS = dict(net_arch=[32, 32])


# VARIABLES THAT PERTAIN TO VARIOUS SIMULATION CONFIGS. FOR THE MOST PART, YOU SHOULDN'T NEED TO TOUCH THIS
TRACK_NAME = "mini_monaco"
DONKEYCAR_SIMULATION_CONFIG = dict(
    port=9091,
    # Also used by the driving loops as the cross-track-error limit beyond which a lap is abandoned.
    max_cte=8,
    lidar_config=dict(deg_per_sweep_inc=1.0),
)

In [16]:
# ==============================================================================
# KEYBOARD CALLBACKS
# ==============================================================================

# Most recent driving commands requested from the keyboard; both start neutral.
# They are module-level so that the keyboard callbacks can update them.
keyboard_steering_command = 0.0
keyboard_throttle_command = 0.0

def keyboard_press_callback(key: "keyboard.KeyCode"):
    """Update the global driving commands when a control key is pressed.

    "d" / "a" set ``keyboard_steering_command`` to 1 / -1 and "w" / "s" set
    ``keyboard_throttle_command`` to 0.7 / -0.7. Every other key leaves both
    commands untouched.

    Args:
        key: The key object passed in by the keyboard listener. Objects that
            have no ``char`` attribute (or whose ``char`` is ``None``) are
            ignored.
    """
    global keyboard_steering_command, keyboard_throttle_command

    # Look the character up explicitly instead of wrapping the whole body in a
    # bare `except:`; that way keys without `.char` are skipped on purpose and
    # genuine programming errors are no longer silently hidden.
    pressed_char = getattr(key, "char", None)

    if pressed_char == "d":
        keyboard_steering_command = 1
    elif pressed_char == "a":
        keyboard_steering_command = -1
    elif pressed_char == "w":
        keyboard_throttle_command = 0.7
    elif pressed_char == "s":
        keyboard_throttle_command = -0.7

def keyboard_release_callback(key: "keyboard.KeyCode"):
    """Reset the matching global driving command when a control key is released.

    Releasing "d" or "a" sets ``keyboard_steering_command`` back to 0.0 and
    releasing "w" or "s" sets ``keyboard_throttle_command`` back to 0.0. Every
    other key leaves both commands untouched.

    Args:
        key: The key object passed in by the keyboard listener. Objects that
            have no ``char`` attribute (or whose ``char`` is ``None``) are
            ignored.
    """
    global keyboard_steering_command, keyboard_throttle_command

    # Explicit lookup instead of a bare `except:` so that keys without `.char`
    # are skipped deliberately and real errors are not swallowed.
    released_char = getattr(key, "char", None)

    # NOTE(review): releasing one steering key zeroes the command even if the
    # opposite key is still held down; same for throttle. Kept as in the original.
    if released_char in ("d", "a"):
        keyboard_steering_command = 0.
    elif released_char in ("w", "s"):
        keyboard_throttle_command = 0.
    
    
# Re-running this cell would otherwise leave the listener thread from the previous
# run alive next to the new one, so stop a still-running previous listener first.
_previous_listener = globals().get("listener")
if _previous_listener is not None and _previous_listener.is_alive():
    _previous_listener.stop()

# Start a background listener that feeds key presses/releases to the callbacks above.
listener = keyboard.Listener(on_press=keyboard_press_callback, on_release=keyboard_release_callback)
listener.start()


In [17]:
# ==============================================================================
# SET UP THE DONKEYCAR SIMULATOR
# ==============================================================================

# Merge the executable path into the shared simulator settings. Because the shared
# settings are unpacked last, a key defined there would take precedence.
donkeycar_simulation_config = {
    "exe_path": PATH_TO_SIMULATOR_EXECUTABLE,
    **DONKEYCAR_SIMULATION_CONFIG,
}
env = DonkeyEnvLidar(level=TRACK_NAME, conf=donkeycar_simulation_config)



# ==============================================================================
# RUN THE SIMULATOR WITH KEYBOARD CONTROL AND SAVE OBSERVATIONS AND ACTIONS
# ==============================================================================

# This is the main list that contains every recorded trajectory (one per completed lap),
# which we will use as training data for the behavioural cloning
list_of_trajectories: list[Trajectory] = []


try:
    for lap_index in range(1000):

        # Start every attempt with empty buffers so that an abandoned lap leaves nothing behind
        current_lap_observations = []
        current_lap_actions = []

        # The observation returned by reset() is stored too, so a lap always holds
        # exactly one more observation than actions.
        observation = env.reset()
        current_lap_observations.append(observation)

        for time_step in range(10000000):

            # Snapshot of the commands currently requested through the keyboard: [steering, throttle]
            action = np.array([keyboard_steering_command, keyboard_throttle_command])
            # NOTE(review): `done` is not consulted below; the lap ends only through the `info` checks.
            observation, reward, done, info = env.step(action)

            current_lap_observations.append(observation)
            current_lap_actions.append(action)

            # If we finished the current lap: keep it and write the whole dataset to disk
            # right away, so completed laps survive a later interruption or crash.
            if info["lap_count"] == 1:
                list_of_trajectories.append(
                    Trajectory(
                        np.array(current_lap_observations),
                        np.array(current_lap_actions),
                        infos=None,
                        terminal=True,
                    )
                )
                serialize.save(IMITATION_LEARNING_DATASET_PATH, list_of_trajectories)
                break

            # If we hit an object or we go too far off of the track, then we should not record the current lap
            if abs(info["cte"]) > DONKEYCAR_SIMULATION_CONFIG["max_cte"] or info["hit"] != "none":
                break

except KeyboardInterrupt:
    # Interrupting the kernel is treated as a normal way to stop recording.
    # Any other exception is no longer swallowed, so real failures stay visible.
    pass
finally:
    # Always shut the simulator down, whether we stopped normally, were interrupted, or failed.
    env.close()

starting DonkeyGym env
Setting default: start_delay 5.0
Setting default: frame_skip 1
Setting default: cam_resolution (120, 160, 3)
Setting default: log_level 20
Setting default: host localhost
Setting default: steer_limit 1.0
Setting default: throttle_min 0.0
Setting default: throttle_max 1.0
donkey subprocess started
Found path: /home/animated/Projects/donkeycar/DonkeySimLinux/donkey_sim.x86_64


INFO:gym_donkeycar.core.client:connecting to localhost:9091 


Exception: Could not connect to server. Is it running? If you specified 'remote', then you must start it manually.

In [None]:
# ==============================================================================
# TRAIN AND SAVE A BEHAVIOURAL CLONING ALGORITHM TO A FILE
# ==============================================================================
    

# Load the demonstrations that were recorded with keyboard control
trajectory_list = serialize.load(IMITATION_LEARNING_DATASET_PATH)

# Create the output folder *before* training: otherwise a missing folder would only
# be discovered by the final save, after the training time has already been spent.
Path(IMITATION_LEARNING_MODEL_PATH).parent.mkdir(parents=True, exist_ok=True)

bc_trainer = bc.BC(
    observation_space=env.observation_space,
    action_space=env.action_space,
    demonstrations=trajectory_list,
    # A PPO instance is built only to obtain a policy network configured with
    # POLICY_KEYWORD_ARGUMENTS; the loading cell builds its model the same way.
    policy=PPO("MlpPolicy", env, policy_kwargs=POLICY_KEYWORD_ARGUMENTS).policy,
    # NOTE(review): the generator is unseeded, so training runs are not reproducible.
    rng=np.random.default_rng(),
)


bc_trainer.train(n_epochs=5)

bc_trainer.policy.save(IMITATION_LEARNING_MODEL_PATH)

0batch [00:00, ?batch/s]

--------------------------------
| batch_size        | 32       |
| bc/               |          |
|    batch          | 0        |
|    ent_loss       | -0.00284 |
|    entropy        | 2.84     |
|    epoch          | 0        |
|    l2_loss        | 0        |
|    l2_norm        | 129      |
|    loss           | 2.18     |
|    neglogp        | 2.18     |
|    prob_true_act  | 0.115    |
|    samples_so_far | 32       |
--------------------------------


65batch [00:01, 36.78batch/s]


In [None]:
# ==============================================================================
# LOAD THE BEHAVIOURAL CLONING ALGORITHM FROM A FILE
# ==============================================================================

# Build the model first, so that the checkpoint can be mapped onto the device it lives on.
model = PPO("MlpPolicy", env, verbose=1, policy_kwargs=POLICY_KEYWORD_ARGUMENTS)

# map_location lets a checkpoint saved on another device (e.g. a GPU) load on this machine.
# NOTE(security): weights_only=False unpickles arbitrary Python objects, which can execute
# code. Only load model files that you produced yourself or otherwise fully trust.
saved_variables = torch.load(IMITATION_LEARNING_MODEL_PATH, map_location=model.device, weights_only=False)

# Copy the trained weights into the freshly built policy (the result is displayed as the cell output)
model.policy.load_state_dict(saved_variables["state_dict"])



Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


<All keys matched successfully>

In [None]:
# ==============================================================================
# SET UP THE DONKEYCAR SIMULATOR
# ==============================================================================

# Merge the executable path into the shared simulator settings. Because the shared
# settings are unpacked last, a key defined there would take precedence.
donkeycar_simulation_config = {
    "exe_path": PATH_TO_SIMULATOR_EXECUTABLE,
    **DONKEYCAR_SIMULATION_CONFIG,
}
env = DonkeyEnvLidar(level=TRACK_NAME, conf=donkeycar_simulation_config)


# ==============================================================================
# RUN THE SIMULATOR WITH OUR MODEL TO OBSERVE HOW WELL IT DRIVES
# ==============================================================================

try:
    for lap_index in range(1000):

        # Per-lap history of what the model saw and did. Nothing in this cell reads
        # these lists; they are only kept available for inspection afterwards.
        current_lap_observations = []
        current_lap_actions = []

        observation = env.reset()
        current_lap_observations.append(observation)

        for time_step in range(10000000):

            # deterministic=True asks the policy for its deterministic action instead of sampling one
            action, _ = model.predict(observation, deterministic=True)
            # NOTE(review): `done` is not consulted below; the lap ends only through the `info` checks.
            observation, reward, done, info = env.step(action)

            current_lap_observations.append(observation)
            current_lap_actions.append(action)

            # If we finished the current lap, start the next one
            if info["lap_count"] == 1:
                break

            # If we hit an object or we go too far off of the track, give up on this lap and restart
            if abs(info["cte"]) > DONKEYCAR_SIMULATION_CONFIG["max_cte"] or info["hit"] != "none":
                break

except KeyboardInterrupt:
    # Interrupting the kernel is treated as a normal way to stop the evaluation.
    # Any other exception is no longer swallowed, so real failures stay visible.
    pass
finally:
    # Always shut the simulator down, whether we stopped normally, were interrupted, or failed.
    env.close()
        
        


starting DonkeyGym env
Setting default: start_delay 5.0
Setting default: frame_skip 1
Setting default: cam_resolution (120, 160, 3)
Setting default: log_level 20
Setting default: host localhost
Setting default: steer_limit 1.0
Setting default: throttle_min 0.0
Setting default: throttle_max 1.0
donkey subprocess started
Found path: /home/animated/Projects/donkeycar/DonkeySimLinux/donkey_sim.x86_64


INFO:gym_donkeycar.core.client:connecting to localhost:9091 


loading scene mini_monaco


INFO:gym_donkeycar.envs.donkey_sim:on need car config
INFO:gym_donkeycar.envs.donkey_sim:sending car config.
INFO:gym_donkeycar.envs.donkey_sim:done sending lidar config., {'deg_per_sweep_inc': 1.0}
INFO:gym_donkeycar.envs.donkey_sim:sim started!


number of lidar measurements: 360
closing donkey sim subprocess
