from __future__ import absolute_import

from twisted.internet.defer import Deferred

from ...requestcache import NumberCache


class ProposalCache(NumberCache):
    """
    Bookkeeping for a single outstanding proposal made to another peer.

    The cache concludes when the proposal is (1) accepted, (2) rejected or (3) times out.
    """

    def __init__(self, overlay, peer, nonce):
        """
        :param overlay: the community that made the proposal
        :param peer: the Peer the proposal was sent to
        :param nonce: the nonce identifying this proposal
        """
        super(ProposalCache, self).__init__(overlay.request_cache, u"proposal-cache",
                                            self.number_from_pk_nonce(peer.mid, nonce))
        self.overlay = overlay
        self.peer = peer

    @classmethod
    def number_from_pk_nonce(cls, public_key, nonce):
        """
        Fold a public key and a nonce into a single cache identifier.

        :param public_key: the counterparty public key
        :type public_key: str or bytes
        :param nonce: the nonce for this proposal
        :type nonce: int
        :return: the identifier for the given parameters
        :rtype: int
        """
        # Shift the nonce above the big-endian interpretation of the key bytes.
        number = nonce
        for c in public_key:
            number <<= 8
            # py2 iteration yields 1-char strings, py3 bytes iteration yields ints
            number += c if isinstance(c, int) else ord(c)
        return number

    def on_timeout(self):
        """
        When timing out, remove this proposal from the open proposals.

        :returns: None
        """
        try:
            self.overlay.open_proposals.remove(self.peer)
        except KeyError:
            self.overlay.logger.debug("Proposal timed out, but peer already removed.")


class StatsRequestCache(NumberCache):
    """
    Cache keeping a Deferred alive while waiting for a stats response.
    """

    def __init__(self, overlay):
        """
        :param overlay: the community waiting for the stats response
        """
        super(StatsRequestCache, self).__init__(overlay.request_cache, u"stats-request", overlay.claim_global_time())
        self.deferred = Deferred()

    def on_timeout(self):
        """
        Fail the waiting Deferred when no response arrived in time.

        :returns: None
        """
        self.deferred.errback(None)
from __future__ import absolute_import, division

import os
import struct
import time
from binascii import unhexlify
from collections import namedtuple
from math import floor

from twisted.internet.defer import fail
from twisted.internet.task import LoopingCall

from .cache import ProposalCache, StatsRequestCache
from .discovery import LatencyEdgeWalk
from .payload import (BreakMatchPayload, ProposalAcceptPayload, ProposalPayload, ProposalRejectPayload,
                      StatsRequestPayload, StatsResponsePayload)
from .peer_selection import Option, PeerSelector, generate_reference
from ...community import DEFAULT_MAX_PEERS
from ...lazy_community import lazy_wrapper
from ...peer import Peer
from ...peerdiscovery.community import DiscoveryCommunity


# Statistics triplet: connected peer count, discovered peer count, matched peer count.
Stats = namedtuple('Stats', ["total", "possible", "matched"])


def generate_nonce():
    """
    Create a 2 byte securely random nonce.

    :return: the 2-byte securely random integer
    :rtype: int
    """
    return struct.unpack(">H", os.urandom(2))[0]


def get_current_time():
    """
    Get the current time in 10s of seconds.

    :return: the time in 10s since the UNIX epoch
    :rtype: int
    """
    return int(time.time() / 10)


# Default evaluation points: 40 bins, 50ms apart, offset by 1ms.
DEFAULT_PING_BINS = [x * 0.05 + 0.001 for x in range(0, 40, 1)]


