In [92]:
# Installazione libreria gym-idsgame
!SKLEARN_ALLOW_DEPRECATED_SKLEARN_PACKAGE_INSTALL=True pip install gym-idsgame



# Algoritmo SARSA

In [3]:
import numpy as np
from gym_idsgame.agents.bot_agents.random_attack_bot_agent import RandomAttackBotAgent
from gym_idsgame.envs import IdsGameRandomAttackV21Env, IdsGameMaximalAttackV21Env
from gym_idsgame.envs.dao.game_config import GameConfig
from gym_idsgame.envs.dao.idsgame_config import IdsGameConfig

# Configura il gioco e l'ambiente per scenario Random Attack (noi controlliamo il difensore)
# Nota: In IdsGameRandomAttackV21Env l'attaccante è parte dell'ambiente; noi forniamo eventualmente un bot
# per l'attaccante, ma la nostra politica da apprendere è quella del difensore.

# Creazione dell'ambiente con attaccante casuale
base_game_config = GameConfig()
attacker_agent = RandomAttackBotAgent(game_config=base_game_config, env=None)
idsgame_config = IdsGameConfig(game_config=base_game_config, attacker_agent=attacker_agent)
env = IdsGameRandomAttackV21Env(idsgame_config=idsgame_config)

# Iperparametri SARSA
alpha = 0.5      # tasso di apprendimento
gamma = 0.95     # fattore di sconto
epsilon = 0.1    # esplorazione
epsilon_min = 0.01
epsilon_decay = 0.9995
num_episodes = 2000  # per esecuzioni locali rapide; aumentare in Colab (es. 50k)

# Osservazioni e spazi di azione (difensore)
# L'osservazione è un array numpy; useremo una tabella Q basata su hashing dello stato per semplicità.
num_actions = env.defender_action_space.n
Q = {}  # dizionario: chiave=(state_tuple), valore=np.array di dimensione num_actions

def state_key(obs: np.ndarray):
    # Converti l'osservazione in una tupla hashable a bassa collisione
    return tuple(np.asarray(obs, dtype=np.int16).ravel().tolist())


def get_Q_row(skey):
    if skey not in Q:
        Q[skey] = np.zeros(num_actions, dtype=np.float32)
    return Q[skey]


def choose_action_from_Q(skey, eps):
    if np.random.rand() < eps:
        return np.random.randint(0, num_actions)
    qrow = get_Q_row(skey)
    return int(np.argmax(qrow))


# Ciclo di addestramento SARSA (on-policy)
print("Avvio dell'addestramento con SARSA (difensore vs random attacker)...")
for episode in range(num_episodes):
    obs, reward, done, info = env.reset(), (0, 0), False, {}
    skey = state_key(obs)
    action = choose_action_from_Q(skey, epsilon)

    episode_return_d = 0.0
    steps = 0

    while not done:
        # In DefenderEnv, step richiede una coppia (attacco, difesa); l'attacco è generato dall'ambiente,
        # quindi si passa -1 per l'attacco e l'azione di difesa come intero.
        next_obs, r, done, _ = env.step((-1, action))
        # r è una tupla (reward_attacker, reward_defender)
        r_d = r[1]
        episode_return_d += r_d

        next_skey = state_key(next_obs)
        next_action = choose_action_from_Q(next_skey, epsilon)

        # Aggiornamento SARSA: Q(s,a) <- Q(s,a) + alpha * [r + gamma*Q(s',a') - Q(s,a)]
        qsa = get_Q_row(skey)
        qsa_next = get_Q_row(next_skey)
        td_target = r_d + (0 if done else gamma * qsa_next[next_action])
        td_error = td_target - qsa[action]
        qsa[action] += alpha * td_error

        skey = next_skey
        action = next_action
        steps += 1

    # Decadimento epsilon
    if epsilon > epsilon_min:
        epsilon = max(epsilon_min, epsilon * epsilon_decay)

    if (episode + 1) % 100 == 0:
        print(f"Episodio {episode+1}: ritorno difensore = {episode_return_d:.1f}, epsilon = {epsilon:.3f}")

