# Experiment Notebook
Notebook for inspecting the results of a single experiment.

In [None]:
import pandas as pd
import graphviz

import lib.experiment as experiment
import lib.metrics as metrics

In [None]:
# Load the recorded results of one experiment run. The path is hard-coded:
# edit it to inspect a different experiment.
exp = experiment.load_experiment_results("experiments/experiment_3.json")
# Left as the cell's last bare expression so the notebook displays it
# (presumably the experiment's configuration — confirm in lib.experiment).
exp.experiment

In [None]:
from collections import defaultdict
from typing import Dict, Set


def compute_topics_set(exp: experiment.PubSubExperimentResults) -> Set[str]:
    """
    Compute the set of topics that at least one node subscribed to.

    Only subscription metrics are inspected, so a topic that only ever appears
    in sends/receives (never subscribed to) is not included.
    """
    # Set comprehension instead of set([...]): no throwaway intermediate list.
    return {
        m.topic
        for node in exp.results
        for m in node.metrics
        if isinstance(m, metrics.PubSubSubscription)
    }


def compute_subscribes_list(
    exp: experiment.PubSubExperimentResults,
) -> list[metrics.PubSubSubscription]:
    """
    Collect every subscription metric recorded by any node.

    The result is ordered by node, then by each node's metric order.
    """
    found = []
    for node_result in exp.results:
        found.extend(
            metric
            for metric in node_result.metrics
            if isinstance(metric, metrics.PubSubSubscription)
        )
    return found


def compute_unsubscribes_list(
    exp: experiment.PubSubExperimentResults,
) -> list[metrics.PubSubUnsubscription]:
    """
    Collect every unsubscription metric recorded by any node.

    The result is ordered by node, then by each node's metric order.
    """
    found = []
    for node_result in exp.results:
        for metric in node_result.metrics:
            if isinstance(metric, metrics.PubSubUnsubscription):
                found.append(metric)
    return found


def compute_subscribes_list_per_node(
    exp: experiment.PubSubExperimentResults,
) -> list[tuple[str, metrics.PubSubSubscription]]:
    """
    Pair every subscription metric with the id of the node that recorded it.

    Returns ``(node_id, subscription)`` tuples ordered by node, then by each
    node's metric order.
    """
    pairs = []
    for node_result in exp.results:
        for metric in node_result.metrics:
            if isinstance(metric, metrics.PubSubSubscription):
                pairs.append((node_result.id, metric))
    return pairs


def compute_subscribes_list_for_topic(
    exp: experiment.PubSubExperimentResults, topic: str
) -> list[tuple[str, metrics.PubSubSubscription]]:
    """
    Return the ``(node_id, subscription)`` pairs whose subscription targets ``topic``.
    """
    matching = []
    for node_id, subscription in compute_subscribes_list_per_node(exp):
        if subscription.topic == topic:
            matching.append((node_id, subscription))
    return matching


def compute_unsubscriptions_list_per_node(
    exp: experiment.PubSubExperimentResults,
) -> list[tuple[str, metrics.PubSubUnsubscription]]:
    """
    Pair every unsubscription metric with the id of the node that recorded it.

    Returns ``(node_id, unsubscription)`` tuples ordered by node, then by each
    node's metric order.
    """
    pairs = []
    for node_result in exp.results:
        for metric in node_result.metrics:
            if not isinstance(metric, metrics.PubSubUnsubscription):
                continue
            pairs.append((node_result.id, metric))
    return pairs


def compute_unsubscriptions_list_for_topic(
    exp: experiment.PubSubExperimentResults, topic: str
) -> list[tuple[str, metrics.PubSubUnsubscription]]:
    """
    Return the ``(node_id, unsubscription)`` pairs whose unsubscription targets ``topic``.
    """
    every_pair = compute_unsubscriptions_list_per_node(exp)
    return [(node_id, unsub) for node_id, unsub in every_pair if unsub.topic == topic]


