-
Notifications
You must be signed in to change notification settings - Fork 2k
/
jobs.py
131 lines (105 loc) · 4.01 KB
/
jobs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# encoding: utf-8
import click
import ckan.lib.jobs as bg_jobs
import ckan.logic as logic
import ckan.plugins as p
from ckan.cli import error_shout
# Root ``jobs`` click command group. The commands defined below in this
# module register themselves on it through ``@jobs.command``; the group
# callback itself performs no work.
@click.group(name=u"jobs", short_help=u"Manage background jobs.")
def jobs():
    pass
@jobs.command(short_help=u"Start a worker.",)
@click.option(u"--burst", is_flag=True, help=u"Start worker in burst mode.")
@click.argument(u"queues", nargs=-1)
def worker(burst, queues):
    """Start a worker that fetches jobs from queues and executes them. If
    no queue names are given then the worker listens to the default
    queue, this is equivalent to

        ckan jobs worker default

    If queue names are given then the worker listens to those queues
    and only those:

        ckan jobs worker my-custom-queue

    Hence, if you want the worker to listen to the default queue and
    some others then you must list the default queue explicitly:

        ckan jobs worker default my-custom-queue

    If the `--burst` option is given then the worker will exit as soon
    as all its queues are empty.
    """
    # The docstring above is what click prints for ``--help``, so its
    # examples must name the click-based ``ckan`` executable rather than
    # the legacy ``paster`` one.
    #
    # ``queues`` is the (possibly empty) tuple of queue names collected by
    # click; it is handed to the worker unchanged.
    bg_jobs.Worker(queues).work(burst=burst)
@jobs.command(name=u"list", short_help=u"List jobs.")
@click.argument(u"queues", nargs=-1)
def list_jobs(queues):
    """List currently enqueued jobs from the given queues. If no queue
    names are given then the jobs from all queues are listed.
    """
    # Query the ``job_list`` action, passing ``ignore_auth`` in the context.
    list_action = p.toolkit.get_action(u"job_list")
    pending = list_action(
        {u"ignore_auth": True}, {u"queues": list(queues)}
    )

    if pending:
        row_template = u"{created} {id} {queue} {title}"
        for entry in pending:
            raw_title = entry[u"title"]
            # Untitled jobs print an empty title; titled ones are quoted.
            entry[u"title"] = (
                u"" if raw_title is None else u'"{}"'.format(raw_title)
            )
            click.secho(row_template.format(**entry))
    else:
        click.secho(u"There are no pending jobs.", fg=u"green")
@jobs.command(short_help=u"Show details about a specific job.")
@click.argument(u"id")
def show(id):
    # Print the ID, title, creation time and queue of the job whose ID is
    # ``id``, or report an error if the ``job_show`` action raises NotFound.
    # (Written as a comment: click would turn a docstring into help text.)
    try:
        lookup = p.toolkit.get_action(u"job_show")
        job = lookup({u"ignore_auth": True}, {u"id": id})
    except logic.NotFound:
        return error_shout(u'There is no job with ID "{}"'.format(id))

    # (line template, job key) pairs, in output order.
    layout = (
        (u"ID: {}", u"id"),
        (u"Title: {}", u"title"),
        (u"Created: {}", u"created"),
        (u"Queue: {}", u"queue"),
    )
    for template, field in layout:
        value = job[field]
        if field == u"title":
            # A missing title is shown as the bare word None; a real
            # title is wrapped in double quotes.
            value = u"None" if value is None else u'"{}"'.format(value)
        click.secho(template.format(value))
@jobs.command(short_help=u"Cancel a specific job.")
@click.argument(u"id")
def cancel(id):
    """Cancel a specific job. Jobs can only be canceled while they are
    enqueued. Once a worker has started executing a job it cannot be
    aborted anymore.
    """
    try:
        cancel_action = p.toolkit.get_action(u"job_cancel")
        cancel_action({u"ignore_auth": True}, {u"id": id})
    except logic.NotFound:
        # The action reports an unknown job ID via NotFound.
        return error_shout(u'There is no job with ID "{}"'.format(id))
    else:
        click.secho(u"Cancelled job {}".format(id), fg=u"green")
@jobs.command(short_help=u"Cancel all jobs.")
@click.argument(u"queues", nargs=-1)
def clear(queues):
    """Cancel all jobs on the given queues. If no queue names are given
    then ALL queues are cleared.
    """
    clear_action = p.toolkit.get_action(u"job_clear")
    # The action's return value is iterated as the names to report.
    cleared = clear_action(
        {u"ignore_auth": True}, {u"queues": list(queues)}
    )
    quoted_names = [u'"{}"'.format(name) for name in cleared]
    summary = u"Cleared queue(s) {}".format(u", ".join(quoted_names))
    click.secho(summary, fg=u"green")
@jobs.command(short_help=u"Enqueue a test job.")
@click.argument(u"queues", nargs=-1)
def test(queues):
    """Enqueue a test job. If no queue names are given then the job is
    added to the default queue. If queue names are given then a
    separate test job is added to each of the queues.
    """
    # Fall back to the default queue when no names were passed.
    targets = queues if queues else [bg_jobs.DEFAULT_QUEUE_NAME]
    for queue_name in targets:
        job = bg_jobs.enqueue(
            bg_jobs.test_job,
            [u"A test job"],
            title=u"A test job",
            queue=queue_name,
        )
        message = u'Added test job {} to queue "{}"'.format(
            job.id, queue_name
        )
        click.secho(message, fg=u"green")