From 194926e0c930e0a6c3548e3267e47657e9a3c249 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Thu, 23 May 2024 16:38:14 +0200 Subject: [PATCH] dont track offsets when consumer tag is generated --- spec/stream_queue_spec.cr | 23 +++++++++++++++++++ src/lavinmq/client/channel/stream_consumer.cr | 4 ++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/spec/stream_queue_spec.cr b/spec/stream_queue_spec.cr index 02f9efed28..6938a08c3e 100644 --- a/spec/stream_queue_spec.cr +++ b/spec/stream_queue_spec.cr @@ -348,5 +348,28 @@ describe LavinMQ::StreamQueue do msg_store.last_offset_by_consumer_tag(tag_prefix + 0.to_s).should eq offsets[0] msg_store.close end + + it "does not track offset if c-tag is auto-generated" do + queue_name = Random::Secure.hex + StreamQueueSpecHelpers.publish(queue_name, 1) + args = {"x-queue-type": "stream"} + c_tag = "" + with_channel do |ch| + ch.prefetch 1 + q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args)) + msgs = Channel(AMQP::Client::DeliverMessage).new + c_tag = q.subscribe(no_ack: false) do |msg| + msgs.send msg + msg.ack + end + msg = msgs.receive + end + + sleep 0.1 + vhost = Server.vhosts["/"] + data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name) + msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil) + msg_store.last_offset_by_consumer_tag(c_tag).should eq nil + end end end diff --git a/src/lavinmq/client/channel/stream_consumer.cr b/src/lavinmq/client/channel/stream_consumer.cr index da235f61a9..0a44a2a9b3 100644 --- a/src/lavinmq/client/channel/stream_consumer.cr +++ b/src/lavinmq/client/channel/stream_consumer.cr @@ -12,9 +12,9 @@ module LavinMQ @track_offset = false def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume) + @tag = frame.consumer_tag validate_preconditions(frame) offset = frame.arguments["x-stream-offset"]? - @tag = frame.consumer_tag @offset, @segment, @pos = stream_queue.find_offset(offset, @tag) super end @@ -37,7 +37,7 @@ module LavinMQ end case frame.arguments["x-stream-offset"]? when Nil - @track_offset = true + @track_offset = true unless @tag.starts_with?("amq.ctag-") when Int, Time, "first", "next", "last" @track_offset = false else raise Error::PreconditionFailed.new("x-stream-offset must be an integer, a timestamp, 'first', 'next' or 'last'")