Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle IndexError when reading msg files #671

Merged
merged 11 commits into from
Jun 19, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- lavinmqctl didn't recognize 201/204 response codes from set_permissions, set_user_tags and add_vhost
- Queues will no longer be closed if file size is incorrect. Fixes [#669](https://github.com/cloudamqp/lavinmq/issues/669)

### Changed

Expand Down
30 changes: 30 additions & 0 deletions spec/storage_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,36 @@ describe LavinMQ::DurableQueue do
queue.message_count.should eq 2
end
end

it "handles files with few extra bytes" do
queue_name = Random::Secure.hex(10)
with_amqp_server do |s|
vhost = s.vhosts.create("test_vhost")
with_channel(s, vhost: vhost.name) do |ch|
q = ch.queue(queue_name)
queue = vhost.queues[queue_name].as(LavinMQ::DurableQueue)
mfile = queue.@msg_store.@segments.first_value

# fill up one segment
message_size = 41
while mfile.size < (LavinMQ::Config.instance.segment_size - message_size*2)
q.publish_confirm "a"
end
remaining_size = LavinMQ::Config.instance.segment_size - mfile.size - message_size
q.publish_confirm "a" * remaining_size

# publish one more message to create a new segment
q.publish_confirm "a"

# resize first segment to LavinMQ::Config.instance.segment_size
mfile.resize(LavinMQ::Config.instance.segment_size)

# read messages, should not raise any error
q.subscribe(tag: "tag", no_ack: false, &.ack)
should_eventually(be_true) { queue.empty? }
end
end
end
end

describe LavinMQ::VHost do
Expand Down
5 changes: 5 additions & 0 deletions src/lavinmq/queue/message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ module LavinMQ
@size -= 1
notify_empty(true) if @size.zero?
return Envelope.new(sp, msg, redelivered: false)
rescue ex : IndexError
Log.warn { "Msg file size does not match expected value, moving on to next segment" }
select_next_read_segment && next
return if @size.zero?
raise Error.new(@rfile, cause: ex)
rescue ex
raise Error.new(@rfile, cause: ex)
end
Expand Down
Loading