In [None]:
import os
import sys
import tensorflow as tf
import tensorflow.compat.v1 as tf_v1
from donkeycar import Vehicle
from donkeycar.parts.dgym import DonkeyGymEnv
from donkeycar.parts.datastore import TubHandler
from donkeycar.parts.controller import LocalWebController

In [None]:
# Resolve the parent of the current working directory and give it top priority
# on the import search path, so the `src.*` imports in the next cell resolve.
SRC_DIR = os.path.split(os.path.abspath(os.curdir))[0]
sys.path[:0] = [SRC_DIR]

In [None]:
from src.pid_controller import PIDController
from src.utils import ContextManagerWrapper
from src.parts import DonkeyNetController, DriveSelector

In [None]:
# --- Run configuration -------------------------------------------------------
# Loop rate handed to `car.start(rate_hz=...)` and the throttle value handed to
# the DonkeyNet controller.
SIM_RATE = 20
THROTTLE = 0.2

# Directory handed to TubHandler; a falsy value skips the recording part.
RECORD_PATH = r"/home/rajk/LogDir/"

# Model selection: `version` is interpolated into the model directory name and
# is also passed to the DonkeyNet controller.
version = 1
MODEL_PATH = os.path.join(SRC_DIR, fr"models/new/DonkeyNetV{version}Model_BigNoFlip/")

# "-1" hides every CUDA device from this process (i.e. CPU-only execution).
os.environ.update(CUDA_VISIBLE_DEVICES="-1")

In [None]:
# Build the donkeycar vehicle pipeline (simulator -> web controller -> DonkeyNet
# controller -> drive selector -> optional tub recorder) and run it inside a
# TF1-compat session.
GPU_OPTIONS = tf_v1.GPUOptions(per_process_gpu_memory_fraction=0.75)
TF_CONFIG = tf_v1.ConfigProto(gpu_options=GPU_OPTIONS, allow_soft_placement=True)
with tf_v1.Session(config=TF_CONFIG) as sess:
    # NOTE(review): ContextManagerWrapper is a project helper; presumably it calls
    # `car.stop()` when the block exits - confirm in src/utils.
    with ContextManagerWrapper(Vehicle(), exit_method="stop") as car:
        # Simulator: consumes the final steering/throttle, produces camera frames.
        SIM_PATH = r"/home/rajk/Machine_Learning/DonkeySimLinux/donkey_sim.x86_64"
        donkey_env = DonkeyGymEnv(SIM_PATH, port=9090, headless=0)
        car.add(donkey_env,
                inputs=["steering", "throttle"],
                outputs=["cam/image_array"],
                threaded=True)

        # Web controller: source of the user's steering/throttle/mode/recording flags.
        web_ctr = LocalWebController()
        # The original message was a plain string (no `f` prefix) referencing an
        # undefined name, so it printed the placeholder verbatim. Report the port
        # the controller exposes when it has a `port` attribute; otherwise fall
        # back to 8887 (donkeycar's usual default - TODO confirm for the
        # installed version).
        WEB_CONTROLLER_PORT = getattr(web_ctr, "port", 8887)
        print(f"Web controller available at: localhost:{WEB_CONTROLLER_PORT}")
        car.add(web_ctr,
                inputs=["cam/image_array"],
                outputs=["user/steering", "user/throttle", "user/mode", "recording"],
                threaded=True)

        # Network controller: only runs while "drive/auto" is truthy, which the
        # drive selector below publishes.
        input_shape = donkey_env.env.observation_space.shape
        donkey_net_ctr = DonkeyNetController(sess, input_shape=input_shape, model_path=MODEL_PATH,
                                             version=version, throttle=THROTTLE)
        car.add(donkey_net_ctr,
                inputs=["cam/image_array"],
                outputs=["random/steering", "random/throttle"],
                run_condition="drive/auto")

        # Drive selector: merges user and network commands into the final
        # "steering"/"throttle" channels and publishes "drive/auto".
        pid_values = {"P": -5.7e-1, "I": -7e-2, "D": -2.2e-1}
        print(f"PID values: {pid_values}")
        pid_ctr = PIDController(**pid_values, min_value=-1.0, max_value=1.0, store_n_errors=20, target_value=0.0)
        car.add(DriveSelector(pid_ctr),
                inputs=["user/steering", "user/throttle", "random/steering", "random/throttle", "user/mode"],
                outputs=["steering", "throttle", "drive/auto"])

        if RECORD_PATH:
            tub_handler = TubHandler(path=RECORD_PATH)
            # Keys written into each tub record, and their types.
            tub_inputs = ["cam/image_array", "user/angle", "user/throttle"]
            tub_input_types = ["image_array", "float", "float"]
            # Vehicle channels feeding those keys, position by position. No part in
            # this pipeline publishes "user/angle"; the web controller publishes
            # "user/steering", so read that channel for the recorded angle.
            # Assumes the tub writer pairs its keys with the positional values it
            # receives - TODO confirm against the installed donkeycar version.
            tub_channels = ["cam/image_array", "user/steering", "user/throttle"]
            car.add(tub_handler.new_tub_writer(inputs=tub_inputs, types=tub_input_types, user_meta=[]),
                    inputs=tub_channels,
                    outputs=["tub/num_records"],
                    run_condition="recording")
        car.start(rate_hz=SIM_RATE)