# Optimistic Provide - Trace Peer State

In [62]:
import sqlalchemy as sa
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_theme()

plt.rcParams['figure.figsize'] = [12, 7]

In [63]:
conn = sa.create_engine("postgresql://optprov:password@localhost:5432/optprov")

In [64]:
def cdf(series: pd.Series) -> pd.DataFrame:
    """ calculates the cumulative distribution function of the given series"""
    return pd.DataFrame.from_dict({
        series.name: np.append(series.sort_values(), series.max()),
        "cdf": np.linspace(0, 1, len(series) + 1)
    })

In [65]:
query = """
SELECT p.id, p.provider_id FROM provides p WHERE p.id = 16;
"""
db_provide = pd.read_sql_query(query, con=conn)
db_provide

Unnamed: 0,id,provider_id
0,16,1


In [66]:
from typing import Dict, Set, List
from enum import Enum


class PeerState(Enum):
    PEER_HEARD = 1
    PEER_WAITING = 2
    PEER_QUERIED = 3
    PEER_UNREACHABLE = 4

    def __str__(self):
        if self == PeerState.PEER_HEARD:
            return "PEER_HEARD"
        elif self == PeerState.PEER_WAITING:
            return "PEER_WAITING"
        elif self == PeerState.PEER_QUERIED:
            return "PEER_QUERIED"
        elif self == PeerState.PEER_UNREACHABLE:
            return "PEER_UNREACHABLE"
        else:
            assert False



class QueryPeerState:
    def __init__(self, id: int, distance: int, state: PeerState, referrer: int):
        self.id = id
        self.state = state
        self.distance = distance
        self.referrer = referrer

    def __str__(self):
        return f"{self.id} in state {self.state}"

class QueryPeerset:
    def __init__(self):
        self.states: Dict[int: QueryPeerState] = {}

    def try_add(self, p: int, distance: int, referrer: int):
        if p in self.states:
            return
        self.states[p] = QueryPeerState(p, distance, PeerState.PEER_HEARD, referrer)

    def set_state(self, p: int, state: PeerState):
        self.states[p].state = state

    def get_state(self, p: int) -> PeerState:
        return self.states[p].state

    def get_closest_in_states(self, states: Set[PeerState]) -> List[QueryPeerState]:
        results: List[QueryPeerState] = []
        for key in self.states:
            if self.states[key].state in states:
                results.append(self.states[key])
        return list(sorted(results, key=lambda state: state.distance))


In [67]:
query = f"""
SELECT *, encode(ps.distance, 'hex') hex_distance FROM peer_states ps
    INNER JOIN provides_x_peer_states pxps on ps.id = pxps.peer_state_id
WHERE pxps.provide_id = {int(db_provide["id"])} AND ps.referrer_id = {int(db_provide["provider_id"])}
"""
db_seed_peers = pd.read_sql_query(query, con=conn)
db_seed_peers

Unnamed: 0,id,query_id,peer_id,referrer_id,state,distance,provide_id,peer_state_id,hex_distance
0,3106,c86a7dd3-27ea-432a-b5be-a3d6f42e1d34,2525,1,QUERIED,"[b'\x08', b'\x87', b'\xce', b'\r', b'\xe6', b'...",16,3106,0887ce0de66009ab6c305f7f6cc1f4adda0032fc4ff0ef...
1,3107,c86a7dd3-27ea-432a-b5be-a3d6f42e1d34,2528,1,QUERIED,"[b'\x08', b'\x8f', b'\x19', b'\xb5', b'+', b'$...",16,3107,088f19b52b249029ca2bc45c380f121a1ce0a054539aba...
2,3108,c86a7dd3-27ea-432a-b5be-a3d6f42e1d34,1634,1,QUERIED,"[b'\x08', b'\x94', b'\xf0', b'\x1c', b'\x06', ...",16,3108,0894f01c063ee49f35e35214c98e29be5675686eefb08c...
3,3109,c86a7dd3-27ea-432a-b5be-a3d6f42e1d34,2527,1,QUERIED,"[b'\t', b' ', b'=', b'\xbb', b'\xc3', b'\xc8',...",16,3109,09203dbbc3c807ddca7ba8dd3b7bf815221901f122e16d...
4,3110,c86a7dd3-27ea-432a-b5be-a3d6f42e1d34,2526,1,WAITING,"[b'\t', b'$', b'\xba', b'\xc5', b'\x81', b'P',...",16,3110,0924bac58150ef74b0dd43624c3164f8cd7aaccda131a7...
5,3111,c86a7dd3-27ea-432a-b5be-a3d6f42e1d34,2529,1,QUERIED,"[b'\t', b':', b'\xde', b'n', b'&', b'5', b'\x9...",16,3111,093ade6e26359e71636c62f4b2f56af6651c93b040ac1c...
6,3112,c86a7dd3-27ea-432a-b5be-a3d6f42e1d34,1529,1,WAITING,"[b'\t', b'A', b'\xfe', b'\xbd', b'\x8f', b'\x9...",16,3112,0941febd8f91975bc2909df3cc1ecdac14cc19188fb53d...
7,3113,c86a7dd3-27ea-432a-b5be-a3d6f42e1d34,2520,1,QUERIED,"[b'\t', b'\x9c', b'#', b'\x1e', b'\xd4', b'q',...",16,3113,099c231ed471bafcb82a2036c533a6c8a68e1167b27ffc...
8,3114,c86a7dd3-27ea-432a-b5be-a3d6f42e1d34,533,1,QUERIED,"[b'\t', b'\xc0', b'\x02', b""'"", b'3', b'\x8f',...",16,3114,09c00227338f7735479f0a6074386db155b3c7e2910193...
9,3115,c86a7dd3-27ea-432a-b5be-a3d6f42e1d34,2514,1,WAITING,"[b'\t', b'\xc4', b'\x10', b'*', b'\x13', b'\x1...",16,3115,09c4102a131b58f66618b3132998e9e079c89374433bcd...