class LatencyCommunity(DiscoveryCommunity):
    """
    Community that matches with other peers to approximate a reference latency distribution.
    """

    master_peer = Peer(unhexlify("4c69624e61434c504b3aaf489217d2a689086b9103bd7a7a249021f387e1af10c06a0dc82ea0c65786"
                                 "041b682e0db8fce6b4c3db0d4e47e4afbeed2e633752b949820dad16af1962d7fa"))

    def __init__(self, my_peer, endpoint, network, max_peers=DEFAULT_MAX_PEERS, anonymize=False, preferred_count=60,
                 k_window=30, ping_time_bins=DEFAULT_PING_BINS):
        """
        :param preferred_count: the maximum amount of partners, you will probably get between this and half of this
        :type preferred_count: int
        :param k_window: the amount of proposals to consider at the same time
        :type k_window: int
        :param ping_time_bins: the list of function evaluation points
        :type ping_time_bins: [float]
        """
        super(LatencyCommunity, self).__init__(my_peer, endpoint, network, max_peers=max_peers, anonymize=anonymize)

        # Target latency distribution follows a preferred_count / x curve over the given bins.
        self.ping_reference_bins = generate_reference(lambda x: preferred_count / x, ping_time_bins, preferred_count)
        self.preferred_count = preferred_count
        self.k_window = k_window

        self.possible_peers = []  # List of peers in the discovery
        self.acceptable_peers = set()  # Peers we want included in our next round

        self.open_proposals = set()
        self.accepted_proposals = set()

        self.add_message_handler(ProposalPayload.msg_id, self.on_proposal)
        self.add_message_handler(ProposalAcceptPayload.msg_id, self.on_accept_proposal)
        self.add_message_handler(ProposalRejectPayload.msg_id, self.on_reject_proposal)
        self.add_message_handler(BreakMatchPayload.msg_id, self.on_break_match)
        self.add_message_handler(StatsRequestPayload.msg_id, self.on_stats_request)
        self.add_message_handler(StatsResponsePayload.msg_id, self.on_stats_response)

        self.request_cache.register_task("update_acceptable_peers",
                                         LoopingCall(self.update_acceptable_peers)).start(5.0, False)

    def get_available_strategies(self):
        """
        Expose the LatencyEdgeWalk strategy next to the inherited ones.
        """
        out = super(LatencyCommunity, self).get_available_strategies()
        out['LatencyEdgeWalk'] = LatencyEdgeWalk
        return out

    def check_payload(self, payload):
        """
        Check if a given payload (with `peerid` field) is targeted to us.

        :param payload: the payload to check for
        :type payload: Payload
        :except: RuntimeError
        :returns: None
        """
        if payload.peerid != self.my_peer.mid:
            raise RuntimeError("Someone is replay attacking us!")

    def update_acceptable_peers(self):
        """
        Propose to a fresh set of peers or swap out suboptimal peers.

        :returns: None
        """
        # Clean up mappings: drop proposals for peers that disappeared.
        peer_set = self.get_peers()
        self.open_proposals = set(p for p in self.open_proposals if p in peer_set)
        self.accepted_proposals = set(p for p in self.accepted_proposals if p in peer_set)
        # If necessary, send out new proposals.
        open_for_proposal_count = self.preferred_count - len(self.accepted_proposals) - len(self.open_proposals)
        if open_for_proposal_count > 0:
            peer_selector = PeerSelector(self.ping_reference_bins,
                                         included=[Option(peer.get_median_ping(), peer)
                                                   for peer in self.accepted_proposals
                                                   if peer.get_median_ping() is not None])
            # Only consider peers that are not already accepted or proposed to.
            options = []
            for peer in self.possible_peers:
                if (peer not in self.accepted_proposals and peer not in self.open_proposals
                        and peer.get_median_ping() is not None):
                    options.append(Option(peer.get_median_ping(), peer))
            # Maximally send out K_WINDOW proposals at the same time.
            choices = []
            for _ in range(self.k_window):
                choice = peer_selector.decide(options)
                if choice is not None:
                    options.remove(choice)
                    choices.append(choice)
                # If the K_WINDOW goes over the PREFERRED_COUNT, stop.
                if len(peer_selector.included) == (self.preferred_count - len(self.accepted_proposals)
                                                   - len(self.open_proposals)):
                    break
            new_options = [tup.obj for tup in choices]
            self.acceptable_peers = new_options + list(self.open_proposals) + list(self.accepted_proposals)
            for peer in new_options:
                self.send_proposal(peer)
        elif self.preferred_count == len(self.accepted_proposals):
            # Fully matched: remove the current worst peer, if there is one.
            peer_selector = PeerSelector(self.ping_reference_bins,
                                         included=[Option(peer.get_median_ping(), peer)
                                                   for peer in self.accepted_proposals
                                                   if peer.get_median_ping() is not None])
            worst = peer_selector.current_worst()
            if worst:
                peer = worst.obj
                self.accepted_proposals.remove(peer)
                packet = self.ezr_pack(BreakMatchPayload.msg_id, BreakMatchPayload(get_current_time(), peer.mid))
                self.endpoint.send(peer.address, packet)

    def send_proposal(self, peer):
        """
        Send a proposal to a given peer.

        :param peer: the peer to send the proposal to
        :type peer: Peer
        :returns: None
        """
        nonce = generate_nonce()
        self.open_proposals.add(peer)
        self.request_cache.add(ProposalCache(self, peer, nonce))
        packet = self.ezr_pack(ProposalPayload.msg_id, ProposalPayload(nonce, peer.mid))
        self.endpoint.send(peer.address, packet)

    @lazy_wrapper(ProposalPayload)
    def on_proposal(self, peer, payload):
        """
        Upon receiving a proposal, respond with an acceptation or rejection.

        :param peer: the peer we have received a proposal from
        :type peer: Peer
        :param payload: the proposal payload
        :type payload: ProposalPayload
        :returns: None
        """
        self.check_payload(payload)
        accept = False
        if peer in self.acceptable_peers or peer in self.open_proposals or peer in self.accepted_proposals:
            accept = True
        elif len(self.open_proposals) + len(self.accepted_proposals) < self.preferred_count:
            if len(self.open_proposals) + len(self.accepted_proposals) < floor(self.preferred_count * 0.75):
                # Plenty of room left: accept greedily, and start measuring if we have no ping yet.
                accept = True
                if not peer.get_median_ping():
                    self.send_ping(peer)
            elif peer.get_median_ping():
                # Nearly full: only accept if this peer improves our latency distribution.
                peer_selector = PeerSelector(self.ping_reference_bins,
                                             included=[Option(p.get_median_ping(), p)
                                                       for p in self.accepted_proposals
                                                       if p.get_median_ping() is not None])
                if peer_selector.decide([Option(peer.get_median_ping(), peer)]):
                    accept = True
        if accept:
            packet = self.ezr_pack(ProposalAcceptPayload.msg_id, ProposalAcceptPayload(payload.nonce, peer.mid))
            self.accepted_proposals.add(peer)
        else:
            packet = self.ezr_pack(ProposalRejectPayload.msg_id, ProposalRejectPayload(payload.nonce, peer.mid))
        self.endpoint.send(peer.address, packet)

    @lazy_wrapper(ProposalAcceptPayload)
    def on_accept_proposal(self, peer, payload):
        """
        If someone accepted our proposal update our mappings.

        :param peer: the peer that sent us the accept
        :type peer: Peer
        :param payload: the acceptation payload
        :type payload: ProposalAcceptPayload
        :returns: None
        """
        self.check_payload(payload)
        try:
            request_cache = self.request_cache.pop(u"proposal-cache",
                                                   ProposalCache.number_from_pk_nonce(peer.mid, payload.nonce))
            if request_cache:
                if len(self.accepted_proposals) < self.preferred_count or peer in self.accepted_proposals:
                    self.accepted_proposals.add(peer)
                else:
                    # We are already full; immediately break the match again.
                    self.logger.debug("%s accepted our proposal, but we don't want it anymore!", str(peer))
                    packet = self.ezr_pack(BreakMatchPayload.msg_id,
                                           BreakMatchPayload(get_current_time(), peer.mid))
                    self.endpoint.send(peer.address, packet)
                self.open_proposals.remove(peer)
            else:
                self.logger.debug("Got timed out or unwanted proposal response.")
        except KeyError:
            self.logger.debug("Got timed out or unwanted proposal response.")

    @lazy_wrapper(ProposalRejectPayload)
    def on_reject_proposal(self, peer, payload):
        """
        If someone rejected our proposal update our mappings.

        :param peer: the peer that sent us the reject
        :type peer: Peer
        :param payload: the rejection payload
        :type payload: ProposalRejectPayload
        :returns: None
        """
        self.check_payload(payload)
        try:
            request_cache = self.request_cache.pop(u"proposal-cache",
                                                   ProposalCache.number_from_pk_nonce(peer.mid, payload.nonce))
            if request_cache:
                self.open_proposals.remove(peer)
            else:
                self.logger.debug("Got timed out or unwanted proposal response.")
        except KeyError:
            self.logger.debug("Got timed out or unwanted proposal response.")

    @lazy_wrapper(BreakMatchPayload)
    def on_break_match(self, peer, payload):
        """
        If someone broke a match with us.

        :param peer: the peer that sent us the break
        :type peer: Peer
        :param payload: the break payload
        :type payload: BreakMatchPayload
        :returns: None
        """
        self.check_payload(payload)  # Peer id is correct
        # Only honour breaks stamped within the last two 10-second windows.
        current_time = get_current_time()
        if not current_time - 1 <= payload.time <= current_time:
            self.logger.debug("Got timed out match break.")
            return
        try:
            self.accepted_proposals.remove(peer)
        except KeyError:
            self.logger.debug("Tried to match break a non-accepted peer.")

    def send_stats_request(self, peer):
        """
        Request the stats of a particular peer.

        :param peer: the peer to request from
        :return: deferred object to wait for
        :rtype: Deferred
        """
        cache = self.request_cache.add(StatsRequestCache(self))
        if cache:
            packet = self.ezr_pack(StatsRequestPayload.msg_id, StatsRequestPayload(cache.number))
            self.endpoint.send(peer.address, packet)
            return cache.deferred
        return fail(None)

    @lazy_wrapper(StatsRequestPayload)
    def on_stats_request(self, peer, payload):
        """
        If someone requests our stats.

        :param peer: the peer that sent us the request
        :type peer: Peer
        :param payload: the stats request payload
        :type payload: StatsRequestPayload
        :returns: None
        """
        response = StatsResponsePayload(payload.identifier,
                                        len(self.get_peers()), len(self.possible_peers),
                                        len(self.accepted_proposals))
        packet = self.ezr_pack(StatsResponsePayload.msg_id, response)
        self.endpoint.send(peer.address, packet)

    @lazy_wrapper(StatsResponsePayload)
    def on_stats_response(self, peer, payload):
        """
        If someone responds with their stats.

        :param peer: the peer that sent us the request
        :type peer: Peer
        :param payload: the stats response payload
        :type payload: StatsResponsePayload
        :returns: None
        """
        try:
            cache = self.request_cache.pop(u"stats-request", payload.identifier)
            if cache:
                cache.deferred.callback(Stats(payload.total, payload.possible, payload.matched))
        except KeyError:
            self.logger.debug("Got a timed-out or unwanted StatsResponsePayload.")
