Skip to content
This repository has been archived by the owner on May 16, 2019. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
 into noupnp
  • Loading branch information
cpacia committed Jan 6, 2016
2 parents e6d0153 + e1406ce commit f5e22c4
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 19 deletions.
2 changes: 1 addition & 1 deletion api/restapi.py
Expand Up @@ -964,7 +964,7 @@ def history_fetched(ec, history):
if ec:
print ec
else:
for tx_type, txid, i, height, value in history: # pylint: disable=W0612
for tx_type, txid, i, height, value in history: # pylint: disable=W0612

tx = {
"txid": txid.encode("hex"),
Expand Down
3 changes: 1 addition & 2 deletions dht/tests/test_protocol.py
Expand Up @@ -483,13 +483,12 @@ def test_timeout(self):
def handle_response(resp, n):
self.assertFalse(resp[0])
self.assertIsNone(resp[1])
self.assertTrue(self.protocol.router.isNewNode(n))

n = Node(digest("S"), self.addr1[0], self.addr1[1])
d = defer.Deferred().addCallback(handle_response, n)
self.protocol._outstanding["msgID"] = [d, self.addr1]
self.protocol.router.addContact(n)
self.protocol.timeout(self.addr1, n)
self.protocol.timeout(self.addr1)

def test_transferKeyValues(self):
self._connecting_to_connected()
Expand Down
43 changes: 29 additions & 14 deletions net/rpcudp.py
Expand Up @@ -16,6 +16,7 @@
from protos.message import Message, Command, NOT_FOUND
from dht import node
from constants import PROTOCOL_VERSION, SEED_NODE, SEED_NODE_TESTNET
from txrudp.connection import State


class RPCProtocol:
Expand All @@ -26,7 +27,7 @@ class RPCProtocol:
"""
__metaclass__ = abc.ABCMeta

def __init__(self, sourceNode, router, waitTimeout=5):
def __init__(self, sourceNode, router, waitTimeout=2.5):
"""
Args:
proto: A protobuf `Node` object containing info about this node.
Expand Down Expand Up @@ -140,17 +141,22 @@ def _sendResponse(self, response, funcname, msgID, sender, connection):
m.arguments.append(str(arg))
connection.send_message(m.SerializeToString())

def timeout(self, address, node_to_remove):
def timeout(self, address):
"""
This timeout is called by the txrudp connection handler. We will run through the
outstanding messages and callback false on any waiting on this IP address.
"""
if node_to_remove is not None:
self.router.removeContact(node_to_remove)
for msgID, val in self._outstanding.items():
if address == val[1]:
val[0].callback((False, None))
del self._outstanding[msgID]
try:
node_to_remove = self.multiplexer[address].handler.node
if node_to_remove is not None:
self.router.removeContact(node_to_remove)
self.multiplexer[address].shutdown()
except Exception:
pass

def rpc_hole_punch(self, sender, ip, port, relay="False"):
"""
Expand All @@ -177,6 +183,24 @@ def __getattr__(self, name):

def func(address, *args):
msgID = sha1(str(random.getrandbits(255))).digest()
d = defer.Deferred()
if name != "hole_punch":
seed = SEED_NODE_TESTNET if self.multiplexer.testnet else SEED_NODE
if address in self.multiplexer and self.multiplexer[address].state == State.CONNECTED:
timeout = timeout = reactor.callLater(self._waitTimeout, self.timeout, address)
else:
timeout = reactor.callLater(self._waitTimeout, self.hole_punch, seed,
address[0], address[1], "True", msgID)
self._outstanding[msgID] = [d, address, timeout]
self.log.debug("calling remote function %s on %s (msgid %s)" % (name, address, b64encode(msgID)))
elif args[3] in self._outstanding:
prev_msgID = args[3]
args = args[:3]
deferred, addr, hp = self._outstanding[prev_msgID] # pylint: disable=W0612
timeout = reactor.callLater(3, self.timeout, addr)
self._outstanding[prev_msgID] = [deferred, addr, timeout]
self.log.debug("sending hole punch message to %s" % args[0] + ":" + str(args[1]))

m = Message()
m.messageID = msgID
m.sender.MergeFrom(self.sourceNode.getProto())
Expand All @@ -186,16 +210,7 @@ def func(address, *args):
m.arguments.append(str(arg))
m.testnet = self.multiplexer.testnet
data = m.SerializeToString()
d = defer.Deferred()
if name != "hole_punch":
seed = SEED_NODE_TESTNET if self.multiplexer.testnet else SEED_NODE
hole_punch = reactor.callLater(3, self.hole_punch, seed, address[0], address[1], "True")
if address in self.multiplexer:
hole_punch.cancel()
self._outstanding[msgID] = [d, address, hole_punch]
self.log.debug("calling remote function %s on %s (msgid %s)" % (name, address, b64encode(msgID)))
else:
self.log.debug("sending hole punch message to %s" % args[0] + ":" + str(args[1]))

self.multiplexer.send_message(data, address)
return d

Expand Down
5 changes: 3 additions & 2 deletions net/wireprotocol.py
Expand Up @@ -78,12 +78,13 @@ def receive_message(self, datagram):
return False

def handle_shutdown(self):
for processor in self.processors:
processor.timeout((self.connection.dest_addr[0], self.connection.dest_addr[1]), self.node)
self.connection.unregister()
if self.addr:
self.log.info("connection with %s terminated" % self.addr)
try:
self.ban_score.scoring_loop.stop()
except Exception:
pass
try:
self.keep_alive_loop.stop()
except Exception:
Expand Down

0 comments on commit f5e22c4

Please sign in to comment.