# Red Inicial


## Imports


In [57]:
from collections import deque
from enum import Enum
from functools import cache
from typing import Generator, Optional
from random import random

import gymnasium as gym
import numpy as np
from time import time
from gymnasium.spaces import Box, Dict, Discrete
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env

## Clases y Funciones


In [58]:
class Packet_Generator():
    def __init__(self, min_ip=0, max_ip=2000,
                 min_port=0, max_port=4000,
                 min_protocol=0, max_protocol=100,
                 min_size=0, max_size=2**16 - 1,
                 min_rate=0, max_rate=100):

        self.packet = Dict({
            "IP":  Box(low=min_ip, high=max_ip, shape=(), dtype=int),
            "PORT":  Box(low=min_port, high=max_port, shape=(), dtype=int),
            "PROTOCOL":  Box(low=min_protocol, high=max_protocol, shape=(), dtype=int),
            "SIZE":  Box(low=min_size, high=max_size, shape=(), dtype=int)
        })
        self.min_rate: int = min_rate
        self.max_rate: int = max_rate

    def generate_packet(self):
        return self.packet.sample()

    def generate_packets(self):
        num_packets: int = np.random.randint(self.min_rate, self.max_rate)
        return [self.generate_packet() for _ in range(num_packets)]


class DOS_Packet_Generator(Packet_Generator):
    def __init__(self,
                 min_ip=0, max_ip=2000,
                 min_port=0, max_port=4000,
                 min_protocol=0, max_protocol=100,
                 min_size=0, max_size=2**16 - 1,
                 min_rate=10, max_rate=100):

        ip = np.random.randint(min_ip, max_ip)
        super().__init__(ip, ip,
                         min_port, max_port,
                         min_protocol, max_protocol,
                         min_size, max_size,
                         min_rate, max_rate)


class DDOS_Packet_Generator(Packet_Generator):
    def __init__(self,
                 min_ip=0, max_ip=2000,
                 min_port=0, max_port=4000,
                 min_protocol=0, max_protocol=100,
                 min_size=2**10, max_size=2**16 - 1,
                 min_rate=100, max_rate=1000):

        super().__init__(min_ip, max_ip,
                         min_port, max_port,
                         min_protocol, max_protocol,
                         min_size, max_size,
                         min_rate, max_rate)


class PacketAttack(Enum):
    @staticmethod
    def new_set(description, weight, class_ref):
        return {
            "Description": description,
            "weight": weight,
            "class": class_ref
        }

    @staticmethod
    def not_implemented():
        raise NotImplementedError(f"Class not implemented")

    @classmethod
    @cache
    def weights(cls):
        attack_weights = []
        for attack in PacketAttack:
            attack_weights.append(attack.value["weight"])
        return np.array(attack_weights)

    # ----ENUM VALUES----
    DOS = new_set("Denial of Service", 1.0, DOS_Packet_Generator)
    DDOS = new_set("Distributed Denial of Service", 2.0, DDOS_Packet_Generator)


dos_gen = DOS_Packet_Generator()

print(dos_gen.generate_packets())

