Skip to content

Commit

Permalink
app: show job details
Browse files Browse the repository at this point in the history
  • Loading branch information
Bogdanp committed Jun 23, 2019
1 parent 5860e09 commit e3f53af
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 47 deletions.
64 changes: 43 additions & 21 deletions dramatiq_dashboard/application.py
@@ -1,12 +1,13 @@
from pprint import pformat
from urllib.parse import urlencode

import jinja2
from dramatiq.common import dq_name, q_name, xq_name

from .csrf import csrf_protect, render_csrf_token
from .filters import isoformat, short, timeago
from .http import HTTP_302, HTTP_404, HTTP_405, HTTP_410, App, Response, handler, templated
from .interface import RedisInterface
from .http import HTTP_302, HTTP_404, HTTP_405, HTTP_410, App, Response, handler, redirect, templated
from .interface import Job, RedisInterface


def make_uri_maker(prefix):
Expand All @@ -30,6 +31,14 @@ def tab_from_q_name(name):
return "standard"


def queue_for_tab(name, tab):
return {
"standard": name,
"delayed": dq_name(name),
"failed": xq_name(name),
}[tab]


class DashboardApp(App):
def __init__(self, broker, prefix):
super().__init__()
Expand All @@ -45,6 +54,7 @@ def __init__(self, broker, prefix):
)
self.templates.filters.update({
"isoformat": isoformat,
"pformat": pformat,
"short": short,
"timeago": timeago,
})
Expand All @@ -56,6 +66,7 @@ def __init__(self, broker, prefix):
self.add_route("/", self.dashboard)
self.add_route("/queues/(?P<name>[^/]+)", self.queue)
self.add_route("/queues/(?P<name>[^/]+)/(?P<current_tab>(standard|delayed|failed))", self.queue)
self.add_route("/queues/(?P<name>[^/]+)/(?P<current_tab>(standard|delayed|failed))/(?P<message_id>[^/]+)", self.job)
self.add_route("/delete-message", self.delete_message)
self.add_route("/requeue-message", self.requeue_message)
self.add_route(".*", self.not_found)
Expand All @@ -73,20 +84,32 @@ def dashboard(self, req):
@templated("queue.html")
def queue(self, req, *, name, current_tab="standard"):
cursor = int(req.params.get("cursor", 0))
queue_for_tab = {
"standard": name,
"delayed": dq_name(name),
"failed": xq_name(name),
}[current_tab]
qft = queue_for_tab(name, current_tab)

queue = self.iface.get_queue(q_name(name))
next_cursor, jobs = self.iface.get_jobs(queue_for_tab, cursor)
next_cursor, jobs = self.iface.get_jobs(qft, cursor)
return {
"queue": queue,
"jobs": jobs,
"cursor": next_cursor,
"current_tab": current_tab,
"queue_for_tab": queue_for_tab,
"queue_for_tab": qft,
}

@handler
@csrf_protect
@templated("job.html")
def job(self, req, *, name, current_tab, message_id):
qft = queue_for_tab(name, current_tab)
queue = self.iface.get_queue(q_name(name))
job = self.iface.get_job(qft, message_id)
if not job:
return redirect(self.make_uri("queues", name, current_tab))

return {
"queue": queue,
"job": job,
"queue_for_tab": qft,
}

@handler
Expand All @@ -99,10 +122,7 @@ def delete_message(self, req):
message_id = req.post_data["id"]
self.iface.delete_message(queue, message_id)

queue_uri = self.make_uri("queues", q_name(queue), tab_from_q_name(queue))
response = Response(status=HTTP_302)
response.add_header("location", queue_uri)
return response
return redirect(self.make_uri("queues", q_name(queue), tab_from_q_name(queue)))

@handler
@csrf_protect
Expand All @@ -112,22 +132,24 @@ def requeue_message(self, req):

