In [None]:

from CybORG import CybORG
from CybORG.Agents import *
from CybORG.Agents.Wrappers import *

import inspect
import random
from pprint import pprint
from gym import Env, spaces
from gym.spaces import Discrete, Tuple
import numpy as np

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical

class PPOSubAgent(nn.Module):
    """Actor-critic network used by a single sub-agent.

    The actor and the critic are two *independent* MLPs (they share no layers),
    each with one 64-unit ReLU hidden layer.

    Args:
        input_dim: Size of the flat observation vector fed to both networks.
        action_dim: Number of outputs of the actor (one logit per action).
    """

    def __init__(self, input_dim, action_dim):
        super(PPOSubAgent, self).__init__()
        # Policy network: observation -> one unnormalised logit per action.
        self.actor = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, action_dim))

        # Value network: observation -> scalar state-value estimate.
        self.critic = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1))

    def forward(self, x):
        """Return ``(action_logits, state_value)`` for the observation tensor ``x``."""
        return self.actor(x), self.critic(x)

    def save_model(self, path):
        """Write this module's ``state_dict`` to ``path``."""
        torch.save(self.state_dict(), path)

    def load_model(self, path):
        """Load a ``state_dict`` previously written by :meth:`save_model`.

        The checkpoint is first mapped to CPU so that a file saved on a GPU
        machine can still be loaded on a CPU-only one; ``load_state_dict``
        then copies the values into the parameters on their current device.
        """
        self.load_state_dict(torch.load(path, map_location="cpu"))

In [None]:
class Controller(nn.Module):
    """LSTM classifier that turns an observation history into a belief over attacker types.

    Args:
        input_dim: Size of each observation vector in the sequence.
        num_types: Number of attacker types; size of the returned belief vector.
        hidden_dim: Hidden size of the LSTM and of the classifier's hidden layer.
        seq_len: Stored on the instance as ``self.seq_len``; ``forward`` does not
            read it and accepts sequences of any length.
    """

    def __init__(self, input_dim, num_types=2, hidden_dim=64, seq_len=10):
        super().__init__()
        self.seq_len = seq_len

        # LSTM to model temporal dependencies
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            batch_first=True
        )

        # Classifier head. It ends in a Softmax, so the module outputs
        # probabilities (not logits): train it with a loss that expects
        # (log-)probabilities, e.g. NLLLoss on torch.log(belief).
        self.classifier = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, num_types),
            nn.Softmax(dim=-1)
        )

    def forward(self, obs_history):
        """Return the belief distribution for each sequence in the batch.

        Args:
            obs_history: Tensor of shape ``(batch_size, seq_len, input_dim)``.

        Returns:
            Tensor of shape ``(batch_size, num_types)`` whose rows sum to 1.
        """
        lstm_out, _ = self.lstm(obs_history)
        last_hidden = lstm_out[:, -1, :]  # Output at the final timestep
        return self.classifier(last_hidden)

    def save_model(self, path):
        """Write this module's ``state_dict`` to ``path``."""
        torch.save(self.state_dict(), path)

    def load_model(self, path):
        """Load a ``state_dict`` previously written by :meth:`save_model`.

        The checkpoint is first mapped to CPU so that a file saved on a GPU
        machine can still be loaded on a CPU-only one.
        """
        self.load_state_dict(torch.load(path, map_location="cpu"))

In [None]:
def train_subagent(attacker_type={'Red':B_lineAgent}, num_episodes=100):
    """Train a ``PPOSubAgent`` as Blue against the given Red agent and save it.

    Despite the class name, the update performed here is a one-step
    actor-critic (TD(0)) update applied after every environment step; there is
    no PPO clipping or multi-epoch optimisation.

    Args:
        attacker_type: ``agents`` mapping passed to ``CybORG``; its ``'Red'``
            entry is also used to name the checkpoint file. The default dict is
            only read, never mutated.
        num_episodes: Number of training episodes.

    Returns:
        The trained ``PPOSubAgent``. Its weights are also written to
        ``ppo_<RedAgentClassName>.pth`` in the current working directory.
    """
    gamma = 0.99  # discount factor for the bootstrapped TD target

    # Drops the last 10 characters of the file that defines CybORG to obtain
    # the package directory (presumably the suffix is '/CybORG.py' - TODO confirm).
    path = str(inspect.getfile(CybORG))
    path = path[:-10] + '/Shared/Scenarios/Scenario2.yaml'

    cyborg = CybORG(path, 'sim', agents=attacker_type)
    env = ChallengeWrapper(env=cyborg, agent_name='Blue')

    # NOTE(review): assumes get_action_space('Blue') returns an int action count - confirm.
    agent = PPOSubAgent(input_dim=env.observation_space.shape[0],
                        action_dim=env.get_action_space('Blue'))

    optimizer = optim.Adam(agent.parameters(), lr=3e-4)

    for episode in range(num_episodes):
        print (episode)
        state = env.reset()
        done = False

        while not done:
            logits, value = agent(torch.FloatTensor(state))
            dist = Categorical(logits=logits)
            action = dist.sample()

            next_state, reward, done, info = env.step(action.item())

            # Bootstrapped one-step target. The next-state value is treated as
            # a constant (semi-gradient TD), so it is computed without autograd.
            with torch.no_grad():
                _, next_value = agent(torch.FloatTensor(next_state))
            not_done = 0.0 if done else 1.0
            td_target = reward + gamma * next_value * not_done
            advantage = td_target - value

            # Policy gradient weighted by the (detached) advantage, plus a
            # value loss that regresses the critic onto the same TD target the
            # advantage is built from (not onto the raw immediate reward).
            policy_loss = -dist.log_prob(action) * advantage.detach()
            value_loss = 0.5 * advantage ** 2
            loss = policy_loss + value_loss

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            state = next_state

    # Build the file name outside the f-string: nesting the same quote type
    # inside an f-string is a SyntaxError before Python 3.12.
    red_agent_name = attacker_type.get('Red').__name__
    agent.save_model(f'ppo_{red_agent_name}.pth')
    return agent

