Skip to content

Commit

Permalink
Added latency community
Browse files Browse the repository at this point in the history
  • Loading branch information
qstokkink committed Sep 12, 2019
1 parent 8f81fab commit c4c62d5
Show file tree
Hide file tree
Showing 9 changed files with 1,269 additions and 0 deletions.
Empty file.
315 changes: 315 additions & 0 deletions ipv8/peerdiscovery/latency/community.py
@@ -0,0 +1,315 @@
from __future__ import absolute_import, division

import os
import struct
import time

from twisted.internet.task import LoopingCall

from .discovery import LatencyEdgeWalk
from .peer_selection import Option, PeerSelector, generate_reference
from ...community import DEFAULT_MAX_PEERS
from ...lazy_community import lazy_wrapper
from ...messaging.lazy_payload import VariablePayload
from ...messaging.payload_headers import BinMemberAuthenticationPayload
from ...peerdiscovery.community import DiscoveryCommunity
from ...requestcache import NumberCache


class ProposalPayload(VariablePayload):
"""
Packet for proposing to another peer.
"""
format_list = ['H', 'varlenH']
names = ["nonce", 'peerid']


class ProposalAcceptPayload(VariablePayload):
"""
Packet for accepting a proposal from another peer.
"""
format_list = ['H', 'varlenH']
names = ["nonce", 'peerid']


class ProposalRejectPayload(VariablePayload):
"""
Packet for rejecting a proposal from another peer.
"""
format_list = ['H', 'varlenH']
names = ["nonce", 'peerid']


class BreakMatchPayload(VariablePayload):
"""
Break a previously accepted proposal.
"""
format_list = ['I', 'varlenH']
names = ["time", 'peerid']


class ProposalCache(NumberCache):
"""
Cache for keeping track of a made proposal.
This cache concludes upon proposal (1) accept, (2) reject or (3) timeout.
"""

def __init__(self, overlay, peer, nonce):
super(ProposalCache, self).__init__(overlay.request_cache, u"proposal-cache",
self.number_from_pk_nonce(peer.mid, nonce))
self.overlay = overlay
self.peer = peer

@classmethod
def number_from_pk_nonce(cls, public_key, nonce):
"""
Create an identifier from a public key and a nonce.
:param public_key: the counterparty public key
:type public_key: str or bytes
:param nonce: the nonce for this proposal
:type nonce: str or bytes
:return: the identifier for the given parameters
:rtype: int
"""
number = nonce
for c in public_key:
number <<= 8
number += c if isinstance(c, int) else ord(c)
return number

def on_timeout(self):
"""
When timing out, we remove this proposal from the open proposals.
:returns: None
"""
try:
self.overlay.open_proposals.remove(self.peer)
except KeyError:
self.overlay.logger.debug("Proposal timed out, but peer already removed.")


def generate_nonce():
"""
Create a 2 byte securely random nonce.
:return: the 2-byte securely random integer
:rtype: int
"""
return struct.unpack(">H", os.urandom(2))[0]


DEFAULT_PING_BINS = [x / 100.0 for x in range(10, 200, 10)]


class LatencyCommunity(DiscoveryCommunity):

def __init__(self, my_peer, endpoint, network, max_peers=DEFAULT_MAX_PEERS, anonymize=False, preferred_count=60,
k_window=5, ping_time_bins=DEFAULT_PING_BINS):
"""
:param preferred_count: the maximum amount of partners, you will probably get between this and half of this
:type preferred_count: int
:param k_window: the amount of proposals to consider at the same time
:type k_window: int
:param ping_time_bins: the list of function evaluation points
:type ping_time_bins: [float]
"""
super(LatencyCommunity, self).__init__(my_peer, endpoint, network, max_peers=max_peers, anonymize=anonymize)

self.ping_reference_bins = generate_reference(lambda x: 1 / x, ping_time_bins, preferred_count)
self.preferred_count = preferred_count
self.k_window = k_window

self.possible_peers = [] # List of peers in the discovery
self.acceptable_peers = set() # Peers we want included in our next round

self.open_proposals = set()
self.accepted_proposals = set()

self.decode_map.update({
chr(5): self.on_proposal,
chr(6): self.on_accept_proposal,
chr(7): self.on_reject_proposal,
chr(8): self.on_break_match
})

self.request_cache.register_task("update_acceptable_peers",
LoopingCall(self.update_acceptable_peers)).start(10.0, False)

def get_available_strategies(self):
out = super(LatencyCommunity, self).get_available_strategies()
out['LatencyEdgeWalk'] = LatencyEdgeWalk
return out

def check_payload(self, payload):
"""
Check if a given payload (with `peerid` field) is targeted to us.
:param payload: the payload to check for
:type payload: Payload
:except: RuntimeError
:returns: None
"""
if payload.peerid != self.my_peer.mid:
raise RuntimeError("Someone is replay attacking us!")

