In [None]:
#importing necessary libraries
import pygame
from Core_Game_Parts import *
# Imported explicitly (and after the wildcard import) so `np` is guaranteed to
# be NumPy instead of relying on Core_Game_Parts leaking it.
import numpy as np
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Dense
from collections import deque
import matplotlib.pyplot as plt
import random

import os         
os.environ["SDL_VIDEODRIVER"] = "dummy" 

In [None]:
def model():
    """Creates and compiles the Q-network used by the agent.

    The network takes a 4-value input, passes it through two ReLU hidden
    layers (32 and 16 units) and emits 3 linear outputs. It is compiled with
    the Adam optimizer (learning rate 0.0001) and mean-squared-error loss.

    Returns:
        A compiled Keras Sequential model instance.
    """
    layers = [
        Dense(32, activation='relu', input_shape=(4,)),
        Dense(16, activation='relu'),
        Dense(3, activation='linear'),
    ]
    network = Sequential(layers)
    network.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
        loss='mse',
    )
    return network
# Build one network at module level and print its layer/parameter summary.
ai_model=model()
ai_model.summary()
    

In [None]:
class DQNAgent:
    """
    A Deep Q-Network (DQN) agent for reinforcement learning.

    Attributes:
        model: The online neural network used for approximating Q-values.
        target_model: A clone of ``model`` used to compute bootstrap targets.
            Its weights only change when ``update_target_model`` is called.
        memory: A deque (max 20000 entries) of past experiences for replay.
        action_size: Number of discrete actions to sample from when exploring.
        gamma: Discount factor for future rewards.
        epsilon: Exploration rate for the epsilon-greedy policy.
        epsilon_min: Once epsilon is at or below this value it stops decaying.
        epsilon_decay: Factor epsilon is multiplied by after every training
            step (each ``train_from_memory`` call that fits the model),
            not once per episode.
        batch_size: Size of the minibatch for training.
        target_update_counter: Initialised to 0; not used by this class.
    Methods:
        update_target_model: Copy the online weights into the target network.
        remember: Store an experience in memory.
        choose_action: Select an action based on the current state using an epsilon-greedy policy.
        train_from_memory: Train the model using a minibatch of experiences from memory.
    """

    def __init__(self, model, action_size=3):
        """
        Args:
            model: A compiled Keras model mapping states to one Q-value per action.
            action_size (int): Number of discrete actions; defaults to 3.
        """
        self.model = model
        self.action_size = action_size
        self.memory = deque(maxlen=20000)
        # clone_model copies the architecture only, so the weights are synced
        # explicitly right afterwards.
        self.target_model = tf.keras.models.clone_model(self.model)
        self.update_target_model()
        self.gamma = 0.95  # Discount factor: how much to value future rewards
        self.epsilon = 1.0  # Initial probability of taking a random action
        self.epsilon_min = 0.01  # Floor below which epsilon is no longer decayed
        self.epsilon_decay = 0.999  # Multiplicative decay per training step
        self.batch_size = 128
        self.target_update_counter = 0  # Kept for compatibility; currently unused

    def update_target_model(self):
        """Copies the weights from the main model to the target model."""
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        """Stores an experience tuple in the agent's memory."""
        self.memory.append((state, action, reward, next_state, done))

    def choose_action(self, state):
        """
        Selects an action based on the current state using an epsilon-greedy policy.
        With probability epsilon, it takes a random action (exploration).
        Otherwise, it takes the best known action (exploitation).

        Returns:
            int: The chosen action index.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)

        # A single state is turned into a batch of one; -1 lets the state
        # width follow whatever the caller passes.
        q_values = self.model.predict(np.reshape(state, (1, -1)), verbose=0)
        # Plain int so both branches return the same type.
        return int(np.argmax(q_values[0]))

    def train_from_memory(self):
        """Train the DQN agent using experiences from memory.

        Samples ``batch_size`` experiences, builds Q-learning targets with the
        target network, fits the online network for one epoch and decays
        epsilon.

        Returns:
            float: The training loss, or None when memory holds fewer than
            ``batch_size`` experiences (no training and no epsilon decay).
        """
        if len(self.memory) < self.batch_size:
            return None

        minibatch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)
        states = np.array(states)
        actions = np.array(actions)
        rewards = np.array(rewards)
        next_states = np.array(next_states)
        # Explicit float mask: 0.0 for terminal transitions, 1.0 otherwise.
        not_done = 1.0 - np.array(dones, dtype=float)

        # Copy so the prediction buffer itself is never written to.
        q_targets = np.array(self.model.predict(states, verbose=0))
        next_q_values = self.target_model.predict(next_states, verbose=0)

        # Terminal transitions get the raw reward; the rest bootstrap from the
        # target network's best next-state value.
        targets = rewards + self.gamma * np.amax(next_q_values, axis=1) * not_done

        # Only the Q-value of the action actually taken is moved toward its
        # target; the other outputs keep their predicted values (zero error).
        q_targets[np.arange(len(actions)), actions] = targets

        history = self.model.fit(states, q_targets, epochs=1, verbose=0)
        loss = history.history['loss'][0]

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

        return loss

In [None]:
def model_game_step(action, car, track_image, current_checkpoint):
    """
    Simulates a game step for the car in the racing environment.

    The shaping reward is computed from the car's state *before* the physics
    update; checkpoint, lap and crash outcomes are evaluated *after* the car
    has moved.

    Args:
        action (int): Action to be taken by the car (0: turn left,
            1: turn right, 2: brake). The car accelerates on every step
            regardless of the action.
        car (Car): The car object representing the player's car. Its speed,
            angle and position are mutated in place.
        track_image (Surface): The image of the track.
        current_checkpoint (int): The index of the next checkpoint to reach.

    Returns:
        tuple: ``(new_state, done, reward, current_checkpoint)`` where
        ``new_state`` is the first value returned by ``ray_casting`` after the
        move, ``done`` is True when the car crashed or left the image, and
        ``current_checkpoint`` is the updated checkpoint index (reset to 0
        when a lap is finished).
    """
    done = False

    # --- SHAPING REWARD (uses the pre-move state) ---

    # 1. Base reward: a small penalty for each step taken, so that finishing
    # the lap faster scores better.
    reward = -0.5

    if car.speed < 0.3:
        reward -= 2  # Extra penalty for being too slow

    # 2. Reward for speed, proportional to the current speed.
    reward += car.speed * 0.5

    # 3. Reward for a clear path: the farther the wall seen by the sensor at
    # index 1 (the forward-looking one, per the original author's note), the
    # higher the reward.
    current_state, _ = ray_casting(car, track_image)
    reward += current_state[1] * 0.01

    # NOTE(review): the original code also had a +1.0 "brake in a turn" bonus,
    # but it required the single action to be a turn (0 or 1) AND a brake (2)
    # at once, so it could never fire. The unreachable branch was removed; the
    # reward values are unchanged. Re-introduce it with a reachable condition
    # if that bonus is wanted.

    # 4. Steering penalty.
    # NOTE(review): only the left turn (action 0) is penalised, which biases
    # the agent toward turning right. Preserved as-is; confirm the intent.
    if action == 0:
        reward -= 0.2

    # --- CAR PHYSICS ---
    car.speed += ACCELERATION
    if car.speed > 0:
        # The turn angle shrinks linearly from MAX_TURN_ANGLE toward
        # MIN_TURN_ANGLE as speed approaches MAX_SPEED.
        speed_factor = car.speed / MAX_SPEED
        dynamic_turn_angle = MAX_TURN_ANGLE - (speed_factor) * (MAX_TURN_ANGLE - MIN_TURN_ANGLE)
        if action == 0:  # Left
            car.angle += dynamic_turn_angle
        elif action == 1:  # Right
            car.angle -= dynamic_turn_angle

    if action == 2:  # Brake
        car.speed -= BRAKE_FORCE

    car.speed -= FRICTION
    car.speed = max(0, min(car.speed, MAX_SPEED))
    car.move()

    # --- GOAL-BASED REWARDS (Checkpoints and Crashing) ---
    total_checkpoints = len(checkpoint_data)

    # 5. Large reward for hitting the next checkpoint. Only that checkpoint's
    # rect is built instead of rebuilding every rect on every step.
    if current_checkpoint < total_checkpoints:
        x, y, w, h, _angle = checkpoint_data[current_checkpoint]
        if car.rect.colliderect(pygame.Rect(x, y, w, h)):
            current_checkpoint += 1
            reward += 200
            print(f"Checkpoint {current_checkpoint} reached!")

    # 6. Very large reward for finishing the lap; this can happen on the same
    # step the final checkpoint is reached.
    if current_checkpoint == total_checkpoints and car.rect.colliderect(finish_line_rect):
        reward += 1000
        current_checkpoint = 0
        print("Lap finished!")

    # 7. Crash detection: the pixel under the car has the wall colour, or the
    # car position is outside the image (get_at raises IndexError).
    try:
        pixel_color = track_image.get_at((int(car.x), int(car.y)))[:3]
    except IndexError:
        done = True
    else:
        if pixel_color == DRAW_COLOR:
            done = True

    if done:
        # A crash replaces the whole step reward (including any checkpoint or
        # lap bonus earned on this step) with a flat penalty.
        reward = -100

    new_state, _ = ray_casting(car, track_image)
    return new_state, done, reward, current_checkpoint

In [None]:
def _build_state(distances, car):
    """Returns the (1, 4) network input: ray distances followed by speed / MAX_SPEED."""
    speed = car.speed / MAX_SPEED
    return np.reshape(np.array(distances + [speed]), [1, 4])


def _plot_training_history(scores_history, max_speed_history, loss_history, checkpoints_history):
    """Shows the score, max-speed, loss and checkpoint charts plus a checkpoint histogram."""
    # This style name only exists on newer matplotlib releases; fall back to
    # the default style instead of raising on older ones.
    if 'seaborn-v0_8-darkgrid' in plt.style.available:
        plt.style.use('seaborn-v0_8-darkgrid')

    fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, figsize=(12, 22))

    # --- Graph 1: Score ---
    ax1.set_title('Agent Score Over Time')
    ax1.set_xlabel('Episode')
    ax1.set_ylabel('Total Reward (Score)')
    ax1.plot(scores_history, label='Score per Episode', color='royalblue')
    ax1.legend()

    # --- Graph 2: Max Speed ---
    ax2.set_title('Max Speed Achieved per Episode')
    ax2.set_xlabel('Episode')
    ax2.set_ylabel('Max Speed')
    ax2.plot(max_speed_history, label='Max Speed', color='purple')
    ax2.legend()

    # --- Graph 3: Model Loss (one point per training step, not per episode) ---
    ax3.set_title('Model Loss Over Time')
    ax3.set_xlabel('Training Step')
    ax3.set_ylabel('MSE Loss')
    ax3.plot(loss_history, label='Training Loss', color='orangered', alpha=0.7)
    ax3.legend()

    # --- Graph 4: Checkpoints Bar Chart ---
    ax4.set_title('Checkpoints Cleared per Episode')
    ax4.set_xlabel('Episode')
    ax4.set_ylabel('Checkpoints Cleared')
    episode_indices = range(len(checkpoints_history))
    ax4.bar(episode_indices, checkpoints_history, color='forestgreen', label='Checkpoints')
    # Integer ticks only: a checkpoint count is never fractional.
    ax4.yaxis.set_major_locator(plt.MaxNLocator(integer=True))
    ax4.legend()

    plt.tight_layout()
    plt.show()

    # --- Histogram of the checkpoint distribution ---
    plt.figure(figsize=(8, 6))
    plt.title('Distribution of Checkpoints Cleared')
    plt.xlabel('Number of Checkpoints Cleared in an Episode')
    plt.ylabel('Number of Episodes')
    # default=0 keeps this from raising when no episode was run.
    highest = max(checkpoints_history, default=0)
    plt.hist(checkpoints_history, bins=range(highest + 2), align='left', rwidth=0.8)
    plt.grid(axis='y', alpha=0.75)
    plt.show()


def train_dqn(episodes=500):
    """
    Main function to train the DQN agent with enhanced logging and plotting.

    Runs ``episodes`` episodes of at most 2000 steps each, training the agent
    from replay memory every 4th step and syncing the target network every
    5th episode. Weights are saved every 50 episodes, so runs shorter than 50
    episodes never write a weights file. After training, the collected
    metrics are plotted.

    Args:
        episodes (int): Number of training episodes to run.
    """
    scores_history = []
    loss_history = []
    checkpoints_history = []
    max_speed_history = []

    pygame.init()
    try:
        # A display mode has to be set before Surface.convert() can be used,
        # so this call stays even though nothing is drawn to the window.
        pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))
        track_surface = pygame.image.load(TRACK_IMAGE_PATH).convert()

        ai_model = model()
        agent = DQNAgent(ai_model)

        max_steps_per_episode = 2000

        for e in range(episodes):
            car = Car(CAR_IMAGE_PATH, DEFAULT_START_X, DEFAULT_START_Y, DEFAULT_START_ANGLE, DEFAULT_START_SPEED)
            current_checkpoint = 0

            distances, _ = ray_casting(car, track_surface)
            state = _build_state(distances, car)

            total_reward = 0
            max_speed_episode = 0

            for step in range(max_steps_per_episode):
                action = agent.choose_action(state)
                distances_next, done, reward, current_checkpoint = model_game_step(
                    action, car, track_surface, current_checkpoint
                )

                max_speed_episode = max(max_speed_episode, car.speed)
                total_reward += reward

                next_state = _build_state(distances_next, car)
                # Experiences are stored as flat vectors, not (1, 4) batches.
                agent.remember(state[0], action, reward, next_state[0], done)
                state = next_state

                if step % 4 == 0:  # Train every 4 steps for speed
                    loss = agent.train_from_memory()
                    if loss is not None:
                        loss_history.append(loss)

                if done:
                    break

            if e % 5 == 0:
                agent.update_target_model()
                print(f"--- Target Network Updated at Episode {e+1} ---")

            scores_history.append(total_reward)
            # This is the checkpoint index at episode end; it is reset to 0
            # whenever a lap is completed.
            checkpoints_history.append(current_checkpoint)
            max_speed_history.append(max_speed_episode)

            print(
                f"Episode: {e+1}/{episodes}, "
                f"Score: {total_reward:.2f}, "
                f"Max Speed: {max_speed_episode:.2f}, "
                f"Checkpoints: {current_checkpoint}, "
                f"Epsilon: {agent.epsilon:.2f}"
            )

            if (e + 1) % 50 == 0:
                ai_model.save_weights(f"dqn_car_weights_episode_{e+1}.weights.h5")
    finally:
        # Shut pygame down even if training raises part-way through.
        pygame.quit()

    _plot_training_history(scores_history, max_speed_history, loss_history, checkpoints_history)

In [None]:
# Script entry point: run a short 20-episode training session.
if __name__=="__main__":
    train_dqn(episodes=20)