In [None]:
#success -ve
import pygame
import numpy as np
import math
import random
import tensorflow as tf
from collections import deque
import pandas as pd
import os



In [None]:
class DeepQNetwork(tf.keras.Model):
    """Fully connected Q-network.

    Two hidden Dense layers of 128 ReLU units feed a linear output layer that
    emits ``action_space`` values (one per action).
    """

    def __init__(self, action_space):
        super().__init__()
        hidden_config = dict(units=128, activation='relu')
        self.dense1 = tf.keras.layers.Dense(**hidden_config)
        self.dense2 = tf.keras.layers.Dense(**hidden_config)
        self.output_layer = tf.keras.layers.Dense(action_space, activation='linear')

    def call(self, state):
        """Forward pass: dense1 -> dense2 -> output_layer."""
        hidden = self.dense2(self.dense1(state))
        return self.output_layer(hidden)

    def compile_model(self):
        """Compile the network with the Adam optimizer and an MSE loss."""
        self.compile(loss='mse', optimizer='adam')

class ReplayBuffer:
    """Bounded FIFO store of (state, action, reward, next_state, done) tuples."""

    def __init__(self, buffer_size):
        # A deque with maxlen silently drops its oldest entry once it is full.
        self.buffer = deque(maxlen=buffer_size)

    def add(self, state, action, reward, next_state, done):
        """Append a single transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and return them column-wise.

        Returns:
            A 5-tuple of NumPy arrays in the order
            (states, actions, rewards, next_states, dones).
        """
        picked = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = (
            np.array(column) for column in zip(*picked)
        )
        return states, actions, rewards, next_states, dones



