Skip to content

Commit

Permalink
Merge pull request #2211 from lfdversluis/tunnel-community-improvements
Browse files Browse the repository at this point in the history
Tunnel community improvements
  • Loading branch information
whirm committed May 31, 2016
2 parents 386123e + eafedc9 commit bc89b12
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 33 deletions.
2 changes: 1 addition & 1 deletion Tribler/Core/APIImplementation/LaunchManyCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def load_communities():
from Tribler.community.tunnel.hidden_community import HiddenTunnelCommunity
# The multichain community MUST be auto_loaded before the tunnel community,
# because it must be unloaded after the tunnel, so that the tunnel closures can be signed
tunnel_settings = TunnelSettings(self.session.get_install_dir(), tribler_session=self.session)
tunnel_settings = TunnelSettings(tribler_session=self.session)
tunnel_kwargs = {'tribler_session': self.session, 'settings': tunnel_settings}
self.tunnel_community = self.dispersy.define_auto_load(
HiddenTunnelCommunity, dispersy_member, load=True, kargs=tunnel_kwargs)[0]
Expand Down
36 changes: 36 additions & 0 deletions Tribler/Test/Community/Tunnel/test_tunnelcommunity.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import time

from Tribler.Core.Utilities.twisted_thread import deferred
from Tribler.Test.Community.Tunnel.test_tunnel_base import AbstractTestTunnelCommunity
from Tribler.community.tunnel.routing import Circuit, RelayRoute
from Tribler.community.tunnel.tunnel_community import TunnelExitSocket, CircuitRequestCache, PingRequestCache
Expand Down Expand Up @@ -84,3 +86,37 @@ def test_check_destroy(self):
self.tunnel_community.relay_from_to[42] = RelayRoute(42, sock_addr)
for i in self.tunnel_community.check_destroy([msg1]):
self.assertIsInstance(i, type(msg1))

@deferred(timeout=5)
def test_send_to_destination_ip(self):
"""
This test checks if the ip address can be resolved when a destination object
is set and is_valid_address returns true.
Will result in an exception as self.transport is not initialized.
Which is catched by the try except in on_ip_address.
"""
circuit_id = 1337
sock_addr = "127.0.0.1"
exit_tunnel = TunnelExitSocket(circuit_id, self.tunnel_community, sock_addr, False)
self.tunnel_community.exit_sockets[circuit_id] = exit_tunnel
exit_tunnel.ips[("127.0.0.1", 8080)] = -1
data = "ffffffff".decode("HEX") + "1" * 25
exit_tunnel.sendto(data, ("127.0.0.1", 8080))
return exit_tunnel.close()


@deferred(timeout=5)
def test_send_to_ip_deferred(self):
"""
This test checks if the ip address can be resolved when a destination object
is set and is_valid_address returns false.
Which will be cancelled by the close() function being called immediately.
"""
circuit_id = 1337
sock_addr = "127.0.0.1"
exit_tunnel = TunnelExitSocket(circuit_id, self.tunnel_community, sock_addr, False)
self.tunnel_community.exit_sockets[circuit_id] = exit_tunnel
exit_tunnel.ips[("localhost", -1)] = -1
data = "ffffffff".decode("HEX") + "1" * 25
exit_tunnel.sendto(data, ("localhost", -1))
return exit_tunnel.close()
77 changes: 45 additions & 32 deletions Tribler/community/tunnel/tunnel_community.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
# Written by Egbert Bouman

import random
import socket
import time
from collections import defaultdict
from cryptography.exceptions import InvalidTag
from twisted.internet.error import MessageLengthError

from twisted.internet.defer import maybeDeferred, succeed

from twisted.internet import reactor
from twisted.internet.protocol import DatagramProtocol
from twisted.internet.task import LoopingCall

