Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Some fixes for delays and safely eiting

  • Loading branch information...
commit d3a97db0b8c05bca805798b3c1f029a48f415f8a 1 parent d558bcc
@dcramer authored
View
7 .gitignore
@@ -20,8 +20,11 @@ pip-log.txt
.coverage
.tox
-#Translations
+# Translations
*.mo
-#Mr Developer
+# Mr Developer
.mr.developer.cfg
+
+# Taskmaster
+*.state
View
11 README.rst
@@ -36,10 +36,15 @@ Create an iterator, and callback::
Spawn a master::
- tm-master taskmaster.example:get_jobs --host=0.0.0.0:3050 --key=foobar --size=10000
+ $ tm-master taskmaster.example:get_jobs
-Spawn slaves::
+Spawn as many slaves as you need::
+
+ $ tm-slave taskmaster.example:handle_job
+ $ tm-slave taskmaster.example:handle_job
+ $ tm-slave taskmaster.example:handle_job
+ $ tm-slave taskmaster.example:handle_job
+ $ tm-slave taskmaster.example:handle_job
- tm-slave taskmaster.example:handle_job --host=127.0.0.1:3050 --key=foobar --threads=1
.. note:: All arguments are optional, and will default to localhost with no auth key.
View
2  setup.py
@@ -10,7 +10,7 @@
author="David Cramer",
author_email="dcramer@gmail.com",
url="https://github.com/dcramer/taskmaster",
- packages=find_packages("src/taskmaster"),
+ packages=find_packages("src"),
package_dir={'': 'src'},
entry_points={
'console_scripts': [
View
18 src/taskmaster/cli/master.py
@@ -20,7 +20,7 @@ class QueueServer(Thread):
def __init__(self, host, port, size=None, authkey=None):
Thread.__init__(self)
self.daemon = True
- self.server = None
+ self.started = False
self.queue = Queue.Queue(maxsize=size)
QueueManager.register('get_queue', callable=lambda: self.queue)
@@ -28,8 +28,9 @@ def __init__(self, host, port, size=None, authkey=None):
self.manager = QueueManager(address=(host, int(port)), authkey=authkey)
def run(self):
- self.server = self.manager.get_server()
- self.server.serve_forever()
+ self.started = True
+ server = self.manager.get_server()
+ server.serve_forever()
def put_job(self, job):
self.queue.put(job)
@@ -41,16 +42,20 @@ def has_work(self):
return not self.queue.empty()
def shutdown(self):
- if self.server:
- self.server.shutdown()
+ # TODO:
+ # if self.started:
+ # self.manager.shutdown()
+ pass
-def run(target, size=10000, host='0.0.0.0:3050', key='taskmaster'):
+def run(target, reset=False, size=10000, host='0.0.0.0:3050', key='taskmaster'):
host, port = host.split(':')
server = QueueServer(host, int(port), size=size, authkey=key)
controller = Controller(server, target)
+ if reset:
+ controller.reset()
controller.start()
@@ -61,6 +66,7 @@ def main():
parser.add_option("--host", dest="host", default='0.0.0.0:3050')
parser.add_option("--size", dest="size", default='10000', type=int)
parser.add_option("--key", dest="key", default='taskmaster')
+ parser.add_option("--reset", dest="reset", default=False, action='store_true')
(options, args) = parser.parse_args()
if len(args) != 1:
print 'Usage: tm-master <callback>'
View
8 src/taskmaster/cli/master.py.state
@@ -1,8 +0,0 @@
-(dp1
-S'job'
-p2
-I31888
-sS'job_id'
-p3
-I31888
-s.
View
6 src/taskmaster/cli/slave.py
@@ -29,10 +29,8 @@ def run(target, host='0.0.0.0:3050', key='taskmaster', threads=1):
callback = getattr(module, func_name)
pool = ThreadPool(queue, callback, size=threads)
-
- # TODO: how do we know if we're done?
- while True or not queue.empty():
- time.sleep(0.000001)
+ while pool.is_alive() and not queue.empty():
+ time.sleep(0)
pool.join()
callback(queue.get)
View
8 src/taskmaster/controller.py
@@ -49,7 +49,7 @@ def get_callable_target(cls, target):
def get_progressbar(cls):
from taskmaster.progressbar import Counter, Speed, Timer, ProgressBar, UnknownLength
- widgets = ['Status: ', Counter(), ' | ', Speed(), ' | ', Timer()]
+ widgets = ['Current Job: ', Counter(), ' | ', Speed(), ' | ', Timer()]
pbar = ProgressBar(widgets=widgets, maxval=UnknownLength)
@@ -72,7 +72,7 @@ def state_writer(self):
last_job_id = None
with open(self.state_file, 'w') as state_fp:
while True:
- time.sleep(0.000001)
+ time.sleep(0)
try:
job_id, job = self.server.first_job()
except IndexError:
@@ -115,10 +115,10 @@ def start(self):
for job_id, job in enumerate(self.target(**kwargs), start_id):
self.server.put_job((job_id, job))
- time.sleep(0.000001)
+ time.sleep(0)
while self.server.has_work():
- time.sleep(0.000001)
+ time.sleep(0)
state_writer.join(1)
if self.pbar:
View
2  src/taskmaster/example.py
@@ -10,7 +10,7 @@
def get_jobs(last=0):
# last_job would be sent if state was resumed
# from a previous run
- for i in xrange(last, 1000000000):
+ for i in xrange(last, 100000):
yield i
View
45 src/taskmaster/workers.py
@@ -19,19 +19,26 @@ def __init__(self, queue, target):
def run(self):
self.running = True
- while self.running:
- try:
- job_id, job = self.queue.get_nowait()
- except Empty:
- time.sleep(0.1)
- continue
-
- try:
- self.target(job)
- except KeyboardInterrupt:
- return
- finally:
- self.queue.task_done()
+ try:
+ while self.running:
+ try:
+ job_id, job = self.queue.get_nowait()
+ except Empty:
+ time.sleep(0)
+ continue
+ except EOFError:
+ return
+
+ print repr(job)
+
+ try:
+ self.target(job)
+ except KeyboardInterrupt:
+ return
+ finally:
+ self.queue.task_done()
+ finally:
+ self.running = False
class ThreadPool(object):
@@ -44,7 +51,11 @@ def __init__(self, queue, target, size=10):
for worker in self.workers:
worker.start()
- def join(self):
- for worker in self.workers:
- worker.running = False
- worker.join()
+ def is_alive(self):
+ return any(w.running for w in self.workers)
+
+ def join(self, nowait=False):
+ for worker in (w for w in self.workers if w.running):
+ if nowait:
+ worker.running = False
+ worker.join(0)
Please sign in to comment.
Something went wrong with that request. Please try again.