def compute_send_list(
    exp: experiment.PubSubExperimentResults,
) -> list[metrics.PubSubMessageSent]:
    """
    Collect every message-sent metric recorded by any node.

    The result is ordered by node, then by each node's metric order.
    """
    sent_messages = []
    for node_result in exp.results:
        sent_messages += [
            metric
            for metric in node_result.metrics
            if isinstance(metric, metrics.PubSubMessageSent)
        ]
    return sent_messages


def compute_send_list_for_message(
    exp: experiment.PubSubExperimentResults, message: str
) -> list[metrics.PubSubMessageSent]:
    """
    Return the message-sent metrics whose ``message_id`` equals ``message``.
    """
    matches = []
    for sent in compute_send_list(exp):
        if sent.message_id == message:
            matches.append(sent)
    return matches


def compute_receive_list(
    exp: experiment.PubSubExperimentResults,
) -> list[metrics.PubSubMessageReceived]:
    """
    Collect every message-received metric recorded by any node.

    The result is ordered by node, then by each node's metric order.
    """
    received = []
    for node_result in exp.results:
        received.extend(
            metric
            for metric in node_result.metrics
            if isinstance(metric, metrics.PubSubMessageReceived)
        )
    return received


def compute_receive_list_for_message(
    exp: experiment.PubSubExperimentResults,
    message: str,
) -> list[metrics.PubSubMessageReceived]:
    """
    Return the message-received metrics whose ``message_id`` equals ``message``.
    """
    return list(
        filter(lambda received: received.message_id == message, compute_receive_list(exp))
    )


def compute_receive_list_for_message_per_node(
    exp: experiment.PubSubExperimentResults,
    message: str,
) -> list[tuple[str, metrics.PubSubMessageReceived]]:
    """
    Pair each receive of ``message`` with the id of the node that recorded it.

    Returns ``(node_id, received)`` tuples ordered by node, then by each node's
    metric order.
    """
    pairs = []
    for node_result in exp.results:
        for metric in node_result.metrics:
            is_receive = isinstance(metric, metrics.PubSubMessageReceived)
            if is_receive and metric.message_id == message:
                pairs.append((node_result.id, metric))
    return pairs


def compute_subscription_counts(
    exp: experiment.PubSubExperimentResults,
) -> pd.DataFrame:
    """
    Create a DataFrame containing the number of subscribed nodes per topic over time.

    The index of the DataFrame is the timestamp of every subscription or
    unsubscription event, and the columns are the topics. Each cell holds the
    number of distinct nodes subscribed to that topic after all events at that
    timestamp. Repeated subscribes and unsubscribes from the same node are not
    double counted. Timestamps before a topic's first event are filled with 0.
    Use this to look up the subscription count of a topic at a given time.
    """
    s = {}
    for topic in compute_topics_set(exp):
        subscriptions = 0
        timestamps = []
        subcounts = []
        # Whether each node is currently subscribed (unseen nodes count as not
        # subscribed), so duplicate subscribes/unsubscribes don't change the count.
        node_state: Dict[str, bool] = {}
        subscribes = compute_subscribes_list_for_topic(exp, topic)
        unsubscribes = compute_unsubscriptions_list_for_topic(exp, topic)
        # sorted() is stable: on equal timestamps, subscriptions (listed first)
        # are applied before unsubscriptions.
        merged = sorted(subscribes + unsubscribes, key=lambda m: m[1].timestamp)
        for node, m in merged:
            was_subscribed = node_state.get(node, False)
            if isinstance(m, metrics.PubSubSubscription):
                if not was_subscribed:
                    subscriptions += 1
                node_state[node] = True
            elif isinstance(m, metrics.PubSubUnsubscription):
                if was_subscribed:
                    subscriptions -= 1
                node_state[node] = False
            # Collapse events sharing a timestamp into a single row holding the
            # count after all of them.
            if timestamps and timestamps[-1] == m.timestamp:
                subcounts[-1] = subscriptions
            else:
                timestamps.append(m.timestamp)
                subcounts.append(subscriptions)
        s[topic] = pd.Series(subcounts, index=timestamps)
    # Aligning the per-topic series yields the union of all timestamps; ffill
    # carries each topic's last known count forward. `fillna(method="ffill")`
    # is deprecated since pandas 2.1, so `.ffill()` is used instead.
    return pd.DataFrame(s).ffill().fillna(value=0)


