Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@ module Message
Status = Models::Message::Status

class Error < StandardError; end
class MissingSeed < Error; end
class MissingPartition < Error; end

PARTITION_COUNT = Integer(ENV.fetch("OUTBOXER_MESSAGE_PARTITION_COUNT", 64))
PARTITIONS = (0...PARTITION_COUNT)

# Seeds counts and totals for all partitions.
# Seed partitions.
#
# This method is idempotent and can be safely re-run.
#
# @param logger [#info, #error, #fatal, nil] optional logger
# @param time [Time] time source for created_at / updated_at.
# @return [Integer] number of rows ensured per table.
def seed(logger: nil, time: ::Time)
def seed_partitions(logger: nil, time: ::Time)
current_utc_time = time.now.utc

[Models::MessageCount, Models::MessageTotal].each do |model|
Expand All @@ -33,7 +34,7 @@ def seed(logger: nil, time: ::Time)
end
end

logger&.info "Seeded #{PARTITIONS.size} message partitions"
logger&.info "Seeded #{PARTITIONS.size} partitions"

nil
end
Expand Down Expand Up @@ -73,7 +74,7 @@ def calculate_partition(id:, partition_count: PARTITION_COUNT)
# - `:messageable_type` [String]
# - `:messageable_id` [Integer, String]
# - `:updated_at` [Time]
# @raise [Outboxer::Message::MissingSeed] when counter rows are absent and `attempt` > 1.
# @raise [Outboxer::Message::MissingPartition] when partition seed absent and `attempt` > 1.
# @raise [Outboxer::Message::Error] on other failures during queuing.
# @example Using an object
# Outboxer::Message.queue(messageable: event)
Expand Down Expand Up @@ -113,7 +114,7 @@ def queue(messageable: nil, messageable_type: nil, messageable_id: nil,
.update_all(["value = value + ?, updated_at = ?", 1, current_utc_time])

if updated_count.zero? || updated_total.zero?
raise MissingSeed, "Missing seed"
raise MissingPartition, "Missing partition"
end

{
Expand All @@ -124,9 +125,9 @@ def queue(messageable: nil, messageable_type: nil, messageable_id: nil,
updated_at: message.updated_at
}
end
rescue MissingSeed
rescue MissingPartition
if attempt == 1
Message.seed(time: time, logger: logger)
Message.seed_partitions(time: time, logger: logger)

Message.queue(
messageable: messageable,
Expand Down