In [1]:
import math
import random
from collections import defaultdict, deque
import statistics
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from typing import List, Dict, Tuple, Set

In [2]:
def gini_coefficient(values: List[float]) -> float:
    if not values:
        return 0.0
    sorted_vals = sorted(values)
    n = len(values)
    cumvals = np.cumsum(sorted_vals)
    if cumvals[-1] == 0:
        return 0.0
    gini = (n + 1 - 2 * np.sum(cumvals) / cumvals[-1]) / n
    return gini

In [3]:
def erdos_renyi_graph(n: int, p: float, rng: random.Random) -> Dict[int, Set[int]]:
    adj = {i: set() for i in range(n)}
    for i in range(n):
        for j in range(i+1, n):
            if rng.random() < p:
                adj[i].add(j)
                adj[j].add(i)
    return adj

In [4]:
def sample_without(items: List[int], k: int, rng: random.Random) -> List[int]:
    if k >= len(items):
        return list(items)
    return rng.sample(items, k)

In [5]:
def load_per_event(n: int, msg_size_bytes: int, alpha: float = 1.5) -> Dict[str, Dict[str, float]]:
    mqtt_messages = 1 + max(n-1, 0)
    mqtt_bytes = mqtt_messages * msg_size_bytes
    coap_messages = 2
    coap_bytes = coap_messages * msg_size_bytes
    ln_n = max(math.log(max(n, 2)), 1.0)
    redundancy = alpha * ln_n
    gossip_messages = redundancy * max(n-1, 0)
    gossip_bytes = gossip_messages * msg_size_bytes
    return {
        "MQTT": {"messages": mqtt_messages, "bytes": mqtt_bytes, "redundancy": 1.0},
        "CoAP": {"messages": coap_messages, "bytes": coap_bytes, "redundancy": 1.0},
        "Gossip": {"messages": gossip_messages, "bytes": gossip_bytes, "redundancy": redundancy},
    }

In [10]:
def simulate_gossip(adj: Dict[int, Set[int]], alive: Set[int], seed_sources: Set[int], fanout: int, rounds: int, rng: random.Random):
    informed_round = {i: math.inf for i in adj.keys()}
    for s in seed_sources:
        if s in alive:
            informed_round[s] = 0
    coverage_history = [len([1 for k,v in informed_round.items() if v < math.inf])]
    last_r = 0
    for r in range(1, rounds+1):
        new_informed = set()
        for u in list(informed_round.keys()):
            if informed_round[u] < math.inf:
                neighbors = [v for v in adj[u] if v in alive]
                if not neighbors:
                    continue
                choices = sample_without(neighbors, fanout, rng)
                for v in choices:
                    if informed_round[v] == math.inf:
                        informed_round[v] = r
                        new_informed.add(v)
        coverage_history.append(len([1 for k,v in informed_round.items() if v < math.inf]))
        if not new_informed:
            last_r = r
            break
    if last_r == 0:
        last_r = rounds
    return informed_round, last_r, coverage_history

In [7]:
def simulate_mqtt_coverage(n: int, alive: Set[int], broker_alive: bool, link_up_prob_to_broker: float, publishers: Set[int], rng: random.Random) -> int:
    if not broker_alive:
        return len([p for p in publishers if p in alive])
    coverage = 0
    for node in range(n):
        if node in alive:
            if rng.random() < link_up_prob_to_broker:
                coverage += 1
    return coverage

