/
ddl.sql
67 lines (58 loc) · 1.73 KB
/
ddl.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
-- Shamelessly adapted from queue_classic
DROP FUNCTION IF EXISTS pop_lock(name varchar);
DROP FUNCTION IF EXISTS pop_lock(name varchar, boundary integer);
CREATE OR REPLACE FUNCTION pop_lock(name varchar, boundary integer)
RETURNS SETOF trunk_queue AS $$
DECLARE
unlocked bigint;
relative_top integer;
job_count integer;
BEGIN
-- The purpose is to release contention for the first spot in the table.
-- The select count(*) is going to slow down dequeue performance but allow
-- for more workers. Would love to see some optimization here...
EXECUTE 'SELECT count(*) FROM '
|| '(SELECT * FROM trunk_queue WHERE name = '
|| quote_literal(name)
|| ' AND locked_at IS NULL'
|| ' LIMIT '
|| quote_literal(boundary)
|| ') limited'
INTO job_count;
SELECT TRUNC(random() * (boundary - 1))
INTO relative_top;
IF job_count < boundary THEN
relative_top = 0;
END IF;
LOOP
BEGIN
EXECUTE 'SELECT id FROM trunk_queue '
|| ' WHERE locked_at IS NULL'
|| ' AND name = '
|| quote_literal(name)
|| ' ORDER BY id ASC'
|| ' LIMIT 1'
|| ' OFFSET ' || quote_literal(relative_top)
|| ' FOR UPDATE NOWAIT'
INTO unlocked;
EXIT;
EXCEPTION
WHEN lock_not_available THEN
-- do nothing. loop again and hope we get a lock
END;
END LOOP;
RETURN QUERY EXECUTE 'UPDATE trunk_queue '
|| ' SET locked_at = (CURRENT_TIMESTAMP)'
|| ' WHERE id = $1'
|| ' AND locked_at is NULL'
|| ' RETURNING *'
USING unlocked;
RETURN;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION pop_lock(name varchar)
RETURNS SETOF trunk_queue AS $$
BEGIN
RETURN QUERY EXECUTE 'SELECT * FROM pop_lock($1,10)' USING name;
END;
$$ LANGUAGE plpgsql;