# Imports

In [23]:
# Environment imports
import math
import numpy as np
import gymnasium as gym
import cv2
import matplotlib.pyplot as plt

# Tensorflow training imports
from tqdm import tqdm
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
from tensorflow.keras import mixed_precision

# Training monitoring imports
import datetime, os
from collections import deque
import time
import random

# Configs

In [24]:
############################## CONFIGURATION ##################################
# Global TensorFlow settings, output locations and hyper-parameters read by the
# cells below. Everything in this cell runs once, when the cell is executed.


# 1) Mixed-precision to leverage Tensor-core speedups
mixed_precision.set_global_policy('mixed_float16')

# 2) Enable XLA (JIT) compilation for fused kernels
tf.config.optimizer.set_jit(True)

# 3) Prevent TensorFlow from allocating all of the GPU memory up front
# From: https://stackoverflow.com/questions/34199233/how-to-prevent-tensorflow-from-allocating-the-totality-of-a-gpu-memory
# NOTE(review): memory growth generally has to be configured before the GPUs
# are first initialised; confirm the two calls above do not initialise them.
GPUs = tf.config.experimental.list_physical_devices('GPU')
for gpu in GPUs:
    tf.config.experimental.set_memory_growth( gpu, True )   # set memory growth option

# Creates a virtual display for OpenAI gym (to support running from headless servers).
# Disabled: pyvirtualdisplay is not imported in this notebook.
# pyvirtualdisplay.Display( visible=0, size=(720, 480) ).start()

# Where are models saved?
MODEL_TYPE              = "DDQN_NN"
TIMESTAMP               = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")   # used in the reward CSV/plot file names
MODEL_DIR               = f"./model/{MODEL_TYPE}/"

# Directory for the per-episode reward CSV and plot
REWARD_DIR              = f"rewards/{MODEL_TYPE}/"

# Training params
RENDER                  = True      # call env.render() after every environment step (train and test loops)
PLOT_RESULTS            = True     # NOTE(review): not referenced by any code visible in this notebook
EPISODES                = 5000      # training episodes
SAVE_TRAINING_FREQUENCY = 100       # save model, reward CSV and plot every n episodes
SKIP_FRAMES             = 2         # each chosen action is applied for SKIP_FRAMES + 1 environment steps
TARGET_UPDATE_STEPS     = 2         # target-network sync interval; applied BOTH per n training steps (experience_replay) and per n episodes (train_agent)
MAX_PENALTY             = -30       # an episode stops once its accumulated reward is no longer above this value
BATCH_SIZE              = 64        # minibatch size sampled from the replay buffer for each fit
CONSECUTIVE_NEG_REWARD  = 25        # number of consecutive negative (frame-skipped) rewards before terminating episode
STEPS_ON_GRASS          = 20        # consecutive processed frames on grass before the grass flag fires (it fires when the count exceeds this)
REPLAY_BUFFER_MAX_SIZE  = 150000    # maximum number of transitions kept in the replay deque (old version was 10000)

# Number of recent steering commands averaged to estimate how much the car is turning
STEER_BUFFER_LEN = 5

EPSILON_START = 1.0
EPSILON_MIN   = 0.01

# Per-episode multiplicative decay chosen so that
# EPSILON_START * EPSILON_DECAY**EPISODES == EPSILON_MIN
# (with the values above: 1.0 -> 0.01 over EPISODES episodes).
EPSILON_DECAY = math.exp(math.log(EPSILON_MIN/EPSILON_START) / EPISODES)
# Testing params
PRETRAINED_PATH = os.path.join(MODEL_DIR, "model.weights.h5")   # weights file loaded in test mode; same location DDQN_Agent.save writes to
TEST                    = True      # true = testing, false = training

# Utilities

