In [7]:
import numpy as np
import gymnasium as gym
from collections import defaultdict
import matplotlib.pyplot as plt
from IPython.display import clear_output
import cv2
import random
import time
import math
import pickle

In [1]:
def get_speed_wheel(image):
    '''
    Read the speed and steering indicators from fixed pixel windows of the frame.

    speed: the window image[90:94, 12:14] is compared against eight brightness
           levels (240 down to 30); the total number of (pixel, level) hits is
           halved, rounded up and capped at 20, giving an int in 0..20.
    wheel: pure green pixels (G > 250, R == 0, B == 0) are counted in the left and
           right halves of image[86:92, 36:60]; the larger count is divided by 4
           and rounded up. The value is negated when the right half contains any
           green pixel.

    Prints 'ERR' when both halves of the steering window contain green pixels.

    image: frame indexed as [row, column, channel] with channels in R, G, B order
    returns: (speed, wheel) as plain ints
    '''
    # --- speed indicator -------------------------------------------------
    speed_patch = image[90:94, 12:14]
    brightness_hits = 0
    for level in (240, 210, 180, 150, 120, 90, 60, 30):
        # a pixel counts once for every level that all three channels exceed
        brightness_hits += np.sum(np.all(speed_patch > [level, level, level], axis=-1))
    speed = min(math.ceil(brightness_hits / 2), 20)

    # --- steering indicator ----------------------------------------------
    wheel_patch = image[86:92, 36:60]
    red = wheel_patch[:, :, 0]
    green = wheel_patch[:, :, 1]
    blue = wheel_patch[:, :, 2]
    is_green = (green > 250) & (red == 0) & (blue == 0)
    left_count = np.sum(is_green[:, :12])
    right_count = np.sum(is_green[:, 12:])
    if left_count > 0 and right_count > 0:
        print('ERR')

    magnitude = math.ceil(max(left_count, right_count) / 4)
    wheel = -magnitude if right_count > 0 else magnitude

    return speed, wheel

def highlight_track(image):
    '''
    Binarise the frame so that the track is black and everything else is white.

    A pixel is treated as "not track" (painted white) when it is either
      * bright: all three channels above 130, or
      * coloured: any two channels differ by more than 10.
    Every other pixel (dark and roughly grey) is painted black.

    The channel differences are computed in a signed integer type. On unsigned
    8-bit data a negative difference wraps around (100 - 101 becomes 255), which
    would classify an almost-grey pixel as coloured.

    image: array indexed as [row, column, channel] with three channels
           (assumed to be an 8-bit RGB frame -- TODO confirm against the caller)
    returns: the same array object; it is modified in place
    '''
    # Signed copy of the channels so that subtraction cannot wrap around.
    signed = image.astype(np.int32)
    r, g, b = signed[..., 0], signed[..., 1], signed[..., 2]

    is_bright = (r > 130) & (g > 130) & (b > 130)
    is_coloured = (np.abs(r - g) > 10) | (np.abs(r - b) > 10) | (np.abs(g - b) > 10)
    not_track = is_bright | is_coloured

    # Both masks were computed from the copy, so painting in place is safe.
    image[not_track] = [255, 255, 255]
    image[~not_track] = [0, 0, 0]
    return image


def delete_nears(edge_indices):
    '''
    Collapse runs of adjacent indices into their first element.

    An index is kept only when it is more than 1 above the index that precedes it
    in the input (the first index is compared against -100), so a thick edge that
    covers several neighbouring pixels is reported once.

    edge_indices: iterable of integer indices, expected in ascending order
    returns: list with the kept indices, in input order
    '''
    values = list(edge_indices)
    # pair every index with its predecessor; -100 stands in for "no predecessor"
    predecessors = [-100] + values[:-1]
    return [current for before, current in zip(predecessors, values) if before + 1 < current]

