In [None]:
import os
import torch as T
import torch.cuda
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Инициализация нейронной сети

In [None]:
class DeepQNetwork(nn.Module):
    def __init__(self, lr, n_actions, name, input_dims, chkpt_dir):
        super(DeepQNetwork, self).__init__()
        self.checkpoint_dir = chkpt_dir
        self.checkpoint_file = os.path.join(self.checkpoint_dir, name)

        
        self.fc1 = nn.Linear(input_dims[0], 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, n_actions)

        self.optimizer = optim.RMSprop(self.parameters(), lr=lr)

        self.loss = nn.MSELoss()
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)

    
    def forward(self, state):
        flat1 = F.relu(self.fc1(state))
        flat2 = F.relu(self.fc2(flat1))
        actions = self.fc3(flat2)
        return actions

    def save_checkpoint(self):
        print('... saving checkpoint ...')
        T.save(self.state_dict(), self.checkpoint_file)

    def load_checkpoint(self):
        print('... loading checkpoint ...')
        self.load_state_dict(T.load(self.checkpoint_file))

# Буфер воспроизведения

In [None]:
import numpy as np

class ReplayBuffer(object):
    def __init__(self, max_size, input_shape, n_actions):
        self.mem_size = max_size
        self.mem_cntr = 0
        self.state_memory = np.zeros((self.mem_size, *input_shape),
                                     dtype=np.float32)
        self.new_state_memory = np.zeros((self.mem_size, *input_shape),
                                         dtype=np.float32)

        self.action_memory = np.zeros(self.mem_size, dtype=np.int64)
        self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)
        self.terminal_memory = np.zeros(self.mem_size, dtype=bool)

    def store_transition(self, state, action, reward, state_, done):
        index = self.mem_cntr % self.mem_size
        self.state_memory[index] = state
        self.new_state_memory[index] = state_
        self.action_memory[index] = action
        self.reward_memory[index] = reward
        self.terminal_memory[index] = done
        self.mem_cntr += 1

    def sample_buffer(self, batch_size):
        max_mem = min(self.mem_cntr, self.mem_size)
        batch = np.random.choice(max_mem, batch_size, replace=False)

        states = self.state_memory[batch]
        actions = self.action_memory[batch]
        rewards = self.reward_memory[batch]
        states_ = self.new_state_memory[batch]
        terminal = self.terminal_memory[batch]

        return states, actions, rewards, states_, terminal

# DDQN агент

In [None]:
from CybORG.Agents.SimpleAgents.BaseAgent import BaseAgent

