-- This file is a part of Dramatiq.
-- Copyright (C) 2017,2018,2019,2020 CLEARTYPE SRL <>
-- Dramatiq is free software; you can redistribute it and/or modify it
-- under the terms of the GNU Lesser General Public License as published by
-- the Free Software Foundation, either version 3 of the License, or (at
-- your option) any later version.
-- Dramatiq is distributed in the hope that it will be useful, but WITHOUT
-- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-- FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
-- License for more details.
-- You should have received a copy of the GNU Lesser General Public License
-- along with this program. If not, see <http://www.gnu.org/licenses/>.
-- luacheck: globals ARGV KEYS redis unpack
-- dispatch(
--   args=[command, timestamp, queue_name, worker_id, heartbeat_timeout, dead_message_ttl, do_maintenance, max_unpack_size, ...],
--   keys=[namespace]
-- )
--
-- $namespace:__acks__.$worker_id.$queue_name
--   A set of message ids representing fetched-but-not-yet-acked
--   messages belonging to that (worker, queue) pair.
--
-- $namespace:__heartbeats__
--   A sorted set containing unique worker ids sorted by when their
--   last heartbeat was received.
--
-- $namespace:$queue_name
--   A list of message ids.
--
-- $namespace:$queue_name.msgs
--   A hash of message ids -> message data.
--
-- $namespace:$queue_name.XQ
--   A sorted set containing all the dead-lettered message ids
--   belonging to a queue, sorted by when they were dead lettered.
--
-- $namespace:$queue_name.XQ.msgs
--   A hash of message ids -> message data.
-- The single key is the namespace that prefixes every Redis key
-- touched by this script.
local namespace = KEYS[1]

-- Arguments shared by every command, in the positional order given by
-- the dispatch() signature at the top of this file.
local command = ARGV[1]
local timestamp = ARGV[2]
local queue_name = ARGV[3]
local worker_id = ARGV[4]
local heartbeat_timeout = ARGV[5]
local dead_message_ttl = ARGV[6]
local do_maintenance = ARGV[7]
local max_unpack_size = ARGV[8]

-- Prefix of this worker's per-queue ack sets, and the sorted set of
-- worker heartbeats.
local acks = namespace .. ":__acks__." .. worker_id
local heartbeats = namespace .. ":__heartbeats__"

-- Every call to dispatch records a heartbeat for the calling worker,
-- scored by the supplied timestamp.
redis.call("zadd", heartbeats, timestamp, worker_id)
-- This is used to ensure that we never have a DLQ like default.DQ.XQ
-- since that wouldn't make much sense.
local queue_canonical_name = queue_name
if string.sub(queue_name, -3) == ".DQ" then
    -- Strip the trailing ".DQ" suffix.
    queue_canonical_name = string.sub(queue_name, 1, -4)
end

-- Keys derived from the queue name. The ack set and the message
-- list/hash use the queue name exactly as given; the dead-letter
-- keys use the canonical name so that "foo" and "foo.DQ" share a
-- single "foo.XQ".
local queue_acks = acks .. "." .. queue_name
local queue_full_name = namespace .. ":" .. queue_name
local queue_messages = queue_full_name .. ".msgs"
local xqueue_full_name = namespace .. ":" .. queue_canonical_name .. ".XQ"
local xqueue_messages = xqueue_full_name .. ".msgs"
-- Command-specific arguments: everything after the eight shared
-- arguments, re-indexed so that the first one is ARGS[1].
local ARGS = {}
for i = 9, #ARGV do
    ARGS[i - 8] = ARGV[i]