In [9]:
def failure_resilience_experiment(n: int = 200, edge_prob: float = 0.05, fanout: int = 3, alpha: float = 1.5, failure_rates: List[float] = None, link_fail_rate: float = 0.1, trials: int = 20, rng_seed: int = 42):
    if failure_rates is None:
        failure_rates = [0.0, 0.1, 0.2, 0.3, 0.5]
    rng = random.Random(rng_seed)
    results = []
    for p_fail in failure_rates:
        gossip_cov = []
        gossip_rounds = []
        mqtt_cov = []
        for t in range(trials):
            adj = erdos_renyi_graph(n, edge_prob, rng)
            alive = {i for i in range(n) if rng.random() >= p_fail}
            publisher = rng.randrange(n)
            publishers = {publisher}
            rounds = max(1, int(round(alpha * math.log(max(n, 2)))))
            informed_round, last_r, cov_hist = simulate_gossip(adj=adj, alive=alive, seed_sources=publishers, fanout=fanout, rounds=rounds, rng=rng)
            cov = len([1 for v in informed_round.values() if v < math.inf])
            gossip_cov.append(cov / max(1, len(alive)))
            gossip_rounds.append(last_r)
            broker_alive = (rng.random() >= p_fail)
            link_up_prob = 1.0 - link_fail_rate
            cov_mqtt = simulate_mqtt_coverage(n=n, alive=alive, broker_alive=broker_alive, link_up_prob_to_broker=link_up_prob, publishers=publishers, rng=rng)
            mqtt_cov.append(cov_mqtt / max(1, len(alive)))
        results.append({
            "failure_rate": p_fail,
            "gossip_coverage_mean": float(np.mean(gossip_cov)),
            "gossip_coverage_p10": float(np.percentile(gossip_cov, 10)),
            "gossip_coverage_p90": float(np.percentile(gossip_cov, 90)),
            "gossip_rounds_median": float(np.median(gossip_rounds)),
            "mqtt_coverage_mean": float(np.mean(mqtt_cov)),
            "mqtt_coverage_p10": float(np.percentile(mqtt_cov, 10)),
            "mqtt_coverage_p90": float(np.percentile(mqtt_cov, 90)),
        })
    df = pd.DataFrame(results)
    return df

In [11]:
def partition_merge_experiment(n: int = 200, edge_prob: float = 0.04, fanout: int = 3, alpha: float = 1.5, partition_rounds: int = 5, post_merge_rounds: int = 10, rng_seed: int = 123) -> pd.DataFrame:
    rng = random.Random(rng_seed)
    nA = n // 2
    nB = n - nA
    nodesA = list(range(nA))
    nodesB = list(range(nA, nA + nB))
    adj = {i: set() for i in range(n)}
    adjA = erdos_renyi_graph(nA, edge_prob, rng)
    adjB = erdos_renyi_graph(nB, edge_prob, rng)
    for u in adjA:
        for v in adjA[u]:
            adj[u].add(v)
            adj[v].add(u)
    for u in adjB:
        gu = u + nA
        for v in adjB[u]:
            gv = v + nA
            adj[gu].add(gv)
            adj[gv].add(gu)
    alive = set(range(n))
    fanout_val = fanout
    rounds_total = partition_rounds + post_merge_rounds
    knows_A = {i: False for i in range(n)}
    knows_B = {i: False for i in range(n)}
    seedA = rng.choice(nodesA)
    seedB = rng.choice(nodesB)
    knows_A[seedA] = True
    knows_B[seedB] = True
    hist = []
    def round_spread(knowledge: Dict[int, bool], local_adj: Dict[int, Set[int]]):
        newly = set()
        for u in range(n):
            if knowledge[u] and u in alive:
                neigh = [v for v in local_adj[u] if v in alive]
                choices = sample_without(neigh, fanout_val, rng)
                for v in choices:
                    if not knowledge[v]:
                        knowledge[v] = True
                        newly.add(v)
        return newly
    for r in range(1, partition_rounds + 1):
        round_spread(knows_A, adj)
        round_spread(knows_B, adj)
        covA = sum(1 for k in knows_A.values() if k)
        covB = sum(1 for k in knows_B.values() if k)
        covBoth = sum(1 for i in range(n) if knows_A[i] and knows_B[i])
        hist.append({"round": r, "phase": "partition", "cov_A": covA, "cov_B": covB, "cov_both": covBoth})
    for i in range(nA):
        for j in range(nB):
            if rng.random() < (edge_prob / 5):
                gi = i
                gj = j + nA
                adj[gi].add(gj)
                adj[gj].add(gi)
    for r in range(partition_rounds + 1, rounds_total + 1):
        round_spread(knows_A, adj)
        round_spread(knows_B, adj)
        covA = sum(1 for k in knows_A.values() if k)
        covB = sum(1 for k in knows_B.values() if k)
        covBoth = sum(1 for i in range(n) if knows_A[i] and knows_B[i])
        hist.append({"round": r, "phase": "post-merge", "cov_A": covA, "cov_B": covB, "cov_both": covBoth})
    df_hist = pd.DataFrame(hist)
    return df_hist

