Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ def queue(messageable: nil, messageable_type: nil, messageable_id: nil,
# - `:messageable_type` [String]
# - `:messageable_id` [Integer, String]
# @yieldreturn [Object] result of the block (ignored)
# @return [Hash, nil] the same yielded message hash, or `nil` if no queued message exists
# @return [Boolean] `true` if a message was published successfully,
# `false` if no queued message existed or publishing failed
# @raise [Exception] re-raises non-StandardError exceptions after marking failed
# @example Publish with processing
# Outboxer::Message.publish(logger: logger) do |message|
Expand All @@ -136,20 +137,22 @@ def queue(messageable: nil, messageable_type: nil, messageable_id: nil,
def publish(hostname: Socket.gethostname,
process_id: Process.pid,
thread_id: Thread.current.object_id,
logger: nil, time: ::Time)
logger: nil, time: ::Time, &block)
raise ArgumentError, "publish requires a block" if block.nil?

publishing_started_at = time.now.utc

message = Message.publishing(
hostname: hostname, process_id: process_id, thread_id: thread_id, time: time)

return if message.nil?

logger&.info(
"Outboxer message publishing id=#{message[:id]} " \
"messageable_type=#{message[:messageable_type]} " \
"messageable_id=#{message[:messageable_id]}")
if message.nil?
false
else
logger&.info(
"Outboxer message publishing id=#{message[:id]} " \
"messageable_type=#{message[:messageable_type]} " \
"messageable_id=#{message[:messageable_id]}")

if block_given?
begin
yield message
rescue StandardError => error
Expand All @@ -166,6 +169,8 @@ def publish(hostname: Socket.gethostname,
"duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}\n" \
"#{error.class}: #{error.message}\n" \
"#{error.backtrace.join("\n")}")

false
rescue Exception => error
publishing_failed(
id: message[:id], lock_version: message[:lock_version], error: error,
Expand All @@ -191,10 +196,10 @@ def publish(hostname: Socket.gethostname,
"messageable_type=#{message[:messageable_type]} " \
"messageable_id=#{message[:messageable_id]} " \
"duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}")

true
end
end

message
end

# Selects and locks the next available message in **queued** status
Expand Down
4 changes: 2 additions & 2 deletions lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ def create_publisher_thread(id:, index:,
while !terminating?
if publishing?
begin
published_message = Message.publish(logger: logger) do |message|
message_published = Message.publish(logger: logger) do |message|
block.call({ id: id }, message)
end
rescue StandardError => error
Expand All @@ -590,7 +590,7 @@ def create_publisher_thread(id:, index:,
process: process,
kernel: kernel)
else
if published_message.nil?
if message_published == false
Publisher.sleep(
poll_interval,
tick_interval: tick_interval,
Expand Down
2 changes: 1 addition & 1 deletion quickstart_e2e_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ bundle exec rails new . \

bundle install
bundle exec rails db:create
echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "optimisation/increase_throughput"' \
echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "'"${GITHUB_HEAD_REF}"'"' \
>> Gemfile
bundle install
bundle exec rails generate outboxer:install
Expand Down
15 changes: 4 additions & 11 deletions spec/lib/outboxer/message/publish_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ module Outboxer
end

expect(yielded_id).to eq(queued_message.id)
expect(result).to include(
id: queued_message.id,
messageable_type: queued_message.messageable_type,
messageable_id: queued_message.messageable_id)
expect(result).to be true

expect(Models::Message.published.count).to eql(0)
end
Expand All @@ -36,11 +33,7 @@ module Outboxer
raise StandardError, "temporary failure"
end

expect(result).to include({
id: queued_message.id,
messageable_type: queued_message.messageable_type,
messageable_id: queued_message.messageable_id
})
expect(result).to be false
expect(queued_message.reload.status)
.to eq(Models::Message::Status::FAILED)
end
Expand All @@ -61,7 +54,7 @@ module Outboxer
it "returns nil" do
result = Message.publish { raise "block should not be called" }

expect(result).to be_nil
expect(result).to be false
expect(
Models::Message.where(status: Models::Message::Status::QUEUED).count
).to eq(0)
Expand Down Expand Up @@ -126,7 +119,7 @@ module Outboxer
it "returns nil and does not log" do
result = Message.publish(logger: logger) { raise "should not run" }

expect(result).to be_nil
expect(result).to be false
expect(logger).not_to have_received(:info)
expect(logger).not_to have_received(:error)
expect(logger).not_to have_received(:fatal)
Expand Down