def update_acceptable_peers(self):
"""
Propose to a fresh set of peers or swap out suboptimal peers.
:returns: None
"""
# Clean up mappings
peer_set = self.get_peers()
self.open_proposals = set(p for p in self.open_proposals if p in peer_set)
self.accepted_proposals = set(p for p in self.accepted_proposals if p in peer_set)
# If necessary, send out new proposals
open_for_proposal_count = self.preferred_count - len(self.accepted_proposals) - len(self.open_proposals)
if open_for_proposal_count > 0:
peer_selector = PeerSelector(self.ping_reference_bins)
options = []
# Only consider peers that are not already accepted or proposed to
for peer in self.possible_peers:
if (peer not in self.accepted_proposals
and peer not in self.open_proposals):
options.append(Option(peer.get_median_ping(), peer))
# Maximally send out K_WINDOW proposals at the same time
choices = []
for _ in range(self.k_window):
choice = peer_selector.decide(options)
if choice is not None:
options.remove(choice)
choices.append(choice)
# If the K_WINDOW goes over the PREFERRED_COUNT, stop
if len(peer_selector.included) == (self.preferred_count - len(self.accepted_proposals)
- len(self.open_proposals)):
break
new_options = [tup.obj for tup in choices]
for peer in new_options:
self.send_proposal(peer)
self.acceptable_peers = new_options + list(self.open_proposals) + list(self.accepted_proposals)
elif self.preferred_count == len(self.accepted_proposals):
# Remove the current worst peer, if there is one
peer_selector = PeerSelector(self.ping_reference_bins,
included=[Option(peer.get_median_ping(), peer)
for peer in self.accepted_proposals])
worst = peer_selector.current_worst()
if worst:
peer = worst.obj
self.accepted_proposals.remove(peer)
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list()
plist = BreakMatchPayload(int(time.time() / 10), peer.mid).to_pack_list()
packet = self._ez_pack(self._prefix, 8, [auth, plist])
self.endpoint.send(peer.address, packet)

def send_proposal(self, peer):
"""
Send a proposal to a given peer.
:param peer: the peer to send the proposal to
:type peer: Peer
:returns: None
"""
nonce = generate_nonce()
self.open_proposals.add(peer)
self.request_cache.add(ProposalCache(self, peer, nonce))
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list()
payload = ProposalPayload(nonce, peer.mid).to_pack_list()
self.endpoint.send(peer.address, self._ez_pack(self._prefix, 5, [auth, payload]))

@lazy_wrapper(ProposalPayload)
def on_proposal(self, peer, payload):
"""
Upon receiving a proposal, respond with an acceptation or rejection.
:param peer: the peer we have received a proposal from
:type peer: Peer
:param payload: the proposal payload
:type payload: ProposalPayload
:returns: None
"""
self.check_payload(payload)
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list()
if ((len(self.open_proposals) + len(self.accepted_proposals) < self.preferred_count
and peer in self.acceptable_peers)
or peer in self.open_proposals or peer in self.accepted_proposals):
plist = ProposalAcceptPayload(payload.nonce, peer.mid).to_pack_list()
packet = self._ez_pack(self._prefix, 6, [auth, plist])
self.accepted_proposals.add(peer)
else:
plist = ProposalRejectPayload(payload.nonce, peer.mid).to_pack_list()
packet = self._ez_pack(self._prefix, 7, [auth, plist])
self.endpoint.send(peer.address, packet)

@lazy_wrapper(ProposalAcceptPayload)
def on_accept_proposal(self, peer, payload):
"""
If someone accepted our proposal update our mappings.
:param peer: the peer that sent us the accept
:type peer: Peer
:param payload: the acceptation payload
:type payload: ProposalAcceptPayload
:returns: None
"""
self.check_payload(payload)
try:
request_cache = self.request_cache.pop(u"proposal-cache",
ProposalCache.number_from_pk_nonce(peer.mid, payload.nonce))
if request_cache:
if len(self.open_proposals) + len(self.accepted_proposals) < self.preferred_count:
self.accepted_proposals.add(peer)
else:
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list()
plist = BreakMatchPayload(int(time.time() / 10), peer.mid).to_pack_list()
packet = self._ez_pack(self._prefix, 8, [auth, plist])
self.endpoint.send(peer.address, packet)
self.open_proposals.remove(peer)
else:
self.logger.debug("Got timed out or unwanted proposal response.")
except KeyError:
self.logger.debug("Got timed out or unwanted proposal response.")

@lazy_wrapper(ProposalRejectPayload)
def on_reject_proposal(self, peer, payload):
"""
If someone rejected our proposal update our mappings.
:param peer: the peer that sent us the reject
:type peer: Peer
:param payload: the rejection payload
:type payload: ProposalRejectPayload
:returns: None
"""
self.check_payload(payload)
try:
request_cache = self.request_cache.pop(u"proposal-cache",
ProposalCache.number_from_pk_nonce(peer.mid, payload.nonce))
if request_cache:
self.open_proposals.remove(peer)
else:
self.logger.debug("Got timed out or unwanted proposal response.")
except KeyError:
self.logger.debug("Got timed out or unwanted proposal response.")

@lazy_wrapper(BreakMatchPayload)
def on_break_match(self, peer, payload):
"""
If someone broke a match with us.
:param peer: the peer that sent us the break
:type peer: Peer
:param payload: the break payload
:type payload: BreakMatchPayload
:returns: None
"""
self.check_payload(payload) # Peer id is correct
current_time = int(time.time() / 10)
if not current_time - 1 <= payload.time <= current_time:
self.logger.debug("Got timed out match break.")
return
try:
self.accepted_proposals.remove(peer)
except KeyError:
self.logger.debug("Tried to match break a non-accepted peer.")

0 comments on commit c4c62d5

Please sign in to comment.