In [1]:
import gymnasium as gym
import numpy as np
from gymnasium import spaces
from stable_baselines3.common.env_checker import check_env
from stable_baselines3 import A2C
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from stable_baselines3.common.callbacks import EvalCallback
import torch as th 
import torch.nn as nn

import numpy as np
import cv2
from PIL import Image
import time
import pickle
import matplotlib.pyplot as plt
from matplotlib import style
style.use('ggplot')

import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

In [2]:
class Cube:
    """A single occupant (player, food or enemy) of a ``size`` x ``size`` grid.

    The cube starts on a uniformly random cell drawn from NumPy's global RNG
    and is always kept inside ``[0, size - 1]`` on both axes.
    """

    # Displacement (dx, dy) for each discrete action; the tuple index is the
    # action code.
    _MOVES = (
        (1, 1),    # 0: diagonal
        (-1, 1),   # 1: diagonal
        (1, -1),   # 2: diagonal
        (-1, -1),  # 3: diagonal
        (0, 1),    # 4: y + 1 only
        (0, -1),   # 5: y - 1 only
        (1, 0),    # 6: x + 1 only
        (-1, 0),   # 7: x - 1 only
        (0, 0),    # 8: stay in place
    )

    def __init__(self, size):
        """Place the cube on a random cell of a ``size`` x ``size`` board."""
        self.size = size
        self.x = np.random.randint(0, self.size)
        self.y = np.random.randint(0, self.size)

    def __str__(self):
        """Return the position formatted as ``"x,y"``."""
        return f'{self.x},{self.y}'

    def __sub__(self, other):
        """Return the offset ``(dx, dy)`` from ``other`` to this cube as a tuple."""
        return (self.x-other.x, self.y-other.y)

    def __eq__(self, other):
        """Two cubes are equal when they occupy the same cell."""
        return self.x == other.x and self.y == other.y

    def action(self, choise):
        """Apply the discrete action ``choise`` (0-8); unknown codes are ignored.

        ``==`` is used for matching (rather than a dict lookup) so that NumPy
        integer scalars and 0-d arrays are accepted as action codes too.
        """
        for code, (dx, dy) in enumerate(self._MOVES):
            if choise == code:
                self.move(x=dx, y=dy)
                return

    def move(self, x=False, y=False):
        """Shift the cube by ``(x, y)`` and clamp it to the board.

        :param x: step along x. When omitted (``False``/``None``) a random
            step in ``{-1, 0, 1}`` is drawn instead.
        :param y: step along y, with the same convention as ``x``.

        An explicit ``0`` means "do not move on this axis". The sentinel is
        therefore tested by identity: ``0 == False`` in Python, so a
        truthiness test would turn every zero step into a random one.
        """
        if x is False or x is None:
            x = np.random.randint(-1, 2)
        if y is False or y is None:
            y = np.random.randint(-1, 2)

        # Clamp to the valid index range [0, size - 1].
        self.x = min(max(self.x + x, 0), self.size - 1)
        self.y = min(max(self.y + y, 0), self.size - 1)

