In [7]:
import numpy as np
import tensorflow as tf
from keras import layers, Model
import gymnasium as gym
import random

env=gym.make("MountainCar-v0")

class DQN(Model):

    def __init__(self, indim, outdim):
        super(DQN, self).__init__()
        self.d1=layers.Dense(32, activation='relu', input_dim=indim)
        self.d2=layers.Dense(16, activation='relu')
        self.d3=layers.Dense(16, activation='relu')
        self.d4=layers.Dense(outdim, activation='linear')
        self.optimizer=tf.optimizers.Adam(0.001)

    def call(self, x):
        x=self.d1(x)
        x=self.d2(x)
        x=self.d3(x)
        return self.d4(x)
    
class Agent():

    def __init__(self):
        self.input_size=2
        self.action_size=3
        self.eps=1.0
        self.eps_decay=0.98
        self.min_eps=0.1
        self.batch_size=64
        self.discount=0.99
        self.learning_rate=0.001
        self.memory=[]
        self.model=DQN(self.input_size, self.action_size)

    def update_eps(self):
        self.eps=max(self.eps*self.eps_decay, self.min_eps)

    def memory_update(self, n_state, action, reward, done, state):
        self.memory.append((n_state, action, reward, done, state))

    def get_act(self, state):
        if np.random.rand() < self.eps:
            return np.random.randint(0,3)
        else:
            return np.argmax(self.model.call(np.array([state])))
        
    def update_model(self):

        if len(self.memory)<1000:
            return

        if len(self.memory)>20000:
            del self.memory[0]

        if len(self.memory)>self.batch_size:

            mini_batch=random.sample(self.memory, self.batch_size)

            n_states=np.array([x[0] for x in mini_batch])
            actions=np.array([x[1] for x in mini_batch])
            rewards=np.array([x[2] for x in mini_batch])
            dones=np.array([x[3] for x in mini_batch])
            states=np.array([x[4] for x in mini_batch])

            q_val=self.model.call(states).numpy()
            n_q_val=self.model.call(n_states).numpy()

            targets=q_val.copy()
            targets[np.arange(len(rewards)), actions]=rewards+self.discount*np.max(n_q_val, axis=1)*(1-dones)

            with tf.GradientTape() as tape:
                q_val=self.model.call(states)
                loss=tf.keras.losses.mse(targets, q_val)

            gradients=tape.gradient(loss, self.model.trainable_variables)
            optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate)
            optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))

model=DQN(2,3)
agent=Agent()
epi=500
suc=[]

for i in range(epi):

    state, _ = env.reset()
    agent.update_eps()
    max_pos=state[0]
    step=0
    done=False

    while not done and step<1000:

        action=agent.get_act(state)

        n_state, reward, done, trunc, _ = env.step(action)

        if n_state[0]>max_pos:
            max_pos=n_state[0]

        if done:
            reward=100
            suc.append(step)
        if action==2 and n_state[0]>state[0]:
            reward=3
        elif action==0 and n_state[0]<state[0]:
            reward=3

        agent.memory_update(n_state, action, reward, done, state)
        agent.update_model()

        state=n_state
        step+=1

    print(i+1, step, max_pos)

    if len(suc)>5 and sum(suc[-5:])/5<=200:
        agent.model.save_weights("./save_model/model", save_format="tf")
        break

env.close()


