In [None]:
import operator
import re

from collections import defaultdict
from datetime import datetime, timedelta
from functools import cached_property, reduce
from itertools import combinations
from pathlib import Path

import attr
import networkx as nx
import numpy as np
import pandas as pd
import statsmodels.api as sm

from matplotlib import pyplot as plt
plt.rcParams.update({"figure.figsize": (15, 7)})

# Data in the DB extends up to 9/4 21:15 (presumably day/month, i.e. 2022-04-09 — matches date_range below).

In [None]:
def date_parse(epoch_str, shift=6):
    """Parse a digit string holding an epoch time into a naive local ``datetime``.

    The last ``shift`` characters are treated as the fractional part of a second
    (default 6, i.e. microseconds), so ``'1649494800123456'`` is read as
    ``1649494800.123456`` seconds since the epoch.
    """
    whole, fraction = epoch_str[:-shift], epoch_str[-shift:]
    return datetime.fromtimestamp(float(f'{whole}.{fraction}'))

def recursive_file_iter(path: Path):
    """Yield every non-directory entry below ``path``, depth-first, in ``iterdir`` order."""
    for entry in path.iterdir():
        if not entry.is_dir():
            yield entry
            continue
        # descend into sub-directories lazily
        yield from recursive_file_iter(entry)

def fetch_self_id(log_path: Path):
    """Return the text after ``self=`` on the first "Started P2P networking" log line.

    Lines that mention "Started P2P networking" but carry no ``self=`` field are
    skipped instead of crashing.

    :param log_path: log file to scan line by line.
    :returns: the captured string (the node's own enode URL), or ``None`` if no
        matching line is found.
    """
    p2p_line_rgx = re.compile(r'\bStarted P2P networking\b')
    p2p_node_rgx = re.compile(r'\bself=(.*)\b')

    with log_path.open() as f:
        for line in f:
            if not p2p_line_rgx.search(line):
                continue
            node_match = p2p_node_rgx.search(line)
            # Guard: a matching line without `self=` would otherwise raise AttributeError.
            if node_match:
                return node_match.group(1)
    return None

@attr.s(frozen=True)
class Peer:
    """A remote node parsed from an ``enode://<hex id>@<ipv4>:<port>`` URL.

    ``ip`` and ``port`` are declared with ``cmp=False``, so equality, ordering and
    hashing are based on ``n_id`` alone. All three fields are kept as the matched strings.
    """
    n_id = attr.ib()
    ip   = attr.ib(cmp=False)
    port = attr.ib(cmp=False)

    # Only dotted-IPv4 hosts are recognised; anything after the port is ignored.
    peer_rgx = re.compile(r'enode://([\da-fA-F]+)@([\d.]+):(\d+).*')

    @classmethod
    def from_enode_str(cls, enode_str):
        """Build a ``Peer`` from the first enode URL found in ``enode_str``.

        :raises ValueError: (carrying the input) when no enode URL is present.
        """
        match = cls.peer_rgx.search(enode_str)
        if match is None:
            raise ValueError(enode_str)
        n_id, ip, port = match.groups()
        return cls(n_id, ip, port)


@attr.s(order=False)
class DynamicRange:
    """A mutable, closed interval ``[start, end]`` that can grow as points are added.

    The range is *empty* when both bounds are ``None``. Giving only one bound makes a
    one-point interval; inverted bounds (``start > end``) are normalised to empty.
    """
    start = attr.ib(default=None)
    end   = attr.ib(default=None)

    def __attrs_post_init__(self):
        # A single given bound yields the degenerate interval [t, t].
        if self.end is None:
            self.end = self.start
        if self.start is None:
            self.start = self.end
        # Inverted bounds (e.g. the intersection of disjoint ranges) mean "empty".
        if self and self.start > self.end:
            self.start = self.end = None

    def __bool__(self):
        """True iff the range is non-empty (both bounds set)."""
        assert (self.start is None) == (self.end is None)
        return self.start is not None

    def __iadd__(self, dt):
        """Extend the range in place so that it includes ``dt``."""
        if not self:
            self.start = self.end = dt
            return self

        if dt < self.start:
            self.start = dt
        if dt > self.end:
            self.end = dt

        return self

    def __and__(self, other):
        """Intersection; empty if either operand is empty or the ranges are disjoint."""
        # max()/min() on None bounds would raise TypeError.
        if not self or not other:
            return DynamicRange()

        start = max(self.start, other.start)
        end   = min(self.end  , other.end  )

        return DynamicRange(start, end)

    def __contains__(self, item):
        """Closed-interval membership: ``start <= item <= end``.

        Inclusive at both ends, consistent with the rest of the class: ``+= t`` on an
        empty range yields ``[t, t]``, which must contain ``t``; ``__and__`` of ranges
        that merely touch is non-empty; and ``__str__`` renders ``[start, end]``.
        """
        return bool(self) and self.start <= item <= self.end

    def __str__(self):
        return f'[{self.start}, {self.end}]'


