In [8]:
from collections import deque
import random

import tensorflow as tf
import gym
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers

In [9]:
class TokyoDrifter(keras.Model):
    """Convolutional Q-network: three conv/max-pool stages followed by two dense layers.

    The final dense layer is linear and produces one value per discrete
    action, so the output can be used directly as Q-value estimates.

    Args:
        num_actions: Width of the output layer. Defaults to 9 so that every
            action id decoded by ``DQN.drive`` (0..8) has its own Q-value;
            with only 8 outputs the action with id 8 could never be picked
            by an argmax over the network output.
        **kwargs: Forwarded unchanged to ``keras.Model.__init__``.
    """

    def __init__(self, num_actions=9, **kwargs):
        super().__init__(**kwargs)
        self.conv1 = layers.Conv2D(256, (4, 4), activation='relu', input_shape=(96,96,3))
        self.pool1 = layers.MaxPool2D(2,2)
        self.conv2 = layers.Conv2D(256, (4, 4), activation='relu')
        self.pool2 = layers.MaxPool2D(2,2)
        self.conv3 = layers.Conv2D(256, (4, 4), activation='relu')
        self.pool3 = layers.MaxPool2D(2,2)
        self.flatten = layers.Flatten()
        self.dense1 = layers.Dense(288, activation='relu')
        # Linear head: one unbounded Q-value per discrete action.
        self.dense2 = layers.Dense(num_actions, activation='linear')

    def call(self, x):
        """Run a batch of inputs through the network and return the per-action outputs."""
        x = self.conv1(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.pool2(x)
        x = self.conv3(x)
        x = self.pool3(x)
        x = self.flatten(x)
        x = self.dense1(x)
        return self.dense2(x)

In [13]:
class DQN:
    """Deep Q-learning agent with an experience-replay buffer and epsilon-greedy exploration.

    Args:
        model: Compiled network exposing ``__call__``, ``predict`` and ``fit``;
            its output is treated as one Q-value per discrete action.
        env: Environment exposing ``reset()``, ``render()`` and a ``step()``
            that returns ``(next_state, reward, done, info)``.
        gamma: Discount factor applied to the best next-state Q-value.
        mem_size: Maximum number of transitions kept in the replay buffer.
            Every transition stores two full state arrays, so large values
            can require a lot of memory.
        batch_size: Number of transitions sampled per replay step; replay is
            skipped until the buffer holds at least this many.
        expl_max: Initial exploration rate (epsilon).
        expl_min: Lower bound for the exploration rate.
        expl_decay: Factor the exploration rate is multiplied by after every
            replay step.
    """

    # Discrete action id -> [steering, gas, brake].
    # Steering: +1.0 is right, -1.0 is left.
    _CONTROLS = {
        1: (0.0, 1.0, 0.0),    # full throttle
        2: (1.0, 1.0, 0.0),    # full throttle and steer right
        3: (-1.0, 1.0, 0.0),   # full throttle and steer left
        4: (1.0, 0.0, 0.0),    # steer right
        5: (-1.0, 0.0, 0.0),   # steer left
        6: (0.0, 0.0, 1.0),    # full brake
        7: (1.0, 0.0, 1.0),    # full brake and steer right
        8: (-1.0, 0.0, 1.0),   # full brake and steer left
    }
    # Action 0, and any id not listed above, means "do nothing".
    _NO_OP = (0.0, 0.0, 0.0)

    def __init__(self, model, env, gamma=0.85, mem_size=1000000, batch_size=64, expl_max=1.0, expl_min=0.01, expl_decay=0.995):
        self.model = model
        self.env = env
        self.memory = deque(maxlen=mem_size)
        self.gamma = gamma
        self.batch_size = batch_size
        self.expl_max = expl_max
        self.expl_min = expl_min
        self.expl_decay = expl_decay
        self.expl_rate = expl_max  # current epsilon; decayed in experience_replay()

    def train(self, epochs):
        """Play ``epochs`` episodes, storing every transition and replaying after each step.

        Always uses the environment passed to the constructor (``self.env``),
        never a notebook-level global.
        """
        for _ in range(epochs):
            s = self.env.reset()
            # Add a leading batch axis so the state can be fed straight to the model.
            s = np.array(s[None, :], dtype=np.float16)
            d = False
            while not d:
                self.env.render()
                a = self.action(s)
                sn, r, d, _info = self.env.step(self.drive(a))
                sn = np.array(sn[None, :], dtype=np.float16)
                self.remember(s, a, r, sn, d)
                s = sn
                self.experience_replay()

    def experience_replay(self):
        """Fit the model on a random mini-batch of stored transitions, then decay epsilon.

        Does nothing until the buffer holds at least ``batch_size`` transitions.
        Each sampled transition is fitted individually: the target equals the
        model's current prediction, except for the taken action, whose value is
        replaced by ``r`` (terminal) or ``r + gamma * max Q(next_state)``.
        """
        if len(self.memory) < self.batch_size:
            return
        batch = random.sample(self.memory, self.batch_size)
        for s, a, r, sn, d in batch:
            q_update = r
            if not d:
                # verbose=0 keeps the per-sample predictions from flooding the output.
                q_update = r + self.gamma * np.amax(self.model.predict(sn, verbose=0)[0])
            q_values = self.model.predict(s, verbose=0)
            q_values[0][a] = q_update
            self.model.fit(s, q_values, verbose=0)
        # Decay the same attribute that action() reads, bounded below by expl_min.
        self.expl_rate = max(self.expl_min, self.expl_rate * self.expl_decay)

    def remember(self, s, a, r, sn, d):
        """Append the transition (state, action, reward, next_state, done) to the replay buffer."""
        self.memory.append((s, a, r, sn, d))

    def action(self, state):
        """Pick an action id epsilon-greedily.

        With probability ``expl_rate`` a random action from {1, 2, 3} is
        returned; otherwise the index of the largest model output for ``state``.
        """
        if np.random.rand() < self.expl_rate:
            # Random exploration always applies throttle so the agent quickly
            # learns that it has to use the gas to move forward.
            return random.randrange(1, 4)
        return int(np.argmax(self.model(state)))

    def drive(self, a):
        """Translate action id ``a`` into a fresh ``[steering, gas, brake]`` list.

        Ids 1..8 map to the combinations in ``_CONTROLS``; any other id
        (including 0) yields ``[0.0, 0.0, 0.0]``, i.e. do nothing.
        """
        return list(self._CONTROLS.get(a, self._NO_OP))


In [14]:
# Build the Q-network and compile it with an MSE loss.
model = TokyoDrifter()
# Use `learning_rate`: the `lr` alias is deprecated and rejected by newer Keras releases.
model.compile(loss="mse", optimizer=keras.optimizers.Adam(learning_rate=0.001))
env = gym.make("CarRacing-v0")

# Wire the network and the environment into the agent.
dqn = DQN(model, env)

# Train for 10 episodes (renders every step).
dqn.train(10)

Track generation: 1273..1595 -> 322-tiles track


KeyboardInterrupt: 