1 1000 -0.3372658
2 1000 -0.37137675
3 1000 -0.1896321
4 1000 -0.3280672
5 1000 -0.2809995
6 1000 0.283267
7 1000 -0.3078933
8 1000 0.29938993
9 1000 0.34483078
10 1000 0.322477
11 1000 0.34850666
12 1000 0.10603861
13 452 0.5024678
14 708 0.50630116
15 250 0.5004995
16 373 0.52902514
17 385 0.5169368
18 310 0.50995207
19 350 0.50906664
20 333 0.52561474
21 329 0.5203854
22 417 0.51180446
23 349 0.50001895
24 240 0.50573015
25 405 0.52636
26 307 0.5124214
27 250 0.50254935
28 289 0.5185026
29 400 0.50837106
30 300 0.5048926
31 316 0.5064211
32 172 0.52098566
33 258 0.50249016
34 409 0.5009229
35 232 0.5078359
36 327 0.53479
37 334 0.5076587
38 467 0.52379936
39 173 0.50611526
40 165 0.502376
41 180 0.5031163
42 330 0.5278593
43 294 0.50092334
44 301 0.5186913
45 261 0.5170992
46 242 0.51891655
47 202 0.51237524
48 163 0.50928134
49 148 0.5076828
50 171 0.5140109


In [2]:
import numpy as np
import tensorflow as tf
from keras import layers, Model
import gymnasium as gym
import random

env=gym.make("MountainCar-v0", render_mode='human')

class DQN(Model):

    def __init__(self, indim, outdim):
        super(DQN, self).__init__()
        self.d1=layers.Dense(32, activation='relu', input_dim=indim)
        self.d2=layers.Dense(16, activation='relu')
        self.d3=layers.Dense(16, activation='relu')
        self.d4=layers.Dense(outdim, activation='linear')
        self.optimizer=tf.optimizers.Adam(0.001)

    def call(self, x):
        x=self.d1(x)
        x=self.d2(x)
        x=self.d3(x)
        return self.d4(x)
    
class Agent():

    def __init__(self):
        self.input_size=2
        self.action_size=3
        self.eps=1.0
        self.eps_decay=0.98
        self.min_eps=0.1
        self.batch_size=64
        self.discount=0.99
        self.learning_rate=0.001
        self.memory=[]
        self.model=DQN(self.input_size, self.action_size)

    def update_eps(self):
        self.eps=max(self.eps*self.eps_decay, self.min_eps)

    def memory_update(self, n_state, action, reward, done, state):
        self.memory.append((n_state, action, reward, done, state))

    def get_act(self, state):
        if np.random.rand() < self.eps:
            return np.random.randint(0,3)
        else:
            return np.argmax(self.model.call(np.array([state])))
        
    def update_model(self):

        if len(self.memory)<1000:
            return

        if len(self.memory)>20000:
            del self.memory[0]

        if len(self.memory)>self.batch_size:

            mini_batch=random.sample(self.memory, self.batch_size)

            n_states=np.array([x[0] for x in mini_batch])
            actions=np.array([x[1] for x in mini_batch])
            rewards=np.array([x[2] for x in mini_batch])
            dones=np.array([x[3] for x in mini_batch])
            states=np.array([x[4] for x in mini_batch])

            q_val=self.model.call(states).numpy()
            n_q_val=self.model.call(n_states).numpy()

            targets=q_val.copy()
            targets[np.arange(len(rewards)), actions]=rewards+self.discount*np.max(n_q_val, axis=1)*(1-dones)

            with tf.GradientTape() as tape:
                q_val=self.model.call(states)
                loss=tf.keras.losses.mse(targets, q_val)

            gradients=tape.gradient(loss, self.model.trainable_variables)
            optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate)
            optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))

model=DQN(2,3)
agent=Agent()
agent.eps=0.01
agent.model.load_weights('./save_model/model')

for i in range(5):

    state, _ = env.reset()
    max_pos=state[0]
    step=0
    done=False

    while not done:

        action=agent.get_act(state)
        n_state, reward, done, trunc, _ = env.step(action)

        if n_state[0]>max_pos:
            max_pos=n_state[0]
        if done:
            reward=100
        if action==2 and n_state[0]>state[0]:
            reward=3
        elif action==0 and n_state[0]<state[0]:
            reward=3

        state=n_state
        step+=1

    print(i+1, step)

env.close()


1 126
2 127
3 126
4 125
5 126
