In [2]:
from pathlib import Path
from typing import Optional

import cv2
import numpy as np
from matplotlib import style
from PIL import Image

style.use('ggplot')

In [3]:
class Cube:
    """A one-cell entity on a square ``size`` x ``size`` grid.

    The position is held in ``x`` and ``y``; both start at a random cell and
    are kept inside ``[0, size - 1]`` by :meth:`move`.
    """

    # Discrete action id -> (dx, dy) step applied by `action`.
    _ACTION_DELTAS = {
        0: (1, 1),
        1: (-1, 1),
        2: (1, -1),
        3: (-1, -1),
        4: (0, 1),
        5: (0, -1),
        6: (1, 0),
        7: (-1, 0),
        8: (0, 0),
    }

    def __init__(self, size):
        """Place the cube on a random cell of a ``size`` x ``size`` grid."""
        self.size = size
        self.x = np.random.randint(0, self.size)
        self.y = np.random.randint(0, self.size)

    def __str__(self):
        """Return the position formatted as ``"x,y"``."""
        return f'{self.x},{self.y}'

    def __sub__(self, other):
        """Return the offset ``(dx, dy)`` from ``other`` to this cube."""
        return (self.x - other.x, self.y - other.y)

    def __eq__(self, other):
        """Two cubes are equal when they occupy the same cell."""
        if not isinstance(other, Cube):
            # Let Python fall back to its default comparison instead of
            # raising AttributeError on objects without x / y.
            return NotImplemented
        return self.x == other.x and self.y == other.y

    def action(self, choise):
        """Apply one of the nine discrete actions (0-8).

        0-3 are diagonal steps, 4-7 are axis-aligned steps and 8 stays in
        place. Any other value is ignored.

        Args:
            choise: Action id; a plain int or an integer-like scalar such as
                a 0-d numpy array.
        """
        # int() makes numpy scalars / 0-d arrays usable as dictionary keys.
        delta = self._ACTION_DELTAS.get(int(choise))
        if delta is not None:
            self.move(x=delta[0], y=delta[1])

    def move(self, x=None, y=None):
        """Shift the cube by ``(x, y)`` and clamp it to the grid.

        Args:
            x: Step along x. ``None`` (or the legacy default ``False``)
                draws a random step from {-1, 0, 1}.
            y: Step along y, with the same convention as ``x``.

        An explicit ``0`` means "do not move on this axis". The previous
        truthiness test (``if not x``) mistook ``0`` for "no value given" and
        replaced it with a random step.
        """
        # The x draw happens before the y draw, as in the original code.
        dx = np.random.randint(-1, 2) if (x is None or x is False) else x
        dy = np.random.randint(-1, 2) if (y is None or y is False) else y

        self.x += dx
        self.y += dy

        # Keep the cube on the board.
        if self.x < 0:
            self.x = 0
        elif self.x >= self.size:
            self.x = self.size - 1
        if self.y < 0:
            self.y = 0
        elif self.y >= self.size:
            self.y = self.size - 1

In [4]:
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3.common.env_checker import check_env
from typing import Any

