From 73674de8e5de31e0ece3837a6b607491a2af01fe Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sat, 13 Dec 2025 02:46:25 +1100 Subject: [PATCH 1/3] publish must receive block --- lib/outboxer/message.rb | 27 ++++++++++++++--------- lib/outboxer/publisher.rb | 4 ++-- spec/lib/outboxer/message/publish_spec.rb | 15 ++++--------- 3 files changed, 22 insertions(+), 24 deletions(-) diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index 4c276444..25c87a37 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -127,7 +127,8 @@ def queue(messageable: nil, messageable_type: nil, messageable_id: nil, # - `:messageable_type` [String] # - `:messageable_id` [Integer, String] # @yieldreturn [Object] result of the block (ignored) - # @return [Hash, nil] the same yielded message hash, or `nil` if no queued message exists + # @return [Boolean] `true` if a message was published successfully, + # `false` if no queued message existed or publishing failed # @raise [Exception] re-raises non-StandardError exceptions after marking failed # @example Publish with processing # Outboxer::Message.publish(logger: logger) do |message| @@ -136,20 +137,22 @@ def queue(messageable: nil, messageable_type: nil, messageable_id: nil, def publish(hostname: Socket.gethostname, process_id: Process.pid, thread_id: Thread.current.object_id, - logger: nil, time: ::Time) + logger: nil, time: ::Time, &block) + raise ArgumentError, "publish requires a block" if block.nil? + publishing_started_at = time.now.utc message = Message.publishing( hostname: hostname, process_id: process_id, thread_id: thread_id, time: time) - return if message.nil? - - logger&.info( - "Outboxer message publishing id=#{message[:id]} " \ - "messageable_type=#{message[:messageable_type]} " \ - "messageable_id=#{message[:messageable_id]}") + if message.nil? + false + else + logger&.info( + "Outboxer message publishing id=#{message[:id]} " \ + "messageable_type=#{message[:messageable_type]} " \ + "messageable_id=#{message[:messageable_id]}") - if block_given? begin yield message rescue StandardError => error @@ -166,6 +169,8 @@ def publish(hostname: Socket.gethostname, "duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}\n" \ "#{error.class}: #{error.message}\n" \ "#{error.backtrace.join("\n")}") + + false rescue Exception => error publishing_failed( id: message[:id], lock_version: message[:lock_version], error: error, @@ -191,10 +196,10 @@ def publish(hostname: Socket.gethostname, "messageable_type=#{message[:messageable_type]} " \ "messageable_id=#{message[:messageable_id]} " \ "duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}") + + true end end - - message end # Selects and locks the next available message in **queued** status diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index 4499768b..bd24a3c6 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -576,7 +576,7 @@ def create_publisher_thread(id:, index:, while !terminating? if publishing? begin - published_message = Message.publish(logger: logger) do |message| + message_published = Message.publish(logger: logger) do |message| block.call({ id: id }, message) end rescue StandardError => error @@ -590,7 +590,7 @@ def create_publisher_thread(id:, index:, process: process, kernel: kernel) else - if published_message.nil? + if message_published == false Publisher.sleep( poll_interval, tick_interval: tick_interval, diff --git a/spec/lib/outboxer/message/publish_spec.rb b/spec/lib/outboxer/message/publish_spec.rb index 0c45b6d6..3591fd67 100644 --- a/spec/lib/outboxer/message/publish_spec.rb +++ b/spec/lib/outboxer/message/publish_spec.rb @@ -23,10 +23,7 @@ module Outboxer end expect(yielded_id).to eq(queued_message.id) - expect(result).to include( - id: queued_message.id, - messageable_type: queued_message.messageable_type, - messageable_id: queued_message.messageable_id) + expect(result).to be true expect(Models::Message.published.count).to eql(0) end @@ -36,11 +33,7 @@ module Outboxer raise StandardError, "temporary failure" end - expect(result).to include({ - id: queued_message.id, - messageable_type: queued_message.messageable_type, - messageable_id: queued_message.messageable_id - }) + expect(result).to be false expect(queued_message.reload.status) .to eq(Models::Message::Status::FAILED) end @@ -61,7 +54,7 @@ module Outboxer it "returns nil" do result = Message.publish { raise "block should not be called" } - expect(result).to be_nil + expect(result).to be false expect( Models::Message.where(status: Models::Message::Status::QUEUED).count ).to eq(0) @@ -126,7 +119,7 @@ module Outboxer it "returns nil and does not log" do result = Message.publish(logger: logger) { raise "should not run" } - expect(result).to be_nil + expect(result).to be false expect(logger).not_to have_received(:info) expect(logger).not_to have_received(:error) expect(logger).not_to have_received(:fatal) From b6b8aedcb5ed7e31a1caadb30b80607e47d5395e Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sat, 13 Dec 2025 02:50:32 +1100 Subject: [PATCH 2/3] fix end to end tests --- quickstart_e2e_tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quickstart_e2e_tests.sh b/quickstart_e2e_tests.sh index 16853bbf..3ffd0cfa 100755 --- a/quickstart_e2e_tests.sh +++ b/quickstart_e2e_tests.sh @@ -42,7 +42,7 @@ bundle exec rails new . \ bundle install bundle exec rails db:create -echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "optimisation/increase_throughput"' \ +echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "'"${GITHUB_REF_NAME}"'"' \ >> Gemfile bundle install bundle exec rails generate outboxer:install From 992fe568b87c54f67d9c634c1b4295fee941da9c Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sat, 13 Dec 2025 02:55:22 +1100 Subject: [PATCH 3/3] update github head ref --- quickstart_e2e_tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quickstart_e2e_tests.sh b/quickstart_e2e_tests.sh index 3ffd0cfa..776ee6d2 100755 --- a/quickstart_e2e_tests.sh +++ b/quickstart_e2e_tests.sh @@ -42,7 +42,7 @@ bundle exec rails new . \ bundle install bundle exec rails db:create -echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "'"${GITHUB_REF_NAME}"'"' \ +echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "'"${GITHUB_HEAD_REF}"'"' \ >> Gemfile bundle install bundle exec rails generate outboxer:install