Automatic offset tracking for stream queues #661

Open: wants to merge 66 commits into base: main
Changes from 62 commits:
5bfa5eb
WIP adds automatic offset tracking for streams
viktorerlingsson Apr 16, 2024
5c2b46b
specs
viktorerlingsson Apr 16, 2024
0e50b86
add specs for edge cases. refactor specs so they are more readable
viktorerlingsson Apr 17, 2024
da8962f
format
viktorerlingsson Apr 17, 2024
c6e649d
refactor, dont init consumer tracking stuff unless needed. resize fil…
viktorerlingsson Apr 17, 2024
f3a3090
add spec that checks that only one entry is saved per consumer tag
viktorerlingsson Apr 17, 2024
69ab9b4
format
viktorerlingsson Apr 17, 2024
a3c7a26
lint
viktorerlingsson Apr 17, 2024
27a7028
add function to remove consumer tags from file
viktorerlingsson Apr 22, 2024
d4ea06f
spec for removing consumer tags
viktorerlingsson Apr 22, 2024
7af298b
save length of ctag in file, dont use space as deliminator
viktorerlingsson Apr 22, 2024
aa11afd
format
viktorerlingsson Apr 22, 2024
10fd5d0
lint
viktorerlingsson Apr 22, 2024
3aa0ddf
remove methods, use instance variables directly. add cleanup function
viktorerlingsson Apr 23, 2024
9877191
add spec for cleanup
viktorerlingsson Apr 23, 2024
1c3f6b5
raise before updating instance variable
viktorerlingsson Apr 23, 2024
1c4124a
refactor
viktorerlingsson Apr 23, 2024
e2d457c
remove comment
viktorerlingsson Apr 23, 2024
a616f9b
refactor
viktorerlingsson Apr 23, 2024
04c9eff
refactor
viktorerlingsson Apr 23, 2024
2f5e77e
format
viktorerlingsson Apr 23, 2024
5a41008
set size to 32768 for offsets file, change path name
viktorerlingsson Apr 24, 2024
9e5b7ba
remove comment
viktorerlingsson Apr 24, 2024
940704b
refactor restore_consumer_offset_positions
viktorerlingsson Apr 24, 2024
2183b06
remove unused var
viktorerlingsson Apr 24, 2024
37aa4fe
save_offset_by_consumer_tag -> update_consumer_offset
viktorerlingsson Apr 24, 2024
088d46d
write_new_ctag_to_file -> store_consumer_offset
viktorerlingsson Apr 24, 2024
397a7f8
refactor update_consumer_offset
viktorerlingsson Apr 24, 2024
1e5616b
add option to get a writeable slice from mfile
viktorerlingsson Apr 24, 2024
3bc3a42
encode directly into mmap. fix bug with pos/size
viktorerlingsson Apr 24, 2024
86fa2ba
update spec
viktorerlingsson Apr 24, 2024
ee1dbbb
more strict with types
viktorerlingsson Apr 24, 2024
3658c4d
format
viktorerlingsson Apr 24, 2024
0586bc8
lint
viktorerlingsson Apr 24, 2024
46670ea
lint
viktorerlingsson Apr 24, 2024
d83d2f3
String#bytesize
viktorerlingsson May 16, 2024
8afb057
refactor store_consumer_offset and restore_consumer_offset_positions
viktorerlingsson May 16, 2024
0f86d68
refactor cleanup_consumer_offsets
viktorerlingsson May 16, 2024
d510845
replace offset file instead of delete
viktorerlingsson May 16, 2024
5d97755
lint
viktorerlingsson May 16, 2024
511ac32
lint
viktorerlingsson May 16, 2024
4eb1a0c
dont track offsets when consumer tag is generated
viktorerlingsson May 23, 2024
e008ec1
remove unused code
viktorerlingsson May 23, 2024
3d186ce
cleanup consumer offsets when dropping overflow
viktorerlingsson May 28, 2024
17fed17
format
viktorerlingsson May 28, 2024
e8e6486
lint
viktorerlingsson May 28, 2024
0c9856a
handle large messages causing first segment to be empty
viktorerlingsson May 30, 2024
5b70c04
cleanup spec
viktorerlingsson May 30, 2024
6b30bf0
add option to use broker tracking when x-stream-offset is set by usin…
viktorerlingsson Jun 4, 2024
ca6112f
add spec
viktorerlingsson Jun 4, 2024
728f2f7
no need to truncate mfile, it's being deleted
viktorerlingsson Jun 17, 2024
91519e1
x-stream-use-automatic-offset -> x-stream-automatic-offset-tracking
viktorerlingsson Jun 17, 2024
fb8228b
implement rename in mfile
viktorerlingsson Jun 17, 2024
c988961
expand consumer offsets file if full
viktorerlingsson Jun 17, 2024
69cd07a
remove unused code
viktorerlingsson Jun 17, 2024
6dbbc1e
use LittleEndian
viktorerlingsson Jun 17, 2024
749f154
remove instance variables
viktorerlingsson Jun 19, 2024
c7819f3
use old_consumer_offsets.path
viktorerlingsson Jun 19, 2024
78eacb7
start reading at pos=4 after IndexError in offset_at
viktorerlingsson Jun 19, 2024
3c2f97a
use queue_data_dir
viktorerlingsson Jun 19, 2024
a74d4a6
update specs to start amqp servers where needed
viktorerlingsson Jun 19, 2024
8c21f06
ameba:disable Metrics/CyclomaticComplexity for find_offset
viktorerlingsson Jun 19, 2024
6412095
include tag size prefix byte in consumer_offset_file_full?
viktorerlingsson Jun 20, 2024
173377f
only append consumer offsets file, compact when full, expand if still…
viktorerlingsson Jun 26, 2024
834bb33
set capacity of consumer offsets file to 1000*current size when compa…
viktorerlingsson Jun 27, 2024
221f248
replicate consumer offsets file. remove unused code
viktorerlingsson Jun 27, 2024
267 changes: 267 additions & 0 deletions spec/stream_queue_spec.cr
@@ -1,6 +1,38 @@
require "./spec_helper"
require "./../src/lavinmq/queue"

module StreamQueueSpecHelpers
def self.publish(s, queue_name, nr_of_messages)
args = {"x-queue-type": "stream"}
with_channel(s) do |ch|
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
nr_of_messages.times { |i| q.publish "m#{i}" }
end
end

def self.consume_one(s, queue_name, c_tag, c_args = AMQP::Client::Arguments.new)
args = {"x-queue-type": "stream"}
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, tag: c_tag, args: c_args) do |msg|
msgs.send msg
msg.ack
end
msgs.receive
end
end

def self.offset_from_headers(headers)
if headers
headers["x-stream-offset"].as(Int64)
else
fail("No headers found")
end
end
end

describe LavinMQ::StreamQueue do
stream_queue_args = LavinMQ::AMQP::Table.new({"x-queue-type": "stream"})

@@ -203,4 +235,239 @@ describe LavinMQ::StreamQueue do
end
end
end

describe "Automatic consumer offset tracking" do
it "resumes from last offset on reconnect" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
offset = 3

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, offset + 1)
offset.times { StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag) }
sleep 0.1

# consume again, should start from last offset automatically
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq offset + 1
end
end

it "reads offsets from file on init" do
queue_name = Random::Secure.hex
offsets = [84_i64, 24_i64, 1_i64, 100_i64, 42_i64]
tag_prefix = "ctag-"

with_amqp_server do |s|
vhost = s.vhosts["/"]
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.update_consumer_offset(tag_prefix + i.to_s, offset)
end
msg_store.close
sleep 0.1

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.last_offset_by_consumer_tag(tag_prefix + i.to_s).should eq offset
end
msg_store.close
end
end

it "only saves one entry per consumer tag" do
queue_name = Random::Secure.hex
offsets = [84_i64, 24_i64, 1_i64, 100_i64, 42_i64]
consumer_tag = "ctag-1"
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each do |offset|
msg_store.update_consumer_offset(consumer_tag, offset)
end
msg_store.close
sleep 0.1

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets.last
msg_store.@consumer_offsets.size.should eq 15

msg_store.close
end
end

it "does not track offset if x-stream-offset is set" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0})

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 2)
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1

# should consume the same message again since tracking was not saved from last consume
msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1
end
end

it "should not use saved offset if x-stream-offset is set" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0})

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 2)

# get message without x-stream-offset, tracks offset
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1

# consume with x-stream-offset set, should consume the same message again
msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1
end
end

it "should use saved offset if x-stream-offset & x-stream-automatic-offset-tracking are set" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0, "x-stream-automatic-offset-tracking": true})

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 2)

# consume with x-stream-offset and x-stream-automatic-offset-tracking set, tracks offset
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1

# consume again with the same arguments, should resume from the saved offset and get the next message
msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 2
end
end

it "cleanup_consumer_offsets removes outdated offset" do
queue_name = Random::Secure.hex
offsets = [84_i64, -10_i64]
tag_prefix = "ctag-"

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.update_consumer_offset(tag_prefix + i.to_s, offset)
end
sleep 0.1
msg_store.cleanup_consumer_offsets
msg_store.close
sleep 0.1

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(tag_prefix + 1.to_s).should eq nil
msg_store.last_offset_by_consumer_tag(tag_prefix + 0.to_s).should eq offsets[0]
msg_store.close
end
end

it "runs cleanup when removing segment" do
consumer_tag = "ctag-1"
queue_name = Random::Secure.hex
args = {"x-queue-type": "stream", "x-max-length": 1}
msg_body = Bytes.new(LavinMQ::Config.instance.segment_size)

with_amqp_server do |s|
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)

with_channel(s) do |ch|
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
q.publish_confirm msg_body
end

with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, tag: consumer_tag) do |msg|
msgs.send msg
msg.ack
end
msgs.receive
end

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq 2

with_channel(s) do |ch|
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
2.times { q.publish_confirm msg_body }
end

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq nil
end
end

it "does not track offset if c-tag is auto-generated" do
queue_name = Random::Secure.hex

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)
args = {"x-queue-type": "stream"}
c_tag = ""
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
msgs = Channel(AMQP::Client::DeliverMessage).new
c_tag = q.subscribe(no_ack: false) do |msg|
msgs.send msg
msg.ack
end
msgs.receive
end

sleep 0.1
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(c_tag).should eq nil
end
end

it "expands consumer offset file when needed" do
queue_name = Random::Secure.hex
consumer_tag_prefix = "ctag-"
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
2000.times do |i|
next if i == 0
msg_store.update_consumer_offset("#{consumer_tag_prefix}#{i}", i)
end
msg_store.close

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.@consumer_offsets.size.should eq 34_875

2000.times do |i|
next if i == 0
msg_store.last_offset_by_consumer_tag("#{consumer_tag_prefix}#{i}").should eq i
end

msg_store.close
end
end
end
end
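The two size assertions in the specs above (15 bytes for a single `ctag-1` entry, 34_875 bytes for tags `ctag-1` through `ctag-1999`) are consistent with each entry being a one-byte tag length, the tag bytes, and an 8-byte little-endian offset. The following is a minimal sketch under that assumption. `encode_entry` and `decode_entries` are hypothetical helpers written for illustration, not the PR's implementation, and they use an `IO::Memory` rather than the memory-mapped file.

```crystal
# Hypothetical helpers illustrating the assumed entry layout:
# [1 byte tag length][tag bytes][8 byte little-endian Int64 offset]
def encode_entry(io : IO, tag : String, offset : Int64) : Nil
  raise ArgumentError.new("tag too long") if tag.bytesize > UInt8::MAX
  io.write_byte(tag.bytesize.to_u8)
  io.write(tag.to_slice)
  io.write_bytes(offset, IO::ByteFormat::LittleEndian)
end

# Reads entries until the IO is exhausted. A later entry for the same tag
# replaces an earlier one, which is what an append-only file would need.
def decode_entries(io : IO) : Hash(String, Int64)
  offsets = Hash(String, Int64).new
  while len = io.read_byte
    tag = io.read_string(len)
    offsets[tag] = io.read_bytes(Int64, IO::ByteFormat::LittleEndian)
  end
  offsets
end

# One entry for "ctag-1": 1 + 6 + 8 bytes.
io = IO::Memory.new
encode_entry(io, "ctag-1", 42_i64)
single_entry_size = io.size

# Entries for "ctag-1" through "ctag-1999", as in the expansion spec.
io = IO::Memory.new
(1...2000).each { |i| encode_entry(io, "ctag-#{i}", i.to_i64) }
many_entries_size = io.size

io.rewind
restored = decode_entries(io)
```

Under this layout `single_entry_size` is 15 and `many_entries_size` is 34_875, which matches the values the specs expect from `@consumer_offsets.size`.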
14 changes: 12 additions & 2 deletions src/lavinmq/client/channel/stream_consumer.cr
@@ -9,11 +9,13 @@ module LavinMQ
property segment : UInt32
property pos : UInt32
getter requeued = Deque(SegmentPosition).new
@track_offset = false

def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume)
@tag = frame.consumer_tag
validate_preconditions(frame)
offset = frame.arguments["x-stream-offset"]?
- @offset, @segment, @pos = stream_queue.find_offset(offset)
+ @offset, @segment, @pos = stream_queue.find_offset(offset, @tag, @track_offset)
super
end

@@ -34,7 +36,10 @@ module LavinMQ
raise Error::PreconditionFailed.new("x-priority not supported on stream queues")
end
case frame.arguments["x-stream-offset"]?
- when Nil, Int, Time, "first", "next", "last"
+ when Nil
+ @track_offset = true unless @tag.starts_with?("amq.ctag-")
+ when Int, Time, "first", "next", "last"
+ @track_offset = true if frame.arguments["x-stream-automatic-offset-tracking"]?
else raise Error::PreconditionFailed.new("x-stream-offset must be an integer, a timestamp, 'first', 'next' or 'last'")
end
end
@@ -82,6 +87,11 @@ module LavinMQ
@queue.as(StreamQueue)
end

def ack(sp)
stream_queue.update_consumer_offset(@tag, @offset) if @track_offset
super
end

def reject(sp, requeue : Bool)
super
if requeue
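The change to `validate_preconditions` above amounts to a small decision table: without `x-stream-offset`, offsets are tracked unless the tag is broker-generated, and with `x-stream-offset`, they are tracked only when `x-stream-automatic-offset-tracking` is set. A minimal stand-alone sketch of that rule follows. `track_offset?` is a hypothetical name used here for illustration, and the arguments are passed as plain values rather than read from an AMQP frame.

```crystal
# Hypothetical stand-alone version of the decision made in
# validate_preconditions; not part of the PR.
def track_offset?(tag : String, stream_offset, automatic_tracking) : Bool
  if stream_offset.nil?
    # No explicit start position: track unless the broker generated the tag.
    !tag.starts_with?("amq.ctag-")
  else
    # Explicit start position: track only when the consumer opts in.
    # Like the original `if` check, any value other than nil/false opts in.
    !!automatic_tracking
  end
end

named_no_offset = track_offset?("my-consumer", nil, nil)
generated_no_offset = track_offset?("amq.ctag-abc123", nil, nil)
named_with_offset = track_offset?("my-consumer", 0, nil)
named_opt_in = track_offset?("my-consumer", "first", true)
```

Only `named_no_offset` and `named_opt_in` come out as tracked, which corresponds to the spec cases "resumes from last offset on reconnect" and "should use saved offset if x-stream-offset & x-stream-automatic-offset-tracking is set".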
9 changes: 7 additions & 2 deletions src/lavinmq/mfile.cr
@@ -229,9 +229,9 @@ class MFile < IO
Bytes.new(buffer, @size, read_only: true)
end

- def to_slice(pos, size)
+ def to_slice(pos, size, read_only = true)
raise IO::EOFError.new if pos + size > @size
- Bytes.new(buffer + pos, size, read_only: true)
+ Bytes.new(buffer + pos, size, read_only: read_only)
end

def advise(advice : Advice, addr = buffer, offset = 0, length = @capacity) : Nil
@@ -261,4 +261,9 @@ class MFile < IO
raise IO::Error.from_errno("pread") if cnt == -1
cnt
end

def rename(new_path : String) : Nil
File.rename @path, new_path
@path = new_path
end
end
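The new `read_only` parameter on `to_slice` is what lets a caller encode a value directly into the mapped memory, as the commit "encode directly into mmap" describes. The sketch below shows the idea with a plain heap buffer standing in for the mmap'ed region, which is an assumption made to keep the example self-contained. The variable names are illustrative, and only standard-library `Slice` and `IO::ByteFormat` calls are used.

```crystal
# Stand-in for an mmap'ed region; the PR uses MFile#to_slice instead.
buffer = Bytes.new(16)

# A read-only view, comparable to the default to_slice(pos, size).
read_view = Bytes.new(buffer.to_unsafe, 8, read_only: true)

# A writable view over the same memory, comparable to
# to_slice(pos, size, false).
write_view = Bytes.new(buffer.to_unsafe, 8, read_only: false)

# Encode an Int64 offset in place, little-endian, with no intermediate copy.
IO::ByteFormat::LittleEndian.encode(1234_i64, write_view)

# Both views share memory, so the read-only view sees the new value.
decoded = IO::ByteFormat::LittleEndian.decode(Int64, read_view)
```

Keeping `read_only = true` as the default means existing callers of `to_slice` keep their previous behaviour, and only code that explicitly asks for a writable slice can modify the file contents.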
4 changes: 4 additions & 0 deletions src/lavinmq/queue/stream_queue.cr
@@ -74,6 +74,10 @@ module LavinMQ
end
end

def update_consumer_offset(consumer_tag : String, offset : Int64) : Nil
stream_queue_msg_store.update_consumer_offset(consumer_tag, offset)
end

# yield the next message in the ready queue
# returns true if a message was delivered, false otherwise
# if we encounter an unrecoverable ReadError, close queue