In [1]:
#: imports, nothing to see here
import random
from collections import defaultdict, namedtuple, deque
from itertools import product, starmap

import matplotlib.pyplot as plt
import numpy as np
from IPython.display import Image
from matplotlib import animation
import copy

"""
from keras.models import Sequential
from keras.layers.core import Dense, Activation
from keras.layers.normalization import BatchNormalization
from keras.regularizers import l2

from keras.optimizers import RMSprop
"""


from numpy import sin, cos, pi

%matplotlib inline

random.seed(1)

In [10]:
# Immutable record describing the cart-pole system. The field order
# (theta, theta_d, x, x_d) is relied upon wherever a State is unpacked
# positionally.
State = namedtuple('State', ['theta', 'theta_d', 'x', 'x_d'])

# Draw four independent values from [-0.05, 0.05) as an example state.
# The array is deliberately NOT called `random`: that name is the stdlib
# module imported in the first cell, and rebinding it to an ndarray breaks
# every later `random.random()` / `random.sample()` call in the notebook.
initial_values = np.random.uniform(low=-0.05, high=0.05, size=(4,))
state = State(*initial_values)
state

State(theta=0.017310488285557382, theta_d=-0.029708126746085296, x=-0.027529462365079839, x_d=0.013533486259850888)

In [16]:
class CartPole:
    """Cart-pole environment with two discrete actions.

    States are ``State(theta, theta_d, x, x_d)`` tuples. An episode is over
    once ``|x| >= lim_x`` or ``|theta| >= lim_theta``. Every step applies a
    force of ``+/- force_mag`` to the cart and advances the state by one
    explicit Euler step of length ``delta_t``.

    Attributes:
        start_state: the State drawn when the environment was constructed.
            Use ``compute_start_state()`` to draw a fresh one.
        actions: force direction for each action index (``[-1, 1]``).
        n_actions: number of action indices.
    """

    def __init__(self):
        # Physical constants used by new_state().
        self.grav = 9.81
        self.mass_cart = 1.0
        self.mass_pole = 0.1
        self.mass_total = self.mass_cart + self.mass_pole
        self.pole_mcenter = 0.5
        self.polemass_mom = self.mass_pole * self.pole_mcenter
        self.force_mag = 10.0
        self.delta_t = 0.02

        # Termination thresholds used by is_terminal().
        self.lim_theta = pi / 15
        self.lim_x = 2.4

        self.n_actions = 2
        self.actions = [-1, 1]

        # Kept as a plain attribute for backward compatibility. The method
        # that produces it has its own name so it is no longer overwritten
        # by its own result.
        self.start_state = self.compute_start_state()

    def is_terminal(self, state):
        """Return True when the cart position or pole angle is out of bounds."""
        position_exceeded = abs(state.x) >= self.lim_x
        angle_exceeded = abs(state.theta) >= self.lim_theta
        return bool(position_exceeded or angle_exceeded)

    def reward(self, state):
        """Return 0 for a terminal state and 1 for any other state."""
        return 0 if self.is_terminal(state) else 1

    def new_state(self, state, action):
        """Advance ``state`` by one time step under the given action.

        Args:
            state: current State (or any 4-sequence in the same field order).
            action: index into ``self.actions``.

        Returns:
            The successor State after one Euler step of length ``delta_t``.
        """
        move = self.actions[action]
        theta, theta_d, x, x_d = state
        F = self.force_mag * move
        costheta = cos(theta)
        sintheta = sin(theta)
        temp = (F + self.polemass_mom * theta_d * theta_d * sintheta
                ) / self.mass_total

        # Angular acceleration of the pole.
        theta_d_d = ((self.grav * sintheta - costheta * temp) /
                     (self.pole_mcenter *
                      (4.0 / 3.0 - self.mass_pole * costheta * costheta /
                       self.mass_total)))

        # Linear acceleration of the cart.
        x_d_d = temp - self.polemass_mom * theta_d_d * costheta / self.mass_total

        # Explicit Euler update: positions use the current velocities,
        # velocities use the accelerations computed above.
        x_new = x + self.delta_t * x_d
        x_d_new = x_d + self.delta_t * x_d_d
        theta_new = theta + self.delta_t * theta_d
        theta_d_new = theta_d + self.delta_t * theta_d_d
        return State(theta_new, theta_d_new, x_new, x_d_new)

    # Alias under the spelling used by run_episode().
    newstate = new_state

    def compute_start_state(self):
        """Draw a fresh State with every field uniform in [-0.05, 0.05)."""
        values = np.random.uniform(low=-0.05, high=0.05, size=(4, ))
        return State(*values)

