In [1]:
# =====================================================================================
# REQUIREMENTS
# -------------------------------------------------------------------------------------
# [REQ-1] Implement the Selective Repeat (SR) ARQ protocol
#   • SR window + modulus seq space: __init__ (self.N, self.W, assert W<=N/2)
#   • Sender sliding window send: tick() “push new DATA while window has space”
#   • Per-sequence timers + retransmit: tick() timeout block
#   • Receiver out-of-order buffer + in-order delivery: _handle_at_receiver() (recv_buffer + while recv_expected ...)
#   • Out-of-window handling + dup-ACK only for left-side dup: _handle_at_receiver() (delivered_set, NACK base)
#   • NACK fast retransmit override: _handle_at_sender() (NACK path)
#
# [REQ-2] Input as frames (at least 7)
#   • Assert in __init__: assert len(frames) >= 7
#   • Example below provides 8 frames in __main__
#
# [REQ-3] Outputs the status (data/ack/nack — sent/received/lost) for each frame until complete
#   • “sent”: Channel.transmit() logs “… sent -> arrives …”
#   • “lost”: Channel.transmit() logs “… **lost** on channel”
#   • “received”: _handle_at_receiver() / _handle_at_sender() logs “… received ok”
#   • “corrupted”: both handlers log “… **corrupted** -> drop” after CRC checks
#   • Control (ACK/NACK) status logged same way; counts summarized in _render_summary()
#   • Simulation runs to completion and prints final status in run()
#
# [REQ-4] Includes error detection/correction algorithm as part of error control
#   • Error detection: CRC-16/CCITT-FALSE (crc16_ccitt), computed in build_pdu() over header+payload,
#     validated in both handlers before processing
#   • Correction: ARQ via timeouts (tick()) + NACK-triggered fast retransmit (_handle_at_sender()).
#     Duplicate ACKs are sent only for already-delivered seqs, which prevents false-positive ACKs (deadlock fix).
#
# [REQ-5] Runs/logs “until the transmission is complete”
#   • done() requires: all data delivered, no unacked, no inflight, all frames issued
#   • run() loops until done and prints “Simulation finished=YES …”
# =====================================================================================


In [2]:
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Tuple
from collections import defaultdict
import random, itertools

In [3]:
# ----------------------------
# PDUs and CRC-16/CCITT-FALSE
# ----------------------------
# Process-wide source of PDU ids; every new PDU takes the next integer from it.
_pdu_id_source = itertools.count()


@dataclass
class PDU:
    """A protocol data unit (DATA frame or ACK/NACK control message)."""
    kind: str                        # 'DATA', 'ACK', 'NACK'
    seq: Optional[int]               # sequence number carried by the PDU
    payload: bytes = b''             # frame contents (or filler byte for control PDUs)
    crc: int = 0                     # checksum stored alongside the payload
    sent_at: Optional[int] = None    # tick at which the sender first issued it, if recorded
    id: int = field(default_factory=lambda: next(_pdu_id_source))  # unique per instance

def crc16_ccitt(data: bytes, poly=0x1021, init=0xFFFF) -> int:
    """
    [REQ-4: detection] Bitwise CRC-16 (CCITT-FALSE with the default arguments).

    Each input byte is XORed into the top byte of a 16-bit register, which is
    then shifted left eight times; whenever the bit shifted out is 1 the
    polynomial is XORed in. No reflection and no final XOR are applied.

    Args:
        data: bytes to checksum.
        poly: generator polynomial (default 0x1021).
        init: initial register value (default 0xFFFF).

    Returns:
        The final register value.
    """
    register = init
    for byte in data:
        register ^= byte << 8
        for _bit in range(8):
            top_bit_set = register & 0x8000
            register = (register << 1) & 0xFFFF
            if top_bit_set:
                register ^= poly
    return register

def build_pdu(kind: str, seq: Optional[int], payload: bytes=b'') -> PDU:
    """
    [REQ-4: detection] Create a PDU whose CRC covers a textual header plus the payload.

    The header is ``"<kind>:<seq>:"`` with ``-1`` standing in for a missing
    sequence number. ACK/NACK PDUs built without a payload get a single zero
    byte so that the channel's payload bit flips can corrupt control PDUs too.

    Args:
        kind: 'DATA', 'ACK' or 'NACK'.
        seq: sequence number, or None.
        payload: frame bytes (optional for control PDUs).

    Returns:
        A PDU with ``crc`` already filled in.
    """
    is_control = kind in ('ACK', 'NACK')
    if is_control and payload == b'':
        payload = b'\x00'

    header = f"{kind}:{seq if seq is not None else -1}:".encode()
    checksum = crc16_ccitt(header + payload)
    return PDU(kind=kind, seq=seq, payload=payload, crc=checksum)

