In [None]:
#!pip install gym
#!pip install --user tf-agents
#!pip install tensorflow
#!pip install ray

#!pip install stable_baselines3
#!pip install pyglet

#!pip install keras

### Import gym library

In [1]:
import random
from collections import deque

import gym
import numpy as np
from keras.layers import Dense
from keras.models import Sequential
from tensorflow.keras.optimizers import Adam


In [2]:
# --- Environment ---
ENV_NAME = "CartPole-v0"

# --- Q-learning hyperparameters ---
GAMMA = 0.95             # discount applied to the best next-state Q-value
LEARNING_RATE = 1e-3     # step size handed to the Adam optimizer

# --- Replay memory ---
MEMORY_SIZE = 10 ** 6    # maximum number of transitions kept in the replay deque
BATCH_SIZE = 20          # transitions sampled per experience_replay() call

# --- Epsilon-greedy exploration schedule ---
EXPLORATION_MAX = 1.0      # exploration rate a new DQNSolver starts with
EXPLORATION_MIN = 0.01     # floor the exploration rate is clamped to
EXPLORATION_DECAY = 0.995  # multiplicative decay applied after each replay

In [3]:
# Instantiate the environment named by ENV_NAME ('CartPole-v0' in the config cell).
env = gym.make(ENV_NAME)

In [4]:
# Reset the environment to begin a new episode and keep the initial observation.
obs = env.reset()
# Bare expression so the notebook displays it (a 4-element float32 array in the output below).
obs

array([-0.01117641, -0.02419793, -0.04317619,  0.03489567], dtype=float32)

In [5]:
# Display the observation space; the output below shows a float32 Box of shape (4,).
env.observation_space

Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)

In [6]:
# Display the action space; the output below shows Discrete(2), i.e. two possible actions.
env.action_space

Discrete(2)

In [None]:
#env.render()

### To take an action we need the step() function
    # step() returns 4 values:
    # (observation, reward, done (bool), info)

### The block below is a simple demonstration of stepping the cart with randomly sampled actions

In [8]:

# Roll out one episode with uniformly random actions and print every
# transition, so the layout of step()'s return value is visible.
env = gym.make('CartPole-v0')
try:
    env.reset()
    # 10000 is only an upper bound; the loop normally exits via `done` below.
    for step_index in range(10000):
        env.render()
        action = env.action_space.sample()
        # step() is unpacked as the 4-tuple (observation, reward, done, info).
        observation, reward, done, info = env.step(action)
        print("Step {}:".format(step_index))
        print("action: {}".format(action))
        print("observation: {}".format(observation))
        print("reward: {}".format(reward))
        print("done: {}".format(done))
        print("info: {}".format(info), end ="\n\n")

        # if termination state has been reached
        if done:
            break
finally:
    # Always release the render window, even when the rollout is interrupted
    # or raises; otherwise the viewer opened by render() is left dangling.
    env.close()

Step 0:
action: 1
observation: [-0.04646764  0.15308405  0.04635182 -0.25661477]
reward: 1.0
done: False
info: {}

Step 1:
action: 0
observation: [-0.04340596 -0.04266797  0.04121952  0.05032045]
reward: 1.0
done: False
info: {}

Step 2:
action: 0
observation: [-0.04425932 -0.238356    0.04222593  0.35571826]
reward: 1.0
done: False
info: {}

Step 3:
action: 1
observation: [-0.04902644 -0.04385905  0.0493403   0.07664364]
reward: 1.0
done: False
info: {}

Step 4:
action: 1
observation: [-0.04990362  0.15052213  0.05087317 -0.20007312]
reward: 1.0
done: False
info: {}

Step 5:
action: 1
observation: [-0.04689318  0.34488094  0.04687171 -0.4762839 ]
reward: 1.0
done: False
info: {}

Step 6:
action: 0
observation: [-0.03999555  0.14912958  0.03734603 -0.16920412]
reward: 1.0
done: False
info: {}

Step 7:
action: 0
observation: [-0.03701296 -0.04650646  0.03396194  0.13502252]
reward: 1.0
done: False
info: {}

Step 8:
action: 1
observation: [-0.03794309  0.14811298  0.0366624  -0.14675543]

### Training data