def get_car_position(black_track, edges):
    '''
    Classify where the car sits relative to the track, looking at the bottom row.

    returns:
      -1 -> the car is on the left side of the track
       1 -> the car is on the right side of the track
       0 -> the car is inside the track, away from both borders
      -2 -> the car is out of the track
    black_track: image with the track in black and everything else in white
    edges: the edges of the track detected by the Canny algorithm
    '''
    rows, cols, _ = black_track.shape
    bottom = rows - 1
    middle = int(cols / 2)

    def on_track(col):
        # a pixel belongs to the track when all of its channels are 0
        return all(black_track[bottom, col] == 0)

    if not on_track(middle):
        # the centre pixel is off the track: check whether the track is within 3 pixels
        if on_track(middle + 3):
            return -1  # track found just to the right -> the car is on its left
        if on_track(middle - 3):
            return 1  # track found just to the left -> the car is on its right
        return -2  # no track nearby

    # closest border on each side of the centre; missing borders default to the image limits
    left_hits = delete_nears(np.where(edges[bottom, :middle] > 0)[0])
    right_hits = delete_nears(np.where(edges[bottom, middle:] > 0)[0])
    left_border = left_hits[-1] if len(left_hits) > 0 else 0
    right_border = right_hits[0] + middle if len(right_hits) > 0 else cols

    half_width = cols / 2
    if left_border + 3 > half_width:
        return -1  # left border within 3 pixels of the centre
    if right_border - 3 < half_width:
        return 1  # right border within 3 pixels of the centre
    return 0

def find_edges_at_y(black_track, edges, y, position):
    '''
    Locate the two track borders on row y, or report which way the track turns.

    black_track: the image of the track colored in black and everything else in white
    edges: the edges of the track detected by the Canny algorithm
    y: the row where the borders are searched
    position: the position of the car respect to the track (currently unused,
              kept so existing callers keep working)

    returns a pair:
      (left, right)  column indices of the two borders on row y
      (-1, -1)       fewer than two borders on row y and the track turns right
      (-2, -2)       fewer than two borders on row y and the track turns left
      (None, None)   the situation cannot be classified
    '''
    _, width = edges.shape
    center = width / 2

    borders_at_y = delete_nears(np.where(edges[y, :] > 0)[0])

    if len(borders_at_y) == 2:
        return borders_at_y[0], borders_at_y[1]

    # we know that the car is inside the track
    if len(borders_at_y) > 2:
        # take the nearest border on the left and the nearest on the right of the center
        borders_at_y_left = [elem for elem in borders_at_y if elem < center]
        borders_at_y_right = [elem for elem in borders_at_y if elem > center]
        if borders_at_y_left and borders_at_y_right:
            return borders_at_y_left[-1], borders_at_y_right[0]
        # All borders lie on one side of the center (indexing the empty side used
        # to raise IndexError). Fall back to the two borders closest to the center,
        # like the two-border case, which does not require them to straddle it.
        nearest = sorted(sorted(borders_at_y, key=lambda elem: abs(elem - center))[:2])
        return nearest[0], nearest[1]

    # Fewer than two borders on this row: use the borders that touch the image
    # sides to decide the direction of the turn.
    left_indices = delete_nears(np.where(edges[:, 0] > 0)[0])
    right_indices = delete_nears(np.where(edges[:, width - 1] > 0)[0])

    if len(right_indices) > 0 and len(left_indices) > 0:  # borders enter from both sides
        if right_indices[0] > left_indices[0]:  # the track goes from right-bottom to left-top
            return -2, -2  # turn to left
        return -1, -1  # the track goes from left-bottom to right-top: turn to right
    if len(right_indices) > 0:  # one or more borders enter from the right
        return -1, -1  # turn to right
    if len(left_indices) > 0:  # one or more borders enter from the left
        return -2, -2  # turn to left

    # No border touches the left or right side, so the borders must enter from
    # the bottom and/or the top:
    #   3 borders entering down and 1 entering up, or
    #   1 border entering down and 1 entering up, or
    #   2 borders entering down and 0 entering up
    up_indices = delete_nears(np.where(edges[0, :] > 0)[0])
    if len(up_indices) == 1:
        # Probe 3 pixels to the left of the border on the top row. The column is
        # clamped at 0 because a negative index would wrap to the right edge.
        probe_col = max(up_indices[0] - 3, 0)
        if all(black_track[0, probe_col] == 0):  # the track is on the left respect to the border
            return -2, -2  # turn to left
        return -1, -1  # the track is on the right respect to the border: turn to right

    return None, None  # going straight at the end of a curve (near to out of the track)