In [102]:
query = f"""
WITH cte AS (
    SELECT -1 find_nodes_rpc_id, d.remote_id, min(d.started_at) "timestamp", 'dial_start' status, null error
    FROM dials d
             INNER JOIN provides_x_dials pxd on d.id = pxd.dial_id
    WHERE pxd.provide_id = {db_provide["id"][0]}
    GROUP BY d.remote_id
    UNION
    SELECT -1 find_nodes_rpc_id, d.remote_id,
           max(d.ended_at) "timestamp",
           'dial_end'      status,
           CASE
               WHEN (array_length(array_remove(array_agg(d.error), null), 1) = array_length(array_agg(d.error), 1)) THEN
                   'error'
               ELSE
                   null
               END         error
    FROM dials d
             INNER JOIN provides_x_dials pxd on d.id = pxd.dial_id
    WHERE pxd.provide_id = {db_provide["id"][0]}
    GROUP BY d.remote_id
    UNION
    SELECT fnrpcs.id, fnrpcs.remote_id, fnrpcs.started_at "timestamp", 'start' status, fnrpcs.error
    FROM find_nodes_rpcs fnrpcs
             INNER JOIN provides_x_find_nodes_rpcs pxfnr on fnrpcs.id = pxfnr.find_nodes_rpc_id
    WHERE pxfnr.provide_id = {db_provide["id"][0]}
      AND query_id != '00000000-0000-0000-0000-000000000000'
    UNION
    SELECT fnrpcs.id, fnrpcs.remote_id, fnrpcs.ended_at "timestamp", 'end' status, fnrpcs.error
    FROM find_nodes_rpcs fnrpcs
             INNER JOIN provides_x_find_nodes_rpcs pxfnr on fnrpcs.id = pxfnr.find_nodes_rpc_id
    WHERE pxfnr.provide_id = {db_provide["id"][0]}
      AND query_id != '00000000-0000-0000-0000-000000000000'
)
SELECT cte.find_nodes_rpc_id, cte.remote_id, cte.status, cte.error, cte.timestamp
FROM cte
WHERE cte.timestamp < (
    SELECT max(fnrpcs.started_at) FROM find_nodes_rpcs fnrpcs
        INNER JOIN provides_x_find_nodes_rpcs pxfnr on fnrpcs.id = pxfnr.find_nodes_rpc_id
    WHERE pxfnr.provide_id = {db_provide["id"][0]} AND query_id != '00000000-0000-0000-0000-000000000000'
)
ORDER BY timestamp
"""
db_find_nodes = pd.read_sql_query(query, con=conn)
db_find_nodes

