Skip to content

Commit

Permalink
dont track offsets when consumer tag is generated
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorerlingsson committed May 23, 2024
1 parent 3408dd5 commit 194926e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
23 changes: 23 additions & 0 deletions spec/stream_queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -348,5 +348,28 @@ describe LavinMQ::StreamQueue do
msg_store.last_offset_by_consumer_tag(tag_prefix + 0.to_s).should eq offsets[0]
msg_store.close
end

it "does not track offset if c-tag is auto-generated" do
queue_name = Random::Secure.hex
StreamQueueSpecHelpers.publish(queue_name, 1)
args = {"x-queue-type": "stream"}
c_tag = ""
with_channel do |ch|
ch.prefetch 1
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
msgs = Channel(AMQP::Client::DeliverMessage).new
c_tag = q.subscribe(no_ack: false) do |msg|
msgs.send msg
msg.ack
end
msg = msgs.receive
end

sleep 0.1
vhost = Server.vhosts["/"]
data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(c_tag).should eq nil
end
end
end
4 changes: 2 additions & 2 deletions src/lavinmq/client/channel/stream_consumer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ module LavinMQ
@track_offset = false

def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume)
@tag = frame.consumer_tag
validate_preconditions(frame)
offset = frame.arguments["x-stream-offset"]?
@tag = frame.consumer_tag
@offset, @segment, @pos = stream_queue.find_offset(offset, @tag)
super
end
Expand All @@ -37,7 +37,7 @@ module LavinMQ
end
case frame.arguments["x-stream-offset"]?
when Nil
@track_offset = true
@track_offset = true unless @tag.starts_with?("amq.ctag-")
when Int, Time, "first", "next", "last"
@track_offset = false
else raise Error::PreconditionFailed.new("x-stream-offset must be an integer, a timestamp, 'first', 'next' or 'last'")
Expand Down

0 comments on commit 194926e

Please sign in to comment.