def generate_peer_range(peer_file, date_range):
    """Collect, for each peer, the span of snapshot times at which it was listed.

    ``peer_file`` is read as a sequence of batches::

        =
        <snapshot time, epoch milliseconds>
        <peer line>
        ...
        undefined

    A peer line is either an enode URL (parsed with ``Peer.from_enode_str``) or a line
    starting with ``removed peer``, in which case the line after it is skipped. Only
    batches whose snapshot time is ``in date_range`` contribute.

    A trailing batch cut short (no timestamp, or no ``undefined`` terminator), as
    happens when the file is read while still being written, is tolerated. Its
    complete lines are used instead of raising ``StopIteration``. An empty line where a
    header is expected ends parsing.

    :returns: ``defaultdict`` mapping ``Peer`` -> ``DynamicRange`` spanning the earliest
        and latest in-range snapshot that listed that peer.
    """
    header = '='
    end_of_batch = 'undefined'

    peers = defaultdict(DynamicRange)

    lines = iter(peer_file.read_text().splitlines())
    while line := next(lines, None):
        assert line == header, line
        stamp = next(lines, None)
        if stamp is None:
            break  # file ends right after a header
        time = datetime.fromtimestamp(int(stamp) / 10**3)
        in_range = time in date_range  # same for every line of this batch

        for peer in lines:
            if peer == end_of_batch:
                break
            if not in_range:
                continue
            if peer.startswith('removed peer'):
                next(lines, None)  # skip the line that follows the marker
                continue
            peers[Peer.from_enode_str(peer)] += time

    return peers

def fetch_events(path: Path):
    """Yield ``(tx_hash, datetime)`` for every ``addTXs-tx_pool.go`` line in ``path``.

    Lines starting with ``status:addTXs-tx_pool.go`` are searched for
    ``tx_timestamp:<epoch microseconds>,hash:<0x...>,``. The file is read eagerly at
    call time; the conversion to ``datetime`` (naive, local time) is lazy.

    :returns: an empty tuple if ``path`` does not exist, otherwise a generator.
    """
    # Raw string: in a plain literal `\d` is an invalid escape sequence (warning on 3.6+).
    entry_rgx = re.compile(r'tx_timestamp:(\d*),hash:(0x[\da-fA-F]*),')

    if not path.exists():
        return ()

    added_rows = [
        row for row in path.read_text().splitlines()
        if row.startswith('status:addTXs-tx_pool.go')
    ]
    return (
        (h, datetime.fromtimestamp(int(ts) / 10**6))
        for h, ts in (entry_rgx.search(row).group(2, 1) for row in added_rows)
    )

def keep_tx_between(tx_time_map, date_range):
    """Return a new dict with only the ``hash -> time`` entries whose time is ``in date_range``.

    Insertion order of ``tx_time_map`` is preserved.
    """
    kept = {}
    for tx_hash, seen_at in tx_time_map.items():
        if seen_at in date_range:
            kept[tx_hash] = seen_at
    return kept