In [3]:
class envCube(gym.Env):
    """Grid-world environment: a player cube chases food and avoids an enemy.

    Each step the player applies the chosen action while the food and the
    enemy each take a random step. An episode terminates when the player
    lands on the food (reward ``FOOD_REWARD``) or on the enemy (reward
    ``ENEMY_PENALITY``); every other step yields ``MOVE_PENALITY``. After
    ``MAX_EPISODE_STEPS`` steps without either event the episode is truncated.

    Observations depend on ``RETURN_IMAGE``:

    * ``True``  - a channel-first ``uint8`` image of shape
      ``(N_CHANNELS, SIZE, SIZE)`` with one coloured pixel per cube.
    * ``False`` - a length-4 integer vector
      ``(player - food) + (player - enemy)``, i.e. the x/y offsets to the food
      followed by the x/y offsets to the enemy.
    """

    SIZE = 10
    OBSERVATION_SPACE_VALUES = (SIZE, SIZE, 3)
    ACTION_SPACE_VALUES = 9
    RETURN_IMAGE = True
    # Step budget after which an unfinished episode is cut off.
    MAX_EPISODE_STEPS = 200

    FOOD_REWARD = 25
    ENEMY_PENALITY = -300
    MOVE_PENALITY = -1

    # Pixel colour per entity id (see PLAYER_N / FOOD_N / ENEMY_N).
    d = {1: (255, 0, 0), 2: (0, 255, 0), 3: (0, 0, 255)}

    PLAYER_N = 1
    FOOD_N = 2
    ENEMY_N = 3
    N_CHANNELS = 3

    metadata = {"render_modes": ["human"], "render_fps": 30}

    def __init__(self):
        super(envCube, self).__init__()
        self.action_space = spaces.Discrete(self.ACTION_SPACE_VALUES)
        if self.RETURN_IMAGE:
            # Channel-first RGB image.
            self.observation_space = spaces.Box(
                low=0,
                high=255,
                shape=(self.N_CHANNELS, self.SIZE, self.SIZE),
                dtype=np.uint8,
            )
        else:
            # Relative offsets between two cells of the board lie in
            # [-(SIZE - 1), SIZE - 1]; declare a space that matches the
            # vector actually returned in this mode.
            max_offset = self.SIZE - 1
            self.observation_space = spaces.Box(
                low=-max_offset,
                high=max_offset,
                shape=(4,),
                dtype=np.int64,
            )

    def reset(self, seed=None, options=None):
        """Start a new episode with three cubes on distinct random cells.

        :param seed: optional RNG seed. It is forwarded to ``gym.Env.reset``
            and also applied to NumPy's global RNG, because ``Cube`` draws its
            positions and random moves from ``np.random``.
        :param options: accepted for Gymnasium API compatibility; unused.
        :return: ``(observation, info)`` with an empty ``info`` dict.
        """
        super().reset(seed=seed)
        if seed is not None:
            np.random.seed(seed)

        self.player = Cube(self.SIZE)
        self.food = Cube(self.SIZE)
        while self.food == self.player:
            self.food = Cube(self.SIZE)

        self.enemy = Cube(self.SIZE)
        while self.enemy == self.player or self.enemy == self.food:
            self.enemy = Cube(self.SIZE)

        self.episode_step = 0
        return self._get_obs(), {}

    def step(self, action):
        """Advance the environment by one step.

        :param action: discrete action code passed to ``Cube.action``.
        :return: ``(observation, reward, terminated, truncated, info)``.
            ``terminated`` is set when the player reaches the food or the
            enemy; ``truncated`` is set when the step budget runs out without
            either event.
        """
        self.episode_step += 1
        self.player.action(action)
        self.food.move()
        self.enemy.move()

        new_observation = self._get_obs()

        # Food takes precedence if the player shares a cell with both.
        if self.player == self.food:
            reward = self.FOOD_REWARD
        elif self.player == self.enemy:
            reward = self.ENEMY_PENALITY
        else:
            reward = self.MOVE_PENALITY

        terminated = self._player_caught_something()
        # A time-limit cut-off is a truncation, not a terminal state.
        truncated = (not terminated) and self.episode_step >= self.MAX_EPISODE_STEPS

        return new_observation, reward, terminated, truncated, {}

    def render(self):
        """Show the board, upscaled to 400x400, in an OpenCV window.

        The frame is held longer (1500 ms instead of 50 ms) once the episode
        is over so the final position stays visible.
        """
        img = self.get_image()
        img = Image.fromarray(img, "RGB")
        img = img.resize((400, 400))
        cv2.imshow("Predatoc", np.array(img))
        episode_over = (
            self._player_caught_something()
            or self.episode_step >= self.MAX_EPISODE_STEPS
        )
        cv2.waitKey(1500 if episode_over else 50)

    def get_image(self):
        """Return the board as a ``(SIZE, SIZE, 3)`` ``uint8`` array.

        Cubes are drawn in the order food, player, enemy, so on a shared cell
        the later one overwrites the earlier one.
        """
        img = np.zeros((self.SIZE, self.SIZE, 3), dtype=np.uint8)
        img[self.food.x, self.food.y] = self.d[self.FOOD_N]
        img[self.player.x, self.player.y] = self.d[self.PLAYER_N]
        img[self.enemy.x, self.enemy.y] = self.d[self.ENEMY_N]
        return img

    def _get_obs(self):
        """Build the current observation in the format selected by RETURN_IMAGE."""
        if self.RETURN_IMAGE:
            # get_image() is channel-last; the observation space is channel-first.
            return np.moveaxis(self.get_image(), -1, 0)
        offsets = (self.player - self.food) + (self.player - self.enemy)
        return np.array(offsets, dtype=np.int64)

    def _player_caught_something(self):
        """Return True when the player shares a cell with the food or the enemy."""
        return bool(self.player == self.food or self.player == self.enemy)

