In [1]:
# Wersja wielowątkowa

# Współbieżność z użyciem wątków systemowych:
# - Każdy proces (PID) działa w osobnym wątku (threading.Thread) z własną skrzynką odbiorczą (queue.Queue).
# - Osobny wątek wysyła heartbeat w imieniu aktualnego lidera.
# - Osobny wątek symuluje awarie i powroty procesów oraz odmierza zegar symulacji.
# - Współdzielony stan (lider, heartbeat) jest synchronizowany przez threading.Lock.

In [2]:
from __future__ import annotations
import threading
import queue
import random
import time
from typing import Dict, Tuple, Optional, List

# =====================
# Default parameters
# =====================
DEFAULT_PROCESS_IDS: List[int] = [1, 2, 3, 4]
DEFAULT_STEP_SEC: float = 0.1              # real seconds per simulation step (also the clock increment)
DEFAULT_HEARTBEAT_INTERVAL: float = 0.3    # min. simulated time between leader HEARTBEATs
DEFAULT_HEARTBEAT_TIMEOUT: float = 1.0     # heartbeat silence after which an election starts
DEFAULT_PROB_CRASH: float = 0.08           # per-step crash probability of every alive process
DEFAULT_PROB_RECOVER: float = 0.02         # per-step recovery probability of every crashed process
DEFAULT_SIM_STEPS: int = 400               # number of clock steps before the simulation stops
DEFAULT_RANDOM_SEED: int = 42
DEFAULT_STRICT_BULLY_ON_RECOVERY: bool = True  # recovered process immediately starts an election

Msg = Tuple[str, int]  # (message type, sender pid)

