jobd is a simple job queue daemon that works with persistent queue storage. It uses a MySQL table as a storage backend (for queue input and output).
Currently, MySQL is the only supported storage. Other backends may be easily supported though.
It is by design that jobd never adds nor deletes jobs from storage. It only reads (when a certain request arrives) and updates them (during execution, when job status changes). Succeeded or failed, your jobs are never lost.
jobd consists of 2 parts:
-
jobd is a "worker" daemon that reads jobs from the database, enqueues and launches them. There may be multiple instances of jobd running on multiple hosts. Each jobd instance may have unlimited number of queues (called "targets"), each with its own concurrency limit.
-
jobd-master is a "master" or "central" daemon that simplifies control over many jobd instances. There should be only one instance of jobd-master running. jobd-master is not required for jobd workers to work (they can work without it), but it's very very useful.
In addition, there is a command line utility called jobctl.
Originally, jobd was created as a saner alternative to Gearman. It's been used in production with a large PHP web application on multiple servers for quite some time already, and proven to be stable and efficient. We were also monitoring the memory usage of all our jobd instances for two months, and can confirm there are no leaks.
- How it works
- Integration example
- Installation
- Usage
- Configuration
- Clients
- Protocol
- License
Every jobd instance has its own set of queues, called targets. A name of a
target is an arbitrary string, the length of which should be limited by the size
of target
field in the MySQL table.
Each target has its own concurrency limit (the maximum number of jobs that may
be executed simultaneously). Targets are loaded from the config at startup, and
also may be added or removed at runtime, by
add-target(target: string, concurrency: int)
and remove-target(target: string)
requests.
The purpose of targets is to logically separate jobs of different kinds by putting them in different queues. For instance, targets can be used to simulate jobs priorities:
[targets]
low = 5
normal = 5
high = 5
The config above defines three targets (or three queues), each with a concurrency
limit of 5
.
Or, let's imagine a scenario when you have two kinds of jobs: heavy, resource-consuming, long-running jobs (like video processing) and light, fast and quick jobs (like sending emails). In this case, you could define two targets, like so:
[targets]
heavy = 3
quick = 20
This config would allow running at most 3 heavy and up to 20 quick jobs simultaneously.
💭 In the author's opinion, the approach of having different targets (queues) for different kinds of jobs is better than having a single queue with each job having a "priority".
Imagine you had a single queue with maximum number of simultaneously running jobs set to, say, 20. What would happen if you'd add a new job, even with the highest priority possible, when there's already 20 slow jobs running? No matter how high the priority of new job is, it would have to wait.
By defining different targets, jobd allows you to create dedicated queues for such jobs, making sure there's always a room for high-priority tasks to run as early as possible.
Each job is described by one record in the MySQL table. Here is a table scheme with a minimal required set of fields:
CREATE TABLE `jobs` (
`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
`target` char(16) NOT NULL,
`time_created` int(10) UNSIGNED NOT NULL,
`time_started` int(10) UNSIGNED NOT NULL DEFAULT 0,
`time_finished` int(10) UNSIGNED NOT NULL DEFAULT 0,
`status` enum('waiting','manual','accepted','running','done','ignored') NOT NULL DEFAULT 'waiting',
`result` enum('ok','fail') DEFAULT NULL,
`return_code` tinyint(3) UNSIGNED DEFAULT NULL,
`sig` char(10) DEFAULT NULL,
`stdout` mediumtext DEFAULT NULL,
`stderr` mediumtext DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `status_target_idx` (`status`, `target`, `id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
As you can see:
- Each job has a unique ID. You don't need to care about assigning IDs because
AUTO_INCREMENT
is used. - Each job is associated with some target, or, in other words, is put to some queue. More about targets in the Targets section.
- There are
time_created
,time_started
andtime_finished
fields, and it's not hard to guess what their meaning. When creating a job, you should fill thetime_created
field with a UNIX timestamp. jobd will update the two other fields while executing the job. - Each job has a
status
.- A job must be created with status set to
waiting
ormanual
. - A status becomes
accepted
when jobd reads the job from the table and puts it to a queue, or it might becomeignored
in case of some error, like invalidtarget
, or invalidstatus
when processing a run-manual(ids: int) request. - Right before a job is getting started, its status becomes
running
. - Finally, when it's done, it is set to
done
.
- A job must be created with status set to
- The
result
field indicates whether a job completed successfully or not.- It is set to
ok
if the return code of launched command was0
. - Otherwise, it is set to
fail
.
- It is set to
- The
return_code
field is filled with the actual return code. - If the job process was killed by a POSIX signal, the signal name is written
to the
sig
field. - stdout and stderr of the process are written to
stdout
andstderr
fields, accordingly.
⚠️ In a real world, you'll want to have a few more additional fields, likejob_name
orjob_data
.
Check out the integration example.
To create a new job, it must be added to the table. As mentioned earlier, adding or removing rows from the table is by design outside the jobd's area of responsibility. A user must add jobs to the table manually.
There are two kinds of jobs, in terms of how they are executed: background and manual (or foreground).
- Background jobs are created with
waiting
status. When jobd gets new jobs from the table (which happens upon receiving apoll(target: strings[])
; this process is described in detail in the launching background jobs section), such jobs are added to their queues and get executed at some point, depending on the current queue status and concurrency limit. A user does not have control of the execution flow, the only feedback it has is the fields in the table that are going to be updated before, during and after the execution. At some point,status
will becomedone
,result
and other fields will have their values filled too, and that's it. - Manual, or foreground jobs, is a different story. They must be created with
status
set tomanual
. These jobs are processed only upon arun-manual(ids: int[])
request. When jobd receives such request, it reads and launches the specified jobs, waits for the results and sends them back to the client in a response. Learn more about it under the launching manual jobs section.
Launching (or executing) a job means running a command specified in
the config as the launcher
, replacing the {id}
template with current job id.
For example, if you have this in the config:
launcher = php /home/user/job-launcher.php {id}
and jobd is currently executing a job with id 123, it will launch
php /home/user/job-launcher.php 123
.
After jobs have been added to storage, jobd must be notified about it. This is
done by a poll(targets: string[])
request that a user
(a client) sends to the jobd instance. The targets
argument is an array
(a list) of targets
to poll. It can be omitted; in that case jobd will query
for jobs for all targets it is serving.
When jobd receives a poll(targets: string[])
request and
specified targets are not full (haven't reached their concurrency limit), it
performs a SELECT
query with status='waiting'
condition and LIMIT
set
according to the mysql_fetch_limit
config value.
For example, after receiving the poll(['1/low', '1/normal'])
request, assuming mysql_fetch_limit
is set to 100
, jobd will query jobs from
a table roughly like this:
SELECT id, status, target FROM jobs WHERE status='waiting' AND target IN ('1/low', '1/normal') ORDER BY id LIMIT 0, 100 FOR UPDATE
However, if all specified targets are full at the time of jobd receiving the
poll(targets: string[])
request, the query will be delayed until at least one of the targets becomes available for new jobs.
Then it loops through results, and either accepts a job (by setting
its status in the table to accepted
) or ignores it (by setting a status to
ignored
). Accepted jobs are then added to internal queues according to their
targets and executed.
"Manual" jobs is a way of launching jobs in a blocking way ("blocking" from a client's point of view).
After jobs have been added to a storage with status
set to manual
, a client
has to send a run-manual(ids: int[])
request to a jobd
instance that serves targets the new jobs are assigned to. When jobd receives
such request, it performs a SELECT
query with id IN ({ids})
condition.
For example, while processing the run-manual([5,6,7])
request, jobd will make a query that looks roughly something like this:
SELECT id, status, target FROM jobs WHERE id IN ('5', '6', '7') FOR UPDATE
Then it loops through results, and either accepts a job (by setting its status
in the table to accepted
) or ignores it (by setting its status to ignored
).
Accepted jobs are then added to internal queues according to their targets and
executed.
When all requested jobs are finished, one way or another (succeeded or failed), jobd compiles and sends a response to the client. The response format is described here.
If you had only one worker instance (one server, one node), it would not be a problem to use it directly. But what if you have tens or hundreds of servers, each of them serving different targets? This is where jobd-master comes in play: it's been created to simplify usage and management of multiple workers.
There should be only one instance of jobd-master running. All jobd workers are supposed to connect to it at startup. These connections between each worker and jobd-master are persistent.
When jobd worker connects to the master instance, it sends it the list of targets the worker is serving (see Fig. 1).
Let's imagine we have three servers (srv-1
, srv-2
and srv-3
), each having
a jobd worker. All of them are serving common target named any
, but they're also
configured to serve their own low
, normal
and high
targets s/low
,
s/normal
and s/high
respectively (where s
is the server number):
Figure 1
┌────────────┐ ┌────────────┐ ┌────────────┐
│ jobd on │ │ jobd on │ │ jobd on │
│ srv-1 │ │ srv-2 │ │ srv-3 │
├────────────┤ ├────────────┤ ├────────────┤
│ Targets: │ │ Targets: │ │ Targets: │
│ - any │ │ - any │ │ - any │
│ - 1/low │ │ - 2/low │ │ - 3/low │
│ - 1/normal │ │ - 2/normal │ │ - 3/normal │
│ - 1/high │ │ - 2/high │ │ - 3/high │
└──────┬─────┘ └─────┬──────┘ └────┬───────┘
│ │ │
│ ┌───────▼───────┐ │
└─────► jobd-master ◄──────┘
└───────────────┘
When targets are added or removed at runtime (by add-target()
or remove-target()
request), workers notify the master
too. Thus, jobd-master always know which workers serve which targets.
To launch jobd via jobd-master, client needs to send a
poke(targets: string[])
request to jobd-master instance, and
jobd-master will send poll()
requests to all appropriate
workers.
For example, if you created, say, 5 jobs:
- 3 for the
any
target, - 1 for target
2/normal
, and - 1 for target
3/low
,
you send poke('any', '2/normal', '3/low')
request to jobd-master. As a result, it will send:
poll('any')
request to a random worker serving theany
target,poll('2/normal')
request tosrv-2
, andpoll('3/low')
request tosrv-3
.
Also, you can launch manual (foreground) jobs in parallel on multiple workers
via jobd-master and synchronously (in a blocking way) get all results. To do that,
you can use the run-manual(jobs: {id: int, target: string}[])
request.
See the integration example for real code examples.
PHP: jobd-php-example
First, you need Node.JS 14 or newer. See here now to install it using package manager.
Then install jobd using npm:
npm i -g jobd
One of possible ways of launching jobd and jobd-master daemons is via systemd.
This repository contains basic examples of jobd.service
and jobd-master.service
unit files. Note that
jobs will be launched as the same user the jobd worker is running, so you might
want to change that.
Copy .service
file(s) to /etc/systemd/system
, then do:
systemctl daemon-reload
systemctl enable jobd
systemctl start jobd
# repeat last two steps for jobd-master, if needed
If you don't like systemd, supervisor might be an
option. Create a configuration file in /etc/supervisor/conf.d
with following
content:
[program:jobd]
command=/usr/bin/jobd --config /etc/jobd.conf
numprocs=1
directory=/
stdout_logfile=/var/log/jobd-stdout.log
autostart=true
autorestart=true
user=nobody
stopsignal=TERM
Then use supervisorctl
to start or stop jobd.
❗ Don't forget to filter access to jobd and jobd-master ports using a firewall. See a note here for more info.
Configuration files are written in ini format. All available options for both
daemons, as well as a command-line utility, are described below. You can copy
jobd.conf.example
and jobd-master.conf.example
and use them as a template instead of writing configs from scratch.
Default config path is /etc/jobd.conf
. Use the --config
option to use
a different path.
Without section:
host
(required, string) — jobd server hostnameport
(required, int) — jobd server portpassword
(string) — password for requestsalways_allow_localhost
(boolean, default:false
) — when set to1
ortrue
, allows accepting requests from clients connecting from localhost without passwordname
(string, default:os.hostname()
) — worker namemaster_host
(string) — master hostnamemaster_port
(int) — master port. If hostname or port is omitted, jobd will not connect to master.master_reconnect_timeout
(int, default:10
) — if connection to master failed, jobd will be constantly trying to reconnect. This option specifies a delay between connection attempts, in seconds.log_file
(string) — path to a log filelog_level_file
(string, default:warn
) — minimum level of logs that are written to the file.
Allowed values:trace
,debug
,info
,warn
,error
log_level_console
(string, default:warn
) — minimum level of logs that go to stdout.mysql_host
(required, string) — database hostmysql_port
(required, int) — database portmysql_user
(required, string) — database usermysql_password
(required, string) — database passwordmysql_database
(required, string) — database namemysql_table
(required, string) — table namemysql_fetch_limit
(int, default:100
) — a number of new jobs to fetch in every requestlauncher
(required, string) — a template of shell command that will be launched for every job.{id}
will be replaced with job idlauncher.cwd
(string, default:process.cwd()
) — current working directory for spawned launcher processeslauncher.env.{any}
(string) — environment variable for spawned launcher processesmax_output_buffer
(int, default:1048576
)
Under the [targets]
section, targets are specified. Each target is specified on
a separate line in the following format:
{target_name} = {n}
where:
{target_name}
(string) is target name{n}
(int) is maximum count of simultaneously executing jobs for this target
Default config path is /etc/jobd-master.conf
. Use the --config
option to use
a different path.
host
(required, string)port
(required, int)password
(string)always_allow_localhost
(boolean, default:false
)ping_interval
(int, default:30
) — specifies interval between workers pings.poke_throttle_interval
(int, default:0.5
)log_file
(string)log_level_file
(string, default:warn
)log_level_console
(string, default:warn
)
Default config path is ~/.jobctl.conf
.
master
(boolean) — same as--master
.host
(string) — same as--host
.port
(int) — same as--port
.password
(string)log_level
(string, default:warn
)
php-jobd-client (official)
By default, jobd and jobd-master listen on TCP ports 7080 and 7081 respectively, ports can be changed in a config.
❗ jobd has been created with an assumption that it'll be used in more-or-less trusted environments (like LAN or, at least, servers within one data center) so no encryption nor authentication mechanisms have been implemented. All traffic between jobd and clients flow in plain text.
You can protect a jobd instance with a password though, so at least basic password-based authorization is supported.
Both daemons receive and send Messages. Each message is followed by EOT
(0x4
)
byte which indicates an end of a message. Clients may send and receive multiple
messages over a single connection. Usually, it's the client who must close the
connection, when it's not needed anymore. A server, however, may close the
connection in some situations (invalid password, server error, etc).
Messages are encoded as JSON arrays with at least one item, representing the message type:
[TYPE]
If a message of specific type has some data, it's placed as a second item:
[TYPE, DATA]
Type of TYPE
is integer. Supported types are:
0
: Request1
: Response2
: Ping3
: Pong
DATA
is a JSON object with following keys:
-
Unique (per connection) request number. Clients can start counting request numbers from one (
1
) or from any other random number. Each subsequent request should increment this number by 1. Note that zero (0
) is reserved. -
Request type. Supported request types for jobd and jobd-master are listed below.
-
Request arguments (if needed): an object, whose keys and values represent argument names and values.
-
A password, for password-protected instances. Only needed for first request.
Example (w/o trailing EOT
):
[0,{no:1,type:'poll',data:{'targets':['target_1','target_2']}}]
Here is the list of supported requests, using type(arguments)
notation.
-
Get new tasks for specified
targets
from database. Iftargets
argument is no specified, get tasks for all serving targets.Response data type: string ('ok').
-
Pause execution of tasks of specified targets. If
targets
argument is omitted, pauses all targets.Response data type: string ('ok').
-
Continue execution of tasks of specified targets. If
targets
argument is omitted, continues all targets.Response data type: string ('ok').
-
Returns status of internal queues and memory usage.
Response data type: object with following keys:
targets
(object<target: string, {paused: bool, concurrency: int, length: int}>)jobPromisesCount
(int)memoryUsage
(NodeJS.MemoryUsage)
-
Add target.
Response data type: string ('ok').
-
Remove target.
Response data type: string ('ok').
-
Set concurrency limit of target
target
.Response data type: string ('ok').
-
Enqueue and run jobs with specified IDs and
status
set tomanual
, and return results.Response data type: object with following keys:
-
jobs
(object<int, object>)An object whose keys represent succeeded job IDs and whose values are objects with following keys:
result
(string)code
(int)signal
(string|null)stdout
(string)stderr
(string)
-
errors
(object<int, string>) An object whose keys represent failed job IDs and whose values are error messages.
-
-
Send signals to jobs which are still executing and return results.
Response data type: object with job IDs as keys and kill status (boolean where true means that signal is successfully delivered) as values.
-
Used by jobd instances to register themselves with master. Clients don't need it.
-
Send
poll(targets)
requests to all registered workers that serve specifiedtargets
. -
Send
pause(targets)
requests to workers serving specifiedtargets
. Iftargets
argument is omitted, sendspause()
to all workers. -
Send
continue(targets)
requests to workers serving specifiedtargets
. Iftargets
argument is omitted, sendscontinue()
to all workers. -
Returns list of registered workers and memory usage. If
poll_workers
argument is true, sendsstatus()
request to all workers and includes their responses. -
Send
run-manual()
requests to registered jobd instances serving specified targets, aggregate and return results. -
Send
send-signal()
requests to registered jobd instances serving specified targets, aggregate and return results.
DATA
is a JSON object with following keys:
-
no
of request this response is related to. -
Data, if request succeeded.
-
Error message, if request failed.
Example (w/o trailing EOT
):
[1,{no:1,data:'ok'}]
No DATA
.
Example (w/o trailing EOT
):
[2]
- graceful shutdown of jobd
- support signals in jobctl
MIT