In [19]:
# Network output width (one Q-value per action) and input width (one value
# per State field).
dim_actions = 2
dim_states = 4


def make_model():
    """Build and compile the Q-network.

    Architecture: three Dense(32) + BatchNormalization + ReLU stages followed
    by a Dense(dim_actions) + BatchNormalization + linear output stage.

    The model is compiled with an RMSprop optimizer and mean-squared-error
    loss, so callers can use ``fit`` directly; Keras refuses to train a model
    that has not been compiled.

    Requires Sequential, Dense, Activation, BatchNormalization and RMSprop to
    be in scope. In this notebook the keras imports in the first cell sit
    inside a string literal and therefore never execute.

    Returns:
        A compiled Sequential model mapping ``(n, dim_states)`` inputs to
        ``(n, dim_actions)`` outputs.
    """
    model = Sequential()
    rms = RMSprop()
    # NOTE(review): every Dense layer is initialised to all zeros. With ReLU
    # hidden units this looks like it leaves the hidden weights without a
    # gradient signal -- confirm whether a random initialiser was intended.
    model.add(Dense(32, input_shape=(dim_states,), init='zero', bias=True))
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    model.add(Dense(32, init='zero', bias=False))
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    model.add(Dense(32, init='zero', bias=False))
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    model.add(Dense(dim_actions, init='zero', bias=True))
    model.add(BatchNormalization())
    model.add(Activation('linear'))
    # Previously `rms` was created but never attached to the model.
    model.compile(loss='mse', optimizer=rms)
    return model


class QAgent:
    """Epsilon-greedy Q-learning agent with replay memory and a target network.

    Args:
        env: environment the agent interacts with (stored, not called here).
        model: online Q-network. When ``None`` a network is built with
            ``make_model()``.
        alpha: stored on the instance; not read anywhere in this class.
        epsilon: probability of choosing a random action in ``act``.
        gamma: discount factor applied to the bootstrapped value.
        buffer_size: maximum number of transitions kept in replay memory.
        batch_size: number of transitions sampled per training step.

    Randomness comes from ``np.random`` throughout, so the class does not
    depend on the module-level name ``random``, which this notebook rebinds
    to an array in an earlier cell.
    """

    def __init__(self,
                 env,
                 model,
                 alpha=0.001,
                 epsilon=0.99,
                 gamma=1,
                 buffer_size=300,
                 batch_size=36):
        self.epsilon = epsilon
        self.alpha = alpha
        self.gamma = gamma
        self.reward_total = 0
        # Use the network handed in by the caller as the online network and
        # build a separate instance as the target network.
        self.Q = model if model is not None else make_model()
        self.Q_target = make_model()
        # Start both networks from identical weights.
        self.Q_target.set_weights(self.Q.get_weights())
        self.buffer_size = buffer_size
        # Bounded FIFO: the oldest transition is dropped once full.
        self.memory = deque(maxlen=buffer_size)
        self.batch_size = batch_size
        self.actions = [-1, 1]
        self.env = env
        self.log = []        # (state[0], state[1]) pairs of the running episode
        self.log_sum = []    # one finished `log` list per completed episode
        self.step_count = 0
        self.episode_count = 0
        self.lern_start = 500      # steps to collect before training starts
        self.target_update = 500   # target network sync period, in steps

    def act(self, state):
        """Return an action index: greedy with probability ``1 - epsilon``."""
        if np.random.random() > self.epsilon:
            return self.greedy_action(state)
        return self.random_action(state)

    def make_batch(self):
        """Sample a training batch from replay memory.

        Returns:
            ``(X, y)`` where ``X`` stacks the sampled states, one row each,
            and ``y`` stacks the matching target vectors.

        Raises:
            ValueError: if memory holds fewer than ``batch_size`` transitions.
        """
        picks = np.random.choice(len(self.memory), size=self.batch_size,
                                 replace=False)
        batch = [self.memory[int(i)] for i in picks]

        # Until the first scheduled target sync the online network provides
        # the bootstrap value; afterwards the target network does.
        if self.step_count <= self.target_update:
            bootstrap_net = self.Q
        else:
            bootstrap_net = self.Q_target

        X_train = []
        y_train = []
        for state, action, newstate, reward in batch:
            s = np.asarray(state).reshape(1, 4)
            # NOTE(review): the base target vector is read from the target
            # network (the previous code had two identical branches doing
            # exactly this). DQN implementations commonly read it from the
            # online network instead -- confirm the intent.
            y = self.Q_target.predict(s)[0]
            if reward == 0:
                # A reward of 0 marks a terminal transition: no bootstrapping.
                y[action] = reward
            else:
                ns = np.asarray(newstate).reshape(1, 4)
                Q_sa = np.max(bootstrap_net.predict(ns)[0])
                y[action] = reward + self.gamma * Q_sa
            X_train.append(s)
            y_train.append(y)
        return np.vstack(X_train), np.vstack(y_train)

    def flashback(self):
        """Train on one replayed batch and sync the target network on schedule."""
        warmed_up = self.step_count >= self.lern_start
        if warmed_up and len(self.memory) >= self.batch_size:
            X, y = self.make_batch()
            self.Q.fit(X, y, verbose=0)
        if self.step_count % self.target_update == 0:
            self.update_target_network()

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        print('Update Target!')
        weights = self.Q.get_weights()
        self.Q_target.set_weights(weights)

    def random_action(self, state):
        """Return 0 or 1 uniformly at random; ``state`` is ignored."""
        return np.random.randint(0, 2)

    def greedy_action(self, state):
        """Return the index of the largest online-network output for ``state``."""
        s = np.asarray(state).reshape(1, 4)
        Qs = self.Q.predict(s)[0]
        return np.argmax(Qs)

    def remember(self, state, action, newstate, reward):
        """Store one transition and update the step/episode bookkeeping.

        A reward of 0 is treated as the end of an episode: the episode
        counter is incremented and the running log is moved to ``log_sum``.
        """
        self.step_count += 1
        self.memory.append((state, action, newstate, reward))
        self.log.append((state[0], state[1]))
        if reward == 0:
            self.episode_count += 1
            self.log_sum.append(self.log)
            self.log = []

    def save_net(self):
        """Save the online network to ``backup.h5``."""
        self.Q.save('backup.h5')

