Permalink
Browse files

Removed job descriptions

  • Loading branch information...
1 parent 0ac1de8 commit 1da3d3b490d8893f77d252ebe78013604c0a2d77 @heynemann committed Jul 16, 2012
View
@@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
import time
+import logging
from uuid import uuid4
from ujson import dumps, loads
from datetime import datetime
@@ -35,49 +36,49 @@ def get(self):
job_type_input_queue = JOB_TYPE_KEY % job_key
self.redis.sadd(job_type_input_queue, str(job_id))
- start = time.time()
- input_stream = self.application.input_streams[job_key]
- items = input_stream.process(arguments)
- if hasattr(input_stream, 'group_size'):
- items = self.group_items(items, input_stream.group_size)
+ try:
+ start = time.time()
+ input_stream = self.application.input_streams[job_key]
+ items = input_stream.process(arguments)
+ if hasattr(input_stream, 'group_size'):
+ items = self.group_items(items, input_stream.group_size)
- mapper_input_queue = MAPPER_INPUT_KEY % job_key
- mapper_output_queue = MAPPER_OUTPUT_KEY % (job_key, job_id)
- mapper_error_queue = MAPPER_ERROR_KEY % job_key
+ mapper_input_queue = MAPPER_INPUT_KEY % job_key
+ mapper_output_queue = MAPPER_OUTPUT_KEY % (job_key, job_id)
+ mapper_error_queue = MAPPER_ERROR_KEY % job_key
- with self.redis.pipeline() as pipe:
- start = time.time()
+ with self.redis.pipeline() as pipe:
+ start = time.time()
- for item in items:
- msg = {
- 'output_queue': mapper_output_queue,
- 'job_id': str(job_id),
- 'job_key': job_key,
- 'item': item,
- 'date': job_date.strftime(DATETIME_FORMAT),
- 'retries': 0
- }
- pipe.rpush(mapper_input_queue, dumps(msg))
- pipe.execute()
- print "input queue took %.2f" % (time.time() - start)
-
- start = time.time()
- results = []
- errored = False
- while (len(results) < len(items)):
- key, item = self.redis.blpop(mapper_output_queue)
- json_item = loads(item)
- if 'error' in json_item:
- json_item['retries'] -= 1
- self.redis.hset(mapper_error_queue, json_item['job_id'], dumps(json_item))
- errored = True
- break
- results.append(loads(json_item['result']))
-
- self.redis.delete(mapper_output_queue)
- print "map took %.2f" % (time.time() - start)
+ for item in items:
+ msg = {
+ 'output_queue': mapper_output_queue,
+ 'job_id': str(job_id),
+ 'job_key': job_key,
+ 'item': item,
+ 'date': job_date.strftime(DATETIME_FORMAT),
+ 'retries': 0
+ }
+ pipe.rpush(mapper_input_queue, dumps(msg))
+ pipe.execute()
+ logging.debug("input queue took %.2f" % (time.time() - start))
+
+ start = time.time()
+ results = []
+ errored = False
+ while (len(results) < len(items)):
+ key, item = self.redis.blpop(mapper_output_queue)
+ json_item = loads(item)
+ if 'error' in json_item:
+ json_item['retries'] -= 1
+ self.redis.hset(mapper_error_queue, json_item['job_id'], dumps(json_item))
+ errored = True
+ break
+ results.append(loads(json_item['result']))
+
+ self.redis.delete(mapper_output_queue)
+ logging.debug("map took %.2f" % (time.time() - start))
- try:
if errored:
self.redis.incr(PROCESSED)
self.redis.incr(PROCESSED_FAILED)
@@ -86,7 +87,7 @@ def get(self):
start = time.time()
reducer = self.application.reducers[job_key]
result = reducer.reduce(results)
- print "reduce took %.2f" % (time.time() - start)
+ logging.debug("reduce took %.2f" % (time.time() - start))
self.set_header('Content-Type', 'application/json')
View
@@ -13,7 +13,7 @@
def main(arguments=None):
'''Runs r³ server with the specified arguments.'''
- log_level = 'debug'
+ log_level = 'warning'
port = int(arguments[1])
ip = arguments[0]
View
@@ -34,7 +34,7 @@ def get_mappers():
if not working:
mappers_status[mapper] = None
else:
- mappers_status[mapper] = working
+ mappers_status[mapper] = loads(working[0])
return mappers_status
@@ -146,10 +146,6 @@ def delete_key(key):
db.connection.delete(key)
return redirect(url_for('stats'))
-@app.route("/jobs/<job_id>")
-def job(job_id):
- return render_template('job.html', job_id=job_id)
-
if __name__ == "__main__":
app.config.from_object('r3.web.config')
db = RedisDB(app)
@@ -29,7 +29,7 @@ <h2>{{ job_type }}</h2>
<tbody>
{% for job in g.jobs[job_type] %}
<tr>
- <td><a href="/jobs/{{ job }}">{{ job }}</a></td>
+ <td>{{ job }}</td>
<td>Mapping</td>
</tr>
{% else %}
@@ -82,7 +82,7 @@ <h1>Mappers</h1>
<tr>
<td>{{ mapper }}</td>
{% if status %}
- <td>Processing job <a href="/jobs/{{ status }}">{{ status }}</a></td>
+ <td>Processing job {{ status['job_id'] }}</td>
{% else %}
<td>Waiting for a new job...</td>
{% endif %}
@@ -19,7 +19,7 @@ <h2>{{ job_type }}</h2>
<tbody>
{% for job in g.jobs[job_type] %}
<tr>
- <td><a href="/jobs/{{ job }}">{{ job }}</a></td>
+ <td>{{ job }}</td>
<td>Mapping</td>
</tr>
{% else %}
View
@@ -1,31 +0,0 @@
-{% extends "master.html" %}
-
-{% block title %} - Job {{ job_id }}{% endblock %}
-
-{% block body %}
-<div class="container main">
- <div class="job section">
- <h1>Job #{{ job_id }}</h1>
-
- <pre><code>{ a: "b", c: "d" }</code></pre>
-
- <div class="meter animate blue">
- <span style="width: {{ 105 / 168 * 100 }}%"><span></span></span>
- </div>
-
- <table class="table table-striped table-bordered table-condensed">
- <tbody>
- <tr>
- <td>mapped input streams</td>
- <td>105</td>
- </tr>
- <tr>
- <td>total input streams</td>
- <td>168</td>
- </tr>
- </tbody>
- </table>
-
- </div>
-</div>
-{% endblock %}
@@ -28,7 +28,7 @@ <h1>Mappers</h1>
<tr>
<td>{{ mapper }}</td>
{% if status %}
- <td>Processing job <a href="/jobs/{{ status }}">{{ status }}</a></td>
+ <td>Processing job {{ status }}</td>
{% else %}
<td>Waiting for a new job...</td>
{% endif %}

0 comments on commit 1da3d3b

Please sign in to comment.