class DistributedSystem:
    """Thread-based simulation of the bully leader-election algorithm.

    Each process id runs in its own thread with its own ``queue.Queue``
    mailbox. A separate heartbeat thread broadcasts HEARTBEAT on behalf of the
    current leader, and a faults thread advances the step clock (``time_now``)
    and randomly crashes/recovers processes. Leader state (``current_leader``,
    ``last_heartbeat``) is guarded by ``leader_lock``; output lines are
    serialized by ``print_lock``.

    Note: the simulation uses a *global* view -- ``alive``, ``current_leader``
    and ``last_heartbeat`` are shared by all processes, not kept per process.
    """

    # Tolerance for step-clock differences: e.g. ``1.2 - 0.9`` evaluates to
    # 0.29999999999999993, which without it would make a 0.3 heartbeat
    # interval miss a tick.
    _TIME_EPS: float = 1e-9

    def __init__(
        self,
        process_ids: Optional[List[int]] = None,
        step_sec: float = DEFAULT_STEP_SEC,
        heartbeat_interval: float = DEFAULT_HEARTBEAT_INTERVAL,
        heartbeat_timeout: float = DEFAULT_HEARTBEAT_TIMEOUT,
        prob_crash: float = DEFAULT_PROB_CRASH,
        prob_recover: float = DEFAULT_PROB_RECOVER,
        sim_steps: int = DEFAULT_SIM_STEPS,
        strict_bully_on_recovery: bool = DEFAULT_STRICT_BULLY_ON_RECOVERY,
        seed: int = DEFAULT_RANDOM_SEED,
    ) -> None:
        """Configure the simulation; no thread is started until ``start()``.

        Args:
            process_ids: ids of the simulated processes (a higher id wins an
                election). Defaults to ``DEFAULT_PROCESS_IDS``; the list is copied.
            step_sec: real seconds slept per step; also the step-clock increment.
            heartbeat_interval: minimum step-clock time between HEARTBEATs.
            heartbeat_timeout: heartbeat silence after which an election starts.
            prob_crash: per-step crash probability of every alive process.
            prob_recover: per-step recovery probability of every crashed process.
            sim_steps: number of clock steps before the simulation stops itself.
            strict_bully_on_recovery: if True, a recovered process immediately
                starts an election and can take over from a lower-id leader.
            seed: seed of the simulation's private random generator.
        """
        self.process_ids = list(process_ids) if process_ids is not None else list(DEFAULT_PROCESS_IDS)
        self.step_sec = float(step_sec)
        self.heartbeat_interval = float(heartbeat_interval)
        self.heartbeat_timeout = float(heartbeat_timeout)
        self.prob_crash = float(prob_crash)
        self.prob_recover = float(prob_recover)
        self.sim_steps = int(sim_steps)
        self.strict_bully_on_recovery = bool(strict_bully_on_recovery)
        self.seed = int(seed)

        # Shared state
        self.mailboxes: Dict[int, "queue.Queue[Msg]"] = {pid: queue.Queue() for pid in self.process_ids}
        self.alive: Dict[int, bool] = {pid: True for pid in self.process_ids}
        self.current_leader: Optional[int] = None
        self.last_heartbeat: Optional[float] = None

        # Synchronization
        self.leader_lock = threading.Lock()
        self.print_lock = threading.Lock()
        self.stop_event = threading.Event()

        # Threads started by start()
        self._threads: List[threading.Thread] = []

        # Simulation clock: advanced only by the faults thread, step_sec per step
        self.time_now = 0.0

        # Private generator: yields the same sequence the former global
        # ``random.seed(seed)`` did, but neither reseeds the ``random`` module
        # for other notebook code nor gets perturbed by it.
        self._rng = random.Random(self.seed)

    # ===== Helpers =====
    def sim_time(self) -> float:
        """Return the step clock (``time_now``) rounded to 2 decimals."""
        return round(self.time_now, 2)

    def log(self, msg: str) -> None:
        """Print ``msg`` prefixed with the simulation time, one line at a time."""
        with self.print_lock:
            print(f"[{self.sim_time():5.2f}] {msg}")

    def send(self, to: int, msg: Msg) -> None:
        """Put ``msg`` into the mailbox of process ``to``."""
        self.mailboxes[to].put(msg)

    def broadcast(self, msg: Msg) -> None:
        """Send ``msg`` to every process, including the sender and dead ones."""
        for p in self.process_ids:
            self.send(p, msg)

    # ===== Bully logic =====
    def check_and_maybe_start_election(self, pid: int, force: bool = False) -> None:
        """Start a bully election from ``pid`` when one is needed.

        An election is needed when no leader is known, when the last heartbeat
        is older than ``heartbeat_timeout``, or when ``force`` is True (used on
        recovery: in the classic bully algorithm a recovered process always
        challenges the current leader). If no alive process has a higher id,
        ``pid`` declares itself leader and broadcasts COORDINATOR; otherwise it
        sends ELECTION to every alive higher-id process. Dead processes never act.
        """
        if not self.alive.get(pid, False):
            return

        if not force:
            with self.leader_lock:
                no_leader = self.current_leader is None
                hb_timed_out = (self.last_heartbeat is None) or (
                    (self.time_now - self.last_heartbeat) > self.heartbeat_timeout
                )
            if not (no_leader or hb_timed_out):
                return

        higher = [p for p in self.process_ids if p > pid and self.alive.get(p, False)]
        if higher:
            for p in higher:
                self.send(p, ("ELECTION", pid))
            self.log(f"Proces {pid}: rozpoczął ELECTION -> wysłano do {higher}")
            return

        with self.leader_lock:
            self.current_leader = pid
            self.last_heartbeat = self.time_now
        self.broadcast(("COORDINATOR", pid))
        self.log(f"Proces {pid}: ogłaszam się LIDEREM (brak silniejszych żywych)")

    def handle_mailbox(self, pid: int) -> None:
        """Process every message currently queued for ``pid``.

        A crashed process only drains its mailbox (messages sent to it while it
        is down are lost). An alive process reacts per message type:
        ELECTION -> reply OK and re-check whether it must start an election;
        OK -> log only; COORDINATOR -> adopt the sender as leader;
        HEARTBEAT -> refresh the heartbeat timestamp if the sender is the known
        leader or no leader is known.
        """
        mailbox = self.mailboxes[pid]
        if not self.alive.get(pid, False):
            # Discard anything still queued for the crashed process.
            try:
                while True:
                    mailbox.get_nowait()
            except queue.Empty:
                pass
            return

        while True:
            try:
                typ, frm = mailbox.get_nowait()
            except queue.Empty:
                break

            if typ == "ELECTION":
                if pid > frm and self.alive.get(pid, False):
                    self.send(frm, ("OK", pid))
                    self.log(f"Proces {pid}: OTRZYMAŁ ELECTION od {frm} -> wysyła OK i sam zaczyna wybory")
                    # Not forced: repeated ELECTION messages from lower processes
                    # must not trigger repeated COORDINATOR broadcasts.
                    self.check_and_maybe_start_election(pid)
                else:
                    self.log(f"Proces {pid}: OTRZYMAŁ ELECTION od {frm}, ale nie odpowiada (niższe ID lub nieżywy)")

            elif typ == "OK":
                self.log(f"Proces {pid}: OTRZYMAŁ OK od {frm} -> czeka na COORDINATOR")

            elif typ == "COORDINATOR":
                with self.leader_lock:
                    self.current_leader = frm
                    self.last_heartbeat = self.time_now
                self.log(f"Proces {pid}: OTRZYMAŁ COORDINATOR -> nowy lider = {frm}")

            elif typ == "HEARTBEAT":
                with self.leader_lock:
                    if self.current_leader == frm or self.current_leader is None:
                        self.current_leader = frm
                        self.last_heartbeat = self.time_now
                self.log(f"Proces {pid}: OTRZYMAŁ HEARTBEAT od lidera {frm}")

    # ===== Threads =====
    def _process_thread(self, pid: int) -> None:
        """Main loop of one simulated process.

        After a one-step start delay, the process tries an election. Then, every
        step, it handles its mailbox and re-checks the leader.
        """
        time.sleep(self.step_sec)
        self.check_and_maybe_start_election(pid)

        while not self.stop_event.is_set():
            self.handle_mailbox(pid)
            self.check_and_maybe_start_election(pid)
            time.sleep(self.step_sec)

    def _heartbeat_thread(self) -> None:
        """Broadcast HEARTBEAT for the alive leader every ``heartbeat_interval``."""
        last_sent = -1e9
        while not self.stop_event.is_set():
            time.sleep(self.step_sec)
            with self.leader_lock:
                leader = self.current_leader
            if leader is None or not self.alive.get(leader, False):
                continue
            now = self.time_now  # single snapshot; the faults thread advances it
            if (now - last_sent) + self._TIME_EPS >= self.heartbeat_interval:
                self.broadcast(("HEARTBEAT", leader))
                last_sent = now
                with self.leader_lock:
                    self.last_heartbeat = now
                self.log(f"Lider {leader}: wysyła HEARTBEAT")

    def _faults_thread(self) -> None:
        """Drive the step clock and inject random crashes and recoveries.

        Runs ``sim_steps`` steps (or until stopped) and always sets
        ``stop_event`` on exit -- even if an exception escapes -- so ``wait()``
        cannot spin forever.
        """
        try:
            steps = 0
            while not self.stop_event.is_set() and steps < self.sim_steps:
                # Step clock
                self.time_now = round(steps * self.step_sec, 2)

                # Crashes
                for pid in self.process_ids:
                    if self.alive.get(pid, False) and self._rng.random() < self.prob_crash:
                        self.alive[pid] = False
                        self.log(f"!!! Proces {pid} ULEGŁ AWARII !!!")
                        with self.leader_lock:
                            was_leader = self.current_leader == pid
                        if was_leader:
                            self.log(f"!!! Lider {pid} padł — inni wykryją to po timeoutcie !!!")

                # Recoveries
                for pid in self.process_ids:
                    if not self.alive.get(pid, True) and self._rng.random() < self.prob_recover:
                        self.alive[pid] = True
                        self.log(f">>> Proces {pid} odzyskał sprawność")
                        if self.strict_bully_on_recovery:
                            # Classic bully: the recovered process always starts an
                            # election, even if the current leader is still healthy.
                            self.check_and_maybe_start_election(pid, force=True)

                steps += 1
                time.sleep(self.step_sec)
        finally:
            # End of simulation (or failure): release wait() and the other threads.
            self.stop_event.set()

    # ===== Run API =====
    def start(self) -> None:
        """Start all simulation threads (daemon threads) and return immediately."""
        # Process threads
        for pid in self.process_ids:
            t = threading.Thread(target=self._process_thread, args=(pid,), name=f"proc-{pid}", daemon=True)
            t.start()
            self._threads.append(t)
        # Heartbeat
        t = threading.Thread(target=self._heartbeat_thread, name="heartbeat", daemon=True)
        t.start()
        self._threads.append(t)
        # Faults & recoveries (also drives the clock)
        t = threading.Thread(target=self._faults_thread, name="faults", daemon=True)
        t.start()
        self._threads.append(t)

    def wait(self) -> None:
        """Block until ``stop_event`` is set, then join every thread (1 s timeout each)."""
        # Short timed waits keep the loop interruptible (e.g. by KeyboardInterrupt).
        while not self.stop_event.wait(timeout=0.05):
            pass
        for t in list(self._threads):
            t.join(timeout=1.0)

    def stop(self) -> None:
        """Request shutdown and wait for the threads; safe to call more than once."""
        self.stop_event.set()
        self.wait()

