In [None]:
import nbimporter
nbimporter.options['only_defs'] = False
from pacman_game import Action, initialize_gamestate_from_file, get_next_game_state_from_action, ActionEvent
from collections import deque
import numpy as np
from copy import deepcopy
import tensorflow as tf
# TF1-style session configuration; it is handed to the Session created by the
# set_session(...) call further down in this cell.
config = tf.compat.v1.ConfigProto()
# config.gpu_options.allow_growth = True
# Restrict this process to a small fraction (0.02) of GPU memory.
config.gpu_options.per_process_gpu_memory_fraction = 0.02
from tensorflow.compat.v1.keras.backend import set_session
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import SGD

# Create a session from `config` and register it with the Keras backend.
set_session(tf.compat.v1.Session(config=config))

In [None]:
def calculate_reward_for_move(action_event):
    """Translate the outcome of a move into its scalar reward.

    Args:
        action_event: the ActionEvent produced by the move.

    Returns:
        1 for DOT, -5 for CAPTURED_BY_GHOST, -0.1 for NONE and for WALL,
        10 for WON, -10 for LOST, and 0 for any other event.
    """
    # Checked in order; the first event equal to `action_event` decides the reward.
    rewards_by_event = (
        (ActionEvent.DOT, 1),
        (ActionEvent.CAPTURED_BY_GHOST, -5),
        (ActionEvent.NONE, -0.1),
        (ActionEvent.WALL, -0.1),
        (ActionEvent.WON, 10),
        (ActionEvent.LOST, -10),
    )
    for known_event, reward in rewards_by_event:
        if action_event == known_event:
            return reward

    # Unrecognised events are neutral.
    return 0

In [None]:
def convert_state_to_input(state):
    """Encode a game state as a flat one-hot row vector for the network.

    The state's string form is scanned character by character. Each of the
    characters 'o', ' ', 'P', 'G' and '.' contributes a 5-element one-hot
    group; every other character (line breaks, any other symbol) is skipped.
    NOTE(review): the symbols presumably stand for board items such as
    Pac-Man ('P') and ghosts ('G') -- confirm against GameState.__str__.

    Args:
        state: any object whose ``str()`` yields the board text.

    Returns:
        A float64 ``numpy.ndarray`` of shape ``(1, 5 * k)``, where ``k`` is
        the number of recognised characters (``(1, 0)`` when there are none).
    """
    one_hot_by_char = {
        'o': (0, 0, 0, 0, 1),
        ' ': (0, 0, 0, 1, 0),
        'P': (0, 0, 1, 0, 0),
        'G': (0, 1, 0, 0, 0),
        '.': (1, 0, 0, 0, 0),
    }

    # Collect into a plain list and build the array once; concatenating
    # inside the loop re-copies the whole vector for every character.
    flat = []
    for char in str(state):
        flat.extend(one_hot_by_char.get(char, ()))

    r = np.array(flat, dtype=float)
    return r.reshape(1, r.size)

In [None]:
## DEFINE MODEL ##
# The level's initial state is encoded once to determine the network's input width.
level = 'level-2'
initial_game_state = initialize_gamestate_from_file(level)

# One input unit per element of the encoded state, one output unit per action.
input_size = convert_state_to_input(initial_game_state).size
num_actions = len(Action.get_all_actions())

# Fully connected network: two ReLU hidden layers and an output layer with
# no activation, so outputs are unbounded values (used as Q-values elsewhere
# in this notebook).
model = Sequential()
model.add(Dense(256, input_shape=(input_size,), activation='relu'))
model.add(Dense(128, activation='relu'))
model.add(Dense(num_actions))
# `learning_rate` is the supported keyword; `lr` is a deprecated alias that
# newer Keras releases reject.
model.compile(SGD(learning_rate=.01), "mse")


In [None]:
def pick_optimal_action(state):
    """Return the action the model currently scores highest for `state`.

    The state is encoded with convert_state_to_input, passed through the
    module-level `model`, and the index of the largest predicted value is
    used to select from Action.get_all_actions().
    """
    encoded_state = convert_state_to_input(state)
    predicted_values = model.predict(encoded_state)[0]  # single row: one value per action
    best_index = np.argmax(predicted_values)
    return Action.get_all_actions()[best_index]

In [None]:
def pick_action(game_state, exploration_prob=0.20):
    """Choose an action with an epsilon-greedy policy.

    Args:
        game_state: the current game state, forwarded to pick_optimal_action
            when exploiting.
        exploration_prob: probability of taking a uniformly random action
            instead of the model's best one. Defaults to 0.20, the value
            previously hard-coded here.

    Returns:
        One element of Action.get_all_actions().
    """
    if exploration_prob > np.random.rand():
        # Explore: uniformly random action.
        return np.random.choice(Action.get_all_actions())

    # Exploit: action with the highest predicted value.
    return pick_optimal_action(game_state)