In [25]:
def convert_greyscale( state ):
    """Convert a colour frame to normalised greyscale and derive two road flags.

    Parameters
    ----------
    state : np.ndarray
        Colour frame indexed as (row, column, channel), i.e. (height, width, 3).
        Assumed to be a uint8 image as delivered by the environment; the grass
        probe uses fixed rows 67:76, so a frame at least 76 rows high
        (96 x 96 in this notebook) is assumed -- TODO confirm for other sizes.

    Returns
    -------
    list
        ``[grey, road_visible, on_grass]`` where

        * ``grey`` is the whole frame in greyscale, scaled to [0, 1], with a
          trailing channel axis (height, width, 1);
        * ``road_visible`` is True when any pixel in the top 85% of the frame
          lies in the grey road colour range;
        * ``on_grass`` is True when grass has been detected under the probe
          window for more than ``STEPS_ON_GRASS`` consecutive calls.

    Side effects
    ------------
    Reads and updates the module-level ``on_grass_counter``; it is created on
    the first call if it does not exist yet.
    """
    global on_grass_counter

    # Image arrays are indexed (row, column): shape[0] is the height and
    # shape[1] the width. The previous code unpacked them the other way round,
    # which only worked because the frames are square.
    height, width, _ = state.shape

    # Search for road-coloured pixels in the top 85% of the rows only
    # (presumably to exclude the indicator bar at the bottom -- confirm).
    cropped = state[ 0:int( 0.85*height ) , 0:width ]
    mask = cv2.inRange( cropped,  np.array([100, 100, 100]),  # dark_grey
                                  np.array([150, 150, 150]))  # light_grey

    # Create greyscale then normalise to [0, 1] to reduce complexity for the network.
    # NOTE(review): COLOR_BGR2GRAY treats channel 0 as blue. If the environment
    # delivers RGB frames the red/blue weights are swapped. Left unchanged on
    # purpose so that already-trained weights keep seeing identical inputs.
    gray = cv2.cvtColor( state, cv2.COLOR_BGR2GRAY )
    gray = gray.astype(float)
    gray_normalised = gray / 255.0

    # Probe a small window around the horizontal centre (rows 67-75) for grass colour.
    centre_col = int(width / 2)
    grass_mask = cv2.inRange(   state[67:76 , centre_col-2:centre_col+2],
                                np.array([50, 180, 0]),
                                np.array([150, 255, 255]))

    # Count consecutive grass detections; any frame without grass (or the very
    # first call, when the counter does not exist yet) resets the count to zero.
    grass_seen = bool(np.any(grass_mask == 255))
    if grass_seen and "on_grass_counter" in globals():
        on_grass_counter = on_grass_counter + 1
    else:
        on_grass_counter = 0

    # Fire the flag once the streak exceeds STEPS_ON_GRASS, then start counting afresh.
    if on_grass_counter > STEPS_ON_GRASS:
        on_grass = True
        on_grass_counter = 0
    else:
        on_grass = False

    # returns [ greyscale image, T/F of if road is visible, is car on grass bool ]
    return [ np.expand_dims( gray_normalised, axis=2 ), np.any(mask== 255), on_grass ]


def calculate_off_far(state):
    """
    Estimate how far the upcoming road is shifted sideways from the image centre.

    Only the top 40% of the rows are inspected. Pixels whose three channels all
    fall within [100, 150] are treated as track, and the x-coordinate of their
    centroid is normalised so that the image centre maps to 0 and the left/right
    borders to -1/+1.

    Returns 1.0 when no track-coloured pixel is found in that region (treated
    as a very sharp turn).
    """
    height, width, _ = state.shape

    # Far field: rows 0 .. 0.4 * height, all columns.
    far_field = state[:int(0.4 * height), :width]

    # Binary mask of the grey track pixels.
    track_mask = cv2.inRange(
        far_field,
        np.array([100,100,100]),
        np.array([150,150,150])
    )

    moments = cv2.moments(track_mask)

    # Zero total mass means there is nothing to take a centroid of.
    if moments["m00"] == 0:
        return 1.0

    # centroid x = m10 / m00, then rescale around the image centre.
    centroid_x = moments["m10"] / moments["m00"]
    lateral_offset = (centroid_x - (width/2)) / (width/2)
    return float(lateral_offset)