Unnamed: 0,find_nodes_rpc_id,remote_id,status,error,timestamp
0,-1,2526,dial_start,,2022-03-31 09:58:16.931285+00:00
1,-1,2529,dial_start,,2022-03-31 09:58:16.931453+00:00
2,884,2525,start,,2022-03-31 09:58:16.931658+00:00
3,881,2528,start,,2022-03-31 09:58:16.931748+00:00
4,-1,1634,dial_start,,2022-03-31 09:58:16.932024+00:00
5,-1,2520,dial_start,,2022-03-31 09:58:16.932093+00:00
6,-1,2527,dial_start,,2022-03-31 09:58:16.932278+00:00
7,-1,533,dial_start,,2022-03-31 09:58:16.932431+00:00
8,881,2528,end,,2022-03-31 09:58:16.943073+00:00
9,-1,2939,dial_start,,2022-03-31 09:58:16.943555+00:00


In [103]:
qp = QueryPeerset()
for idx, row in db_seed_peers.iterrows():
    qp.try_add(row["peer_id"], int(row["hex_distance"], base=16), db_provide["provider_id"][0])

for idx, row in db_find_nodes.iterrows():
    if row["status"] == "dial_start":
        if qp.get_state(row["remote_id"]) == PeerState.PEER_HEARD:
            qp.set_state(row["remote_id"], PeerState.PEER_WAITING)
    elif row["status"] == "start":
        qp.set_state(row["remote_id"], PeerState.PEER_WAITING)
    elif row["status"] == "end":
        query = f"""
        SELECT cp.peer_id, encode(ps.distance, 'hex') hex_distance FROM closer_peers cp
            LEFT JOIN peer_states ps on ps.peer_id = cp.peer_id AND ps.query_id = 'c86a7dd3-27ea-432a-b5be-a3d6f42e1d34'
        WHERE find_node_rpc_id = {int(row["find_nodes_rpc_id"])}
        """
        db_closer_peers = pd.read_sql_query(query, con=conn)
        for iidx, irow in db_closer_peers.iterrows():
            assert irow["hex_distance"] is not None

            if irow["peer_id"] == db_provide["provider_id"][0]:
                continue
            qp.try_add(irow["peer_id"], int(irow["hex_distance"], base=16), row["remote_id"])

        if row["error"] is None:
            assert qp.get_state(row["remote_id"]) == PeerState.PEER_WAITING
            qp.set_state(row["remote_id"], PeerState.PEER_QUERIED)
        elif row["error"] != "context canceled":
            assert qp.get_state(row["remote_id"]) == PeerState.PEER_WAITING
            qp.set_state(row["remote_id"], PeerState.PEER_UNREACHABLE)


In [104]:
list(map(lambda x: str(x), qp.get_closest_in_states([PeerState.PEER_WAITING])[:20]))

['1572 in state PEER_WAITING',
 '2940 in state PEER_WAITING',
 '2932 in state PEER_WAITING',
 '2941 in state PEER_WAITING',
 '2936 in state PEER_WAITING',
 '2935 in state PEER_WAITING',
 '2942 in state PEER_WAITING',
 '2526 in state PEER_WAITING']

In [14]:
qp.get_closest_n_in_states(20, [PeerState.PEER_QUERIED])[:20]

['12D3KooWKwFPVTUrCNn9JmruQxbwaNGiT44fsh8LgWGupFS2cpCo',
 '12D3KooWCMKuGj6BpV5dotnu3xhQQ4ozPhDkrMPptTGS5q6sugRA',
 '12D3KooWGPLvP7zZGkoT73swXyHTMyGEQvXsgN3VPY3wWWgmyuUu',
 '12D3KooWEoNAsunHeJYKdvL3mK56yhBb4SSte1x4cCE8keXdJxDd',
 '12D3KooWHVPYRwDF1rqiSpoQNorodMqYEa7aZmx1xw7G82dCoCac',
 'QmQEmFgTASSpQ8c2JqCLBvjiWxxzPPdUh2tHHFzeZWtHs9',
 '12D3KooWCeNdy8pe4oX7nmoK9dVNAAexdsUsMShD1N534snd83Ba',
 '12D3KooWHj8Q7UoGSk86o8aiZaiiahxfrSdWmvqdxX4A6joUs5SP',
 '12D3KooWRgqEhuvQK75TfYr1VYXMVwMfNyk3KyHLyTd9Nhs47yun',
 '12D3KooWKNafq5iD9cBwzTDuZNWnhQvwABLSkvMNijM7kUKweQp2',
 '12D3KooWSc3MDPZB4SqXQasoFYoWHuyPyqD5HnCvTx6ye1A4M132',
 '12D3KooWKQqpoVp22dEzs26pwrjAQWWLj6s9Rg3KB8Y4GmGHp9kf',
 '12D3KooWCD9o8Khz6CyM8jckfaYqskBMZvgbeQ2mm8eyKSjDdb98']