queue = req.post_data["queue"]
message_id = req.post_data["id"]
message = self.iface.get_message(queue, message_id)
if not message:
job = self.iface.get_job(queue, message_id)
if not job:
return HTTP_410, "The requested message no longer exists."

self.iface.delete_message(queue, message_id)
message_copy = self.broker.enqueue(message.copy(
job_copy = Job.from_message(self.broker.enqueue(job.message.copy(
queue_name=q_name(queue),
options={
"eta": message.message_timestamp,
"eta": job.message_timestamp,
"retries": 0
},
)))
return redirect(self.make_uri(
"queues",
q_name(queue),
tab_from_q_name(queue),
job_copy.message_id,
))
message_uri = self.make_uri("queues", q_name(queue), tab_from_q_name(queue))
response = Response(status=HTTP_302)
response.add_header("location", message_uri)
return response

@handler
def not_found(self, req):
Expand Down
15 changes: 11 additions & 4 deletions dramatiq_dashboard/http.py
Expand Up @@ -121,10 +121,17 @@ def wrapper(self, request, start_response, *args, **kwargs):
def templated(template_name):
def decorator(fn):
def wrapper(self, request, *args, **kwargs):
context = {
"request": request,
**fn(self, request, *args, **kwargs)
}
response = fn(self, request, *args, **kwargs)
if isinstance(response, Response):
return response

context = {"request": request, **response}
return self.templates.get_template(template_name).render(context)
return wrapper
return decorator


def redirect(uri):
response = Response(status=HTTP_302)
response.add_header("location", uri)
return response
5 changes: 3 additions & 2 deletions dramatiq_dashboard/interface.py
Expand Up @@ -4,6 +4,7 @@
from operator import attrgetter
from os import path

from dramatiq.common import dq_name, xq_name
from dramatiq.message import Message


Expand Down Expand Up @@ -103,9 +104,9 @@ def get_jobs(self, queue_name, cursor=0):
jobs = [Job.from_message(Message.decode(data)) for data in messages_data.values()]
return next_cursor, sorted(jobs, key=attrgetter("timestamp"), reverse=True)

def get_message(self, queue_name, message_id):
def get_job(self, queue_name, message_id):
data = self.broker.client.hget(self.qualify(f"{queue_name}.msgs"), message_id)
return data and Message.decode(data)
return data and Job.from_message(Message.decode(data))

def delete_message(self, queue_name, message_id):
self.do_delete_message(queue_name, message_id)
Expand Down
11 changes: 11 additions & 0 deletions dramatiq_dashboard/templates/_action.html
@@ -0,0 +1,11 @@
{% macro action(label, uri, params, classes="button--primary") %}
<form action="{{ uri }}" method="POST">
{{ csrf_token()|safe }}

{% for name, value in params.items() %}
<input type="hidden" name="{{ name }}" value="{{ value }}">
{% endfor %}

<button class="button {{ classes }}" type="submit">{{ label }}</button>
</form>
{% endmacro %}
6 changes: 6 additions & 0 deletions dramatiq_dashboard/templates/_base.html
Expand Up @@ -45,6 +45,11 @@
text-decoration: underline;
}

pre {
font-family: var(--mono-fontstack);
font-size: 0.8rem;
}

.container {
margin: 0 auto;
}
Expand Down Expand Up @@ -129,6 +134,7 @@
}
.table th {
padding: 12px;
cursor: default;
font-family: var(--mono-fontstack);
font-size: 0.8rem;
font-weight: 300;
Expand Down
5 changes: 5 additions & 0 deletions dramatiq_dashboard/templates/_timestamp.html
@@ -0,0 +1,5 @@
{% macro timestamp(dt) %}
<time datetime="{{ dt|isoformat }}">
{{ dt|timeago }}
</time>
{% endmacro %}
85 changes: 85 additions & 0 deletions dramatiq_dashboard/templates/job.html
@@ -0,0 +1,85 @@
{% extends "_base.html" %}
{% from "_action.html" import action %}
{% from "_grid.html" import row, column %}
{% from "_panel.html" import panel %}
{% from "_timestamp.html" import timestamp %}

{% block subtitle %}Job {{ job.message_id }} in {{ queue.name }}{% endblock %}

{% block content %}
{% call row() %}
{% call column() %}
{% call panel("Job") %}
<table class="table">
<tbody>
<tr>
<th>Canonical id</th>
<td>{{ job.message.message_id }}</td>
</tr>
<tr>
<th>Redis id</th>
<td>{{ job.message_id }}</td>
</tr>
<tr>
<th>Queue</th>
<td>{{ job.queue_name }} ({{ queue_for_tab }})</td>
</tr>
<tr>
<th>Actor</th>
<td><pre>{{ job.actor_name }}</pre></td>
</tr>
<tr>
<th>Args</th>
<td>
<ul>
{% for arg in job.args %}
<li><pre>{{ arg|pformat }}</pre></li>
{% endfor %}
</ul>
</td>
</tr>
<tr>
<th>Kwargs</th>
<td>
<ul>
{% for name, value in job.kwargs.items() %}
<li><strong>{{ name }}</strong>=<pre>{{ value|pformat }}</pre></li>
{% endfor %}
</ul>
</td>
</tr>
<tr>
<th>ETA</th>
<td>
{{ timestamp(job.eta) }}
</td>
</tr>
<tr>
<th>Created</th>
<td>
{{ timestamp(job.timestamp) }}
</td>
</tr>
{% for name, value in job.options.items() %}
{% if name != "redis_message_id" %}
<tr>
<th>{{ name|capitalize }}</th>
<td><pre>{{ value }}</pre></td>
</tr>
{% endif %}
{% endfor %}
</tbody>
</table>
{% endcall %}
{% endcall %}
{% endcall %}

{% call row() %}
{% call column() %}
{{ action("Delete", make_uri("delete-message"), {
"id": job.message_id,
"queue": queue_for_tab,
}, classes="button--tertiary")}}
{% endcall %}
{% endcall %}
{% endblock %}
26 changes: 7 additions & 19 deletions dramatiq_dashboard/templates/queue.html
@@ -1,22 +1,12 @@
{% extends "_base.html" %}
{% from "_action.html" import action %}
{% from "_grid.html" import row, column %}
{% from "_panel.html" import panel %}
{% from "_tabs.html" import tabs, tab %}
{% from "_timestamp.html" import timestamp %}

{% block subtitle %}{{ queue.name }}{% endblock %}

{% macro action(label, uri, params, classes="button--primary") %}
<form action="{{ uri }}" method="POST">
{{ csrf_token()|safe }}

{% for name, value in params.items() %}
<input type="hidden" name="{{ name }}" value="{{ value }}">
{% endfor %}

<button class="button {{ classes }}" type="submit">{{ label }}</button>
</form>
{% endmacro %}

{% block content %}
{% call row() %}
{% call column() %}
Expand Down Expand Up @@ -53,17 +43,15 @@
{% for job in jobs %}
<tr>
<td class="table__subject table__subject--mono">
{{ job }}
<a href="{{ make_uri('queues', queue.name, current_tab, job.message_id) }}">
{{ job }}
</a>
</td>
<td>
<time datetime="{{ job.eta|isoformat }}">
{{ job.eta|timeago }}
</time>
{{ timestamp(job.eta) }}
</td>
<td>
<time datetime="{{ job.timestamp|isoformat }}">
{{ job.timestamp|timeago }}
</time>
{{ timestamp(job.timestamp) }}
</td>
<td class="table__number table__number--tertiary">
{{ job.retries }}
Expand Down
2 changes: 1 addition & 1 deletion examples/basic/app.py
Expand Up @@ -34,7 +34,7 @@ def start(uri: QueryParam):
uri = f"http://{uri}"

crawl.send(uri)
return "Task enqueued!"
return "Message sent!"


app = App(
Expand Down

0 comments on commit e3f53af

Please sign in to comment.