In [1]:
import numpy as np
import gym
import time

from keras.models import Sequential
from keras.layers import Dense, Activation, Flatten
from keras.optimizers import Adam

from gym import Env, spaces
from gym.utils import seeding
from rl.agents.dqn import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory

Using TensorFlow backend.


In [67]:
class TestEnv(Env):
    metadata = {'render.modes': ['human', 'ansi']}

    def __init__(self) -> None:
        self.__max_step = 9
        
        # 0-8分别代表此次落子位置
        self.action_space = spaces.Discrete(9)
        # 初始全零，下标0为：1、2分别代表黑白两方（1黑方先行），下标1-9为：0空、1黑方、2白方
        self.observation_space = spaces.Box(
            np.zeros(10, int), np.zeros(10, int) + 2)
        # 代表AI所属方（1或2）
        self.ai = None
        self._seed()
        self.state = None
        self.__step = None

    def _seed(self, seed=None):
        # 随机AI所属方
        self.ai = np.random.choice([1, 2])
    
    def _reset(self):
        self.__step = 0
        self.state = [self.ai]
        self.state.extend(np.zeros(9, int).tolist())
        return np.array(self.state)

    def _step(self, action):
        grid = self.state[1:]
        # 不可重复落子
        if grid[action] != 0:
            return np.array(self.state), -100, True, {}
        # 黑方先手
        grid = np.array(grid, int)
        dif = np.sum(grid == 1) - np.sum(grid == 2)
        if dif == 0:
            current = 1
        elif dif == 1:
            current = 2
        else:
            return np.array(self.state), -100, True, {}
        # 胜负
        grid[action] = current
        self.state = [self.ai]
        self.state.extend(grid.tolist())
        is_win = lambda b: b[0:3].all() or b[3:6].all() or b[6:9].all() or \
            b[0::3].all() or b[1::3].all() or b[2::3].all() or \
            b[0::4].all() or b[2:7:2].all()
        if is_win(grid == current):
            if self.ai == current:
                reward = 10
            else:
                reward = -10
            return np.array(self.state), reward, True, {}

        reward = 0
        self.__step += 1
        if self.__step < self.__max_step:
            done = False
        else:
            done = True
        return np.array(self.state), reward, done, {}

    def _render(self, mode='ansi', close=False):
        print(self.state[0], np.array(self.state[1:], int).reshape(3, 3))

    def _close(self):
        super()._close()

In [72]:
env = TestEnv()
env.reset()
nb_actions = env.action_space.n

2 [[2 2 1]
 [1 2 1]
 [0 2 1]]


In [69]:
env.state

[2, 0, 0, 0, 0, 0, 0, 0, 0, 0]

In [71]:
for i in range(10):
    action = env.action_space.sample()
    observation, reward, done, info = env.step(action)
    print(action, reward, done, info)

2 -10 True {}
3 -100 True {}
8 -100 True {}
1 -100 True {}
3 -100 True {}
3 -100 True {}
3 -100 True {}
7 10 True {}
0 -100 True {}
1 -100 True {}


In [74]:
model = Sequential()
model.add(Flatten(input_shape=(1,) + env.observation_space.shape))
model.add(Dense(16))
model.add(Activation('relu'))
model.add(Dense(16))
model.add(Activation('relu'))
model.add(Dense(nb_actions))
model.add(Activation('linear'))
print(model.summary())

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
flatten_4 (Flatten)          (None, 10)                0         
_________________________________________________________________
dense_10 (Dense)             (None, 16)                176       
_________________________________________________________________
activation_10 (Activation)   (None, 16)                0         
_________________________________________________________________
dense_11 (Dense)             (None, 16)                272       
_________________________________________________________________
activation_11 (Activation)   (None, 16)                0         
_________________________________________________________________
dense_12 (Dense)             (None, 9)                 153       
_________________________________________________________________
activation_12 (Activation)   (None, 9)                 0         
Total para

In [None]:
memory = SequentialMemory(limit=500, window_length=1)
policy = BoltzmannQPolicy()
dqn = DQNAgent(model=model, nb_actions=nb_actions, memory=memory, nb_steps_warmup=10,
               target_model_update=1e-2, policy=policy)
dqn.compile(Adam(lr=1e-3), metrics=['mae'])

In [None]:
dqn.fit(env, nb_steps=500, visualize=False, verbose=2)

Training for 500 steps ...


InternalError: Blas GEMM launch failed : a.shape=(1, 10), b.shape=(10, 16), m=1, n=16, k=10
	 [[Node: dense_1/MatMul = MatMul[T=DT_FLOAT, transpose_a=false, transpose_b=false, _device="/job:localhost/replica:0/task:0/gpu:0"](flatten_1/Reshape, dense_1/kernel/read)]]

