In [1]:
from ipynb.fs.full.sim import Simulator
from ipynb.fs.full.traffic import Frame
from ipynb.fs.full.logger import Logger
from abc import ABC, abstractmethod
from collections import deque
import random

In [2]:
class IngressPort:
    """Receiving side of a switch: logs each arriving frame, then hands it to the switch."""

    def __init__(self, port_id: str, switch):
        # The owning switch supplies the logger, the simulator clock and process().
        self.switch = switch
        self.port_id = port_id

    def receive(self, frame: Frame):
        """Record a RECEIVE event for *frame* and pass it on to the owning switch."""
        owner = self.switch
        owner.logger.log(owner.sim.time, owner.name, "RECEIVE", frame, self.port_id)
        owner.process(frame, self)

class EgressPort:
    """Sending side of a node: logs each outgoing frame and puts it on the attached link."""

    def __init__(self, port_id: str, switch, link):
        # The owner supplies the logger, the simulator clock and its name.
        self.link = link
        self.switch = switch
        self.port_id = port_id

    def send(self, frame: Frame):
        """Log a FORWARD event, transmit *frame* and return what ``link.transmit`` returns."""
        owner = self.switch
        owner.logger.log(owner.sim.time, owner.name, "FORWARD", frame, self.port_id)
        result = self.link.transmit(frame, owner)
        return result

class Switch(ABC):
    """Abstract base for nodes that own named ingress and egress ports.

    Subclasses implement ``process``, which an ingress port calls for every
    frame it receives.
    """

    def __init__(self, name: str, simulator: Simulator, logger: Logger):
        self.name, self.sim, self.logger = name, simulator, logger

        # Port registries, keyed by port id.
        self.ingress_ports = dict()
        self.egress_ports = dict()

    def add_ingress_port(self, port_id: str):
        """Create an ingress port, register it under *port_id* and return it."""
        self.ingress_ports[port_id] = IngressPort(port_id, self)
        return self.ingress_ports[port_id]

    def add_egress_port(self, port_id: str, link):
        """Create an egress port bound to *link*, register it under *port_id* and return it."""
        self.egress_ports[port_id] = EgressPort(port_id, self, link)
        return self.egress_ports[port_id]

    @abstractmethod
    def process(self, frame: Frame, ingress_port: IngressPort):
        """Handle *frame*, which arrived on *ingress_port*."""

In [3]:
class Link:
    """Shared medium that carries frames between the nodes connected to it.

    Transmissions are serialised: a frame handed over while the link is still
    busy starts when the previous transmission finishes. Delivery to the
    connected ingress ports is scheduled on the simulator.
    """

    def __init__(self, simulator: Simulator, bitrate_bps: float, transmission_delay_enabled: bool = True, propagation_delay: float = 0.0):
        """
        simulator: object providing ``time`` and ``schedule(delay, callback, *args)``
        bitrate_bps: link speed in bits per second; must be > 0 when the
            transmission delay is enabled (it is not used otherwise)
        transmission_delay_enabled: when False, frames take zero time to serialise
        propagation_delay: delay added after the transmission finishes; must be >= 0

        Raises ValueError for a non-positive bitrate (with the transmission
        delay enabled) or for a negative propagation delay.
        """
        # Fail at construction time instead of with a ZeroDivisionError or a
        # negative schedule delay in the middle of a simulation run.
        if transmission_delay_enabled and bitrate_bps <= 0:
            raise ValueError("bitrate_bps must be > 0 when transmission delay is enabled")
        if propagation_delay < 0:
            raise ValueError("propagation_delay must be >= 0")

        self.sim = simulator
        self.bitrate = bitrate_bps
        self.propagation_delay = propagation_delay
        self.transmission_delay_enabled = transmission_delay_enabled

        self.endpoints = {}          # switch -> ingress_port
        self.busy_until = 0.0        # absolute simulation time

    def connect(self, switch: Switch, ingress_port: IngressPort):
        """Register *ingress_port* as the receiving port of *switch* on this link."""
        # Watch out: connecting the same switch again replaces its previous port.
        self.endpoints[switch] = ingress_port

    def transmit(self, frame: Frame, from_switch: Switch):
        """Put *frame* on the link and schedule its delivery.

        Returns the absolute simulation time at which the transmission
        finishes (the propagation delay is not included).
        """
        now = self.sim.time

        # frame.size is multiplied by 8 to get bits (size presumably in bytes).
        tx_time = (frame.size * 8) / self.bitrate if self.transmission_delay_enabled else 0.0

        # Wait for any transmission that is still in progress.
        start_time = max(now, self.busy_until)
        finish_time = start_time + tx_time
        self.busy_until = finish_time

        # schedule() takes a delay relative to the current time, hence "- now".
        self.sim.schedule(finish_time + self.propagation_delay - now, self._deliver, frame, from_switch)
        return finish_time

    def _deliver(self, frame: Frame, from_switch: Switch):
        """Hand *frame* to every connected ingress port except the sender's own."""
        for sw, ingress in self.endpoints.items():
            if sw is not from_switch:
                ingress.receive(frame)

