In [1]:
class ZombieEnvironment:
    """Single-agent environment in which the agent plays one zombie on a board.

    The interface follows the usual ``reset()`` / ``step(action)`` shape, with
    ``step`` returning ``(obs, reward, done, info)``. Every action is a
    combined "move or bite" in one of four directions: the environment first
    asks the board to move the zombie that way and, if the board rejects the
    move, tries to bite the neighbouring cell in that direction instead.

    When both ``logdir`` and ``run_name`` are given, per-step and per-episode
    metrics are written through ``tf.summary``.
    """

    ACTION_MAPPINGS = {
        0: "movebiteUp",
        1: "movebiteDown",
        2: "movebiteLeft",
        3: "movebiteRight",
    }
    ACTION_SPACE = tuple(range(len(ACTION_MAPPINGS)))
    SIZE = (6, 6)

    # Prefix shared by every action name; the remainder is the direction.
    _ACTION_PREFIX = "movebite"
    # (dx, dy) added to the agent's coordinate to find the cell to bite.
    _DIRECTION_OFFSETS = {
        "Up": (0, -1),
        "Down": (0, 1),
        "Left": (-1, 0),
        "Right": (1, 0),
    }

    def __init__(
        self,
        max_timesteps: int = 300,
        have_enemy_player: bool = True,
        logdir: str = "",
        run_name: str = "",
    ) -> None:
        """Create the environment and start the first episode.

        Args:
            max_timesteps: maximum number of steps in one episode.
            have_enemy_player: when False the opposing player's chosen move
                is not applied to the board.
            logdir: root directory for TensorBoard summaries.
            run_name: sub-directory of ``logdir`` for this run. Summaries are
                only written when both ``logdir`` and ``run_name`` are set.
        """
        # Configure everything before reset() so that the first episode
        # already sees a fully initialised environment.
        self.max_timesteps = max_timesteps
        self.have_enemy_player = have_enemy_player
        self.total_timesteps = 0
        self.total_invalid_moves = 0
        self.writer = None
        if logdir and run_name:
            self.writer = tf.summary.create_file_writer(f"{logdir}/{run_name}")
        self.reset()

    def reset(self):
        """Start a new episode and return its first observation."""
        self.board = Board(ZombieEnvironment.SIZE, "Zombie")
        self.board.populate(num_zombies=1)
        self.enemyPlayer = GovernmentPlayer()
        self.done = False

        # coordinates of the first zombie
        self.agentPosition = self.board.indexOf(True)

        # useful for metrics
        self.max_number_of_zombies = 1
        self.episode_invalid_actions = 0
        self.episode_reward = 0
        self.episode_timesteps = 0

        return self._get_obs()

    def step(self, action: int):
        """Apply one agent action, let the opponent respond, and score it.

        Args:
            action: a key of ``ACTION_MAPPINGS``.

        Returns:
            ``(obs, reward, done, info)`` for the state after the action.

        Raises:
            KeyError: if ``action`` is not a key of ``ACTION_MAPPINGS``.
        """
        action_name = ZombieEnvironment.ACTION_MAPPINGS[action]
        direction = action_name[len(ZombieEnvironment._ACTION_PREFIX):]

        # first, try to move
        valid, new_pos = self.board.actionToFunction["move" + direction](
            self.board.toCoord(self.agentPosition)
        )
        if valid:
            self.agentPosition = new_pos
            action_name = "move"
        else:
            # the move was rejected: try to bite the neighbouring cell instead
            dx, dy = ZombieEnvironment._DIRECTION_OFFSETS[direction]
            dest_coord = list(self.board.toCoord(self.agentPosition))
            dest_coord[0] += dx
            dest_coord[1] += dy
            valid, _ = self.board.actionToFunction["bite"](dest_coord)
            if valid:
                action_name = "bite"

        won = None
        # do the opposing player's action if the action was valid.
        if valid:
            _action, coord = self.enemyPlayer.get_move(self.board)
            if not _action:
                # the opponent has nothing left to do: the agent wins
                self.done = True
                won = True
            elif self.have_enemy_player:
                self.board.actionToFunction[_action](coord)
            self.board.update()

        # see if the game is over
        if not self.board.States[self.agentPosition].person.isZombie:
            # zombie was cured
            self.done = True
            won = False
        if not self.board.is_move_possible_at(self.agentPosition):
            # no move possible
            self.done = True
        # episode_timesteps is only incremented further down, so the step
        # being taken right now is number ``episode_timesteps + 1``. Ending
        # on that count keeps an episode to exactly max_timesteps steps.
        if self.episode_timesteps + 1 >= self.max_timesteps:
            self.done = True
        if not valid:
            # an invalid action (neither move nor bite worked) ends the episode
            self.done = True

        # get obs, reward, done, info
        obs = self._get_obs()
        reward = self._get_reward(action_name, valid, won)
        done = self._get_done()
        info = self._get_info()

        # update the metrics
        self.episode_reward += reward
        if not valid:
            self.episode_invalid_actions += 1
            self.total_invalid_moves += 1
        self.episode_timesteps += 1
        self.max_number_of_zombies = max(
            self.board.num_zombies(), self.max_number_of_zombies
        )
        self.total_timesteps += 1

        # write the metrics
        if self.writer is not None:
            with self.writer.as_default():
                tf.summary.scalar(
                    "train/invalid_action_rate",
                    self.total_invalid_moves / self.total_timesteps,
                    step=self.total_timesteps,
                )
                tf.summary.scalar("train/cur_reward", reward, step=self.total_timesteps)

        # return the obs, reward, done, info
        return obs, reward, done, info

    def _get_info(self):
        """Return the (currently empty) info dictionary for a step."""
        return {}

    def _get_done(self):
        """Return whether the current episode has ended."""
        return self.done

    def _get_reward(self, action_name: str, was_valid: bool, won: "bool | None"):
        """
        Gonna try to return reward between [-1, 1]

        Args:
            action_name: name of the action that was carried out; a name
                containing "bite" is rewarded as a bite, anything else as a
                move.
            was_valid: whether the action could be carried out at all.
            won: True if the agent won this step, False if it lost, None if
                the game is still undecided.

        Returns:
            -1 for an invalid action, 1 for a win, -0.5 for a loss, 0.9 for
            a bite and -0.01 for a plain move (checked in that order).
        """
        if not was_valid:
            return -1
        if won is True:
            return 1
        if won is False:
            return -0.5
        if "bite" in action_name:
            return 0.9
        return -0.01  # this is the case where it was move

    def _get_obs(self):
        """
        Is based off the assumption that 5 is not in the returned board.
        Uses 5 as the key for current position.

        Returns:
            A float32 numpy array: the board values with the agent's cell
            set to 5, divided by 5 and shifted down by 0.5.
        """
        AGENT_POSITION_CONSTANT = 5
        # Convert first so the marker is written into our own array rather
        # than into whatever object get_board() handed back.
        ret = np.array(self.board.get_board(), dtype=np.float32)
        ret[self.agentPosition] = AGENT_POSITION_CONSTANT

        # normalize observation to be centered at 0
        ret /= np.float32(AGENT_POSITION_CONSTANT)
        ret -= np.float32(0.5)
        return ret

    def render(self):
        """Draw the current board with PygameFunctions and refresh the display."""
        import PygameFunctions as PF
        import pygame

        PF.run(self.board)
        pygame.display.update()

    def init_render(self):
        """Initialise the PygameFunctions screen for the board and refresh it."""
        import PygameFunctions as PF
        import pygame

        PF.initScreen(self.board)
        pygame.display.update()

    def close(self):
        """Shut pygame down."""
        import pygame

        pygame.quit()

    def write_run_metrics(self):
        """Write the per-episode summary scalars, if a writer is configured.

        The two per-step averages are reported as 0.0 when the episode has
        not taken a step yet, instead of dividing by zero.
        """
        if self.writer is None:
            return

        steps = self.episode_timesteps
        mean_reward = self.episode_reward / steps if steps else 0.0
        invalid_rate = self.episode_invalid_actions / steps if steps else 0.0

        with self.writer.as_default():
            tf.summary.scalar(
                "episode/num_invalid_actions_per_ep",
                self.episode_invalid_actions,
                step=self.total_timesteps,
            )
            tf.summary.scalar(
                "episode/episode_length",
                self.episode_timesteps,
                step=self.total_timesteps,
            )
            tf.summary.scalar(
                "episode/episode_total_reward",
                self.episode_reward,
                step=self.total_timesteps,
            )
            tf.summary.scalar(
                "episode/mean_reward",
                mean_reward,
                step=self.total_timesteps,
            )
            tf.summary.scalar(
                "episode/percent_invalid_per_ep",
                invalid_rate,
                step=self.total_timesteps,
            )