In [3]:
def run_demo(
    sim_steps: int = DEFAULT_SIM_STEPS,
    *,
    process_ids: Optional[List[int]] = None,
    step_sec: float = DEFAULT_STEP_SEC,
    heartbeat_interval: float = DEFAULT_HEARTBEAT_INTERVAL,
    heartbeat_timeout: float = DEFAULT_HEARTBEAT_TIMEOUT,
    crash: float = DEFAULT_PROB_CRASH,
    recover: float = DEFAULT_PROB_RECOVER,
    strict_bully_on_recovery: bool = DEFAULT_STRICT_BULLY_ON_RECOVERY,
    seed: int = DEFAULT_RANDOM_SEED,
) -> None:
    """Run the bully-election simulation and block until it finishes.

    Parameters mirror ``DistributedSystem``; ``crash``/``recover`` map to
    ``prob_crash``/``prob_recover``. If the wait is interrupted (e.g. the
    notebook cell is interrupted with KeyboardInterrupt), the simulation is
    stopped before the exception propagates. Without this, its daemon threads
    would keep running inside the long-lived kernel.
    """
    system = start_nonblocking(
        sim_steps,
        process_ids=process_ids,
        step_sec=step_sec,
        heartbeat_interval=heartbeat_interval,
        heartbeat_timeout=heartbeat_timeout,
        crash=crash,
        recover=recover,
        strict_bully_on_recovery=strict_bully_on_recovery,
        seed=seed,
    )
    try:
        system.wait()
    finally:
        # stop() only sets the stop event and joins the threads, so it is also
        # harmless after a normal finish.
        system.stop()

