Skip to content

Commit

Permalink
More initial work (master mostly works)
Browse files Browse the repository at this point in the history
  • Loading branch information
dcramer committed May 4, 2012
1 parent 32a498f commit fe5cbff
Show file tree
Hide file tree
Showing 17 changed files with 320 additions and 80 deletions.
4 changes: 2 additions & 2 deletions README.rst
Expand Up @@ -6,7 +6,7 @@ Taskmaster
Create an iterator, and callback:: Create an iterator, and callback::


# mymodule/job.py # mymodule/job.py
def get_jobs(last_job=0): def get_jobs(last=0):
# last_job would be sent if state was resumed # last_job would be sent if state was resumed
# from a previous run # from a previous run
for i in xrange(last_job, 100000000): for i in xrange(last_job, 100000000):
Expand All @@ -22,4 +22,4 @@ Spawn a master::


Spawn slaves:: Spawn slaves::


tm-slave mymodule.job:handle_job --host=127.0.0.1:3050 --key=foobar tm-slave mymodule.job:handle_job --host=127.0.0.1:3050 --key=foobar --procs=1 --threads=1
12 changes: 8 additions & 4 deletions setup.py
@@ -1,6 +1,6 @@
#!/usr/bin/python #!/usr/bin/python


from setuptools import setup from setuptools import setup, find_packages