@attr.s(frozen=True)
class Client:
    """A locally-run client identified by ``name``, with its artefacts under ``directory``."""
    name     : str  = attr.ib()
    directory: Path = attr.ib()

    def _size(self):
        """Total size in bytes of all files below ``directory``."""
        return sum(file.stat().st_size for file in recursive_file_iter(self.directory))

    def size(self):
        """``_size()`` retried up to 10 times on ``FileNotFoundError``; ``None`` if every try fails.

        A file can vanish between being listed and being stat'ed (presumably because the
        client is still running and rotating files).
        """
        tries = 10
        for _ in range(tries):
            try:
                return self._size()
            except FileNotFoundError:
                continue
        return None

    @property
    def logs_path(self) -> Path:
        return self.directory / 'logs'

    @property
    def hist_path(self) -> Path:
        return self.directory / 'mempool_history'

    @property
    def events_path(self) -> Path:
        return self.directory / 'mempool_history.events.csv'

    @property
    def peer_path(self) -> Path:
        return self.directory / 'current_peers'

    @cached_property
    def enode(self):
        """This client's own ``Peer``, parsed from the "Started P2P networking" log line."""
        return Peer.from_enode_str(fetch_self_id(self.logs_path))

    def peers_at(self, date_range: DynamicRange):
        """``{Peer: DynamicRange}`` for peers listed in ``current_peers`` within ``date_range``."""
        return generate_peer_range(self.peer_path, date_range)

    def txs(self):
        """Rows of the events CSV whose ``status`` is ``addTXs-tx_pool.go``.

        Indexed by ``status_timestamp`` parsed with ``date_parse``.
        """
        # NOTE(review): `date_parser` is deprecated in pandas 2.x; revisit if pandas is upgraded.
        df = pd.read_csv(
            self.events_path,
            parse_dates=['status_timestamp'],
            date_parser=date_parse,
            index_col='status_timestamp'
        )
        return df[df['status'] == 'addTXs-tx_pool.go'].copy()

    def txs_at(self, date_range:DynamicRange):
        """``txs()`` restricted to index values in ``[date_range.start, date_range.end]`` (inclusive)."""
        return self.txs().sort_index().truncate(
            before=date_range.start,
            after =date_range.end,
        )

    def txs_at__direct(self, date_range:DynamicRange):
        """``{tx hash: datetime}`` parsed from the raw ``mempool_history`` file, limited to ``date_range``."""
        # `generate_entry_map` is not defined in this notebook (NameError at call time);
        # `fetch_events` yields exactly the (hash, datetime) pairs `keep_tx_between` needs.
        # If a hash occurs more than once, dict() keeps its last occurrence.
        return keep_tx_between(dict(fetch_events(self.hist_path)), date_range)

    def __str__(self):
        return self.name


@attr.s(frozen=True)
class ClientAt:
    """A ``Client`` viewed through a fixed ``date_range`` window.

    ``peers`` and ``txs`` are computed on first access and cached on the instance.
    """
    client    : Client       = attr.ib()
    date_range: DynamicRange = attr.ib()

    @property
    def enode(self):
        """The underlying client's own enode (independent of the window)."""
        return self.client.enode

    @property
    def name(self):
        return self.client.name

    @cached_property
    def peers(self):
        """``{Peer: DynamicRange}`` of peers seen within ``date_range``."""
        return self.client.peers_at(self.date_range)

    @cached_property
    def txs(self):
        """Added-tx rows within ``date_range`` (the DataFrame returned by ``Client.txs_at``)."""
        return self.client.txs_at(self.date_range)

    def __str__(self):
        return f'{self.client}@{self.date_range}'

    def __format__(self, fmt):
        # Formats as the bare client name (no window); the spec is honoured, e.g. f'{c:>6}'.
        # With an empty spec this is identical to the previous output.
        return format(str(self.client), fmt)


def generate_intersection_df(clients):
    """Build a frame of the transactions seen by *every* client in ``clients``.

    Each ``c.txs`` is expected to be a mapping ``hash -> timestamp``. Columns are
    ``'hash'`` followed by one column per client, labelled ``format(c)``. Rows are
    sorted by hash so the order does not depend on set iteration order (which varies
    between kernel runs because of str hash randomisation).

    NOTE(review): ``ClientAt.txs`` currently returns a DataFrame (via ``Client.txs_at``).
    A DataFrame's ``.keys()`` are its column labels, not tx hashes, so confirm which
    representation is intended here.
    """
    intersection = reduce(operator.and_, (c.txs.keys() for c in clients))
    records = [(h, *(c.txs[h] for c in clients)) for h in sorted(intersection, key=str)]
    return pd.DataFrame.from_records(
        records,
        columns=('hash', *map(format, clients)),
    )

In [None]:
# Analysis window as naive datetimes, comparable with the naive local times
# produced by `datetime.fromtimestamp` in the parsers.
date_range = DynamicRange(
    datetime.fromisoformat('2022-04-09T09:00:00'),
    datetime.fromisoformat('2022-04-09T21:00:00'),
)
# Alternative window: the last 24 hours.
# date_range = DynamicRange(
#     start=datetime.now() - timedelta(hours=24),
#     end  =datetime.now(),
# )

In [None]:
# Clients c1..c8 live in sibling directories named after them under client_base_dir.
client_ids = list(range(1, 9))