[{'IP': 1190, 'PORT': 3561, 'PROTOCOL': 73, 'SIZE': 62188}, {'IP': 1190, 'PORT': 1385, 'PROTOCOL': 35, 'SIZE': 5703}, {'IP': 1190, 'PORT': 1849, 'PROTOCOL': 23, 'SIZE': 57583}, {'IP': 1190, 'PORT': 3608, 'PROTOCOL': 84, 'SIZE': 5671}, {'IP': 1190, 'PORT': 3574, 'PROTOCOL': 74, 'SIZE': 2386}, {'IP': 1190, 'PORT': 1498, 'PROTOCOL': 46, 'SIZE': 32952}, {'IP': 1190, 'PORT': 2207, 'PROTOCOL': 72, 'SIZE': 56073}, {'IP': 1190, 'PORT': 3733, 'PROTOCOL': 5, 'SIZE': 41583}, {'IP': 1190, 'PORT': 3157, 'PROTOCOL': 31, 'SIZE': 17287}, {'IP': 1190, 'PORT': 2920, 'PROTOCOL': 47, 'SIZE': 15737}, {'IP': 1190, 'PORT': 2566, 'PROTOCOL': 15, 'SIZE': 47300}, {'IP': 1190, 'PORT': 3915, 'PROTOCOL': 72, 'SIZE': 47107}, {'IP': 1190, 'PORT': 2099, 'PROTOCOL': 43, 'SIZE': 63287}, {'IP': 1190, 'PORT': 1702, 'PROTOCOL': 37, 'SIZE': 59579}, {'IP': 1190, 'PORT': 2471, 'PROTOCOL': 100, 'SIZE': 58616}, {'IP': 1190, 'PORT': 2166, 'PROTOCOL': 87, 'SIZE': 13620}, {'IP': 1190, 'PORT': 386, 'PROTOCOL': 24, 'SIZE': 43094}, 

In [59]:


class Acciones(Enum):


    REENVIAR = 0

    DESCARTAR = 2

    @classmethod
    def int_to_action(cls, action: int) -> "Acciones":
        return cls._get_actions_list()[action]

    @classmethod
    @cache
    def _get_actions_list(cls):
        return list(Acciones)



class RouterEnv(gym.Env):

    def __init__(self, max_len=10, seed: Optional[int] = None):

        super(RouterEnv, self).__init__()


        self.max_len: int = max_len

        self.queue = deque(maxlen=max_len)

        self.rate = 5

        self.attack_probability = 0.1
        self.step_durations: list[float] = []

        self.observation_space = Box(low=0, high=self.max_len, shape=(
            1,), dtype=np.uint16)  # Paquetes en la cola

        self.action_space = Discrete(len(Acciones))


    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
        self.queue.clear()
        self.step_durations.clear()

        self.rng: Generator = np.random.default_rng(seed)

        observation = self._get_obs()
        info = self._get_info()
        return observation, info

    def _get_obs(self):
        stats = self.calculate_queue_stats()
        return stats

    def _get_info(self):
        return {
            "Queue": np.array(self.queue),
            "AttackProb": self.attack_probability
        }

    def calculate_queue_stats(self):
        num_packets: int = len(self.queue)
        return np.array([num_packets], dtype=np.uint16)

    def step(self, action: int):
        start_time: float = time()
        if random() < self.attack_probability:
            paquete = Packet_Generator().generate_packet()  # Generación de paquetes normales
        else:
            # Generación de paquetes maliciosos
            paquete = DOS_Packet_Generator().generate_packet()

        self.queue.append(paquete)

        action = Acciones.int_to_action(action)
        reward: int = self.procesar_paquete(action)

        finished: bool = self._is_finished_execution()
        observation = self._get_obs()
        truncated = False
        info = self._get_info()

        self.step_durations.append(time() - start_time)

        return observation, reward, finished, truncated, info

    def procesar_paquete(self, accion: Acciones) -> int:
        reward: int = self.max_len - len(self.queue)
        if len(self.queue) == 0:
            return reward

        paquete = self.queue.popleft()

        reward -= accion.value

        return reward

    def _is_finished_execution(self) -> bool:
        # Terminar solo después de 10 pasos
        return len(self.step_durations) >= 10

In [61]:
env = RouterEnv()
check_env(env)

model = PPO("MlpPolicy", env, verbose=True)

model.learn(total_timesteps=10000)
model.save("Example")

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 11       |
|    ep_rew_mean     | 88.6     |
| time/              |          |
|    fps             | 925      |
|    iterations      | 1        |
|    time_elapsed    | 2        |
|    total_timesteps | 2048     |
---------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 11           |
|    ep_rew_mean          | 88.3         |
| time/                   |              |
|    fps                  | 791          |
|    iterations           | 2            |
|    time_elapsed         | 5            |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0005354822 |
|    clip_fraction        | 0            |
|    clip_range           | 0.2          |
|    en