In [1]:
import os
import pickle

import cv2
import gym
import matplotlib.pyplot as plt
import numpy as np
import torch
from gym import spaces
from matplotlib import style
from PIL import Image
from stable_baselines3 import DQN, A2C
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.evaluation import evaluate_policy

style.use("ggplot")

In [2]:
class Cube:
    """A single occupant (player, food or enemy) of a ``size`` x ``size`` grid.

    The position is held in the integer attributes ``x`` and ``y``; ``move``
    keeps both inside ``[0, size - 1]``.
    """

    def __init__(self, size) -> None:
        """Place the cube on a uniformly random cell of the grid.

        Args:
            size: Side length of the square grid.
        """
        self.size = size
        self.x = np.random.randint(0, self.size)
        self.y = np.random.randint(0, self.size)

    def __str__(self) -> str:
        return f"{self.x}, {self.y}"

    def __sub__(self, __o: object):
        """Return the offset ``(dx, dy)`` from ``__o`` to this cube as a tuple."""
        return (self.x - __o.x, self.y - __o.y)

    def __eq__(self, __o: object) -> bool:
        """Two cubes are equal when they occupy the same cell."""
        if not isinstance(__o, Cube):
            # Let Python fall back to its default comparison instead of
            # raising AttributeError on objects without x / y.
            return NotImplemented
        return self.x == __o.x and self.y == __o.y

    def action(self, choice):
        """Apply one of the nine discrete actions.

        Actions 0-3 are the diagonals, 4-7 the axis-aligned moves and 8 is
        "stay in place". Any other value is ignored.
        """
        if choice == 0:
            self.move(x=1, y=1)
        elif choice == 1:
            self.move(x=-1, y=1)
        elif choice == 2:
            self.move(x=1, y=-1)
        elif choice == 3:
            self.move(x=-1, y=-1)
        elif choice == 4:
            self.move(x=0, y=1)
        elif choice == 5:
            self.move(x=0, y=-1)
        elif choice == 6:
            self.move(x=1, y=0)
        elif choice == 7:
            self.move(x=-1, y=0)
        elif choice == 8:
            self.move(x=0, y=0)

    def move(self, x=False, y=False):
        """Shift the cube and clamp it to the grid.

        Args:
            x: Displacement along x. When omitted (``False``/``None``) a
                random step in ``{-1, 0, 1}`` is taken instead.
            y: Displacement along y, with the same convention as ``x``.
        """
        # Identity checks on purpose: an explicit 0 means "do not move on this
        # axis". A truthiness test would confuse 0 with the "omitted" sentinel
        # and turn deterministic actions into random ones.
        if x is False or x is None:
            self.x += np.random.randint(-1, 2)
        else:
            self.x += x
        if y is False or y is None:
            self.y += np.random.randint(-1, 2)
        else:
            self.y += y

        # Keep the cube on the board.
        self.x = min(max(self.x, 0), self.size - 1)
        self.y = min(max(self.y, 0), self.size - 1)