def plot_agent(data, path):
    """
    Plot the agent's training progress and save the figure to ``path``.

    Parameters
    ----------
    data : array-like of shape (n_episodes, 2)
        One row per episode; column 0 is the total episode reward and column 1
        the epsilon value for that episode. An empty sequence produces an
        empty plot instead of raising.
    path : str
        Destination image file, passed to ``Figure.savefig``.
    """
    data = np.array(data)
    if data.size == 0:
        # np.array([]) is 1-D; give it the expected two columns so the
        # column slicing below still works.
        data = data.reshape(0, 2)

    episodes = np.arange(len(data))

    # Plotting
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 6), sharex=True)
    try:
        # Plot 1: Reward per episode
        ax1.plot(episodes, data[:, 0], label="Total Reward", color='green', alpha=0.6)
        ax1.set_ylabel("Reward")
        ax1.set_title("Agent Reward per Episode")
        ax1.grid(True)
        ax1.legend()

        # Plot 2: Epsilon per episode
        ax2.plot(episodes, data[:, 1], label="Epsilon", color='orange')
        ax2.set_ylabel("Epsilon")
        ax2.set_xlabel("Episode")
        ax2.set_title("Epsilon Decay")
        ax2.grid(True)
        ax2.legend()

        fig.tight_layout()
        fig.savefig(path)
    finally:
        # Always release the figure, even if saving fails, so that repeated
        # checkpointing does not accumulate open figures.
        plt.close(fig)

# Main Agent 

