In [None]:
import gym
import cv2
import numpy as np
import random
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Flatten, Dense
from collections import deque
import matplotlib.pyplot as plt
!pip install box2d box2d-py



In [None]:
# Build the CarRacing environment (OpenAI Gym)
env = gym.make("CarRacing-v2")

# Geometry of the frames handed to the network after preprocessing
IMG_HEIGHT = 66
IMG_WIDTH = 200
IMG_CHANNELS = 3

# Number of discrete choices exposed to the agent: left, right, straight
ACTION_SPACE_SIZE = 3

# Function to preprocess each frame from the environment
def preprocess_frame(frame):
    """Resize an environment frame to the network input size and rescale it.

    Args:
        frame: Image array from the environment; it is handed directly to
            ``cv2.resize``.  Assumed to hold 0-255 pixel values (the division
            by 255 only yields [0, 1] in that case) -- TODO confirm.

    Returns:
        A float32 array resized to ``IMG_WIDTH`` x ``IMG_HEIGHT`` with every
        value divided by 255.
    """
    # cv2.resize expects the target size as (width, height).
    resized = cv2.resize(frame, (IMG_WIDTH, IMG_HEIGHT))
    # Cast before dividing: dividing an integer image by 255.0 promotes to
    # float64, which doubles the memory of every frame stored for replay.
    return resized.astype(np.float32) / 255.0


In [None]:
class DQN:
    """Holds a compiled Keras network that outputs one Q-value per action."""

    def __init__(self, state_shape, action_space_size):
        """Store the input/output dimensions and build the network.

        Args:
            state_shape: Shape passed as ``input_shape`` to the first layer.
            action_space_size: Number of units in the final (linear) layer.
        """
        self.state_shape = state_shape
        self.action_space_size = action_space_size
        self.model = self.create_model()

    def create_model(self):
        """Assemble and compile the convolutional Q-network.

        Returns:
            A ``Sequential`` model compiled with Adam (lr=1e-4) and MSE loss.
        """
        # Convolutional feature extractor: three strided 5x5 layers followed
        # by two 3x3 layers.
        feature_layers = [
            Conv2D(24, (5, 5), strides=(2, 2), activation="relu", input_shape=self.state_shape),
            Conv2D(36, (5, 5), strides=(2, 2), activation="relu"),
            Conv2D(48, (5, 5), strides=(2, 2), activation="relu"),
            Conv2D(64, (3, 3), activation="relu"),
            Conv2D(64, (3, 3), activation="relu"),
        ]
        # Fully connected head; the last layer is linear so it can emit
        # unbounded Q-values, one per action.
        head_layers = [
            Flatten(),
            Dense(100, activation="relu"),
            Dense(50, activation="relu"),
            Dense(self.action_space_size, activation="linear"),
        ]

        network = Sequential(feature_layers + head_layers)
        optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
        network.compile(optimizer=optimizer, loss="mse")
        return network