from __future__ import absolute_import, division

import time

from ..discovery import DiscoveryStrategy


class LatencyEdgeWalk(DiscoveryStrategy):
    """
    Discovery strategy that grows "edges" of peers with pairwise-distinct ping times.
    """

    def __init__(self, overlay, max_roots=30, max_edge_length=6, max_similarity=0.05, gc_delay=30.0):
        """
        Create a new LatencyEdgeWalk strategy.

        :param overlay: the overlay to apply this strategy to
        :param max_roots: the node count from the bootstrap server
        :param max_edge_length: the maximum edge length
        :param max_similarity: the maximum similarity between ping times, in seconds
        :param gc_delay: the interval for garbage collection of loose connections
        """
        super(LatencyEdgeWalk, self).__init__(overlay)

        # Variables
        self.max_roots = max_roots
        self.max_edge_length = max_edge_length
        self.max_similarity = max_similarity
        self.gc_delay = gc_delay
        self.last_gc = 0.0  # UNIX timestamp 0

        # Data structures
        self.roots = []
        self.ancestry = {}  # Peer introduced by Peer (or None)
        self.leaves = []  # Current edges' HEAD Peer objects

        # If the overlay's max_peers does not match our settings, this algorithm will fail spectacularly.
        # Therefore we override any settings from the overlay with our own automatically.
        overlay.max_peers = max_roots * max_edge_length

    def get_root_address(self):
        """
        Bootstrap into a sufficient set of root nodes.

        :returns: None
        """
        self.overlay.bootstrap()
        existing_mids = [p.mid for p in self.overlay.network.verified_peers]
        for peer in self.overlay.get_peers():
            introducer, service = self.overlay.network._all_addresses[peer.address]
            if introducer not in existing_mids and peer not in self.roots:
                # Bootstrapped peer, not in use
                self.roots.append(peer)
                self.leaves.append(peer)

    def get_granular_ping(self, peer):
        """
        Get the ping for a peer, measure if necessary.

        :param peer: the peer to get the ping time for
        :type peer: Peer
        :return: the median ping for this peer
        :rtype: float or None
        """
        if not peer or not peer.pings or len(peer.pings) < peer.pings.maxlen:
            # Not enough samples yet: request another measurement and report nothing.
            self.overlay.send_ping(peer)
            return None
        return peer.get_median_ping()

    def check_extend_edge(self, leaf, leaf_pings, removed_leafs):
        """
        Check if we can extend the edge a certain leaf resides on, do so if possible.

        :param leaf: the leaf to check for extension
        :type leaf: Peer
        :param leaf_pings: the list of pings in the edge of this leaf
        :type leaf_pings: [float]
        :param removed_leafs: reference to the leaves to be removed after this iteration (we can add to this)
        :type removed_leafs: [Peer]
        :returns: None
        """
        introductions = self.overlay.network.get_introductions_from(leaf)
        for introduction in introductions:
            ipeer = self.overlay.network.get_verified_by_address(introduction)
            if ipeer and ipeer not in self.ancestry:
                ipingtime = self.get_granular_ping(ipeer)
                if ipingtime is None:
                    continue
                # Only extend when the candidate's ping differs from every ping on this edge.
                unique = True
                for ptime in leaf_pings:
                    if ptime - self.max_similarity <= ipingtime <= ptime + self.max_similarity:
                        unique = False
                        break
                if unique:
                    removed_leafs.append(leaf)
                    self.leaves.append(ipeer)
                    self.ancestry[ipeer] = leaf
                    # Drop the other introductions: this edge has found its extension.
                    for other_intro in introductions:
                        if other_intro != introduction:
                            self.overlay.network.remove_by_address(other_intro)
            else:
                if ipeer:
                    self.get_granular_ping(ipeer)
                self.overlay.walk_to(introduction)

    def ensure_leaf_pings(self, leaf):
        """
        Get all of the pings in the ancestry of a leaf (measure if necessary) and make sure the leaf introduces us
        to other peers.

        :param leaf: the leaf of a unique ping edge
        :type leaf: Peer
        :return: the list of ping times on this leaf's edge
        :rtype: [float]
        """
        depth = 0
        previous = leaf
        leaf_pings = []
        # 1. Measure pings
        while previous:
            depth += 1
            pingtime = self.get_granular_ping(previous)
            previous = self.ancestry.get(previous, None)
            if pingtime is None:
                continue
            leaf_pings.append(pingtime)
        # 2. Make sure we get introductions from the current leaf
        if depth < self.max_edge_length:
            self.overlay.send_introduction_request(leaf)
        return leaf_pings

    def garbage_collect(self):
        """
        Clean up connections we no longer use:

         - Peers which have not made it into any edge
         - Peers in an edge which have gone offline

        :returns: None
        """
        all_peers = self.overlay.get_peers()
        if len(all_peers) > self.overlay.max_peers / 2 and time.time() - self.last_gc >= self.gc_delay:
            self.last_gc = time.time()

            # 1. Remove peers which have not made it into any edge
            my_peers = set(self.leaves) | set(self.ancestry.values())
            to_remove = [peer for peer in all_peers if peer not in my_peers]
            for peer in to_remove:
                self.overlay.network.remove_peer(peer)

            # 2. Remove peers which have gone offline
            remove_set = set(p for p in my_peers if p not in all_peers)
            for leaf in self.leaves[:]:
                is_leaf = True
                re_leaf = False
                next_node = None
                current = leaf
                while True:
                    previous = self.ancestry.get(current, None)

                    if current in remove_set:
                        if is_leaf:
                            re_leaf = True  # The next online node in the edge should become the new leaf
                            self.leaves.remove(current)
                        self.ancestry.pop(current, None)
                        if current in self.roots:
                            self.roots.remove(current)
                        if next_node and previous:
                            # We are in between two nodes, relink them
                            self.ancestry[next_node] = previous
                        elif next_node:
                            # We only have a next node, which means the current is the root and we should re-root
                            self.roots.append(next_node)
                            self.ancestry.pop(next_node, None)
                    else:
                        # We should not be removed, but we might have been appointed as a new leaf
                        if re_leaf:
                            self.leaves.append(current)
                            re_leaf = False
                        next_node = current

                    current = previous
                    is_leaf = False
                    if not previous:
                        break

    def take_step(self):
        """
        Perform an iteration of peer discovery:

         1. Ensure we have enough edges
         2. Grow the edges if needed
         3. Garbage collect unused peers

        :returns: None
        """
        with self.walk_lock:
            # 1. Pick peer introduced by bootstrap
            if len(self.roots) < self.max_roots:
                self.get_root_address()

            # 2. For each edge < MAX_EDGE_LENGTH: grow edge based on last peer on edge
            removed_leafs = []
            for leaf in self.leaves:
                # 2.b. Ensure each edge has ping times and candidates to grow
                leaf_pings = self.ensure_leaf_pings(leaf)

                # 3. On response: if MYKA allowed (<> MAX_SIMILARITY) add to edge
                self.check_extend_edge(leaf, leaf_pings, removed_leafs)

            # If we updated our leaf list, remove old leaves (which are now part of the ancestry tree)
            self.leaves = [leaf for leaf in self.leaves if leaf not in removed_leafs]

            # Update the overlay with the agreeable peers and garbage collect loose connections
            self.overlay.possible_peers = [p for p in list(self.ancestry.values()) + self.leaves[:]]
            self.garbage_collect()
