Skip to content

Commit

Permalink
Asyncore update
Browse files Browse the repository at this point in the history
- duplicate checking implemented
- connection pool vs. socket closing cleanup
  • Loading branch information
PeterSurda committed May 30, 2017
1 parent abaa2c7 commit fa9811f
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 27 deletions.
5 changes: 5 additions & 0 deletions src/network/bmobject.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from addresses import calculateInventoryHash
from debug import logger
from inventory import Inventory
import protocol
import state

Expand Down Expand Up @@ -64,6 +65,10 @@ def checkStream(self):
logger.debug('The streamNumber %s isn\'t one we are interested in.' % self.streamNumber)
raise BMObjectUnwantedStreamError()

def checkAlreadyHave(self):
if self.inventoryHash in Inventory():
raise BMObjectAlreadyHaveError()

def checkMessage(self):
return

Expand Down
40 changes: 18 additions & 22 deletions src/network/bmproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,27 +280,24 @@ def bm_command_object(self):
self.object.checkProofOfWorkSufficient()
self.object.checkEOLSanity()
self.object.checkStream()

try:
if self.object.objectType == protocol.OBJECT_GETPUBKEY:
self.object.checkGetpubkey()
elif self.object.objectType == protocol.OBJECT_PUBKEY:
self.object.checkPubkey(self.payload[self.payloadOffset:self.payloadOffset+32])
elif self.object.objectType == protocol.OBJECT_MSG:
self.object.checkMessage()
elif self.object.objectType == protocol.OBJECT_BROADCAST:
self.object.checkBroadcast(self.payload[self.payloadOffset:self.payloadOffset+32])
# other objects don't require other types of tests
except BMObjectAlreadyHaveError:
pass
else:
Inventory()[self.object.inventoryHash] = (
self.object.objectType, self.object.streamNumber, self.payload[objectOffset:], self.object.expiresTime, self.object.tag)
objectProcessorQueue.put((self.object.objectType,self.object.data))
#DownloadQueue().task_done(self.object.inventoryHash)
invQueue.put((self.object.streamNumber, self.object.inventoryHash, self))
#ObjUploadQueue().put(UploadElem(self.object.streamNumber, self.object.inventoryHash))
#broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
self.object.checkAlreadyHave()

if self.object.objectType == protocol.OBJECT_GETPUBKEY:
self.object.checkGetpubkey()
elif self.object.objectType == protocol.OBJECT_PUBKEY:
self.object.checkPubkey(self.payload[self.payloadOffset:self.payloadOffset+32])
elif self.object.objectType == protocol.OBJECT_MSG:
self.object.checkMessage()
elif self.object.objectType == protocol.OBJECT_BROADCAST:
self.object.checkBroadcast(self.payload[self.payloadOffset:self.payloadOffset+32])
# other objects don't require other types of tests
Inventory()[self.object.inventoryHash] = (
self.object.objectType, self.object.streamNumber, self.payload[objectOffset:], self.object.expiresTime, self.object.tag)
objectProcessorQueue.put((self.object.objectType,self.object.data))
#DownloadQueue().task_done(self.object.inventoryHash)
invQueue.put((self.object.streamNumber, self.object.inventoryHash, self))
#ObjUploadQueue().put(UploadElem(self.object.streamNumber, self.object.inventoryHash))
#broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
return True

def _decode_addr(self):
Expand Down Expand Up @@ -456,5 +453,4 @@ def close(self, reason=None):
#traceback.print_stack()
else:
logger.debug("%s:%i: closing, %s", self.destination.host, self.destination.port, reason)
network.connectionpool.BMConnectionPool().removeConnection(self)
AdvancedDispatcher.close(self)
17 changes: 12 additions & 5 deletions src/network/connectionpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ def addConnection(self, connection):

def removeConnection(self, connection):
if isinstance(connection, network.udp.UDPSocket):
return
del self.udpSockets[connection.destination.host]
if isinstance(connection, network.tcp.TCPServer):
del self.listeningSockets[state.Peer(connection.destination.host, connection.destination.port)]
elif connection.isOutbound:
try:
del self.outboundConnections[connection.destination]
Expand Down Expand Up @@ -99,9 +101,10 @@ def startListening(self):
def startUDPSocket(self, bind=None):
if bind is None:
host = self.getListeningIP()
self.udpSockets[host] = network.udp.UDPSocket(host=host)
udpSocket = network.udp.UDPSocket(host=host)
else:
self.udpSockets[bind] = network.udp.UDPSocket(host=bind)
udpSocket = network.udp.UDPSocket(host=bind)
self.udpSockets[udpSocket.destination.host] = udpSocket

def loop(self):
# defaults to empty loop if outbound connections are maxed
Expand Down Expand Up @@ -164,12 +167,10 @@ def loop(self):
if len(self.listeningSockets) > 0 and not acceptConnections:
for i in self.listeningSockets:
i.close()
self.listeningSockets = {}
logger.info('Stopped listening for incoming connections.')
if len(self.udpSockets) > 0 and not acceptConnections:
for i in self.udpSockets:
i.close()
self.udpSockets = {}
logger.info('Stopped udp sockets.')

# while len(asyncore.socket_map) > 0 and state.shutdown == 0:
Expand All @@ -179,6 +180,7 @@ def loop(self):
loopTime = 1.0
asyncore.loop(timeout=loopTime, count=10)

reaper = []
for i in self.inboundConnections.values() + self.outboundConnections.values():
minTx = time.time() - 20
if i.fullyEstablished:
Expand All @@ -188,3 +190,8 @@ def loop(self):
i.writeQueue.put(protocol.CreatePacket('ping'))
else:
i.close("Timeout (%is)" % (time.time() - i.lastTx))
for i in self.inboundConnections.values() + self.outboundConnections.values() + self.listeningSockets.values() + self.udpSockets.values():
if not (i.accepting or i.connecting or i.connected):
reaper.append(i)
for i in reaper:
self.removeConnection(i)
1 change: 1 addition & 0 deletions src/network/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ def __init__(self, host='127.0.0.1', port=8444):
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
self.destination = state.Peer(host, port)
self.listen(5)

def handle_accept(self):
Expand Down

0 comments on commit fa9811f

Please sign in to comment.