<a href="https://colab.research.google.com/github/enakai00/colab_rlbook/blob/master/Chapter05/03_Walk_Game_Training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**[WGT-01]**

Specify the TensorFlow version.

In [1]:
%tensorflow_version 2.x

TensorFlow 2.x selected.


**[WGT-02]**

Import modules.

In [0]:
import numpy as np
import copy, random, time
from tensorflow.keras import layers, models
from IPython.display import clear_output

**[WGT-03]**

Define a function to get the field data.

In [0]:
def get_field():
  """Return a fresh copy of the empty playing field.

  The field is a 14x14 grid given as a list of rows, each row a list of
  single-character strings: '#' for the surrounding wall and ' ' for an
  empty cell. A new nested list is built on every call, so callers may
  mutate the result freely.
  """
  field_img = '''
##############
#            #
#            #
#            #
#            #
#            #
#            #
#            #
#            #
#            #
#            #
#            #
#            #
##############
'''
  # The literal starts and ends with a newline; drop the resulting empty rows.
  return [list(row) for row in field_img.splitlines() if row]

**[WGT-04]**

Define the Environ class.

In [0]:
class Environ:
  """Walk-game environment on the 14x14 field produced by get_field().

  Positions are (x, y) tuples and the field is indexed as field[y][x].
  Field cells hold '#' (wall), 'x' (obstacle), '+' (already visited) or
  ' ' (free).
  """

  def __init__(self):
    # Action index -> (dx, dy) displacement.
    self.action_map = [(0, 1), (1, 0), (0, -1), (-1, 0)]
    self.restart()

  def restart(self):
    """Reset the field and drop 10 obstacles at random interior cells.

    Cells are drawn independently, so the same cell may be picked twice.
    """
    self.field = get_field()
    for _ in range(10):
      # Row is drawn before column, both in the interior range 1..12.
      row = np.random.randint(1, 13)
      col = np.random.randint(1, 13)
      self.field[row][col] = 'x'

  def move(self, s, a):
    """Apply action `a` from position `s`.

    The cell being left is marked '+'. Returns (reward, next position,
    game over): reward 1 and False when the target cell is free,
    reward 0 and True otherwise.
    """
    x, y = s
    dx, dy = self.action_map[a]
    self.field[y][x] = '+'
    s_new = (x + dx, y + dy)
    if self.field[s_new[1]][s_new[0]] == ' ':
      return 1, s_new, False    # Reward, Next state, Game over
    return 0, s_new, True       # Reward, Next state, Game over

  def get_state(self, s):
    """Encode the field and walker position as a 14x14x2 nested list.

    Channel 0 is 1.0 for every non-free cell, channel 1 is 1.0 only at
    the walker position `s`; all other entries are 0.0.
    """
    x, y = s
    state = np.zeros((14, 14, 2))
    # Booleans are cast to 0.0 / 1.0 on assignment into the float array.
    state[:, :, 0] = [[c != ' ' for c in row] for row in self.field]
    state[y, x, 1] = 1.0
    return state.tolist()

**[WGT-05]**

Define the QValue class.

In [0]:
class QValue:
  """Neural-network approximation of the action-value function Q(state, action).

  The network takes a 14x14x2 state tensor together with a one-hot action
  vector of length 4 and outputs a single scalar Q-value.
  """

  def __init__(self):
    self.model = self.build_model()

  def build_model(self):
    """Build and compile the Keras model.

    Architecture: one 5x5 convolution (8 filters, ReLU) over the state,
    flattened and concatenated with the one-hot action, followed by dense
    layers of 2048 and 1024 ReLU units and a single linear output.
    The model is compiled with mean-squared-error loss; the optimizer is
    left at the Keras default.
    """
    cnn_input = layers.Input(shape=(14, 14, 2))
    cnn = layers.Conv2D(8, (5, 5), padding='same', use_bias=True,
                        activation='relu')(cnn_input)
    cnn_flatten = layers.Flatten()(cnn)

    action_input = layers.Input(shape=(4,))

    combined = layers.concatenate([cnn_flatten, action_input])
    hidden1 = layers.Dense(2048, activation='relu')(combined)
    hidden2 = layers.Dense(1024, activation='relu')(hidden1)
    q_value = layers.Dense(1)(hidden2)

    model = models.Model(inputs=[cnn_input, action_input], outputs=q_value)
    model.compile(loss='mse')
    return model

  def get_action(self, state):
    """Return the greedy action for `state` and its predicted Q-value.

    Args:
      state: 14x14x2 state (nested list or array) as produced by
        Environ.get_state().

    Returns:
      (optimal_action, q): index of the action with the largest predicted
      Q-value among the 4 actions, and that Q-value.
    """
    num_actions = 4
    # One copy of the state per candidate action, as a single batched array.
    states = np.array([state for _ in range(num_actions)])
    # Row i of the identity matrix is the one-hot encoding of action i.
    actions = np.eye(num_actions)

    # Both inputs are passed as ndarrays (matching how train() feeds fit()):
    # a nested Python list of per-sample arrays is not reliably interpreted
    # as two batched inputs across TF 2.x releases. verbose=0 keeps the
    # per-step prediction from writing progress output.
    q_values = self.model.predict([states, actions], verbose=0)
    optimal_action = np.argmax(q_values)
    return optimal_action, q_values[optimal_action][0]

**[WGT-06]**

Define a function to get a single episode.

