Skip to content

Commit

Permalink
add spec for cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorerlingsson committed Apr 23, 2024
1 parent 13bd31e commit 2ecc8d3
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion spec/stream_queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ describe LavinMQ::StreamQueue do

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets.last
msg_store.consumer_offsets.size.should eq 15
msg_store.@consumer_offsets.size.should eq 15

msg_store.close
end
Expand Down Expand Up @@ -314,6 +314,30 @@ describe LavinMQ::StreamQueue do

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(tag_prefix + 1.to_s).should eq nil
msg_store.last_offset_by_consumer_tag(tag_prefix + 0.to_s).should eq offsets[0]
msg_store.close
end

it "cleanup_consumer_offsets removes outdated offset" do
queue_name = Random::Secure.hex
vhost = Server.vhosts["/"]
offsets = [84_i64, -10_i64]
tag_prefix = "ctag-"
StreamQueueSpecHelpers.publish(queue_name, 1)

data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.save_offset_by_consumer_tag(tag_prefix + i.to_s, offset)
end
sleep 0.1
msg_store.cleanup_consumer_offsets
msg_store.close
sleep 0.1

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(tag_prefix + 1.to_s).should eq nil
msg_store.last_offset_by_consumer_tag(tag_prefix + 0.to_s).should eq offsets[0]
msg_store.close
end
end
Expand Down

0 comments on commit 2ecc8d3

Please sign in to comment.