class DQNAgent(BaseAgent):
    def __init__(self, gamma=0.9, epsilon=0, lr=0.1, n_actions=41, input_dims=(52,),
                 mem_size=1000, batch_size=32, eps_min=0.01, eps_dec=5e-7,
                 replace=1000, algo='DDQN', env_name='Scenario1b', chkpt_dir='chkpt', load=False):
        self.gamma = gamma
        self.epsilon = epsilon
        self.lr = lr
        self.n_actions = n_actions
        self.input_dims = input_dims
        self.batch_size = batch_size
        self.eps_min = eps_min
        self.eps_dec = eps_dec
        self.replace_target_cnt = replace
        self.algo = algo
        self.env_name = env_name
        self.chkpt_dir = chkpt_dir
        self.action_space = [i for i in range(n_actions)]
        self.learn_step_counter = 0

        self.memory = ReplayBuffer(mem_size, input_dims, n_actions)

        self.q_eval = DeepQNetwork(self.lr, self.n_actions,
                                        input_dims=self.input_dims,
                                        name=self.env_name+'_'+self.algo+'_q_eval',
                                        chkpt_dir=self.chkpt_dir)
        self.q_next = DeepQNetwork(self.lr, self.n_actions,
                                        input_dims=self.input_dims,
                                        name=self.env_name+'_'+self.algo+'_q_next',
                                        chkpt_dir=self.chkpt_dir)

    def get_action(self, observation, action_space=None):
        if np.random.random() > self.epsilon:
            state = T.tensor([observation], dtype=T.float).to(self.q_eval.device)
            actions = self.q_eval.forward(state)
            action = T.argmax(actions).item()
        else:
            action = np.random.choice(self.action_space)

        return action

    def store_transition(self, state, action, reward, state_, done):
        self.memory.store_transition(state, action, reward, state_, done)

    def sample_memory(self):
        state, action, reward, new_state, done = \
                                self.memory.sample_buffer(self.batch_size)

        states = T.tensor(state).to(self.q_eval.device)
        rewards = T.tensor(reward).to(self.q_eval.device)
        dones = T.tensor(done).to(self.q_eval.device)
        actions = T.tensor(action).to(self.q_eval.device)
        states_ = T.tensor(new_state).to(self.q_eval.device)

        return states, actions, rewards, states_, dones

    def replace_target_network(self):
        if self.replace_target_cnt is not None and \
           self.learn_step_counter % self.replace_target_cnt == 0:
            self.q_next.load_state_dict(self.q_eval.state_dict())

    def decrement_epsilon(self):
        self.epsilon = self.epsilon - self.eps_dec \
                           if self.epsilon > self.eps_min else self.eps_min

    def train(self):
        if self.memory.mem_cntr < self.batch_size:
            return
        self.q_eval.optimizer.zero_grad()
        self.replace_target_network()
        states, actions, rewards, states_, dones = self.sample_memory()
        indices = np.arange(self.batch_size)
        q_pred = self.q_eval.forward(states)[indices, actions]
        q_next = self.q_next.forward(states_)
        q_eval = self.q_eval.forward(states_)
        max_actions = T.argmax(q_eval, dim=1)
        q_next[dones] = 0.0
        q_target = rewards + self.gamma*q_next[indices, max_actions]
        loss = self.q_eval.loss(q_target, q_pred).to(self.q_eval.device)
        loss.backward()
        self.q_eval.optimizer.step()
        self.learn_step_counter += 1
        self.decrement_epsilon()

    def end_episode(self):
        pass

    def set_initial_values(self, action_space, observation):
        pass

    def save_models(self):
        self.q_eval.save_checkpoint()
        self.q_next.save_checkpoint()

    def load_models(self):
        self.q_eval.load_checkpoint()
        self.q_next.load_checkpoint()

In [None]:
from CybORG import CybORG
from CybORG.Agents import RedMeanderAgent, B_lineAgent
from CybORG.Agents.Wrappers import *
import inspect
import os

In [None]:
PATH = str(inspect.getfile(CybORG))
PATH = PATH[:-10] + '/Shared/Scenarios/Scenario1b.yaml'

# Функция обучения агента DDQN

In [None]:
def train_DDQN(red_agent=RedMeanderAgent, num_eps=2200, len_eps=100, replace=1000, mem_size=5000,
lr=0.0001, eps_dec=0.000005, eps_min=0.05, gamma=0.99, batch_size=32, epsilon=1, chkpt_dir="model_meander"):

    CYBORG = CybORG(PATH, 'sim', agents={
        'Red': red_agent
    })
    reward_history = []
    best_score_history = []
    env = ChallengeWrapper(env=CYBORG, agent_name="Blue")
    model_dir = os.path.join(os.getcwd(), "Models", chkpt_dir)
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)
    agent = DQNAgent(gamma=gamma, epsilon=epsilon, lr=lr,
                     input_dims=(env.observation_space.shape),
                     n_actions=env.action_space.n, mem_size=mem_size, eps_min=eps_min,
                     batch_size=batch_size, replace=replace, eps_dec=eps_dec,
                     chkpt_dir=model_dir, algo='DDQNAgent',
                     env_name='Scenario1b')
    best_score = -np.inf
    load_checkpoint = False
    if load_checkpoint:
        agent.load_models()

    n_steps = 0
    scores, eps_history, steps_array = [], [], []

    for i in range(num_eps):
        score = 0

        observation = env.reset()
        for j in range(len_eps):
            action = agent.get_action(observation)
            observation_, reward, done, info = env.step(action=action)
            score += reward
            if not load_checkpoint:
                agent.store_transition(observation, action,
                                     reward, observation_, int(done))
                agent.train()
            observation = observation_
            n_steps += 1
        scores.append(score)
        steps_array.append(n_steps)

        avg_score = np.mean(scores[-100:])
        print('episode: ', i,'score: ', score,
             ' average score %.1f' % avg_score, 'best score %.2f' % best_score,
            'epsilon %.2f' % agent.epsilon, 'steps', n_steps)

        
        if avg_score > best_score:
            best_score = avg_score
        reward_history.append(avg_score)
        best_score_history.append(best_score)
        eps_history.append(agent.epsilon)

   
    agent.save_models()
    return reward_history, best_score_history

