In [8]:
import random
import numpy as np
import cv2
import matplotlib.pyplot as plt

from collections import deque

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from CarRacing_V2 import CarRacing_V2

In [9]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [10]:
def image_processing(image, height=83, width=83):
    """Turn a colour frame into a cropped grayscale image scaled to [0, 1].

    Args:
        image: colour frame as a uint8 array of shape (H, W, 3). The frames
            passed in by this notebook come from the gym environment, which
            delivers channels in RGB order (not OpenCV's BGR default).
        height: number of rows kept, counted from the top of the frame.
        width: number of columns kept, counted from the left of the frame.

    Returns:
        A 2-D float array of shape (min(H, height), min(W, width)) with
        values in [0, 1].
    """
    # RGB2GRAY rather than BGR2GRAY: with RGB input the BGR variant would swap
    # the luma weights of the red and blue channels.
    image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    image = image.astype(float) / 255.0
    # Keep only the top-left height x width window.
    return image[:height, :width]

# env = CarRacing_V2()
# init_state = env.reset()
# env.close()

# image_processing(init_state, 83, 83)

In [11]:
import math

def dim_encoder(Hin, Win, padding, dilation, kernel_size, stride):
    """Return the (height, width) produced by a 2-D convolution-style layer.

    Applies, independently per axis,
    ``floor((size + 2*padding - dilation*(kernel_size - 1) - 1) / stride + 1)``.

    Args:
        Hin, Win: input height and width.
        padding, dilation, kernel_size, stride: two-element sequences holding
            the (height, width) value of each hyper-parameter.

    Returns:
        Tuple ``(Hout, Wout)`` of integers.
    """
    def _axis_out(size, axis):
        # Extent covered by the (dilated) kernel beyond its first tap.
        reach = dilation[axis] * (kernel_size[axis] - 1)
        return math.floor((size + 2 * padding[axis] - reach - 1) / stride[axis] + 1)

    return _axis_out(Hin, 0), _axis_out(Win, 1)

# Trace the spatial size of an 83x83 frame through the DQN feature extractor
# defined in the next cell:
#   conv(k=3, stride=2, pad=1) -> max-pool(2) -> conv(k=4) -> max-pool(2)
Hin, Win = 83, 83
Hout, Wout = dim_encoder(Hin, Win, padding=(1, 1), dilation=(1, 1), kernel_size=(3, 3), stride=(2, 2))
Hout, Wout = Hout // 2, Wout // 2  # 2x2 max-pooling halves each side (floored)
# The second convolution of the network uses a 4x4 kernel, so model it with
# kernel_size=(4, 4); a 3x3 kernel only gives the same final size by luck of
# the flooring in the pooling step.
Hout, Wout = dim_encoder(Hout, Wout, padding=(0, 0), dilation=(1, 1), kernel_size=(4, 4), stride=(1, 1))
Hout, Wout = Hout // 2, Wout // 2

print(Hout, Wout)

9 9


In [12]:
class DQN(nn.Module):
    """Convolutional Q-network: one-channel frame in, four action values out.

    Two conv -> ReLU -> max-pool stages extract features, which are flattened
    and passed through a two-layer fully connected head. The head expects
    128 feature maps of size 9x9, which is what the convolutional stack
    produces for 83x83 inputs.
    """

    def __init__(self):
        super().__init__()

        # Width of the flattened feature vector: 128 maps of 9x9.
        flat_features = 9 * 9 * 128

        self.conv_layers = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=64, kernel_size=3, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=4),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
        )

        self.flatten = nn.Flatten(start_dim=1)

        self.fully_connected_layers = nn.Sequential(
            nn.Linear(in_features=flat_features, out_features=128),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=128, out_features=4),
        )

    def forward(self, X):
        """Return one value per action for each frame in the batch ``X``."""
        feature_maps = self.conv_layers(X)
        flat = self.flatten(feature_maps)
        return self.fully_connected_layers(flat)
    
    