def compute_topic_subscribers(
    exp: experiment.PubSubExperimentResults, topic: str
) -> Set[str]:
    """
    Compute the set of nodes subscribed to ``topic`` after its last (un)subscription event.

    The subscription and unsubscription events of *all* nodes for the topic are
    replayed in timestamp order. The result is therefore the state at the end
    of the recorded events. An unsubscription from a node that is not currently
    subscribed is ignored instead of raising.
    """
    subscribes = compute_subscribes_list_for_topic(exp, topic)
    unsubscribes = compute_unsubscriptions_list_for_topic(exp, topic)
    merged = sorted(subscribes + unsubscribes, key=lambda m: m[1].timestamp)
    subscribers: Set[str] = set()
    for node, m in merged:
        if isinstance(m, metrics.PubSubSubscription):
            subscribers.add(node)
        elif isinstance(m, metrics.PubSubUnsubscription):
            # discard() rather than remove(): a duplicate unsubscribe, or one
            # without a matching earlier subscribe, must not raise KeyError.
            subscribers.discard(node)
    return subscribers


def compute_message_reliability(
    exp: experiment.PubSubExperimentResults,
) -> pd.DataFrame:
    """
    Create a DataFrame containing the delivery reliability of each sent message.

    For every sent message, reliability is the fraction of the topic's
    subscribers that delivered it. The subscriber set is the state after the
    topic's last subscription/unsubscription event (``compute_topic_subscribers``).
    Only deliveries by nodes in that set are counted, so the value lies in
    [0, 1]. Messages sent to a topic with no such subscribers are skipped.

    The result has a single ``reliability`` column and is indexed by the send
    timestamp, sorted ascending.
    """

    def internal_compute_sends(
        e: experiment.PubSubExperimentResults,
    ) -> list[tuple[int, str, str]]:
        """
        Compute a list of (timestamp, message identifier, topic) tuples, one per send.
        """
        return [
            (m.timestamp, m.message_id, m.topic)
            for node in e.results
            for m in node.metrics
            if isinstance(m, metrics.PubSubMessageSent)
        ]

    def internal_compute_receives(
        e: experiment.PubSubExperimentResults,
    ) -> list[tuple[str, metrics.PubSubMessageReceived]]:
        """
        Compute a list of (node identifier, receive) tuples.
        Only receives flagged as delivered are returned.
        """
        return [
            (node.id, m)
            for node in e.results
            for m in node.metrics
            if isinstance(m, metrics.PubSubMessageReceived) and m.delivered
        ]

    def internal_compute_receives_for_send(
        r: list[tuple[str, metrics.PubSubMessageReceived]],
    ) -> Dict[str, Set[str]]:
        """
        Map each message identifier to the set of distinct node ids that delivered it.
        """
        d: Dict[str, Set[str]] = defaultdict(set)
        for nid, m in r:
            d[m.message_id].add(nid)
        return d

    topics = compute_topics_set(exp)
    subscribers = {topic: compute_topic_subscribers(exp, topic) for topic in topics}
    sends = internal_compute_sends(exp)
    receivers = internal_compute_receives_for_send(internal_compute_receives(exp))

    timestamps = []
    reliability = []
    for timestamp, message_id, topic in sends:
        expected = subscribers.get(topic, set())
        if not expected:
            continue
        # Count only deliveries by nodes in the expected set. The expected set
        # is the final subscriber state, so a node that delivered the message
        # but is no longer in that set would otherwise push the ratio above 1.
        delivered = receivers.get(message_id, set()) & expected
        timestamps.append(timestamp)
        reliability.append(len(delivered) / len(expected))

    return pd.DataFrame(
        reliability, index=timestamps, columns=["reliability"]
    ).sort_index()


