In [None]:
import tensorflow as tf
import numba as nb
import numpy as np
import gymnasium as gym

In [None]:
# Show the GPU devices TensorFlow can see; the returned list is the cell's output.
tf.config.list_physical_devices("GPU")

In [None]:
@nb.njit(fastmath=True)
def contrast_img(img, colours):
    """
    Snap every pixel of ``img`` to the nearest entry of ``colours``.

    "Nearest" is the smallest Euclidean distance across the channels; when
    several palette entries are equally close, the one that appears first in
    ``colours`` is chosen. The input image is left untouched: the work is done
    on a copy, which is returned.
    """
    out = np.copy(img)
    n_rows = out.shape[0]
    n_cols = out.shape[1]
    n_colours = len(colours)
    # Only compare as many channels as both the pixel and the palette entry have.
    n_channels = min(out.shape[2], colours.shape[1])

    for r in range(n_rows):
        for c in range(n_cols):
            best_idx = 0
            best_dist = 0.0
            for k in range(n_colours):
                sq_sum = 0
                for ch in range(n_channels):
                    sq_sum += np.abs(out[r, c, ch] - colours[k, ch]) ** 2
                candidate = sq_sum ** (1/2)
                # The first entry seeds the minimum; later ones must be strictly closer.
                if k == 0 or candidate < best_dist:
                    best_dist = candidate
                    best_idx = k
            out[r, c] = colours[best_idx]
    return out


def agent_process_img(img, crop="box", contrast=True):
    """
    Prepare a frame for the scripted steering agent.

    Parameters
    ----------
    img : array
        Image indexed as (row, column, channel).
    crop : str
        ``"box"`` trims 12 pixels from every edge of the first two axes;
        any other value leaves the frame uncropped.
    contrast : bool
        When true, every pixel is replaced by the closest colour of a fixed
        five-colour palette (via ``contrast_img``).

    Returns
    -------
    The processed image.
    """
    processed = img[12:-12, 12:-12] if crop == "box" else img

    if not contrast:
        return processed

    # Fixed high-contrast palette, one triple per row.
    palette = np.array([[170,0,0],[105,230,105],[0,0,0],[101,101,101],[255,255,255]])
    return contrast_img(processed, palette)

In [None]:
def crop(img):
    """Return the window of ``img`` covering rows 0-83 and columns 6-89."""
    rows = slice(None, 84)
    cols = slice(6, 90)
    return img[rows, cols]

@nb.njit(fastmath=True)
def rgb_to_grey(img):
    """
    Collapse a (rows, cols, channels) RGB image into a (rows, cols) greyscale one.

    Each output pixel is the weighted sum 0.2989*R + 0.5870*G + 0.1140*B,
    stored in a ``uint8`` array.
    """
    height, width = img.shape[0], img.shape[1]
    grey = np.empty((height, width), dtype=np.uint8)
    for r in range(height):
        for c in range(width):
            red = img[r, c, 0]
            green = img[r, c, 1]
            blue = img[r, c, 2]
            # The float result is converted to uint8 when stored.
            grey[r, c] = 0.2989 * red + 0.5870 * green + 0.1140 * blue

    return grey

In [None]:
@nb.njit(fastmath=True)
def get_steering(processed_img, line_ind):
    """
    Derive a steering value from one scan line of a processed frame.

    The row ``line_ind`` is split into a left strip (columns 0-33) and a right
    strip (columns 38 onwards). A pixel counts as road when its second channel
    equals 101. The result is the road fraction of the right strip minus the
    road fraction of the left strip, so it lies in [-1, 1] and is positive
    when there is proportionally more road on the right.

    Parameters
    ----------
    processed_img : array
        Image indexed as (row, column, channel).
        Presumably the output of ``agent_process_img`` (72 px wide, which makes
        both strips 34 px) - confirm against the caller.
    line_ind : int
        Index of the row to scan.

    Returns
    -------
    float
        ``right_fraction - left_fraction``.
    """
    left_line = processed_img[line_ind:line_ind+1,:34][0]
    right_line = processed_img[line_ind:line_ind+1,38:][0]

    # Count each strip on its own. Walking both strips in lock-step would stop
    # at the shorter one and under-count the longer strip whenever the image is
    # not exactly 72 px wide, while still dividing by the full strip length.
    left_count = 0
    for pixel in left_line:
        if pixel[1] == 101:
            left_count += 1

    right_count = 0
    for pixel in right_line:
        if pixel[1] == 101:
            right_count += 1

    return right_count / len(right_line) - left_count / len(left_line)

