Skip to content

Commit

Permalink
update specs to start amqp servers where needed
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorerlingsson committed Jun 19, 2024
1 parent 3c2f97a commit a74d4a6
Showing 1 changed file with 150 additions and 132 deletions.
282 changes: 150 additions & 132 deletions spec/stream_queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ require "./spec_helper"
require "./../src/lavinmq/queue"

module StreamQueueSpecHelpers
def self.publish(queue_name, nr_of_messages)
def self.publish(s, queue_name, nr_of_messages)
args = {"x-queue-type": "stream"}
with_channel do |ch|
with_channel(s) do |ch|
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
nr_of_messages.times { |i| q.publish "m#{i}" }
end
end

def self.consume_one(queue_name, c_tag, c_args = AMQP::Client::Arguments.new)
def self.consume_one(s, queue_name, c_tag, c_args = AMQP::Client::Arguments.new)
args = {"x-queue-type": "stream"}
with_channel do |ch|
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
msgs = Channel(AMQP::Client::DeliverMessage).new
Expand Down Expand Up @@ -242,214 +242,232 @@ describe LavinMQ::StreamQueue do
consumer_tag = Random::Secure.hex
offset = 3

StreamQueueSpecHelpers.publish(queue_name, offset + 1)

offset.times { StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag) }
sleep 0.1
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, offset + 1)
offset.times { StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag) }
sleep 0.1

# consume again, should start from last offset automatically
msg = StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq offset + 1
# consume again, should start from last offset automatically
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq offset + 1
end
end

it "reads offsets from file on init" do
queue_name = Random::Secure.hex
vhost = Server.vhosts["/"]
offsets = [84_i64, 24_i64, 1_i64, 100_i64, 42_i64]
tag_prefix = "ctag-"
StreamQueueSpecHelpers.publish(queue_name, 1)

data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.update_consumer_offset(tag_prefix + i.to_s, offset)
end
msg_store.close
sleep 0.1
with_amqp_server do |s|
vhost = s.vhosts["/"]
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.update_consumer_offset(tag_prefix + i.to_s, offset)
end
msg_store.close
sleep 0.1

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.last_offset_by_consumer_tag(tag_prefix + i.to_s).should eq offset
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.last_offset_by_consumer_tag(tag_prefix + i.to_s).should eq offset
end
msg_store.close
end
msg_store.close
end

it "only saves one entry per consumer tag" do
queue_name = Random::Secure.hex
vhost = Server.vhosts["/"]
offsets = [84_i64, 24_i64, 1_i64, 100_i64, 42_i64]
consumer_tag = "ctag-1"
StreamQueueSpecHelpers.publish(queue_name, 1)
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each do |offset|
msg_store.update_consumer_offset(consumer_tag, offset)
end
msg_store.close
sleep 0.1
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each do |offset|
msg_store.update_consumer_offset(consumer_tag, offset)
end
msg_store.close
sleep 0.1

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets.last
msg_store.@consumer_offsets.size.should eq 15
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets.last
msg_store.@consumer_offsets.size.should eq 15

msg_store.close
msg_store.close
end
end

it "does not track offset if x-stream-offset is set" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0})

StreamQueueSpecHelpers.publish(queue_name, 2)
msg = StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1

# should consume the same message again since tracking was not saved from last consume
msg_2 = StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag)
StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 2)
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1

# should consume the same message again since tracking was not saved from last consume
msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1
end
end

it "should not use saved offset if x-stream-offset is set" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0})

StreamQueueSpecHelpers.publish(queue_name, 2)
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 2)

# get message without x-stream-offset, tracks offset
msg = StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1
# get message without x-stream-offset, tracks offset
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1

# consume with x-stream-offset set, should consume the same message again
msg_2 = StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1
# consume with x-stream-offset set, should consume the same message again
msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1
end
end

it "should use saved offset if x-stream-offset & x-stream-automatic-offset-tracking is set" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0, "x-stream-automatic-offset-tracking": true})

StreamQueueSpecHelpers.publish(queue_name, 2)
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 2)

# get message without x-stream-offset, tracks offset
msg = StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1
# get message without x-stream-offset, tracks offset
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1