In [5]:
class EnvCube(gym.Env):
    """Grid-world environment with a player, a food cube and an enemy cube.

    The agent steers the player with one of nine discrete actions while the
    food and the enemy take a random step each turn. An episode ends when the
    player lands on the food or on the enemy, or after ``MAX_EPISODE_STEPS``
    steps.

    Observations are either the four relative offsets
    ``(player - food, player - enemy)`` or, when ``RETURN_IMAGE`` is true, the
    rendered ``SIZE`` x ``SIZE`` x 3 board.
    """

    SIZE = 20
    OBSERVATION_SPACE_VALUES = (SIZE,SIZE,3)
    ACTION_SPACE_VALUES = 9
    RETURN_IMAGE = False
    # Hard cap on the episode length.
    MAX_EPISODE_STEPS = 200

    FOOD_REWARD = 25
    ENEMY_PENALITY = -300
    MOVE_PENALITY = -1

    # Colour per entity id. The names follow how cv2.imshow shows them in
    # `render` (OpenCV reads the channels in BGR order).
    d = {1:(255,0,0), #blue
         2:(0,255,0), #green
         3:(0,0,255)} #red

    PLAYER_N = 1
    FOOD_N = 2
    ENEMY_N = 3

    metadata = {'render_modes':['human']}

    def __init__(self):
        """Declare the action space and the matching observation space."""
        super().__init__()
        self.action_space = spaces.Discrete(self.ACTION_SPACE_VALUES)
        if self.RETURN_IMAGE:
            # Image observations: declare the space that `get_image` really
            # produces instead of the 4-vector space.
            self.observation_space = spaces.Box(low=0, high=255,
                                                shape=(self.SIZE, self.SIZE, 3),
                                                dtype=np.uint8)
        else:
            self.observation_space = spaces.Box(low=-self.SIZE+1,high=self.SIZE-1,shape=(4,),dtype=int)

    def _get_obs(self):
        """Build the current observation in the format of ``observation_space``."""
        if self.RETURN_IMAGE:
            return np.array(self.get_image())
        offsets = (self.player - self.food) + (self.player - self.enemy)
        # Pin the dtype so the array always matches the declared space.
        return np.array(offsets, dtype=self.observation_space.dtype)

    def _episode_over(self):
        """Return True once the player hit the food or the enemy, or the step cap is reached."""
        return (self.player == self.food
                or self.player == self.enemy
                or self.episode_step >= self.MAX_EPISODE_STEPS)

    def reset(self, *, seed: int | None = None, options: dict[str, Any] | None = None, ):
        """Start a new episode with the three cubes on distinct random cells.

        Args:
            seed: Optional seed. It is forwarded to ``gym.Env.reset`` and also
                used to seed numpy's global generator, because ``Cube`` draws
                its positions and random steps from ``np.random``.
            options: Accepted for API compatibility; not used.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        if seed is not None:
            # Without this an explicit seed would not make episodes repeatable.
            # np.random.seed only accepts values below 2**32.
            np.random.seed(seed % (2 ** 32))

        self.player = Cube(self.SIZE)
        self.food = Cube(self.SIZE)
        while self.food == self.player:
            self.food = Cube(self.SIZE)

        self.enemy = Cube(self.SIZE)
        while self.enemy == self.player or self.enemy == self.food:
            self.enemy = Cube(self.SIZE)

        self.episode_step = 0
        info = {}
        return self._get_obs(), info

    def step(self,action):
        """Advance the world by one step.

        The player applies ``action``; the food and the enemy then each take
        a random step.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
        """
        self.episode_step += 1
        self.player.action(action)
        self.food.move()
        self.enemy.move()

        new_observation = self._get_obs()

        # Food is checked first, so it wins if all three share a cell.
        if self.player == self.food:
            reward = self.FOOD_REWARD
        elif self.player == self.enemy:
            reward = self.ENEMY_PENALITY
        else:
            reward = self.MOVE_PENALITY

        # NOTE(review): the step cap is reported through `terminated`, as
        # before, so callers that only read the first flag keep working.
        # Gymnasium's convention would report a time limit as `truncated`.
        terminated = self._episode_over()
        truncated = False
        info = {}
        return new_observation,reward,terminated,truncated,info

    def render(self,mode='human'):
        """Show the board in an OpenCV window, pausing longer on the final frame."""
        img = self.get_image()
        img = img.resize((400,400))
        cv2.imshow('Predator',np.array(img))
        if self._episode_over():
            cv2.waitKey(1500)
        else:
            cv2.waitKey(1)

    def get_image(self):
        """Return the board as a ``SIZE`` x ``SIZE`` PIL image with one coloured pixel per cube."""
        env = np.zeros((self.SIZE,self.SIZE,3), dtype=np.uint8)
        # Later assignments overwrite earlier ones on a shared cell, so the
        # enemy is drawn on top of the player, and the player on top of the food.
        env[self.food.x][self.food.y] = self.d[self.FOOD_N]
        env[self.player.x][self.player.y] = self.d[self.PLAYER_N]
        env[self.enemy.x][self.enemy.y] = self.d[self.ENEMY_N]
        img = Image.fromarray(env,'RGB')
        return img



In [6]:
# Create the custom grid-world environment.
env = EnvCube()


In [7]:
# Draw one random observation from the declared space to inspect its shape and value range.
env.observation_space.sample()

array([-14,  11,   8,  -5])

In [8]:
# Run Stable-Baselines3's environment checker on the custom environment.
check_env(env)

In [9]:
from stable_baselines3 import DQN, A2C
from stable_baselines3.common.evaluation import evaluate_policy
import torch as th

In [10]:
# A2C agent with an MLP policy: the actor (pi) and the critic (vf) each get
# their own stack of two 32-unit layers with ReLU activations.
model = A2C(
    policy='MlpPolicy',
    env=env,
    learning_rate=5e-4,
    policy_kwargs={
        'activation_fn': th.nn.ReLU,
        'net_arch': {'pi': [32, 32], 'vf': [32, 32]},
    },
    verbose=1,
)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [11]:
# Display the policy network's layer structure.
model.policy

ActorCriticPolicy(
  (features_extractor): FlattenExtractor(
    (flatten): Flatten(start_dim=1, end_dim=-1)
  )
  (pi_features_extractor): FlattenExtractor(
    (flatten): Flatten(start_dim=1, end_dim=-1)
  )
  (vf_features_extractor): FlattenExtractor(
    (flatten): Flatten(start_dim=1, end_dim=-1)
  )
  (mlp_extractor): MlpExtractor(
    (policy_net): Sequential(
      (0): Linear(in_features=4, out_features=32, bias=True)
      (1): ReLU()
      (2): Linear(in_features=32, out_features=32, bias=True)
      (3): ReLU()
    )
    (value_net): Sequential(
      (0): Linear(in_features=4, out_features=32, bias=True)
      (1): ReLU()
      (2): Linear(in_features=32, out_features=32, bias=True)
      (3): ReLU()
    )
  )
  (action_net): Linear(in_features=32, out_features=9, bias=True)
  (value_net): Linear(in_features=32, out_features=1, bias=True)
)

In [37]:
# Training takes 500k steps, so it is skipped when the checkpoint is already on
# disk. Set FORCE_RETRAIN = True to train again and overwrite it.
CHECKPOINT_NAME = 'S20_A2C_Net32_50W'
FORCE_RETRAIN = False

# `model.save` appends ".zip" when the given path has no extension.
if FORCE_RETRAIN or not Path(f'{CHECKPOINT_NAME}.zip').exists():
    model.learn(total_timesteps=500_000, tb_log_name=CHECKPOINT_NAME)
    model.save(CHECKPOINT_NAME)

# Drop the in-memory model; it is reloaded from the checkpoint afterwards.
del model

------------------------------------
| rollout/              |          |
|    ep_len_mean        | 144      |
|    ep_rew_mean        | -135     |
| time/                 |          |
|    fps                | 316      |
|    iterations         | 100      |
|    time_elapsed       | 1        |
|    total_timesteps    | 500      |
| train/                |          |
|    entropy_loss       | -2.11    |
|    explained_variance | -0.633   |
|    learning_rate      | 0.0005   |
|    n_updates          | 99       |
|    policy_loss        | -4.12    |
|    value_loss         | 5.27     |
------------------------------------
------------------------------------
| rollout/              |          |
|    ep_len_mean        | 146      |
|    ep_rew_mean        | -187     |
| time/                 |          |
|    fps                | 347      |
|    iterations         | 200      |
|    time_elapsed       | 2        |
|    total_timesteps    | 1000     |
| train/                |          |
|

In [12]:
# Restore the saved A2C agent from disk and attach the environment to it.
model = A2C.load('S20_A2C_Net32_50W',env=env)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [13]:
# Roll out the trained policy greedily, rendering every step, and print the
# total reward of each episode.
eposides = 100  # name kept as-is in case other cells refer to it
try:
    for ep in range(eposides):
        obs, info = env.reset()
        done = False
        rewards = 0
        while not done:
            action, _states = model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, info = env.step(action)
            # An episode is over when the env reports either flag; reading
            # only the first one would miss a truncated episode.
            done = terminated or truncated
            env.render()
            rewards += reward
        print(rewards)
finally:
    # Close the OpenCV window opened by env.render(), even on interruption.
    cv2.destroyAllWindows()

-308
0
-6
10
20
-17
18
24
21
13
10
18
-3
-1
6
3
2
19
18
12
10
-301
17
17
17
11
6
8
17
16
13
11
-3
11
17
22
-301
5
17
-305
1
-1
10
10
14
0
14
15
21
5
12
7
6
9
15
14
17
-5
17
13
9
6
-4
18
22
16
13
4
6
12
5
21
18
8
-6
16
12
9
4
15
10
16
18
11
17
6
13
21
21
19
18
12
10
13
25
6
4
10
11
16