Caused by op 'dense_1/MatMul', defined at:
  File "D:\Anaconda3\lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "D:\Anaconda3\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "D:\Anaconda3\lib\site-packages\ipykernel_launcher.py", line 16, in <module>
    app.launch_new_instance()
  File "D:\Anaconda3\lib\site-packages\traitlets\config\application.py", line 658, in launch_instance
    app.start()
  File "D:\Anaconda3\lib\site-packages\ipykernel\kernelapp.py", line 477, in start
    ioloop.IOLoop.instance().start()
  File "D:\Anaconda3\lib\site-packages\zmq\eventloop\ioloop.py", line 177, in start
    super(ZMQIOLoop, self).start()
  File "D:\Anaconda3\lib\site-packages\tornado\ioloop.py", line 888, in start
    handler_func(fd_obj, events)
  File "D:\Anaconda3\lib\site-packages\tornado\stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "D:\Anaconda3\lib\site-packages\zmq\eventloop\zmqstream.py", line 440, in _handle_events
    self._handle_recv()
  File "D:\Anaconda3\lib\site-packages\zmq\eventloop\zmqstream.py", line 472, in _handle_recv
    self._run_callback(callback, msg)
  File "D:\Anaconda3\lib\site-packages\zmq\eventloop\zmqstream.py", line 414, in _run_callback
    callback(*args, **kwargs)
  File "D:\Anaconda3\lib\site-packages\tornado\stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "D:\Anaconda3\lib\site-packages\ipykernel\kernelbase.py", line 283, in dispatcher
    return self.dispatch_shell(stream, msg)
  File "D:\Anaconda3\lib\site-packages\ipykernel\kernelbase.py", line 235, in dispatch_shell
    handler(stream, idents, msg)
  File "D:\Anaconda3\lib\site-packages\ipykernel\kernelbase.py", line 399, in execute_request
    user_expressions, allow_stdin)
  File "D:\Anaconda3\lib\site-packages\ipykernel\ipkernel.py", line 196, in do_execute
    res = shell.run_cell(code, store_history=store_history, silent=silent)
  File "D:\Anaconda3\lib\site-packages\ipykernel\zmqshell.py", line 533, in run_cell
    return super(ZMQInteractiveShell, self).run_cell(*args, **kwargs)
  File "D:\Anaconda3\lib\site-packages\IPython\core\interactiveshell.py", line 2717, in run_cell
    interactivity=interactivity, compiler=compiler, result=result)
  File "D:\Anaconda3\lib\site-packages\IPython\core\interactiveshell.py", line 2821, in run_ast_nodes
    if self.run_code(code, result):
  File "D:\Anaconda3\lib\site-packages\IPython\core\interactiveshell.py", line 2881, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-63-2f4eacf0178e>", line 3, in <module>
    model.add(Dense(16))
  File "D:\Anaconda3\lib\site-packages\keras\models.py", line 469, in add
    output_tensor = layer(self.outputs[0])
  File "D:\Anaconda3\lib\site-packages\keras\engine\topology.py", line 596, in __call__
    output = self.call(inputs, **kwargs)
  File "D:\Anaconda3\lib\site-packages\keras\layers\core.py", line 838, in call
    output = K.dot(inputs, self.kernel)
  File "D:\Anaconda3\lib\site-packages\keras\backend\tensorflow_backend.py", line 978, in dot
    out = tf.matmul(x, y)
  File "D:\Anaconda3\lib\site-packages\tensorflow\python\ops\math_ops.py", line 1816, in matmul
    a, b, transpose_a=transpose_a, transpose_b=transpose_b, name=name)
  File "D:\Anaconda3\lib\site-packages\tensorflow\python\ops\gen_math_ops.py", line 1217, in _mat_mul
    transpose_b=transpose_b, name=name)
  File "D:\Anaconda3\lib\site-packages\tensorflow\python\framework\op_def_library.py", line 767, in apply_op
    op_def=op_def)
  File "D:\Anaconda3\lib\site-packages\tensorflow\python\framework\ops.py", line 2506, in create_op
    original_op=self._default_original_op, op_def=op_def)
  File "D:\Anaconda3\lib\site-packages\tensorflow\python\framework\ops.py", line 1269, in __init__
    self._traceback = _extract_stack()

InternalError (see above for traceback): Blas GEMM launch failed : a.shape=(1, 10), b.shape=(10, 16), m=1, n=16, k=10
	 [[Node: dense_1/MatMul = MatMul[T=DT_FLOAT, transpose_a=false, transpose_b=false, _device="/job:localhost/replica:0/task:0/gpu:0"](flatten_1/Reshape, dense_1/kernel/read)]]


In [29]:
env.reset()
dqn.test(env)
print(env.state_list)

Testing for 1 episodes ...
Episode 1: reward: 10.000, steps: 10
[1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 0]


In [64]:
env.reset()

array([1, 1])

In [73]:
np.argmax(model.predict(env.state.reshape(1, 1, 2)))

1

In [72]:
env.step(0)

(array([1, 0]), 1, False, {})

In [53]:
model.get_layer(index=0).input

<tf.Tensor 'flatten_2_input:0' shape=(?, 1, 2) dtype=float32>

In [76]:
dqn.test(env, 2, visualize=True)

Testing for 2 episodes ...
[0, 0, 0]
[0, 0, 0, 0]
[0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Episode 1: reward: 10.000, steps: 10
[1, 1, 0]
[1, 1, 0, 1]
[1, 1, 0, 1, 1]
[1, 1, 0, 1, 1, 0]
[1, 1, 0, 1, 1, 0, 1]
[1, 1, 0, 1, 1, 0, 1, 1]
[1, 1, 0, 1, 1, 0, 1, 1, 0]
[1, 1, 0, 1, 1, 0, 1, 1, 0, 1]
[1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1]
[1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 0]
Episode 2: reward: 10.000, steps: 10


<keras.callbacks.History at 0x25d88f546d8>