from __future__ import absolute_import

from ...messaging.lazy_payload import VariablePayload


class ProposalPayload(VariablePayload):
    """
    Packet for proposing a match to another peer.
    """
    msg_id = 5
    format_list = ['H', 'varlenH']  # 16-bit nonce, variable-length target mid
    names = ["nonce", "peerid"]


class ProposalAcceptPayload(VariablePayload):
    """
    Packet for accepting a proposal from another peer.
    """
    msg_id = 6
    format_list = ['H', 'varlenH']
    names = ["nonce", "peerid"]


class ProposalRejectPayload(VariablePayload):
    """
    Packet for rejecting a proposal from another peer.
    """
    msg_id = 7
    format_list = ['H', 'varlenH']
    names = ["nonce", "peerid"]


class BreakMatchPayload(VariablePayload):
    """
    Break a previously accepted proposal.
    """
    msg_id = 8
    format_list = ['I', 'varlenH']  # coarse 32-bit timestamp, target mid
    names = ["time", "peerid"]


class StatsRequestPayload(VariablePayload):
    """
    Request for peer statistics.
    """
    msg_id = 9
    format_list = ['Q']
    names = ["identifier"]


class StatsResponsePayload(VariablePayload):
    """
    Response with peer statistics.
    """
    msg_id = 10
    format_list = ['Q', 'I', 'I', 'I']
    names = ["identifier", "total", "possible", "matched"]
import math
from collections import namedtuple
from random import shuffle


# A candidate for inclusion: ``value`` is the measurement (e.g. median ping), ``obj`` the associated object.
Option = namedtuple('Option', ['value', 'obj'])
# One point of the target function: the desired density ``y`` at evaluation point ``x``.
ReferenceFuncPoint = namedtuple('ReferenceFuncPoint', ['x', 'y'])


def unweigthed_pdf(x, X, bandwidth):
    """
    Given a 1D point x, a set of 1D bin center points X and the kernel bandwidth, calculate the sum of the
    contributions of x for each of the points in X.

    (Name kept as-is, typo included, for backward compatibility with existing callers.)

    :param x: the point to check the contribution for
    :type x: float
    :param X: the measuring points of the contribution
    :type X: [float]
    :param bandwidth: the kernel bandwidth for the kernel estimate
    :type bandwidth: float
    :return: the contribution of x to X
    :rtype: float
    """
    return sum((math.sqrt(2 * math.pi * bandwidth ** 2) ** -1)
               * math.exp(-((x - x_i) ** 2) / (2 * bandwidth ** 2))
               for x_i in X)


def weighted_pdf(x, X, falloff):
    """
    Given a 1D point x, a set of 1D bin center points X and the kernel bandwidth, calculate the sum of the
    contributions of x for each of the points in X, normalized by the self-contribution (so a singleton
    X == [x] yields exactly 1.0).

    :param x: the point to check the contribution for
    :type x: float
    :param X: the measuring points of the contribution
    :type X: [float]
    :param falloff: the kernel bandwidth for the kernel estimate
    :type falloff: float
    :return: the contribution of x to X
    :rtype: float
    """
    return unweigthed_pdf(x, X, falloff) / unweigthed_pdf(x, [x], falloff)