# Функция тестирования агента DDQN

In [None]:
def test_DDQN(eps_len=100, num_eps=10, chkpt_dir="model_meander2", red_agent=RedMeanderAgent):
    lr=0.0001
    eps_dec=0.000005
    eps_min=0.05
    gamma=0.99
    batch_size=32
    epsilon=0
    mem_size=5000
    replace=1000
    CYBORG = CybORG(PATH, 'sim', agents={
        'Red': red_agent
    })
    env = ChallengeWrapper(env=CYBORG, agent_name="Blue")

    model_dir = os.path.join(os.getcwd(), "Models", chkpt_dir)
    print(model_dir)
    
    agent = DQNAgent(gamma=gamma, epsilon=0, lr=lr,
                     input_dims=(env.observation_space.shape),
                     n_actions=env.action_space.n, mem_size=mem_size, eps_min=eps_min,
                     batch_size=batch_size, replace=replace, eps_dec=eps_dec,
                     chkpt_dir=model_dir, algo='DDQNAgent',
                     env_name='Scenario1b')
    # gets the checkpoint from model_dir
    agent.load_models()

    scores = []
    mmax = -1000
    optim_actions = []
    for i in range(num_eps):
        s = []
        a = []
        observation = env.reset()
       
        for j in range(eps_len):
            action = agent.get_action(observation)
            observation, reward, done, info = env.step(action=action)
            #print(observation)
            s.append(reward)
            a.append((str(env.get_last_action('Blue')), str(env.get_last_action('Red')),str(reward)))
        total_score = np.sum(s)
        if total_score >= mmax:
            mmax = total_score
            optim_actions = a
        scores.append(total_score)
        print(i,'score: ', total_score)
        
    avg_score = np.mean(scores)
    print('average score: ', avg_score, ' +- ', np.std(scores))
    
    print('max score:', mmax)
    return optim_actions

# Обучение

In [None]:
CYBORG = CybORG(PATH, 'sim', agents={
        'Red': RedMeanderAgent
    })
env = ChallengeWrapper(env=CYBORG, agent_name="Blue")

rewards, bests =train_DDQN()

# Вывод графика обучения

In [None]:
import matplotlib.pyplot as plt

# Пример данных

# Построение графика
plt.figure(figsize=(10, 6))
plt.plot(rewards, linestyle='-', color='b')  # Используем только массив y
plt.title("График зависимости наград от сыгранных эпизодов")
plt.xlabel("Эпизоды")
plt.ylabel("Награда")
plt.yticks(range(-800, 0, 50)) 
plt.grid(True)
plt.savefig("grafik.png", dpi=300, bbox_inches='tight')
# Показать график
plt.show()

# Вывод оптимальной стратегии

In [None]:
opt = test_DDQN(chkpt_dir="model_meander", red_agent=RedMeanderAgent,num_eps=15)

for i in range(len(opt)):
    print(i+1,': ',opt[i])