In [None]:
# One network output per discrete action (ACTION_SPACE has one entry per
# ACTION_MAPPINGS key, so both have the same length).
ZOMBIE_OUTPUT_SIZE = len(ZombieEnvironment.ACTION_MAPPINGS)
# Flat observation: a single axis with one entry per board cell.
INPUT_SHAPE = (COLUMNS * ROWS,)

In [None]:
def make_zombie_model():
    """
    Build the network used by the zombie agent.

    The model is a plain feed-forward stack: it takes an input of shape
    INPUT_SHAPE and ends in a Dense layer of ZOMBIE_OUTPUT_SIZE units,
    i.e. one predicted q value per action.
    """
    # NOTE(review): the output layer is followed by a LeakyReLU, so the
    # q values are not a purely linear read-out -- confirm this is intended.
    stack = [
        layers.InputLayer(INPUT_SHAPE),
        layers.Flatten(),
        # widening part of the network
        layers.Dense(36 * 2),
        layers.LeakyReLU(),
        layers.Dense(36 * 8),
        layers.LeakyReLU(),
        layers.Dense(36 * 64),
        layers.SyncBatchNormalization(),
        layers.LeakyReLU(),
        # narrowing part of the network
        layers.Dense(8 * 36),
        layers.LeakyReLU(),
        # the q values for each action
        layers.Dense(ZOMBIE_OUTPUT_SIZE),
        layers.LeakyReLU(),
    ]

    network = models.Sequential()
    for layer in stack:
        network.add(layer)
    return network


In [None]:
# Create the optimizer and the loss inside the device context selected by
# DEVICE (DEVICE is defined elsewhere in the file).
with tf.device(DEVICE):
    # Adam optimizer; 0.002 is passed as its first positional argument
    # (the learning rate in the Keras API).
    optimizer = keras.optimizers.Adam(0.002)
    # Huber loss with its default settings.
    loss = keras.losses.Huber()