def get_error(references, included, option, falloff=0.025):
    """
    Get the error of including an option next to the already included options, given some reference points.
    We punish values over the reference point twice as much as those under the reference point.

    :param references: the requested value per measurement point
    :type references: [ReferenceFuncPoint]
    :param included: the already included option values
    :type included: [float]
    :param option: the option to evaluate (or None to evaluate only the included values)
    :type option: Option or None
    :param falloff: the kernel bandwidth for the kernel estimate
    :type falloff: float
    :return: the error for the given option
    :rtype: float
    """
    errors = []
    for i in range(len(references)):
        x, y = references[i]
        d = weighted_pdf(x, included + ([option.value] if option is not None else []), falloff)
        e = y - d
        if d > y:
            # Overshooting the reference is punished twice as hard (and made positive).
            e *= -2
        errors.append(e)
    return sum(errors)


def optimal_choice(references, included, options, falloff=0.025):
    """
    Given reference values and already included options, select the best fit from the given options.

    :param references: the requested value per measurement point
    :type references: [ReferenceFuncPoint]
    :param included: the already included option values
    :type included: [float]
    :param options: the options to evaluate
    :type options: [Option]
    :param falloff: the kernel bandwidth for the kernel estimate
    :type falloff: float
    :return: the best option to include, if it exists (None means "include nothing")
    :rtype: Option or None
    """
    best_option = None
    best_mse = None
    for option in [None] + options:
        mse = get_error(references, included, option, falloff)
        # On a tie with the None baseline, prefer a concrete option.
        if best_mse is None or mse < best_mse or (best_option is None and mse == best_mse):
            best_option = option
            best_mse = mse
    return best_option


def generate_reference(func, x_coords, peer_count):
    """
    Given a function and the points on which to evaluate, generate reference points.
    Normalize the function to fit a certain target count, such that the sum of all bins equals the requested
    peer count.

    :param func: the function to seed the bins with
    :type func: function
    :param x_coords: the x-coordinates to evaluate the given function
    :type x_coords: [float]
    :param peer_count: the total amount of requested peers
    :type peer_count: int
    :return: the reference points to use for the kernel density estimation
    :rtype: [ReferenceFuncPoint]
    """
    modifier = peer_count / sum(func(x) for x in x_coords)  # Make sure ceil doesn't clip
    distribution = [sum(weighted_pdf(x, x_coords, 0.025) for _ in range(int(math.ceil(modifier * func(x)))))
                    for x in x_coords]
    modifier = sum(distribution) / len(x_coords)
    return [ReferenceFuncPoint(x, modifier * func(x)) for x in x_coords]


class PeerSelector(object):
    """
    Class to aid with selecting weighted peers to fit a weighted distribution and to remove peers to fit a given
    weighted distribution.
    """

    def __init__(self, reference_points, included=None):
        """
        Create new PeerSelector from a set of reference points.
        For removal, give a set of already included peers.

        :param reference_points: the reference points which this selector is based on
        :type reference_points: [ReferenceFuncPoint]
        :param included: the optionally already included options
        :type included: [Option]
        """
        if not included:
            self.included = []
            self._included_values = []
        else:
            self.included = included
            self._included_values = [option.value for option in included]
        self.reference = reference_points

    def decide(self, options, falloff=0.025):
        """
        Return the optimal option from the given options to include, if it exists.
        The chosen option (if any) is added to this selector's included set.

        :param options: the available options
        :type options: [Option]
        :param falloff: the kernel bandwidth for the kernel estimate
        :type falloff: float
        :return: the optimal choice
        :rtype: Option or None
        """
        shuffle(options)
        choice = optimal_choice(self.reference, self._included_values, options, falloff)
        if choice is not None:
            self.included.append(choice)
            self._included_values.append(choice.value)
        return choice

    def current_worst(self):
        """
        Get the currently included option that fits the reference distribution worst.

        Each option is scored against all *other* included values; the highest error wins.

        FIX: the previous implementation excluded ``included.index(option)`` — the first *equal*
        element — which both performed a linear search per pair and, when two equal Options were
        included, scored later duplicates with themselves still counted. We now exclude by position.

        :return: the current worst included option
        :rtype: Option or None
        """
        current_worst = None
        current_worst_mse = None
        for index, option in enumerate(self.included):
            if option is None:
                continue
            others = [opt.value for i, opt in enumerate(self.included) if i != index]
            mse = get_error(self.reference, others, option)
            if current_worst is None or mse > current_worst_mse:
                current_worst = option
                current_worst_mse = mse
        return current_worst
