Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 40 additions & 10 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,20 @@ def calculate_partition(id:, partition_count: PARTITION_COUNT)
end

# Queues a new message.
# @param messageable [Object, nil] the object associated with the message.
# @param attempt [Integer] call attempt counter; first call should leave as default (1).
# @param logger [#info, #error, #fatal, nil] optional logger
# @param time [Time] time context for setting timestamps.
#
# @overload queue(messageable:, attempt: 1, logger: nil, time: ::Time)
# @param messageable [Object] the associated object; must respond to `id` and `class.name`.
# @param attempt [Integer] call attempt counter; first call should leave as default (1).
# @param logger [#info, #error, #fatal, nil] optional logger
# @param time [Time] time context for setting timestamps.
#
# @overload queue(messageable_type:, messageable_id:, attempt: 1, logger: nil, time: ::Time)
# @param messageable_type [String] the associated object type name.
# @param messageable_id [Integer, String] the associated object identifier.
# @param attempt [Integer] call attempt counter; first call should leave as default (1).
# @param logger [#info, #error, #fatal, nil] optional logger
# @param time [Time] time context for setting timestamps.
#
# @return [Hash] a serialized message hash with keys:
# - `:id` [Integer]
# - `:status` [String]
Expand All @@ -65,16 +75,29 @@ def calculate_partition(id:, partition_count: PARTITION_COUNT)
# - `:updated_at` [Time]
# @raise [Outboxer::Message::MissingSeed] when counter rows are absent and `attempt` > 1.
# @raise [Outboxer::Message::Error] on other failures during queuing.
# @example Basic usage
# Outboxer::Message.queue(messageable: event, time: Time)
def queue(messageable:, attempt: 1, logger: nil, time: ::Time)
# @example Using an object
# Outboxer::Message.queue(messageable: event)
# @example Using explicit type and id
# Outboxer::Message.queue(messageable_type: "Event", messageable_id: 42)
def queue(messageable: nil, messageable_type: nil, messageable_id: nil,
attempt: 1, logger: nil, time: ::Time)
current_utc_time = time.now.utc

type, id =
if messageable
[messageable.class.name, messageable.id]
elsif messageable_type && messageable_id
[String(messageable_type), messageable_id]
else
raise ArgumentError,
"Provide either messageable or messageable_type and messageable_id"
end

ActiveRecord::Base.transaction do
message = Models::Message.create!(
status: Status::QUEUED,
messageable_id: messageable.id,
messageable_type: messageable.class.name,
messageable_id: id,
messageable_type: type,
queued_at: current_utc_time,
updated_at: current_utc_time
)
Expand Down Expand Up @@ -105,7 +128,14 @@ def queue(messageable:, attempt: 1, logger: nil, time: ::Time)
if attempt == 1
Message.seed(time: time, logger: logger)

Message.queue(messageable: messageable, attempt: attempt + 1, logger: logger, time: time)
Message.queue(
messageable: messageable,
messageable_type: messageable_type,
messageable_id: messageable_id,
attempt: attempt + 1,
logger: logger,
time: time
)
else
raise
end
Expand Down