In [12]:
def load_balance_experiment(n: int = 200, edge_prob: float = 0.05, fanout: int = 3, alpha: float = 1.5, publishers: int = 20, rng_seed: int = 7) -> pd.DataFrame:
    rng = random.Random(rng_seed)
    adj = erdos_renyi_graph(n, edge_prob, rng)
    pubs = set(rng.sample(range(n), publishers))
    rounds = max(1, int(round(alpha * math.log(max(n, 2)))))
    send_counts_gossip = [0] * n
    informed = {p: set([p]) for p in pubs}
    for r in range(1, rounds + 1):
        for p in pubs:
            new_informed = set()
            for u in list(informed[p]):
                neigh = list(adj[u])
                choices = sample_without(neigh, fanout, rng)
                for v in choices:
                    send_counts_gossip[u] += 1
                    if v not in informed[p]:
                        new_informed.add(v)
            informed[p].update(new_informed)
    send_counts_mqtt = [0] * n
    broker = rng.randrange(n)
    for p in pubs:
        send_counts_mqtt[p] += 1
    send_counts_mqtt[broker] += (n - 1) * len(pubs)
    df = pd.DataFrame({"node": list(range(n)), "send_gossip": send_counts_gossip, "send_mqtt": send_counts_mqtt})
    df["gini_gossip"] = gini_coefficient(df["send_gossip"].tolist())
    df["gini_mqtt"] = gini_coefficient(df["send_mqtt"].tolist())
    return df

In [None]:


# Parameters
N = 200
MSG_SIZE = 50
ALPHA = 1.5
FANOUT = 3
EDGE_PROB = 0.05

# 1) Load per event for a range of N
ns = [10, 25, 50, 100, 200, 400]
records = []
for n in ns:
    rec = load_per_event(n, MSG_SIZE, alpha=ALPHA)
    for proto, vals in rec.items():
        records.append({
            "N": n,
            "protocol": proto,
            "messages_per_event": vals["messages"],
            "bytes_per_event": vals["bytes"],
            "redundancy_factor": vals["redundancy"],
        })
df_load = pd.DataFrame(records)

# 2) Failure resilience
df_fail = failure_resilience_experiment(n=N, edge_prob=EDGE_PROB, fanout=FANOUT, alpha=ALPHA, failure_rates=[0.0, 0.1, 0.2, 0.3, 0.5], link_fail_rate=0.1, trials=25, rng_seed=1234)

# 3) Partition & merge
df_pm = partition_merge_experiment(n=N, edge_prob=EDGE_PROB, fanout=FANOUT, alpha=ALPHA, partition_rounds=6, post_merge_rounds=12, rng_seed=99)

# 4) Load balance
df_lb = load_balance_experiment(n=N, edge_prob=EDGE_PROB, fanout=FANOUT, alpha=ALPHA, publishers=25, rng_seed=321)

from caas_jupyter_tools import display_dataframe_to_user
display_dataframe_to_user("Load per event (MQTT vs CoAP vs Gossip)", df_load)
display_dataframe_to_user("Failure resilience results (coverage stats)", df_fail)
display_dataframe_to_user("Partition & merge convergence history", df_pm)
display_dataframe_to_user("Load balance per-node send counts", df_lb)

# Plots

# Plot 1: Bytes per event vs N
plt.figure()
for proto in ["MQTT", "CoAP", "Gossip"]:
    x = np.asarray([r["N"] for r in records if r["protocol"] == proto], dtype=float)
    y = np.asarray([r["bytes_per_event"] for r in records if r["protocol"] == proto], dtype=float)
    plt.plot(x, y, marker="o", label=proto)
plt.xlabel("Number of nodes (N)")
plt.ylabel("Bytes per event")
plt.title("Bytes per event vs N")
plt.legend()
plt.tight_layout()
plt.savefig("/mnt/data/plot_bytes_per_event.png", dpi=160)
plt.show()

