Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 11 additions & 38 deletions ipv8/messaging/anonymization/caches.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,14 @@
from __future__ import absolute_import

import logging
import time

from twisted.internet.defer import Deferred
from twisted.python.failure import Failure

from .tunnel import CIRCUIT_STATE_CLOSING, CIRCUIT_STATE_READY, PING_INTERVAL
from .tunnel import CIRCUIT_STATE_CLOSING, CIRCUIT_STATE_READY
from ...requestcache import NumberCache, RandomNumberCache


class CircuitRequestCache(NumberCache):
"""
Used to track the total circuit creation time
"""
def __init__(self, community, circuit, timeout):
super(CircuitRequestCache, self).__init__(community.request_cache, u"circuit", circuit.circuit_id)
self.community = community
self.circuit = circuit
self.timeout = timeout

@property
def timeout_delay(self):
return float(self.timeout)

def on_timeout(self):
if self.circuit.state != CIRCUIT_STATE_READY:
reason = 'timeout on CircuitRequestCache, state = %s, candidate = %s' % (
self.circuit.state, self.circuit.peer.address)
self.community.remove_circuit(self.number, reason)


class CreateRequestCache(NumberCache):
"""
Used to track outstanding create messages
Expand Down Expand Up @@ -70,13 +48,14 @@ def on_timeout(self):

class RetryRequestCache(NumberCache):
"""
Used to retry adding additional hops to the circuit.
Used to track adding additional hops to the circuit.
"""
def __init__(self, community, circuit, candidates, retry_func, timeout):
def __init__(self, community, circuit, candidates, max_tries, retry_func, timeout):
super(RetryRequestCache, self).__init__(community.request_cache, u"retry", circuit.circuit_id)
self.community = community
self.circuit = circuit
self.candidates = candidates
self.max_tries = max_tries
self.retry_func = retry_func
self.timeout = timeout

Expand All @@ -85,11 +64,15 @@ def timeout_delay(self):
return float(self.timeout)

def on_timeout(self):
if not self.candidates or self.circuit.state == CIRCUIT_STATE_CLOSING:
if self.circuit.state == CIRCUIT_STATE_CLOSING:
return
if not self.candidates or self.max_tries < 1:
reason = 'timeout on RetryRequestCache (tries left: %d)' % self.max_tries
self.community.remove_circuit(self.circuit.circuit_id, reason)
return

def retry_later(_):
self.retry_func(self.circuit, self.candidates)
self.retry_func(self.circuit, self.candidates, self.max_tries)

later = Deferred()
self.community.request_cache.register_anonymous_task("retry-later", later, delay=0.0)
Expand All @@ -100,19 +83,9 @@ class PingRequestCache(RandomNumberCache):

def __init__(self, community, circuit):
super(PingRequestCache, self).__init__(community.request_cache, u"ping")
self.logger = logging.getLogger(__name__)
self.circuit = circuit
self.community = community

@property
def timeout_delay(self):
return PING_INTERVAL + 5

def on_timeout(self):
if self.circuit.last_activity < time.time() - self.timeout_delay:
self.logger.info("PingRequestCache: no response on ping, circuit %d timed out",
self.circuit.circuit_id)
self.community.remove_circuit(self.circuit.circuit_id, 'ping timeout')
pass


class IPRequestCache(RandomNumberCache):
Expand Down
50 changes: 25 additions & 25 deletions ipv8/messaging/anonymization/community.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,10 @@ def __init__(self):
self.max_traffic = 250 * 1024 * 1024

self.max_packets_without_reply = 50

# Maximum number of seconds circuit creation is allowed to take. Within this time period, the unverified hop
# of the circuit can still change in case it is unresponsive.
# TODO: set this to ~60s next time we change the community ID, or after enough nodes are using the newer code
self.circuit_timeout = 10

self.circuit_timeout = 60
# Maximum number of seconds that a hop allows us to change the next hop
self.unstable_timeout = 60
# Maximum number of seconds adding a single hop to a circuit is allowed to take.
Expand Down Expand Up @@ -306,7 +305,7 @@ def create_circuit(self, goal_hops, ctype=CIRCUIT_TYPE_DATA, callback=None, requ
possible_first_hops = [required_exit]
else:
self.logger.info("Look for a first hop that is not an exit node and is not used before")
first_hops = set([c.peer.address for c in self.circuits.values()])
first_hops = set([c.peer.address for c in self.circuits.values() if c.peer])
possible_first_hops = [c for c in relay_candidates if c.address not in first_hops
and c.address != required_exit.address]

Expand All @@ -317,12 +316,12 @@ def create_circuit(self, goal_hops, ctype=CIRCUIT_TYPE_DATA, callback=None, requ
# Finally, construct the Circuit object and send the CREATE message
circuit_id = self._generate_circuit_id()
self.circuits[circuit_id] = circuit = Circuit(circuit_id, goal_hops, ctype, callback, required_exit, info_hash)
self.request_cache.add(CircuitRequestCache(self, circuit, self.settings.circuit_timeout))
self.send_initial_create(circuit, possible_first_hops)
self.send_initial_create(circuit, possible_first_hops,
self.settings.circuit_timeout // self.settings.next_hop_timeout)

return circuit_id

def send_initial_create(self, circuit, candidate_list):
def send_initial_create(self, circuit, candidate_list, max_tries):
if self.request_cache.has(u"retry", circuit.circuit_id):
self.request_cache.pop(u"retry", circuit.circuit_id)

Expand All @@ -335,7 +334,7 @@ def send_initial_create(self, circuit, candidate_list):

self.logger.info("Adding first hop %s:%d to circuit %d", *(first_hop.address + (circuit.circuit_id,)))

self.request_cache.add(RetryRequestCache(self, circuit, alt_first_hops,
self.request_cache.add(RetryRequestCache(self, circuit, alt_first_hops, max_tries - 1,
self.send_initial_create, self.settings.next_hop_timeout))

self.increase_bytes_sent(circuit, self.send_cell([first_hop],
Expand Down Expand Up @@ -531,9 +530,6 @@ def relay_cell(self, cell):
self.increase_bytes_sent(next_relay, self.send_packet([next_relay.peer], packet))

def _ours_on_created_extended(self, circuit, payload):
if self.request_cache.has(u"retry", payload.circuit_id):
self.request_cache.pop(u"retry", payload.circuit_id)

hop = circuit.unverified_hop

try:
Expand All @@ -553,19 +549,21 @@ def _ours_on_created_extended(self, circuit, payload):
_, candidate_list = decode(self.crypto.decrypt_str(candidate_list_enc,
hop.session_keys[EXIT_NODE],
hop.session_keys[EXIT_NODE_SALT]))
self.send_extend(circuit, candidate_list)
cache = self.request_cache.get(u"retry", payload.circuit_id)
self.send_extend(circuit, candidate_list, cache.max_tries if cache else 1)

elif circuit.state == CIRCUIT_STATE_READY:
self.request_cache.pop(u"circuit", circuit.circuit_id)
self.request_cache.pop(u"retry", payload.circuit_id)

# Execute callback
if circuit.callback:
circuit.callback(circuit)
circuit.callback = None
else:
return

def send_extend(self, circuit, candidate_list):
def send_extend(self, circuit, candidate_list, max_tries):
if self.request_cache.has(u"retry", circuit.circuit_id):
self.request_cache.pop(u"retry", circuit.circuit_id)

ignore_candidates = [self.crypto.key_to_bin(chop.public_key) for chop in circuit.hops] + \
[self.my_peer.public_key]
if circuit.required_exit:
Expand All @@ -588,8 +586,7 @@ def send_extend(self, circuit, candidate_list):
if not self.crypto.is_key_compatible(public_key):
candidate_list.pop(i)

pub_key = next(iter(candidate_list), None)
extend_hop_public_bin = pub_key
extend_hop_public_bin = next(iter(candidate_list), None)
extend_hop_addr = None

if extend_hop_public_bin:
Expand All @@ -603,8 +600,10 @@ def send_extend(self, circuit, candidate_list):
# Only retry if we are allowed to use another node
if not become_exit or not circuit.required_exit:
alt_candidates = [c for c in candidate_list if c != extend_hop_public_bin]
self.request_cache.add(RetryRequestCache(self, circuit, alt_candidates,
self.send_extend, self.settings.next_hop_timeout))
else:
alt_candidates = []
self.request_cache.add(RetryRequestCache(self, circuit, alt_candidates, max_tries - 1,
self.send_extend, self.settings.next_hop_timeout))

self.increase_bytes_sent(circuit, self.send_cell([circuit.peer],
u"extend",
Expand Down Expand Up @@ -705,7 +704,7 @@ def join_circuit(self, create_payload, previous_node_address):
self.relay_session_keys[circuit_id] = self.crypto.generate_session_keys(shared_secret)

peers_list = [peer for peer in self.get_candidates(PEER_FLAG_RELAY)
if peer.public_key.key_to_bin() not in self.get_candidates(PEER_FLAG_EXIT_ANY)][:4]
if peer not in self.get_candidates(PEER_FLAG_EXIT_ANY)][:4]
peers_keys = {c.public_key.key_to_bin(): c for c in peers_list}

peer = Peer(create_payload.node_public_key, previous_node_address)
Expand Down Expand Up @@ -756,8 +755,7 @@ def on_created(self, source_address, payload, _):
payload.key,
payload.auth,
payload.candidate_list_enc))
elif self.request_cache.has(u"circuit", payload.circuit_id) and \
self.request_cache.has(u"retry", payload.circuit_id):
elif self.request_cache.has(u"retry", payload.circuit_id):
circuit = self.circuits[circuit_id]
self._ours_on_created_extended(circuit, payload)
else:
Expand Down Expand Up @@ -818,7 +816,7 @@ def on_extend(self, source_address, payload, _):

@tc_lazy_wrapper_unsigned(ExtendedPayload)
def on_extended(self, source_address, payload, _):
if not self.request_cache.has(u"circuit", payload.circuit_id):
if not self.request_cache.has(u"retry", payload.circuit_id):
self.logger.warning("Received unexpected extended for circuit %s", payload.circuit_id)
return

Expand Down Expand Up @@ -891,7 +889,9 @@ def do_ping(self, exclude=None):
# Ping circuits. Pings are only sent to the first hop, subsequent hops will relay the ping.
exclude = [] if exclude is None else exclude
for circuit in self.circuits.values():
if circuit.state in [CIRCUIT_STATE_READY, CIRCUIT_STATE_EXTENDING] and circuit.circuit_id not in exclude:
if circuit.state in [CIRCUIT_STATE_READY, CIRCUIT_STATE_EXTENDING] \
and circuit.circuit_id not in exclude \
and circuit.hops:
cache = self.request_cache.add(PingRequestCache(self, circuit))
self.increase_bytes_sent(circuit, self.send_cell([circuit.peer], u"ping",
PingPayload(circuit.circuit_id, cache.number)))
Expand Down
1 change: 1 addition & 0 deletions ipv8/messaging/anonymization/hidden_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import os
import random
import struct
import time

from twisted.internet.defer import DeferredList, fail, inlineCallbacks, returnValue
from twisted.internet.task import LoopingCall
Expand Down
4 changes: 2 additions & 2 deletions ipv8/test/messaging/anonymization/test_community.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ def test_reuse_partial_circuit(self):

# Retry to extend the circuit
circuit.required_exit = None
self.nodes[0].overlay.send_extend(circuit, [self.nodes[3].overlay.my_peer.public_key.key_to_bin()])
self.nodes[0].overlay.send_extend(circuit, [self.nodes[3].overlay.my_peer.public_key.key_to_bin()], 1)
yield self.deliver_messages()

# Circuit should now be 0 -> 1 -> 3
Expand Down Expand Up @@ -386,7 +386,7 @@ def test_reuse_partial_circuit_first_hop(self):
yield self.introduce_nodes()

# Retry to extend the circuit
self.nodes[0].overlay.send_initial_create(circuit, [self.nodes[3].overlay.my_peer])
self.nodes[0].overlay.send_initial_create(circuit, [self.nodes[3].overlay.my_peer], 1)
yield self.deliver_messages()

# Circuit should now be 0 -> 2 -> 3
Expand Down