<a href="https://colab.research.google.com/github/alitonia/Vision_check/blob/master/Classify.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Welcome to the AI STEM class


---


In this tutorial, we will build an intelligent agent from scratch to play the CartPole game using Q-learning and a deep neural network
-



# Setup environment


---


Run the code below to automatically set up the environment for the tutorial.

In [0]:
#remove " > /dev/null 2>&1" to see what is going on under the hood
!pip install gym pyvirtualdisplay > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
!apt-get update > /dev/null 2>&1
!apt-get install cmake > /dev/null 2>&1
!pip install --upgrade setuptools 2>&1
!pip install ez_setup > /dev/null 2>&1
!pip install gym[atari] > /dev/null 2>&1

Requirement already up-to-date: setuptools in /usr/local/lib/python3.6/dist-packages (42.0.2)


Import some packages

In [0]:
import gym
from gym import logger as gymlogger
from gym.wrappers import Monitor
gymlogger.set_level(40) #error only
import numpy as np
import random
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
import math
import glob
import io
import base64
from IPython.display import HTML
import pdb
import math
from collections import deque

from IPython import display as ipythondisplay

# Prepare a displaying window to visualize game
Import a display package and configure a virtual window for displaying the CartPole game.


In [0]:
from pyvirtualdisplay import Display
# Configure a 1400x900 pyvirtualdisplay Display with visible=0 (nothing shown on screen).
display = Display(visible=0, size=(1400, 900))
# Start it; the value returned by start() is the last expression, so it is echoed as the cell output.
display.start()


xdpyinfo was not found, X start can not be checked! Please install xdpyinfo!


<Display cmd_param=['Xvfb', '-br', '-nolisten', 'tcp', '-screen', '0', '1400x900x24', ':1001'] cmd=['Xvfb', '-br', '-nolisten', 'tcp', '-screen', '0', '1400x900x24', ':1001'] oserror=None return_code=None stdout="None" stderr="None" timeout_happened=False>

Two functions:


1.   show_video: creates an HTML video frame to display a game episode saved by OpenAI Gym.
2.   wrap_env: wraps an existing environment so that an episode of the game is saved as a video.



