In [1]:
import gym
import numpy as np
import matplotlib.pyplot as plt
import keras

%matplotlib inline

Using TensorFlow backend.
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [2]:
# Create the gym environment registered under the id 'BreakoutDeterministic-v4'.
env = gym.make('BreakoutDeterministic-v4')

# Reset the environment before stepping it; the return value is not kept here.
env.reset()
# Loop flag for the trial game below; it is overwritten by the value returned from env.step().
is_done = False

# Downscaling the game

def to_grayscale(img):
    """Collapse the channel axis (axis 2) of ``img`` into one uint8 intensity.

    The channels are averaged with ``np.mean`` and the result is cast to
    ``np.uint8``, which truncates the fractional part.
    """
    channel_average = np.mean(img, axis=2)
    return channel_average.astype(np.uint8)

def downsample(img):
    """Keep every second element along the first two axes of ``img``.

    Any further axes (e.g. colour channels) are left untouched.
    """
    every_other = slice(None, None, 2)
    return img[every_other, every_other]

def preprocess(img):
    """Downsample ``img`` first, then convert the smaller image to grayscale."""
    half_size = downsample(img)
    return to_grayscale(half_size)

# Playing a trial game

# Play one episode with randomly sampled actions until env.step() reports the episode is done.
while not is_done:
    # Draw an action from the environment's action space.
    action = env.action_space.sample()
    frame, reward, is_done, info = env.step(action)
    # The preprocessed frame is computed here but not used further in this loop.
    img = preprocess(frame)
    env.render()
    
env.close()

# Reward transformation helper. Not used yet (the loss function will be defined later).

def transform_reward(reward):
    """Reduce a reward to its sign.

    Args:
        reward: A scalar (or array-like) reward.

    Returns:
        ``-1``, ``0`` or ``1`` (element-wise for arrays), as computed by
        ``np.sign``.
    """
    # np.sign requires its input; calling it without an argument raises TypeError.
    return np.sign(reward)

In [3]:
# Input shape fed to the network (height, width, channels) and number of outputs.
ATARI_SHAPE = (105, 80, 4)
n_actions = 4

# Defining our deep CNN model

# With the functional API we need to define the inputs.
frames_input = keras.layers.Input(ATARI_SHAPE, name='frames')
actions_input = keras.layers.Input((n_actions,), name='mask')

# Assuming that the input frames are still encoded from 0 to 255. Transforming to [0, 1].
normalized = keras.layers.Lambda(lambda x: x / 255.0)(frames_input)

# NOTE: the layers below use the Keras 2 call signatures
# (Conv2D(filters, kernel_size, strides=...)) instead of the deprecated Keras 1
# form (Convolution2D(filters, rows, cols, subsample=...)). The architecture is unchanged.

# "The first hidden layer convolves 16 8×8 filters with stride 4 with the input image and applies a rectifier nonlinearity."
conv_1 = keras.layers.Conv2D(
    16, (8, 8), strides=(4, 4), activation='relu'
)(normalized)
# "The second hidden layer convolves 32 4×4 filters with stride 2, again followed by a rectifier nonlinearity."
conv_2 = keras.layers.Conv2D(
    32, (4, 4), strides=(2, 2), activation='relu'
)(conv_1)
# Flattening the second convolutional layer.
conv_flattened = keras.layers.Flatten()(conv_2)
# "The final hidden layer is fully-connected and consists of 256 rectifier units."
hidden = keras.layers.Dense(256, activation='relu')(conv_flattened)
# "The output layer is a fully-connected linear layer with a single output for each valid action."
output = keras.layers.Dense(n_actions)(hidden)
# Finally, we multiply the output by the mask, so only the outputs selected by
# the mask input contribute to the loss.
filtered_output = keras.layers.Multiply()([output, actions_input])