In [26]:
class DDQN_Agent:
    """Double-DQN agent with a small convolutional Q-network.

    The agent keeps an online network (``model``) that is trained, a target
    network (``target_model``) that is periodically overwritten with the online
    weights, and a bounded replay buffer (``D``) of
    ``(state, action_index, reward, next_state, done)`` transitions.
    Actions are ``(steer, gas, brake)`` tuples taken from ``action_space``.
    """

    def __init__(self,
                 action_space=[(-1, 1, 0.2), (0, 1, 0.2), (1, 1, 0.2),
                               (-1, 1, 0),   (0, 1,   0),   (1, 1,   0),
                               (-1, 0, 0.2), (0, 0, 0.2), (1, 0, 0.2),
                               (-1, 0,   0), (0, 0,   0), (1, 0,   0)],
                 gamma=0.99,
                 epsilon=EPSILON_START,
                 epsilon_min=EPSILON_MIN,
                 epsilon_decay=EPSILON_DECAY,
                 learning_rate=0.0001
                 ):
        """Create the networks and an empty replay buffer.

        Parameters
        ----------
        action_space : sequence of (steer, gas, brake)
            Discrete actions the agent can choose from.
        gamma : float
            Discount factor used in the TD target.
        epsilon, epsilon_min, epsilon_decay : float
            Exploration rate, its lower bound and its multiplicative decay.
            The agent only stores these; decaying is done by the caller.
        learning_rate : float
            Adam learning rate for the online network.
        """
        # Copy into a private list of tuples so neither the shared default list
        # nor a caller-owned list is aliased by the agent.
        self.action_space   = [tuple(a) for a in action_space]
        self.D              = deque(maxlen=REPLAY_BUFFER_MAX_SIZE)
        self.gamma          = gamma
        self.epsilon        = epsilon
        self.epsilon_min    = epsilon_min
        self.epsilon_decay  = epsilon_decay
        self.learning_rate  = learning_rate
        self.model          = self.build_model()
        self.target_model   = tf.keras.models.clone_model(self.model)
        # clone_model only copies the architecture and creates freshly
        # initialised weights, so sync explicitly to start both nets identical.
        self.update_model()
        self.train_step     = 0

    def build_model(self):
        """Build and compile the Q-network.

        Input is a single-channel 96 x 96 image; output is one linear Q-value
        per entry of ``action_space``. Compiled with Huber loss and Adam.
        """
        model = Sequential()
        model.add(tf.keras.Input(shape=(96, 96, 1)))
        model.add(Conv2D(6, (7, 7), strides=3, activation='relu'))
        model.add(MaxPooling2D((2, 2)))
        model.add(Conv2D(12, (4, 4), activation='relu'))
        model.add(MaxPooling2D((2, 2)))
        model.add(Flatten())
        model.add(Dense(216, activation='relu'))
        model.add(Dense(64, activation='relu'))
        # Keep the output layer in float32: a float16 output (e.g. under a
        # 'mixed_float16' global policy) would quantise the Q-values and the
        # loss computed from them.
        model.add(Dense(len(self.action_space), activation=None, dtype='float32'))
        model.compile(
            loss=tf.keras.losses.Huber(),
            optimizer=Adam(learning_rate=self.learning_rate, epsilon=1e-7)
        )
        return model

    def update_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())

    def store_transition(self, state, action, reward, new_state, done):
        """Append one transition to the replay buffer (oldest entries drop off when full)."""
        self.D.append((state, action, reward, new_state, done))

    @staticmethod
    def _q_values(network, batch):
        """Run ``network`` on ``batch`` and return the Q-values as a writable float32 array.

        Calls the model directly instead of ``predict``: for inputs that fit
        in a single batch this avoids ``predict``'s per-call setup overhead,
        which matters because inference happens on every environment step.
        """
        inputs = tf.convert_to_tensor(batch, dtype=tf.float32)
        return np.array(network(inputs, training=False), dtype=np.float32)

    def choose_action(self, state, best=False, _random=False):
        """Pick an action tuple for a single ``state``.

        Parameters
        ----------
        state : np.ndarray
            One network input without the batch axis.
        best : bool
            If True, always take the greedy (arg-max Q) action.
        _random : bool
            If True, always take a uniformly random action (takes precedence
            over ``best``).

        With both flags False the choice is epsilon-greedy using ``self.epsilon``.
        """
        explore = _random or (not best and np.random.rand() < self.epsilon)
        if explore:
            idx = random.randrange(len(self.action_space))
        else:
            state_batch = np.expand_dims(state, axis=0)
            q = self._q_values(self.model, state_batch)[0]
            idx = int(np.argmax(q))
        return self.action_space[idx]


    def experience_replay(self):
        """
        Vectorized Double-DQN update over one minibatch.

        Does nothing until the buffer holds at least ``BATCH_SIZE`` transitions.
        Otherwise samples a minibatch, builds the Double-DQN targets
        (action chosen by the online net, evaluated by the target net), fits
        the online net for one epoch and advances ``self.train_step``.
        """
        if len(self.D) < BATCH_SIZE:
            return

        # 1) Sample and unpack
        minibatch = random.sample(self.D, BATCH_SIZE)
        states, action_idxs, rewards, next_states, dones = zip(*minibatch)
        states       = np.stack(states)
        next_states  = np.stack(next_states)
        action_idxs  = np.array(action_idxs)
        rewards_arr  = np.array(rewards, dtype=np.float32)
        # 1.0 for non-terminal transitions, 0.0 for terminal ones
        not_done     = 1.0 - np.array(dones, dtype=np.float32)

        # 2) Batch Q-value predictions
        q_current     = self._q_values(self.model, states)
        q_next_online = self._q_values(self.model, next_states)
        q_next_target = self._q_values(self.target_model, next_states)

        # 3) Double-DQN targets:
        #    a* = argmax_a Q_online(s', a);  target = r + gamma * Q_target(s', a*) * (1 - done)
        rows = np.arange(BATCH_SIZE)
        best_next_actions = np.argmax(q_next_online, axis=1)
        best_next_q = q_next_target[rows, best_next_actions]
        td_targets = rewards_arr + self.gamma * best_next_q * not_done

        # 4) Replace only the taken-action Q-values so every other output has zero error
        q_current[rows, action_idxs] = td_targets

        # 5) Train on this batch (the model is compiled with Huber loss in build_model)
        self.model.fit(states,
                       q_current,
                       batch_size=BATCH_SIZE,
                       epochs=1,
                       verbose=0)

        # 6) Periodically sync target network
        # NOTE(review): this counts training steps, so with a small
        # TARGET_UPDATE_STEPS the target net follows the online net almost
        # immediately. The config comment describes the interval in episodes
        # and train_agent also syncs per episode -- confirm which is intended.
        self.train_step += 1
        if self.train_step % TARGET_UPDATE_STEPS == 0:
            self.update_model()


    def save(self, data):
        """Write a checkpoint plus the training log.

        Saves the *target* network's weights to ``MODEL_DIR/model.weights.h5``
        and writes ``data`` (per-episode rows) both as a plot (``.jpg``) and as
        CSV under ``REWARD_DIR``, named ``data_<TIMESTAMP>``.
        """
        name = f'data_{TIMESTAMP}'
        os.makedirs(MODEL_DIR, exist_ok=True)
        self.target_model.save_weights(os.path.join(MODEL_DIR, "model.weights.h5"))
        os.makedirs(REWARD_DIR, exist_ok=True)
        filepath = os.path.join(REWARD_DIR, name + ".csv")
        plotpath = os.path.join(REWARD_DIR, name + ".jpg")
        plot_agent(data, plotpath)
        np.savetxt(filepath, data, delimiter=",")

    def load(self, name):
        """Load weights from file ``name`` into the online net and mirror them into the target net."""
        self.model.load_weights(name)
        self.update_model()