In [None]:
def gen_dataset(length):
    """
    Roll out scripted CarRacing episodes and collect (state, action) pairs.

    Every episode is driven with the steering returned by ``get_steering`` and
    uniformly random gas / brake values. The samples of an episode are kept
    only when the accumulated positive reward matches the value predicted by
    the polynomial heuristic below; an episode that reaches 2000 recorded
    steps without a match is thrown away. Episodes are appended whole, so the
    result may contain more than ``length`` samples.

    Parameters
    ----------
    length : int
        Minimum number of samples to collect.

    Returns
    -------
    (states, actions) : tuple of np.ndarray
        ``states`` stacks the 84x84x1 greyscale frames and ``actions`` stacks
        the matching ``[steering, gas, brake]`` triples.
    """
    env = gym.make("CarRacing-v2", domain_randomize=False, autoreset=False)
    states = []
    actions = []
    try:
        while len(states) < length:
            episode_states = []
            episode_actions = []
            observation, info = env.reset()
            # 50 no-op steps before anything is recorded
            # (presumably to get past the start-of-episode frames - confirm).
            for _ in range(50):
                action = [0.0, 0.0, 0.0]
                observation, reward, terminated, truncated, info = env.step(action)
            steps = 0
            reward_sum = 0
            while True:
                # Network input: cropped greyscale frame with a channel axis.
                state_img = rgb_to_grey(crop(observation)).reshape((84,84,1))
                episode_states.append(state_img)

                # Scripted steering from the high-contrast frame, random gas and brake.
                agent_img = agent_process_img(observation)
                steering = get_steering(agent_img,56)
                action = [steering, 0.025+np.random.rand()*0.950, np.random.rand()*0.125]
                episode_actions.append(action)

                # NOTE(review): terminated / truncated are ignored, so the loop keeps
                # stepping an episode the environment may already have ended.
                # Confirm this is intended before relying on frames recorded after that.
                observation, reward, terminated, truncated, info = env.step(action)
                steps += 1
                if reward > 0:
                    reward_sum += reward
                    reward_exp = -2.128*(reward**2) + 20.65*reward + 919.0
                    # Only test for completion once a positive reward has been seen.
                    # Both sides used to start at 0, so the comparison succeeded on
                    # the very first step and the episode was accepted after one frame.
                    # The two values change only on positive-reward steps, so checking
                    # here is otherwise equivalent to checking on every step.
                    if round(reward_sum) == round(reward_exp):
                        states += episode_states
                        actions += episode_actions
                        break
                if steps >= 2000:
                    # Give up on this episode; its samples are discarded.
                    break
    finally:
        # Release the environment's resources even if a rollout raises.
        env.close()
    return np.array(states), np.array(actions)

In [None]:
# Collect at least 10,000 (state, action) samples from the scripted driver.
states, actions = gen_dataset(10_000)

# Pair each state with its action, group into batches of 64 and save to disk.
states_ds = tf.data.Dataset.from_tensor_slices(states)
actions_ds = tf.data.Dataset.from_tensor_slices(actions)
dataset = tf.data.Dataset.zip((states_ds, actions_ds)).batch(64)
dataset.save("./dataset")

In [None]:
# Behaviour-cloning network: 84x84x1 frame in, 3 action values out.
supervised_model = tf.keras.Sequential()

# Convolutional block 1
supervised_model.add(tf.keras.layers.Conv2D(16, kernel_size=8, input_shape=(84,84,1)))
supervised_model.add(tf.keras.layers.BatchNormalization())
supervised_model.add(tf.keras.layers.Dropout(0.2))

# Convolutional block 2
supervised_model.add(tf.keras.layers.Conv2D(32, kernel_size=4))
supervised_model.add(tf.keras.layers.BatchNormalization())
supervised_model.add(tf.keras.layers.Dropout(0.2))

# Dense head with a linear 3-unit output
supervised_model.add(tf.keras.layers.Flatten())
supervised_model.add(tf.keras.layers.Dense(256, activation="relu"))
supervised_model.add(tf.keras.layers.BatchNormalization())
supervised_model.add(tf.keras.layers.Dropout(0.2))
supervised_model.add(tf.keras.layers.Dense(3, activation="linear"))

supervised_model.compile(optimizer="adam", loss="huber")
supervised_model.summary()

In [None]:
# Train on the batched (state, action) dataset for 15 epochs; `history` holds what fit() returns.
history = supervised_model.fit(dataset, epochs=15)

In [None]:
# Predict actions for every collected state. These are the same samples the model
# was trained on, so this is a sanity check rather than a held-out evaluation.
guesses = supervised_model.predict(states)

In [None]:
# Spot-check every 1000th sample: predicted action next to the recorded one,
# followed by a blank line.
for i in range(0, 10000, 1000):
    print(f"guess = {guesses[i]}", f"actual = {actions[i]}", "", sep="\n")

In [None]:
# Persist the trained weights (weights only, not the model definition) under ./checkpoints/.
# NOTE(review): newer Keras releases may require a specific file suffix for this path - confirm for the installed version.
supervised_model.save_weights('./checkpoints/my_checkpoint')