# Plot 2: Messages per event vs N (log scale on y)
plt.figure()
for proto in ["MQTT", "CoAP", "Gossip"]:
    x = np.asarray([r["N"] for r in records if r["protocol"] == proto], dtype=float)
    y = np.asarray([r["messages_per_event"] for r in records if r["protocol"] == proto], dtype=float)
    plt.plot(x, y, marker="o", label=proto)
plt.xlabel("Number of nodes (N)")
plt.ylabel("Messages per event")
plt.yscale("log")
plt.title("Messages per event vs N (log y)")
plt.legend()
plt.tight_layout()
plt.savefig("/mnt/data/plot_messages_per_event.png", dpi=160)
plt.show()

# Plot 3: Failure resilience — coverage vs failure rate
plt.figure()
x = np.asarray(df_fail["failure_rate"].tolist(), dtype=float)
g_mean = np.asarray(df_fail["gossip_coverage_mean"].tolist(), dtype=float)
m_mean = np.asarray(df_fail["mqtt_coverage_mean"].tolist(), dtype=float)
g_p10 = np.asarray(df_fail["gossip_coverage_p10"].tolist(), dtype=float)
g_p90 = np.asarray(df_fail["gossip_coverage_p90"].tolist(), dtype=float)
m_p10 = np.asarray(df_fail["mqtt_coverage_p10"].tolist(), dtype=float)
m_p90 = np.asarray(df_fail["mqtt_coverage_p90"].tolist(), dtype=float)

plt.plot(x, g_mean, marker="o", label="Gossip mean")
plt.plot(x, m_mean, marker="o", label="MQTT mean")
plt.fill_between(x, g_p10, g_p90, alpha=0.2, label="Gossip p10–p90")
plt.fill_between(x, m_p10, m_p90, alpha=0.2, label="MQTT p10–p90")
plt.xlabel("Node failure rate")
plt.ylabel("Coverage (fraction of alive nodes)")
plt.title("Failure resilience: coverage vs failures")
plt.legend()
plt.tight_layout()
plt.savefig("/mnt/data/plot_failure_resilience.png", dpi=160)
plt.show()

# Plot 4: Partition & merge — coverage over rounds
plt.figure()
x2 = np.asarray(df_pm["round"].tolist(), dtype=float)
yA = np.asarray(df_pm["cov_A"].tolist(), dtype=float)
yB = np.asarray(df_pm["cov_B"].tolist(), dtype=float)
yBoth = np.asarray(df_pm["cov_both"].tolist(), dtype=float)
plt.plot(x2, yA, label="Update A coverage")
plt.plot(x2, yB, label="Update B coverage")
plt.plot(x2, yBoth, label="Both updates known")
plt.xlabel("Round")
plt.ylabel("Nodes covered")
plt.title("Partition & merge: convergence after healing")
plt.legend()
plt.tight_layout()
plt.savefig("/mnt/data/plot_partition_merge.png", dpi=160)
plt.show()

# Plot 5: Load balance — Gini coefficients
plt.figure()
gini_gossip = gini_coefficient(df_lb["send_gossip"].tolist())
gini_mqtt = gini_coefficient(df_lb["send_mqtt"].tolist())
plt.bar(["Gossip", "MQTT"], [gini_gossip, gini_mqtt])
plt.ylabel("Gini coefficient (send load)")
plt.title("Load balance: per-node send-load inequality")
plt.tight_layout()
plt.savefig("/mnt/data/plot_load_balance_gini.png", dpi=160)
plt.show()

# Save CSV outputs for download
df_load.to_csv("/mnt/data/load_per_event.csv", index=False)
df_fail.to_csv("/mnt/data/failure_resilience.csv", index=False)
df_pm.to_csv("/mnt/data/partition_merge.csv", index=False)
df_lb.to_csv("/mnt/data/load_balance.csv", index=False)

print("Files saved:")
print("- /mnt/data/plot_bytes_per_event.png")
print("- /mnt/data/plot_messages_per_event.png")
print("- /mnt/data/plot_failure_resilience.png")
print("- /mnt/data/plot_partition_merge.png")
print("- /mnt/data/plot_load_balance_gini.png")
print("- /mnt/data/load_per_event.csv")
print("- /mnt/data/failure_resilience.csv")
print("- /mnt/data/partition_merge.csv")
print("- /mnt/data/load_balance.csv")