def compute_latest_network_metrics_per_peer(
    exp: experiment.PubSubExperimentResults,
) -> Dict[str, metrics.Network]:
    """
    Compute the most recent network metric recorded by each peer.

    "Most recent" is decided by timestamp, not by position in the node's
    metric list, so the result does not depend on metrics being stored in
    chronological order. When several network metrics share the latest
    timestamp, the one appearing last wins. Peers that recorded no network
    metric are absent from the result.
    """
    latest: Dict[str, metrics.Network] = {}
    for node in exp.results:
        for m in node.metrics:
            if not isinstance(m, metrics.Network):
                continue
            current = latest.get(node.id)
            # >= so that ties keep the later entry, like plain list order would.
            if current is None or m.timestamp >= current.timestamp:
                latest[node.id] = m
    return latest


def compute_node_first_timestamps(
    exp: experiment.PubSubExperimentResults,
) -> Dict[str, int]:
    """
    Map each node id to the earliest timestamp among all of that node's metrics.

    Raises ValueError (from ``min``) if a node has no metrics.
    """
    return {
        node.id: min(metric.timestamp for metric in node.metrics)
        for node in exp.results
    }


def compute_node_last_timestamps(
    exp: experiment.PubSubExperimentResults,
) -> Dict[str, int]:
    """
    Map each node id to the latest timestamp among all of that node's metrics.

    Raises ValueError (from ``max``) if a node has no metrics.
    """
    last_seen = {}
    for node_result in exp.results:
        newest = max(metric.timestamp for metric in node_result.metrics)
        last_seen[node_result.id] = newest
    return last_seen


In [None]:
# Sanity checks on the data


def sanity_first_message_after_last_sub():
    """
    Check that every subscription was recorded before the first message was sent.

    Prints the last subscription and first send timestamps. Fails with an
    AssertionError when there are no subscriptions or no sends, or when the
    last subscription is not strictly earlier than the first send.
    """
    subscribes = compute_subscribes_list(exp)
    sends = compute_send_list(exp)
    assert subscribes, "No subscriptions recorded in the experiment"
    assert sends, "No messages sent in the experiment"

    # max/min yield the same timestamps as sorting and taking the ends, in O(n).
    last_sub = max(m.timestamp for m in subscribes)
    first_send = min(m.timestamp for m in sends)
    print(f"Last subscription at {last_sub} and first send at {first_send}")
    assert last_sub < first_send


def sanity_last_message_before_earliest_node_shutdown():
    """
    Check that the last message was sent before any node stopped recording metrics.

    A node's "shutdown" time is approximated by the timestamp of its last
    recorded metric. The check prints the earliest such time and its node, the
    last send timestamp, and their difference. The difference is divided by
    1000 and labelled seconds, so timestamps are assumed to be milliseconds.
    It asserts that the last send happened strictly before that shutdown.
    """
    sends = compute_send_list(exp)
    assert sends, "No messages sent in the experiment"
    last_send = max(m.timestamp for m in sends)
    # Reuse the shared helper instead of re-deriving per-node last timestamps.
    node_timestamps = compute_node_last_timestamps(exp)
    # Ties resolve to the first node in insertion order, as before.
    earliest_node = min(node_timestamps, key=node_timestamps.get)
    earliest_shutdown = node_timestamps[earliest_node]
    print(
        f"Earliest shutdown at {earliest_shutdown}(from {earliest_node}) and last send at {last_send}"
    )
    print(f"Difference: {(earliest_shutdown - last_send)/1000} seconds")
    assert last_send < earliest_shutdown


def sanity_checks():
    """
    Run the enabled data sanity checks in order; a failing check raises.
    """
    enabled_checks = (sanity_first_message_after_last_sub,)
    # sanity_last_message_before_earliest_node_shutdown is deliberately not
    # enabled (it was commented out; the reason is not recorded here).
    for check in enabled_checks:
        check()


# Run the sanity checks when this cell executes. An AssertionError here means
# the recorded data violates one of the checked event orderings.
sanity_checks()


In [None]:
# Distribution of hop counts over all message-received events (log-scaled y axis).
hop_counts = [
    m.hop_count
    for node in exp.results
    for m in node.metrics
    if isinstance(m, metrics.PubSubMessageReceived)
]
df = pd.DataFrame(hop_counts, columns=["hop_count"])
# One unit-wide bin per integer hop count, up to the largest observed value.
# A fixed upper edge would silently drop larger hop counts. It would also merge
# the last two values into the final (closed) bin.
df.plot(kind="hist", bins=range(0, max(hop_counts, default=0) + 2), logy=True)


