Skip to content

Commit

Permalink
Download tracking refactoring
Browse files Browse the repository at this point in the history
- replace PendingDownload singleton dict with a Queue
- total memory and CPU requirements should be reduced
- get rid of somObjectsOfWhichThisRemoteNodeIsAlearedyAware. It has very
little practicle effect and only uses memory
  • Loading branch information
PeterSurda committed Mar 19, 2017
1 parent 1322388 commit 1af49a0
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 158 deletions.
10 changes: 5 additions & 5 deletions src/bitmessageqt/__init__.py
Expand Up @@ -77,7 +77,7 @@
from class_singleWorker import singleWorker
from dialogs import AddAddressDialog
from helper_generic import powQueueSize
from inventory import PendingDownload, PendingUpload, PendingUploadDeadlineException
from inventory import PendingDownloadQueue, PendingUpload, PendingUploadDeadlineException
import knownnodes
import paths
from proofofwork import getPowType
Expand Down Expand Up @@ -2751,16 +2751,16 @@ def quit(self):
elif reply == QtGui.QMessage.Cancel:
return

if PendingDownload().len() > 0:
if PendingDownloadQueue.totalSize() > 0:
reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Synchronisation pending"),
_translate("MainWindow", "Bitmessage hasn't synchronised with the network, %n object(s) to be downloaded. If you quit now, it may cause delivery delays. Wait until the synchronisation finishes?", None, QtCore.QCoreApplication.CodecForTr, PendingDownload().len()),
_translate("MainWindow", "Bitmessage hasn't synchronised with the network, %n object(s) to be downloaded. If you quit now, it may cause delivery delays. Wait until the synchronisation finishes?", None, QtCore.QCoreApplication.CodecForTr, PendingDownloadQueue.totalSize()),
QtGui.QMessageBox.Yes|QtGui.QMessageBox.No|QtGui.QMessageBox.Cancel, QtGui.QMessageBox.Cancel)
if reply == QtGui.QMessageBox.Yes:
waitForSync = True
elif reply == QtGui.QMessageBox.Cancel:
return
else:
PendingDownload().stop()
PendingDownloadQueue.stop()

if shared.statusIconColor == 'red':
reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Not connected"),
Expand Down Expand Up @@ -2788,7 +2788,7 @@ def quit(self):
if waitForSync:
self.statusBar().showMessage(_translate(
"MainWindow", "Waiting for finishing synchronisation..."))
while PendingDownload().len() > 0:
while PendingDownloadQueue.totalSize() > 0:
time.sleep(0.5)
QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000)

Expand Down
5 changes: 3 additions & 2 deletions src/bitmessageqt/networkstatus.py
@@ -1,8 +1,9 @@
from PyQt4 import QtCore, QtGui
import time
import shared

from tr import _translate
from inventory import Inventory, PendingDownload, PendingUpload
from inventory import Inventory, PendingDownloadQueue, PendingUpload
import l10n
from retranslateui import RetranslateMixin
from uisignaler import UISignaler
Expand Down Expand Up @@ -45,7 +46,7 @@ def formatByteRate(self, num):
return "%4.0f kB" % num

def updateNumberOfObjectsToBeSynced(self):
self.labelSyncStatus.setText(_translate("networkstatus", "Object(s) to be synced: %n", None, QtCore.QCoreApplication.CodecForTr, PendingDownload().len() + PendingUpload().len()))
self.labelSyncStatus.setText(_translate("networkstatus", "Object(s) to be synced: %n", None, QtCore.QCoreApplication.CodecForTr, PendingDownloadQueue.totalSize() + PendingUpload().len()))

def updateNumberOfMessagesProcessed(self):
self.updateNumberOfObjectsToBeSynced()
Expand Down
5 changes: 1 addition & 4 deletions src/class_outgoingSynSender.py
Expand Up @@ -185,12 +185,10 @@ def run(self):
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
return
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection.