def discretize_state(state):
    '''
    Turn a frame into a small hashable tuple usable as a key in the Q table.

    state: frame indexed as [row, column, channel]; the window state[20:60, 22:74]
           is binarised in place by highlight_track
    returns: (angle_far, angle_near, speed, wheel, position), or None when the car
             is out of the track or the track borders cannot be classified

    Each angle is either a sentinel (-0.9 / -0.8 for a right turn, 0.9 / 0.8 for a
    left turn) or the mean direction of the two borders, rounded to 2 decimals and
    then rounded UP to the next multiple of 0.05.
    '''
    zoom_state = state[20:60, 22:74]
    h_t = highlight_track(zoom_state)
    edges = cv2.Canny(h_t, threshold1=80, threshold2=120)

    position = get_car_position(h_t, edges)
    if position == -2:  # the car is out of the track
        return None

    # The indicators are read from rows 86..93, outside the window binarised above.
    speed, wheel = get_speed_wheel(state)

    points_ahead = [39, 14]
    angles = []
    for y in points_ahead:
        y_a = y - 14  # second row, 14 pixels further up, used to measure the slope
        l1, r1 = find_edges_at_y(h_t, edges, y_a, position)
        l2, r2 = find_edges_at_y(h_t, edges, y, position)

        if None in (l1, r1, l2, r2):
            return None
        # Turn sentinels: the nearer row (y) takes precedence over the farther one (y_a).
        if -1 in (l2, r2):
            angles.append(-0.9)
            continue
        elif -1 in (l1, r1):
            angles.append(-0.8)
            continue
        elif -2 in (l2, r2):
            angles.append(0.9)
            continue
        elif -2 in (l1, r1):
            angles.append(0.8)
            continue

        angle_left = np.arctan2((l2 - l1), (y - y_a))
        angle_right = np.arctan2((r2 - r1), (y - y_a))
        track_direction = (angle_left + angle_right) / 2.0

        # Round up to the next multiple of 0.05 using integer hundredths. A float
        # modulus is unreliable here: 0.55 * 100 evaluates to 55.00000000000001, so
        # a test such as `a * 100 % 5 == 0` never accepts 0.55 and silently merges
        # that bucket into a neighbouring one.
        hundredths = int(round(round(track_direction, 2) * 100))
        bucket = -(-hundredths // 5) * 5  # ceiling to a multiple of 5
        angles.append(bucket / 100)

    # add the speed, wheel and position info to the angles
    angles.append(speed)
    angles.append(wheel)
    angles.append(position)
    return tuple(angles)


def EpsGreedyPolicy(Q, env, state, epsilon):
    '''
    Epsilon-greedy action selection.

    With probability epsilon a random action is sampled from env.action_space,
    otherwise the index of the highest value in Q[state] is returned.
    '''
    explore = np.random.rand() < epsilon
    if not explore:
        return np.argmax(Q[state])  # exploit: action with the highest Q value
    return env.action_space.sample()  # explore: random action

def RandomPolicy(env):
    '''Return an action sampled from env.action_space.'''
    random_action = env.action_space.sample()
    return random_action

def GreedyPolicy(Q, state, env):
    '''
    Greedy action selection.

    Returns the index of the highest value in Q[state] when the state is already a
    key of Q; otherwise a random action is sampled (Q is not modified).
    '''
    if state in Q:
        return np.argmax(Q[state])  # known state: action with the highest Q value
    return env.action_space.sample()  # unknown state: random action

def update_q_table(Q, state, action, reward, next_state, alpha, gamma):
    '''
    Apply one Q-learning update to Q[state][action], in place.

    The target is reward + gamma * (highest value in Q[next_state]); the entry is
    moved towards it by a fraction alpha of the difference.
    '''
    next_values = Q[next_state]
    bootstrap = next_values[np.argmax(next_values)]

    current_values = Q[state]
    td_error = (reward + gamma * bootstrap) - current_values[action]
    current_values[action] += alpha * td_error

In [3]:
def qlearn(Q, env, alpha, gamma, num_episodes, epsilon=0.9, min_epsilon=0.15,
           max_steps=1000, warmup_steps=45, epsilon_decay=0.9993):
    '''
    Train the Q table in place with epsilon-greedy Q-learning.

    Q: mapping from discretized state to an array of action values; it must create
       missing entries on access (e.g. a defaultdict)
    env: environment exposing reset(), step(action) and action_space.sample()
    alpha: learning rate passed to update_q_table
    gamma: discount factor passed to update_q_table and evaluate
    num_episodes: number of training episodes
    epsilon: initial exploration probability
    min_epsilon: lower bound for epsilon
    max_steps: maximum number of environment steps per episode
    warmup_steps: initial steps played with action 0 and without learning, to skip
                  the initial phase of zooming in the track
    epsilon_decay: factor applied to epsilon after every episode

    Every 200 episodes the greedy policy is evaluated and the result printed.
    An episode is cut short, with reward -100, as soon as discretize_state returns
    None (out of track or limit case).
    '''
    for episode in range(num_episodes):
        if (episode + 1) % 200 == 0:
            print(f"Current reward: {evaluate(Q, env, gamma, policy='greedy')}")
        env.reset()
        state = (0.0, 0.0, 0, 0, 0)  # initial state
        truncated = terminated = False
        steps_taken = 0  # steps actually executed, reported at the end of the episode
        for step in range(max_steps):
            if terminated or truncated:
                break
            steps_taken = step + 1
            if step < warmup_steps:
                # Keep the termination flags so that an episode ending during the
                # warm-up is noticed, as evaluate does.
                _, _, terminated, truncated, _ = env.step(0)
                continue
            action = EpsGreedyPolicy(Q, env, state, epsilon)
            frame, reward, terminated, truncated, _ = env.step(action)

            next_state = discretize_state(frame)
            if next_state is None:  # out of track or limit case
                next_state = ()
                print('OUT')
                reward = -100
                truncated = True

            update_q_table(Q, state, action, reward, next_state, alpha, gamma)
            state = next_state

        epsilon = max(epsilon * epsilon_decay, min_epsilon)
        print(f"Episode {episode + 1}/{num_episodes}, epsilon: {epsilon}, steps: {steps_taken}/{max_steps}")


In [4]:
def evaluate(Q, env, gamma, render=False, n_episodes=10, policy='random'):
    """Perform rollouts and return the average (undiscounted) reward per episode.

    Q: mapping from discretized state to an array of action values
    env: environment exposing reset(), step(action), render() and action_space
    gamma: accepted for interface compatibility; the rewards are NOT discounted
    render: when True, roughly 10% of the steps print debug information and
            display the rendered frame
    n_episodes: number of episodes to play (must be positive)
    policy: 'greedy' uses GreedyPolicy, any other value uses RandomPolicy

    The first 45 steps of every episode are played with action 0 and their
    rewards are not counted.
    Raises ValueError when n_episodes is not positive.
    """
    if n_episodes <= 0:
        raise ValueError("n_episodes must be positive")

    sum_reward = 0.0
    env.reset()
    state = (0.0, 0.0, 0, 0, 0)  # initial state
    ep = 0
    truncated = terminated = False

    step = 0
    while True:
        if truncated or terminated:
            print("New episode")
            ep += 1
            if ep >= n_episodes:
                break  # stop before resetting: no further episode will be played
            env.reset()
            state = (0.0, 0.0, 0, 0, 0)
            step = 0
        if step < 45:
            # warm-up step; it also refreshes the termination flags after a reset
            _, _, terminated, truncated, _ = env.step(0)
            step += 1
            continue

        if policy == 'greedy':
            action = GreedyPolicy(Q, state, env)
        else:
            action = RandomPolicy(env)

        frame, reward, terminated, truncated, info = env.step(action)
        state = discretize_state(frame)
        if state is None:
            state = ()  # out of track or limit case; the episode is not ended here

        step += 1
        sum_reward += reward

        if render and random.random() < 0.1:
            print("state " + str(state))
            print("reward " + str(reward))
            print("truncated " + str(truncated))
            print("terminated " + str(terminated))
            print("step " + str(step))
            print("info " + str(info))
            # `actions` is the notebook-level dict mapping action index to label
            print("action " + str(actions[action]))
            img = env.render()
            # Draw the frame with matplotlib directly, so the function does not
            # depend on a helper defined in some other cell.
            plt.imshow(img)
            plt.axis('off')
            plt.show()
            time.sleep(0.4)
            clear_output(wait=True)
    return sum_reward / n_episodes

In [5]:
alpha = 0.15  # Learning rate
gamma = 0.99  # Discount factor
epsilon = 1  # Initial exploration probability; decayed during learning
num_episodes = 4000  # Number of training episodes
# Labels used when printing the chosen action in evaluate(render=True);
# presumably Nothing / Left / Right / Accelerate / Brake -- verify against the environment
actions = {0: 'N', 1: 'L', 2: 'R', 3: 'A', 4: 'B'}

In [8]:
# Discrete-action environment rendered to RGB arrays (no window is opened)
env = gym.make("CarRacing-v2", domain_randomize=False, continuous=False, render_mode="rgb_array")
# Q table: every state key seen for the first time gets one zero per action
Q = defaultdict(lambda: np.zeros(env.action_space.n))

In [None]:
''' load a previously trained Q table '''
# Q_dict = pickle.load(open("QT/Q_table_2.pkl", "rb"))
# Q = defaultdict(lambda: np.zeros(env.action_space.n), Q_dict)
# evaluate(Q, env, gamma, render=True, n_episodes=1, policy='random')

In [None]:
# Baseline: average reward of the random policy over 5 episodes
avg = evaluate(Q, env, gamma, render=False, n_episodes=5, policy='random')
print(avg)

In [None]:
# Train the Q table in place; min_epsilon keeps its default value
qlearn(Q, env, alpha, gamma, num_episodes, epsilon=epsilon)

In [None]:
# Play one episode with the greedy policy, displaying frames and debug output
avg = evaluate(Q, env, gamma, render=True, n_episodes=1, policy='greedy')
print(avg)

In [None]:
''' print the Q table '''
# print(len(Q))
# dict_speed = {}
# for i, s in enumerate(Q):
#     if i < 00:
#         continue
#     if i > 4000:
#         break
#     print(s)
#     print(s[:2])
#     if (len(s) <= 1):
#         continue
#     print("speed: " + str(s[2]))
#     print("wheel: " + str(s[3]))
#     print("pos: " + str(s[4]))
#     for j, a in enumerate(Q[s]):
#         if j == 0:
#             print("do nothing: " + str(a))
#         elif j == 1:
#             print("steer right: " + str(a))
#         elif j == 2:
#             print("steer left: " + str(a))
#         elif j == 3:
#             print("accelerate: " + str(a))
#         elif j == 4:
#             print("brake: " + str(a))

#     print('#####')

In [None]:
''' save the Q table and the hyperparameters and results '''
# Q_dict = dict(Q)
# with open('QT/Q_table_13.pkl', 'wb') as f:
#     pickle.dump(Q_dict, f)
# with open('QT/hyperparameters_13.txt', 'w') as f:
#     f.write(f"alpha: {alpha}\n")
#     f.write(f"gamma: {gamma}\n")
#     f.write(f"epsilon: {epsilon}\n")
#     f.write(f"num_episodes: {num_episodes}\n")
#     f.write(f"max_steps: {max_steps}\n")
#     f.write(f"eps_min: {0.15}\n")
#     f.write(f"eps_decay: {0.9992}\n")
#     f.write(f"actions: {actions}\n")
#     f.write(f"angles_distances: {39, 14}\n")
#     f.write(f"range: 14\n")
#     f.write(f"zoom-size: 20,26; 22,74\n")
#     f.write(f"Q_table_size: {len(Q)}\n")
#     f.write(f"avg_reward: {evaluate(Q, env, gamma, render=False, n_episodes=15, policy='greedy')}\n")