In [10]:
import numpy as np
import cv2
import gymnasium as gym
from collections import deque, defaultdict
import matplotlib.pyplot as plt
import pygame
ACTIONS = [
    [-1.0, 0.0, 0.0],  # hard left (no gas)
    [ 0.0, 0.0, 0.0],  # no action
    [ 1.0, 0.0, 0.0],  # hard right (no gas)
    [-1.0, 1.0, 0.0],  # hard left + gas
    [ 0.0, 1.0, 0.0],  # straight + gas
    [ 1.0, 1.0, 0.0],  # hard right + gas
]

class CarRacingQL:
    def __init__(self, episodes=1000, state_bins=None, epsilon=0.1, alpha=0.1, gamma=0.9, render=True):
        self.env = gym.make('CarRacing-v3', continuous=False, render_mode='rgb_array')
        self.actions = [0, 1, 2,  3, 4]  # Discrete actions
        self.epsilon = epsilon
        self.alpha = alpha
        self.gamma = gamma
        self.episodes = episodes
        self.render = render

        self.state_bins = state_bins or [10] * 8  # Default to 8 features
        self.Q_table = defaultdict(lambda: np.zeros(len(self.actions)))

        if self.render:
            pygame.init()
            self.screen = pygame.display.set_mode((640, 480))

    def auto_canny(self, image, sigma=0.33):
        v = np.median(image)
        lower = int(max(0, (1.0 - sigma) * v))
        upper = int(min(255, (1.0 + sigma) * v))
        return cv2.Canny(image, lower, upper)

    def estimate_lane_edges(self, norm):
        vertical_profile = np.mean(norm, axis=0)
        threshold = 0.3
        indices = np.where(vertical_profile > threshold)[0]
        if len(indices) > 0:
            return indices[0] / len(vertical_profile), indices[-1] / len(vertical_profile)
        return 0.0, 1.0

    def extract_features(self, obs, obs_history, speed_history):
        car = self.env.unwrapped.car
        vel_vec = car.hull.linearVelocity
        speed = np.linalg.norm([vel_vec.x, vel_vec.y])
        wheel_steering = car.wheels[0].steer / 0.6

        speed_history.append(speed)
        acceleration = (speed_history[-1] - speed_history[-2]) * 50.0 / 10.0 if len(speed_history) > 1 else 0.0
        acceleration = np.clip(acceleration, -1.0, 1.0)

        gray = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
        resized = cv2.resize(gray, (64, 64))
        edges = self.auto_canny(resized)

        norm = resized / 255.0
        half = norm.shape[1] // 2
        left_sum, right_sum = np.sum(norm[:, :half]), np.sum(norm[:, half:])
        center_offset = (right_sum - left_sum) / (left_sum + right_sum + 1e-5)

        obs_history.append(norm)
        curvature = np.mean([
            np.abs(obs_history[i] - obs_history[i - 1]).mean()
            for i in range(1, len(obs_history))
        ]) if len(obs_history) == obs_history.maxlen else 0.0
        curvature = np.clip(curvature * 5.0, 0.0, 1.0)

        lines = cv2.HoughLinesP(edges, 1, np.pi/180, 30, minLineLength=20, maxLineGap=10)
        angles = [np.arctan2(y2 - y1, x2 - x1) for line in lines for x1, y1, x2, y2 in [line[0]]] if lines is not None else []
        avg_lane_angle = np.mean(angles) if angles else 0.0

        sky_color = np.array([120, 174, 255]) / 255.0
        top_pixels = obs[:20, :, :] / 255.0
        off_track = 0 if np.mean(np.linalg.norm(top_pixels - sky_color, axis=2) < 0.1) > 0.3 else 1

        left_edge, right_edge = self.estimate_lane_edges(norm)

        return [
            center_offset,
            curvature,
            avg_lane_angle / np.pi,
            min(speed / 100.0, 1.0),
            acceleration,
            float(off_track),
            left_edge,
            right_edge,
            wheel_steering,
        ]

    # Discretize the state space based on the number of bins for each feature
    # Each feature is discretized into a number of bins defined in self.state_bins
    def discretize_state(self, features):
        return tuple(np.digitize(f, np.linspace(-1, 1, b)) - 1 for f, b in zip(features, self.state_bins))

    def epsilon_greedy(self, state):
        return np.random.choice(self.actions) if np.random.rand() < self.epsilon else np.argmax(self.Q_table[state])

    def update_q_value(self, state, action, reward, next_state, done):
        max_future_q = np.max(self.Q_table[next_state])
        current_q = self.Q_table[state][action]
        target = reward if done else reward + self.gamma * max_future_q
        self.Q_table[state][action] += self.alpha * (target - current_q)

    def render_frame(self, obs):
        cropped = obs[:int(obs.shape[0] * 0.88), :, :]
        surf = pygame.surfarray.make_surface(cropped.swapaxes(0, 1))
        self.screen.blit(pygame.transform.scale(surf, (640, 400)), (0, 0))

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                raise KeyboardInterrupt
            elif event.type == pygame.KEYDOWN:
                if event.key == pygame.K_q:
                    raise KeyboardInterrupt

        pygame.display.flip()
        pygame.event.pump()


    def run_episode(self):
        obs, _ = self.env.reset()
        obs = obs[:int(obs.shape[0] * 0.88), :, :]
        obs_history = deque(maxlen=5)
        speed_history = deque(maxlen=5)

        features = self.extract_features(obs, obs_history, speed_history)
        state = self.discretize_state(features)

        done, total_reward = False, 0
    
        def skip_frames(env, obs, skip=50):
            for _ in range(skip):
                obs, _, terminated, truncated, _ = env.step(0)  # Use a valid action (e.g., no action)
                if terminated or truncated:
                    return obs, terminated or truncated
            return obs, False
        
        obs, done = skip_frames(self.env, obs, skip=50)

        try:
            while not done:
                action = self.epsilon_greedy(state)
                next_obs, reward, done, _, _ = self.env.step(action)

                if self.render:
                    self.render_frame(next_obs)
                cropped_obs = next_obs[:int(next_obs.shape[0] * 0.88), :, :]
                next_features = self.extract_features(cropped_obs, obs_history, speed_history)
                next_state = self.discretize_state(next_features)

                self.update_q_value(state, action, reward, next_state, done)
                state = next_state
                total_reward += reward
        except KeyboardInterrupt:
            print("\n\nTraining interrupted by user.")
        finally:
            self.close()

        return total_reward

    def train(self):
        rewards = []
        for episode in range(1, self.episodes + 1):
            reward = self.run_episode()
            print(f"Episode {episode}/{self.episodes}, Reward: {reward}")
            rewards.append(reward)
            print(f"Episode {episode}/{self.episodes}, Reward: {reward}")
            
        return rewards

    def close(self):
        if self.render:
            pygame.quit()
        self.env.close()


if __name__ == "__main__":
    agent = CarRacingQL(episodes=100, render=True)
    rewards = agent.train()

    plt.plot(rewards)
    plt.title("Rewards Over Episodes")
    plt.xlabel("Episode")
    plt.ylabel("Total Reward")
    plt.show()

    agent.close()




Training interrupted by user.
Episode 1/100, Reward: -248.2838709677308
Episode 1/100, Reward: -248.2838709677308


error: display Surface quit