Skip to content

Commit

Permalink
Add implicit targets
Browse files Browse the repository at this point in the history
  • Loading branch information
dcramer committed May 4, 2012
1 parent 8327652 commit 5699a46
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 21 deletions.
10 changes: 7 additions & 3 deletions README.rst
Expand Up @@ -38,15 +38,19 @@ Create an iterator, and callback::

Spawn a master::

$ tm-master taskmaster.example:get_jobs
$ tm-master taskmaster.example

Spawn a slave::

$ tm-slave taskmaster.example:handle_job
$ tm-slave taskmaster.example

Or spawn 8 slaves (each containing a threadpool)::

$ tm-spawn taskmaster.example:handle_job 8
$ tm-spawn taskmaster.example 8

Dont like the magical function discover for master/slave? Specify your own targets:

$ tm-master taskmaster.example:get_jobs
$ tm-slave taskmaster.example:handle_job

.. note:: All arguments are optional, and will default to localhost with no auth key.
9 changes: 4 additions & 5 deletions src/taskmaster/cli/slave.py
Expand Up @@ -7,6 +7,7 @@
"""

from multiprocessing.managers import BaseManager
from taskmaster.util import import_target
from taskmaster.workers import ThreadPool
import time

Expand All @@ -24,16 +25,14 @@ def run(target, host='0.0.0.0:3050', key='taskmaster', threads=1):
m.connect()
queue = m.get_queue()

mod_path, func_name = target.split(':', 1)
module = __import__(mod_path, {}, {}, [func_name], -1)
callback = getattr(module, func_name)
target = import_target(target, 'handle_job')

pool = ThreadPool(queue, callback, size=threads)
pool = ThreadPool(queue, target, size=threads)
while pool.is_alive() and not queue.empty():
time.sleep(0)

pool.join()
callback(queue.get)
target(queue.get)


def main():
Expand Down
15 changes: 2 additions & 13 deletions src/taskmaster/controller.py
Expand Up @@ -11,14 +11,15 @@
import cPickle as pickle
from os import path, unlink
from threading import Thread
from taskmaster.util import import_target


class Controller(object):
def __init__(self, server, target, state_file=None, progressbar=True):
cls = type(self)

if isinstance(target, basestring):
target = cls.get_callable_target(target)
target = import_target(target, 'get_jobs')

if not state_file:
target_file = sys.modules[target.__module__].__file__
Expand All @@ -33,18 +34,6 @@ def __init__(self, server, target, state_file=None, progressbar=True):
else:
self.pbar = None

@classmethod
def get_callable_target(cls, target):
try:
mod_path, func_name = target.split(':', 1)
except ValueError:
raise ValueError('target must be in form of `path.to.module:function_name`')

module = __import__(mod_path, {}, {}, [func_name], -1)
callback = getattr(module, func_name)

return callback

@classmethod
def get_progressbar(cls):
from taskmaster.progressbar import Counter, Speed, Timer, ProgressBar, UnknownLength
Expand Down
31 changes: 31 additions & 0 deletions src/taskmaster/util.py
@@ -0,0 +1,31 @@
"""
taskmaster.util
~~~~~~~~~~~~~~~
:copyright: (c) 2010 DISQUS.
:license: Apache License 2.0, see LICENSE for more details.
"""


def import_target(target, default=None):
"""
>>> import_target('foo.bar:blah', 'get_jobs')
<function foo.bar.blah>
>>> import_target('foo.bar', 'get_jobs')
<function foo.bar.get_jobs>
>>> import_target('foo.bar:get_jobs')
<function foo.bar.get_jobs>
"""
if ':' not in target:
target += ':%s' % default
else:
raise ValueError('target must be in form of `path.to.module:function_name`')

mod_path, func_name = target.split(':', 1)

module = __import__(mod_path, {}, {}, [func_name], -1)
callback = getattr(module, func_name)

return callback

0 comments on commit 5699a46

Please sign in to comment.