Skip to content

Commit

Permalink
Asyncore fixes
Browse files Browse the repository at this point in the history
- better handling of WSA* checks on non-windows systems
- handle EBADF on Windows/select
- better timeouts / loop lengths in main asyncore loop and
spawning new connections
- remove InvThread prints
  • Loading branch information
PeterSurda committed May 29, 2017
1 parent 74f1a74 commit a5c1b0c
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 19 deletions.
6 changes: 0 additions & 6 deletions src/bitmessagemain.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,6 @@ def _fixSocket():
if sys.platform.startswith('linux'):
socket.SO_BINDTODEVICE = 25

if not sys.platform.startswith('win'):
errno.WSAEWOULDBLOCK = errno.EWOULDBLOCK
errno.WSAENETUNREACH = errno.ENETUNREACH
errno.WSAECONNREFUSED = errno.ECONNREFUSED
errno.WSAEHOSTUNREACH = errno.EHOSTUNREACH

if not sys.platform.startswith('win'):
return

Expand Down
30 changes: 18 additions & 12 deletions src/network/asyncore_pollchoose.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@
errorcode
try:
from errno import WSAEWOULDBLOCK
except:
pass
except (ImportError, AttributeError):
WSAEWOULDBLOCK = EWOULDBLOCK

from ssl import SSLError, SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE

_DISCONNECTED = frozenset((ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
Expand Down Expand Up @@ -199,6 +200,9 @@ def select_poller(timeout=0.0, map=None):
r, w, e = select.select(r, w, e, timeout)
except KeyboardInterrupt:
return
except socket.error as err:
if err.args[0] in (EBADF):
return

for fd in random.sample(r, len(r)):
obj = map.get(fd)
Expand Down Expand Up @@ -369,12 +373,18 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None,
# then poll
poller(timeout, map)
else:
timeout /= count
if timeout == 0:
deadline = 0
else:
deadline = time.time() + timeout
while map and count > 0:
# fill buckets first
update_sent()
update_received()
poller(timeout, map)
subtimeout = deadline - time.time()
if subtimeout <= 0:
break
poller(subtimeout, map)
# then poll
count = count - 1

Expand Down Expand Up @@ -555,10 +565,8 @@ def send(self, data):
else:
raise
except socket.error as why:
if why.args[0] in (EAGAIN, EWOULDBLOCK) or \
(sys.platform.startswith('win') and \
err.errno == WSAEWOULDBLOCK):
return 0
if why.args[0] in (EAGAIN, EWOULDBLOCK, WSAEWOULDBLOCK):
return 0
elif why.args[0] in _DISCONNECTED:
self.handle_close()
return 0
Expand All @@ -582,10 +590,8 @@ def recv(self, buffer_size):
raise
except socket.error as why:
# winsock sometimes raises ENOTCONN
if why.args[0] in (EAGAIN, EWOULDBLOCK) or \
(sys.platform.startswith('win') and \
err.errno == WSAEWOULDBLOCK):
return b''
if why.args[0] in (EAGAIN, EWOULDBLOCK, WSAEWOULDBLOCK):
return b''
if why.args[0] in _DISCONNECTED:
self.handle_close()
return b''
Expand Down
10 changes: 9 additions & 1 deletion src/network/connectionpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

@Singleton
class BMConnectionPool(object):

def __init__(self):
asyncore.set_rates(
BMConfigParser().safeGetInt("bitmessagesettings", "maxdownloadrate") * 1024,
Expand All @@ -28,6 +29,8 @@ def __init__(self):
self.listeningSockets = {}
self.udpSockets = {}
self.streams = []
self.lastSpawned = 0
self.spawnWait = 0.3

self.bootstrapped = False

Expand Down Expand Up @@ -146,6 +149,8 @@ def loop(self):
if e.errno == errno.ENETUNREACH:
continue

self.lastSpawned = time.time()

if acceptConnections and len(self.listeningSockets) == 0:
self.startListening()
logger.info('Listening for incoming connections.')
Expand All @@ -169,7 +174,10 @@ def loop(self):

# while len(asyncore.socket_map) > 0 and state.shutdown == 0:
# print "loop, state = %s" % (proxy.state)
asyncore.loop(timeout=2.0, count=1)
loopTime = float(self.spawnWait)
if self.lastSpawned < time.time() - self.spawnWait:
loopTime = 1.0
asyncore.loop(timeout=loopTime, count=10)

for i in self.inboundConnections.values() + self.outboundConnections.values():
minTx = time.time() - 20
Expand Down
3 changes: 3 additions & 0 deletions src/network/invthread.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from binascii import hexlify
import collections
import Queue
import random
Expand Down Expand Up @@ -35,6 +36,7 @@ def run(self):
try:
(stream, hash) = invQueue.get(False)
self.holdHash (stream, hash)
#print "Holding hash %i, %s" % (stream, hexlify(hash))
except Queue.Empty:
break

Expand All @@ -50,6 +52,7 @@ def run(self):
except KeyError:
continue
if len(hashes) > 0:
#print "sending inv of %i" % (len(hashes))
connection.writeQueue.put(protocol.CreatePacket('inv', addresses.encodeVarint(len(hashes)) + b"".join(hashes)))
self.collectionOfInvs[iterator] = {}
iterator += 1
Expand Down

0 comments on commit a5c1b0c

Please sign in to comment.