In [9]:
import os
import gym
import numpy as np

from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.results_plotter import load_results, ts2xy, plot_results
from stable_baselines3.common.callbacks import BaseCallback

from Enviroment import Enviroment
from gym import spaces
import cv2
from tqdm import tqdm

from stable_baselines3.common.utils import set_random_seed
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
import torch.nn as nn
import torch.nn.functional as F
import torch
from stable_baselines3 import PPO

## Callback class

In [10]:
class SaveOnBestTrainingRewardCallback(BaseCallback):
    """
    Callback for saving a model (the check is done every ``check_freq`` steps)
    based on the training reward (in practice, we recommend using ``EvalCallback``).

    :param check_freq: (int)
    :param log_dir: (str) Path to the folder where the model will be saved.
      It must contains the file created by the ``Monitor`` wrapper.
    :param verbose: (int)
    """
    def __init__(self, check_freq: int, log_dir: str, verbose=1):
        super(SaveOnBestTrainingRewardCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.log_dir = log_dir
        self.save_path = os.path.join(log_dir, 'best_model_PPO')
        self.best_mean_reward = -np.inf
        self.rew_len = []
        self.n_games = 0

    def _on_step(self) -> bool:
        
        if self.n_games < len(model.ep_info_buffer):
            self.n_games+=1
            self.rew_len.append(model.ep_info_buffer[-1]['r'])
            
            if self.best_mean_reward < np.mean(self.rew_len[-100:]):
                self.best_mean_reward = np.mean(self.rew_len[-100:])
                self.model.save(self.save_path)

        return True

## Neural network class

In [11]:
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

class CustomCNN(BaseFeaturesExtractor):
    """
    :param observation_space: (gym.Space)
    :param features_dim: (int) Number of features extracted.
        This corresponds to the number of unit for the last layer.
    """

    def __init__(self, observation_space: gym.spaces.Dict, features_dim: int = 518):
        super(CustomCNN, self).__init__(observation_space, features_dim)
        
        
        extractors = {}
        
        for key, subspace in observation_space.spaces.items():
            if key == "img":
        
                n_input_channels = observation_space[key].shape[0]
            
                extractors[key] = nn.Sequential(

                nn.Conv2d(n_input_channels, 32, 2),
                nn.MaxPool2d(2, 2),
                nn.Conv2d(32, 64, 2),
                nn.MaxPool2d(2, 2),

                ResBlock(n_filters=64, kernel_size=2),
                nn.MaxPool2d(4, 4),
                ResBlock(n_filters=64, kernel_size=2),
                nn.MaxPool2d(2, 2),
                ResBlock(n_filters=64, kernel_size=2),
                nn.MaxPool2d(2, 2),
                ResBlock(n_filters=64, kernel_size=2), 
                nn.MaxPool2d(2, 2),
                
                nn.Conv2d(64, 128, 2),
                nn.Flatten())
                    
            elif key == "posRobot":
                
                n_input_channels = observation_space[key].shape[0]
                
                extractors[key] = nn.Sequential(nn.Linear(n_input_channels, 9),
                                        nn.ReLU(),
                                        nn.Linear(9, 9),
                                        nn.ReLU(),
                                        nn.Linear(9, 3))
                    
            elif key == "target":
                            
                n_input_channels = observation_space[key].shape[0]
                    
                extractors[key] = nn.Sequential(nn.Linear(n_input_channels, 9),
                                        nn.ReLU(),
                                        nn.Linear(9, 9),
                                        nn.ReLU(),
                                        nn.Linear(9, 3))
                
        self.extractors = nn.ModuleDict(extractors)

    def forward(self, observations: th.Tensor) -> th.Tensor:
        '''
        Forward propagation
        :param observations: (dict) изображение; координаты и углы ориентации агентов
        :return: features tensor
        '''
        encoded_tensor_list = []

        for key, extractor in self.extractors.items():
            encoded_tensor_list.append(extractor(observations[key]))

        return th.cat(encoded_tensor_list, dim=1)

    
class ResBlock(nn.Module):
    def __init__(self, n_filters, kernel_size):
        """
        Инициализация кастомного резнетовского блока
        :param n_filters: (int) количество фильтров сверточного слоя
        :param kernel_size: (int) размер ядра свертки
        """
        super().__init__()
        self.n_filters = n_filters
        self.kernel_size = kernel_size

        self.b1 = nn.Conv2d(self.n_filters, self.n_filters, self.kernel_size, padding='same')
    
        self.b2 = nn.BatchNorm2d(self.n_filters, eps = 0.001, momentum= 0.99)
        self.b3 = nn.Conv2d(self.n_filters, self.n_filters, self.kernel_size, padding='same')
        self.b4 = nn.BatchNorm2d(self.n_filters, eps = 0.001, momentum= 0.99)
        
    def forward(self, x):
        '''
        Forward propagation
        :param x: input
        :return: output
        '''
        residual = x
        y = F.relu(self.b1(x))
        y = self.b2(y)
        y = F.relu(self.b3(y))
        y = self.b4(y)
        y += residual
        y = F.relu(y)
        return y

## Environment gym class

In [12]:
class CustomEnv(gym.Env):
    '''
    Оборочивание класса среды в среду gym
    '''
    metadata = {'render.modes': ['human']}

    def __init__(self, obstacle_turn: bool, Total_war: bool, num_obs: int, num_enemy: int, 
                 size_obs, steps_limit, vizualaze=False, head_velocity=0.01,
                rew_col = -100,rew_win=100, rew_defeat = -100):
        '''
        Инициализация класса среды
        :param obstacle_turn: (bool) Флаг генерации препятствий
        :param vizualaze: (bool) Флаг генерации препятствий
        :param Total_war: (bool) Флаг режима игры (с противником или без)
        :param steps_limit: (int) Максимальное количество действий в среде за одну игру
        '''
        self.log_koef = 50

        self.velocity_coef = 35       #  1/2 max speed !!!
        self.ang_Norm_coef = np.pi
        self.coords_Norm_coef = 500
        self.proportional_coef = 0.01
        
        self.rew_col = rew_col
        self.rew_win = rew_win
        self.rew_defeat = rew_defeat
                
        self.enviroment = Enviroment(obstacle_turn, vizualaze, Total_war,
                                     head_velocity, num_obs, num_enemy, size_obs, steps_limit,
                                     rew_col, rew_win, rew_defeat)

        self.enviroment.reset()

        self.action_space = spaces.Box(low=np.array([-1, -1]), high=np.array([1, 1]), dtype=np.float16)
        self.observation_space = gym.spaces.Dict({
                    'img': spaces.Box(low=0, high=255, shape=(500, 500, 3), dtype=np.uint8),
                    'posRobot': spaces.Box(low=np.array([0, 0,-3.14]), high=np.array([500, 500, 3.14])),
                    'target': spaces.Box(low  = np.array([[0, 0,-3.14] for i in range(num_enemy)]).reshape(-1), 
                                         high = np.array([[500, 500, 3.14] for i in range(num_enemy)]).reshape(-1)
                                        )
                                                })

        
        self.img1 = None
        self.img2 = None
        self.img3 = None
        

    def make_layers(self):
        """
        Функция наслоения изображений трех последовательных шагов в среде
        :param img1, img2, img3: состояния среды на трех последовательных шагах
        :return: new_img: изображение, содержащее информацию о состояниях среды на трех последовательных шагах, отображенную с разной интенсивностью
        """
        new_img = cv2.addWeighted(self.img2, 0.4, self.img1, 0.2, 0)
        self.Img = cv2.addWeighted(self.img3, 0.7, new_img, 0.5, 0)
    
    
    def step(self, action):
        """
        Метод осуществления шага в среде
        :param action: (int) направление движения в среде
        :return: dict_state, reward, not done, {}: состояние, реворд, флаг терминального состояния, информация о среде
        """
        
        action[0] *= self.velocity_coef
        action[0] += self.velocity_coef 
        action[1] *= self.ang_Norm_coef
        
        state, reward, done, numstep = self.enviroment.step(action)
        
        self.img1 = self.img2
        self.img2 = self.img3
        self.img3 = state.img
        
        self.make_layers()
    
        x2 = state.posRobot[0]
        y2 = state.posRobot[1]
    
        x4 = state.target[0,0]
        y4 = state.target[0,1]
        
        f2 =  state.target[0,2]
        
        Ax4, Ay4 = -np.cos(f2), np.sin(f2)
        Bx24, By24 = x2 - x4, y2 - y4
        
        dist = - np.sqrt(np.abs((x2-x4)**2 + (y2-y4)**2))
        phy = (Ax4*Bx24 + Ay4*By24)/(np.sqrt(Ax4**2 + Ay4**2) * np.sqrt(Bx24**2 + By24**2))
        reward_l = phy*(dist+500) * self.proportional_coef * (not done) + reward

        
        dict_state = {'img':     self.Img,  
                      'posRobot':self.normPoseRobot(state.posRobot),  
                      'target':  self.normTarget(state.target).reshape(-1)}

        return dict_state, reward_l, done, {}
    
    def normTarget(self, coords):
        '''
        Метод нормализации координат
        :return: coords: нормализованные координаты
        '''
        coords=np.float32(coords)
        coords[:,2]  = coords[:,2] / self.ang_Norm_coef #угол
        coords[:,:2] = coords[:,:2] / self.coords_Norm_coef #координаты
        
        return coords

    def normPoseRobot(self, coords):
        '''
        Метод нормализации координат
        :return: coords: нормализованные координаты
        '''
        coords=np.float32(coords)
        coords[2]  = coords[2] / self.ang_Norm_coef #угол
        coords[:2] = coords[:2] / self.coords_Norm_coef #координаты
        
        return coords


    def reset(self):
        '''
        Метод обновления игры
        :return: dict_state: состояние
        '''
        
        state = self.enviroment.reset()
        
        self.img2 = state.img
        self.img3 = state.img
        
        dict_state = {'img':     state.img,  
                      'posRobot':self.normPoseRobot(state.posRobot),  
                      'target':  self.normTarget(state.target).reshape(-1)}

        return dict_state

    def render(self, model, num_gifs=1):
        '''
        Метод вывода информации об игре
        :param mode:
        :return:
        '''
        for i in range(num_gifs):
            
            images = []
            obs = self.reset()
            img = obs['img']# env.render(mode='rgb_array')
            done = False
                
            height, width, layers = img.shape
            size = (width,height)
            out = cv2.VideoWriter(f"video{i}.avi",cv2.VideoWriter_fourcc(*'DIVX'), 25, size)
            img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
            out.write(img)
            while not done:

                action, _ = model.predict(obs)
                obs, _, done ,_ = self.step(action)
                img = obs['img']
                img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
                out.write(img)
            out.release()
    
    def get_statistic(self, model, num_games):
        collision = 0
        win = 0
        destroyed = 0
        loss = 0
        
        pbar = tqdm(range(num_games))
        for i in pbar:
            obs = self.reset()
            done = False
            while not done:
                action, _ = model.predict(obs)
                obs, reward, done ,_ = self.step(action)
                
                
                
            if reward == self.rew_col:      # collision
                collision+=1
            elif reward == self.rew_win:    # win
                win +=1
            elif reward == self.rew_defeat: # loss
                destroyed +=1
            else:                           # not_achieved
                loss+=1
        
        print("Win: ",win/num_games)
        print("destroyed: ", destroyed/num_games)
        print("loss: ",loss/num_games)
        print("collision: ",collision/num_games)

In [72]:
env = CustomEnv(obstacle_turn = False,
                vizualaze     = True, 
                Total_war     = True,
                head_velocity = 0.005,
                num_obs       = 5, 
                num_enemy     = 1, 
                size_obs      = [30, 40],
                rew_col       = -100,
                rew_win       =100,
                rew_defeat    = -100,
                steps_limit   = 2000)

## Initialize the agent

In [6]:
from stable_baselines3 import PPO, A2C, TD3, DDPG, SAC

policy_kwargs = dict(
    features_extractor_class=CustomCNN,
    features_extractor_kwargs=dict(features_dim=518),
    activation_fn=torch.nn.ReLU,
    net_arch = [dict(pi=[1029, 128, 32, 8], vf=[1029, 128, 32, 8])])

model = PPO(policy          = 'MultiInputPolicy',
            env             = env,
            learning_rate   = 0.0001,
            n_steps         = 2048,
            batch_size      = 24,
            gamma           = 0.99,
            gae_lambda      = 0.95,
            tensorboard_log = "./tensorboard_logs/",
            policy_kwargs   = policy_kwargs,
            verbose         = 1,
            device          = 'cuda')

# model = A2C(policy          = 'MlpPolicy',
#             env             = env,
#             learning_rate   = 0.0001,
#             n_steps         = 24,
#             gamma           = 0.99,
#             gae_lambda      = 0.95,
#             tensorboard_log = "./tensorboard_logs/",
#             policy_kwargs   = policy_kwargs,
#             verbose         = 1,
#             device          = 'cuda')
# model = TD3(policy          = 'MlpPolicy',  # 1 neural network metod
#             env             = env,
#             learning_rate   = 0.0001,
#             buffer_size     = 100,
#             batch_size      = 2,
#             gamma           = 0.99,
#             tensorboard_log = "./tensorboard_logs_cont_mult/",
#             policy_kwargs   = policy_kwargs,
#             verbose         = 0,
#             device          = 'cuda')

# model = DDPG(policy         = 'MlpPolicy',  # 1 neural network metod
#             env             = env,
#             learning_rate   = 0.0001,
#             buffer_size     = 100,
#             batch_size      = 2,
#             gamma           = 0.99,
#             tensorboard_log = "./tensorboard_logs_cont_mult/",
#             policy_kwargs   = policy_kwargs,
#             verbose         = 0,
#             device          = 'cuda')

# model = SAC(policy          = 'MlpPolicy',  # 1 neural network metod
#             env             = env,
#             learning_rate   = 0.0001,
#             buffer_size     = 100,
#             batch_size      = 2,
#             gamma           = 0.99,
#             tensorboard_log = "./tensorboard_logs_cont_mult/",
#             policy_kwargs   = policy_kwargs,
#             verbose         = 0,
#             device          = 'cuda')

We recommend using a `batch_size` that is a factor of `n_steps * n_envs`.
Info: (n_steps=2048 and n_envs=1)


Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.


## Make callback

In [7]:
log_dir = './saved_models/PPO'  # For A2C agent: './saved_models/A2C'
os.makedirs(log_dir, exist_ok=True)
env = Monitor(env, log_dir)
callback = SaveOnBestTrainingRewardCallback(check_freq=500, 
                                            log_dir=log_dir)

## Learn model

In [None]:
model.learn(total_timesteps=1e6,callback=callback)

Logging to ./tensorboard_logs/PPO_1


  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)
  return F.conv2d(input, weight, bias, self.stride,


---------------------------------
| rollout/           |          |
|    ep_len_mean     | 70       |
|    ep_rew_mean     | 2.1e+03  |
| time/              |          |
|    fps             | 30       |
|    iterations      | 1        |
|    time_elapsed    | 66       |
|    total_timesteps | 2048     |
---------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 68.1         |
|    ep_rew_mean          | 14.3         |
| time/                   |              |
|    fps                  | 16           |
|    iterations           | 2            |
|    time_elapsed         | 255          |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0064298823 |
|    clip_fraction        | 0.125        |
|    clip_range           | 0.2          |
|    entropy_loss         | -2.84        |
|    explained_variance   | 0.000244     |
|    learning_r

## Make environment to test trained model and get statistics

In [5]:
env_test = CustomEnv(obstacle_turn = False,
                    vizualaze     = False, 
                    Total_war     = True,
                    head_velocity = 0.005,
                    num_obs       = 5, 
                    num_enemy     = 1, 
                    size_obs = [30, 40],
                      rew_col = -100,
                      rew_win=100,
                      rew_defeat = -100,
                    steps_limit    = 2000)



## Load the best model and get statistics

In [7]:
path = './saved_models/PPO/best_model_PPO'  # For A2C agent: './saved_models/A2C/callback0/best_model_A2C/'
model = PPO.load(path, env=env_test,)  # For A2C agent: A2C.load(path, env=env_test)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.


In [None]:
# env_test.get_statistic(model, 10000)

In [8]:
env_test.render(model)

  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)
  return F.conv2d(input, weight, bias, self.stride,


[-0.71000046 -0.61010295]
[-0.58855855 -0.16962498]
[ 0.9980455  -0.11894868]
[-0.6679856   0.25215867]
[ 0.25673664 -0.2201438 ]
[0.41752818 0.61426336]
[-1.        -0.8854179]
[-0.06292793 -0.2993555 ]
[-0.2526896   0.02959792]
[-0.12730184 -0.1832371 ]
[-1.          0.22374073]
[-1.        -0.6075144]
[ 1. -1.]
[-0.09695279 -0.06826799]
[0.9424362  0.03675167]
[-0.45674962 -0.5568431 ]
[1.         0.13033205]
[-0.08442053 -1.        ]
[-0.02818215 -0.16201453]
[-0.38634798 -1.        ]
[-0.75555   -0.7755078]
[-0.6251105  1.       ]
[1.        0.7987722]
[-0.21700399  0.13408116]
[-1.         -0.93926865]
[-0.52596045 -1.        ]
[-0.39734644 -0.75453895]
[0.37661397 0.5678267 ]
[-0.10892828  1.        ]
[1. 1.]
[-0.40665746 -0.2722182 ]
[ 0.38245186 -0.06684395]
[-0.0554547  -0.10788595]
[0.12444302 0.9387353 ]


## Check tensorboard

In [None]:
# !tensorboard --logdir ./tensorboard_logs/