In [1]:
import gym
import time

# Smoke test: drive CartPole with uniformly random actions and render it.
DEMO_SEED = 42
DEMO_MAX_STEPS = 100

env = gym.make('CartPole-v1', render_mode='human')
try:
    observation, info = env.reset(seed=DEMO_SEED)
    # reset(seed=...) only seeds the environment dynamics; the action space
    # has its own RNG and must be seeded separately for a reproducible run.
    env.action_space.seed(DEMO_SEED)

    for i in range(DEMO_MAX_STEPS):
        action = env.action_space.sample()
        observation, reward, terminated, truncated, info = env.step(action)

        print(i)
        if terminated or truncated:
            # Keep the final frame on screen briefly before the window closes.
            time.sleep(2)
            break
finally:
    # Always release the render window, even if the loop is interrupted.
    env.close()

0
1
2
3
4
5
6
7
8
9
10


In [2]:
import gym
import numpy as np
import tensorflow as tf
import random
import matplotlib.pyplot as plt
from collections import namedtuple
from collections import deque

# Seed every RNG the agent touches. The stdlib `random` module is used for
# random action selection and replay-memory sampling, so it has to be seeded
# alongside NumPy and TensorFlow for runs to be repeatable.
random.seed(1)
np.random.seed(1)
tf.random.set_seed(1)

# One step of experience stored in the replay memory.
Transition = namedtuple(
    'Transition', ('state', 'action', 'reward',
                   'next_state', 'done'))

