Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 50 additions & 28 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,42 +111,64 @@ def queue(messageable:, attempt: 1, logger: nil, time: ::Time)
end
end

# Publishes the next queued message by yielding it to the caller’s block.
# Publishes the next queued message.
#
# Transitions the selected message from **queued** → **publishing**,
# yields it for processing, then marks it **published** on success or **failed** on error.
# Behavior depends on whether a block is given:
#
# - Logs informational, error, or fatal output if a logger is provided.
# - Rescues `StandardError` (marks failed, logs as error) and continues.
# - Rescues `Exception` (marks failed, logs as fatal) and re-raises.
# - Returns the message hash with its identifiers, or `nil` if no queued message exists.
# - With a block:
# Transitions the next message from **queued** → **publishing**, yields it for processing,
# then marks it **published** on success or **failed** on error.
# Logs informational, error, or fatal output if a logger is provided.
# Rescues `StandardError` (marks failed, logs error) and continues.
# Rescues `Exception` (marks failed, logs fatal) and re-raises.
#
# - Without a block:
# Transitions the next message from **queued** → **publishing** and returns it immediately.
# No further publishing or failure handling is performed.
#
# In both cases, returns `nil` if no queued message exists.
#
# @param logger [#info, #error, #fatal, nil] optional logger for lifecycle logging
# @param time [Time, #now] a time source; must respond to `now`
# @yield [message] yields the message hash for publishing
# @yieldparam message [Hash] the message data with:
# - `:id` [Integer]
# - `:messageable_type` [String]
# - `:messageable_id` [Integer, String]
# @yieldreturn [Object] the result of the block, ignored by this method
# @return [Hash, nil] the yielded message hash or `nil` if no queued message was found
# @raise [Exception] re-raises any non-StandardError exceptions after marking failed
# @example
# Outboxer::Message.publish(logger: logger) do |message|
# TODO: # publish message to broker
# end
#
# @overload publish(logger: nil, time: ::Time)
# Moves one message from **queued** to **publishing** without processing.
# @return [Hash, nil] the message hash with:
# - `:id` [Integer]
# - `:messageable_type` [String]
# - `:messageable_id` [Integer, String]
# or `nil` if no queued message exists.
# @example Transition without processing
# message = Outboxer::Message.publish
# if message
# # The message is now marked as publishing
# end
#
# @overload publish(logger: nil, time: ::Time) { |message| ... }
# Publishes one message with full success and failure handling.
# @yield [message] the message selected for publishing
# @yieldparam message [Hash] the message data with:
# - `:id` [Integer]
# - `:messageable_type` [String]
# - `:messageable_id` [Integer, String]
# @yieldreturn [Object] result of the block (ignored)
# @return [Hash, nil] the same yielded message hash, or `nil` if no queued message exists
# @raise [Exception] re-raises non-StandardError exceptions after marking failed
# @example Publish with processing
# Outboxer::Message.publish(logger: logger) do |message|
# # Publish message to a broker here
# end
def publish(logger: nil, time: ::Time)
publishing_started_at = time.now.utc
message = Message.publishing(time: time)
return if message.nil?

if message.nil?
nil
else
logger&.info(
"Outboxer message publishing id=#{message[:id]} " \
"messageable_type=#{message[:messageable_type]} " \
"messageable_id=#{message[:messageable_id]}")
logger&.info(
"Outboxer message publishing id=#{message[:id]} " \
"messageable_type=#{message[:messageable_type]} " \
"messageable_id=#{message[:messageable_id]}")

if block_given?
begin
yield message
rescue StandardError => error
Expand Down Expand Up @@ -174,9 +196,9 @@ def publish(logger: nil, time: ::Time)
"Outboxer message published id=#{message[:id]} " \
"duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}")
end

message
end

message
end

# Selects and locks the next available message in **queued** status
Expand Down