print("\nAddestramento completato.")

# Valutazione della politica appresa (modalità greedy)
print("\nValutazione della politica appresa...")
num_test_episodes = 50
total_test_rewards_d = 0.0
for _ in range(num_test_episodes):
    obs, done = env.reset(), False
    while not done:
        skey = state_key(obs)
        qrow = get_Q_row(skey)
        action = int(np.argmax(qrow))
        obs, r, done, _ = env.step((-1, action))
        total_test_rewards_d += r[1]

print(f"Ricompensa media (difensore) su {num_test_episodes} episodi di test: {total_test_rewards_d / num_test_episodes:.2f}")

Avvio dell'addestramento con SARSA (difensore vs random attacker)...
Episodio 100: ritorno difensore = -100.0, epsilon = 0.095
Episodio 200: ritorno difensore = -100.0, epsilon = 0.090
Episodio 300: ritorno difensore = -100.0, epsilon = 0.086
Episodio 400: ritorno difensore = -100.0, epsilon = 0.082
Episodio 500: ritorno difensore = -100.0, epsilon = 0.078
Episodio 600: ritorno difensore = -100.0, epsilon = 0.074
Episodio 700: ritorno difensore = -100.0, epsilon = 0.070
Episodio 800: ritorno difensore = -100.0, epsilon = 0.067
Episodio 900: ritorno difensore = -100.0, epsilon = 0.064
Episodio 1000: ritorno difensore = -100.0, epsilon = 0.061
Episodio 1100: ritorno difensore = -100.0, epsilon = 0.058
Episodio 1200: ritorno difensore = -100.0, epsilon = 0.055
Episodio 1300: ritorno difensore = -100.0, epsilon = 0.052
Episodio 1400: ritorno difensore = -100.0, epsilon = 0.050
Episodio 1500: ritorno difensore = -100.0, epsilon = 0.047
Episodio 1600: ritorno difensore = -100.0, epsilon = 0.

# Algoritmo DDQN

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque, namedtuple
import random

# Funzioni di supporto per DDQN
Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state', 'done'))

class ReplayBuffer:
    def __init__(self, capacity:int):
        self.buffer = deque(maxlen=capacity)
    def push(self, *args):
        self.buffer.append(Transition(*args))
    def sample(self, batch_size:int):
        batch = random.sample(self.buffer, batch_size)
        return Transition(*zip(*batch))
    def __len__(self):
        return len(self.buffer)

class QNetwork(nn.Module):
    def __init__(self, input_dim:int, output_dim:int):
        super().__init__()
        hidden = 256
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden), nn.ReLU(),
            nn.Linear(hidden, hidden), nn.ReLU(),
            nn.Linear(hidden, output_dim)
        )
    def forward(self, x):
        return self.net(x)

# Utility per ottenere dimensione stato
def flatten_obs(obs):
    arr = np.asarray(obs, dtype=np.float32).ravel()
    return arr

# Costruzione env per DDQN (RandomAttack e MaximalAttack)
base_game_config_ddqn = GameConfig()
attacker_bot_ddqn = RandomAttackBotAgent(game_config=base_game_config_ddqn, env=None)
idsgame_config_ddqn = IdsGameConfig(game_config=base_game_config_ddqn, attacker_agent=attacker_bot_ddqn)
ra_env = IdsGameRandomAttackV21Env(idsgame_config=idsgame_config_ddqn)
ma_env = IdsGameMaximalAttackV21Env(idsgame_config=idsgame_config_ddqn)

# Parametri DDQN
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

ddqn_gamma = 0.99
lr = 1e-3
buffer_capacity = 100_000
batch_size = 128
target_update_freq = 1000  # passi di training
start_learning_after = 1000  # passi di esperienza prima di iniziare update
train_steps = 50_000  # per Colab aumentare (es. 500k)
max_steps_per_episode = 500