end
-- Iterates over a table in chunks, yielding a new chunk on each
-- iteration. We use this to do work in batches so we don't overflow
-- lua's stack.
--
-- Each chunk is a fresh array holding at most max_unpack_size
-- consecutive elements of tbl; the final chunk may be shorter. An
-- empty table yields no chunks.
--
-- Example:
--
--   for chunk in iter_chunks(some_table) do
--     do_something(unpack(chunk))
--   end
local function iter_chunks(tbl)
    local len = #tbl
    -- max_unpack_size is taken from ARGV, so convert it explicitly
    -- rather than relying on implicit string-to-number coercion.
    local chunk_size = tonumber(max_unpack_size)
    local i = 1

    return function()
        if i > len then
            -- Exhausted: returning nil terminates the generic for loop.
            return nil
        end

        local chunk = {}
        local last_idx = math.min(i + chunk_size - 1, len)
        for j = i, last_idx do
            chunk[#chunk + 1] = tbl[j]
        end

        i = last_idx + 1
        return chunk
    end
end
-- Every call to dispatch has some % chance to trigger maintenance on
-- a queue. Maintenance moves any unacked messages belonging to dead
-- workers back to their queues and deletes any expired messages from
-- DLQs.
--
-- Whether maintenance runs is decided by the caller, which passes
-- do_maintenance as the string "1" to request it.
if do_maintenance == "1" then
    -- Workers whose last heartbeat is older than heartbeat_timeout are
    -- considered dead.
    local dead_workers = redis.call("zrangebyscore", heartbeats, 0, timestamp - heartbeat_timeout)
    for i = 1, #dead_workers do
        local dead_worker = dead_workers[i]
        local dead_worker_acks = namespace .. ":__acks__." .. dead_worker
        local dead_worker_queue_acks = dead_worker_acks .. "." .. queue_name

        -- Push the dead worker's unacked messages for this queue back
        -- onto the queue, then drop its ack set for this queue.
        local message_ids = redis.call("smembers", dead_worker_queue_acks)
        if next(message_ids) then
            for message_ids_batch in iter_chunks(message_ids) do
                redis.call("rpush", queue_full_name, unpack(message_ids_batch))
            end
            redis.call("del", dead_worker_queue_acks)
        end

        -- If there are no more ack groups for this worker, then
        -- remove it from the heartbeats set.
        local ack_queues = redis.call("keys", dead_worker_acks .. "*")
        if not next(ack_queues) then
            redis.call("zrem", heartbeats, dead_worker)
        end
    end

    -- Delete dead-lettered messages older than dead_message_ttl from
    -- both the DLQ sorted set and its message hash.
    local dead_message_ids = redis.call("zrangebyscore", xqueue_full_name, 0, timestamp - dead_message_ttl)
    if next(dead_message_ids) then
        for dead_message_ids_batch in iter_chunks(dead_message_ids) do
            redis.call("zrem", xqueue_full_name, unpack(dead_message_ids_batch))
            redis.call("hdel", xqueue_messages, unpack(dead_message_ids_batch))
        end
    end

    -- The following code is required for backwards-compatibility with
    -- the old way acks used to be implemented. It hoists any
    -- existing acks zsets into the per-worker sets.
    local compat_queue_acks = queue_full_name .. ".acks"
    local compat_message_ids = redis.call("zrangebyscore", compat_queue_acks, 0, timestamp - 86400000 * 7.5)
    if next(compat_message_ids) then
        for compat_message_ids_batch in iter_chunks(compat_message_ids) do
            redis.call("sadd", queue_acks, unpack(compat_message_ids_batch))
            redis.call("zrem", compat_queue_acks, unpack(compat_message_ids_batch))
        end
    end
end
-- Command dispatch. Exactly one branch runs per call; "fetch" and
-- "qsize" return a value to the caller, the other commands return
-- nothing.

-- Enqueues a new message on $queue_full_name.
if command == "enqueue" then
    local message_id = ARGS[1]
    local message_data = ARGS[2]

    redis.call("hset", queue_messages, message_id, message_data)
    redis.call("rpush", queue_full_name, message_id)

-- Returns up to $prefetch number of messages from $queue_full_name.
elseif command == "fetch" then
    -- Ensure prefetch isn't so large that we get errors fetching
    local prefetch = math.min(ARGS[1], max_unpack_size)

    local message_ids = {}
    for i = 1, prefetch do
        local message_id = redis.call("lpop", queue_full_name)
        if not message_id then
            -- The queue is empty; stop popping.
            break
        end

        -- Track the message as fetched-but-not-yet-acked.
        message_ids[i] = message_id
        redis.call("sadd", queue_acks, message_id)
    end

    if next(message_ids) ~= nil then
        return redis.call("hmget", queue_messages, unpack(message_ids))
    else
        return {}
    end

-- Moves fetched-but-not-processed messages back to their queues on
-- worker shutdown.
elseif command == "requeue" then
    for i = 1, #ARGS do
        local message_id = ARGS[i]

        if redis.call("srem", queue_acks, message_id) > 0 then
            -- HEXISTS replies with the integer 0 or 1 and 0 is truthy
            -- in Lua, so the reply must be compared explicitly;
            -- otherwise ids whose data is gone would be requeued too.
            if redis.call("hexists", queue_messages, message_id) == 1 then
                redis.call("rpush", queue_full_name, message_id)
            end
        end
    end

-- Acknowledges that a message has been processed.
elseif command == "ack" then
    local message_id = ARGS[1]

    -- Only drop the message data if this worker actually held the ack.
    if redis.call("srem", queue_acks, message_id) > 0 then
        redis.call("hdel", queue_messages, message_id)
    end

-- Moves a message from a queue to a dead-letter queue.
elseif command == "nack" then
    local message_id = ARGS[1]

    -- unack the message
    if redis.call("srem", queue_acks, message_id) > 0 then
        -- then pop it off the messages hash and move it onto the DLQ
        local message = redis.call("hget", queue_messages, message_id)
        if message then
            redis.call("zadd", xqueue_full_name, timestamp, message_id)
            redis.call("hset", xqueue_messages, message_id, message)
            redis.call("hdel", queue_messages, message_id)
        end
    end

-- Removes all messages from a queue.
elseif command == "purge" then
    redis.call("del", queue_full_name, queue_acks, queue_messages, xqueue_full_name, xqueue_messages)

-- Used in tests to determine the size of the queue.
elseif command == "qsize" then
    return redis.call("hlen", queue_messages) + redis.call("scard", queue_acks)
end