# Train

In [27]:
def train_agent(agent: DDQN_Agent, env: gym.Env, episodes: int):
    """
    Train ``agent`` on ``env`` for ``episodes`` episodes.

    Per step the loop uses:
      - 4-frame temporal smoothing (network input is the mean of the last 4 grey frames)
      - action repeat for SKIP_FRAMES + 1 environment steps
      - reward shaping: grass penalty, steering-conditioned throttle shaping,
        brake incentive before curves and a steer-jitter penalty, clipped to [-1, 1]
      - one Double-DQN update (agent.experience_replay) per stored transition
    Per episode it logs the reward, syncs the target network every
    TARGET_UPDATE_STEPS episodes, checkpoints every SAVE_TRAINING_FREQUENCY
    episodes and applies the agent's exponential epsilon decay once.

    An episode ends when the env reports termination/truncation, when the
    accumulated reward is no longer above MAX_PENALTY, when the road is no
    longer visible, or after CONSECUTIVE_NEG_REWARD negative steps in a row.

    The environment is closed before returning, also when an exception occurs.
    A final checkpoint is written after the last episode if that episode was
    not already saved by the periodic checkpoint.

    Returns
    -------
    list of [total_reward, epsilon]
        One entry per completed episode.
    """
    episode_rewards = []

    try:
        for episode in tqdm(range(episodes), desc="Training"):
            print(f"[INFO] Starting Episode {episode}")

            # Reset env & preprocess first frame
            obs_colour, _ = env.reset()
            first_grey, can_see_road, car_on_grass = convert_greyscale(obs_colour)

            # 4-frame grey buffer, initially filled with the first frame
            frame_buffer = deque([first_grey] * 4, maxlen=4)

            # Steering-history buffer (STEER_BUFFER_LEN most recent steer commands)
            steer_buffer = deque([0] * STEER_BUFFER_LEN, maxlen=STEER_BUFFER_LEN)

            # Track last steer to penalize jitter
            last_steer = 0.0
            total_reward    = 0.0
            negative_streak = 0
            done = False

            while (not done) and (total_reward > MAX_PENALTY) and can_see_road:

                # 1) Build temporally smoothed NN input (mean over the frame buffer)
                prev_state = np.mean(np.stack(frame_buffer, axis=0), axis=0)

                # 2) Agent picks an action
                raw_action = agent.choose_action(prev_state)
                steer, gas, brake = raw_action
                action = np.array(raw_action, dtype=np.float64)

                # 3) Record steering history & penalize rapid changes
                steer_buffer.append(steer)
                steer_change = abs(steer - last_steer)
                last_steer = steer
                # small penalty on jitter (only when you rapidly switch steer)
                jitter_penalty = 0.005 * steer_change

                # 4) Roll out with frame-skip and accumulate base reward
                reward_accum = 0.0
                for _ in range(SKIP_FRAMES + 1):
                    next_colour, r, term, trunc, _ = env.step(action)
                    reward_accum += r
                    if RENDER:
                        env.render()
                    if term or trunc:
                        done = True
                        break

                # 5) Early termination on too many negatives.
                #    Note: this step's transition is not stored or counted.
                negative_streak = (negative_streak + 1) if (reward_accum < 0) else 0
                if negative_streak >= CONSECUTIVE_NEG_REWARD:
                    break

                # 6) Preprocess the next frame & update grey buffer
                next_grey, can_see_road, car_on_grass = convert_greyscale(next_colour)
                frame_buffer.append(next_grey)

                # 7) Grass penalty
                if car_on_grass:
                    reward_accum -= 0.12

                # 8) Corner awareness: where the far road is, and how much we have been steering
                off_far = calculate_off_far(next_colour)
                turniness = sum(abs(s) for s in steer_buffer) / STEER_BUFFER_LEN

                # 9) Throttle shaping
                if abs(off_far) < 0.2 and turniness < 0.2:
                    # straight road ahead, reward gas
                    reward_accum += 0.012 * gas
                else:
                    # approaching/in turn, penalize gas
                    severity = max(turniness, abs(off_far))
                    reward_accum -= 0.04 * gas * severity

                # 10) Brake incentive only when a curve is coming (off_far)
                if abs(off_far) > 0.3:
                    # scaled by how sharp that curve is
                    reward_accum += 0.015 * brake * abs(off_far)

                # 11) Subtract the jitter penalty
                reward_accum -= jitter_penalty

                # 12) Clip total shaped reward
                reward_accum = np.clip(reward_accum, -1.0, 1.0)

                # 13) Store and learn
                next_state = np.mean(np.stack(frame_buffer, axis=0), axis=0)
                action_idx = agent.action_space.index(tuple(raw_action))
                agent.store_transition(prev_state, action_idx, reward_accum, next_state, done)
                agent.experience_replay()

                # 14) Accumulate for logging
                total_reward += reward_accum

            # Episode done: log & checkpoint
            episode_rewards.append([total_reward, agent.epsilon])
            print(f"[INFO] Episode {episode} → Reward: {total_reward:.2f} | ε: {agent.epsilon:.4f}")

            if episode % TARGET_UPDATE_STEPS == 0:
                agent.update_model()
            if episode % SAVE_TRAINING_FREQUENCY == 0:
                agent.save(episode_rewards)

            # Exponential ε-decay, using the decay rate the agent was configured
            # with (its default is the module-level EPSILON_DECAY).
            agent.epsilon = max(agent.epsilon_min, agent.epsilon * agent.epsilon_decay)

        # The periodic checkpoint only fires on multiples of
        # SAVE_TRAINING_FREQUENCY; save once more so the final episodes are not lost.
        if episode_rewards and (len(episode_rewards) - 1) % SAVE_TRAINING_FREQUENCY != 0:
            agent.save(episode_rewards)
    finally:
        env.close()

    return episode_rewards