sd = sendDataThread(sendDataThreadQueue)
sd.setup(self.sock, peer.host, peer.port, self.streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
sd.setup(self.sock, peer.host, peer.port, self.streamNumber)
sd.start()

rd = receiveDataThread()
Expand All @@ -199,7 +197,6 @@ def run(self):
peer.host,
peer.port,
self.streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware,
self.selfInitiatedConnections,
sendDataThreadQueue,
sd.objectHashHolderInstance)
Expand Down
26 changes: 17 additions & 9 deletions src/class_receiveDataThread.py
Expand Up @@ -32,7 +32,7 @@
from debug import logger
import paths
import protocol
from inventory import Inventory, PendingDownload, PendingUpload
from inventory import Inventory, PendingDownloadQueue, PendingUpload
import queues
import state
import throttle
Expand All @@ -56,7 +56,6 @@ def setup(
HOST,
port,
streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware,
selfInitiatedConnections,
sendDataThreadQueue,
objectHashHolderInstance):
Expand All @@ -79,8 +78,8 @@ def setup(
self.initiatedConnection = True
for stream in self.streamNumber:
self.selfInitiatedConnections[stream][self] = 0
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
self.objectHashHolderInstance = objectHashHolderInstance
self.downloadQueue = PendingDownloadQueue()
self.startTime = time.time()

def run(self):
Expand Down Expand Up @@ -147,7 +146,6 @@ def run(self):
except Exception as err:
logger.error('Could not delete ' + str(self.hostIdent) + ' from shared.connectedHostsList.' + str(err))

PendingDownload().threadEnd()
queues.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
self.checkTimeOffsetNotification()
logger.debug('receiveDataThread ending. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList)))
Expand Down Expand Up @@ -240,10 +238,20 @@ def processData(self):
self.data = self.data[payloadLength + protocol.Header.size:] # take this message out and then process the next message

if self.data == '': # if there are no more messages
toRequest = []
try:
self.sendgetdata(PendingDownload().pull(100))
except Queue.Full:
for i in range(self.downloadQueue.pendingSize, 100):
while True:
hashId = self.downloadQueue.get(False)
if not hashId in Inventory():
toRequest.append(hashId)
break
# don't track download for duplicates
self.downloadQueue.task_done()
except Queue.Empty:
pass
if len(toRequest) > 0:
self.sendgetdata(toRequest)
self.processData()

def sendpong(self, payload):
Expand Down Expand Up @@ -407,7 +415,7 @@ def sendBigInv(self):
bigInvList = {}
for stream in self.streamNumber:
for hash in Inventory().unexpired_hashes_by_stream(stream):
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware and not self.objectHashHolderInstance.hasHash(hash):
if not self.objectHashHolderInstance.hasHash(hash):
bigInvList[hash] = 0
numberOfObjectsInInvMessage = 0
payload = ''
Expand Down Expand Up @@ -476,6 +484,7 @@ def recerror(self, data):
def recobject(self, data):
self.messageProcessingStartTime = time.time()
lengthOfTimeWeShouldUseToProcessThisMessage = shared.checkAndShareObjectWithPeers(data)
self.downloadQueue.task_done()

"""
Sleeping will help guarantee that we can process messages faster than a
Expand Down Expand Up @@ -509,8 +518,7 @@ def recinv(self, data):
objectsNewToMe -= Inventory().hashes_by_stream(stream)
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
for item in objectsNewToMe:
PendingDownload().add(item)
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[item] = 0 # helps us keep from sending inv messages to peers that already know about the objects listed therein
self.downloadQueue.put(item)

# Send a getdata message to our peer to request the object with the given
# hash
Expand Down
9 changes: 3 additions & 6 deletions src/class_sendDataThread.py
Expand Up @@ -39,8 +39,8 @@ def setup(
sock,
HOST,
PORT,
streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware):
streamNumber
):
self.sock = sock
self.peer = state.Peer(HOST, PORT)
self.name = "sendData-" + self.peer.host.replace(":", ".") # log parser field separator
Expand All @@ -52,7 +52,6 @@ def setup(
1 # This must be set using setRemoteProtocolVersion command which is sent through the self.sendDataThreadQueue queue.
self.lastTimeISentData = int(
time.time()) # If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive.
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
if streamNumber == -1: # This was an incoming connection.
self.initiatedConnection = False
else:
Expand Down Expand Up @@ -165,8 +164,7 @@ def run(self):
if self.connectionIsOrWasFullyEstablished: # only send inv messages if we have send and heard a verack from the remote node
payload = ''
for hash in data:
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
payload += hash
payload += hash
if payload != '':
payload = encodeVarint(len(payload)/32) + payload
packet = protocol.CreatePacket('inv', payload)
Expand All @@ -176,7 +174,6 @@ def run(self):
logger.error('sendinv: self.sock.sendall failed')
break
elif command == 'pong':
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time.
if self.lastTimeISentData < (int(time.time()) - 298):
# Send out a pong message to keep the connection alive.
logger.debug('Sending pong to ' + str(self.peer) + ' to keep connection alive.')
Expand Down
5 changes: 2 additions & 3 deletions src/class_singleListener.py
Expand Up @@ -146,19 +146,18 @@ def run(self):
else:
break

someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection.
socketObject.settimeout(20)

sd = sendDataThread(sendDataThreadQueue)
sd.setup(
socketObject, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
socketObject, HOST, PORT, -1)
sd.start()

rd = receiveDataThread()
rd.daemon = True # close the main program even if there are threads left
rd.setup(
socketObject, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections, sendDataThreadQueue, sd.objectHashHolderInstance)
socketObject, HOST, PORT, -1, self.selfInitiatedConnections, sendDataThreadQueue, sd.objectHashHolderInstance)
rd.start()

logger.info('connected to ' + HOST + ' during INCOMING request.')
Expand Down

0 comments on commit 1af49a0

Please sign in to comment.