# TODO: hard-coded absolute path; consider making it configurable.
client_base_dir = Path('/data/avi/eth_clients')
assert client_base_dir.exists(), client_base_dir

# No walrus inside the comprehension: it would bind `client` in the notebook namespace.
clients = [Client(f'c{i}', client_base_dir / f'c{i}') for i in client_ids]

In [None]:
# print('Current sizes:')
# total_size = 0

# for c in clients:
#     size = c.size()
#     print(f'{c}: {size / 2**30:>6.2f}GB')
#     total_size += size

# print(f'\nTOTAL: {total_size / 2**30:>6.2f}GB')

In [None]:
# Re-runnable: entries that are already ClientAt (from an earlier run of this cell) are
# unwrapped first, so re-executing re-applies date_range instead of nesting ClientAt(ClientAt(...)).
clients = [
    ClientAt(client=c.client if isinstance(c, ClientAt) else c, date_range=date_range)
    for c in clients
]

In [None]:
# Number of distinct peers recorded for each client within date_range.
for c in clients:
    print(f'{c}:', len(c.peers))

In [None]:
# Undirected graph: one node per enode, with an edge from each of our clients
# to every peer it had within date_range.
g = nx.Graph()
for client in clients:
    g.add_node(client.enode)  # keeps a client in the plot even if it has no peers
    for peer in client.peers:
        g.add_edge(client.enode, peer)  # add_edge creates missing endpoint nodes

# Fixed seed so the layout is identical on every Restart & Run All.
pos = nx.layout.spring_layout(g, seed=0)

ours   = [c.enode for c in clients]
others = g.nodes - ours
node_size = 160

fig, ax = plt.subplots()
nx.draw_networkx_nodes (g, pos=pos, ax=ax, node_size=node_size, nodelist=others)
nx.draw_networkx_nodes (g, pos=pos, ax=ax, node_size=node_size, nodelist=ours, node_color='orange')
nx.draw_networkx_edges (g, pos=pos, ax=ax, node_size=node_size)
nx.draw_networkx_labels(g, pos=pos, ax=ax, labels={c.enode: c.name for c in clients})
ax.set_title('Our clients (orange) and the peers they connected to');

In [None]:
# For each pair of our clients: peers both were connected to, and for each such peer
# the overlap of the two clients' observation ranges (an empty overlap prints [None, None]).
for a, b in combinations(clients, 2):
    intersection = a.peers.keys() & b.peers.keys()
    if not intersection:
        continue

    print(f'clients: {a}&{b} intersection size: {len(intersection)}')
    # Sort by node id: set iteration order depends on str hash randomisation.
    for peer in sorted(intersection, key=lambda p: p.n_id):
        print(a.peers[peer] & b.peers[peer])

    print()

In [None]:
# Added-tx rows per client within date_range (right-aligned, thousands separators).
for client in clients:
    print(f'{client}', f'{len(client.txs):>10,}')

In [None]:
# Per client pair: transactions both saw, and summary statistics of the absolute
# difference between their recorded timestamps.
dfs = {}
diff_col = 'diff'

for a, b in combinations(clients, 2):
    print(f'clients: {a} {b}')
    df = generate_intersection_df((a, b))
    if len(df) == 0:
        print('∅')
        continue
    df[diff_col] = (df[a.name] - df[b.name]).abs()
    dfs[(a.name, b.name)] = df

    diffs = df[diff_col]
    summary = [('mean', diffs.mean())]
    summary += [(f'{q:.0%}', diffs.quantile(q)) for q in (.25, .50, .75, .90, .95, .99)]
    summary.append(('max', diffs.max()))
    for label, value in summary:
        print(f'{label:<4} = {value}')
    print()

In [None]:
# Quantile curve (q = 0 .. 0.995) of the per-transaction timestamp difference, in minutes,
# for every client pair, drawn on one log-scaled axis.
fig, ax = plt.subplots()
for (a, b), df in dfs.items():
    # to_timedelta is a no-op for timedelta64 columns and converts object-dtype timedeltas.
    minutes = pd.to_timedelta(df[diff_col]).dt.total_seconds() / 60
    minutes.rename(f'{a}-{b}').quantile(np.arange(200) / 200).plot(ax=ax)

ax.set_yscale('log')
ax.set_ylabel('minutes')
ax.set_xlabel('quantile')
ax.set_title('Timestamp difference between client pairs for shared transactions')
if dfs:  # legend() warns when nothing was plotted
    ax.legend()