In [None]:
def train_metacontroller(steps= 46,seq_len=10, num_episodes=10):
    """Train the ``Controller`` to recognise which Red agent is attacking.

    Each episode a Red agent is chosen at random (label 0 -> ``B_lineAgent``,
    label 1 -> ``RedMeanderAgent``) and stepped for up to ``steps`` steps. At every
    step the last ``seq_len`` Blue observations (zero-padded at the front while
    the episode is shorter than ``seq_len``) are classified and the controller
    is updated against the episode's label. The trained weights are saved to
    ``controller.pth``.

    Args:
        steps: Maximum number of environment steps per episode.
        seq_len: Length of the observation window fed to the controller.
        num_episodes: Number of training episodes.
    """
    # Drops the last 10 characters of the file that defines CybORG to obtain
    # the package directory (presumably the suffix is '/CybORG.py' - TODO confirm).
    path = str(inspect.getfile(CybORG))
    path = path[:-10] + '/Shared/Scenarios/Scenario2.yaml'

    env = CybORG(path,'sim',agents={'Blue':BlueMonitorAgent})
    blue_env = FixedFlatWrapper(env=env)

    # Observation width is looked up once and reused for model size and padding.
    input_dim = len(blue_env.get_observation('Blue'))

    meta_controller = Controller(
        input_dim=input_dim,
        num_types=2
    )
    optimizer = torch.optim.Adam(meta_controller.parameters(), lr=1e-3)
    # Controller already ends in a Softmax, so it returns probabilities.
    # CrossEntropyLoss would apply log-softmax on top of that (a double
    # softmax); NLLLoss on the log of the belief is the matching loss.
    criterion = nn.NLLLoss()

    # Buffer to store observation sequences
    obs_buffer = []

    for episode in range(num_episodes):
        print( 'episode  '+str(episode))
        # Set attacker type for the entire episode
        attacker_type = random.getrandbits(1)
        print("attacker for this episode is "+str(attacker_type))
        agent = B_lineAgent() if attacker_type == 0 else RedMeanderAgent()
        target = torch.tensor([attacker_type])

        results = env.reset('Red')
        red_obs = results.observation
        red_action_space = results.action_space
        blue_obs = blue_env.get_observation('Blue')
        obs_buffer.clear()

        for step in range(steps):
            # Append current observation to buffer (limit to seq_len)
            obs_buffer.append(blue_obs)
            if len(obs_buffer) > seq_len:
                obs_buffer.pop(0)

            # Right-align the window; leading rows stay zero while it is short.
            padded_seq = np.zeros((seq_len, input_dim))
            padded_seq[seq_len - len(obs_buffer):] = np.asarray(obs_buffer)

            # Convert to tensor
            obs_seq = torch.FloatTensor(padded_seq).unsqueeze(0)  # (1, seq_len, input_dim)

            # Predict belief
            belief = meta_controller(obs_seq)
            predicted_label = torch.argmax(belief).item()
            print("belief and predicted label")
            print (belief)
            print(predicted_label)

            # Environment step
            action = agent.get_action(red_obs, red_action_space)
            next_state = env.step(action=action, agent='Red')
            done = next_state.done
            red_obs = next_state.observation

            blue_obs = blue_env.get_observation('Blue')

            # Update model after each timestep. The clamp keeps log() finite
            # if a probability underflows to 0.
            loss = criterion(torch.log(belief.clamp_min(1e-8)), target)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Stop stepping once the environment reports the episode finished.
            if done:
                break

    meta_controller.save_model('controller.pth')

