Skip to content

Commit

Permalink
add task error handling and retry
Browse files Browse the repository at this point in the history
  • Loading branch information
cff29546 committed Jan 9, 2023
1 parent 19398b4 commit b046b23
Showing 1 changed file with 30 additions and 8 deletions.
38 changes: 30 additions & 8 deletions pzmap2dzi/mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
hotkey = None

class Worker(object):
def __init__(self, func, data, wid, out_q):
def __init__(self, func, data, wid, out_q, retry):
self.func = func
self.data = data
self.retry = retry
self.wid = wid
self.q = multiprocessing.Queue()
self.out_q = out_q
Expand All @@ -25,12 +26,25 @@ def start(self):
self.p = p

def _run(self):
state = None
while True:
self.out_q.put(self.wid)
self.out_q.put([self.wid, state])
job = self.q.get()
if job == 'stop':
break
self.func(self.data, job)
attempt = 0
while attempt < self.retry:
try:
self.func(self.data, job)
attempt = self.retry
state = True
except Exception as e:
attempt += 1
print('job:[{}] error, retry {}/{} Exception: {}'.format(
job, attempt, self.retry, e))
state = e
if state != True:
print('job:[{}] failed, Exception: {}'.format(job, state))

def push(self, job):
self.q.put(job)
Expand Down Expand Up @@ -69,7 +83,7 @@ def __init__(self, func, data, n):
self.n = n
self.q = multiprocessing.Queue()

def run(self, jobs, verbose=False, break_key=None):
def run(self, jobs, verbose=False, break_key=None, retry=3):
if len(jobs) == 0:
if verbose:
print('Nothing to do!')
Expand All @@ -83,7 +97,7 @@ def run(self, jobs, verbose=False, break_key=None):
return True
workers = []
for i in range(min(self.n, len(jobs))):
w = Worker(self.func, self.data, i, self.q)
w = Worker(self.func, self.data, i, self.q, retry)
w.start()
workers.append(w)
hk = None
Expand All @@ -95,9 +109,10 @@ def run(self, jobs, verbose=False, break_key=None):
if verbose:
total = len(jobs)
done = -working
error = 0
stop = False
while working > 0:
wid = self.q.get()
wid, state = self.q.get()
if hk and hk.peek():
stop = True
if len(splits[wid]) == 0:
Expand All @@ -109,9 +124,16 @@ def run(self, jobs, verbose=False, break_key=None):
workers[wid].stop()
working -= 1
if verbose:
done += 1
if state in [None, True]:
done += 1
else:
error += 1
if done >= 0:
print('job: {}/{} worker: {}/{} '.format(done, total, working, len(workers)), end='\r')
log = 'job: {}/{} '.format(done, total)
log += 'worker: {}/{} '.format(working, len(workers))
if error:
log += 'error: {} '.format(error)
print(log + ' ', end='\r')
if verbose:
print('')
return not stop
Expand Down

0 comments on commit b046b23

Please sign in to comment.