class DQNAgent:
    """Deep Q-learning agent with an epsilon-greedy policy and replay memory.

    The agent owns a small fully connected Keras network that maps a state
    vector to one Q-value per discrete action, a bounded replay memory of
    transitions, and an exploration rate (`epsilon`) that decays while it
    learns.
    """

    def __init__(
            self, env, discount_factor=0.95,
            epsilon_greedy=1.0, epsilon_min=0.01,
            epsilon_decay=0.995, learning_rate=1e-3,
            max_memory_size=2000):
        """Create the agent and build its Q-network.

        Args:
            env: Environment exposing `observation_space.shape` and
                `action_space.n`.
            discount_factor: Discount applied to the bootstrapped next-state
                value (stored as `gamma`).
            epsilon_greedy: Initial probability of taking a random action.
            epsilon_min: Once epsilon is at or below this value it stops
                decaying.
            epsilon_decay: Multiplicative factor applied to epsilon per
                learned sample.
            learning_rate: Learning rate of the Adam optimizer.
            max_memory_size: Maximum number of transitions kept in memory;
                the oldest ones are dropped first.
        """
        self.env = env
        # Backward-compatible alias: this attribute used to be misspelled.
        self.enf = env
        self.state_size = env.observation_space.shape[0]
        self.action_size = env.action_space.n
        self.memory = deque(maxlen=max_memory_size)
        self.gamma = discount_factor
        self.epsilon = epsilon_greedy
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.lr = learning_rate
        self._build_nn_model()

    def _build_nn_model(self, n_layers=3):
        """Build and compile the Q-network into `self.model`.

        The network has `n_layers - 1` hidden layers from the loop plus one
        more hidden layer (all 32 ReLU units), followed by an output layer
        with one unit per action and no activation.
        """
        self.model = tf.keras.Sequential()

        ## Hidden layers
        for _ in range(n_layers - 1):
            self.model.add(tf.keras.layers.Dense(
                units=32, activation='relu'))
        self.model.add(tf.keras.layers.Dense(
            units=32, activation='relu'))

        ## Last layer
        self.model.add(tf.keras.layers.Dense(
            units=self.action_size))

        ## Build & compile model
        self.model.build(input_shape=(None, self.state_size))
        self.model.compile(
            loss='mse',
            optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=self.lr))

    @staticmethod
    def _checkpoint_path(e):
        """Return the checkpoint path prefix for checkpoint id `e`."""
        return './checkpoints3/DQN_checkpoint_' + str(e)

    def save_model(self, e):
        """Save the network weights under the checkpoint id `e`."""
        self.model.save_weights(self._checkpoint_path(e))

    def load_model(self, e):
        """Load the network weights previously saved under checkpoint id `e`."""
        self.model.load_weights(self._checkpoint_path(e))

    def remember(self, transition):
        """Append one transition to the replay memory."""
        self.memory.append(transition)

    def choose_action(self, state):
        """Pick an action for `state` with an epsilon-greedy rule.

        With probability `epsilon` a uniformly random action index is
        returned; otherwise the index of the largest predicted Q-value.
        `state` is passed to the model as-is, so it must already carry a
        leading batch dimension.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)

        q_values = self.model.predict(state, verbose=0)[0]
        return np.argmax(q_values)  # returns action

    def _learn(self, batch_samples):
        """Fit the network for one epoch on Q-learning targets for a batch.

        For each transition `(s, a, r, next_s, done)` the target vector is
        the current prediction for `s` with entry `a` replaced by `r` when
        `done` is true, or by `r + gamma * max(Q(next_s))` otherwise.

        Predictions are computed with two batched `predict` calls (current
        states, next states) instead of one or two calls per transition.

        Returns:
            The object returned by `self.model.fit`.
        """
        states, actions, rewards, next_states, dones = zip(*batch_samples)

        batch_states = np.array(
            [np.asarray(s).flatten() for s in states])
        batch_next_states = np.array(
            [np.asarray(s).flatten() for s in next_states])

        # Copy so the prediction buffer can be edited in place.
        batch_targets = np.array(
            self.model.predict(batch_states, verbose=0))
        next_q_values = self.model.predict(batch_next_states, verbose=0)

        rewards = np.asarray(rewards, dtype=float)
        dones = np.asarray(dones, dtype=bool)
        bootstrapped = rewards + self.gamma * np.amax(next_q_values, axis=1)
        # Finished transitions have no future value to bootstrap from.
        td_targets = np.where(dones, rewards, bootstrapped)

        rows = np.arange(len(states))
        batch_targets[rows, np.asarray(actions, dtype=int)] = td_targets

        # Epsilon decays once per learned sample (not once per batch).
        for _ in states:
            self._adjust_epsilon()

        return self.model.fit(x=batch_states,
                              y=batch_targets,
                              epochs=1,
                              verbose=0)

    def _adjust_epsilon(self):
        """Decay epsilon by `epsilon_decay` while it is above `epsilon_min`."""
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def replay(self, batch_size):
        """Learn from `batch_size` randomly sampled stored transitions.

        Returns:
            The first loss value reported by the fit history.

        Raises:
            ValueError: From `random.sample` if the memory holds fewer than
                `batch_size` transitions.
        """
        samples = random.sample(self.memory, batch_size)
        history = self._learn(samples)
        return history.history['loss'][0]

In [3]:
# Set to False to skip training (e.g. when only replaying saved checkpoints).
Retraining = True
if Retraining:
    def plot_learning_history(history):
        """Plot the per-episode reward values in `history` against episode number."""
        fig = plt.figure(1, figsize=(14, 5))
        ax = fig.add_subplot(1, 1, 1)
        episodes = np.arange(len(history)) + 1
        ax.plot(episodes, history, lw=4,
                marker='o', markersize=10)
        ax.tick_params(axis='both', which='major', labelsize=15)
        ax.set_xlabel('Episodes', size=20)
        ax.set_ylabel('# Total Rewards', size=20)
        plt.show()

    ## General settings
    EPISODES = 50
    MAX_STEPS_PER_EPISODE = 50
    batch_size = 32
    init_replay_memory_size = 500

    env = gym.make('CartPole-v1')
    agent = DQNAgent(env)
    state, info = env.reset()
    state = np.reshape(state, [1, agent.state_size])

    ## Filling up the replay-memory
    # NOTE(review): `done` also covers time-limit truncation, so truncated
    # transitions are stored as finished ones.
    for i in range(init_replay_memory_size):
        action = agent.choose_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        next_state = np.reshape(next_state, [1, agent.state_size])
        done = terminated or truncated

        agent.remember(Transition(state, action, reward,
                                  next_state, done))
        if done:
            state, info = env.reset()
            state = np.reshape(state, [1, agent.state_size])
        else:
            state = next_state

    ## Training
    total_rewards, losses = [], []
    for e in range(EPISODES):
        print('Episode: ', e)
        state, info = env.reset()
        state = np.reshape(state, [1, agent.state_size])

        episode_reward = 0
        for i in range(MAX_STEPS_PER_EPISODE):
            print('Episode_i: ', i)

            action = agent.choose_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            next_state = np.reshape(next_state,
                                    [1, agent.state_size])
            agent.remember(Transition(state, action, reward,
                                      next_state, done))
            state = next_state
            episode_reward += reward

            if done:
                break

            loss = agent.replay(batch_size)
            losses.append(loss)

        # Record every episode, including those that reach the step cap
        # without failing; otherwise the best episodes vanish from the plot.
        total_rewards.append(episode_reward)
        print('Episode: %d/%d, Total reward: %d'
              % (e, EPISODES, episode_reward))

        # save_model requires a checkpoint id. Saving once per episode keeps
        # a checkpoint for each episode index, including the one the playback
        # cell loads by number.
        agent.save_model(e)

    env.close()
    plot_learning_history(total_rewards)

In [4]:
import time
import gym

# Replay a trained agent greedily in a rendered environment.
CHECKPOINT_EPISODE = 39   # id of the saved checkpoint to load
PLAYBACK_MAX_STEPS = 500

env = gym.make('CartPole-v1', render_mode='human')
try:
    agent = DQNAgent(env)
    agent.epsilon = 0  # disable exploration: always take the argmax action
    agent.load_model(CHECKPOINT_EPISODE)

    state, info = env.reset()
    state = np.reshape(state, [1, agent.state_size])

    # With render_mode='human' the environment draws itself on reset/step,
    # so no explicit env.render() calls are needed.
    for i in range(PLAYBACK_MAX_STEPS):
        print(i)
        action = agent.choose_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        state = np.reshape(next_state,
                           [1, agent.state_size])

        if terminated or truncated:
            # Keep the final frame visible briefly before closing.
            time.sleep(2)
            break
finally:
    # Always release the render window, even if loading or stepping fails.
    env.close()

0
1
2


2023-06-16 08:20:53.169189: W tensorflow/tsl/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz


3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