# consume with x-stream-offset set, should consume the same message again
msg_2 = StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 2
# consume with x-stream-offset set, should consume the same message again
msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 2
end
end

it "cleanup_consumer_offsets removes outdated offset" do
queue_name = Random::Secure.hex
vhost = Server.vhosts["/"]
offsets = [84_i64, -10_i64]
tag_prefix = "ctag-"
StreamQueueSpecHelpers.publish(queue_name, 1)

data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.update_consumer_offset(tag_prefix + i.to_s, offset)
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.update_consumer_offset(tag_prefix + i.to_s, offset)
end
sleep 0.1
msg_store.cleanup_consumer_offsets
msg_store.close
sleep 0.1

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(tag_prefix + 1.to_s).should eq nil
msg_store.last_offset_by_consumer_tag(tag_prefix + 0.to_s).should eq offsets[0]
msg_store.close
end
sleep 0.1
msg_store.cleanup_consumer_offsets
msg_store.close
sleep 0.1

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(tag_prefix + 1.to_s).should eq nil
msg_store.last_offset_by_consumer_tag(tag_prefix + 0.to_s).should eq offsets[0]
msg_store.close
end

it "runs cleanup when removing segment" do
consumer_tag = "ctag-1"
vhost = Server.vhosts["/"]
queue_name = Random::Secure.hex
args = {"x-queue-type": "stream", "x-max-length": 1}
msg_body = Bytes.new(LavinMQ::Config.instance.segment_size)
data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name)

with_channel do |ch|
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
q.publish_confirm msg_body
end
with_amqp_server do |s|
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)

with_channel do |ch|
ch.prefetch 1
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, tag: consumer_tag) do |msg|
msgs.send msg
msg.ack
with_channel(s) do |ch|
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
q.publish_confirm msg_body
end

with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, tag: consumer_tag) do |msg|
msgs.send msg
msg.ack
end
msgs.receive
end
msgs.receive
end

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq 2
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq 2

with_channel do |ch|
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
2.times { q.publish_confirm msg_body }
end
with_channel(s) do |ch|
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
2.times { q.publish_confirm msg_body }
end

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq nil
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq nil
end
end

it "does not track offset if c-tag is auto-generated" do
queue_name = Random::Secure.hex
StreamQueueSpecHelpers.publish(queue_name, 1)
args = {"x-queue-type": "stream"}
c_tag = ""
with_channel do |ch|
ch.prefetch 1
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
msgs = Channel(AMQP::Client::DeliverMessage).new
c_tag = q.subscribe(no_ack: false) do |msg|
msgs.send msg
msg.ack

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)
args = {"x-queue-type": "stream"}
c_tag = ""
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
msgs = Channel(AMQP::Client::DeliverMessage).new
c_tag = q.subscribe(no_ack: false) do |msg|
msgs.send msg
msg.ack
end
msgs.receive
end
msgs.receive
end

sleep 0.1
vhost = Server.vhosts["/"]
data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(c_tag).should eq nil
sleep 0.1
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(c_tag).should eq nil
end
end

it "expands consumer offset file when needed" do
queue_name = Random::Secure.hex
vhost = Server.vhosts["/"]
consumer_tag_prefix = "ctag-"
StreamQueueSpecHelpers.publish(queue_name, 1)
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
2000.times do |i|
next if i == 0
msg_store.update_consumer_offset("#{consumer_tag_prefix}#{i}", i)
end
msg_store.close
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
2000.times do |i|
next if i == 0
msg_store.update_consumer_offset("#{consumer_tag_prefix}#{i}", i)
end
msg_store.close

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.@consumer_offsets.size.should eq 34_875
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.@consumer_offsets.size.should eq 34_875

2000.times do |i|
next if i == 0
msg_store.last_offset_by_consumer_tag("#{consumer_tag_prefix}#{i}").should eq i
end
2000.times do |i|
next if i == 0
msg_store.last_offset_by_consumer_tag("#{consumer_tag_prefix}#{i}").should eq i
end

msg_store.close
msg_store.close
end
end
end
end

0 comments on commit a74d4a6

Please sign in to comment.