Skip to content

Commit

Permalink
Asyncore updates
Browse files Browse the repository at this point in the history
- clean object tracking dictionaries in the cleaner thread
- clean up close / handle_close
- add locking to tracking dictionaries
  • Loading branch information
PeterSurda committed Jun 2, 2017
1 parent 4c17a18 commit d75d920
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 25 deletions.
4 changes: 4 additions & 0 deletions src/class_singleCleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ def run(self):
if thread.isAlive() and hasattr(thread, 'downloadQueue'):
thread.downloadQueue.clear()

# inv/object tracking
for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
connection.clean()

# TODO: cleanup pending upload / download

if state.shutdown == 0:
Expand Down
2 changes: 1 addition & 1 deletion src/network/advanceddispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def handle_connect(self):
def state_close(self):
pass

def close(self):
def handle_close(self):
self.read_buf = b""
self.write_buf = b""
self.state = "close"
Expand Down
8 changes: 4 additions & 4 deletions src/network/bmproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def state_bm_header(self):
self.bm_proto_reset()
self.set_state("bm_header", length=1, expectBytes=protocol.Header.size)
logger.debug("Bad magic")
self.close()
self.handle_close("Bad magic")
return False
if self.payloadLength > BMProto.maxMessageSize:
self.invalid = True
Expand Down Expand Up @@ -127,7 +127,7 @@ def state_bm_command(self):
else:
#print "Skipping command %s due to invalid data" % (self.command)
logger.debug("Closing due to invalid command %s", self.command)
self.close()
self.handle_close("Invalid command %s" % (self.command))
return False
if retval:
self.set_state("bm_header", length=self.payloadLength, expectBytes=protocol.Header.size)
Expand Down Expand Up @@ -445,12 +445,12 @@ def assembleAddr(peerList):
payload += struct.pack('>H', peer.port) # remote port
return protocol.CreatePacket('addr', payload)

def close(self, reason=None):
def handle_close(self, reason=None):
self.set_state("close")
if reason is None:
#logger.debug("%s:%i: closing, %s", self.destination.host, self.destination.port, ''.join(traceback.format_stack()))
logger.debug("%s:%i: closing", self.destination.host, self.destination.port)
#traceback.print_stack()
else:
logger.debug("%s:%i: closing, %s", self.destination.host, self.destination.port, reason)
AdvancedDispatcher.close(self)
AdvancedDispatcher.handle_close(self)
16 changes: 9 additions & 7 deletions src/network/connectionpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def __init__(self):
self.streams = []
self.lastSpawned = 0
self.spawnWait = 0.3

self.bootstrapped = False

def handleReceivedObject(self, streamNumber, hashid, connection = None):
Expand All @@ -41,12 +40,15 @@ def handleReceivedObject(self, streamNumber, hashid, connection = None):
if not i.fullyEstablished:
continue
try:
del i.objectsNewToMe[hashid]
with i.objectsNewToMeLock:
del i.objectsNewToMe[hashid]
except KeyError:
i.objectsNewToThem[hashid] = True
with i.objectsNewToThemLock:
i.objectsNewToThem[hashid] = True
if i == connection:
try:
del i.objectsNewToThem[hashid]
with i.objectsNewToThemLock:
del i.objectsNewToThem[hashid]
except KeyError:
pass

Expand Down Expand Up @@ -171,11 +173,11 @@ def loop(self):
logger.info('Starting UDP socket(s).')
if len(self.listeningSockets) > 0 and not acceptConnections:
for i in self.listeningSockets:
i.close()
i.handle_close()
logger.info('Stopped listening for incoming connections.')
if len(self.udpSockets) > 0 and not acceptConnections:
for i in self.udpSockets:
i.close()
i.handle_close()
logger.info('Stopped udp sockets.')