In [3]:
class envCube(gym.Env):
    """Grid-world environment: the player seeks the food and avoids the enemy.

    Three ``Cube`` objects (player, food, enemy) live on a ``SIZE`` x ``SIZE``
    board. Each step the player applies the chosen action while food and
    enemy take a random step. An episode ends when the player lands on the
    food or the enemy, or after ``MAX_EPISODE_STEPS`` steps.

    Observations are either the 4-vector
    ``(player - food) + (player - enemy)`` or, when ``RETURN_IMAGE`` is true,
    an RGB image of the board.
    """

    # SIZE = 10
    SIZE = 20
    OBSERVATION_SPACE_VALUES = (SIZE, SIZE, 3)
    ACTION_SPACE_VALUES = 9
    RETURN_IMAGE = False

    FOOD_REWARD = 25
    ENEMY_PENALITY = -300
    MOVE_PENALITY = -1

    # Episodes are cut off after this many steps.
    MAX_EPISODE_STEPS = 200

    # Colour per entity id. The colour names presumably assume the BGR
    # channel order used when the frame is shown through cv2.imshow.
    d = {
        1: (255, 0, 0),  # blue
        2: (0, 255, 0),  # green
        3: (0, 0, 255),  # red
    }

    PLAYER_N = 1
    FOOD_N = 2
    ENEMY_N = 3

    # metadata = {"render.modes": ["human"]}

    def __init__(self):
        super(envCube, self).__init__()
        self.action_space = spaces.Discrete(self.ACTION_SPACE_VALUES)
        if self.RETURN_IMAGE:
            # Image observations: one uint8 RGB pixel per board cell.
            self.observation_space = spaces.Box(
                low=0, high=255, shape=self.OBSERVATION_SPACE_VALUES, dtype=np.uint8
            )
        else:
            # Relative offsets on a SIZE-wide board lie in [-(SIZE-1), SIZE-1].
            self.observation_space = spaces.Box(
                low=-self.SIZE + 1, high=self.SIZE - 1, shape=(4,), dtype=int
            )

    def _get_observation(self):
        """Build the current observation in the format selected by RETURN_IMAGE."""
        if self.RETURN_IMAGE:
            return np.array(self.get_image())
        offsets = (self.player - self.food) + (self.player - self.enemy)
        return np.array(offsets, dtype=self.observation_space.dtype)

    def _is_terminal(self):
        """True when the player hit the food/enemy or the step budget is spent."""
        return (
            self.player == self.food
            or self.player == self.enemy
            or self.episode_step >= self.MAX_EPISODE_STEPS
        )

    def reset(self):
        """Re-sample player, food and enemy on distinct cells.

        Returns:
            The initial observation.
        """
        self.player = Cube(self.SIZE)
        self.food = Cube(self.SIZE)
        while self.food == self.player:
            self.food = Cube(self.SIZE)

        self.enemy = Cube(self.SIZE)
        while self.enemy == self.player or self.enemy == self.food:
            self.enemy = Cube(self.SIZE)

        self.episode_step = 0

        return self._get_observation()

    def step(self, action):
        """Advance the simulation by one step.

        Args:
            action: Discrete action id forwarded to ``Cube.action``.

        Returns:
            ``(observation, reward, done, info)`` with an empty ``info`` dict.
        """
        self.episode_step += 1
        self.player.action(action)
        self.food.move()
        self.enemy.move()

        new_observation = self._get_observation()

        # Food takes precedence when all three share a cell.
        if self.player == self.food:
            reward = self.FOOD_REWARD
        elif self.player == self.enemy:
            reward = self.ENEMY_PENALITY
        else:
            reward = self.MOVE_PENALITY

        done = self._is_terminal()
        info = {}
        return new_observation, reward, done, info

    def render(self, mode="human"):
        """Show the board in an OpenCV window.

        ``mode`` is accepted for API compatibility and otherwise unused; the
        default lets callers write ``env.render()``.
        """
        img = self.get_image()
        img = img.resize((400, 400))
        cv2.imshow("Predator", np.array(img))
        # Linger on the final frame of an episode so the outcome is visible.
        if self._is_terminal():
            cv2.waitKey(1500)
        else:
            cv2.waitKey(300)

    def get_image(self):
        """Return the board as a ``SIZE`` x ``SIZE`` PIL image.

        Drawing order is food, player, enemy, so later entities overwrite
        earlier ones on a shared cell.
        """
        env = np.zeros((self.SIZE, self.SIZE, 3), dtype=np.uint8)
        env[self.food.x][self.food.y] = self.d[self.FOOD_N]
        env[self.player.x][self.player.y] = self.d[self.PLAYER_N]
        env[self.enemy.x][self.enemy.y] = self.d[self.ENEMY_N]
        img = Image.fromarray(env, "RGB")
        return img


In [4]:
# Build one environment instance; later cells reuse it.
env = envCube()

# Let stable-baselines3 validate the custom environment; it emits
# additional warnings if something looks off.
check_env(env)

# Show each space followed by one random sample drawn from it.
for space in (env.action_space, env.observation_space):
    print(space)
    print(space.sample())

  from .autonotebook import tqdm as notebook_tqdm


Discrete(9)
3
Box([-19 -19 -19 -19], [19 19 19 19], (4,), int32)
[ -5 -19  17   9]


In [6]:
# NOTE(review): presumably a workaround for the "duplicate OpenMP runtime"
# abort seen with some torch/numpy builds - confirm it is still required.
os.environ["KMP_DUPLICATE_LIB_OK"] = "True"