In [4]:
# Instantiate the environment, print the shape of its observation space, and
# run Stable-Baselines3's environment checker on it.
env = envCube()
print(env.observation_space.shape)
check_env(env)

(3, 10, 10)




In [5]:
class CustomCNN(BaseFeaturesExtractor):
    """
    Small convolutional feature extractor for channel-first image observations.

    Three 3x3, stride-1, unpadded convolutions with 8 output channels each are
    followed by a flatten and a single linear + ReLU projection.

    :param observation_space: (gym.Space) Image space; ``shape[0]`` is taken
        as the number of input channels.
    :param features_dim: (int) Number of features extracted.
        This corresponds to the number of unit for the last layer.
    """

    def __init__(self, observation_space: spaces.Box, features_dim: int = 256):
        super().__init__(observation_space, features_dim)
        # Channels-first (CxHxW) layout is assumed.
        in_channels = observation_space.shape[0]
        conv_opts = dict(kernel_size=(3, 3), stride=1, padding=0)
        # NOTE(review): there is no activation between the third convolution
        # and the flatten/linear stage - confirm this is intentional.
        self.cnn = nn.Sequential(
            nn.Conv2d(in_channels, 8, **conv_opts),
            nn.ReLU(),
            nn.Conv2d(8, 8, **conv_opts),
            nn.ReLU(),
            nn.Conv2d(8, 8, **conv_opts),
            nn.Flatten(),
        )

        # Push one sampled observation through the conv stack to learn the
        # flattened width the linear layer must accept.
        with th.no_grad():
            probe = th.as_tensor(observation_space.sample()[None]).float()
            flat_width = self.cnn(probe).shape[1]

        self.linear = nn.Sequential(nn.Linear(flat_width, features_dim), nn.ReLU())

    def forward(self, observations: th.Tensor) -> th.Tensor:
        """Map a batch of observations to ``features_dim``-sized feature vectors."""
        conv_features = self.cnn(observations)
        return self.linear(conv_features)


# Policy options: use CustomCNN as the feature extractor with a 128-wide
# output and ReLU activations.
policy_kwargs = {
    "features_extractor_class": CustomCNN,
    "features_extractor_kwargs": {"features_dim": 128},
    # "net_arch": [dict(vf=[16], pi=[32])],
    "activation_fn": th.nn.ReLU,
}

In [6]:
# Build an A2C agent on `env` with the custom feature extractor configured in
# `policy_kwargs`; TensorBoard logs go to ./tb_logs.
# Baseline with the default CNN policy, kept for reference:
# model = A2C("CnnPolicy", env, verbose=1)
model = A2C(
    "CnnPolicy",
    env,
    verbose=0,
    tensorboard_log="./tb_logs",
    learning_rate=1e-3,
    policy_kwargs=policy_kwargs,
)