In [4]:
class BasicSwitch(Switch):
    """Switch that forwards frames by stream id through one FIFO queue per egress port."""

    def __init__(self, name, simulator: Simulator, logger: Logger, routing_table: dict):
        super().__init__(name, simulator, logger)
        self.sending = {}                           # egress_port_id → bool (transmission in progress)
        self.queues = {}                            # egress_port_id → deque[Frame]
        self.routing_table = routing_table          # stream_id → list[egress_port_id]

    def add_egress_port(self, port_id: str, link):
        """Register the egress port and give it an empty queue and an idle state."""
        new_port = super().add_egress_port(port_id, link)
        self.sending[port_id] = False
        self.queues[port_id] = deque()
        return new_port

    def process(self, frame: Frame, ingress_port: IngressPort):
        """Enqueue *frame* on every egress port routed for its stream.

        A frame with no route, or routed to an unknown port, is logged as DROP.
        """
        targets = self.routing_table.get(frame.stream_id)

        if targets:
            # Multicast: the same frame is enqueued on each listed port.
            for target in targets:
                if target in self.egress_ports:
                    self.queues[target].append(frame)
                    # Kick the port only when it is idle; a busy port picks the
                    # frame up from its queue once the current one is done.
                    if not self.sending[target]:
                        self._send_next(target)
                else:
                    self.logger.log(self.sim.time, self.name, "DROP", frame, target)
        else:
            self.logger.log(self.sim.time, self.name, "DROP", frame, None)

    def _send_next(self, port_id: str):
        """Send the oldest queued frame on *port_id*, or mark the port idle if none is queued."""
        pending = self.queues[port_id]
        has_work = bool(pending)
        self.sending[port_id] = has_work
        if not has_work:
            return

        frame = pending.popleft()
        done_at = self.egress_ports[port_id].send(frame)

        # send() returns an absolute finish time; schedule() wants a delay.
        self.sim.schedule(done_at - self.sim.time, self._on_tx_done, port_id)

    def _on_tx_done(self, port_id: str):
        """Transmission finished: release the port and continue with its queue."""
        self.sending[port_id] = False
        self._send_next(port_id)

In [5]:
class SinkSwitch(Switch):
    """Terminal node: logs every received frame as SINK and forwards nothing."""

    def process(self, frame: Frame, ingress_port: IngressPort):
        """Log the arrival of *frame* together with the id of the port it came in on."""
        arrival_port = ingress_port.port_id
        self.logger.log(self.sim.time, self.name, "SINK", frame, arrival_port)

In [6]:
class ImpairedBasicSwitch(BasicSwitch):
    """BasicSwitch that drops or delays every n-th frame it receives."""

    def __init__(self, name: str, simulator: Simulator, logger: Logger, routing_table: dict, impair_every_n: int | None = None, impair_mode: str | None = None, impair_delay: float = 0.0):
        """
        impair_every_n: impair every n-th received frame (None = no impairment)
        impair_mode: "drop" or "delay"; only checked when impair_every_n is set
        impair_delay: how long an impaired frame is held back in "delay" mode

        Raises ValueError when impair_every_n is set and is <= 0, when the mode
        is not "drop" or "delay", or when "delay" mode has a non-positive delay.
        """
        super().__init__(name, simulator, logger, routing_table)

        self.impair_every_n = impair_every_n
        self.impair_mode = impair_mode
        self.impair_delay = impair_delay
        self._rx_counter = 0

        if impair_every_n is not None:
            if impair_every_n <= 0:
                raise ValueError("impair_every_n must be > 0")
            if impair_mode not in ("drop", "delay"):
                raise ValueError("impair_mode must be 'drop' or 'delay'")
            if impair_mode == "delay" and impair_delay <= 0:
                raise ValueError("impair_delay must be > 0 for delay mode")

    def process(self, frame: Frame, ingress_port: IngressPort):
        """Count the frame, impair it if it is the n-th one, otherwise forward it normally."""
        self._rx_counter += 1

        if self.impair_every_n is not None and self._rx_counter % self.impair_every_n == 0:
            if self.impair_mode == "drop":
                self.logger.log(self.sim.time, self.name, "IMPAIR_DROP", frame, ingress_port.port_id)
                return

            if self.impair_mode == "delay":
                self.logger.log(self.sim.time, self.name, "IMPAIR_DELAY", frame, ingress_port.port_id)
                # Re-inject through the plain BasicSwitch path. Going through
                # self.process again would count the frame a second time and
                # could impair it again (with impair_every_n == 1 it would be
                # delayed forever).
                self.sim.schedule(self.impair_delay, super().process, frame, ingress_port)
                return

        # Normal forwarding is exactly what BasicSwitch already does.
        super().process(frame, ingress_port)