setup( setup(
name="taskmaster", name="taskmaster",
Expand All @@ -10,14 +10,18 @@
author="David Cramer", author="David Cramer",
author_email="dcramer@gmail.com", author_email="dcramer@gmail.com",
url="https://github.com/dcramer/taskmaster", url="https://github.com/dcramer/taskmaster",
packages=["taskmaster", "taskmaster.master", "taskmaster.slave"], packages=find_packages("src/taskmaster"),
package_dir={'': 'src'}, package_dir={'': 'src'},
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'tm-master = taskmaster.master:main', 'tm-master = taskmaster.cli.master:main',
'tm-slave = taskmaster.slave:main', 'tm-slave = taskmaster.cli.slave:main',
], ],
}, },
tests_require=[
'unittest2',
'Nose>=1.0',
],
classifiers=[ classifiers=[
"Environment :: Console", "Environment :: Console",
"Intended Audience :: Developers", "Intended Audience :: Developers",
Expand Down
Empty file added src/taskmaster/__init__.py
Empty file.
Empty file added src/taskmaster/cli/__init__.py
Empty file.
84 changes: 84 additions & 0 deletions src/taskmaster/cli/master.py
@@ -0,0 +1,84 @@
"""
taskmaster.cli.master
~~~~~~~~~~~~~~~~~~~~~
:copyright: (c) 2010 DISQUS.
:license: Apache License 2.0, see LICENSE for more details.
"""

from multiprocessing.managers import BaseManager
from threading import Thread
import Queue
import time


class QueueServer(Thread):
    """Hosts a BaseManager's socket server on a background thread."""

    def __init__(self, manager):
        super(QueueServer, self).__init__()
        self.manager = manager
        # Created lazily in run(); shutdown() is a no-op until then.
        self.server = None

    def run(self):
        # get_server() binds the address; serve_forever() then blocks
        # this thread until shutdown() is called.
        server = self.manager.get_server()
        self.server = server
        server.serve_forever()

    def shutdown(self):
        server = self.server
        if server is not None:
            server.shutdown()


class QueueManager(BaseManager):
    """Manager subclass the master uses to expose the job queue over the
    network; the queue itself is attached via ``QueueManager.register``
    inside ``run()``."""
    pass


def sample(last=0):
    """Example job source: ints from *last* up to (but excluding) 1000000."""
    upper = 1000000
    return xrange(last, upper)


def run(target, size=10000, host='0.0.0.0:3050', key='taskmaster'):
    """Run the master: serve a bounded job queue and fill it from *target*.

    :param target: dotted path in the form ``path.to.module:function_name``;
        the named function is called (with no args for now) and must return
        an iterable of jobs.
    :param size: maximum number of pending jobs buffered in the queue.
    :param host: ``ip:port`` the manager's server binds to.
    :param key: shared authkey that slaves must present to connect.
    :raises ValueError: if *target* is not in ``module:function`` form.
    """
    host, port = host.split(':')

    # Bound the queue so the producer blocks instead of buffering every job;
    # int() guards against a string arriving from CLI defaults.
    queue = Queue.Queue(maxsize=int(size))

    QueueManager.register('get_queue', callable=lambda: queue)

    manager = QueueManager(address=(host, int(port)), authkey=key)
    server = QueueServer(manager)
    server.daemon = True
    server.start()

    try:
        mod_path, func_name = target.split(':', 1)
    except ValueError:
        raise ValueError('target must be in form of `path.to.module:function_name`')

    module = __import__(mod_path, {}, {}, [func_name], -1)
    callback = getattr(module, func_name)

    # TODO: resume support -- pass last=<last serialized job> once state
    # persistence exists.
    kwargs = {}

    for job in callback(**kwargs):
        queue.put(job)

    # Bug fix: poll the queue *instance*, not the Queue module --
    # ``Queue.empty()`` raised AttributeError and killed the master before
    # slaves could drain the remaining jobs.
    while not queue.empty():
        time.sleep(0.1)

    server.shutdown()


def main():
import optparse
import sys
parser = optparse.OptionParser()
parser.add_option("--host", dest="host", default='0.0.0.0:3050')
parser.add_option("--size", dest="size", default='10000', type=int)
parser.add_option("--key", dest="key", default='taskmaster')
(options, args) = parser.parse_args()
if len(args) != 1:
print 'Usage: tm-master <callback>'
sys.exit(1)
sys.exit(run(args[0], **options.__dict__))

if __name__ == '__main__':
main()
51 changes: 51 additions & 0 deletions src/taskmaster/cli/slave.py
@@ -0,0 +1,51 @@
"""
taskmaster.cli.slave
~~~~~~~~~~~~~~~~~~~~
:copyright: (c) 2010 DISQUS.
:license: Apache License 2.0, see LICENSE for more details.
"""

from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    """Manager subclass the slave uses to connect to the master's shared
    queue; ``get_queue`` is registered as a remote-only proxy in ``run()``."""
    pass


def run(target, host='0.0.0.0:3050', key='taskmaster', threads=1):
    """Connect to a master and consume jobs from its shared queue.

    :param target: dotted path ``path.to.module:function_name`` naming the
        per-job callback.
    :param host: ``ip:port`` of the master's manager server.
    :param key: authkey shared with the master.
    :param threads: worker thread count for the pool.
    """
    # Register the proxy without a callable: the queue lives on the master.
    QueueManager.register('get_queue')

    host, port = host.split(':')

    m = QueueManager(address=(host, int(port)), authkey=key)
    m.connect()
    queue = m.get_queue()

    mod_path, func_name = target.split(':', 1)
    module = __import__(mod_path, {}, {}, [func_name], -1)
    callback = getattr(module, func_name)

    # NOTE(review): ThreadPool is never imported in this module, so this
    # raises NameError -- presumably ``from taskmaster.workers import
    # ThreadPool`` is missing; confirm the intended pool class and its
    # constructor signature (taskmaster.py calls ThreadPool(workers)).
    pool = ThreadPool(queue, size=threads)

    # TODO: how do we know if we're done?
    # NOTE(review): this joins before any work is dispatched, then hands the
    # bound ``queue.get`` method (not a job) to the callback -- this path
    # looks unfinished; verify the intended consume loop.
    pool.join()
    callback(queue.get)


def main():
import optparse
import sys
parser = optparse.OptionParser()
parser.add_option("--host", dest="host", default='0.0.0.0:3050')
parser.add_option("--key", dest="key", default='taskmaster')
parser.add_option("--threads", dest="threads", default=1, type=int)
# parser.add_option("--procs", dest="procs", default=1, type=int)
(options, args) = parser.parse_args()
if len(args) != 1:
print 'Usage: tm-slave <callback>'
sys.exit(1)
sys.exit(args[0], run(**options.__dict__))

if __name__ == '__main__':
main()
39 changes: 0 additions & 39 deletions src/taskmaster/master.py

This file was deleted.

35 changes: 0 additions & 35 deletions src/taskmaster/slave.py

This file was deleted.

125 changes: 125 additions & 0 deletions src/taskmaster/taskmaster.py
@@ -0,0 +1,125 @@
import sys
import time
from cPickle import dumps, loads
from os import path, unlink

from taskmaster.workers import ThreadPool


class Taskmaster(object):
    """Resumable dispatcher: streams objects from a queryset through a
    worker pool, checkpointing the last safely-processed primary key to disk
    so an interrupted run can resume where it left off.

    NOTE(review): Thread, thread, Counter, Speed, Timer, ProgressBar,
    UnknownLength and RangeQuerySetWrapper are referenced below but never
    imported in this module and will raise NameError -- confirm the missing
    imports (threading/thread, progressbar, and the queryset range wrapper).
    """

    def __init__(self, callback, queryset, state_file=None, qs_kwargs=None, node='1/1', progress=True):
        # :param callback: callable invoked once per object.
        # :param queryset: object source, iterated via RangeQuerySetWrapper.
        # :param state_file: checkpoint path; defaults to a ``.state`` file
        #     next to the callback's module, namespaced by node id so
        #     multiple nodes don't clobber each other.
        # :param qs_kwargs: extra kwargs forwarded to the queryset wrapper.
        # :param node: "i/n" shard spec -- this node handles objects where
        #     ``pk % n == i - 1``.
        # :param progress: when true, drive a ProgressBar from state_writer.
        if not state_file:
            callback_file = sys.modules[callback.__module__].__file__
            state_file = path.join(path.dirname(callback_file), '%s.node%s.state' % (path.basename(callback_file), node.replace('/', '-')))

        if qs_kwargs is None:
            qs_kwargs = {}

        # node is given 1-based ("1/1"); store a 0-based index for the
        # ``pk % total_nodes`` sharding test in handle().
        self.nodestr = node
        self.node, self.total_nodes = map(int, node.split('/', 1))
        self.node -= 1

        self.callback = callback
        self.state_file = state_file
        self.queryset = queryset
        self.qs_kwargs = qs_kwargs

        self.progress = progress

    def read_state(self):
        """Return the unpickled checkpoint, or {} when missing/empty/corrupt.

        NOTE(review): run() expects a dict with a 'last_id' key, but
        state_writer() pickles a bare id -- the two formats disagree;
        confirm which is intended before relying on resume.
        """
        if path.exists(self.state_file):
            print "Reading previous state from %r" % self.state_file
            with open(self.state_file, 'r') as fp:
                data = fp.read()
                if not data:
                    return {}
                try:
                    return loads(data)
                except Exception, e:
                    # Corrupt state is non-fatal: report and start fresh.
                    print e
        return {}

    def state_writer(self, id_state):
        """Daemon-thread loop: continuously persist the safe resume point.

        Picks the lowest unfinished id (else the highest finished id),
        pickles it to the state file, and prunes completed entries at or
        below it.
        """
        def cleanup(last_id):
            # Drop completed entries no longer needed for resumption.
            for id_val, done in id_state.items():
                if done and id_val <= last_id:
                    id_state.pop(id_val, None)

        with open(self.state_file, 'w') as state_fp:
            i = 0
            while True:
                try:
                    # we sort by lowest unprocessed id first, then highest processed id
                    last_job = sorted(id_state.items(), key=lambda x: (x[1], -x[0] if x[1] else x[0]))[0][0]
                except IndexError:
                    # Nothing in flight yet; poll again shortly.
                    time.sleep(0.1)
                    continue

                # Rewind so the file always holds exactly one pickled id.
                state_fp.seek(0)
                state_fp.write(dumps(last_job))

                cleanup(last_job)

                i += 1
                if self.progress:
                    # NOTE(review): self.pbar is only assigned in run() when
                    # progress is enabled -- confirm start ordering.
                    self.pbar.update(i)

    def handle(self, obj, id_state):
        """Run the callback for *obj* if it belongs to this node's shard."""
        if obj.pk % self.total_nodes != self.node:
            return

        # Mark in-flight (0) before and done (1) after -- even on error --
        # so state_writer can always compute a safe resume point.
        id_state[obj.pk] = 0
        try:
            self.callback(obj)
        finally:
            id_state[obj.pk] = 1

    def reset(self):
        """Delete the checkpoint file so the next run starts from scratch."""
        if path.exists(self.state_file):
            unlink(self.state_file)

    def get_pool(self, workers=1):
        """Create the worker pool; override to substitute another pool type."""
        return ThreadPool(workers)

    def put_job(self, pool, func, *args):
        """Dispatch one job onto the pool; override alongside get_pool()."""
        pool.spawn_n(func, *args)

    def run(self, workers=1):
        """Stream the queryset through *workers* pool workers until done."""
        id_state = {
            # stores a map of object ids to an int value representing if
            # they've completed yet
            # obj.pk: 1/0
        }

        queryset = self.queryset
        qs_kwargs = self.qs_kwargs

        state = self.read_state()

        # Resume from the checkpointed id when one exists (never moving
        # min_id backwards past an explicitly requested floor).
        if state.get('last_id'):
            qs_kwargs['min_id'] = max(int(state['last_id']), qs_kwargs.get('min_id', 0))

        pool = self.get_pool(workers)

        widgets = ['Status: ', Counter(), ' | ', Speed(), ' | ', Timer()]

        print "Starting workers for thread=%r (node=%s) at min_id=%s" % (
            thread.get_ident(), self.nodestr, qs_kwargs.get('min_id') or 0)
        # The writer runs as a daemon so it never blocks interpreter exit.
        state_writer = Thread(target=self.state_writer, kwargs={
            'id_state': id_state,
        })
        state_writer.daemon = True
        state_writer.start()

        if self.progress:
            self.pbar = ProgressBar(widgets=widgets, maxval=UnknownLength)
            self.pbar.start()

        for obj in RangeQuerySetWrapper(queryset, sorted=True, **qs_kwargs):
            self.put_job(pool, self.handle, obj, id_state)

        pool.waitall()
        # Give the writer a moment to flush the final checkpoint; it is a
        # daemon looping forever, so join with a timeout rather than block.
        state_writer.join(1)

        if self.progress:
            self.pbar.finish()

0 comments on commit fe5cbff

Please sign in to comment.