In [None]:
class DQNAgent:
    """Deep Q-learning agent with experience replay and a target network.

    The online network (``model``) is trained on mini-batches drawn from a
    replay buffer; bootstrap targets come from ``target_model``, which is a
    copy of the online network refreshed by ``update_target_model``.

    Args:
        state_space: Number of features in a state vector.
        action_space: Number of discrete actions.
        buffer_size: Maximum number of transitions kept in the replay buffer.
        batch_size: Number of transitions per training batch.
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon: Initial exploration probability.
        epsilon_decay: Multiplicative decay applied to epsilon after each
            training batch.
        epsilon_min: Lower bound for epsilon.
        model_save_folder: Directory in which checkpoints are written. It is
            created if it does not exist.
    """

    # Checkpoints are written when the episode number is a multiple of this.
    SAVE_EVERY = 500

    def __init__(self, state_space, action_space, buffer_size=10000, batch_size=64, gamma=0.99, epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.01, model_save_folder="C:/AMRITA/Sem6/RL/forward_negative"):
        self.state_space = state_space
        self.action_space = action_space
        self.batch_size = batch_size
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.buffer = ReplayBuffer(buffer_size)
        self.model = DeepQNetwork(action_space)
        self.model.compile_model()
        self.target_model = DeepQNetwork(action_space)
        self.target_model.compile_model()

        # Subclassed Keras models create their weights lazily on the first
        # call. Without this, get_weights() is empty at this point and the
        # initial synchronisation below would copy nothing, leaving the two
        # networks with unrelated random weights.
        dummy_state = np.zeros((1, state_space), dtype=np.float32)
        self.model(dummy_state)
        self.target_model(dummy_state)
        self.update_target_model()

        # Define model saving parameters
        self.model_save_folder = model_save_folder
        self.checkpoint_rewards = [100]  # Initial reward for the first checkpoint set to 100
        self.next_checkpoint_reward_multiplier = 2  # Multiplier for next checkpoint reward
        self.penalty_reward = -50  # Penalty for trying to backtrack

        # Create the model saving directory if it does not exist
        os.makedirs(self.model_save_folder, exist_ok=True)

    @staticmethod
    def _q_values(network, states):
        """Run ``network`` on ``states`` and return a writable NumPy array.

        The network is called directly instead of through ``predict``: for a
        single small batch this avoids predict's per-call overhead and its
        progress-bar output.
        """
        batch = np.asarray(states, dtype=np.float32)
        return np.array(network(batch, training=False))

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())

    def choose_action(self, state):
        """Pick an action epsilon-greedily.

        Args:
            state: Batch containing one state, i.e. shape (1, state_space).

        Returns:
            Index of the chosen action.
        """
        if np.random.rand() <= self.epsilon:
            return np.random.choice(self.action_space)
        q_values = self._q_values(self.model, state)
        return np.argmax(q_values[0])

    def remember(self, state, action, reward, next_state, done):
        """Store one transition in the replay buffer.

        Both states are flattened so that every stored state has the same 1-D
        shape even if the caller passes one of them as a (1, n) batch.
        """
        self.buffer.add(np.ravel(state), action, reward, np.ravel(next_state), done)

    def replay(self):
        """Train the online network on one sampled batch and decay epsilon.

        Does nothing until the buffer holds at least ``batch_size``
        transitions.
        """
        if len(self.buffer.buffer) < self.batch_size:
            return
        states, actions, rewards, next_states, dones = self.buffer.sample(self.batch_size)
        states = np.asarray(states, dtype=np.float32).reshape(self.batch_size, -1)
        next_states = np.asarray(next_states, dtype=np.float32).reshape(self.batch_size, -1)

        target = self._q_values(self.model, states)
        target_next = self._q_values(self.target_model, next_states)

        # Q-learning target: r for terminal transitions, otherwise
        # r + gamma * max_a' Q_target(s', a'). Only the taken action's entry
        # is overwritten, so the other outputs contribute zero error.
        not_done = 1.0 - np.asarray(dones, dtype=np.float32)
        bootstrap = self.gamma * np.max(target_next, axis=1) * not_done
        rows = np.arange(self.batch_size)
        target[rows, np.asarray(actions, dtype=int)] = np.asarray(rewards, dtype=np.float32) + bootstrap

        self.model.fit(states, target, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            # Clamp so the decay never undershoots the configured minimum.
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def save_model(self, episode):
        """Write a checkpoint when ``episode`` is a multiple of ``SAVE_EVERY``.

        Only the weights are saved: whole-model HDF5 serialisation is not
        supported for subclassed Keras models. To restore, build a
        ``DeepQNetwork`` with the same ``action_space``, call it once on a
        dummy state and use ``load_weights``.
        """
        if episode % self.SAVE_EVERY == 0:
            model_save_path = os.path.join(self.model_save_folder, f'dqn_model_episode_{episode}.weights.h5')
            self.model.save_weights(model_save_path)
            print(f"Model saved successfully for episode {episode} in {model_save_path}.")


In [None]:

class AutonomousCarsEnv:
    """Pygame environment in which a car drives around an elliptical ring track.

    The track is the region between an inner and an outer ellipse centred in
    the window. ``car_pos`` is the centre of the car in screen coordinates and
    ``car_angle`` is its heading in degrees, measured counter-clockwise with
    the y axis pointing up (the convention used by ``pygame.transform.rotate``).

    Actions accepted by ``step``:
        0 -- move forward, then rotate by +rotation_angle / 2
        1 -- move forward
        2 -- move forward, then rotate by -rotation_angle / 2

    Args:
        agent: Stored on the instance; not used by the environment itself.
        track_size: Window size in pixels as (width, height).
        num_cars: Stored on the instance; only one car is simulated.
        num_checkpoints: Number of checkpoints placed around the track.
        max_steps_per_episode: Step budget after which an episode ends.
        car_image_path: Path of the car sprite.
        log_file: Path of the per-episode CSV log.
    """

    LOG_COLUMNS = ['Episode', 'Cumulative Reward', 'Distance Traveled', 'Lap Count']

    # Multiplier applied to rotation_angle / 2 after the forward move.
    _TURN_BY_ACTION = {0: 1, 1: 0, 2: -1}

    def __init__(self, agent, track_size=(800, 600), num_cars=1, num_checkpoints=10, max_steps_per_episode=1000,
                 car_image_path=r"C:\AMRITA\Sem6\RL\car.png",
                 log_file='C:/AMRITA//Sem6/RL/negative_episode_logs.csv'):
        self.track_size = track_size
        self.num_cars = num_cars
        self.car_size = (40, 20)
        self.forward_speed = 8
        self.rotation_angle = 10
        self.agent = agent

        # Initialize Pygame
        pygame.init()
        self.screen = pygame.display.set_mode(self.track_size)
        pygame.display.set_caption("Autonomous Cars Environment")

        # Load car image
        self.car_image = pygame.image.load(car_image_path)
        self.car_image = pygame.transform.scale(self.car_image, self.car_size)

        # Colors
        self.background_color = (0, 255, 0)
        self.track_color = (0, 0, 0)
        self.start_line_color = (255, 0, 0)

        # Track parameters
        self.track_width = self.car_size[0]
        self.inner_semi_major_axis = 200
        self.inner_semi_minor_axis = 100
        self.outer_semi_major_axis = self.inner_semi_major_axis + self.track_width
        self.outer_semi_minor_axis = self.inner_semi_minor_axis + self.track_width

        # Create track vertices (elliptical loop)
        self.track_inner_vertices, self.track_outer_vertices = self.create_track()

        # Start line position
        self.start_line_pos = self.generate_start_line_point()

        # Checkpoints
        self.num_checkpoints = num_checkpoints
        self.checkpoints, self.checkpoint_rewards = self.generate_checkpoints()
        self.current_checkpoint = 0

        # Initialize car position and angle
        self.reset()

        # Font for rendering text
        self.font = pygame.font.SysFont('Arial', 24)

        # Max steps per episode
        self.max_steps_per_episode = max_steps_per_episode
        self.current_step = 0
        self.lap_count = 0

        # Create CSV file for logging
        self.log_file = log_file
        self.create_log_file()

    def _track_center(self):
        """Return the centre of the window, which is also the ellipse centre."""
        return self.track_size[0] // 2, self.track_size[1] // 2

    def create_log_file(self):
        """Create the CSV log with a header row if it does not exist yet."""
        if not os.path.isfile(self.log_file):
            log_dir = os.path.dirname(self.log_file)
            if log_dir:
                os.makedirs(log_dir, exist_ok=True)
            pd.DataFrame(columns=self.LOG_COLUMNS).to_csv(self.log_file, index=False)

    def log_episode(self, episode, cumulative_reward, distance_traveled):
        """Append one row describing a finished episode to the CSV log.

        The row is appended instead of re-reading and rewriting the whole
        file, so the cost per episode does not grow with the log size.
        """
        self.create_log_file()
        row = pd.DataFrame(
            [[episode, cumulative_reward, distance_traveled, self.lap_count]],
            columns=self.LOG_COLUMNS,
        )
        row.to_csv(self.log_file, mode='a', header=False, index=False)

    def generate_start_line_point(self):
        """Return two points on the track centre line, at 0 and 180 degrees.

        The first point is used as the car's start position.
        """
        angle = 0
        radians = math.radians(angle)
        center = self._track_center()
        mid_major = self.inner_semi_major_axis + self.track_width / 2
        mid_minor = self.inner_semi_minor_axis + self.track_width / 2
        start_x = center[0] + mid_major * math.cos(radians)
        start_y = center[1] + mid_minor * math.sin(radians)
        end_x = center[0] + mid_major * math.cos(radians + math.pi)
        end_y = center[1] + mid_minor * math.sin(radians + math.pi)
        return (start_x, start_y), (end_x, end_y)

    def create_track(self):
        """Sample the inner and outer ellipses every 5 degrees.

        Returns:
            (inner_vertices, outer_vertices), two equally long lists of
            (x, y) tuples in screen coordinates.
        """
        track_inner_vertices = []
        track_outer_vertices = []
        center = self._track_center()
        for angle in range(0, 360, 5):
            radians = math.radians(angle)
            cos_a, sin_a = math.cos(radians), math.sin(radians)
            track_inner_vertices.append((center[0] + self.inner_semi_major_axis * cos_a,
                                         center[1] + self.inner_semi_minor_axis * sin_a))
            track_outer_vertices.append((center[0] + self.outer_semi_major_axis * cos_a,
                                         center[1] + self.outer_semi_minor_axis * sin_a))
        return track_inner_vertices, track_outer_vertices

    def generate_checkpoints(self):
        """Place checkpoints around the track and assign their rewards.

        Each checkpoint lies 20 px from an outer vertex towards the matching
        inner vertex. The reward of a checkpoint is 100 * (vertex index + 1).

        Returns:
            (checkpoints, rewards) as two parallel lists.
        """
        checkpoints = []
        rewards = []
        track_length = len(self.track_outer_vertices)
        checkpoint_indices = np.linspace(0, track_length - 1, self.num_checkpoints, dtype=int)
        for idx in checkpoint_indices:
            checkpoint = self.track_outer_vertices[idx]
            direction = np.array(checkpoint) - np.array(self.track_inner_vertices[idx])
            scaled_direction = direction / np.linalg.norm(direction)
            new_checkpoint = tuple(np.array(checkpoint) - scaled_direction * 20)
            checkpoints.append(new_checkpoint)
            rewards.append(100 * (idx + 1))
        return checkpoints, rewards

    def step(self, action):
        """Advance the simulation by one action.

        Returns:
            (reward, done, distance_traveled). The reward is -100 when the car
            leaves the track, the checkpoint reward when the current
            checkpoint is reached, and -1 otherwise. When ``done`` is True the
            car has already been moved back to the start, but ``lap_count``
            still holds the finished episode's value until the next call.
        """
        if self._lap_reset_pending:
            # First step of a new episode: drop the lap count that was kept
            # readable after the previous episode ended.
            self.lap_count = 0
            self._lap_reset_pending = False

        self.current_step += 1
        old_pos = self.car_pos.copy()  # Kept for the distance calculation

        if action in self._TURN_BY_ACTION:
            heading = math.radians(self.car_angle)
            self.car_pos[0] += self.forward_speed * math.cos(heading)
            # Screen y grows downward, hence the subtraction.
            self.car_pos[1] -= self.forward_speed * math.sin(heading)
            self.car_angle += self._TURN_BY_ACTION[action] * self.rotation_angle / 2

        distance_traveled = np.linalg.norm(np.array(self.car_pos) - np.array(old_pos))

        if not self.is_inside_track():
            self._finish_episode()
            return -100, True, distance_traveled  # Large penalty for going off track

        reward = -1  # Small negative reward for each step
        if self.reached_checkpoint():
            reward = self.checkpoint_rewards[self.current_checkpoint]
            self.current_checkpoint += 1
            if self.current_checkpoint == len(self.checkpoints):
                self.current_checkpoint = 0
                self.lap_count += 1

        if self.current_step >= self.max_steps_per_episode:
            self._finish_episode()
            return reward, True, distance_traveled

        return reward, False, distance_traveled

    def _finish_episode(self):
        """Reset the car but keep ``lap_count`` readable until the next step."""
        finished_laps = self.lap_count
        self.reset()
        self.lap_count = finished_laps
        self._lap_reset_pending = True

    def is_inside_track(self):
        """Return True when the car centre lies between the two ellipses.

        ``car_pos`` is used directly as the centre, matching ``reset``,
        ``reached_checkpoint`` and ``render``.
        """
        center = self._track_center()
        dx = self.car_pos[0] - center[0]
        dy = self.car_pos[1] - center[1]

        inside_inner_ellipse = (dx / self.inner_semi_major_axis) ** 2 + (dy / self.inner_semi_minor_axis) ** 2 < 1
        inside_outer_ellipse = (dx / self.outer_semi_major_axis) ** 2 + (dy / self.outer_semi_minor_axis) ** 2 < 1

        return bool(inside_outer_ellipse and not inside_inner_ellipse)

    def reached_checkpoint(self):
        """Return True when the car is within 20 px of the current checkpoint."""
        checkpoint = self.checkpoints[self.current_checkpoint]
        car_center = (self.car_pos[0], self.car_pos[1])
        distance = np.linalg.norm(np.array(checkpoint) - np.array(car_center))
        return distance < 20

    def reset(self):
        """Move the car to the start point and clear all episode counters."""
        self.car_pos = np.array(self.start_line_pos[0])
        self.car_angle = self.calculate_track_tangent_angle()
        self.current_step = 0
        self.current_checkpoint = 0
        self.lap_count = 0
        self._lap_reset_pending = False

    def calculate_track_tangent_angle(self):
        """Return the heading, in degrees, along the track at the car position.

        The direction runs from the nearest outer vertex to the next one, i.e.
        in the order in which the checkpoints are numbered.
        """
        vertices = self.track_outer_vertices
        nearest_vertex = self.find_nearest_track_vertex()
        next_vertex = vertices[(vertices.index(nearest_vertex) + 1) % len(vertices)]
        dx = next_vertex[0] - nearest_vertex[0]
        dy = next_vertex[1] - nearest_vertex[1]
        # Vertices are in screen coordinates (y down) while car_angle uses the
        # y-up convention of step() and pygame.transform.rotate, so dy is
        # negated to make the car face along the vertex order.
        return math.degrees(math.atan2(-dy, dx))

    def find_nearest_track_vertex(self):
        """Return the outer-ellipse vertex closest to the car position."""
        return min(
            self.track_outer_vertices,
            key=lambda vertex: np.linalg.norm(np.array(vertex) - self.car_pos),
        )

    def render_text(self, text, position):
        """Draw ``text`` in black at ``position`` on the screen."""
        text_surface = self.font.render(text, True, (0, 0, 0))
        self.screen.blit(text_surface, position)

    def render(self, episode, cumulative_reward, total_distance_traveled):
        """Draw the track, start point, checkpoints, car and status text."""
        self.screen.fill(self.background_color)

        # The ring is drawn as the outer polygon with the inner one painted over it.
        pygame.draw.polygon(self.screen, self.track_color, self.track_outer_vertices, 0)
        pygame.draw.polygon(self.screen, self.background_color, self.track_inner_vertices, 0)

        pygame.draw.circle(self.screen, (255, 0, 0), (int(self.start_line_pos[0][0]), int(self.start_line_pos[0][1])), 5)

        for checkpoint in self.checkpoints:
            pygame.draw.circle(self.screen, (255, 255, 255), (int(checkpoint[0]), int(checkpoint[1])), 5)

        rotated_car = pygame.transform.rotate(self.car_image, self.car_angle)
        car_rect = rotated_car.get_rect(center=(self.car_pos[0], self.car_pos[1]))
        self.screen.blit(rotated_car, car_rect.topleft)

        self.render_text(f"Episode: {episode}", (10, 10))
        self.render_text(f"Cumulative Reward: {cumulative_reward}", (10, 40))
        self.render_text(f"Distance Traveled: {total_distance_traveled}", (10, 70))
        self.render_text(f"Lap Count: {self.lap_count}", (10, 100))

        pygame.display.flip()

    def close(self):
        """Shut pygame down."""
        pygame.quit()



In [None]:
def build_state(env):
    """Return the 6-feature state vector read from ``env``.

    Features: car x, car y, heading in degrees, cos(heading), sin(heading)
    and the index of the current checkpoint.
    """
    heading = math.radians(env.car_angle)
    return np.array([env.car_pos[0], env.car_pos[1], env.car_angle, np.cos(heading), np.sin(heading), env.current_checkpoint])


if __name__ == "__main__":
    state_space = 6  # Must match the length of the vector returned by build_state
    action_space = 3
    agent = DQNAgent(state_space, action_space)
    env = AutonomousCarsEnv(agent, max_steps_per_episode=1000)

    running = True
    episode = 0
    cumulative_reward = 0
    total_distance_traveled = 0  # Distance covered in the current episode

    # try/finally so the pygame window is closed even if training raises.
    try:
        while running:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False

            state = build_state(env)
            action = agent.choose_action(state.reshape(1, -1))
            reward, done, distance_traveled = env.step(action)
            # On a terminal step the environment has already reset itself, so
            # this is the start state; replay ignores it when done is True.
            next_state = build_state(env)
            # Store both states with the same 1-D shape.
            agent.remember(state, action, reward, next_state, done)
            agent.replay()

            cumulative_reward += reward
            total_distance_traveled += distance_traveled
            env.render(episode, cumulative_reward, total_distance_traveled)

            if done:
                episode += 1
                print(f"Episode: {episode}, Distance Traveled: {total_distance_traveled}, Cumulative Reward: {cumulative_reward}, Lap Count: {env.lap_count}")
                # save_model only writes a checkpoint every 500 episodes, so a
                # single unconditional call is enough.
                agent.save_model(episode)
                agent.update_target_model()

                # Log episode data
                env.log_episode(episode, cumulative_reward, total_distance_traveled)
                cumulative_reward = 0
                total_distance_traveled = 0
    finally:
        env.close()

    # Display metrics in Jupyter Notebook
    episode_df = pd.read_csv(env.log_file)
    print(episode_df)
    if not episode_df.empty:
        episode_df.plot(x='Episode', y=['Distance Traveled', 'Cumulative Reward', 'Lap Count'], kind='line', subplots=True)