In [7]:
# Display the policy network to confirm the custom extractor is in use.
model.policy

ActorCriticCnnPolicy(
  (features_extractor): CustomCNN(
    (cnn): Sequential(
      (0): Conv2d(3, 8, kernel_size=(3, 3), stride=(1, 1))
      (1): ReLU()
      (2): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1))
      (3): ReLU()
      (4): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1))
      (5): Flatten(start_dim=1, end_dim=-1)
    )
    (linear): Sequential(
      (0): Linear(in_features=128, out_features=128, bias=True)
      (1): ReLU()
    )
  )
  (pi_features_extractor): CustomCNN(
    (cnn): Sequential(
      (0): Conv2d(3, 8, kernel_size=(3, 3), stride=(1, 1))
      (1): ReLU()
      (2): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1))
      (3): ReLU()
      (4): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1))
      (5): Flatten(start_dim=1, end_dim=-1)
    )
    (linear): Sequential(
      (0): Linear(in_features=128, out_features=128, bias=True)
      (1): ReLU()
    )
  )
  (vf_features_extractor): CustomCNN(
    (cnn): Sequential(
      (0): Conv2d(3, 8, kernel_size=

In [8]:
# Periodically evaluate the agent and save the best model found so far.
# Evaluation runs on its own environment instance: sharing the training `env`
# would let every evaluation reset and step the environment the agent is
# currently collecting rollouts from.
eval_env = envCube()
eval_callback = EvalCallback(eval_env, best_model_save_path="./tb_logs/bestModel",
                             log_path="./tb_logs/bestModel", eval_freq=500,
                             deterministic=True, render=False)

In [14]:
# Load the evaluation log.
# NOTE(review): this path matches the `log_path` given to `eval_callback`, so
# the file presumably only exists after the `model.learn(...)` cell further
# down has run - on a fresh kernel, run training before this cell.
logfile = np.load('./tb_logs/bestModel/evaluations.npz')
logfile.files


['timesteps', 'results', 'ep_lengths']

In [21]:
# Inspect the logged arrays. Only the last expression of a cell is displayed,
# so the three `.shape` lines below produce no visible output.
logfile['timesteps'].shape
logfile['results'].shape
logfile['ep_lengths'].shape
# First column of the results array (presumably the first evaluation episode
# of each evaluation round - confirm against the array's shape).
logfile['results'][:,0]



array([-200.,  -51., -101., -319.,  -16., -433., -200., -304., -422.,
       -340.,  -67., -200.,  -65., -302., -200., -200., -368., -498.,
       -345.,  -87.])

In [None]:
# Train for 10,000 timesteps with `eval_callback` attached, then save the
# final model under the same name as the TensorBoard run.
# NOTE(review): the run name mentions "Net128x16x32", but the `net_arch` entry
# in `policy_kwargs` is commented out - confirm the name still describes the
# network being trained.
model.learn(
    total_timesteps=int(1e4),
    progress_bar=True,
    tb_log_name="Custom_CNN_A2C_Net128x16x32_1W_call",
    callback= eval_callback
)
model.save("Custom_CNN_A2C_Net128x16x32_1W_call")

In [None]:
# model = A2C.load("Custom_CNN_A2C_Net128x16x32_2M",env=env)
# mean_reward, std_reward = evaluate_policy(
#     model, model.get_env(), deterministic=False, render=False, n_eval_episodes=10
# )
# print(mean_reward, std_reward)


In [None]:
# # Environment test code
# eposides = 10
# for ep in range(eposides):
#     # Initialize the episode
#     obs, _ = env.reset()
#     terminated = False
#     rewards = 0
#     while not terminated:
#         # action = env.action_space.sample()
#         action, _states = model.predict(obs, deterministic=True)
#         obs, reward, terminated, truncated, info = env.step(action)
#         env.render()
#         rewards += reward
#     print(terminated,rewards)