-
-
Notifications
You must be signed in to change notification settings - Fork 24
/
100_jobs.sql
179 lines (151 loc) · 5.91 KB
/
100_jobs.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
-- BEGIN: JOBS
--
-- An asynchronous job queue schema for ACID compliant job creation through
-- triggers/functions/etc.
--
-- Worker code: worker.js
--
-- Author: Benjie Gillam <code@benjiegillam.com>
-- License: MIT
-- URL: https://gist.github.com/benjie/839740697f5a1c46ee8da98a1efac218
-- Donations: https://www.paypal.me/benjie
CREATE EXTENSION IF NOT EXISTS pgcrypto WITH SCHEMA public;
CREATE EXTENSION IF NOT EXISTS "uuid-ossp" WITH SCHEMA public;
CREATE SCHEMA IF NOT EXISTS app_jobs;
CREATE TABLE app_jobs.job_queues (
queue_name varchar NOT NULL PRIMARY KEY,
job_count int DEFAULT 0 NOT NULL,
locked_at timestamp with time zone,
locked_by varchar
);
ALTER TABLE app_jobs.job_queues ENABLE ROW LEVEL SECURITY;
CREATE TABLE app_jobs.jobs (
id serial PRIMARY KEY,
queue_name varchar DEFAULT (public.gen_random_uuid())::varchar NOT NULL,
task_identifier varchar NOT NULL,
payload json DEFAULT '{}'::json NOT NULL,
priority int DEFAULT 0 NOT NULL,
run_at timestamp with time zone DEFAULT now() NOT NULL,
attempts int DEFAULT 0 NOT NULL,
last_error varchar,
created_at timestamp with time zone NOT NULL DEFAULT NOW(),
updated_at timestamp with time zone NOT NULL DEFAULT NOW()
);
ALTER TABLE app_jobs.job_queues ENABLE ROW LEVEL SECURITY;
CREATE FUNCTION app_jobs.do_notify() RETURNS trigger AS $$
BEGIN
PERFORM pg_notify(TG_ARGV[0], '');
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION app_jobs.update_timestamps() RETURNS trigger AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
NEW.created_at = NOW();
NEW.updated_at = NOW();
ELSIF TG_OP = 'UPDATE' THEN
NEW.created_at = OLD.created_at;
NEW.updated_at = GREATEST(NOW(), OLD.updated_at + INTERVAL '1 millisecond');
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION app_jobs.jobs__decrease_job_queue_count() RETURNS trigger AS $$
BEGIN
UPDATE app_jobs.job_queues
SET job_count = job_queues.job_count - 1
WHERE queue_name = OLD.queue_name
AND job_queues.job_count > 1;
IF NOT FOUND THEN
DELETE FROM app_jobs.job_queues WHERE queue_name = OLD.queue_name;
END IF;
RETURN OLD;
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION app_jobs.jobs__increase_job_queue_count() RETURNS trigger AS $$
BEGIN
INSERT INTO app_jobs.job_queues(queue_name, job_count)
VALUES(NEW.queue_name, 1)
ON CONFLICT (queue_name) DO UPDATE SET job_count = job_queues.job_count + 1;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER _100_timestamps BEFORE INSERT OR UPDATE ON app_jobs.jobs FOR EACH ROW EXECUTE PROCEDURE app_jobs.update_timestamps();
CREATE TRIGGER _500_increase_job_queue_count AFTER INSERT ON app_jobs.jobs FOR EACH ROW EXECUTE PROCEDURE app_jobs.jobs__increase_job_queue_count();
CREATE TRIGGER _500_decrease_job_queue_count BEFORE DELETE ON app_jobs.jobs FOR EACH ROW EXECUTE PROCEDURE app_jobs.jobs__decrease_job_queue_count();
CREATE TRIGGER _900_notify_worker AFTER INSERT ON app_jobs.jobs FOR EACH STATEMENT EXECUTE PROCEDURE app_jobs.do_notify('jobs:insert');
CREATE FUNCTION app_jobs.add_job(identifier varchar, payload json) RETURNS app_jobs.jobs AS $$
INSERT INTO app_jobs.jobs(task_identifier, payload) VALUES(identifier, payload) RETURNING *;
$$ LANGUAGE sql;
CREATE FUNCTION app_jobs.add_job(identifier varchar, queue_name varchar, payload json) RETURNS app_jobs.jobs AS $$
INSERT INTO app_jobs.jobs(task_identifier, queue_name, payload) VALUES(identifier, queue_name, payload) RETURNING *;
$$ LANGUAGE sql;
CREATE FUNCTION app_jobs.schedule_job(identifier varchar, queue_name varchar, payload json, run_at timestamptz) RETURNS app_jobs.jobs AS $$
INSERT INTO app_jobs.jobs(task_identifier, queue_name, payload, run_at) VALUES(identifier, queue_name, payload, run_at) RETURNING *;
$$ LANGUAGE sql;
CREATE FUNCTION app_jobs.complete_job(worker_id varchar, job_id int) RETURNS app_jobs.jobs AS $$
DECLARE
v_row app_jobs.jobs;
BEGIN
DELETE FROM app_jobs.jobs
WHERE id = job_id
RETURNING * INTO v_row;
UPDATE app_jobs.job_queues
SET locked_by = null, locked_at = null
WHERE queue_name = v_row.queue_name AND locked_by = worker_id;
RETURN v_row;
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION app_jobs.fail_job(worker_id varchar, job_id int, error_message varchar) RETURNS app_jobs.jobs AS $$
DECLARE
v_row app_jobs.jobs;
BEGIN
UPDATE app_jobs.jobs
SET
last_error = error_message,
run_at = greatest(now(), run_at) + (exp(least(attempts, 10))::text || ' seconds')::interval
WHERE id = job_id
RETURNING * INTO v_row;
UPDATE app_jobs.job_queues
SET locked_by = null, locked_at = null
WHERE queue_name = v_row.queue_name AND locked_by = worker_id;
RETURN v_row;
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION app_jobs.get_job(worker_id varchar, identifiers varchar[]) RETURNS app_jobs.jobs AS $$
DECLARE
v_job_id int;
v_queue_name varchar;
v_default_job_expiry text = (4 * 60 * 60)::text;
v_default_job_maximum_attempts text = '25';
v_row app_jobs.jobs;
BEGIN
IF worker_id IS NULL OR length(worker_id) < 10 THEN
RAISE EXCEPTION 'Invalid worker ID';
END IF;
SELECT job_queues.queue_name, jobs.id INTO v_queue_name, v_job_id
FROM app_jobs.job_queues
INNER JOIN app_jobs.jobs USING (queue_name)
WHERE (locked_at IS NULL OR locked_at < (now() - (COALESCE(current_setting('jobs.expiry', true), v_default_job_expiry) || ' seconds')::interval))
AND run_at <= now()
AND attempts < COALESCE(current_setting('jobs.maximum_attempts', true), v_default_job_maximum_attempts)::int
AND (identifiers IS NULL OR task_identifier = any(identifiers))
ORDER BY priority ASC, run_at ASC, id ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;
IF v_queue_name IS NULL THEN
RETURN NULL;
END IF;
UPDATE app_jobs.job_queues
SET
locked_by = worker_id,
locked_at = now()
WHERE job_queues.queue_name = v_queue_name;
UPDATE app_jobs.jobs
SET attempts = attempts + 1
WHERE id = v_job_id
RETURNING * INTO v_row;
RETURN v_row;
END;
$$ LANGUAGE plpgsql;
-- END: JOBS