# A2C agent with a small MLP: one shared 32-unit layer, then a 16-unit
# policy head and a 32-unit value head, all with ReLU activations.
model = A2C(
    "MlpPolicy",
    env,
    verbose=1,
    tensorboard_log="./logs",
    learning_rate=5e-4,
    # policy_kwargs={'net_arch':[256,256]},
    policy_kwargs={"net_arch": [32, dict(vf=[32], pi=[16])], "activation_fn": torch.nn.ReLU},
)

print(env.action_space)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Discrete(9)


In [7]:
# Display the policy object so its layer layout can be checked against the
# requested net_arch.
model.policy

ActorCriticPolicy(
  (features_extractor): FlattenExtractor(
    (flatten): Flatten(start_dim=1, end_dim=-1)
  )
  (mlp_extractor): MlpExtractor(
    (shared_net): Sequential(
      (0): Linear(in_features=4, out_features=32, bias=True)
      (1): ReLU()
    )
    (policy_net): Sequential(
      (0): Linear(in_features=32, out_features=16, bias=True)
      (1): ReLU()
    )
    (value_net): Sequential(
      (0): Linear(in_features=32, out_features=32, bias=True)
      (1): ReLU()
    )
  )
  (action_net): Linear(in_features=16, out_features=9, bias=True)
  (value_net): Linear(in_features=32, out_features=1, bias=True)
)

In [8]:
# One name is shared by the TensorBoard run and the saved model file.
run_name = "A2C_Net32x32x16_50W_SIZE20"

model.learn(total_timesteps=500_000, tb_log_name=run_name)
model.save(run_name)

# Drop the in-memory agent; the next cell restores it from disk.
del model

Logging to ./logs\A2C_Net32x32x16_20W_SIZE20_2
------------------------------------
| rollout/              |          |
|    ep_len_mean        | 138      |
|    ep_rew_mean        | -130     |
| time/                 |          |
|    fps                | 894      |
|    iterations         | 100      |
|    time_elapsed       | 0        |
|    total_timesteps    | 500      |
| train/                |          |
|    entropy_loss       | -2.18    |
|    explained_variance | -4.24    |
|    learning_rate      | 0.0005   |
|    n_updates          | 99       |
|    policy_loss        | -2.66    |
|    value_loss         | 8.87     |
------------------------------------
------------------------------------
| rollout/              |          |
|    ep_len_mean        | 162      |
|    ep_rew_mean        | -208     |
| time/                 |          |
|    fps                | 913      |
|    iterations         | 200      |
|    time_elapsed       | 1        |
|    total_timesteps    | 10

In [5]:
# Restore the trained agent from disk and attach the live environment.
saved_model_name = "A2C_Net32x32x16_50W_SIZE20"
model = A2C.load(saved_model_name, env=env)

# Last expression of the cell: display the restored policy.
model.policy

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


ActorCriticPolicy(
  (features_extractor): FlattenExtractor(
    (flatten): Flatten(start_dim=1, end_dim=-1)
  )
  (mlp_extractor): MlpExtractor(
    (shared_net): Sequential(
      (0): Linear(in_features=4, out_features=32, bias=True)
      (1): ReLU()
    )
    (policy_net): Sequential(
      (0): Linear(in_features=32, out_features=16, bias=True)
      (1): ReLU()
    )
    (value_net): Sequential(
      (0): Linear(in_features=32, out_features=32, bias=True)
      (1): ReLU()
    )
  )
  (action_net): Linear(in_features=16, out_features=9, bias=True)
  (value_net): Linear(in_features=32, out_features=1, bias=True)
)

In [6]:
# Evaluation settings: sample actions stochastically, render every step,
# and average over ten episodes.
eval_kwargs = dict(deterministic=False, render=True, n_eval_episodes=10)

# Evaluate on the (wrapped) environment the model already holds.
mean_reward, std_reward = evaluate_policy(model, model.get_env(), **eval_kwargs)

print(mean_reward, std_reward)


In [None]:
# Roll out the trained agent by hand and print each episode's total reward.
n_eposide = 10

for ep in range(n_eposide):
    obs = env.reset()
    done = False
    rewards = 0
    while not done:
        # action = env.action_space.sample()
        action, _state = model.predict(obs, deterministic=True)
        obs, reward, done, info = env.step(action)
        # Pass the mode explicitly so the call works whether or not
        # render() defines a default for it.
        env.render(mode="human")
        rewards += reward

    print(rewards)

# Release the OpenCV window once all episodes have been shown.
cv2.destroyAllWindows()

12
11
19
8
-308
13
24
13
22
8