In [None]:
class DQNSolver:
    """Deep Q-Network agent with epsilon-greedy action selection and replay.

    The agent owns a small fully connected Keras network (two hidden layers of
    24 ReLU units, linear output with one unit per action) and a bounded
    replay memory of ``(state, action, reward, next_state, done)`` tuples.
    Hyperparameters are read from the module-level constants
    ``EXPLORATION_MAX``, ``EXPLORATION_MIN``, ``EXPLORATION_DECAY``,
    ``MEMORY_SIZE``, ``BATCH_SIZE``, ``GAMMA`` and ``LEARNING_RATE``.
    """

    def __init__(self, observation_space, action_space):
        """Build the Q-network and an empty replay memory.

        Args:
            observation_space: Number of features in one observation; used as
                the input width of the network.
            action_space: Number of discrete actions; used as the output width
                of the network and as the range for random action sampling.
        """
        self.exploration_rate = EXPLORATION_MAX

        self.action_space = action_space
        # Bounded buffer: once MEMORY_SIZE transitions are stored, appending
        # a new one discards the oldest.
        self.memory = deque(maxlen=MEMORY_SIZE)

        self.model = Sequential()
        self.model.add(Dense(24, input_shape=(observation_space,), activation="relu"))
        self.model.add(Dense(24, activation="relu"))
        self.model.add(Dense(self.action_space, activation="linear"))
        # `learning_rate` is the supported keyword; the old `lr` alias is
        # deprecated and rejected by newer Keras releases.
        self.model.compile(loss="mse", optimizer=Adam(learning_rate=LEARNING_RATE))

    def remember(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action for ``state`` with an epsilon-greedy policy.

        With probability ``self.exploration_rate`` a uniformly random action
        index is returned; otherwise the index of the largest predicted
        Q-value is returned.

        Args:
            state: Network input for a single observation. The caller in this
                notebook reshapes it to ``[1, observation_space]``.

        Returns:
            int: The selected action index.
        """
        if np.random.rand() < self.exploration_rate:
            return random.randrange(self.action_space)
        # verbose=0 keeps Keras from printing a progress bar on every step.
        q_values = self.model.predict(state, verbose=0)
        # Cast so both branches return a plain Python int, not numpy.int64.
        return int(np.argmax(q_values[0]))

    def experience_replay(self):
        """Fit the network on a random mini-batch of stored transitions.

        Does nothing until at least ``BATCH_SIZE`` transitions are stored.
        Otherwise ``BATCH_SIZE`` transitions are sampled and the network is
        fitted on each one individually, with the Q-value of the taken action
        replaced by ``reward`` (terminal transition) or
        ``reward + GAMMA * max(Q(next_state))`` (non-terminal). Afterwards the
        exploration rate is decayed once, clamped at ``EXPLORATION_MIN``.
        """
        if len(self.memory) < BATCH_SIZE:
            return
        batch = random.sample(self.memory, BATCH_SIZE)
        # NOTE(review): one predict/fit round-trip per sample is slow; it is
        # kept because batching would change the number of gradient steps.
        for state, action, reward, state_next, terminal in batch:
            q_update = reward
            if not terminal:
                next_q = self.model.predict(state_next, verbose=0)[0]
                q_update = reward + GAMMA * np.amax(next_q)
            # Only the taken action's target changes; the other outputs keep
            # their current predictions and so contribute zero error.
            q_values = self.model.predict(state, verbose=0)
            q_values[0][action] = q_update
            self.model.fit(state, q_values, verbose=0)
        # Decay happens once per replay call, not once per sample.
        self.exploration_rate *= EXPLORATION_DECAY
        self.exploration_rate = max(EXPLORATION_MIN, self.exploration_rate)

In [None]:
def cartpole(max_runs=None):
    """Train a DQNSolver on ``ENV_NAME``, printing the score of every episode.

    Each episode resets the environment, then repeatedly asks the solver for
    an action, steps the environment, stores the transition and runs one
    experience-replay update, until the environment reports a terminal state.
    The reward of the terminal step is negated before it is stored.

    Args:
        max_runs: Optional cap on the number of episodes. ``None`` (the
            default) keeps the original behaviour of looping until the
            process is interrupted.

    Returns:
        list: The step count of every completed episode, in order. Only
        reachable when ``max_runs`` is given.
    """
    env = gym.make(ENV_NAME)
    # ScoreLogger is neither defined nor imported in the visible notebook, so
    # referencing it directly raises NameError on a fresh kernel. Use it when
    # the user has made it available; otherwise skip score logging.
    logger_cls = globals().get("ScoreLogger")
    score_logger = logger_cls(ENV_NAME) if logger_cls is not None else None
    observation_space = env.observation_space.shape[0]
    action_space = env.action_space.n
    dqn_solver = DQNSolver(observation_space, action_space)
    scores = []
    run = 0
    try:
        while max_runs is None or run < max_runs:
            run += 1
            state = env.reset()
            # The network expects a batch dimension: shape [1, observation_space].
            state = np.reshape(state, [1, observation_space])
            step = 0
            while True:
                step += 1
                #env.render()
                action = dqn_solver.act(state)
                state_next, reward, terminal, info = env.step(action)
                # Flip the sign of the final reward so episode termination is penalised.
                reward = reward if not terminal else -reward
                state_next = np.reshape(state_next, [1, observation_space])
                dqn_solver.remember(state, action, reward, state_next, terminal)
                state = state_next
                if terminal:
                    # print() call form: the Python 2 print statement is a
                    # SyntaxError under Python 3.
                    print("Run: " + str(run) + ", exploration: " + str(dqn_solver.exploration_rate) + ", score: " + str(step))
                    scores.append(step)
                    if score_logger is not None:
                        score_logger.add_score(step, run)
                    break
                dqn_solver.experience_replay()
    finally:
        # Release environment resources even on KeyboardInterrupt or error.
        env.close()
    return scores