# Test

In [28]:
def test_agent(agent: DDQN_Agent, env: gym.Env, model: str, testnum=10):
    """Evaluate pretrained weights over several greedy runs and save the results.

    Parameters
    ----------
    agent : DDQN_Agent
        Agent whose networks receive the loaded weights.
    env : gym.Env
        Environment to run in. It is not closed by this function.
    model : str
        Path of the weights file to load; also determines the name of the
        results CSV.
    testnum : int
        Number of evaluation runs; must be at least 1.

    Returns
    -------
    list
        ``[avg_reward, nan, avg_time_s, max_reward, min_reward, reward_std]``.

    Raises
    ------
    ValueError
        If ``testnum`` is smaller than 1.

    Per-run rows plus the summary row are written to
    ``test_<REWARD_DIR>/<model file name without extension>_run_rewards.csv``.
    Quit with ctrl+c.
    """
    if testnum < 1:
        # Fail early with a clear message instead of an obscure
        # "max() arg is an empty sequence" after the weights were loaded.
        raise ValueError("testnum must be at least 1")

    # Load agent model
    agent.load(model)

    # NOTE(review): training feeds the network the mean of the last 4 grey
    # frames and repeats each action for SKIP_FRAMES + 1 steps, whereas this
    # loop feeds the single latest frame and re-decides every step. Confirm the
    # difference is intended before comparing train and test behaviour.
    run_rewards = []
    for test in range(testnum):
        state_colour, _ = env.reset()
        state_grey, _, _ = convert_greyscale(state_colour)

        done = False
        sum_reward = 0.0
        t1 = time.time()
        while not done:
            action = agent.choose_action(state_grey, best=True)
            action = np.array(action, dtype=np.float64)

            new_state_colour, r, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            if RENDER:
                env.render()

            state_grey, _, _ = convert_greyscale(new_state_colour)
            sum_reward += r

        t1 = time.time() - t1
        # Columns: reward, (unused), time, max, min, std -- the last three are
        # only filled in the summary row.
        run_rewards.append([sum_reward, np.nan, t1, np.nan, np.nan, np.nan])
        print(f"[INFO]: Run {test} | Run Reward: {sum_reward:.2f} | Time: {t1:.2f}s")

    rr = [row[0] for row in run_rewards]
    rt = [row[2] for row in run_rewards]

    r_max = max(rr)
    r_min = min(rr)
    r_std_dev = np.std(rr)
    r_avg = np.mean(rr)
    t_avg = np.mean(rt)

    summary = [r_avg, np.nan, t_avg, r_max, r_min, r_std_dev]
    run_rewards.append(summary)
    print(
        f"[INFO]: Runs {testnum} | Avg Run Reward: {r_avg:.2f} "
        f"| Avg Time: {t_avg:.2f}s | Max: {r_max:.2f} | Min: {r_min:.2f} | Std Dev: {r_std_dev:.2f}"
    )

    # saving test results
    out_dir = f"test_{REWARD_DIR}"
    os.makedirs(out_dir, exist_ok=True)
    # Name the file after the weights that were actually tested (the `model`
    # argument), independent of path separator and extension length.
    model_stem = os.path.splitext(os.path.basename(model))[0]
    path = os.path.join(out_dir, model_stem + "_run_rewards.csv")
    np.savetxt(path, run_rewards, delimiter=",")

    return list(summary)