# Epsilon schedule
eps_start = 1.0
eps_end = 0.05
eps_decay_steps = 30_000

def epsilon_by_step(t):
    if t >= eps_decay_steps:
        return eps_end
    frac = t / eps_decay_steps
    return eps_start + (eps_end - eps_start) * frac

# Setup rete per RandomAttack
obs_dim = flatten_obs(ra_env.reset()).shape[0]
act_dim = ra_env.defender_action_space.n
online_q = QNetwork(obs_dim, act_dim).to(device)
target_q = QNetwork(obs_dim, act_dim).to(device)
target_q.load_state_dict(online_q.state_dict())
optimizer = optim.Adam(online_q.parameters(), lr=lr)
replay = ReplayBuffer(buffer_capacity)

# Funzione per selezione azione epsilon-greedy
def select_action(net, obs, eps):
    if random.random() < eps:
        return random.randrange(act_dim)
    with torch.no_grad():
        s = torch.from_numpy(flatten_obs(obs)).unsqueeze(0).to(device)
        q = net(s)
        return int(q.argmax(dim=1).item())

# Loop di training DDQN su RandomAttack
print("\n[DDQN] Training su RandomAttack (difensore)...")
state = ra_env.reset()
train_t = 0
episode_return = 0.0
for step in range(train_steps):
    eps = epsilon_by_step(train_t)
    action = select_action(online_q, state, eps)
    next_state, r, done, _ = ra_env.step((-1, action))
    r_d = float(r[1])
    replay.push(flatten_obs(state), action, r_d, flatten_obs(next_state), float(done))
    state = next_state
    episode_return += r_d

    # update
    if len(replay) >= start_learning_after:
        batch = replay.sample(batch_size)
        s = torch.from_numpy(np.stack(batch.state)).float().to(device)
        a = torch.tensor(batch.action, dtype=torch.int64).unsqueeze(1).to(device)
        r_t = torch.tensor(batch.reward, dtype=torch.float32).unsqueeze(1).to(device)
        ns = torch.from_numpy(np.stack(batch.next_state)).float().to(device)
        d = torch.tensor(batch.done, dtype=torch.float32).unsqueeze(1).to(device)

        # Q(s,a)
        q_sa = online_q(s).gather(1, a)
        # DDQN target: a* = argmax_a Q_online(s', a), target = r + gamma*(1-d)*Q_target(s', a*)
        with torch.no_grad():
            next_actions = online_q(ns).argmax(dim=1, keepdim=True)
            q_next = target_q(ns).gather(1, next_actions)
            target = r_t + ddqn_gamma * (1.0 - d) * q_next
        loss = nn.SmoothL1Loss()(q_sa, target)
        optimizer.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(online_q.parameters(), 10.0)
        optimizer.step()

        # Aggiornamento periodico target
        if train_t % target_update_freq == 0:
            target_q.load_state_dict(online_q.state_dict())

    train_t += 1

    if done or (step % max_steps_per_episode == 0 and step > 0):
        # reset episodio
        print(f"Step {step}: return episodio (difensore) = {episode_return:.1f}, eps={eps:.2f}")
        state = ra_env.reset()
        episode_return = 0.0

print("[DDQN] Training RandomAttack completato.")

# Valutazione DDQN su RandomAttack
def evaluate(env_eval, net, episodes=20):
    total = 0.0
    for _ in range(episodes):
        obs, done = env_eval.reset(), False
        while not done:
            with torch.no_grad():
                s = torch.from_numpy(flatten_obs(obs)).unsqueeze(0).to(device)
                a = int(net(s).argmax(1).item())
            obs, r, done, _ = env_eval.step((-1, a))
            total += float(r[1])
    return total / episodes

ra_score = evaluate(ra_env, online_q, episodes=20)
print(f"[DDQN] RandomAttack - Ricompensa media (difensore) su 20 episodi: {ra_score:.2f}")

