In [1]:
import tensorflow as tf

from tensorflow.keras import datasets, layers, models
import matplotlib.pyplot as plt
import numpy as np
import glob, re, sys, random, time
from collections import deque

# Raise NumPy's summarization threshold to the largest possible value so that
# printed arrays are shown in full instead of being abbreviated with "...".
np.set_printoptions(threshold=sys.maxsize)

In [2]:
# Number of discrete actions: the width of the DQN's output layer and the
# exclusive upper bound used when DQN.act samples a random action. The training
# loop treats ids 0-3 as brush moves and id 4 as painting the current cell.
NUM_ACTIONS = 5

In [3]:
from tensorflow.keras.optimizers import Adam, RMSprop

class DQN:
    """Deep Q-Network agent with a replay memory and a separate target network.

    `model` is the online network that is trained and used to pick actions;
    `target_model` has the same architecture and supplies the bootstrapped
    value of the next state. `target_train` copies the online weights into it.
    """

    def __init__(self):
        # Replay memory; once 50000 transitions are stored the oldest are dropped.
        self.memory  = deque(maxlen=50000)

        self.gamma = 0.999              # discount applied to the next state's value
        self.epsilon = 1                # current probability of a random action
        self.epsilon_min = 0.1          # floor that epsilon never decays below
        self.epsilon_decay = 0.999999   # factor applied to epsilon on every act() call
        self.learning_rate = 0.005      # Adam step size used by create_model

        self.model        = self.create_model()
        self.target_model = self.create_model()

    def create_model(self):
        """Build and compile the Q-network.

        The network takes a 28x28 input with 2 channels and ends in a linear
        Dense layer with NUM_ACTIONS units (one Q-value per action). It is
        compiled with mean-squared-error loss and Adam.
        """
        model = models.Sequential()
        model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 2)))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Conv2D(64, (3, 3), activation='relu'))
        model.add(layers.Dropout(0.4))
        model.add(layers.Flatten())
        model.add(layers.Dense(64, activation='relu'))
        model.add(layers.Dense(NUM_ACTIONS))
        # `learning_rate` is the supported keyword; the old `lr` alias is
        # deprecated and rejected by newer Keras releases.
        model.compile(loss="mean_squared_error",
            optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def act(self, state):
        """Pick an action epsilon-greedily for a single-state batch.

        Epsilon is decayed (down to `epsilon_min`) on every call, before the
        random/greedy decision is made.

        Args:
            state: batch holding one state; the caller passes shape (1, 28, 28, 2).

        Returns:
            An action id: a uniformly random one in [0, NUM_ACTIONS) with
            probability epsilon, otherwise the argmax of the online model's
            output for `state`.
        """
        self.epsilon *= self.epsilon_decay
        self.epsilon = max(self.epsilon_min, self.epsilon)
        if np.random.random() < self.epsilon:
            return np.random.randint(0, NUM_ACTIONS)
        # verbose=0 keeps Keras from printing a progress bar on every step.
        return np.argmax(self.model.predict(state, verbose=0)[0])

    def remember(self, state, action, reward, new_state, done):
        """Append one transition to the replay memory."""
        self.memory.append([state, action, reward, new_state, done])

    def replay(self, batch_size=32):
        """Train the online model on one random minibatch from the replay memory.

        Does nothing until at least `batch_size` transitions are stored.

        Args:
            batch_size: number of transitions to sample (default 32).
        """
        if len(self.memory) < batch_size:
            return

        # Unpack column-wise instead of wrapping the samples in a single
        # np.array: each transition mixes arrays and scalars, which NumPy
        # refuses to turn into one array (ValueError since NumPy 1.24).
        batch = random.sample(self.memory, batch_size)
        states, actions, rewards, new_states, dones = zip(*batch)

        states = np.stack(states)
        new_states = np.stack(new_states)
        actions = np.array(actions, dtype=int)
        rewards = np.array(rewards, dtype=np.float64)
        dones = np.array(dones, dtype=bool)

        # Bootstrapped return from the target network; terminal transitions
        # keep only their immediate reward.
        q_future = np.max(self.target_model.predict(new_states, verbose=0), axis=1)
        returns = rewards + q_future * self.gamma
        returns[dones] = rewards[dones]

        # Start from the ONLINE model's own predictions so that the entries for
        # actions that were not taken contribute zero error; only the taken
        # action is regressed towards its bootstrapped return.
        targets = self.model.predict(states, verbose=0)
        targets[np.arange(batch_size), actions] = returns
        self.model.fit(states, targets, batch_size=batch_size, epochs=1, verbose=0)

    def target_train(self):
        """Copy the online model's weights into the target model."""
        self.target_model.set_weights(self.model.get_weights())

    def save_model(self, fn):
        """Save the online model to the file `fn`."""
        self.model.save(fn)

    def load_model(self, fn):
        """Load weights from `fn` into both the online and the target model."""
        self.model.load_weights(fn)
        self.target_model.load_weights(fn)

In [4]:
# Number of classes the critic distinguishes; sets the width of its softmax output layer.
NUM_CLASSES = 8

def create_critic_model():
    """Build and compile the critic classifier.

    The network takes a 28x28 single-channel input and ends in a softmax over
    NUM_CLASSES classes. It is compiled with the 'adam' optimizer,
    categorical cross-entropy loss and an accuracy metric.

    Returns:
        The compiled Sequential model (weights are freshly initialised).
    """
    critic_layers = [
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.Dropout(0.4),
        layers.Flatten(),
        layers.Dense(64, activation='relu'),
        layers.Dense(NUM_CLASSES, activation="softmax"),
    ]
    network = models.Sequential(critic_layers)
    network.compile(
        loss="categorical_crossentropy",
        optimizer='adam',
        metrics=['accuracy'],
    )
    return network

# Build the critic and load its weights from disk. The training loop below only
# calls critic.predict on it; the critic itself is not trained in this notebook section.
# NOTE(review): the weights path is relative to the working directory — confirm the file exists there.
critic = create_critic_model()
critic.load_weights("models/thres-classifier.h5")

In [None]:
def save_canvas(canvas, trial):
    """Render `canvas` in grayscale and write it to this trial's PNG file.

    Args:
        canvas: 2-D array to draw.
        trial: trial index, embedded in the output file name.
    """
    out_path = f"output/out-thres-2-{trial}.png"
    plt.imshow(canvas, cmap="gray")
    plt.savefig(out_path)
    # Clear the current figure so the next call starts from an empty one.
    plt.clf()

trials  = 2000
trial_len = 800

# Movement actions as (index into pos, step). Action 4 paints instead of moving.
brush_moves = {0: (1, 1), 1: (1, -1), 2: (0, 1), 3: (0, -1)}

dqn_agent = DQN()
dqn_agent.load_model("models/bootstrap-thres.h5")
steps = []
for trial in range(trials):
    # Each trial starts from an empty canvas with the brush on a random cell.
    canvas = np.zeros((28, 28))
    pos = np.random.randint(0,28,size=(2))
    brush_map = np.zeros((28, 28))
    brush_map[pos[0], pos[1]] = 1

    reward_sum = 0
    reward_num = 0

    for step in range(trial_len):
        # Observation before acting: channel 0 is the canvas, channel 1 marks the brush cell.
        old_canvas = canvas.copy()
        old_map = brush_map.copy()
        old_state = np.dstack([old_canvas, old_map])

        action = dqn_agent.act(old_state.reshape(1,28,28,2))

        if action == 4:
            # Paint the cell under the brush.
            canvas[pos[0], pos[1]] = 1
        elif action in brush_moves:
            axis, delta = brush_moves[action]
            # A move that would leave the 28x28 grid is ignored.
            if 0 <= pos[axis] + delta <= 27:
                offset = [0, 0]
                offset[axis] = delta
                pos = pos + offset

        # Observation after acting.
        new_map = np.zeros((28,28))
        new_map[pos[0], pos[1]] = 1
        new_state = np.dstack([canvas, new_map])

        # Reward: the critic's (normalised) probability for class index 6 ...
        probs = critic.predict(canvas.reshape(1,28,28,1))[0]
        reward = probs[6]/(np.sum(probs)+0.0000000001)

        # ... minus 100 for every painted pixel in the outermost rows and columns
        # (each corner pixel belongs to a row and a column, so it is counted twice).
        border_paint = np.sum(canvas[0])+np.sum(canvas[27])+np.sum(canvas[:,0])+np.sum(canvas[:,27])
        reward -= 100*border_paint

        reward_sum += reward
        reward_num += 1

        # Transitions are always stored as non-terminal.
        dqn_agent.remember(old_state, action, reward, new_state, False)
        dqn_agent.replay()

        # Sync the target network with the online network every 20 steps.
        if step % 20 == 0:
            dqn_agent.target_train()

        brush_map = new_map

    print("completed trial", trial, "avg_r", reward_sum/reward_num, "epsilon", dqn_agent.epsilon)

    save_canvas(canvas, trial)

    # Checkpoint the online model every 50 trials (including trial 0).
    if trial % 50 == 0:
        print("saving model")
        dqn_agent.save_model(f"models/painter-thres-2-{trial}.h5")