# ----------------------------
# Simple stats recorder
# ----------------------------
def _default_data_stats():
    return {
        'sent': 0, 'retransmits': 0, 'lost': 0, 'corrupted': 0,
        'acked': 0, 'delivered': 0, 'timeouts': 0
    }

def _default_ctrl_stats():
    return {
        'sent': 0, 'lost': 0, 'corrupted': 0,
        'rx_ok_sender': 0, 'rx_ok_receiver': 0
    }

In [4]:
# ----------------------------
# Unreliable Channel
# ----------------------------
class Channel:
    """
    Unreliable link between the two endpoints.

    [REQ-3] Logs per-PDU status:
      • sent (with arrival/corrupted info)
      • lost
    [REQ-4] Injects bit errors (payload flips) to exercise CRC.

    All randomness is drawn from a private ``random.Random(seed)`` instance.
    """
    def __init__(self, p_loss=0.10, p_bit_error=0.02, seed=42, rtt=4, stats=None):
        """
        Args:
            p_loss: probability that a transmitted PDU is dropped entirely.
            p_bit_error: per-payload-byte probability of flipping one bit in that byte.
            seed: seed for the channel's private random generator.
            rtt: round-trip time in ticks; one-way delay is ``rtt // 2`` plus 0..2 jitter.
            stats: optional ``stats[kind][seq][counter]`` mapping updated on loss/corruption.
        """
        self.p_loss, self.p_bit_error = p_loss, p_bit_error
        self.random = random.Random(seed)
        self.rtt = rtt
        self.stats = stats

    def transmit(self, pdu: PDU, now: int, log: List[str]) -> Optional[Tuple[int, PDU]]:
        """
        Push one PDU through the channel at tick ``now``.

        Appends one status line to ``log`` and returns ``None`` when the PDU is
        lost, otherwise ``(arrival_tick, received_copy)`` where the copy's
        payload may have had bits flipped (its ``crc`` field is left as sent).
        """
        rng = self.random
        track = self.stats is not None

        # [REQ-3] LOST
        if rng.random() < self.p_loss:
            log.append(f"[t={now}] {pdu.kind}[{pdu.seq}] **lost** on channel")
            if track:
                self._inc_loss(pdu)
            return None

        # [REQ-4] BIT ERRORS: each payload byte independently may get one bit flipped
        noisy = bytearray(pdu.payload)
        corrupted = False
        for pos in range(len(noisy)):
            if rng.random() < self.p_bit_error:
                noisy[pos] ^= 1 << rng.randrange(8)
                corrupted = True

        delivered_copy = PDU(kind=pdu.kind, seq=pdu.seq, payload=bytes(noisy), crc=pdu.crc)
        arrival = now + (self.rtt // 2 + rng.randint(0, 2))

        # [REQ-3] SENT (+ note whether arrival is corrupted/ok)
        log.append(f"[t={now}] {pdu.kind}[{pdu.seq}] sent -> arrives {'corrupted' if corrupted else 'ok'} at t={arrival}")

        if track and corrupted:
            self._inc_corrupt(pdu)

        return arrival, delivered_copy

    # Stats helpers (for table summaries)
    def _bump(self, pdu: PDU, counter: str):
        """Increment ``counter`` for this PDU's kind and sequence number."""
        self.stats[pdu.kind][pdu.seq][counter] += 1

    def _inc_loss(self, pdu: PDU):
        """Count one lost PDU."""
        self._bump(pdu, 'lost')

    def _inc_corrupt(self, pdu: PDU):
        """Count one PDU that left the channel with a modified payload."""
        self._bump(pdu, 'corrupted')

In [5]:
# ----------------------------
# Selective Repeat ARQ
# ----------------------------
class SelectiveRepeatSimulator:
    def __init__(
        self,
        frames: List[bytes],
        k_bits=3,
        window_size=None,
        timeout=12,
        p_loss=0.10,
        p_bit_error=0.02,
        seed=7,
        rtt=4
    ):
        """
        Set up sender, receiver and channel state for one simulation run.

        Args:
            frames: payloads to transfer, in order (at least 7).
            k_bits: width of the sequence-number field; the sequence space is 2**k_bits.
            window_size: SR window W; defaults to half the sequence space.
            timeout: ticks after which an unACKed frame is retransmitted.
            p_loss: channel loss probability per PDU.
            p_bit_error: channel per-byte bit-flip probability.
            seed: seed for the channel's random generator.
            rtt: channel round-trip time in ticks.

        Raises:
            AssertionError: fewer than 7 frames, W > N/2, or W < 1.
        """
        # [REQ-2] enforces ≥7 frames input
        assert len(frames) >= 7, "Provide at least 7 frames."

        # [REQ-1] seq number space (N) and SR window (W) with SR constraint W ≤ N/2
        self.N = 1 << k_bits
        self.W = window_size if window_size is not None else self.N // 2
        assert self.W <= self.N // 2, "Selective Repeat requires W <= N/2."
        # A zero (or negative) window can never send anything, so the run could
        # only end in a stall; reject it up front (this also catches k_bits=0).
        assert self.W >= 1, "Selective Repeat requires a window of at least 1 frame."

        self.frames = frames
        self.timeout = timeout

        # [REQ-3] End-of-run summaries to verify.
        self.stats = {
            'DATA': defaultdict(_default_data_stats),
            'ACK': defaultdict(_default_ctrl_stats),
            'NACK': defaultdict(_default_ctrl_stats),
        }

        # The channel shares the stats mapping so it can count losses/corruptions.
        self.channel = Channel(p_loss=p_loss, p_bit_error=p_bit_error, seed=seed, rtt=rtt, stats=self.stats)

        self.now = 0
        self.log: List[str] = []
        self.inflight: List[Tuple[int, PDU, bool]] = []  # (arrival_time, pdu, to_sender?)

        # [REQ-1] Sender sliding-window state and per-seq timers
        self.send_index = 0
        self.unacked: Dict[int, Tuple[PDU, int, int]] = {}  # seq -> (pdu, sent_at, frame_idx)

        # [REQ-1] Receiver buffer and delivery state
        self.recv_expected = 0
        self.recv_buffer: Dict[int, PDU] = {}
        self.delivered: List[bytes] = []
        self.delivered_set: set[int] = set()   # used for safe dup-ACKs (left-of-window only)
        self.nacked_once: set[int] = set()     # avoid NACK storms

    # [REQ-1] Inclusive circular membership for an SR window of size W starting at base
    def _in_window(self, seq: int, base: int, W: int) -> bool:
        end = (base + W - 1) % self.N
        if base <= end:
            return base <= seq <= end
        return seq >= base or seq <= end

    # ---------- main tick ----------
    def tick(self):
        # [REQ-1] Sender pushes DATA while window has space
        while self.send_index < len(self.frames) and len(self.unacked) < self.W:
            seq = self.send_index % self.N
            pdu = build_pdu('DATA', seq, self.frames[self.send_index])
            pdu.sent_at = self.now
            self.unacked[seq] = (pdu, self.now, self.send_index)
            self.send_index += 1
            self.stats['DATA'][seq]['sent'] += 1
            r = self.channel.transmit(pdu, self.now, self.log)      # [REQ-3] “sent” logged
            if r:
                self.inflight.append((r[0], r[1], False))

        # Deliver any arrivals for this tick (to receiver or back to sender)
        arrivals = [e for e in self.inflight if e[0] == self.now]
        self.inflight = [e for e in self.inflight if e[0] != self.now]
        for _, pdu, to_sender in arrivals:
            if to_sender:
                self._handle_at_sender(pdu)                          # [REQ-3] sender “received ok/corrupted”
            else:
                self._handle_at_receiver(pdu)                        # [REQ-3] receiver “received ok/corrupted”

        # [REQ-1][REQ-4] Per-sequence timeout → ARQ retransmit (correction)
        for seq, (pdu, tsent, idx) in list(self.unacked.items()):
            if self.now - tsent >= self.timeout:
                self.log.append(f"[t={self.now}] TIMEOUT data[{seq}] -> retransmit")
                self.stats['DATA'][seq]['timeouts'] += 1
                self.stats['DATA'][seq]['retransmits'] += 1
                self.unacked[seq] = (pdu, self.now, idx)
                self.stats['DATA'][seq]['sent'] += 1
                r = self.channel.transmit(pdu, self.now, self.log)  # [REQ-3] “sent” again
                if r:
                    self.inflight.append((r[0], r[1], False))

        self.now += 1

    # ---------- Receiver ----------
    def _handle_at_receiver(self, pdu: PDU):
        """
        Process one PDU arriving at the receiver.

        A PDU failing the CRC is dropped (a corrupted DATA frame may trigger a
        NACK of the current base). Control PDUs are logged and ignored. A DATA
        frame inside the receive window is buffered, ACKed, and any in-order
        run starting at the base is delivered; a frame outside the window is
        dropped and either re-ACKed (seq already delivered) or answered with a
        NACK of the base. Each base value is NACKed at most once.
        """
        def send_control(kind, seq):
            # Build an ACK/NACK, count it, and queue it towards the sender if it survives the channel.
            ctrl = build_pdu(kind, seq)
            self.stats[kind][seq]['sent'] += 1
            outcome = self.channel.transmit(ctrl, self.now, self.log)   # [REQ-3] control PDU “sent”
            if outcome:
                self.inflight.append((outcome[0], outcome[1], True))

        def nack_base_once():
            # [REQ-4] Correction hint: NACK the expected seq unless already NACKed.
            base = self.recv_expected
            if base in self.nacked_once:
                return
            send_control('NACK', base)
            self.nacked_once.add(base)

        # [REQ-4] CRC validation (detection) for all PDUs
        header = f"{pdu.kind}:{pdu.seq if pdu.seq is not None else -1}:".encode()
        if crc16_ccitt(header + pdu.payload) != pdu.crc:
            # [REQ-3] “corrupted” status
            self.log.append(f"[t={self.now}] Receiver: {pdu.kind}[{pdu.seq}] **corrupted** -> drop")
            if pdu.kind == 'DATA':
                nack_base_once()
            return

        # [REQ-3] “received ok”
        self.log.append(f"[t={self.now}] Receiver: {pdu.kind}[{pdu.seq}] received ok")
        if pdu.kind in ('ACK', 'NACK'):
            # Receiver never consumes control PDUs in this model.
            return

        seq = pdu.seq
        if not self._in_window(seq, self.recv_expected, self.W):
            # [REQ-1] Out-of-window: drop. Only dup-ACK if it is a true left-duplicate (already delivered).
            self.log.append(f"[t={self.now}] Receiver: DATA[{seq}] **out-of-window** (base={self.recv_expected}, W={self.W}) -> drop")
            if seq in self.delivered_set:
                # [REQ-1] Safe duplicate ACK (helps if earlier ACK was lost)
                send_control('ACK', seq)
            else:
                # Right-side frame (not yet delivered): do NOT ACK; at most NACK the base once
                nack_base_once()
            return

        # [REQ-1] In-window DATA: keep the first copy, ACK every copy
        self.recv_buffer.setdefault(seq, pdu)
        send_control('ACK', seq)

        # [REQ-1] Deliver in-order burst from recv_expected forward
        while self.recv_expected in self.recv_buffer:
            ready = self.recv_buffer.pop(self.recv_expected)
            self.delivered.append(ready.payload)
            self.delivered_set.add(self.recv_expected)                 # enables safe dup-ACKs
            self.stats['DATA'][self.recv_expected]['delivered'] += 1
            # [REQ-3] Delivery is logged for transparency
            self.log.append(f"[t={self.now}] Receiver: **DELIVER** data[{self.recv_expected}] to app")
            self.recv_expected = (self.recv_expected + 1) % self.N
            self.nacked_once.discard(self.recv_expected)               # new base may be NACKed again

        # [REQ-1][REQ-4] Gap? After the loop the base is never buffered, so any
        # remaining buffered frame lies beyond a missing base: NACK it once.
        if self.recv_buffer:
            nack_base_once()

    # ---------- Sender ----------
    def _handle_at_sender(self, pdu: PDU):
        """
        Process one PDU arriving at the sender.

        A PDU failing the CRC is dropped. An ACK clears the matching unACKed
        entry (if any). A NACK triggers an immediate retransmission and
        restarts that frame's timer; if the NACKed seq is no longer tracked,
        the most recently issued frame carrying that seq is rebuilt and
        re-tracked. NACKs for a seq that was never transmitted, or that lies
        outside the sequence space, are ignored.
        """
        # [REQ-4] CRC validation (detection) for control PDUs and any looped-back PDUs
        header = f"{pdu.kind}:{pdu.seq if pdu.seq is not None else -1}:".encode()
        ok = (crc16_ccitt(header + pdu.payload) == pdu.crc)
        if not ok:
            self.log.append(f"[t={self.now}] Sender: {pdu.kind}[{pdu.seq}] **corrupted** -> drop")  # [REQ-3]
            return

        self.log.append(f"[t={self.now}] Sender: {pdu.kind}[{pdu.seq}] received ok")                # [REQ-3]
        if pdu.kind in ('ACK', 'NACK'):
            self.stats[pdu.kind][pdu.seq]['rx_ok_sender'] += 1

        if pdu.kind == 'ACK':
            # [REQ-1] ACK clears exactly one seq in SR
            if pdu.seq in self.unacked:
                del self.unacked[pdu.seq]
                self.stats['DATA'][pdu.seq]['acked'] += 1
                self.log.append(f"[t={self.now}] Sender: ACKed data[{pdu.seq}]")                    # [REQ-3]

        elif pdu.kind == 'NACK':
            # [REQ-1][REQ-4] NACK fast retransmit; override even if we (incorrectly) think it was ACKed
            if pdu.seq in self.unacked:
                data_pdu, _, idx = self.unacked[pdu.seq]
            else:
                # rebuild from original frames if not tracked (deadlock-breaker)
                if pdu.seq is None or not 0 <= pdu.seq < self.N:
                    return
                # Sequence numbers repeat every N frames, so several frame
                # indices can share this seq. The NACK can only refer to a
                # frame that was actually sent, and the relevant one is the
                # newest: the largest index below send_index congruent to seq.
                newest_issued = self.send_index - 1
                idx = newest_issued - ((newest_issued - pdu.seq) % self.N)
                if idx < 0:
                    # No frame with this seq has been transmitted yet.
                    return
                data_pdu = build_pdu('DATA', pdu.seq, self.frames[idx])

            self.log.append(f"[t={self.now}] Sender: NACK for data[{pdu.seq}] -> (re)transmit")     # [REQ-3]
            self.unacked[pdu.seq] = (data_pdu, self.now, idx)       # (re)start the per-seq timer
            self.stats['DATA'][pdu.seq]['retransmits'] += 1
            self.stats['DATA'][pdu.seq]['sent'] += 1
            r = self.channel.transmit(data_pdu, self.now, self.log)                                 # [REQ-3]
            if r: self.inflight.append((r[0], r[1], False))

    # ---------- Termination ----------
    def done(self):
        """
        [REQ-5] Transmission is considered complete only when:
          • all frames delivered (len(delivered) == len(frames)),
          • no unacked remain,
          • no inflight PDUs,
          • and sender has issued all frames.
        """
        return (
            len(self.delivered) == len(self.frames)
            and not self.unacked
            and not self.inflight
            and self.send_index >= len(self.frames)
        )

    def run(self, max_ticks=1000):
        """
        [REQ-5] Drives the simulation until done(); logs a final 'finished=YES/NO'.
        [REQ-3] Produces a full status log (sent/received/lost/corrupted/timeout/deliver) for DATA/ACK/NACK.
        """
        STALL_LIMIT = 300
        stagnant = 0

        def snap():
            return (len(self.delivered), len(self.unacked), len(self.inflight), self.send_index, self.recv_expected)

        before = snap()
        while not self.done() and self.now < max_ticks:
            self.tick()
            after = snap()
            stagnant = stagnant + 1 if after == before else 0
            if stagnant >= STALL_LIMIT:
                self.log.append(f"[t={self.now}] **STALL** detected (no progress for {STALL_LIMIT} ticks) — stopping.")
                break
            before = after

        finished = "YES" if self.done() else "NO"
        self.log.append(f"[t={self.now}] Simulation finished={finished}, delivered={len(self.delivered)}/{len(self.frames)}")  # [REQ-5]

        # [REQ-3] Appends summary tables for quick auditing
        self.log.append("\n" + self._render_summary())

        return "\n".join(self.log), [p.decode(errors='ignore') for p in self.delivered], self.now

    # ---------- Summary Rendering ----------
    def _render_summary(self) -> str:
        """Readable end-of-run summaries"""
        data_seqs = sorted(self.stats['DATA'].keys(), key=lambda s: (s is None, s))
        lines = []
        lines.append("==== SUMMARY: DATA (per sequence) ====")
        lines.append(f"{'seq':>3} | {'sent':>4} {'reTX':>5} {'lost':>4} {'corr':>4} {'ACKed':>5} {'deliv':>5} {'TO':>3}")
        lines.append("-"*45)
        for s in data_seqs:
            st = self.stats['DATA'][s]
            lines.append(f"{s:>3} | {st['sent']:>4} {st['retransmits']:>5} {st['lost']:>4} {st['corrupted']:>4} {st['acked']:>5} {st['delivered']:>5} {st['timeouts']:>3}")

        def agg(kind: str):
            total = {'sent':0,'lost':0,'corrupted':0,'rx_ok_sender':0,'rx_ok_receiver':0}
            for _seq, st in self.stats[kind].items():
                for k in total:
                    total[k] += st[k]
            return total

        ack_total = agg('ACK')
        nack_total = agg('NACK')

        lines.append("\n==== SUMMARY: CONTROL (totals) ====")
        lines.append(f"{'kind':<6} | {'sent':>5} {'lost':>5} {'corr':>5} {'rx@sender':>9}")
        lines.append("-"*40)
        lines.append(f"{'ACK':<6} | {ack_total['sent']:>5} {ack_total['lost']:>5} {ack_total['corrupted']:>5} {ack_total['rx_ok_sender']:>9}")
        lines.append(f"{'NACK':<6} | {nack_total['sent']:>5} {nack_total['lost']:>5} {nack_total['corrupted']:>5} {nack_total['rx_ok_sender']:>9}")

        return "\n".join(lines)

In [6]:
# ----------------------------
# Example (demonstrates [REQ-2], [REQ-3], [REQ-5])
# ----------------------------
if __name__ == "__main__":
    # [REQ-2] Eight input frames (the simulator requires at least 7)
    frames = [f"Frame {i}".encode() for i in range(8)]

    # N = 2**3 = 8 sequence numbers, window of 4 (= N/2), 12-tick retransmission
    # timer, 12% loss and 2% per-byte bit-flip probability on a seeded channel.
    sim = SelectiveRepeatSimulator(
        frames=frames, k_bits=3, window_size=4, timeout=12,
        p_loss=0.12, p_bit_error=0.02, seed=9, rtt=4,
    )

    # [REQ-5] Run to completion (or until the tick budget is exhausted)
    status_log, delivered_order, ticks = sim.run(max_ticks=5000)

    # [REQ-3] Full status log of DATA/ACK/NACK (sent/received/lost/corrupted)
    print("---- STATUS LOG ----", status_log, sep="\n")

    # Delivered order + completion confirmation [REQ-5]
    print("\n---- DELIVERY ORDER ----", delivered_order, sep="\n")
    print(f"\nCompleted in {ticks} ticks, delivered {len(delivered_order)} frames.")

---- STATUS LOG ----
[t=0] DATA[0] sent -> arrives corrupted at t=4
[t=0] DATA[1] sent -> arrives ok at t=2
[t=0] DATA[2] sent -> arrives ok at t=4
[t=0] DATA[3] sent -> arrives ok at t=4
[t=2] Receiver: DATA[1] received ok
[t=2] ACK[1] sent -> arrives ok at t=4
[t=2] NACK[0] sent -> arrives ok at t=4
[t=4] Receiver: DATA[0] **corrupted** -> drop
[t=4] Receiver: DATA[2] received ok
[t=4] ACK[2] **lost** on channel
[t=4] Receiver: DATA[3] received ok
[t=4] ACK[3] sent -> arrives ok at t=6
[t=4] Sender: ACK[1] received ok
[t=4] Sender: ACKed data[1]
[t=4] Sender: NACK[0] received ok
[t=4] Sender: NACK for data[0] -> (re)transmit
[t=4] DATA[0] sent -> arrives corrupted at t=8
[t=5] DATA[4] sent -> arrives ok at t=9
[t=6] Sender: ACK[3] received ok
[t=6] Sender: ACKed data[3]
[t=7] DATA[5] sent -> arrives ok at t=9
[t=8] Receiver: DATA[0] **corrupted** -> drop
[t=9] Receiver: DATA[4] received ok
[t=9] Receiver: DATA[4] **out-of-window** (base=0, W=4) -> drop
[t=9] Receiver: DATA[5] receive