class DQNAgent:
    """Deep Q-learning agent with a replay buffer and a separate target network.

    ``model`` is the online network trained by gradient descent; ``target`` is
    a copy used to compute bootstrap values and is refreshed only through
    :meth:`update_target`.

    Args:
        lr: Adam learning rate.
        weight_decay: Adam weight decay.
        epochs: gradient steps taken on each replayed minibatch.
        batch_size: number of transitions sampled per replay.
        memory_size: maximum number of transitions kept in the replay buffer.
        epsilon: probability of taking the *greedy* action in
            :meth:`play_action`; a random action is taken with probability
            ``1 - epsilon``. Note that this is the opposite of the usual
            epsilon-greedy naming.
        gamma: discount factor applied to the bootstrapped next-state value.
    """

    def __init__(self, lr=0.001, weight_decay=1e-5, epochs=1, batch_size=128, memory_size=5000, epsilon=0.98, gamma=0.95):
        self.model = DQN().to(device)
        self.target = DQN().to(device)
        self.optim_model = optim.Adam(params=self.model.parameters(), lr=lr, weight_decay=weight_decay)
        # Never stepped by this class (the target network only changes through
        # update_target); kept so existing attribute access keeps working.
        self.optim_target = optim.Adam(params=self.target.parameters(), lr=lr, weight_decay=weight_decay)
        self.loss = nn.MSELoss()
        self.epochs = epochs
        self.batch_size = batch_size

        # Discrete control vectors handed to env.step(); one network output
        # per entry. Presumably [steering, gas, brake] - confirm against the
        # environment's action format.
        self.action_space = [
            [0, 0, 0],
            [0, 1, 0],
            [-1, 0, 0],
            [1, 0, 0]
        ]
        self.memory = deque(maxlen=memory_size)
        self.gamma = gamma
        # Attribute name is misspelt ("espilon"); kept as-is for backward
        # compatibility with code that reads or sets it.
        self.espilon = epsilon

        # Start from identical weights so the first bootstrap targets are not
        # computed by an unrelated, randomly initialised network.
        self.update_target()

    def _action_index(self, action):
        """Map a stored action (index or control vector) to its index in ``action_space``.

        Raises:
            ValueError: if a control vector is not part of ``action_space``.
        """
        if isinstance(action, (int, np.integer)):
            return int(action)
        return self.action_space.index(list(action))

    def play_action(self, state):
        """Choose a control vector for ``state`` (a 2-D grayscale frame).

        With probability ``espilon`` the action with the highest predicted
        value is used, otherwise a uniformly random one.

        Returns:
            The selected entry of ``action_space``.
        """
        if np.random.rand() < self.espilon:
            frame = np.asarray(state, dtype='float32')
            # Add batch and channel axes: (H, W) -> (1, 1, H, W).
            batch = torch.from_numpy(frame).unsqueeze(0).unsqueeze(0).to(device)
            with torch.no_grad():  # inference only, no autograd graph needed
                q_values = self.model(batch).cpu().numpy()
            action = int(np.argmax(q_values))
        else:
            action = np.random.randint(0, len(self.action_space))

        return self.action_space[action]

    def fit(self, X, y):
        """Run ``epochs`` gradient steps regressing ``model(X)`` onto ``y``.

        Args:
            X: numpy array of frames, shape (batch, 1, H, W).
            y: numpy array of target values, shape (batch, n_actions).
        """
        X = torch.from_numpy(X.astype('float32')).to(device)
        # Keep the targets on the same device as the predictions so the loss
        # is computed there instead of shipping the predictions to the CPU.
        y = torch.from_numpy(y.astype('float32')).to(device)

        for _ in range(self.epochs):
            loss = self.loss(self.model(X), y)
            self.optim_model.zero_grad()
            loss.backward()
            self.optim_model.step()

    def replay_memory(self):
        """Sample a minibatch from the replay buffer and train on it.

        For every sampled transition the target equals the online network's
        current prediction, except for the entry of the action actually
        taken, which is replaced by ``reward`` (terminal transition) or
        ``reward + gamma * max(target(next_state))``.

        Does nothing while the buffer holds fewer than ``batch_size``
        transitions.
        """
        if len(self.memory) < self.batch_size:
            return

        minibatch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)

        # (B, H, W) -> (B, 1, H, W): add the channel axis the network expects.
        states = np.stack(states).astype('float32')[:, None]
        next_states = np.stack(next_states).astype('float32')[:, None]
        rewards = np.asarray(rewards, dtype='float32')
        dones = np.asarray(dones, dtype=bool)
        # The buffer stores control vectors such as [0, 1, 0]. Using one
        # directly as a NumPy index would be fancy indexing and overwrite
        # several (wrong) entries, so translate it to the action's index.
        action_idx = np.array([self._action_index(a) for a in actions])

        # Two batched forward passes instead of one per transition.
        with torch.no_grad():
            targets = self.model(torch.from_numpy(states).to(device)).cpu().numpy()
            next_q = self.target(torch.from_numpy(next_states).to(device)).cpu().numpy()

        bootstrapped = rewards + self.gamma * next_q.max(axis=1)
        rows = np.arange(len(minibatch))
        targets[rows, action_idx] = np.where(dones, rewards, bootstrapped)

        self.fit(states, targets)

    def memorize(self, state, action, reward, next_state, terminate):
        """Append one transition to the replay buffer.

        ``action`` may be either an index into ``action_space`` or the
        control vector returned by :meth:`play_action`.
        """
        self.memory.append((state, action, reward, next_state, terminate))

    def update_target(self):
        """Copy the online network's weights into the target network."""
        self.target.load_state_dict(self.model.state_dict())
            