# Re-setup rete per MaximalAttack (riuso pesi iniziali o trasferimento opzionale)
obs_dim_ma = flatten_obs(ma_env.reset()).shape[0]
act_dim_ma = ma_env.defender_action_space.n

online_q_ma = QNetwork(obs_dim_ma, act_dim_ma).to(device)
target_q_ma = QNetwork(obs_dim_ma, act_dim_ma).to(device)
target_q_ma.load_state_dict(online_q_ma.state_dict())
optimizer_ma = optim.Adam(online_q_ma.parameters(), lr=lr)
replay_ma = ReplayBuffer(buffer_capacity)

print("\n[DDQN] Training su MaximalAttack (difensore)...")
state = ma_env.reset()
train_t = 0
episode_return = 0.0
for step in range(train_steps):
    eps = epsilon_by_step(train_t)
    action = select_action(online_q_ma, state, eps)
    next_state, r, done, _ = ma_env.step((-1, action))
    r_d = float(r[1])
    replay_ma.push(flatten_obs(state), action, r_d, flatten_obs(next_state), float(done))
    state = next_state
    episode_return += r_d

    if len(replay_ma) >= start_learning_after:
        batch = replay_ma.sample(batch_size)
        s = torch.from_numpy(np.stack(batch.state)).float().to(device)
        a = torch.tensor(batch.action, dtype=torch.int64).unsqueeze(1).to(device)
        r_t = torch.tensor(batch.reward, dtype=torch.float32).unsqueeze(1).to(device)
        ns = torch.from_numpy(np.stack(batch.next_state)).float().to(device)
        d = torch.tensor(batch.done, dtype=torch.float32).unsqueeze(1).to(device)

        q_sa = online_q_ma(s).gather(1, a)
        with torch.no_grad():
            next_actions = online_q_ma(ns).argmax(dim=1, keepdim=True)
            q_next = target_q_ma(ns).gather(1, next_actions)
            target = r_t + ddqn_gamma * (1.0 - d) * q_next
        loss = nn.SmoothL1Loss()(q_sa, target)
        optimizer_ma.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(online_q_ma.parameters(), 10.0)
        optimizer_ma.step()

        if train_t % target_update_freq == 0:
            target_q_ma.load_state_dict(online_q_ma.state_dict())

    train_t += 1

    if done or (step % max_steps_per_episode == 0 and step > 0):
        print(f"Step {step}: return episodio (difensore) = {episode_return:.1f}, eps={eps:.2f}")
        state = ma_env.reset()
        episode_return = 0.0

print("[DDQN] Training MaximalAttack completato.")

ma_score = evaluate(ma_env, online_q_ma, episodes=20)
print(f"[DDQN] MaximalAttack - Ricompensa media (difensore) su 20 episodi: {ma_score:.2f}")



[DDQN] Training su RandomAttack (difensore)...
Step 101: return episodio (difensore) = -100.0, eps=1.00
Step 203: return episodio (difensore) = -100.0, eps=0.99
Step 305: return episodio (difensore) = -100.0, eps=0.99
Step 407: return episodio (difensore) = -100.0, eps=0.99
Step 500: return episodio (difensore) = 0.0, eps=0.98
Step 602: return episodio (difensore) = -100.0, eps=0.98
Step 704: return episodio (difensore) = -100.0, eps=0.98
Step 806: return episodio (difensore) = -100.0, eps=0.97
Step 908: return episodio (difensore) = -100.0, eps=0.97
Step 1000: return episodio (difensore) = 0.0, eps=0.97
Step 1102: return episodio (difensore) = -100.0, eps=0.97
Step 1204: return episodio (difensore) = -100.0, eps=0.96
Step 1306: return episodio (difensore) = -100.0, eps=0.96
Step 1408: return episodio (difensore) = -100.0, eps=0.96
Step 1500: return episodio (difensore) = 0.0, eps=0.95
Step 1602: return episodio (difensore) = -100.0, eps=0.95
Step 1704: return episodio (difensore) = -