In [20]:
def run_episode(domain, agent):
    """Play one episode from a fresh start state until a terminal state.

    On every step the agent picks an action, the domain produces the
    successor state and its reward, and the agent stores the transition and
    runs one replay step. The number of steps taken is printed at the end.
    """
    current = domain.compute_start_state()
    n_steps = 0
    while True:
        if domain.is_terminal(current):
            break
        chosen = agent.act(current)                    #: Pick an action for the current state.
        successor = domain.newstate(current, chosen)   #: Apply it to get the next state.
        payoff = domain.reward(successor)
        agent.remember(current, chosen, successor, payoff)  #: Store the transition.
        agent.flashback()                              #: Learn from replayed experience.
        current = successor                            #: Continue from the next state.
        n_steps += 1
    print(n_steps)

In [21]:
def run_experiment(domain, agent, epsilon_decay, n_episodes):
    """Run a full training schedule followed by one evaluation episode.

    Calls ``run_random`` once, then plays ``n_episodes`` episodes, each
    preceded by multiplying ``agent.epsilon`` by ``epsilon_decay``. Finally
    epsilon is set to 0 and one more episode is played.
    """
    run_random(domain, agent)

    for _ in range(n_episodes):
        agent.epsilon = agent.epsilon * epsilon_decay
        run_episode(domain, agent)

    announcement = ('Setting epsilon paramter to zero',
                    'to prevent random actions and evaluate learned policy.\n')
    print(*announcement)

    # Evaluation run with exploration switched off.
    agent.epsilon = 0
    run_episode(domain, agent)

In [22]:
# Experiment configuration. Nothing in this cell consumes n_episodes or
# epsilon_decay; presumably they are passed to run_experiment() in a later
# cell -- confirm.
n_episodes = 2000
epsilon_decay = 0.995
# Build the environment, a Q-network, and the agent that is handed both.
domain = CartPole()
model = make_model()
agent = QAgent(domain, model)

NameError: name 'Sequential' is not defined