In [0]:
def show_video(video_dir='/content/video'):
  """Display a recorded episode inline as a looping HTML5 video.

  Searches ``video_dir`` for ``*.mp4`` files and embeds the first one (in
  sorted order, so the choice is reproducible) as a base64 data URI.
  Prints "Could not find video" when the directory holds no recording.

  Args:
    video_dir: Directory to search. Defaults to '/content/video', the
      directory ``wrap_env`` records into.

  Returns:
    None. The video is rendered through IPython's display machinery.
  """
  mp4list = sorted(glob.glob('{}/*.mp4'.format(video_dir)))
  if not mp4list:
    print("Could not find video")
    return

  # Read-only binary mode inside `with`: the file is only read, and the
  # handle is closed as soon as the bytes are loaded.
  with io.open(mp4list[0], 'rb') as video_file:
    encoded = base64.b64encode(video_file.read())

  ipythondisplay.display(HTML(data='''<video alt="test" autoplay 
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    

def wrap_env(env):
  """Return ``env`` wrapped in a gym ``Monitor`` that writes to '/content/video'.

  The Monitor is created with ``force=True``.
  """
  return Monitor(env, '/content/video', force=True)

# Let's play CartPole game using random action

---
Create an OpenAI Gym environment for the agent to interact with.


In [0]:
# Build the CartPole-v0 environment and wrap it with wrap_env (Monitor recording).
env = wrap_env(gym.make('CartPole-v0'))
# Reset the environment and keep the observation it returns.
test = env.reset()
# Bare last expression: show that observation as the cell output.
test

array([-0.02549702, -0.04831545,  0.00149151, -0.01454881])

In [0]:
### addition function ####
def step(action, reset=False):
  """Advance (or restart) the global ``env`` and render the resulting frame.

  Args:
    action: Action forwarded to ``env.step``; ignored when ``reset`` is true.
    reset: When true, call ``env.reset()`` instead of stepping.

  Returns:
    A tuple ``(obs, reward, done, image)`` where ``image`` is the result of
    ``env.render('rgb_array')``. After a reset no transition has taken
    place, so ``reward`` is 0.0 and ``done`` is False.
  """
  if reset:
    obs = env.reset()
    # These were previously left unbound on this branch, which made the
    # return statement raise UnboundLocalError.
    reward, done = 0.0, False
  else:
    obs, reward, done, _ = env.step(action)
  image = env.render('rgb_array')
  return obs, reward, done, image

def get_new_state(state, predicting_angle):
  """Overwrite element 2 of ``state`` with ``predicting_angle``.

  The update happens in place: the very same ``state`` object is modified
  and then returned, so the caller's copy changes as well.
  """
  angle_index = 2
  state[angle_index] = predicting_angle
  return state

# get_state(test)

In [0]:
# Demo: set element 2 of `test` to 10 (this modifies `test` in place) and show the returned array.
get_new_state(test, 10)

array([-0.02642203,  0.02803793, 10.        ,  0.01621101])

Play a CartPole game with random actions.

In [0]:
# Play a single episode with randomly sampled actions and report the score.
state = env.reset()
total_reward = 0
done = False
while not done:
    # Draw an arbitrary action from the environment's action space.
    action = env.action_space.sample()
    state, reward, done, _ = env.step(action)
    total_reward += reward
# Close the environment before looking for the recorded video.
env.close()
print("total_reward: ",total_reward)
show_video()

total_reward:  28.0


# Let's play CartPole using Q-learning

Initialize some hyper-parameters to implement Q-learning algorithm 

In [0]:
buckets = (1, 1, 6, 12,) # number of discrete buckets for each of the four observation components
n_episodes = 1000 # maximum number of training episodes
n_win_ticks = 195 # mean ticks over the last 100 episodes required to count as solved
min_alpha = 0.1 # lower bound of the decaying learning rate (see get_alpha)
min_epsilon = 0.1 # lower bound of the decaying exploration rate (see get_epsilon)
gamma = 1.0 # discount factor
ada_divisor = 25 # divisor inside the log10 decay used by get_alpha / get_epsilon
quiet = False # set to True to silence the progress output of training()
# Fresh wrapped environment for the Q-learning section.
env = wrap_env(gym.make('CartPole-v0'))

# Q-table of zeros: one axis per discretized observation component plus a final axis over actions.
Q = np.zeros(buckets + (env.action_space.n,))

Discretize a continuous observation into a discrete state

In [0]:
def discretize(obs):
    """Map a continuous observation to a tuple of bucket indices.

    Components 0 and 2 are scaled with the bounds reported by
    ``env.observation_space``; components 1 and 3 use the hard-coded bounds
    +/-0.5 and +/-radians(50). Each component is scaled into its bucket
    range (see the global ``buckets``), rounded to the nearest index and
    clamped to ``[0, buckets[i] - 1]``.
    """
    space = env.observation_space
    highs = [space.high[0], 0.5, space.high[2], math.radians(50)]
    lows = [space.low[0], -0.5, space.low[2], -math.radians(50)]

    indices = []
    for i in range(len(obs)):
        top = buckets[i] - 1
        # Fraction of the way from the lower bound to the upper bound.
        fraction = (obs[i] + abs(lows[i])) / (highs[i] - lows[i])
        bucket = int(round(top * fraction))
        # Observations outside the bounds fall into the first/last bucket.
        indices.append(min(top, max(0, bucket)))
    return tuple(indices)


Some fundamental functions for training an agent with the Q-learning algorithm

In [0]:
def choose_action(state, epsilon):
    """Pick an action epsilon-greedily from the global Q-table.

    A random action is sampled from ``env.action_space`` when a random
    draw from [0, 1) is <= ``epsilon``; otherwise the index of the largest
    Q-value stored for ``state`` is returned.
    """
    explore = np.random.random() <= epsilon
    if explore:
        return env.action_space.sample()
    return np.argmax(Q[state])

def update_q(state_old, action, reward, state_new, alpha):
    """Apply one Q-learning update to the global Q-table, in place.

    The entry for ``(state_old, action)`` is moved by a fraction ``alpha``
    towards ``reward + gamma * max(Q[state_new])``.
    """
    best_next = np.max(Q[state_new])
    td_target = reward + gamma * best_next
    action_values = Q[state_old]
    action_values[action] += alpha * (td_target - action_values[action])

def get_epsilon(t):
    """Exploration rate for episode index ``t``.

    Follows ``1 - log10((t + 1) / ada_divisor)``, capped at 1 from above
    and at ``min_epsilon`` from below.
    """
    decayed = 1.0 - math.log10((t + 1) / ada_divisor)
    capped = min(1, decayed)
    return max(min_epsilon, capped)

def get_alpha(t):
    """Learning rate for episode index ``t``.

    Follows ``1 - log10((t + 1) / ada_divisor)``, capped at 1.0 from above
    and at ``min_alpha`` from below.
    """
    decayed = 1.0 - math.log10((t + 1) / ada_divisor)
    capped = min(1.0, decayed)
    return max(min_alpha, capped)

In [0]:
def training():
    """Train the global Q-table with tabular Q-learning on the global ``env``.

    Runs up to ``n_episodes`` episodes. In each episode the learning rate
    and exploration rate come from ``get_alpha`` / ``get_epsilon``, actions
    from ``choose_action``, and every transition is fed to ``update_q``.
    The episode length (number of steps) is the score; scores of the last
    100 episodes are kept to compute a running mean.

    Returns:
        ``e - 100`` if, at episode index ``e >= 100``, the mean score of
        the last 100 episodes reaches ``n_win_ticks``; otherwise the index
        of the last episode that was run.
    """
    scores = deque(maxlen=100)

    for e in range(n_episodes):
        current_state = discretize(env.reset())
        alpha = get_alpha(e)
        epsilon = get_epsilon(e)
        done = False
        i = 0  # steps survived in this episode

        while not done:
            action = choose_action(current_state, epsilon)
            obs, reward, done, _ = env.step(action)
            new_state = discretize(obs)
            update_q(current_state, action, reward, new_state, alpha)
            current_state = new_state
            i += 1

        scores.append(i)
        mean_score = np.mean(scores)
        if mean_score >= n_win_ticks and e >= 100:
            if not quiet: print('Ran {} episodes. Solved after {} trials ✔'.format(e, e - 100))
            return e - 100
        if e % 100 == 0 and not quiet:
            print('[Episode {}] - Mean survival time over last 100 episodes was {} ticks.'.format(e, mean_score))
            print(scores)

    # Uses the module-level `quiet` flag. This used to read `self.quiet`, but
    # there is no `self` in a plain function, so an unsolved run ended in a
    # NameError instead of returning.
    if not quiet: print('Did not solve after {} episodes 😞'.format(e))
    return e

In [0]:
# Run Q-learning training; the integer returned by training() is shown as the cell output.
training()

[Episode 0] - Mean survival time over last 100 episodes was 22.0 ticks.
deque([22], maxlen=100)
[Episode 100] - Mean survival time over last 100 episodes was 30.12 ticks.
deque([12, 16, 14, 15, 13, 28, 13, 19, 22, 19, 15, 21, 20, 36, 13, 15, 16, 19, 12, 16, 18, 13, 12, 38, 12, 12, 15, 39, 12, 11, 21, 16, 11, 38, 23, 21, 36, 20, 26, 17, 17, 21, 12, 17, 12, 9, 13, 14, 16, 10, 12, 11, 16, 33, 12, 22, 17, 14, 21, 41, 14, 17, 63, 65, 37, 12, 18, 13, 33, 38, 18, 34, 17, 25, 58, 108, 18, 37, 42, 34, 27, 47, 34, 32, 41, 43, 35, 69, 39, 200, 86, 9, 61, 85, 126, 79, 59, 65, 29, 40], maxlen=100)
[Episode 200] - Mean survival time over last 100 episodes was 118.97 ticks.
deque([14, 37, 17, 8, 37, 33, 19, 21, 15, 16, 19, 42, 43, 28, 23, 35, 25, 67, 12, 35, 183, 24, 16, 101, 27, 12, 23, 28, 180, 153, 54, 200, 133, 46, 29, 20, 18, 32, 46, 30, 27, 79, 119, 42, 132, 56, 32, 28, 32, 28, 28, 16, 61, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 2

151

## Evaluate Q-learning algorithm

---



In [0]:
# Roll out one episode with the learned Q-table and report the total reward.
obs = env.reset()
state = discretize(obs)
total_reward = 0
done = False
while not done:
    # epsilon = 0.1: follow the Q-table, with a random action whenever the draw is <= 0.1.
    action = choose_action(state, 0.1)
    obs, reward, done, _ = env.step(action)
    state = discretize(obs)
    total_reward += reward
# Close the environment before looking for the recorded video.
env.close()
show_video()
print("total_reward: {}".format(total_reward))

NameError: ignored

# Let's play CartPole game using Deep Neural Network

---
Import some packages for the CartPole game.

In [0]:
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam

EPISODES = 100 # number of training episodes.

Using TensorFlow backend.


## Program an intelligent agent

---


Program an agent that uses Q-learning and a deep neural network to control a cart so as to maximize the balancing time. Its fundamental functions are listed below:


*   `__init__`: initializes an agent for playing a game.
*   `_build_model`: builds a deep neural network model that approximates the Q-function.
*   `remember`: stores the agent's playing experience in a replay buffer, which is used later for training.
*   `act`: makes a decision based on Q-values to control the cart, either "move_to_left" or "move_to_right".
*   `replay`: trains the deep neural network on experience from the replay buffer so that it predicts the Q-value of a state-action pair.
*   `load`: loads a pre-trained neural network model.
*   `save`: saves a trained neural network model.









In [0]:
class DQNAgent:
    """Deep Q-learning agent: a small Keras network, a replay buffer and
    epsilon-greedy exploration with a multiplicatively decaying epsilon."""

    def __init__(self, state_size, action_size):
        """Store the problem dimensions, set hyper-parameters and build the network."""
        self.state_size = state_size
        self.action_size = action_size
        # Replay buffer; holds at most the 2000 most recent transitions.
        self.memory = deque(maxlen=2000)
        # Discount rate applied to the estimated future value.
        self.gamma = 0.95
        # Exploration rate: starts at 1.0 and is multiplied by epsilon_decay
        # in replay() while it is above epsilon_min.
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        # Step size handed to the Adam optimizer.
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        """Build and compile the Q-network (two hidden ReLU layers, linear output per action)."""
        # Neural Net for Deep-Q learning Model
        # Homework: change the number of dense layer
        layers = (
            Dense(6, input_dim=self.state_size, activation='relu'),
            Dense(6, activation='relu'),
            Dense(self.action_size, activation='linear'),
        )
        network = Sequential()
        for layer in layers:
            network.add(layer)
        optimizer = Adam(lr=self.learning_rate)
        network.compile(loss='mse', optimizer=optimizer)
        return network

    def remember(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay buffer."""
        transition = (state, action, reward, next_state, done)
        self.memory.append(transition)

    def act(self, state):
        """Return an action: random when a draw is <= epsilon, otherwise the
        arg-max of the network's prediction for ``state``."""
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        q_values = self.model.predict(state)[0]
        return np.argmax(q_values)

    def replay(self, batch_size):
        """Fit the network on ``batch_size`` transitions sampled from the buffer,
        one transition at a time, then decay epsilon once."""
        for state, action, reward, next_state, done in random.sample(self.memory, batch_size):
            if done:
                # Terminal transition: there is no future value to add.
                target = reward
            else:
                best_future = np.amax(self.model.predict(next_state)[0])
                target = reward + self.gamma * best_future
            # Only the entry of the action actually taken is changed.
            q_values = self.model.predict(state)
            q_values[0][action] = target
            self.model.fit(state, q_values, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon = self.epsilon * self.epsilon_decay

    def load(self, name):
        """Load network weights from the file ``name``."""
        self.model.load_weights(name)

    def save(self, name):
        """Save network weights to the file ``name``."""
        self.model.save_weights(name)

## Train an agent using Deep Q-learning

---


Main function for tutorial.

In [0]:

if __name__ == "__main__":
    # create a wrap environment
    # The global `env` left over from the Q-learning section has already had
    # close() called on it, so build a fresh recorded environment here
    # instead of silently reusing it.
    env = wrap_env(gym.make('CartPole-v0'))
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n
    agent = DQNAgent(state_size, action_size)
    done = False
    batch_size = 32  # number of stored transitions replayed per training call

    for e in range(EPISODES):
        state = env.reset()
        # The agent is fed states with an explicit batch axis: shape (1, state_size).
        state = np.reshape(state, [1, state_size])
        for time in range(200):

            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            # Replace the reward of the step that ends the episode with a penalty.
            reward = reward if not done else -10
            next_state = np.reshape(next_state, [1, state_size])
            agent.remember(state, action, reward, next_state, done)
            state = next_state
            if done:
                print("episode: {}/{}, score: {}, e: {:.2}"
                      .format(e, EPISODES, time, agent.epsilon))
                break
            # Train only once the buffer holds more than one batch of transitions.
            if len(agent.memory) > batch_size:
                agent.replay(batch_size)
        # Checkpoint the weights every 10th episode.
        if e % 10 == 0:
            agent.save("/content/cartpole-dqn_episode_"+str(e)+".h5")





episode: 0/100, score: 18, e: 1.0








episode: 1/100, score: 15, e: 0.99
episode: 2/100, score: 73, e: 0.69
episode: 3/100, score: 9, e: 0.66
episode: 4/100, score: 12, e: 0.62
episode: 5/100, score: 13, e: 0.58
episode: 6/100, score: 13, e: 0.54
episode: 7/100, score: 9, e: 0.52
episode: 8/100, score: 12, e: 0.49
episode: 9/100, score: 24, e: 0.43
episode: 10/100, score: 11, e: 0.41
episode: 11/100, score: 11, e: 0.39
episode: 12/100, score: 17, e: 0.36
episode: 13/100, score: 11, e: 0.34
episode: 14/100, score: 22, e: 0.3
episode: 15/100, score: 12, e: 0.28
episode: 16/100, score: 12, e: 0.27
episode: 17/100, score: 13, e: 0.25
episode: 18/100, score: 29, e: 0.22
episode: 19/100, score: 15, e: 0.2
episode: 20/100, score: 20, e: 0.18
episode: 21/100, score: 20, e: 0.16
episode: 22/100, score: 14, e: 0.15
episode: 23/100, score: 13, e: 0.14
episode: 24/100, score: 13, e: 0.13
episode: 25/100, score: 11, e: 0.13
episode: 26/100, score: 36, e: 0.11
episode: 27/100, score: 31, e: 

## Evaluate our trained model in playing Cartpole game using Deep Neural Network
---


In [0]:
# Fresh recorded environment for the evaluation episode.
env = wrap_env(gym.make('CartPole-v0'))

state = env.reset()
state = np.reshape(state, [1, state_size])
total_reward =0
#load a trained model to decide proper action
# Same absolute location the training cell saves checkpoints to
# ("/content/cartpole-dqn_episode_<e>.h5"), so loading does not depend on
# the current working directory.
agent.load("/content/cartpole-dqn_episode_30.h5")
while True:
  #sample action from deep neural network
  action = agent.act(state)

  next_state, reward, done, _ = env.step(action)
  next_state = np.reshape(next_state, [1, state_size])
  state = next_state
  total_reward+=reward
  if done:
    break
print("total_reward: ",total_reward)
env.close()
show_video()

In [0]:
# List the files in the current working directory (presumably IPython's `ll` alias; the output below is a directory listing).
ll

total 4
drwxr-xr-x 1 root 4096 Nov 27 22:38 [0m[01;34msample_data[0m/


# Well done!
You have finished the first tutorial!

---



Please share your Colab Notebook to the email tienmanhptit1994@gmail.com to submit your solution.