Automatic offset tracking for stream queues #661

Draft — wants to merge 68 commits into base: main

Commits (68):
533ab86
WIP adds automatic offset tracking for streams
viktorerlingsson Apr 16, 2024
8636ec6
specs
viktorerlingsson Apr 16, 2024
9e6c236
add specs for edge cases. refactor specs so they are more readable
viktorerlingsson Apr 17, 2024
cc3718f
format
viktorerlingsson Apr 17, 2024
823562e
refactor, dont init consumer tracking stuff unless needed. resize fil…
viktorerlingsson Apr 17, 2024
bc291dd
add spec that checks that only one entry is saved per consumer tag
viktorerlingsson Apr 17, 2024
f0ff66e
format
viktorerlingsson Apr 17, 2024
632934a
lint
viktorerlingsson Apr 17, 2024
92280a2
add function to remove consumer tags from file
viktorerlingsson Apr 22, 2024
1d5e97d
spec for removing consumer tags
viktorerlingsson Apr 22, 2024
40cb965
save length of ctag in file, dont use space as deliminator
viktorerlingsson Apr 22, 2024
3bf4258
format
viktorerlingsson Apr 22, 2024
9455db1
lint
viktorerlingsson Apr 22, 2024
7127e6e
remove methods, use instance variables directly. add cleanup function
viktorerlingsson Apr 23, 2024
e3955ee
add spec for cleanup
viktorerlingsson Apr 23, 2024
50551b4
raise before updating instance variable
viktorerlingsson Apr 23, 2024
94e6db9
refactor
viktorerlingsson Apr 23, 2024
cab1200
remove comment
viktorerlingsson Apr 23, 2024
1606bfa
refactor
viktorerlingsson Apr 23, 2024
9fe79b2
refactor
viktorerlingsson Apr 23, 2024
2f12ad0
format
viktorerlingsson Apr 23, 2024
f047c6e
set size to 32768 for offsets file, change path name
viktorerlingsson Apr 24, 2024
7213715
remove comment
viktorerlingsson Apr 24, 2024
5f854d8
refactor restore_consumer_offset_positions
viktorerlingsson Apr 24, 2024
8c6bcf8
remove unused var
viktorerlingsson Apr 24, 2024
34c7a03
save_offset_by_consumer_tag -> update_consumer_offset
viktorerlingsson Apr 24, 2024
39f45fe
write_new_ctag_to_file -> store_consumer_offset
viktorerlingsson Apr 24, 2024
a1c73d7
refactor update_consumer_offset
viktorerlingsson Apr 24, 2024
0657ada
add option to get a writeable slice from mfile
viktorerlingsson Apr 24, 2024
85e1955
encode directly into mmap. fix bug with pos/size
viktorerlingsson Apr 24, 2024
ad69700
update spec
viktorerlingsson Apr 24, 2024
51d199a
more strict with types
viktorerlingsson Apr 24, 2024
b6c079d
format
viktorerlingsson Apr 24, 2024
00b05f0
lint
viktorerlingsson Apr 24, 2024
791c6a7
lint
viktorerlingsson Apr 24, 2024
ab1e624
String#bytesize
viktorerlingsson May 16, 2024
dbe06f1
refactor store_consumer_offset and restore_consumer_offset_positions
viktorerlingsson May 16, 2024
b608a8f
refactor cleanup_consumer_offsets
viktorerlingsson May 16, 2024
58c48d4
replace offset file instead of delete
viktorerlingsson May 16, 2024
3413949
lint
viktorerlingsson May 16, 2024
f6340f2
lint
viktorerlingsson May 16, 2024
48849af
dont track offsets when consumer tag is generated
viktorerlingsson May 23, 2024
5b071c9
remove unused code
viktorerlingsson May 23, 2024
f9bc466
cleanup consumer offsets when dropping overflow
viktorerlingsson May 28, 2024
2b31cbd
format
viktorerlingsson May 28, 2024
1619e35
lint
viktorerlingsson May 28, 2024
e4361c0
handle large messages causing first segment to be empty
viktorerlingsson May 30, 2024
e2c9c44
cleanup spec
viktorerlingsson May 30, 2024
87dd311
add option to use broker tracking when x-stream-offset is set by usin…
viktorerlingsson Jun 4, 2024
28bdf4c
add spec
viktorerlingsson Jun 4, 2024
1e0a92b
no need to truncate mfile, it's being deleted
viktorerlingsson Jun 17, 2024
b170d16
x-stream-use-automatic-offset -> x-stream-automatic-offset-tracking
viktorerlingsson Jun 17, 2024
6dcd056
implement rename in mfile
viktorerlingsson Jun 17, 2024
23036e1
expand consumer offsets file if full
viktorerlingsson Jun 17, 2024
526b81e
remove unused code
viktorerlingsson Jun 17, 2024
11cabda
use LittleEndian
viktorerlingsson Jun 17, 2024
7d48d7e
remove instance variables
viktorerlingsson Jun 19, 2024
265abcd
use old_consumer_offsets.path
viktorerlingsson Jun 19, 2024
e4b2951
start reading at pos=4 after IndexError in offset_at
viktorerlingsson Jun 19, 2024
37a446f
use queue_data_dir
viktorerlingsson Jun 19, 2024
9e3280f
update specs to start amqp servers where needed
viktorerlingsson Jun 19, 2024
a0bf581
ameba:disable Metrics/CyclomaticComplexity for find_offset
viktorerlingsson Jun 19, 2024
601217e
include tag size prefix byte in consumer_offset_file_full?
viktorerlingsson Jun 20, 2024
a526419
only append consumer offsets file, compact when full, expand if still…
viktorerlingsson Jun 26, 2024
e1a0bf8
set capacity of consumer offsets file to 1000*current size when compa…
viktorerlingsson Jun 27, 2024
cf1cca7
replicate consumer offsets file. remove unused code
viktorerlingsson Jun 27, 2024
f7aad09
fixes to match changes from main
viktorerlingsson Nov 12, 2024
cce2560
lint
viktorerlingsson Nov 12, 2024
302 changes: 302 additions & 0 deletions spec/stream_queue_spec.cr
@@ -1,6 +1,38 @@
require "./spec_helper"
require "./../src/lavinmq/amqp/queue"

module StreamQueueSpecHelpers
def self.publish(s, queue_name, nr_of_messages)
args = {"x-queue-type": "stream"}
with_channel(s) do |ch|
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
nr_of_messages.times { |i| q.publish "m#{i}" }
end
end

def self.consume_one(s, queue_name, c_tag, c_args = AMQP::Client::Arguments.new)
args = {"x-queue-type": "stream"}
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, tag: c_tag, args: c_args) do |msg|
msgs.send msg
msg.ack
end
msgs.receive
end
end

def self.offset_from_headers(headers)
if headers
headers["x-stream-offset"].as(Int64)
else
fail("No headers found")
end
end
end

describe LavinMQ::AMQP::StreamQueue do
stream_queue_args = LavinMQ::AMQP::Table.new({"x-queue-type": "stream"})

@@ -203,4 +235,274 @@ describe LavinMQ::AMQP::StreamQueue do
end
end
end

describe "Automatic consumer offset tracking" do
it "resumes from last offset on reconnect" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
offset = 3

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, offset + 1)
offset.times { StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag) }
sleep 0.1

# consume again, should start from last offset automatically
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq offset + 1
end
end

it "reads offsets from file on init" do
queue_name = Random::Secure.hex
offsets = [84_i64, 24_i64, 1_i64, 100_i64, 42_i64]
tag_prefix = "ctag-"

with_amqp_server do |s|
vhost = s.vhosts["/"]
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.store_consumer_offset(tag_prefix + i.to_s, offset)
end
msg_store.close
sleep 0.1

msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.last_offset_by_consumer_tag(tag_prefix + i.to_s).should eq offset
end
msg_store.close
end
end

it "appends consumer tag file" do
queue_name = Random::Secure.hex
offsets = [84_i64, 24_i64, 1_i64, 100_i64, 42_i64]
consumer_tag = "ctag-1"
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each do |offset|
msg_store.store_consumer_offset(consumer_tag, offset)
end
bytesize = consumer_tag.bytesize + 1 + 8
msg_store.@consumer_offsets.size.should eq bytesize*5
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets.last
msg_store.close
end
end
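
The spec above relies on each stored entry occupying `consumer_tag.bytesize + 1 + 8` bytes. From that arithmetic and the commit messages ("save length of ctag in file", "use LittleEndian"), the on-disk record plausibly consists of a 1-byte tag-length prefix, the tag bytes, and an 8-byte little-endian Int64 offset. A minimal sketch of that layout — in Ruby for illustration (the codebase is Crystal), with hypothetical helper names not taken from the PR:

```ruby
# Hypothetical record layout implied by the specs:
# [1-byte tag length][tag bytes][8-byte little-endian Int64 offset]
def encode_offset_record(tag, offset)
  raise ArgumentError, "tag too long" if tag.bytesize > 255
  [tag.bytesize].pack("C") + tag + [offset].pack("q<")
end

def decode_offset_record(bytes)
  len = bytes.unpack1("C")                    # tag-length prefix byte
  tag = bytes[1, len]                         # tag bytes
  offset = bytes[1 + len, 8].unpack1("q<")    # little-endian Int64
  [tag, offset]
end

record = encode_offset_record("ctag-1", 42)
record.bytesize  # => 15, i.e. "ctag-1".bytesize + 1 + 8
```

This matches the spec's `bytesize = consumer_tag.bytesize + 1 + 8` and the commit that includes the tag-size prefix byte in `consumer_offset_file_full?`.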

it "compacts consumer tag file on restart" do
queue_name = Random::Secure.hex
offsets = [84_i64, 24_i64, 1_i64, 100_i64, 42_i64]
consumer_tag = "ctag-1"
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each do |offset|
msg_store.store_consumer_offset(consumer_tag, offset)
end
msg_store.close

msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets.last
bytesize = consumer_tag.bytesize + 1 + 8
msg_store.@consumer_offsets.size.should eq bytesize
msg_store.close
end
end
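
The compaction behaviour tested above — five appended entries collapse to a single record per tag after restart — amounts to a last-write-wins fold over the appended records. An illustrative Ruby sketch (the function name is hypothetical; `records` is assumed to be an array of `[tag, offset]` pairs decoded from the file):

```ruby
# Later appends win: keep only the last offset seen for each tag.
def compact_offset_records(records)
  latest = {}
  records.each { |tag, offset| latest[tag] = offset }
  latest.map { |tag, offset| [tag, offset] }
end

appended = [["ctag-1", 84], ["ctag-1", 24], ["ctag-1", 42]]
compact_offset_records(appended)  # => [["ctag-1", 42]]
```

Appending and compacting lazily (on restart or when the file is full, per the commit history) keeps the hot path a pure append instead of an in-place update.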

it "compacts consumer tag file when full" do
queue_name = Random::Secure.hex
offsets = [84_i64, 24_i64, 1_i64, 100_i64, 42_i64]
consumer_tag = Random::Secure.hex(32)
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
bytesize = consumer_tag.bytesize + 1 + 8

offsets = (LavinMQ::Config.instance.segment_size / bytesize).to_i32 + 1
offsets.times do |i|
msg_store.store_consumer_offset(consumer_tag, i)
end
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets - 1
msg_store.@consumer_offsets.size.should eq bytesize*2
msg_store.close
end
end

it "does not track offset if x-stream-offset is set" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0})

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 2)
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1

# should consume the same message again since tracking was not saved from last consume
msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1
end
end

it "should not use saved offset if x-stream-offset is set" do

queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0})

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 2)

# get message without x-stream-offset, tracks offset
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1

# consume with x-stream-offset set, should consume the same message again
msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1
end
end

it "should use saved offset if x-stream-offset & x-stream-automatic-offset-tracking is set" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0, "x-stream-automatic-offset-tracking": true})

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 2)

# get message without x-stream-offset, tracks offset
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1

# consume with x-stream-offset set, should consume the same message again
msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 2
end
end

it "cleanup_consumer_offsets removes outdated offset" do
queue_name = Random::Secure.hex
offsets = [84_i64, -10_i64]
tag_prefix = "ctag-"

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.store_consumer_offset(tag_prefix + i.to_s, offset)
end
sleep 0.1
msg_store.cleanup_consumer_offsets
msg_store.close
sleep 0.1

msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(tag_prefix + 1.to_s).should eq nil
msg_store.last_offset_by_consumer_tag(tag_prefix + 0.to_s).should eq offsets[0]
msg_store.close
end
end
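
The spec above shows `cleanup_consumer_offsets` dropping the entry whose saved offset (`-10`) no longer points at a retained message while keeping the valid one (`84`). One consistent reading is a filter against the stream's retention boundary — sketched here in Ruby with assumed names (`first_retained_offset` is not an identifier from the PR):

```ruby
# Drop saved consumer offsets that fall below the stream's retained range.
def cleanup_offsets(latest_by_tag, first_retained_offset)
  latest_by_tag.select { |_tag, offset| offset >= first_retained_offset }
end

cleanup_offsets({"ctag-0" => 84, "ctag-1" => -10}, 1)
# => {"ctag-0" => 84}
```

The "runs cleanup when removing segment" spec below suggests this filter also fires when overflow handling drops a segment, not only on explicit cleanup.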

it "runs cleanup when removing segment" do
consumer_tag = "ctag-1"
queue_name = Random::Secure.hex
args = {"x-queue-type": "stream", "x-max-length": 1}
msg_body = Bytes.new(LavinMQ::Config.instance.segment_size)

with_amqp_server do |s|
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)

with_channel(s) do |ch|
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
q.publish_confirm msg_body
end

with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, tag: consumer_tag) do |msg|
msgs.send msg
msg.ack
end
msgs.receive
end

msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq 2

with_channel(s) do |ch|
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
2.times { q.publish_confirm msg_body }
end

msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq nil
end
end

it "does not track offset if c-tag is auto-generated" do
queue_name = Random::Secure.hex

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)
args = {"x-queue-type": "stream"}
c_tag = ""
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
msgs = Channel(AMQP::Client::DeliverMessage).new
c_tag = q.subscribe(no_ack: false) do |msg|
msgs.send msg
msg.ack
end
msgs.receive
end

sleep 0.1
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(c_tag).should eq nil
end
end

it "expands consumer offset file when needed" do
queue_name = Random::Secure.hex
consumer_tag_prefix = Random::Secure.hex(32)
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
one_offset_bytesize = "#{consumer_tag_prefix}1000".bytesize + 1 + 8
offsets = (LavinMQ::Config.instance.segment_size / one_offset_bytesize).to_i32 + 1
bytesize = 0
offsets.times do |i|
consumer_tag = "#{consumer_tag_prefix}#{i + 1000}"
msg_store.store_consumer_offset(consumer_tag, i + 1000)
bytesize += consumer_tag.bytesize + 1 + 8
end
msg_store.@consumer_offsets.size.should eq bytesize
msg_store.@consumer_offsets.size.should be > LavinMQ::Config.instance.segment_size
offsets.times do |i|
msg_store.last_offset_by_consumer_tag("#{consumer_tag_prefix}#{i + 1000}").should eq i + 1000
end
end
end
end
end
4 changes: 4 additions & 0 deletions src/lavinmq/amqp/queue/stream_queue.cr
@@ -79,6 +79,10 @@ module LavinMQ::AMQP
end
end

def store_consumer_offset(consumer_tag : String, offset : Int64) : Nil
stream_queue_msg_store.store_consumer_offset(consumer_tag, offset)
end

# yield the next message in the ready queue
# returns true if a message was delivered, false otherwise
# if we encounter an unrecoverable ReadError, close queue