In [None]:
# For each sent message, report how many distinct nodes recorded receiving it.
# Receive events are grouped once up front instead of rescanning every node's
# metrics per message. Receivers are collected in a set so the count is of
# nodes, not receive events (consistent with compute_message_reliability).
receivers_by_message = defaultdict(set)
for node in exp.results:
    for m in node.metrics:
        if isinstance(m, metrics.PubSubMessageReceived):
            receivers_by_message[m.message_id].add(node.id)

for msg in compute_send_list(exp):
    receivers = receivers_by_message.get(msg.message_id, set())
    print(f"Message {msg.message_id} was received by {len(receivers)} nodes")


In [None]:
# Histogram of per-message delivery reliability as computed by
# compute_message_reliability (one value per sent message whose topic has
# subscribers).
compute_message_reliability(exp).plot(kind="hist")

In [None]:
# Cumulative, normalized histogram (~ empirical CDF) across peers of the
# inbound/outbound values in each peer's latest network metric, scaled to MiB
# (assumes `inbound`/`outbound` are byte counts — TODO confirm).
BYTES_PER_MIB = 1024 * 1024
network_metrics = compute_latest_network_metrics_per_peer(exp)
inbound = pd.Series([m.inbound for m in network_metrics.values()]) / BYTES_PER_MIB
outbound = pd.Series([m.outbound for m in network_metrics.values()]) / BYTES_PER_MIB
# `density` is a boolean flag; with cumulative=True the bars rise to 1.
pd.DataFrame({"inbound": inbound, "outbound": outbound}).plot(
    kind="hist", cumulative=True, density=True, bins=1000
)


In [None]:
# Total number of entries (summed over all buckets) in every routing-table
# metric that carries a non-empty topic, plotted against its timestamp.
routing_tables = [
    m
    for node in exp.results
    for m in node.metrics
    if isinstance(m, metrics.RoutingTable) and m.topic != ""
]

series = pd.Series(
    [sum(len(b) for b in m.buckets) for m in routing_tables],
    index=[m.timestamp for m in routing_tables],
)
# style="." draws markers only, so the (possibly unsorted) index order doesn't
# affect the plot.
pd.DataFrame(series).plot(style=".")


In [None]:
def ip_to_id(ip: str) -> str:
    """
    Return the second ``:``-separated field of ``ip``.

    Used to turn a receive event's ``source`` into a node identifier. The exact
    ``source`` format is not visible here (presumably ``<address>:<id>`` —
    TODO confirm). Raises IndexError if ``ip`` contains no ``:``.
    """
    fields = ip.split(":", 2)
    return fields[1]


# Delivery graph of a single message. There is one vertex per node subscribed
# to its topic after the topic's last (un)subscription event. There is one
# edge per receive event, from the peer derived from the receive's `source`
# (via ip_to_id) to the receiving node.
msgid = "89047581-7253-49d3-827c-539191f8e7a5"
send_list = sorted(compute_send_list_for_message(exp, msgid), key=lambda m: m.timestamp)
if not send_list:
    # The id is hard-coded and experiment-specific; fail with a clear message
    # instead of an IndexError below.
    raise ValueError(f"No send recorded for message {msgid} in this experiment")
recv_list = sorted(
    compute_receive_list_for_message_per_node(exp, msgid), key=lambda m: m[1].timestamp
)
# The topic is taken from the earliest send of the message.
topic = send_list[0].topic
subscribers = compute_topic_subscribers(exp, topic)

print(f"Message {msgid} should be received by {len(subscribers)} nodes")


dot = graphviz.Digraph(name="Message delivery graph")
# Sorted so the generated graph source is identical across sessions (string
# set iteration order depends on hash randomization).
for sub in sorted(subscribers):
    dot.node(sub)
for nid, rcv in recv_list:
    dot.edge(ip_to_id(rcv.source), nid)

dot