# while len(asyncore.socket_map) > 0 and state.shutdown == 0:
Expand All @@ -194,7 +196,7 @@ def loop(self):
if i.fullyEstablished:
i.writeQueue.put(protocol.CreatePacket('ping'))
else:
i.close("Timeout (%is)" % (time.time() - i.lastTx))
i.handle_close("Timeout (%is)" % (time.time() - i.lastTx))
for i in self.inboundConnections.values() + self.outboundConnections.values() + self.listeningSockets.values() + self.udpSockets.values():
if not (i.accepting or i.connecting or i.connected):
reaper.append(i)
Expand Down
25 changes: 18 additions & 7 deletions src/network/objectracker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from Queue import Queue
import time
from threading import RLock

from inventory import Inventory
from network.downloadqueue import DownloadQueue
Expand Down Expand Up @@ -29,11 +30,14 @@ class ObjectTracker(object):

def __init__(self):
self.objectsNewToMe = {}
self.objectsNewToMeLock = RLock()
self.objectsNewToThem = {}
self.objectsNewToThemLock = RLock()
self.downloadPending = 0
self.downloadQueue = Queue()
self.initInvBloom()
self.initAddrBloom()
self.lastCleaned = time.time()

def initInvBloom(self):
if haveBloom:
Expand All @@ -48,15 +52,20 @@ def initAddrBloom(self):
error_rate=ObjectTracker.invErrorRate)

def clean(self):
if self.lastcleaned < time.time() - BMQueues.invCleanPeriod:
if self.lastCleaned < time.time() - ObjectTracker.invCleanPeriod:
if haveBloom:
if PendingDownloadQueue().size() == 0:
self.initInvBloom()
self.initAddrBloom()
else:
# release memory
self.objectsNewToMe = self.objectsNewToMe.copy()
self.objectsNewToThem = self.objectsNewToThem.copy()
else:
# release memory
with self.objectsNewToMeLock:
tmp = self.objectsNewToMe.copy()
self.objectsNewToMe = tmp
with self.objectsNewToThemLock:
tmp = self.objectsNewToThem.copy()
self.objectsNewToThem = tmp
self.lastCleaned = time.time()

def hasObj(self, hashid):
if haveBloom:
Expand All @@ -69,11 +78,13 @@ def handleReceivedInventory(self, hashId):
self.invBloom.add(hashId)
elif hashId in Inventory():
try:
del self.objectsNewToThem[hashId]
with self.objectsNewToThemLock:
del self.objectsNewToThem[hashId]
except KeyError:
pass
else:
self.objectsNewToMe[hashId] = True
with self.objectsNewToMeLock:
self.objectsNewToMe[hashId] = True
# self.DownloadQueue.put(hashId)

def hasAddr(self, addr):
Expand Down
10 changes: 4 additions & 6 deletions src/network/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ def sendBigInv(self):

def handle_connect_event(self):
try:
asyncore.dispatcher.handle_connect_event(self)
AdvancedDispatcher.handle_connect_event(self)
except socket.error as e:
if e.errno in asyncore._DISCONNECTED:
self.close("Connection failed")
logger.debug("%s:%i: Connection failed: %s" % (self.destination.host, self.destination.port, str(e)))
return
self.writeQueue.put(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, False))
#print "%s:%i: Sending version" % (self.destination.host, self.destination.port)
Expand All @@ -162,15 +162,13 @@ def handle_read(self):
try:
TLSDispatcher.handle_read(self)
except socket.error as e:
#print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e))
self.close()
logger.debug("%s:%i: Handle read fail: %s" % (self.destination.host, self.destination.port, str(e)))

def handle_write(self):
try:
TLSDispatcher.handle_write(self)
except socket.error as e:
#print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e))
self.close()
logger.debug("%s:%i: Handle write fail: %s" % (self.destination.host, self.destination.port, str(e)))


class Socks5BMConnection(Socks5Connection, TCPConnection):
Expand Down

0 comments on commit d75d920

Please sign in to comment.