In [7]:
class TrafficSource:
    """Frame generator that injects traffic into the network through a single egress port."""

    def __init__(self, name: str, simulator: Simulator, logger: Logger, start_time: float = 0.0):
        """
        name: name used in log entries
        simulator: object providing ``time`` and ``schedule(delay, callback, *args)``
        logger: logger used by the egress port
        start_time: delay, from the moment a generator is started, to its first frame/period
        """
        self.name = name
        self.sim = simulator
        self.logger = logger
        self.start_time = start_time
        self.frame_counter = 0       # frames created by the interval generators; also used as frame id
        self.egress_port = None

    def add_egress_port(self, port_id: str, link):
        """Attach the egress port (replacing any previous one) and return it."""
        port = EgressPort(port_id, self, link)
        self.egress_port = port
        return port

    def _start_stream(self, stream_id: str, size: int, count: int | None, next_interval):
        """Shared driver for the interval generators; next_interval() gives the gap to the next frame."""
        if count is not None and count <= 0:
            return  # nothing to generate

        # Per-stream counter: self.frame_counter is shared by every stream of
        # this source, so it cannot be used to decide when *this* stream is done.
        sent = 0

        def send():
            nonlocal sent
            self.frame_counter += 1
            sent += 1
            frame = Frame(frame_id=self.frame_counter, stream_id=stream_id, size=size, creation_time=self.sim.time)

            self.egress_port.send(frame)

            if count is None or sent < count:
                self.sim.schedule(next_interval(), send)

        self.sim.schedule(self.start_time, send)

    def start_fixed_interval(self, stream_id: str, interval: float, size: int, count: int | None = None):
        """
        size: int size of the frame in Bytes
        count: int number of frames to generate for this stream (None = infinite, <= 0 = none)
        interval: float time in seconds interval between frames
        """
        self._start_stream(stream_id, size, count, lambda: interval)

    def start_random_interval(self, stream_id: str, min_interval: float, max_interval: float, size: int, count: int | None = None):
        """
        size: int size of the frame in Bytes
        count: int number of frames to generate for this stream (None = infinite, <= 0 = none)
        min_interval: float time in seconds min interval between frames
        max_interval: float time in seconds max interval between frames
        """
        self._start_stream(stream_id, size, count, lambda: random.uniform(min_interval, max_interval))

    def start_periodic_loop(self, stream_id: str, frames_per_period: int, period: float, interval: float, size: int, count: int | None = None, offset: float = 0.0):
        """
        frames_per_period: number of frames in one period
        period: total time for the full set of frames to be sent once
        interval: time between frames inside the period
        size: bytes per frame
        count: total number of periods to send (None = infinite, <= 0 = none)
        offset: time offset from beginning of period to send first frame

        Raises ValueError when an argument is out of range.
        """
        if frames_per_period <= 0:
            raise ValueError("frames_per_period must be > 0")
        if period <= 0:
            raise ValueError("period must be > 0")
        if interval <= 0:
            raise ValueError("interval must be > 0")
        if interval * (frames_per_period - 1) >= period:
            raise ValueError("frames_per_period * interval must fit within the period")
        if size <= 0:
            raise ValueError("frame size must be > 0")
        if offset < 0 or offset >= period:
            raise ValueError("offset must be in range [0, period)")

        if count is not None and count <= 0:
            return  # nothing to generate

        period_counter = 0

        def send_period(start_time: float):
            nonlocal period_counter

            # Schedule frames within this period
            for i in range(frames_per_period):
                frame_id = i + 1  # looped frame ID: 1..frames_per_period
                send_time = start_time + offset + i * interval

                # frame_id is bound as a default so each closure keeps its own value.
                def send_frame(frame_id=frame_id):
                    frame = Frame(frame_id=frame_id, stream_id=stream_id, size=size, creation_time=self.sim.time)
                    self.egress_port.send(frame)

                self.sim.schedule(send_time - self.sim.time, send_frame)

            # Schedule next period
            period_counter += 1
            if count is None or period_counter < count:
                next_period_start = start_time + period
                self.sim.schedule(next_period_start - self.sim.time, send_period, next_period_start)

        # Start the first period. schedule() takes a relative delay, while
        # send_period() needs the absolute time at which the period begins.
        first_period_start = self.sim.time + self.start_time
        self.sim.schedule(self.start_time, send_period, first_period_start)