In [None]:
class DQNAgent:
    """Epsilon-greedy DQN agent with a bounded experience-replay buffer."""

    def __init__(self, state_shape, action_space_size):
        """Create the replay buffer, exploration schedule and Q-network.

        Args:
            state_shape: Input shape forwarded to ``DQN``.
            action_space_size: Number of discrete actions.
        """
        self.action_space_size = action_space_size
        self.memory = deque(maxlen=2000)  # Experience replay buffer
        self.gamma = 0.99  # Discount factor applied to the bootstrapped value
        self.epsilon = 1.0  # Current probability of taking a random action
        self.epsilon_decay = 0.995  # Multiplier applied after each replay
        self.epsilon_min = 0.1  # Floor for epsilon
        self.batch_size = 32
        self.dqn = DQN(state_shape, action_space_size)

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer.

        Once the buffer holds ``maxlen`` items the oldest one is dropped.
        """
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action for ``state`` using the epsilon-greedy rule.

        Args:
            state: A single preprocessed state (no batch dimension).

        Returns:
            The action index as a plain ``int``.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_space_size)
        # verbose=0 keeps Keras from printing a progress bar on every step.
        q_values = self.dqn.model.predict(np.expand_dims(state, axis=0), verbose=0)
        return int(np.argmax(q_values[0]))

    def replay(self):
        """Run one training update on a random minibatch from the buffer.

        Does nothing until the buffer holds at least ``batch_size``
        transitions.  The whole minibatch is evaluated with two batched
        ``predict`` calls and fitted with a single ``train_on_batch`` call,
        instead of three Keras calls per sample.  Afterwards epsilon is
        decayed, but never below ``epsilon_min``.
        """
        if len(self.memory) < self.batch_size:
            return

        minibatch = random.sample(self.memory, self.batch_size)
        states = np.stack([transition[0] for transition in minibatch])
        actions = np.array([transition[1] for transition in minibatch], dtype=np.int64)
        rewards = np.array([transition[2] for transition in minibatch], dtype=np.float32)
        next_states = np.stack([transition[3] for transition in minibatch])
        dones = np.array([transition[4] for transition in minibatch], dtype=bool)

        model = self.dqn.model
        next_q_values = model.predict(next_states, verbose=0)
        # np.array(...) copies, so the targets can be edited in place.
        targets = np.array(model.predict(states, verbose=0), dtype=np.float32)

        # Finished episodes have no future value, so only the reward remains.
        future_value = np.where(dones, 0.0, self.gamma * np.max(next_q_values, axis=1))
        # Only the Q-value of the action actually taken is moved towards the
        # target; the other outputs keep their current predictions, which
        # makes their contribution to the MSE loss zero.
        targets[np.arange(len(minibatch)), actions] = rewards + future_value

        model.train_on_batch(states, targets)

        if self.epsilon > self.epsilon_min:
            # Clamp so the last decay step cannot undershoot the floor.
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

In [None]:
state_shape = (IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS)
agent = DQNAgent(state_shape, ACTION_SPACE_SIZE)
episodes = 500
episode_rewards = []

for episode in range(episodes):
    # gym >= 0.26 returns (observation, info) from reset(); older releases
    # return the observation alone. Accept both so the frame, not a tuple,
    # reaches preprocess_frame.
    reset_result = env.reset()
    first_frame = reset_result[0] if isinstance(reset_result, tuple) else reset_result
    state = preprocess_frame(first_frame)
    done = False
    total_reward = 0

    while not done:
        action = agent.act(state)

        # Convert action to continuous steering and acceleration for CarRacing
        # (array layout used here: [steering, gas, brake]).
        if action == 0:  # Left
            action_array = [-1.0, 0.0, 0.0]
        elif action == 1:  # Right
            action_array = [1.0, 0.0, 0.0]
        else:  # Straight
            action_array = [0.0, 1.0, 0.0]

        # Pass a float32 array rather than a list to match the Box action space.
        step_result = env.step(np.array(action_array, dtype=np.float32))
        if len(step_result) == 5:
            # gym >= 0.26: (obs, reward, terminated, truncated, info)
            next_state, reward, terminated, truncated, _ = step_result
            done = terminated or truncated
        else:
            # older gym: (obs, reward, done, info)
            next_state, reward, done, _ = step_result
        next_state = preprocess_frame(next_state)

        agent.remember(state, action, reward, next_state, done)
        agent.replay()

        state = next_state
        total_reward += reward

    # The while loop only exits once the episode is over, so log here.
    episode_rewards.append(total_reward)
    print(f"Episode {episode+1}/{episodes}, Total Reward: {total_reward}")

# Save the trained model
agent.dqn.model.save("dqn_car_racing.h5")

In [None]:
# Learning curve: total reward collected in each training episode.
fig, ax = plt.subplots()
ax.plot(episode_rewards)
ax.set_xlabel("Episodes")
ax.set_ylabel("Total Reward")
ax.set_title("Reward Curve")
plt.show()

In [None]:
# Evaluate the greedy policy. agent.act() takes a random action with
# probability agent.epsilon, and training stops decaying epsilon around
# epsilon_min (0.1), so exploration is switched off here and restored after.
trained_epsilon = agent.epsilon
agent.epsilon = 0.0
try:
    for episode in range(5):  # Run 5 test episodes
        # gym >= 0.26 returns (observation, info) from reset(); older
        # releases return the observation alone.
        reset_result = env.reset()
        first_frame = reset_result[0] if isinstance(reset_result, tuple) else reset_result
        state = preprocess_frame(first_frame)
        done = False
        total_reward = 0

        while not done:
            action = agent.act(state)
            if action == 0:  # Left
                action_array = [-1.0, 0.0, 0.0]
            elif action == 1:  # Right
                action_array = [1.0, 0.0, 0.0]
            else:  # Straight
                action_array = [0.0, 1.0, 0.0]

            step_result = env.step(np.array(action_array, dtype=np.float32))
            if len(step_result) == 5:
                # gym >= 0.26: (obs, reward, terminated, truncated, info)
                next_state, reward, terminated, truncated, _ = step_result
                done = terminated or truncated
            else:
                # older gym: (obs, reward, done, info)
                next_state, reward, done, _ = step_result
            next_state = preprocess_frame(next_state)

            total_reward += reward
            state = next_state
            # NOTE(review): on gym >= 0.26 rendering presumably needs
            # render_mode to be set in gym.make -- confirm for the installed version.
            env.render()  # Display the environment frame

        print(f"Test Episode {episode+1}, Total Reward: {total_reward}")
finally:
    agent.epsilon = trained_epsilon

env.close()

In [None]:
action_counts = {"Left": 0, "Straight": 0, "Right": 0}

# NOTE(review): the preceding cell calls env.close() before this cell reuses
# env -- confirm the environment can still be reset after being closed.

# Count the actions of the greedy policy while it actually drives: each chosen
# action is both tallied and sent to the environment, so the histogram
# describes states the policy really visits. Exploration is disabled for the
# measurement and restored afterwards.
trained_epsilon = agent.epsilon
agent.epsilon = 0.0
try:
    for episode in range(5):
        # gym >= 0.26 returns (observation, info) from reset(); older
        # releases return the observation alone.
        reset_result = env.reset()
        first_frame = reset_result[0] if isinstance(reset_result, tuple) else reset_result
        state = preprocess_frame(first_frame)
        done = False

        while not done:
            action = agent.act(state)
            if action == 0:
                action_counts["Left"] += 1
                action_array = [-1.0, 0.0, 0.0]
            elif action == 1:
                action_counts["Right"] += 1
                action_array = [1.0, 0.0, 0.0]
            else:
                action_counts["Straight"] += 1
                action_array = [0.0, 1.0, 0.0]

            step_result = env.step(np.array(action_array, dtype=np.float32))
            if len(step_result) == 5:
                # gym >= 0.26: (obs, reward, terminated, truncated, info)
                next_state, _, terminated, truncated, _ = step_result
                done = terminated or truncated
            else:
                # older gym: (obs, reward, done, info)
                next_state, _, done, _ = step_result
            state = preprocess_frame(next_state)
finally:
    agent.epsilon = trained_epsilon

# Plot action distribution
plt.bar(list(action_counts.keys()), list(action_counts.values()))
plt.xlabel("Actions")
plt.ylabel("Frequency")
plt.title("Action Distribution in Test Episodes")
plt.show()