In [1]:
""" Author: Sonu Gupta
    Project: Training a car racing machine learning model
"""    

import random
import gymnasium as gym
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import os
import cv2

In [2]:
# Discrete-action variant of CarRacing (the printed action space is Discrete(5)).
env = gym.make("CarRacing-v2", continuous=False)

# Derive the network input/output sizes from the environment instead of
# repeating them as magic numbers, so they cannot drift apart.
state_size = int(np.prod(env.observation_space.shape))  # 96 * 96 * 3 = 27648 flattened pixels
action_size = int(env.action_space.n)  # 5 discrete actions

batch_size = 15  # transitions sampled from replay memory per training call
n_episodes = 5  # number of training episodes
print(env.action_space)

Discrete(5)


In [3]:
class DQNAgent:
    """Deep Q-learning agent backed by a small fully connected Keras network.

    The agent stores transitions in a bounded replay memory, chooses actions
    with an epsilon-greedy policy, and fits the network on sampled transitions.
    """

    def __init__(self, state_size, action_size, memory_size=500, gamma=0.95,
                 epsilon=1.0, epsilon_decay=0.95, epsilon_min=0.05,
                 learning_rate=0.001):
        """Create the agent and build its Q-network.

        Args:
            state_size: Length of the flattened state vector fed to the network.
            action_size: Number of discrete actions (network output width).
            memory_size: Maximum number of transitions kept in replay memory.
            gamma: Discount factor applied to the bootstrapped next-state value.
            epsilon: Initial probability of picking a random action.
            epsilon_decay: Multiplicative decay applied to epsilon per `train` call.
            epsilon_min: Lower bound for epsilon.
            learning_rate: Learning rate passed to the Adam optimizer.
        """
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=memory_size)
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.learning_rate = learning_rate
        self.model = self._build_model()

    @staticmethod
    def process_state(state):
        """Convert an RGB frame to a grayscale float image scaled to [0, 1].

        Declared as a static method so it can be called both as
        `DQNAgent.process_state(frame)` and `agent.process_state(frame)`.

        Args:
            state: Colour image array in RGB channel order.

        Returns:
            A 2-D float array with values divided by 255.
        """
        # The environment produces RGB frames, so RGB2GRAY (not BGR2GRAY)
        # applies the intended per-channel luminance weights.
        gray = cv2.cvtColor(state, cv2.COLOR_RGB2GRAY)
        return gray.astype(float) / 255.0

    def _build_model(self):
        """Build and compile the Q-network.

        Two hidden ReLU layers of 12 units each, followed by a linear output
        with one unit per action; trained with MSE loss and the Adam optimizer.
        """
        model = Sequential()
        model.add(Dense(12, activation="relu", input_dim=self.state_size))
        model.add(Dense(12, activation="relu"))
        model.add(Dense(self.action_size, activation="linear"))
        model.compile(loss="mse", optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, s, a, r, s_prime, done):
        """Append one transition (state, action, reward, next state, done) to memory."""
        self.memory.append((s, a, r, s_prime, done))

    def train(self, batch_size):
        """Fit the network on `batch_size` transitions sampled from memory.

        Each sampled transition is fitted individually. Returns without doing
        anything when memory holds fewer than `batch_size` transitions.
        After a training pass epsilon is decayed, never dropping below
        `epsilon_min`.
        """
        if len(self.memory) < batch_size:
            # Not enough experience to draw a full minibatch yet.
            return
        minibatch = random.sample(self.memory, batch_size)
        for s, a, r, s_prime, done in minibatch:
            target = r  # terminal transition: no future value to bootstrap
            if not done:
                next_q = self.model.predict(s_prime, verbose=0)[0]
                target = r + self.gamma * np.amax(next_q)
            # Only the Q-value of the action actually taken is moved toward
            # the target; the other outputs keep their current predictions.
            target_f = self.model.predict(s, verbose=0)
            target_f[0][a] = target
            self.model.fit(s, target_f, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            # Clamp so the multiplicative decay cannot undershoot the floor.
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def act(self, state):
        """Pick an action for `state` using the epsilon-greedy policy.

        Returns:
            A random action index with probability epsilon, otherwise the
            index of the largest predicted Q-value, as a plain `int`.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state, verbose=0)
        return int(np.argmax(act_values[0]))

    def save(self, name):
        """Save the network weights to `name`, creating its parent directory if needed."""
        directory = os.path.dirname(name)
        if directory:
            os.makedirs(directory, exist_ok=True)
        self.model.save_weights(name)

    def load(self, name):
        """Load network weights from `name`."""
        self.model.load_weights(name)



In [4]:
# Episode-termination flag, initialised to False here.
done = False

# Build the DQN agent with the configured state and action sizes.
agent = DQNAgent(state_size=state_size, action_size=action_size)

In [None]:
max_steps = 200  # cap on environment steps per episode
save_every = 10  # checkpoint interval, in episodes
weights_path = "./model_output/CarRacing-v2"
# NOTE(review): Keras 3 only accepts weight files ending in ".weights.h5";
# rename the file if this notebook runs on Keras 3 — confirm the installed version.

# Make sure the checkpoint directory exists before the first save.
os.makedirs(os.path.dirname(weights_path), exist_ok=True)

for e in range(n_episodes):
    state, _ = env.reset()
    # Flatten the frame into a single-row batch for the dense network.
    state = np.reshape(state, (-1, state_size))
    episode_reward = 0.0
    for step in range(max_steps):
        action = agent.act(state)
        next_state, reward, done, trunc, _ = env.step(action)
        episode_reward += reward  # track the raw environment reward for logging
        # NOTE(review): termination is always turned into a -10 reward, which
        # also penalises finishing the track — confirm this shaping is intended.
        reward = reward if not done else -10
        next_state = np.reshape(next_state, (-1, state_size))
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        if done or trunc:
            break
        if len(agent.memory) > batch_size:
            agent.train(batch_size)
    # Report every episode, including those that simply hit the step cap.
    print(f"episode: {e}/{n_episodes}, score: {step}, reward: {episode_reward:.1f}, e: {agent.epsilon}")
    # Checkpoint once per episode on the schedule, and always after the last
    # episode so the final weights are kept.
    if e % save_every == 0 or e == n_episodes - 1:
        agent.save(weights_path)