In [None]:
class Memory:
    """Bounded replay buffer backed by a deque.

    Once `max_size` items are stored, appending a new one drops the oldest.
    """

    def __init__(self, max_size):
        """Create an empty buffer holding at most `max_size` experiences."""
        self.memory = deque(maxlen=max_size)

    def add(self, experience):
        """Append `experience` as the newest entry."""
        self.memory.append(experience)

    def get(self, index):
        """Return the stored entry at position `index` (0 is the oldest)."""
        return self.memory[index]

    def get_mini_batch(self, batch_size):
        """Return a list of distinct stored entries drawn at random.

        The list has `batch_size` entries, or every stored entry (in random
        order) when fewer than `batch_size` are available.
        """
        available = self.get_size()
        sample_count = min(batch_size, available)
        # replace=False: no entry appears twice in the same batch.
        positions = np.random.choice(np.arange(available), sample_count, replace=False)
        return [self.get(position) for position in positions]

    def get_size(self):
        """Return the number of entries currently stored."""
        return len(self.memory)


In [None]:

class Experience:
    """A single recorded transition: state, action, reward, next state, done flag."""

    def __init__(self, current_state, action, reward, next_state, done: bool):
        """Store the five components of a transition unchanged.

        Args:
            current_state: observation before the action. train() in this
                notebook passes the array from convert_state_to_input.
            action (Action): the action that was taken.
            reward: numeric reward for the move (train() passes the value
                from calculate_reward_for_move, which may be fractional).
            next_state: observation after the action, in the same form as
                `current_state`.
            done (bool): whether the transition ended the episode.
        """
        self.current_state, self.action = current_state, action
        self.reward, self.next_state = reward, next_state
        self.done = done


In [None]:
def _build_replay_training_batch(batch, gamma):
    """Turn sampled experiences into (inputs, target values) for one update.

    Each target row starts as the model's current prediction for that state,
    and only the entry for the action actually taken is overwritten: with the
    reward alone for terminal transitions, otherwise with
    ``reward + gamma * max(prediction for the next state)``.
    """
    # Stored states are (1, input_dim) rows, so stacking gives (len(batch), input_dim).
    x_train = np.vstack([sample.current_state for sample in batch])
    next_states = np.vstack([sample.next_state for sample in batch])

    # Two batched forward passes replace two predict() calls per sample.
    y_train = np.array(model.predict(x_train), dtype=float)
    best_next_values = np.max(model.predict(next_states), axis=1)

    for j, sample in enumerate(batch):
        if sample.done:
            target = sample.reward
        else:
            target = sample.reward + gamma * best_next_values[j]
        # NOTE(review): assumes Action values are 0-based indices that line up
        # with the model's output columns -- confirm against pacman_game.Action.
        y_train[j, sample.action.value] = target

    return x_train, y_train


def train(level, num_training_episodes, batch_size, gamma=0.9):
    """Train the module-level `model` with experience replay, then save it.

    Every move is stored in a replay memory (capacity 5000); after each move
    a mini-batch is sampled from it and used for one `train_on_batch` update.
    Progress and per-episode loss are printed, and the model is written to
    ``./nn_model.h5`` when training ends.

    Args:
        level: level identifier passed to initialize_gamestate_from_file.
        num_training_episodes: number of episodes to play.
        batch_size: maximum number of stored experiences sampled per update.
        gamma: discount factor applied to the best predicted next-state value.
    """
    print("Start training")

    initial_game_state = initialize_gamestate_from_file(level)
    tot_loss = {}
    memory = Memory(max_size=5000)

    # range() excludes its stop value, so +1 is required to actually play
    # `num_training_episodes` episodes (numbered from 1).
    for i in range(1, num_training_episodes + 1):
        print("\nEpisode number", i)

        loss = 0.
        num_episode_steps = 0

        done = False
        # Every episode restarts from an untouched copy of the initial state.
        current_game_state = deepcopy(initial_game_state)

        while not done:
            # Cut off episodes that run past 500 steps without finishing.
            if num_episode_steps > 500:
                break

            action = pick_action(current_game_state)
            next_game_state, action_event = get_next_game_state_from_action(current_game_state, action.name)

            if action_event == ActionEvent.WON or action_event == ActionEvent.LOST:
                done = True
                if action_event == ActionEvent.WON:
                    print("Won!!")
                else:
                    print("Lost")

            reward = calculate_reward_for_move(action_event)

            # States are stored already encoded, ready to feed to the model.
            experience = Experience(
                current_state=convert_state_to_input(current_game_state),
                action=action,
                reward=reward,
                next_state=convert_state_to_input(next_game_state),
                done=done
            )
            memory.add(experience)

            batch = memory.get_mini_batch(batch_size=batch_size)
            x_train, y_train = _build_replay_training_batch(batch, gamma)

            batch_loss = model.train_on_batch(x_train, y_train)

            loss += batch_loss

            num_episode_steps += 1

            current_game_state = deepcopy(next_game_state)

        print("Number of moves:", num_episode_steps)
        print("Loss:", loss)
        print("Loss per step/move:", loss / num_episode_steps)

        tot_loss[i] = (loss / num_episode_steps)

    print("\nFinished training")
    print("\nTotal loss in each episode\n", tot_loss)

    # plot_training_history(tot_loss)

    print("Saving model...")

    model_path = "nn_model.h5"
    model.save('./' + model_path)

    print("Model saved to " + model_path)

In [None]:
# Run training on the level selected above, sampling up to 100 stored experiences per update.
train(level=level, num_training_episodes=3, batch_size=100)