From b3f0867ae9f91cbe21c8f02e1bd52ed72babbdb4 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sat, 11 Oct 2025 21:25:57 +1100 Subject: [PATCH 1/2] support no yield --- lib/outboxer/message.rb | 115 ++++++++++++++++++++++++---------------- 1 file changed, 70 insertions(+), 45 deletions(-) diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index 07ced9c5..33a98bc6 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -111,30 +111,53 @@ def queue(messageable:, attempt: 1, logger: nil, time: ::Time) end end - # Publishes the next queued message by yielding it to the caller’s block. + # Publishes the next queued message. # - # Transitions the selected message from **queued** → **publishing**, - # yields it for processing, then marks it **published** on success or **failed** on error. + # Behavior depends on whether a block is given: # - # - Logs informational, error, or fatal output if a logger is provided. - # - Rescues `StandardError` (marks failed, logs as error) and continues. - # - Rescues `Exception` (marks failed, logs as fatal) and re-raises. - # - Returns the message hash with its identifiers, or `nil` if no queued message exists. + # - With a block: + # Transitions the next message from **queued** → **publishing**, yields it for processing, + # then marks it **published** on success or **failed** on error. + # Logs informational, error, or fatal output if a logger is provided. + # Rescues `StandardError` (marks failed, logs error) and continues. + # Rescues `Exception` (marks failed, logs fatal) and re-raises. + # + # - Without a block: + # Transitions the next message from **queued** → **publishing** and returns it immediately. + # No further publishing or failure handling is performed. + # + # In both cases, returns `nil` if no queued message exists. # # @param logger [#info, #error, #fatal, nil] optional logger for lifecycle logging # @param time [Time, #now] a time source; must respond to `now` - # @yield [message] yields the message hash for publishing - # @yieldparam message [Hash] the message data with: - # - `:id` [Integer] - # - `:messageable_type` [String] - # - `:messageable_id` [Integer, String] - # @yieldreturn [Object] the result of the block, ignored by this method - # @return [Hash, nil] the yielded message hash or `nil` if no queued message was found - # @raise [Exception] re-raises any non-StandardError exceptions after marking failed - # @example - # Outboxer::Message.publish(logger: logger) do |message| - # TODO: # publish message to broker - # end + # + # @overload publish(logger: nil, time: ::Time) + # Moves one message from **queued** to **publishing** without processing. + # @return [Hash, nil] the message hash with: + # - `:id` [Integer] + # - `:messageable_type` [String] + # - `:messageable_id` [Integer, String] + # or `nil` if no queued message exists. + # @example Transition without processing + # message = Outboxer::Message.publish + # if message + # # The message is now marked as publishing + # end + # + # @overload publish(logger: nil, time: ::Time) { |message| ... } + # Publishes one message with full success and failure handling. + # @yield [message] the message selected for publishing + # @yieldparam message [Hash] the message data with: + # - `:id` [Integer] + # - `:messageable_type` [String] + # - `:messageable_id` [Integer, String] + # @yieldreturn [Object] result of the block (ignored) + # @return [Hash, nil] the same yielded message hash, or `nil` if no queued message exists + # @raise [Exception] re-raises non-StandardError exceptions after marking failed + # @example Publish with processing + # Outboxer::Message.publish(logger: logger) do |message| + # # Publish message to a broker here + # end def publish(logger: nil, time: ::Time) publishing_started_at = time.now.utc message = Message.publishing(time: time) @@ -147,32 +170,34 @@ def publish(logger: nil, time: ::Time) "messageable_type=#{message[:messageable_type]} " \ "messageable_id=#{message[:messageable_id]}") - begin - yield message - rescue StandardError => error - publishing_failed(id: message[:id], error: error, time: time) - - logger&.error( - "Outboxer message publishing failed id=#{message[:id]} " \ - "duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}\n" \ - "#{error.class}: #{error.message}\n" \ - "#{error.backtrace.join("\n")}") - rescue Exception => error - publishing_failed(id: message[:id], error: error, time: time) - - logger&.fatal( - "Outboxer message publishing failed id=#{message[:id]} " \ - "duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}\n" \ - "#{error.class}: #{error.message}\n" \ - "#{error.backtrace.join("\n")}") - - raise - else - published(id: message[:id], time: time) - - logger&.info( - "Outboxer message published id=#{message[:id]} " \ - "duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}") + if block_given? + begin + yield message + rescue StandardError => error + publishing_failed(id: message[:id], error: error, time: time) + + logger&.error( + "Outboxer message publishing failed id=#{message[:id]} " \ + "duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}\n" \ + "#{error.class}: #{error.message}\n" \ + "#{error.backtrace.join("\n")}") + rescue Exception => error + publishing_failed(id: message[:id], error: error, time: time) + + logger&.fatal( + "Outboxer message publishing failed id=#{message[:id]} " \ + "duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}\n" \ + "#{error.class}: #{error.message}\n" \ + "#{error.backtrace.join("\n")}") + + raise + else + published(id: message[:id], time: time) + + logger&.info( + "Outboxer message published id=#{message[:id]} " \ + "duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}") + end end message From d1b50f5f619db1779bcefa92c5fd8179bc634486 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sat, 11 Oct 2025 21:29:31 +1100 Subject: [PATCH 2/2] simplifly implementation --- lib/outboxer/message.rb | 75 ++++++++++++++++++++--------------------- 1 file changed, 36 insertions(+), 39 deletions(-) diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index 33a98bc6..c5a3bbbd 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -161,47 +161,44 @@ def queue(messageable:, attempt: 1, logger: nil, time: ::Time) def publish(logger: nil, time: ::Time) publishing_started_at = time.now.utc message = Message.publishing(time: time) - - if message.nil? - nil - else - logger&.info( - "Outboxer message publishing id=#{message[:id]} " \ - "messageable_type=#{message[:messageable_type]} " \ - "messageable_id=#{message[:messageable_id]}") - - if block_given? - begin - yield message - rescue StandardError => error - publishing_failed(id: message[:id], error: error, time: time) - - logger&.error( - "Outboxer message publishing failed id=#{message[:id]} " \ - "duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}\n" \ - "#{error.class}: #{error.message}\n" \ - "#{error.backtrace.join("\n")}") - rescue Exception => error - publishing_failed(id: message[:id], error: error, time: time) - - logger&.fatal( - "Outboxer message publishing failed id=#{message[:id]} " \ - "duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}\n" \ - "#{error.class}: #{error.message}\n" \ - "#{error.backtrace.join("\n")}") - - raise - else - published(id: message[:id], time: time) - - logger&.info( - "Outboxer message published id=#{message[:id]} " \ - "duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}") - end + return if message.nil? + + logger&.info( + "Outboxer message publishing id=#{message[:id]} " \ + "messageable_type=#{message[:messageable_type]} " \ + "messageable_id=#{message[:messageable_id]}") + + if block_given? + begin + yield message + rescue StandardError => error + publishing_failed(id: message[:id], error: error, time: time) + + logger&.error( + "Outboxer message publishing failed id=#{message[:id]} " \ + "duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}\n" \ + "#{error.class}: #{error.message}\n" \ + "#{error.backtrace.join("\n")}") + rescue Exception => error + publishing_failed(id: message[:id], error: error, time: time) + + logger&.fatal( + "Outboxer message publishing failed id=#{message[:id]} " \ + "duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}\n" \ + "#{error.class}: #{error.message}\n" \ + "#{error.backtrace.join("\n")}") + + raise + else + published(id: message[:id], time: time) + + logger&.info( + "Outboxer message published id=#{message[:id]} " \ + "duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}") end - - message end + + message end # Selects and locks the next available message in **queued** status