Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 193 additions & 0 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ module Message

Status = Models::Message::Status

class Error < StandardError; end

# Queues a new message.
# @param messageable [Object, nil] the object associated with the message.
# @param time [Time] time context for setting timestamps.
Expand All @@ -27,6 +29,197 @@ def queue(messageable:, time: ::Time)
updated_at: message.updated_at)
end

# Publishes the next queued message by yielding it to the caller’s block.
#
# Transitions the selected message from **queued** → **publishing**,
# yields it for processing, then marks it **published** on success or **failed** on error.
#
# - Logs informational, error, or fatal output if a logger is provided.
# - Rescues `StandardError` (marks failed, logs as error) and continues.
# - Rescues `Exception` (marks failed, logs as fatal) and re-raises.
# - Returns the message hash with its identifiers, or `nil` if no queued message exists.
#
# @param logger [#info, #error, #fatal, nil] optional logger for lifecycle logging
# @param time [Time, #now] a time source; must respond to `now`
# @yield [message] yields the message hash for publishing
# @yieldparam message [Hash] the message data with:
# - `:id` [Integer]
# - `:messageable_type` [String]
# - `:messageable_id` [Integer, String]
# @yieldreturn [Object] the result of the block, ignored by this method
# @return [Hash, nil] the yielded message hash or `nil` if no queued message was found
# @raise [Exception] re-raises any non-StandardError exceptions after marking failed
# @example
# Outboxer::Message.publish(logger: logger) do |message|
# TODO: # publish message to broker
# end
def publish(logger: nil, time: ::Time)
publishing_started_at = time.now.utc
message = Message.publishing(time: time)

if message.nil?
nil
else
logger&.info(
"Outboxer message publishing id=#{message[:id]}" \
"messageable_type=#{message[:messageable_type]} " \
"messageable_id=#{message[:messageable_id]}")

begin
yield message
rescue StandardError => error
publishing_failed(id: message[:id], error: error, time: time)

logger&.error(
"Outboxer message publishing failed id=#{message[:id]} " \
"duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}\n" \
"#{error.class}: #{error.message}\n" \
"#{error.backtrace.join("\n")}")
rescue Exception => error
publishing_failed(id: message[:id], error: error, time: time)

logger&.fatal(
"Outboxer message publishing failed id=#{message[:id]} " \
"duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}\n" \
"#{error.class}: #{error.message}\n" \
"#{error.backtrace.join("\n")}")

raise
else
published(id: message[:id], time: time)

logger&.info(
"Outboxer message published id=#{message[:id]} " \
"duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}")
end

message
end
end

# Selects and locks the next available message in **queued** status
# and transitions it to **publishing**.
#
# Uses `FOR UPDATE SKIP LOCKED` to safely publish one message from the queue
# across concurrent processes. Updates `publishing_at` and `updated_at`
# to the current UTC time before returning.
#
# @param time [Time, #now] a time source; must respond to `now`
# @return [Hash, nil] a hash with:
# - `:id` [Integer]
# - `:messageable_type` [String]
# - `:messageable_id` [Integer, String]
# or `nil` if no queued message was available
# @note Runs inside a database transaction and uses the ActiveRecord connection pool.
# @example
# message = Outboxer::Message.publishing(time: Time)
def publishing(time: ::Time)
current_utc_time = time.now.utc

ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
message = Models::Message
.select(:id, :messageable_type, :messageable_id)
.where(status: Status::QUEUED)
.order(:id)
.limit(1)
.lock("FOR UPDATE SKIP LOCKED")
.first

if message.nil?
nil
else
message.update!(
status: Status::PUBLISHING,
updated_at: current_utc_time,
publishing_at: current_utc_time)

{
id: message[:id],
messageable_type: message[:messageable_type],
messageable_id: message[:messageable_id]
}
end
end
end
end

# Marks a message in **publishing** status as **published**.
#
# Ensures that the message is currently in the **publishing** state
# before transitioning. Updates `published_at` and `updated_at`
# to the current UTC time.
#
# @param id [Integer] the ID of the message to mark as published
# @param time [Time, #now] a time source; must respond to `now`
# @return [nil]
# @raise [Outboxer::Message::Error] if the message is not in publishing state
# @note Executes within a transaction using a `SELECT ... FOR UPDATE` lock.
# @example
# Outboxer::Message.published(id: message[:id], time: Time)
def published(id:, time: ::Time)
current_utc_time = time.now.utc

ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
message = Models::Message.lock.find_by!(id: id)

if message.status != Models::Message::Status::PUBLISHING
raise Error, "Status must be publishing"
end

message.update!(
status: Status::PUBLISHED,
updated_at: current_utc_time, published_at: current_utc_time)

nil
end
end
end

# Marks a message in **publishing** status as **failed** and records the exception details.
#
# Sets `failed_at` and `updated_at` to the current UTC time,
# persists the error’s class, message, and backtrace (if present).
# Ensures the message is currently in **publishing** state before updating.
#
# @param id [Integer] the ID of the message to mark as failed
# @param error [Exception, nil] optional error to persist (class name, message text, backtrace)
# @param time [Time, #now] a time source; must respond to `now`
# @return [nil]
# @raise [Outboxer::Message::Error] if the message is not in publishing state
# @note Runs inside a transaction and uses `SELECT ... FOR UPDATE` locking.
# @example
# Outboxer::Message.publishing_failed(id: message[:id], time: Time)
def publishing_failed(id:, error: nil, time: ::Time)
current_utc_time = time.now.utc

ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
message = Models::Message.lock.find_by!(id: id)

if message.status != Models::Message::Status::PUBLISHING
raise Error, "Status must be publishing"
end

message.update!(
status: Status::FAILED,
updated_at: current_utc_time, failed_at: current_utc_time)

if error
exception = message.exceptions.create!(
class_name: error.class.name, message_text: error.message)

Array(error.backtrace).each_with_index do |backtrace_line, index|
exception.frames.create!(index: index, text: backtrace_line)
end
end

nil
end
end
end

# Serializes message attributes into a hash.
#
# @param id [Integer] message ID.
Expand Down
Loading