def start_nonblocking(
    sim_steps: int = DEFAULT_SIM_STEPS,
    *,
    process_ids: Optional[List[int]] = None,
    step_sec: float = DEFAULT_STEP_SEC,
    heartbeat_interval: float = DEFAULT_HEARTBEAT_INTERVAL,
    heartbeat_timeout: float = DEFAULT_HEARTBEAT_TIMEOUT,
    crash: float = DEFAULT_PROB_CRASH,
    recover: float = DEFAULT_PROB_RECOVER,
    strict_bully_on_recovery: bool = DEFAULT_STRICT_BULLY_ON_RECOVERY,
    seed: int = DEFAULT_RANDOM_SEED,
) -> DistributedSystem:
    """Start the simulation without blocking the calling thread.

    Parameters mirror ``DistributedSystem``; ``crash``/``recover`` map to
    ``prob_crash``/``prob_recover``.

    Returns:
        The running ``DistributedSystem``. The caller is responsible for calling
        its ``stop()`` (or ``wait()``).
    """
    # Named ``system`` rather than ``sys`` so the stdlib module name is not shadowed.
    system = DistributedSystem(
        process_ids=process_ids,
        step_sec=step_sec,
        heartbeat_interval=heartbeat_interval,
        heartbeat_timeout=heartbeat_timeout,
        prob_crash=crash,
        prob_recover=recover,
        sim_steps=sim_steps,
        strict_bully_on_recovery=strict_bully_on_recovery,
        seed=seed,
    )
    system.start()
    return system

In [5]:
# Blocking run with the default configuration; every knob is spelled out so
# it can be tweaked right here.
run_demo(
    DEFAULT_SIM_STEPS,
    seed=DEFAULT_RANDOM_SEED,
    strict_bully_on_recovery=DEFAULT_STRICT_BULLY_ON_RECOVERY,
    heartbeat_timeout=DEFAULT_HEARTBEAT_TIMEOUT,
    heartbeat_interval=DEFAULT_HEARTBEAT_INTERVAL,
    recover=DEFAULT_PROB_RECOVER,
    crash=DEFAULT_PROB_CRASH,
)
print("[INFO] Done.")

[ 0.00] !!! Proces 2 ULEGŁ AWARII !!!
[ 0.00] Proces 4: ogłaszam się LIDEREM (brak silniejszych żywych)
[ 0.00] Proces 4: OTRZYMAŁ COORDINATOR -> nowy lider = 4
[ 0.00] Lider 4: wysyła HEARTBEAT
[ 0.00] Proces 3: OTRZYMAŁ COORDINATOR -> nowy lider = 4
[ 0.00] Proces 3: OTRZYMAŁ HEARTBEAT od lidera 4
[ 0.00] Proces 1: OTRZYMAŁ COORDINATOR -> nowy lider = 4
[ 0.00] Proces 1: OTRZYMAŁ HEARTBEAT od lidera 4
[ 0.10] Proces 4: OTRZYMAŁ HEARTBEAT od lidera 4
[ 0.20] !!! Proces 1 ULEGŁ AWARII !!!
[ 0.30] Lider 4: wysyła HEARTBEAT
[ 0.30] Proces 3: OTRZYMAŁ HEARTBEAT od lidera 4
[ 0.30] Proces 4: OTRZYMAŁ HEARTBEAT od lidera 4
[ 0.40] !!! Proces 4 ULEGŁ AWARII !!!
[ 0.40] !!! Lider 4 padł — inni wykryją to po timeoutcie !!!
[ 1.40] Proces 3: ogłaszam się LIDEREM (brak silniejszych żywych)
[ 1.40] Lider 3: wysyła HEARTBEAT
[ 1.50] Proces 3: OTRZYMAŁ COORDINATOR -> nowy lider = 3
[ 1.50] Proces 3: OTRZYMAŁ HEARTBEAT od lidera 3
[ 1.60] !!! Proces 3 ULEGŁ AWARII !!!
[ 1.60] !!! Lider 3 padł — inni