
status_interval parameter implemented

1 parent 406c1ee · commit 023a6fa7beac09703afa78a48e7bc76990f5a35d · tuulos committed Jan 28, 2009
Showing with 46 additions and 11 deletions.
  1. +10 −1 doc/py/core.rst
  2. +1 −0 node/disco-worker
  3. +6 −3 node/disconode/disco_worker.py
  4. +2 −0 pydisco/disco/core.py
  5. +24 −6 test/test_ratelimit.py
  6. +3 −1 test/tserver.py
doc/py/core.rst
@@ -139,7 +139,7 @@ anymore. You can delete the unneeded job files as follows::
:class:`Job` --- Disco job
--------------------------
-.. class:: Job(master, [name, input_files, fun_map, map_reader, reduce, partition, combiner, nr_maps, nr_reduces, sort, params, mem_sort_limit, async, clean, chunked, ext_params, required_modules])
+.. class:: Job(master, [name, input_files, fun_map, map_reader, reduce, partition, combiner, nr_maps, nr_reduces, sort, params, mem_sort_limit, async, clean, chunked, ext_params, required_modules, status_interval])
Starts a new Disco job. You seldom instantiate this class
directly. Instead, the :meth:`Disco.new_job` is used to start a job
@@ -357,6 +357,15 @@ anymore. You can delete the unneeded job files as follows::
are required by job functions. Modules listed here are imported to the
functions' namespace.
+ * *status_interval* - print a "K items mapped / reduced" status message
+ for every Nth item. The default is 100000. Setting the value to 0
+ disables these messages.
+
+ Increase this value, or set it to zero, if you get a "Message rate limit
+ exceeded" error caused by the status messages. This can happen if your
+ map / reduce task is very fast. Decrease the value if you want to follow
+ your task closer to real time or you don't have many data items.
+
.. attribute:: Job.name
Name of the job. You can store or transfer the name string if
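
To illustrate the new option in use, a minimal sketch of starting a job with a larger interval (the job name and input URLs are placeholders, not part of this commit; the master address is taken from the command line as in the bundled tests):

    import sys
    from disco import Disco

    def fun_map(e, params):
        # msg() sends a status line to the master for every entry,
        # which is exactly what the rate limiter can choke on.
        msg(e)
        return []

    # Hypothetical inputs; substitute your own data URLs.
    inputs = ["http://example.com/part-1", "http://example.com/part-2"]
    job = Disco(sys.argv[1]).new_job(name = "example_job",
            input = inputs, map = fun_map,
            status_interval = 1000000)   # report every millionth entry;
                                         # 0 would disable the messages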
node/disco-worker
@@ -42,6 +42,7 @@ if __name__ == "__main__":
util.data_err("Decoding the job description failed", master_url)
job_name = dw.job_name = util.job_name = m['name']
+ dw.status_interval = int(m['status_interval'])
my_ver = ".".join(map(str, sys.version_info[:2]))
if m["version"] != my_ver:
node/disconode/disco_worker.py
@@ -10,6 +10,8 @@
job_name = ""
http_pool = {}
+status_interval = 0
+
def init():
global HTTP_PORT, LOCAL_PATH, PARAMS_FILE, EXT_MAP, EXT_REDUCE,\
MAP_OUTPUT, CHUNK_OUTPUT, REDUCE_DL, REDUCE_SORTED, REDUCE_OUTPUT
@@ -273,7 +275,7 @@ def list_iterator(self, lst):
for x in lst:
yield x
i += 1
- if not i % 100000:
+ if status_interval and not i % status_interval:
msg("%d entries reduced" % i)
msg("Reduce done: %d entries reduced in total" % i)
@@ -285,7 +287,8 @@ def multi_file_iterator(self, inputs, progress = True,
for x in reader(fd, sze, fname):
yield x
i += 1
- if progress and not i % 100000:
+ if progress and status_interval and\
+ not i % status_interval:
msg("%d entries reduced" % i)
if progress:
@@ -318,7 +321,7 @@ def run_map(job_input, partitions, param):
p = fun_partition(key, nr_reduces, param)
partitions[p].add(key, value)
i += 1
- if not i % 100000:
+ if status_interval and not i % status_interval:
msg("%d entries mapped" % i)
msg("Done: %d entries mapped in total" % i)
pydisco/disco/core.py
@@ -138,6 +138,7 @@ class Job(object):
"mem_sort_limit": 256 * 1024**2,
"chunked": None,
"ext_params": None,
+ "status_interval": 100000,
"required_modules": []}
def __init__(self, master, **kwargs):
@@ -190,6 +191,7 @@ def _run(self, **kw):
"params": cPickle.dumps(d("params")),
"sort": str(int(d("sort"))),
"mem_sort_limit": str(d("mem_sort_limit")),
+ "status_interval": str(d("status_interval")),
"required_modules": " ".join(d("required_modules"))}
if type(kw["map"]) == dict:
test/test_ratelimit.py
@@ -2,24 +2,42 @@
import tserver, sys, random, time
from disco import Disco
+def check_dead(job):
+ if job.jobinfo()['active'] == "dead":
+ job.purge()
+ else:
+ raise Exception("Rate limit failed")
+
def data_gen(path):
- return "badger\n" * 100
+ return "badger\n" * 1000000
def fun_map(e, params):
msg(e)
return []
+def fun_map2(e, params):
+ return []
+
tserver.run_server(data_gen)
inputs = tserver.makeurl([1])
job = Disco(sys.argv[1]).new_job(name = "test_ratelimit",
input = inputs, map = fun_map)
time.sleep(5)
+check_dead(job)
+
+job = Disco(sys.argv[1]).new_job(name = "test_ratelimit2",
+ input = inputs, map = fun_map2, status_interval = 1)
+
+time.sleep(5)
+check_dead(job)
+
+job = Disco(sys.argv[1]).new_job(name = "test_ratelimit3",
+ input = inputs, map = fun_map2, status_interval = 0)
+job.wait()
+job.purge()
+
+print "ok"
-if job.jobinfo()['active'] == "dead":
- print "ok"
- job.purge()
-else:
- raise Exception("Rate limit failed")
test/tserver.py
@@ -10,9 +10,11 @@ class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
class Handler(SimpleHTTPServer.SimpleHTTPRequestHandler):
def do_GET(self):
+ d = data_gen(self.path)
self.send_response(200)
+ self.send_header("Content-length", len(d))
self.end_headers()
- self.wfile.write(data_gen(self.path))
+ self.wfile.write(d)
def makeurl(inputs):
host = "http://%s:%d" % (socket.gethostname(), PORT)