from Tribler import dispersy
from Tribler.Core.Utilities.encoding import decode, encode
from Tribler.community.bartercast4.statistics import BartercastStatisticTypes, _barter_statistics
from Tribler.community.tunnel import (CIRCUIT_ID_PORT, CIRCUIT_STATE_EXTENDING, CIRCUIT_STATE_READY, CIRCUIT_TYPE_DATA,
Expand All @@ -34,6 +35,7 @@
from Tribler.dispersy.message import DropMessage, Message
from Tribler.dispersy.requestcache import NumberCache, RandomNumberCache
from Tribler.dispersy.resolution import PublicResolution
from Tribler.dispersy.taskmanager import TaskManager
from Tribler.dispersy.util import call_on_reactor_thread
import logging

Expand Down Expand Up @@ -109,10 +111,11 @@ def on_timeout(self):
pass


class TunnelExitSocket(DatagramProtocol):
class TunnelExitSocket(DatagramProtocol, TaskManager):

def __init__(self, circuit_id, community, sock_addr, mid=None):
self.tunnel_logger = logging.getLogger('TunnelLogger')
super(TunnelExitSocket, self).__init__()

self.port = None
self.sock_addr = sock_addr
Expand All @@ -134,26 +137,23 @@ def enabled(self):
def sendto(self, data, destination):
if self.check_num_packets(destination, False):
if TunnelConversion.is_allowed(data):
if dispersy.util.is_valid_address(destination):
ip_address = destination[0]
else:
def on_error(failure):
self.tunnel_logger.error("Can't resolve ip address for hostname %s. Failure: %s",
destination[0], failure)

def on_ip_address(ip_address):
self.tunnel_logger.debug("Resolved hostname %s to ip_address %s", destination[0], ip_address)
try:
ip_address = socket.gethostbyname(destination[0])
self.tunnel_logger.info("Resolved ip address %s for hostname %s",
ip_address,
destination[0])
except:
self.tunnel_logger.error("Can't resolve ip address for hostname %s", destination[0])

try:
self.transport.write(data, (ip_address, destination[1]))
except Exception, e:
self.tunnel_logger.error("Failed to write data to transport: %s. Destination: %s",
e[1],
repr(destination))
raise

self.community.increase_bytes_sent(self, len(data))
self.transport.write(data, (ip_address, destination[1]))
self.community.increase_bytes_sent(self, len(data))
except (AttributeError, MessageLengthError) as exception:
self.tunnel_logger.error(
"Failed to write data to transport: %s. Destination: %r error was: %r",
exception, destination, exception)

resolve_ip_address_deferred = reactor.resolve(destination[0])
resolve_ip_address_deferred.addCallbacks(on_ip_address, on_error)
self.register_task("resolving_%r" % destination[0], resolve_ip_address_deferred)
else:
self.tunnel_logger.error("dropping forbidden packets from exit socket with circuit_id %d",
self.circuit_id)
Expand All @@ -172,10 +172,19 @@ def tunnel_data(self, source, data):
self.community.send_data([Candidate(self.sock_addr, False)], self.circuit_id, ('0.0.0.0', 0), source, data)

def close(self):
"""
Closes the UDP socket if enabled and cancels all pending deferreds.
:return: A deferred that fires once the UDP socket has closed.
"""
self.cancel_all_pending_tasks()

done_closing_deferred = succeed(None)
if self.enabled:
self.port.stopListening()
done_closing_deferred = maybeDeferred(self.port.stopListening)
self.port = None

return done_closing_deferred

def check_num_packets(self, ip, incoming):
if self.ips[ip] < 0:
return True
Expand All @@ -197,7 +206,7 @@ def check_num_packets(self, ip, incoming):

class TunnelSettings(object):

def __init__(self, install_dir=None, tribler_session=None):
def __init__(self, tribler_session=None):
self.tunnel_logger = logging.getLogger('TunnelLogger')

self.crypto = TunnelCrypto()
Expand Down Expand Up @@ -531,7 +540,7 @@ def create_circuit(self, goal_hops, ctype=CIRCUIT_TYPE_DATA, callback=None, requ
circuit.unverified_hop.address = first_hop.sock_addr
circuit.unverified_hop.dh_secret, circuit.unverified_hop.dh_first_part = self.crypto.generate_diffie_secret()

self.tunnel_logger.error("creating circuit %d of %d hops. First hop: %s:%d", circuit_id, circuit.goal_hops,
self.tunnel_logger.info("creating circuit %d of %d hops. First hop: %s:%d", circuit_id, circuit.goal_hops,
first_hop.sock_addr[0], first_hop.sock_addr[1])

self.circuits[circuit_id] = circuit
Expand Down Expand Up @@ -650,10 +659,14 @@ def remove_exit_socket(self, circuit_id, additional_info='', destroy=False):
self.tunnel_logger.warning("MULTICHAIN: Tunnel candidate not found")
if exit_socket.enabled:
self.tunnel_logger.info("Removing exit socket %d %s", circuit_id, additional_info)
exit_socket.close()
# Remove old session key
if circuit_id in self.relay_session_keys:
del self.relay_session_keys[circuit_id]

def on_exit_socket_closed(_):
# Remove old session key
if circuit_id in self.relay_session_keys:
del self.relay_session_keys[circuit_id]

exit_socket.close().addCallback(on_exit_socket_closed)

else:
self.tunnel_logger.error("could not remove exit socket %d %s", circuit_id, additional_info)

Expand Down Expand Up @@ -1212,13 +1225,13 @@ def on_stats_request(self, messages):
self.global_time,), payload=(request.payload.identifier, stats))
self.send_packet([request.candidate], u"stats-response", response.packet)
else:
self.tunnel_logger.error("Got stats request from unknown crawler %s", request.candidate.sock_addr)
self.tunnel_logger.warning("Got stats request from unknown crawler %s", request.candidate.sock_addr)

def on_stats_response(self, messages):
for message in messages:
request = self.request_cache.get(u"stats", message.payload.identifier)
if not request:
self.tunnel_logger.error("Got unexpected stats response from %s", message.candidate.sock_addr)
self.tunnel_logger.warning("Got unexpected stats response from %s", message.candidate.sock_addr)
continue

request.handler(message.candidate, message.payload.stats)
Expand All @@ -1245,7 +1258,7 @@ def exit_data(self, circuit_id, sock_addr, destination, data):
try:
self.exit_sockets[circuit_id].sendto(data, destination)
except:
self.tunnel_logger.error("Dropping data packets while EXITing")
self.tunnel_logger.warning("Dropping data packets while EXITing")
else:
self.tunnel_logger.error("Dropping data packets with unknown circuit_id")

Expand Down

0 comments on commit bc89b12

Please sign in to comment.