+ """ + self.nodes[0].overlay.possible_peers = [self.peer1] + self.nodes[1].overlay.possible_peers = [self.peer0] + + self.nodes[0].overlay.update_acceptable_peers() + self.nodes[1].overlay.update_acceptable_peers() + + yield self.deliver_messages() + + self.assertIn(self.peer1, self.nodes[0].overlay.accepted_proposals) + self.assertIn(self.peer0, self.nodes[1].overlay.accepted_proposals) + + @inlineCallbacks + def test_no_match(self): + """ + If two peers don't match, they shouldn't end up in each others matches. + """ + self.nodes[0].overlay.possible_peers = [] + self.nodes[1].overlay.possible_peers = [self.peer0] + + self.nodes[0].overlay.update_acceptable_peers() + self.nodes[1].overlay.update_acceptable_peers() + + yield self.deliver_messages() + + self.assertListEqual([], list(self.nodes[0].overlay.accepted_proposals)) + self.assertListEqual([], list(self.nodes[1].overlay.accepted_proposals)) + + @inlineCallbacks + def test_unmatch(self): + """ + If a peer breaks a match, both peers should remove their matching. 
+ """ + self.nodes[0].overlay.possible_peers = [self.peer1] + self.nodes[0].overlay.accepted_proposals = {self.peer1} + self.nodes[1].overlay.possible_peers = [self.peer0] + self.nodes[1].overlay.accepted_proposals = {self.peer0} + + self.nodes[0].overlay.preferred_count = 1 + self.nodes[0].overlay.update_acceptable_peers() + + yield self.deliver_messages() + + self.assertListEqual([], list(self.nodes[0].overlay.accepted_proposals)) + self.assertListEqual([], list(self.nodes[1].overlay.accepted_proposals)) + + @inlineCallbacks + def test_stats(self): + """ + Check if we can collect a peer's stats + """ + self.nodes[0].overlay.possible_peers = [self.peer0, self.peer1, self.peer0, self.peer1] + self.nodes[0].overlay.accepted_proposals = {self.peer1} + + stats = yield self.nodes[1].overlay.send_stats_request(self.peer0) + + self.assertEqual(1, stats.total) + self.assertEqual(4, stats.possible) + self.assertEqual(1, stats.matched) diff --git a/ipv8/test/peerdiscovery/latency/test_discovery.py b/ipv8/test/peerdiscovery/latency/test_discovery.py new file mode 100644 index 000000000..0a13c73ed --- /dev/null +++ b/ipv8/test/peerdiscovery/latency/test_discovery.py @@ -0,0 +1,367 @@ +from __future__ import absolute_import + +import collections + +from twisted.trial import unittest + +from ....keyvault.crypto import default_eccrypto +from ....peer import Peer +from ....peerdiscovery.latency.discovery import LatencyEdgeWalk +from ....peerdiscovery.network import Network + + +class MockOverlay(object): + + def __init__(self): + # Mocking + self.max_peers = 0 + self.network = Network() + + # Walker output + self.possible_peers = [] + + # Call inspection + self.sent_introduction_requests = [] + self.sent_walk_to = [] + self.sent_pings = [] + self.has_bootstrapped = False + + def send_introduction_request(self, peer): + self.sent_introduction_requests.append(peer) + + def send_ping(self, peer): + self.sent_pings.append(peer) + + def walk_to(self, address): + 
self.sent_walk_to.append(address) + + def get_peers(self): + return self.network.verified_peers + + def get_walkable_addresses(self): + return self.network.get_walkable_addresses() + + def bootstrap(self): + self.has_bootstrapped = True + + +class TestLatencyEdgeWalk(unittest.TestCase): + tracker_peer = None + root_peer = None + root_peer_pinged = None + + def setUp(self): + super(TestLatencyEdgeWalk, self).setUp() + self.overlay = MockOverlay() + + self.overlay.network.blacklist.append(self.tracker_peer.address) + self.overlay.network.blacklist_mids.append(self.tracker_peer.mid) + + self.walker = LatencyEdgeWalk(self.overlay) + + @classmethod + def setUpClass(cls): + cls.tracker_peer = Peer(default_eccrypto.generate_key(u"low").pub()) + cls.root_peer = Peer(default_eccrypto.generate_key(u"low").pub(), ("1.1.1.1", 1)) + cls.root_peer_pinged = Peer(default_eccrypto.generate_key(u"low").pub(), ("1.1.1.2", 1)) + cls.root_peer_pinged.pings = collections.deque([0.1, 0.11, 0.09, 0.1, 0.1], maxlen=5) + + def test_get_root_address_none(self): + """ + Check whether we try to bootstrap when requesting a root node. + """ + self.walker.get_root_address() + + self.assertTrue(self.overlay.has_bootstrapped) + self.assertListEqual([], self.walker.roots) + + def test_get_root_address_one(self): + """ + Check whether we add a root node when requesting a root node. + """ + self.overlay.network.add_verified_peer(self.root_peer) + self.walker.get_root_address() + + self.assertTrue(self.overlay.has_bootstrapped) + self.assertListEqual([self.root_peer], self.walker.roots) + + def test_get_granular_ping_no_data(self): + """ + Check if get_granular_ping returns None without enough pings. + """ + ping = self.walker.get_granular_ping(self.root_peer) + + self.assertIsNone(ping) + self.assertListEqual([self.root_peer], self.overlay.sent_pings) + + def test_get_granular_ping_data(self): + """ + Check if get_granular_ping returns the median ping with enough pings. 
+ """ + ping = self.walker.get_granular_ping(self.root_peer_pinged) + + self.assertEqual(0.1, ping) + self.assertListEqual([], self.overlay.sent_pings) + + def test_check_extend_edge_unknown(self): + """ + Check if we walk to a possible edge extension. + """ + introduced = ('1.2.3.4', 5) + self.overlay.network.discover_address(self.root_peer_pinged, introduced) + + removed = [] + self.walker.check_extend_edge(self.root_peer_pinged, [self.root_peer_pinged.get_median_ping()], removed) + + self.assertListEqual([introduced], self.overlay.sent_walk_to) + self.assertListEqual([], removed) + + def test_check_extend_edge_ping_known(self): + """ + Check if we explore the current leaf's subgraph. + """ + verified_peer = Peer(default_eccrypto.generate_key(u"low").pub(), ('1.2.3.4', 5)) + self.overlay.network.discover_address(self.root_peer_pinged, verified_peer.address) + self.overlay.network.add_verified_peer(verified_peer) + self.walker.ancestry[verified_peer] = None + + removed = [] + self.walker.check_extend_edge(self.root_peer_pinged, [self.root_peer_pinged.get_median_ping()], removed) + + self.assertListEqual([verified_peer.address], self.overlay.sent_walk_to) + self.assertListEqual([], removed) + self.assertListEqual([verified_peer], self.overlay.sent_pings) + + def test_check_extend_edge_ensure_ping(self): + """ + Don't add verified peers without sufficient ping information. 
+ """ + verified_peer = Peer(default_eccrypto.generate_key(u"low").pub(), ('1.2.3.4', 5)) + self.overlay.network.discover_address(self.root_peer_pinged, verified_peer.address) + self.overlay.network.add_verified_peer(verified_peer) + + removed = [] + self.walker.check_extend_edge(self.root_peer_pinged, [self.root_peer_pinged.get_median_ping()], removed) + + self.assertListEqual([], self.overlay.sent_walk_to) + self.assertListEqual([], removed) + self.assertListEqual([verified_peer], self.overlay.sent_pings) + + def test_check_extend_edge_ensure_unique(self): + """ + Don't add verified peers without sufficiently unique ping. + """ + verified_peer = Peer(default_eccrypto.generate_key(u"low").pub(), ('1.2.3.4', 5)) + verified_peer.pings = self.root_peer_pinged.pings + self.overlay.network.discover_address(self.root_peer_pinged, verified_peer.address) + self.overlay.network.add_verified_peer(verified_peer) + + removed = [] + self.walker.check_extend_edge(self.root_peer_pinged, [self.root_peer_pinged.get_median_ping()], removed) + + self.assertListEqual([], self.overlay.sent_walk_to) + self.assertListEqual([], removed) + self.assertListEqual([], self.overlay.sent_pings) + self.assertNotIn(verified_peer, self.walker.ancestry) + + def test_check_extend_edge_add_unique(self): + """ + Add verified peers with sufficiently unique ping. 
+ """ + verified_peer = Peer(default_eccrypto.generate_key(u"low").pub(), ('1.2.3.4', 5)) + verified_peer.pings = collections.deque([1.0, 1.1, 0.9, 1.0, 1.0], maxlen=5) + self.overlay.network.discover_address(self.root_peer_pinged, verified_peer.address) + self.overlay.network.add_verified_peer(verified_peer) + + removed = [] + self.walker.check_extend_edge(self.root_peer_pinged, [self.root_peer_pinged.get_median_ping()], removed) + + self.assertListEqual([], self.overlay.sent_walk_to) + self.assertListEqual([self.root_peer_pinged], removed) + self.assertListEqual([], self.overlay.sent_pings) + self.assertIn(verified_peer, self.walker.ancestry) + self.assertEqual(self.root_peer_pinged, self.walker.ancestry[verified_peer]) + self.assertListEqual([verified_peer], self.walker.leaves) + + def test_ensure_leaf_pings_none(self): + """ + If we have room to grow, ping the current leaf and request an introduction. + """ + ping_times = self.walker.ensure_leaf_pings(self.root_peer) + + self.assertListEqual([self.root_peer], self.overlay.sent_pings) + self.assertListEqual([self.root_peer], self.overlay.sent_introduction_requests) + self.assertListEqual([], ping_times) + + def test_ensure_leaf_pings_complete(self): + """ + If we do not have room to grow and a pinged leaf, don't ping the current leaf and request no introduction. + """ + self.walker.max_edge_length = 1 + + ping_times = self.walker.ensure_leaf_pings(self.root_peer_pinged) + + self.assertListEqual([], self.overlay.sent_pings) + self.assertListEqual([], self.overlay.sent_introduction_requests) + self.assertListEqual([0.1], ping_times) + + def test_ensure_leaf_pings_child_pinged(self): + """ + Check if all ping times are measured along an edge, with an unpinged parent. 
+ """ + self.walker.ancestry = {self.root_peer: self.root_peer_pinged} + + ping_times = self.walker.ensure_leaf_pings(self.root_peer) + + self.assertListEqual([self.root_peer], self.overlay.sent_pings) + self.assertListEqual([self.root_peer], self.overlay.sent_introduction_requests) + self.assertListEqual([0.1], ping_times) + + def test_ensure_leaf_pings_parent_pinged(self): + """ + Check if all ping times are measured along an edge, with an unpinged child. + """ + self.walker.ancestry = {self.root_peer_pinged: self.root_peer} + + ping_times = self.walker.ensure_leaf_pings(self.root_peer_pinged) + + self.assertListEqual([self.root_peer], self.overlay.sent_pings) + self.assertListEqual([self.root_peer_pinged], self.overlay.sent_introduction_requests) + self.assertListEqual([0.1], ping_times) + + def test_ensure_leaf_pings_both_pinged(self): + """ + Check if all ping times are reported along an edge, with an all members pinged. + """ + pinged_peer = Peer(default_eccrypto.generate_key(u"low").pub(), ('1.2.3.4', 5)) + pinged_peer.pings = collections.deque([1.0, 1.1, 0.9, 1.0, 1.0], maxlen=5) + self.walker.ancestry = {self.root_peer_pinged: pinged_peer} + + ping_times = self.walker.ensure_leaf_pings(self.root_peer_pinged) + + self.assertListEqual([], self.overlay.sent_pings) + self.assertListEqual([self.root_peer_pinged], self.overlay.sent_introduction_requests) + self.assertListEqual([0.1, 1.0], ping_times) + + def test_garbage_collect(self): + """ + Check if garbage peers are removed from the network. + """ + self.overlay.network.add_verified_peer(self.root_peer) + + self.overlay.max_peers = 0 + self.walker.max_edge_length = 0 + self.walker.garbage_collect() + + self.assertListEqual([], self.overlay.get_peers()) + + def test_garbage_collect_no_leaves(self): + """ + Check if leaves are not garbage collected. 
+ """ + self.overlay.network.add_verified_peer(self.root_peer) + self.walker.leaves.append(self.root_peer) + + self.overlay.max_peers = 0 + self.walker.max_edge_length = 0 + self.walker.garbage_collect() + + self.assertListEqual([self.root_peer], self.overlay.get_peers()) + + def test_garbage_collect_no_ancestry(self): + """ + Check if nodes in edges are not garbage collected. + """ + self.overlay.network.add_verified_peer(self.root_peer) + self.walker.ancestry[self.root_peer] = self.root_peer + + self.overlay.max_peers = 0 + self.walker.max_edge_length = 0 + self.walker.garbage_collect() + + self.assertListEqual([self.root_peer], self.overlay.get_peers()) + + def test_garbage_collect_reroute_middle(self): + """ + Check if a middle node in an edge becomes unresponsive, the edge reroutes itself. + """ + other_peer = Peer(default_eccrypto.generate_key(u"low").pub(), ('1.2.3.4', 5)) + self.overlay.network.add_verified_peer(self.root_peer) + self.overlay.network.add_verified_peer(self.root_peer_pinged) + self.walker.ancestry[self.root_peer] = other_peer + self.walker.ancestry[other_peer] = self.root_peer_pinged + self.walker.roots = [self.root_peer_pinged] + self.walker.leaves = [self.root_peer] + + # Edge: (root) self.root_peer_pinged <- other_peer <- self.root_peer (leaf) + # other_peer is unresponsive, reroute + self.overlay.max_peers = 0 + self.walker.max_edge_length = 0 + self.walker.garbage_collect() + + # New edge: (root) self.root_peer_pinged <- self.root_peer (leaf) + self.assertIn(self.root_peer, self.overlay.get_peers()) + self.assertIn(self.root_peer_pinged, self.overlay.get_peers()) + self.assertNotIn(other_peer, self.overlay.get_peers()) + self.assertIn(self.root_peer, self.walker.ancestry) + self.assertIn(self.root_peer, self.walker.leaves) + self.assertIn(self.root_peer_pinged, self.walker.roots) + self.assertNotIn(other_peer, self.walker.ancestry) + self.assertEqual(self.root_peer_pinged, self.walker.ancestry[self.root_peer]) + + def 
test_garbage_collect_reroute_leaf(self): + """ + Check if a leaf node in an edge becomes unresponsive, the edge reroutes itself. + """ + other_peer = Peer(default_eccrypto.generate_key(u"low").pub(), ('1.2.3.4', 5)) + self.overlay.network.add_verified_peer(other_peer) + self.overlay.network.add_verified_peer(self.root_peer_pinged) + self.walker.ancestry[self.root_peer] = other_peer + self.walker.ancestry[other_peer] = self.root_peer_pinged + self.walker.roots = [self.root_peer_pinged] + self.walker.leaves = [self.root_peer] + + # Edge: (root) self.root_peer_pinged <- other_peer <- self.root_peer (leaf) + # self.root_peer is unresponsive, reroute + self.overlay.max_peers = 0 + self.walker.max_edge_length = 0 + self.walker.garbage_collect() + + # New edge: (root) self.root_peer_pinged <- other_peer (leaf) + self.assertNotIn(self.root_peer, self.overlay.get_peers()) + self.assertIn(self.root_peer_pinged, self.overlay.get_peers()) + self.assertIn(other_peer, self.overlay.get_peers()) + self.assertIn(other_peer, self.walker.ancestry) + self.assertIn(other_peer, self.walker.leaves) + self.assertIn(self.root_peer_pinged, self.walker.roots) + self.assertNotIn(self.root_peer, self.walker.ancestry) + self.assertEqual(self.root_peer_pinged, self.walker.ancestry[other_peer]) + + def test_garbage_collect_reroute_root(self): + """ + Check if a root node in an edge becomes unresponsive, the edge reroutes itself. 
+ """ + other_peer = Peer(default_eccrypto.generate_key(u"low").pub(), ('1.2.3.4', 5)) + self.overlay.network.add_verified_peer(self.root_peer) + self.overlay.network.add_verified_peer(other_peer) + self.walker.ancestry[self.root_peer] = other_peer + self.walker.ancestry[other_peer] = self.root_peer_pinged + self.walker.roots = [self.root_peer_pinged] + self.walker.leaves = [self.root_peer] + + # Edge: (root) self.root_peer_pinged <- other_peer <- self.root_peer (leaf) + # self.root_peer_pinged is unresponsive, reroute + self.overlay.max_peers = 0 + self.walker.max_edge_length = 0 + self.walker.garbage_collect() + + # New edge: (root) self.root_peer_pinged <- self.root_peer (leaf) + self.assertIn(self.root_peer, self.overlay.get_peers()) + self.assertNotIn(self.root_peer_pinged, self.overlay.get_peers()) + self.assertIn(other_peer, self.overlay.get_peers()) + self.assertIn(self.root_peer, self.walker.ancestry) + self.assertIn(self.root_peer, self.walker.leaves) + self.assertNotIn(self.root_peer_pinged, self.walker.roots) + self.assertIn(other_peer, self.walker.roots) + self.assertNotIn(self.root_peer_pinged, self.walker.ancestry.values()) + self.assertEqual(other_peer, self.walker.ancestry[self.root_peer]) diff --git a/ipv8/test/peerdiscovery/latency/test_peer_selection.py b/ipv8/test/peerdiscovery/latency/test_peer_selection.py new file mode 100644 index 000000000..ac938f0a2 --- /dev/null +++ b/ipv8/test/peerdiscovery/latency/test_peer_selection.py @@ -0,0 +1,49 @@ +from __future__ import absolute_import + +from twisted.trial import unittest + +from ....peerdiscovery.latency.peer_selection import Option, PeerSelector, ReferenceFuncPoint + + +class TestPeerSelector(unittest.TestCase): + + def test_optimal_single_choice(self): + """ + Given a single point of reference and one option closer to it than the other, pick the closer one. 
+ """ + selector = PeerSelector([ReferenceFuncPoint(1.0, 1.0)]) + options = [Option(0.0, 'A'), Option(1.5, 'B')] + + self.assertEqual(Option(1.5, 'B'), selector.decide(options, falloff=0.2)) + + def test_optimal_single_double_bin(self): + """ + Given a single point of reference with a weight of 2.0, allocate two points. + """ + selector = PeerSelector([ReferenceFuncPoint(1.0, 2.0)]) + options = [Option(1.0, 'A'), Option(1.0, 'B')] + + selected = {selector.decide(options, falloff=0.2)} + selected |= {selector.decide(list(option for option in options if option not in selected), falloff=0.2)} + + self.assertSetEqual(set(options), selected) + + def test_optimal_single_none(self): + """ + Given a single point of reference and a two filling options, fill with any option and then don't add more. + """ + selector = PeerSelector([ReferenceFuncPoint(1.0, 1.0)]) + options = [Option(1.0, 'A'), Option(1.0, 'B')] + + self.assertIn(selector.decide(options, falloff=0.2), options) + self.assertIsNone(selector.decide(options, falloff=0.2)) + + def test_optimal_double_under(self): + """ + Errors should be weighted to prefer options under the reference function, instead of over. 
+ """ + selector = PeerSelector([ReferenceFuncPoint(0.0, 1.0), ReferenceFuncPoint(1.0, 1.0)], + included=[Option(0.0, 'A')]) + options = [Option(0.5, 'B'), Option(1.5, 'C')] + + self.assertEqual(Option(1.5, 'C'), selector.decide(options, falloff=0.2)) diff --git a/test_classes_list.txt b/test_classes_list.txt index f6461b472..51558100a 100644 --- a/test_classes_list.txt +++ b/test_classes_list.txt @@ -12,6 +12,10 @@ ipv8/test/peerdiscovery/test_random_discovery.py:TestRandomWalk ipv8/test/peerdiscovery/test_churn.py:TestChurn ipv8/test/peerdiscovery/test_churn.py:TestPingChurn +ipv8/test/peerdiscovery/latency/test_peer_selection.py:TestPeerSelector +ipv8/test/peerdiscovery/latency/test_discovery.py:TestLatencyEdgeWalk +ipv8/test/peerdiscovery/latency/test_community.py:TestLatencyCommunity + ipv8/test/keyvault/test_crypto.py:TestECCrypto ipv8/test/keyvault/test_serialization.py:TestSerialization ipv8/test/keyvault/test_signature.py:TestSignatures