# Keras 2 uses the plural keyword names `inputs` / `outputs`.
model = keras.models.Model(inputs=[frames_input, actions_input], outputs=filtered_output)
optimizer = keras.optimizers.RMSprop(lr=0.00025, rho=0.95, epsilon=0.01)
model.compile(optimizer, loss='mse')

model.summary()

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
frames (InputLayer)             (None, 105, 80, 4)   0                                            
__________________________________________________________________________________________________
lambda_1 (Lambda)               (None, 105, 80, 4)   0           frames[0][0]                     
__________________________________________________________________________________________________
conv2d_1 (Conv2D)               (None, 25, 19, 16)   4112        lambda_1[0][0]                   
__________________________________________________________________________________________________
conv2d_2 (Conv2D)               (None, 11, 8, 32)    8224        conv2d_1[0][0]                   
____________________________________________________________________________________________

  del sys.path[0]


In [4]:
# Training the model

import random

episode = 100    # number of training episodes
GAMMA = 0.99     # discount factor applied to the bootstrapped next-state value
EPSILON = 0.5    # probability of taking a random (exploratory) action

# Mask that lets every action's Q-value through; used when querying the network.
all_actions_mask = np.ones((1, n_actions))

for i in range(episode):
    print(i)
    # Build the initial network input by repeating the first preprocessed frame
    # along the channel axis, giving shape (1,) + ATARI_SHAPE.
    first_frame = preprocess(env.reset())
    state = np.stack([first_frame] * ATARI_SHAPE[-1], axis=-1)[np.newaxis]
    is_done = False
    while not is_done:
        # Epsilon-greedy action selection.
        if random.random() < EPSILON:
            action = env.action_space.sample()
        else:
            Q_values = model.predict([state, all_actions_mask])
            action = np.argmax(Q_values[0])

        new_frame, reward, is_done, info = env.step(action)

        # Slide the frame window: drop the oldest channel, append the newest frame.
        newest = preprocess(new_frame)[np.newaxis, ..., np.newaxis]
        new_state = np.concatenate([state[..., 1:], newest], axis=-1)

        # One-step TD target for the action actually taken. On terminal
        # transitions there is no next state to bootstrap from.
        if is_done:
            target_value = reward
        else:
            next_Q_values = model.predict([new_state, all_actions_mask])
            target_value = reward + GAMMA * np.max(next_Q_values[0])

        # A one-hot mask zeroes the other actions in both the prediction and
        # the target, so the MSE loss only updates the chosen action.
        action_mask = np.zeros((1, n_actions))
        action_mask[0, action] = 1
        target = action_mask * target_value

        model.fit(
            [state, action_mask], target,
            epochs=1, batch_size=1, verbose=0
        )

        # Advance to the next observation (previously the state was never updated).
        state = new_state
#         env.render()

# The environment is deliberately not closed here: it is still needed after training.
    
# Testing the model
    
# Evaluate the current policy for one rendered episode. No model.fit() here:
# evaluation must not change the weights. A small amount of randomness is kept
# so that a purely greedy policy cannot stall the game indefinitely.
EVAL_EPSILON = 0.05

# Build the initial network input by repeating the first preprocessed frame
# along the channel axis, giving shape (1,) + ATARI_SHAPE.
first_frame = preprocess(env.reset())
state = np.stack([first_frame] * ATARI_SHAPE[-1], axis=-1)[np.newaxis]
is_done = False
while not is_done:
    if random.random() < EVAL_EPSILON:
        action = env.action_space.sample()
    else:
        # Query all Q-values (mask of ones) and act greedily.
        Q_values = model.predict([state, np.ones((1, n_actions))])
        action = np.argmax(Q_values[0])
    new_frame, reward, is_done, info = env.step(action)
    # Slide the frame window: drop the oldest channel, append the newest frame.
    newest = preprocess(new_frame)[np.newaxis, ..., np.newaxis]
    state = np.concatenate([state[..., 1:], newest], axis=-1)
    env.render()

env.close()

0





1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99