In [13]:
import gym
from gym.envs.box2d.car_racing import CarRacing
import logging
import traceback

EPISODES = 500
BATCH_SIZE = 128
UPDATE_TARGET_FREQUENCY = 5  # episodes between target-network refreshes
FRAME_SKIP = 3               # env steps each chosen action is repeated for
GAS_REWARD_BONUS = 1.5       # reward multiplier for the [0, 1, 0] action

env = gym.make("CarRacing-v0")
# Share BATCH_SIZE with the agent so the replay guard below and the agent's
# sampling size cannot drift apart.
car = DQNAgent(batch_size=BATCH_SIZE)
reward_history = []

try:
    for eps in range(1, EPISODES + 1):

        init_state = env.reset()
        init_state = image_processing(init_state)

        total_reward = 0
        terminate = False

        current_state = init_state

        while True:
            env.render()

            action = car.play_action(current_state)

            # Repeat the action for FRAME_SKIP steps and accumulate the reward.
            reward = 0
            for _ in range(FRAME_SKIP):
                next_state, r, terminate, info = env.step(action)
                reward += r
                if terminate:
                    break

            # Reward shaping: scale the reward when the second action
            # component is 1 and the third is 0.
            if action[1] == 1 and action[2] == 0:
                reward *= GAS_REWARD_BONUS

            total_reward += reward

            next_state = image_processing(next_state)
            car.memorize(current_state, action, reward, next_state, terminate)

            # Stop at the end of the episode, or early once the running
            # return drops below zero.
            if terminate or total_reward < 0:
                print(f"Episode {eps}/{EPISODES}, total reward: {total_reward}")
                break

            if len(car.memory) > BATCH_SIZE:
                car.replay_memory()

            current_state = next_state

        if eps % UPDATE_TARGET_FREQUENCY == 0:
            car.update_target()

        reward_history.append(total_reward)

except Exception:
    logging.error(traceback.format_exc())
finally:
    # Runs on normal completion, on logged errors and on KeyboardInterrupt
    # (which `except Exception` does not catch), so the environment is
    # always released.
    env.close()

Track generation: 1099..1405 -> 306-tiles track
retry to generate track (normal if there are not manyinstances of this message)
Track generation: 1086..1368 -> 282-tiles track
Episode 1/1000, total reward: -0.08256227758006252
Track generation: 1021..1289 -> 268-tiles track
Episode 2/1000, total reward: -0.00936329588014062
Track generation: 1172..1469 -> 297-tiles track
Episode 3/1000, total reward: -0.44864864864862697
Track generation: 1062..1331 -> 269-tiles track
Episode 4/1000, total reward: -0.24328358208951806
Track generation: 1192..1495 -> 303-tiles track
Episode 5/1000, total reward: 67.68543046357726
Track generation: 1102..1379 -> 277-tiles track
Episode 6/1000, total reward: 27.59637681159282
Track generation: 1005..1262 -> 257-tiles track
retry to generate track (normal if there are not manyinstances of this message)
Track generation: 1059..1323 -> 264-tiles track
Episode 7/1000, total reward: 78.56064638783207
Track generation: 1151..1443 -> 292-tiles track
Episode 8/10

KeyboardInterrupt: 

In [14]:
# Explicitly release the environment, e.g. after the training cell was interrupted.
env.close()