Skip to content

Commit

Permalink
Multiprocessing PoW fixes and improvements
Browse files Browse the repository at this point in the history
- the multiprocessing PoW should now work correctly
- it also should be interruptible correctly and the GUI will ask about
  it during exit
  • Loading branch information
PeterSurda committed Oct 22, 2016
1 parent 9dd09a4 commit 32f1e04
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 43 deletions.
3 changes: 1 addition & 2 deletions src/bitmessageqt/__init__.py
Expand Up @@ -2735,8 +2735,7 @@ def quit(self):
waitForSync = False

# C PoW currently doesn't support interrupting and OpenCL is untested
# Since Windows doesn't have UNIX-style signals, it probably doesn't work on Win either, so disabling there
if getPowType == "python" and ('win32' in sys.platform or 'win64' in sys.platform) and (powQueueSize() > 0 or invQueueSize() > 0):
if getPowType() == "python" and (powQueueSize() > 0 or invQueueSize() > 0):
reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Proof of work pending"),
_translate("MainWindow", "%n object(s) pending proof of work", None, QtCore.QCoreApplication.CodecForTr, powQueueSize()) + ", " +
_translate("MainWindow", "%n object(s) waiting to be distributed", None, QtCore.QCoreApplication.CodecForTr, invQueueSize()) + "\n\n" +
Expand Down
5 changes: 2 additions & 3 deletions src/helper_generic.py
Expand Up @@ -45,10 +45,9 @@ def signal_handler(signal, frame):
logger.error("Got signal %i in %s/%s", signal, current_process().name, current_thread().name)
if current_process().name == "RegExParser":
# on Windows this isn't triggered, but it's fine, it has its own process termination thing
print "RegExParser interrupted"
raise SystemExit
if current_process().name != "MainProcess":
raise StopIteration("Interrupted")
if "PoolWorker" in current_process().name:
raise SystemExit
if current_thread().name != "MainThread":
return
logger.error("Got signal %i", signal)
Expand Down
57 changes: 26 additions & 31 deletions src/proofofwork.py
Expand Up @@ -2,11 +2,11 @@
#import time
#from multiprocessing import Pool, cpu_count
import hashlib
import signal
from struct import unpack, pack
import sys
import time
from debug import logger
from shared import config, frozen, codePath, shutdown, safeConfigGetBoolean, UISignalQueue
import shared
import openclpow
import tr
import os
Expand All @@ -30,7 +30,7 @@ def _set_idle():
def _pool_worker(nonce, initialHash, target, pool_size):
_set_idle()
trialValue = float('inf')
while trialValue > target and shutdown == 0:
while trialValue > target:
nonce += pool_size
trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
return [trialValue, nonce]
Expand All @@ -39,57 +39,52 @@ def _doSafePoW(target, initialHash):
logger.debug("Safe PoW start")
nonce = 0
trialValue = float('inf')
while trialValue > target and shutdown == 0:
while trialValue > target and shared.shutdown == 0:
nonce += 1
trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
if shutdown != 0:
if shared.shutdown != 0:
raise StopIteration("Interrupted")
logger.debug("Safe PoW done")
return [trialValue, nonce]

def _doFastPoW(target, initialHash):
logger.debug("Fast PoW start")
import time
from multiprocessing import Pool, cpu_count
try:
pool_size = cpu_count()
except:
pool_size = 4
try:
maxCores = config.getint('bitmessagesettings', 'maxcores')
maxCores = shared.config.getint('bitmessagesettings', 'maxcores')
except:
maxCores = 99999
if pool_size > maxCores:
pool_size = maxCores

# temporarily disable handlers
#int_handler = signal.getsignal(signal.SIGINT)
#term_handler = signal.getsignal(signal.SIGTERM)
#signal.signal(signal.SIGINT, signal.SIG_IGN)
#signal.signal(signal.SIGTERM, signal.SIG_IGN)

pool = Pool(processes=pool_size)
result = []
for i in range(pool_size):
result.append(pool.apply_async(_pool_worker, args = (i, initialHash, target, pool_size)))

# re-enable handlers
#signal.signal(signal.SIGINT, int_handler)
#signal.signal(signal.SIGTERM, term_handler)
result.append(pool.apply_async(_pool_worker, args=(i, initialHash, target, pool_size)))