train_metacontroller()
    

In [None]:

# Scratch cell: build a Blue ChallengeWrapper against B_lineAgent and inspect
# its action space, observation size and initial observation.
path = str(inspect.getfile(CybORG))[:-10] + '/Shared/Scenarios/Scenario2.yaml'
cyborg = CybORG(path, 'sim', agents={'Red': B_lineAgent})
env = ChallengeWrapper(env=cyborg, agent_name='Blue')

# One value per line: gym action space, wrapper-reported action space,
# observation vector length.
print(
    env.action_space,
    env.get_action_space('Blue'),
    env.observation_space.shape[0],
    sep='\n',
)

state = env.reset()
print(state)
# blue_env = FixedFlatWrapper(env=cyborg)
# agent = B_lineAgent()

# results = cyborg.reset()
# obs = results.observation
# print("red observation ")
# print(obs)
# action_space = results.action_space
# print(action_space)
# red_obs= cyborg.get_observation('Red')
# red_action_space = cyborg.get_action_space('Red')
# print(red_obs)
# print(red_action_space)


# # blue_results = cyborg.reset('Blue')
# print("blue observation")


# # print(blue_results.action_space)
# # blue_obs = cyborg.get_observation('Blue')
# # blue_action_space = cyborg.get_action_space('Blue')
# # print(blue_obs)
# # print(blue_action_space)
# action = agent.get_action(obs, action_space)
# print(action)
# results = cyborg.step(action=action, agent='Red')
# print(results.observation)
# print (results.reward)
# print(results.done)
# print(results.info)
# print(env.get_last_action('Red'))
# print(env.get_observation('Blue'))
# action = agent.get_action(results.observation, action_space)
# print(action)
# results = cyborg.step(action=action, agent='Red')
# # print(results.observation)
# # print (results.reward)
# # print(results.done)
# # print(results.info)
# print(env.get_last_action('Red'))
# print(env.get_observation('Blue'))


Sub-agent training code

In [None]:
from subAgent import SubAgent
from RLModels import *
import os
import inspect
from CybORG import CybORG
from CybORG.Agents import B_lineAgent, RedMeanderAgent
from CybORG.Agents.Wrappers import ChallengeWrapper

# Checkpoints for this sub-agent go under <cwd>/Models/Bline.
agent_checkpoint_folder = "Bline"
ckpt_folder = os.path.join(os.getcwd(), "Models", agent_checkpoint_folder)

# exist_ok avoids the check-then-create race and works on re-runs of the cell.
os.makedirs(ckpt_folder, exist_ok=True)

# Drops the last 10 characters of the file that defines CybORG to obtain the
# package directory (presumably the suffix is '/CybORG.py' - TODO confirm).
path = str(inspect.getfile(CybORG))
path = path[:-10] + '/Shared/Scenarios/Scenario2.yaml'

# Blue is trained against the B_line Red agent.
cyborg = CybORG(path,'sim',agents={'Red': B_lineAgent})
env = ChallengeWrapper(env=cyborg, agent_name='Blue')

# NOTE(review): restore=True presumably resumes from an existing checkpoint -
# confirm against SubAgent, and make sure the stored weights match the current
# observation/action sizes before resuming.
agent = SubAgent(env,target_attacker_type=0,k_epochs=5,restore=True)

agent.train(
    max_episodes=100000,
    max_timesteps=100,
    update_timestep=20000,
    ckpt_folder=ckpt_folder,
    print_interval=100,
    save_interval=10000,
)

Episode 100 	 Avg reward: -3
Episode 200 	 Avg reward: -3
Episode 300 	 Avg reward: -4
Episode 400 	 Avg reward: -4
Episode 500 	 Avg reward: -5
Episode 600 	 Avg reward: -4
Episode 700 	 Avg reward: -4
Episode 800 	 Avg reward: -4
Episode 900 	 Avg reward: -3
Checkpoint saved
Episode 1000 	 Avg reward: -3
Episode 1100 	 Avg reward: -4
Episode 1200 	 Avg reward: -4
Episode 1300 	 Avg reward: -3
Episode 1400 	 Avg reward: -4
Episode 1500 	 Avg reward: -3
Episode 1600 	 Avg reward: -3
Episode 1700 	 Avg reward: -4
Episode 1800 	 Avg reward: -4
Episode 1900 	 Avg reward: -3
Checkpoint saved
Episode 2000 	 Avg reward: -3
Episode 2100 	 Avg reward: -3
Episode 2200 	 Avg reward: -3
Episode 2300 	 Avg reward: -3
Episode 2400 	 Avg reward: -3
Episode 2500 	 Avg reward: -3
Episode 2600 	 Avg reward: -3
Episode 2700 	 Avg reward: -3
Episode 2800 	 Avg reward: -3
Episode 2900 	 Avg reward: -2
Checkpoint saved
Episode 3000 	 Avg reward: -3
Episode 3100 	 Avg reward: -3
Episode 3200 	 Avg reward: -