In [0]:
def get_episode(environ, q_value, epsilon):
  """Play one episode with an epsilon-greedy policy.

  The environment is restarted and the walker is placed at a random
  interior position. At every step a uniformly random action is taken with
  probability `epsilon`, otherwise the greedy action from `q_value`.

  Returns:
    (episode, trace): `episode` is a list of
    (state, action, reward, next state) tuples where the next state is
    None for the terminal step; `trace` is the list of visited positions.
  """
  environ.restart()
  # x is drawn before y; both lie in the interior range 1..12.
  start_x = np.random.randint(1, 13)
  start_y = np.random.randint(1, 13)
  pos = (start_x, start_y)

  transitions, visited = [], []
  finished = False
  while not finished:
    visited.append(pos)
    observation = environ.get_state(pos)

    explore = np.random.random() < epsilon
    if explore:
      action = np.random.randint(4)
    else:
      action, _ = q_value.get_action(observation)

    reward, next_pos, finished = environ.move(pos, action)
    next_observation = None if finished else environ.get_state(next_pos)
    transitions.append((observation, action, reward, next_observation))
    pos = next_pos

  return transitions, visited

**[WGT-07]**

Define a function to show a sample episode.

In [0]:
  def show_sample(environ, q_value):
    """Play one greedy episode (epsilon=0) and animate it as text.

    The field left behind by the episode is redrawn step by step with a
    0.5 second pause per frame: '*' marks the current position and '+' the
    cells already visited. The trace length is printed at the end.
    """
    _, path = get_episode(environ, q_value, epsilon=0)
    # Fresh rows with the walked trail erased; environ.field is not modified.
    board = [[' ' if cell == '+' else cell for cell in row]
             for row in environ.field]
    for x, y in path:
      board[y][x] = '*'
      time.sleep(0.5)
      clear_output(wait=True)
      print('\n'.join(''.join(row) for row in board))
      board[y][x] = '+'

    print('Length: {}'.format(len(path)))

**[WGT-08]**

Define a function to train the model.

In [0]:
def train(environ, q_value, num):
  """Run `num` iterations of data collection followed by model fitting.

  Each iteration plays 50 episodes (every 10th one greedy with epsilon 0,
  the others with epsilon 0.2) and appends their transitions to a buffer
  that keeps only the 10000 most recent entries. Fitting is skipped until
  the buffer holds at least 1000 transitions. A training set consists of
  the 200 newest transitions plus 400 sampled from the rest; the target
  for each is reward + max Q(next state), with 0 for a terminal step.
  After fitting, a greedy sample episode is displayed.
  """
  replay = []
  for iteration in range(1, num + 1):
    print()
    print('Iteration {}'.format(iteration))
    print('Collecting data', end='')
    for n in range(50):
      print('.', end='')
      epsilon = 0.2 if n % 10 else 0
      episode, _ = get_episode(environ, q_value, epsilon)
      replay.extend(episode)
    # Keep only the most recent 10000 transitions.
    replay = replay[-10000:]

    if len(replay) < 1000:
      continue

    print()
    print('Training the model...')
    recent, older = replay[-200:], replay[:-200]
    examples = recent + random.sample(older, 400)
    np.random.shuffle(examples)

    num_actions = len(environ.action_map)
    states, actions, labels = [], [], []
    for state, a, r, state_new in examples:
      onehot = np.zeros(num_actions)
      onehot[a] = 1
      # A missing next state marks a terminal step, whose future value is 0.
      q_new = q_value.get_action(state_new)[1] if state_new else 0
      states.append(np.array(state))
      actions.append(onehot)
      labels.append(np.array(r + q_new))

    q_value.model.fit([np.array(states), np.array(actions)], np.array(labels),
                      batch_size=50, epochs=100, verbose=0)
    show_sample(environ, q_value)

**[WGT-09]**

Create an Environ instance and a QValue instance.

In [9]:
# Build the environment and the Q-value network, then print the network's
# layer summary.
environ = Environ()
q_value = QValue()
q_value.model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 14, 14, 2)]  0                                            
__________________________________________________________________________________________________
conv2d (Conv2D)                 (None, 14, 14, 8)    408         input_1[0][0]                    
__________________________________________________________________________________________________
flatten (Flatten)               (None, 1568)         0           conv2d[0][0]                     
__________________________________________________________________________________________________
input_2 (InputLayer)            [(None, 4)]          0                                            
______________________________________________________________________________________________

**[WGT-10]**

Train the model.

In [10]:
# Run 50 training iterations on the instances created above.
train(environ, q_value, num=50)

##############
#          ++#
#    x ++ +++#
#   x  ++ +++#
#   +++++ +++#
#++++   +++ +#
#+          +#
#++++*      +#
#    x      +#
#x         x+#
#    x   x  +#
#     x      #
#            #
##############
Length: 39


**[WGT-11]**

Show a sample episode using the trained model.

In [12]:
# Play and animate one greedy episode with the trained model.
show_sample(environ, q_value)

##############
#            #
#        x   #
#x       x   #
#       x x x#
#       *++ x#
#         +++#
#   ++    +++#
#   ++x   +  #
#   +++++++  #
#            #
#         x  #
#         x  #
##############
Length: 21


**[WGT-12]**

Mount Google Drive in the runtime environment.

In [17]:
# Mount Google Drive at /content/gdrive (Colab only).
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


**[WGT-13]**

Save the trained model.

In [16]:
# Save the trained model in HDF5 format (save_format='h5') under the mounted
# Drive directory, then list the written file.
q_value.model.save('/content/gdrive/My Drive/walk_game_model.hd5', save_format='h5')
!ls -l '/content/gdrive/My Drive/walk_game_model.hd5'

-rw------- 1 root root 42608640 Mar  5 10:53 '/content/gdrive/My Drive/walk_game_model.hd5'