while True:
if shutdown >= 1:
pool.terminate()
if shared.shutdown > 0:
try:
pool.terminate()
pool.join()
except:
pass
raise StopIteration("Interrupted")
for i in range(pool_size):
if result[i].ready():
try:
result[i].successful()
except AssertionError:
pool.terminate()
pool.join()
raise StopIteration("Interrupted")
result = result[i].get()
pool.terminate()
pool.join() #Wait for the workers to exit...
pool.join()
logger.debug("Fast PoW done")
return result[0], result[1]
time.sleep(0.2)
Expand All @@ -102,7 +97,7 @@ def _doCPoW(target, initialHash):
logger.debug("C PoW start")
nonce = bmpow(out_h, out_m)
trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
if shutdown != 0:
if shared.shutdown != 0:
raise StopIteration("Interrupted")
logger.debug("C PoW done")
return [trialValue, nonce]
Expand All @@ -114,11 +109,11 @@ def _doGPUPoW(target, initialHash):
#print "{} - value {} < {}".format(nonce, trialValue, target)
if trialValue > target:
deviceNames = ", ".join(gpu.name for gpu in openclpow.gpus)
UISignalQueue.put(('updateStatusBar', tr._translate("MainWindow",'Your GPU(s) did not calculate correctly, disabling OpenCL. Please report to the developers.')))
shared.UISignalQueue.put(('updateStatusBar', tr._translate("MainWindow",'Your GPU(s) did not calculate correctly, disabling OpenCL. Please report to the developers.')))
logger.error("Your GPUs (%s) did not calculate correctly, disabling OpenCL. Please report to the developers.", deviceNames)
openclpow.ctx = False
raise Exception("GPU did not calculate correctly.")
if shutdown != 0:
if shared.shutdown != 0:
raise StopIteration("Interrupted")
logger.debug("GPU PoW done")
return [trialValue, nonce]
Expand Down Expand Up @@ -149,17 +144,17 @@ def estimate(difficulty, format = False):
return ret

def getPowType():
if safeConfigGetBoolean('bitmessagesettings', 'opencl') and openclpow.has_opencl():
if shared.safeConfigGetBoolean('bitmessagesettings', 'opencl') and openclpow.has_opencl():
return "OpenCL"
if bmpow:
return "C"
return "python"

def run(target, initialHash):
if shutdown != 0:
if shared.shutdown != 0:
raise
target = int(target)
if safeConfigGetBoolean('bitmessagesettings', 'opencl') and openclpow.has_opencl():
if shared.safeConfigGetBoolean('bitmessagesettings', 'opencl') and openclpow.has_opencl():
# trialvalue1, nonce1 = _doGPUPoW(target, initialHash)
# trialvalue, nonce = _doFastPoW(target, initialHash)
# print "GPU: %s, %s" % (trialvalue1, nonce1)
Expand All @@ -178,7 +173,7 @@ def run(target, initialHash):
raise
except:
pass # fallback
if frozen == "macosx_app" or not frozen:
if shared.frozen == "macosx_app" or not shared.frozen:
# on my (Peter Surda) Windows 10, Windows Defender
# does not like this and fights with PyBitmessage
# over CPU, resulting in very slow PoW
Expand Down Expand Up @@ -207,7 +202,7 @@ def run(target, initialHash):
bitmsglib = 'bitmsghash64.dll'
try:
# MSVS
bso = ctypes.WinDLL(os.path.join(codePath(), "bitmsghash", bitmsglib))
bso = ctypes.WinDLL(os.path.join(shared.codePath(), "bitmsghash", bitmsglib))
logger.info("Loaded C PoW DLL (stdcall) %s", bitmsglib)
bmpow = bso.BitmessagePOW
bmpow.restype = ctypes.c_ulonglong
Expand All @@ -217,7 +212,7 @@ def run(target, initialHash):
logger.error("C PoW test fail.", exc_info=True)
try:
# MinGW
bso = ctypes.CDLL(os.path.join(codePath(), "bitmsghash", bitmsglib))
bso = ctypes.CDLL(os.path.join(shared.codePath(), "bitmsghash", bitmsglib))
logger.info("Loaded C PoW DLL (cdecl) %s", bitmsglib)
bmpow = bso.BitmessagePOW
bmpow.restype = ctypes.c_ulonglong
Expand All @@ -228,7 +223,7 @@ def run(target, initialHash):
bso = None
else:
try:
bso = ctypes.CDLL(os.path.join(codePath(), "bitmsghash", bitmsglib))
bso = ctypes.CDLL(os.path.join(shared.codePath(), "bitmsghash", bitmsglib))
logger.info("Loaded C PoW DLL %s", bitmsglib)
except:
bso = None
Expand Down
7 changes: 0 additions & 7 deletions src/shared.py
Expand Up @@ -17,7 +17,6 @@
import Queue
import random
from multiprocessing import active_children, Queue as mpQueue, Lock as mpLock
from signal import SIGTERM
import socket
import sys
import stat
Expand Down Expand Up @@ -512,12 +511,6 @@ def doCleanShutdown():
parserInputQueue.put(None, False)
except Queue.Full:
pass
for child in active_children():
try:
logger.info("Killing PoW child %i", child.pid)
os.kill(child.pid, SIGTERM)
except:
pass
broadcastToSendDataQueues((0, 'shutdown', 'no data'))
objectProcessorQueue.put(('checkShutdownVariable', 'no data'))
for thread in threading.enumerate():
Expand Down

0 comments on commit 32f1e04

Please sign in to comment.