: 

Sub-agent evaluation code

In [1]:
from subAgent import SubAgent
from RLModels import *
import os
import inspect
import time
from statistics import mean, stdev
from CybORG import CybORG
from CybORG.Agents import B_lineAgent, RedMeanderAgent
from CybORG.Agents.Wrappers import ChallengeWrapper

MAX_EPS = 100  # episodes evaluated per episode length
agent_name = 'Blue'

agent_checkpoint_folder = "Bline"
ckpt_folder = os.path.join(os.getcwd(), "Models", agent_checkpoint_folder)

# exist_ok avoids the check-then-create race and works on re-runs of the cell.
os.makedirs(ckpt_folder, exist_ok=True)

# Drops the last 10 characters of the file that defines CybORG to obtain the
# package directory (presumably the suffix is '/CybORG.py' - TODO confirm).
path = str(inspect.getfile(CybORG))
path = path[:-10] + '/Shared/Scenarios/Scenario2.yaml'

cyborg = CybORG(path,'sim',agents={'Red': B_lineAgent})
env = ChallengeWrapper(env=cyborg, agent_name='Blue')

# NOTE(review): the recorded output of this cell is a state_dict size mismatch
# (checkpoint 62 inputs / 36 actions vs. model 52 inputs / 145 actions), raised
# while loading weights. The checkpoint being restored was produced with
# different observation/action sizes than SubAgent builds here - confirm that
# training and evaluation construct SubAgent with the same env/wrappers.
agent = SubAgent(env,target_attacker_type=0,k_epochs=5,restore=True)
print(f'Using agent {agent.__class__.__name__}, if this is incorrect please update the code to load in your agent')

file_name = str(inspect.getfile(CybORG))[:-10] + '/Evaluation/' + time.strftime("%Y%m%d_%H%M%S") + f'_{agent.__class__.__name__}.txt'
print(f'Saving evaluation results to {file_name}')
# TODO: write a header once cyborg_version, scenario, commit_hash, name, team,
# name_of_agent and wrap_line are defined:
# with open(file_name, 'a+') as data:
#         data.write(f'CybORG v{cyborg_version}, {scenario}, Commit Hash: {commit_hash}\n')
#         data.write(f'author: {name}, team: {team}, technique: {name_of_agent}\n')
#         data.write(f"wrappers: {wrap_line}\n")

# Only B_lineAgent is evaluated; the env above was built with it as Red.
red_agent = B_lineAgent
wrapped_cyborg = env

for num_steps in [30, 50, 100]:
    observation = wrapped_cyborg.reset()
    action_space = wrapped_cyborg.get_action_space(agent_name)

    total_reward = []  # one summed reward per episode
    actions = []       # per episode: list of (blue_action, red_action) strings
    for i in range(MAX_EPS):
        r = []
        a = []
        for j in range(num_steps):
            action = agent.get_action(observation, action_space)
            observation, rew, done, info = wrapped_cyborg.step(action)
            r.append(rew)
            a.append((str(cyborg.get_last_action('Blue')), str(cyborg.get_last_action('Red'))))
        agent.end_episode()
        total_reward.append(sum(r))
        actions.append(a)
        observation = wrapped_cyborg.reset()

    # Compute the summary statistics once and reuse them for stdout and file.
    reward_mean = mean(total_reward)
    reward_std = stdev(total_reward)
    print(f'Average reward for red agent {red_agent.__name__} and steps {num_steps} is: {reward_mean} with a standard deviation of {reward_std}')
    with open(file_name, 'a+') as data:
        data.write(f'steps: {num_steps}, adversary: {red_agent.__name__}, mean: {reward_mean}, standard deviation {reward_std}\n')
        for act, sum_rew in zip(actions, total_reward):
            data.write(f'actions: {act}, total reward: {sum_rew}\n')


RuntimeError: Error(s) in loading state_dict for ActorCritic:
	size mismatch for actor.0.weight: copying a param with shape torch.Size([64, 62]) from checkpoint, the shape in current model is torch.Size([64, 52]).
	size mismatch for actor.4.weight: copying a param with shape torch.Size([36, 64]) from checkpoint, the shape in current model is torch.Size([145, 64]).
	size mismatch for actor.4.bias: copying a param with shape torch.Size([36]) from checkpoint, the shape in current model is torch.Size([145]).
	size mismatch for critic.0.weight: copying a param with shape torch.Size([64, 62]) from checkpoint, the shape in current model is torch.Size([64, 52]).