Skip to content

Commit

Permalink
Executor: Use a semaphore to throttle input, since SimpleQueues don'…
Browse files Browse the repository at this point in the history
…t have maxsize. Also add a status-update lock.
  • Loading branch information
aht committed Jan 10, 2010
1 parent f62eee9 commit c235895
Showing 1 changed file with 26 additions and 24 deletions.
50 changes: 26 additions & 24 deletions stream.py
Expand Up @@ -849,10 +849,8 @@ class Executor(object):
Provide fine-grained control over a ThreadPool or ProcessPool.
>>> executor = Executor(ProcessPool, map(lambda x: x*x))
>>> executor.submit(*range(10))
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> executor.submit('foo')
[10]
>>> job_ids = executor.submit(*range(10))
>>> foo_id = executor.submit('foo')
>>> executor.finish()
>>> set(executor.result) == set([0, 1, 4, 9, 16, 25, 36, 49, 64, 81])
True
Expand All @@ -871,31 +869,33 @@ def process_job_id(input):
poolsize=poolsize,
args=args,
kwargs=kwargs)
self.job_count = 0
self.jobcount = 0
self.status = []
self.waitqueue = Queue.Queue()
if poolclass is ProcessPool:
self.waitqueue = multiprocessing.queues.SimpleQueue()
self.resultqueue = multiprocessing.queues.SimpleQueue()
self.failqueue = multiprocessing.queues.SimpleQueue()
self.lock = multiprocessing.Lock()
self.sema = multiprocessing.BoundedSemaphore(poolsize)
else:
self.waitqueue = Queue.Queue()
self.resultqueue = Queue.Queue()
self.failqueue = Queue.Queue()
self.lock = threading.Lock()
self.sema = threading.BoundedSemaphore(poolsize)
self.result = Stream(_iterqueue(self.resultqueue))
self.failure = Stream(_iterqueue(self.failqueue))

self.statupdate_lock = threading.Lock()
## Acquired by trackers to update job statuses.

self.sema = threading.BoundedSemaphore(poolsize)
## Used to throttle transfer from waitqueue to pool.inqueue,
## acquired by input_feeder, released by trackers.

def feed_input():
while 1:
id, item = self.waitqueue.get()
if item is StopIteration:
break
else:
self.sema.acquire()
with self.lock:
with self.statupdate_lock:
if self.status[id] == 'SUBMITTED':
self.pool.inqueue.put((id, item))
self.status[id] = 'RUNNING'
Expand All @@ -908,7 +908,7 @@ def feed_input():
def track_result():
for id, item in self.pool:
self.sema.release()
with self.lock:
with self.statupdate_lock:
self.status[id] = 'FINISHED'
self.resultqueue.put(item)
self.resultqueue.put(StopIteration)
Expand All @@ -919,7 +919,7 @@ def track_failure():
for outval, exception in self.pool.failure:
self.sema.release()
id, item = outval
with self.lock:
with self.statupdate_lock:
self.status[id] = 'FAILED'
self.failqueue.put((item, exception))
self.failqueue.put(StopIteration)
Expand All @@ -928,23 +928,25 @@ def track_failure():

def submit(self, *items):
"""Return a list of job ids corresponding to the submitted items."""
last_id = self.job_count
last_id = self.jobcount
for item in items:
id = self.job_count
with self.lock:
self.waitqueue.put((id, item))
self.status.append('SUBMITTED')
self.job_count += 1
return range(last_id, id+1)
id = self.jobcount
self.jobcount += 1
self.status.append('SUBMITTED')
self.waitqueue.put((id, item))
if id > last_id:
return range(last_id, id+1)
else:
return id

def cancel(self, *ids):
"""Cancel jobs with associated ids."""
with self.lock:
with self.statupdate_lock:
for id in ids:
try:
if self.status[id] == 'SUBMITTED':
self.status[id] = 'CANCELLED'
except KeyError:
except IndexError:
pass

def finish(self):
Expand All @@ -957,7 +959,7 @@ def shutdown(self):
This call will block until all workers die.
"""
with self.lock:
with self.statupdate_lock:
self.pool.inqueue.put(StopIteration)
self.waitqueue.put((None, StopIteration))
_iterqueue(self.waitqueue) >> item[-1]
Expand Down

0 comments on commit c235895

Please sign in to comment.