main.py

In [29]:
if __name__ == "__main__":

    # Entry point: TEST selects between evaluating saved weights (with an
    # on-screen 'human' render window) and training from scratch (environment
    # created without a render mode). The environment is created first, then
    # the agent, in both modes.
    if TEST:
        env = gym.make('CarRacing-v3',render_mode='human')
    else:
        env = gym.make('CarRacing-v3')

    agent = DDQN_Agent()

    if TEST:
        # Test Agent
        test_agent( agent, env, model = PRETRAINED_PATH, testnum=5 )
    else:
        # Train Agent
        train_agent( agent, env, episodes = EPISODES )

  saveable.load_own_variables(weights_store.get(inner_path))
  saveable.load_own_variables(weights_store.get(inner_path))


[INFO]: Run 0 | Run Reward: 914.00 | Time: 51.43s
[INFO]: Run 1 | Run Reward: 907.30 | Time: 57.39s
[INFO]: Run 2 | Run Reward: 890.57 | Time: 61.44s
[INFO]: Run 3 | Run Reward: 914.30 | Time: 52.90s
[INFO]: Run 4 | Run Reward: 886.44 | Time: 62.17s
[INFO]: Runs 5 | Avg Run Reward: 902.52 | Avg Time: 57.07s | Max: 914.30 | Min: 886.44 | Std Dev: 11.79
