Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions bin/outboxer_publisher
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,22 @@ Outboxer::Database.connect(config: database_config, logger: logger)

begin
Outboxer::Publisher.publish_message(
batch_size: options[:batch_size],
buffer_size: options[:buffer_size],
concurrency: options[:concurrency],
tick_interval: options[:tick_interval],
poll_interval: options[:poll_interval],
heartbeat_interval: options[:heartbeat_interval],
logger: logger
) do |message|
# TODO: publish message here
) do |messages|
# TODO: publish messages here

logger.info "Outboxer published message " \
"id=#{message[:id]} " \
"messageable_type=#{message[:messageable_type]} " \
"messageable_id=#{message[:messageable_id]} "
messages.each do |message|
logger.info "Outboxer published message " \
"id=#{message[:id]} " \
"messageable_type=#{message[:messageable_type]} " \
"messageable_id=#{message[:messageable_id]} "
end
end
ensure
Outboxer::Database.disconnect(logger: logger)
Expand Down
1 change: 1 addition & 0 deletions config/outboxer.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
---
:batch_size: 1
:buffer_size: 100
:concurrency: 1
:tick_interval: 0.1
Expand Down
4 changes: 4 additions & 0 deletions db/migrate/create_outboxer_messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ def up
# publisher throughput
add_index :outboxer_messages, [:status, :publisher_id, :updated_at],
name: "idx_outboxer_status_pub_id_updated_at"

# bulk status + id locking
add_index :outboxer_messages, [:status, :id],
name: "idx_outboxer_status_id"
end

def down
Expand Down
116 changes: 116 additions & 0 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -672,5 +672,121 @@ def metrics(time: ::Time)

metrics
end

def publishing_by_ids(ids:, publisher_id: nil, publisher_name: nil, time: ::Time)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
messages = Models::Message
.where(status: Status::BUFFERED, id: ids)
.lock("FOR UPDATE SKIP LOCKED")
.to_a

raise ArgumentError, "Some messages not buffered" if messages.size != ids.size

current_utc_time = time.now.utc

Models::Message.where(status: Status::BUFFERED, id: ids).update_all(
status: Status::PUBLISHING,
publishing_at: current_utc_time,
updated_at: current_utc_time,
publisher_id: publisher_id,
publisher_name: publisher_name)

messages.map do |message|
Message.serialize(
id: message.id,
status: Status::PUBLISHING,
messageable_type: message.messageable_type,
messageable_id: message.messageable_id,
queued_at: message.queued_at,
buffered_at: message.buffered_at,
publishing_at: current_utc_time,
updated_at: current_utc_time,
publisher_id: publisher_id,
publisher_name: publisher_name)
end
end
end
end

def published_by_ids(ids:, publisher_id: nil, publisher_name: nil, time: ::Time)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
messages = Models::Message
.where(status: Status::PUBLISHING, id: ids)
.lock("FOR UPDATE SKIP LOCKED")
.to_a

raise ArgumentError, "Some messages not publishing" if messages.size != ids.size

current_utc_time = time.now.utc

Models::Message.where(status: Status::PUBLISHING, id: ids).update_all(
status: Status::PUBLISHED,
updated_at: current_utc_time,
publisher_id: publisher_id,
publisher_name: publisher_name)

messages.map do |message|
Message.serialize(
id: message.id,
status: Status::PUBLISHED,
messageable_type: message.messageable_type,
messageable_id: message.messageable_id,
queued_at: message.queued_at,
buffered_at: message.buffered_at,
publishing_at: message.publishing_at,
updated_at: current_utc_time,
publisher_id: publisher_id,
publisher_name: publisher_name)
end
end
end
end

def failed_by_ids(ids:, exception:, publisher_id: nil, publisher_name: nil, time: ::Time)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
messages = Models::Message
.where(status: Status::PUBLISHING, id: ids)
.lock("FOR UPDATE SKIP LOCKED")
.to_a

raise ArgumentError, "Some messages not publishing" if messages.size != ids.size

current_utc_time = time.now.utc

Models::Message.where(status: Status::PUBLISHING, id: ids).update_all(
status: Status::FAILED,
updated_at: current_utc_time,
publisher_id: publisher_id,
publisher_name: publisher_name)

messages.each do |message|
outboxer_exception = message.exceptions.create!(
class_name: exception.class.name,
message_text: exception.message)

exception.backtrace.each_with_index do |frame, index|
outboxer_exception.frames.create!(index: index, text: frame)
end
end

messages.map do |message|
Message.serialize(
id: message.id,
status: Status::FAILED,
messageable_type: message.messageable_type,
messageable_id: message.messageable_id,
queued_at: message.queued_at,
buffered_at: message.buffered_at,
publishing_at: message.publishing_at,
updated_at: current_utc_time,
publisher_id: publisher_id,
publisher_name: publisher_name)
end
end
end
end
end
end
81 changes: 50 additions & 31 deletions lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ def self.parse_cli_options(args)
options[:environment] = v
end

opts.on("-b", "--buffer-size SIZE", Integer, "Buffer size") do |v|
opts.on("-b", "--batch-size SIZE", Integer, "Batch size") do |v|
options[:batch_size] = v
end

opts.on("-u", "--buffer-size SIZE", Integer, "Buffer size") do |v|
options[:buffer_size] = v
end

Expand Down Expand Up @@ -127,7 +131,7 @@ def find_by_id(id:)
# @param heartbeat_interval [Float] The heartbeat interval in seconds.
# @param time [Time] The current time context for timestamping.
# @return [Hash] Details of the created publisher.
def create(name:, buffer_size:, concurrency:,
def create(name:, batch_size:, buffer_size:, concurrency:,
tick_interval:, poll_interval:, heartbeat_interval:, time: ::Time)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
Expand All @@ -137,6 +141,7 @@ def create(name:, buffer_size:, concurrency:,
name: name,
status: Status::PUBLISHING,
settings: {
"batch_size" => batch_size,
"buffer_size" => buffer_size,
"concurrency" => concurrency,
"tick_interval" => tick_interval,
Expand Down Expand Up @@ -284,11 +289,11 @@ def create_worker_threads(id:, name:,
Thread.new do
Thread.current.name = "publisher-#{index + 1}"

while (buffered_message = queue.pop)
break if buffered_message.nil?
while (buffered_messages = queue.pop)
break if buffered_messages.nil?

publish_buffered_message(
id: id, name: name, buffered_message: buffered_message,
publish_buffered_messages(
id: id, name: name, buffered_messages: buffered_messages,
logger: logger, &block)
end
end
Expand All @@ -299,23 +304,26 @@ def create_worker_threads(id:, name:,
# @param id [Integer] The ID of the publisher.
# @param name [String] The name of the publisher.
# @param queue [Queue] The queue of messages to be published.
# @param buffer_size [Integer] The buffer sie.
# @param batch_size [Integer] The batch size.
# @param buffer_size [Integer] The buffer size.
# @param poll_interval [Float] The poll interval in seconds.
# @param tick_interval [Float] The tick interval in seconds.
# @param signal_read [IO] The IO object to read signals.
# @param logger [Logger] The logger to use for logging operations.
# @param process [Process] The process object to use for timing.
# @param kernel [Kernel] The kernel module to use for sleep operations.
def buffer_messages(id:, name:, queue:, buffer_size:, poll_interval:, tick_interval:,
def buffer_messages(id:, name:, queue:, batch_size:, buffer_size:,
poll_interval:, tick_interval:,
signal_read:, logger:, process:, kernel:)
buffer_remaining = buffer_size - queue.size

if buffer_remaining > 0
buffered_messages = Message.buffer(
limit: buffer_remaining, publisher_id: id, publisher_name: name)
limit: [buffer_remaining, batch_size].min,
publisher_id: id, publisher_name: name)

if buffered_messages.count > 0
buffered_messages.each { |message| queue.push(message) }
queue.push(buffered_messages)
else
Publisher.sleep(
poll_interval,
Expand Down Expand Up @@ -492,6 +500,7 @@ def handle_signal(id:, name:, logger:)
end

PUBLISH_MESSAGE_DEFAULTS = {
batch_size: 10,
buffer_size: 100,
concurrency: 1,
tick_interval: 0.1,
Expand All @@ -502,6 +511,7 @@ def handle_signal(id:, name:, logger:)

# Publish queued messages concurrently
# @param name [String] The name of the publisher.
# @param batch_size [Integer] The batch size.
# @param buffer_size [Integer] The buffer size.
# @param concurrency [Integer] The number of threads for concurrent publishing.
# @param tick_interval [Float] The tick interval in seconds.
Expand All @@ -514,6 +524,7 @@ def handle_signal(id:, name:, logger:)
# @yield [Hash] A block to handle the publishing of each message.
def publish_message(
name: "#{::Socket.gethostname}:#{::Process.pid}",
batch_size: PUBLISH_MESSAGE_DEFAULTS[:batch_size],
buffer_size: PUBLISH_MESSAGE_DEFAULTS[:buffer_size],
concurrency: PUBLISH_MESSAGE_DEFAULTS[:concurrency],
tick_interval: PUBLISH_MESSAGE_DEFAULTS[:tick_interval],
Expand All @@ -527,8 +538,11 @@ def publish_message(
"(#{RUBY_RELEASE_DATE} revision #{RUBY_REVISION[0, 10]}) [#{RUBY_PLATFORM}]"

logger.info "Outboxer config " \
"buffer_size=#{buffer_size}, concurrency=#{concurrency}, " \
"tick_interval=#{tick_interval}, poll_interval=#{poll_interval}, " \
"batch_size=#{batch_size}, " \
"buffer_size=#{buffer_size}, " \
"concurrency=#{concurrency}, " \
"tick_interval=#{tick_interval} " \
"poll_interval=#{poll_interval}, " \
"heartbeat_interval=#{heartbeat_interval}, " \
"log_level=#{logger.level}"

Expand All @@ -537,7 +551,7 @@ def publish_message(
queue = Queue.new

publisher = create(
name: name, buffer_size: buffer_size, concurrency: concurrency,
name: name, batch_size: batch_size, buffer_size: buffer_size, concurrency: concurrency,
tick_interval: tick_interval, poll_interval: poll_interval,
heartbeat_interval: heartbeat_interval)
id = publisher[:id]
Expand All @@ -561,7 +575,7 @@ def publish_message(
when Status::PUBLISHING
buffer_messages(
id: id, name: name,
queue: queue, buffer_size: buffer_size,
queue: queue, batch_size: batch_size, buffer_size: buffer_size,
poll_interval: poll_interval, tick_interval: tick_interval,
signal_read: signal_read, logger: logger, process: process, kernel: kernel)
when Status::STOPPED
Expand Down Expand Up @@ -597,36 +611,41 @@ def publish_message(
logger.info "Outboxer terminated"
end

# Publishes a buffered message
# Publishes buffered messages
# @param id [Integer] The ID of the publisher.
# @param name [String] The name of the publisher.
# @param buffered_message [Hash] The message data retrieved from the buffer.
# @param buffered_messages [Hash] The message data retrieved from the buffer.
# @param logger [Logger] Logger for recording the outcome of the publishing attempt.
# @yield [Hash] A block to process the publishing of the message.
def publish_buffered_message(id:, name:, buffered_message:, logger:, &block)
publishing_message = Message.publishing(
id: buffered_message[:id], publisher_id: id, publisher_name: name)
def publish_buffered_messages(id:, name:, buffered_messages:, logger:, &block)
buffered_message_ids = buffered_messages.map { |buffered_message| buffered_message[:id] }

publishing_messages = Message.publishing_by_ids(
ids: buffered_message_ids, publisher_id: id, publisher_name: name)

begin
block.call(publishing_message)
block.call(publishing_messages)
rescue ::Exception => error
failed_message = Message.failed(
id: publishing_message[:id], exception: error, publisher_id: id, publisher_name: name)
failed_messages = Message.failed_by_ids(
ids: buffered_message_ids, exception: error, publisher_id: id, publisher_name: name)

logger.debug "Outboxer failed to publish message id=#{failed_message[:id]} " \
"messageable=#{failed_message[:messageable_type]}::#{failed_message[:messageable_id]} " \
"in #{(failed_message[:updated_at] - failed_message[:queued_at]).round(3)}s"
failed_messages.each do |message|
logger.debug "Outboxer failed to publish message id=#{message[:id]} " \
"messageable=#{message[:messageable_type]}::#{message[:messageable_id]} in " \
"#{(message[:updated_at] - message[:queued_at]).round(3)}s"
end

raise
end

published_message = Message.published(
id: publishing_message[:id], publisher_id: id, publisher_name: name)
published_messages = Message.published_by_ids(
ids: buffered_message_ids, publisher_id: id, publisher_name: name)

logger.debug "Outboxer published message id=#{published_message[:id]} " \
"messageable=#{published_message[:messageable_type]}::" \
"#{published_message[:messageable_id]} in " \
"#{(published_message[:updated_at] - published_message[:queued_at]).round(3)}s"
published_messages.each do |message|
logger.debug "Outboxer published message id=#{message[:id]} " \
"messageable=#{message[:messageable_type]}::#{message[:messageable_id]} in " \
"#{(message[:updated_at] - message[:queued_at]).round(3)}s"
end
rescue StandardError => error
logger.error(
"#{error.class}: #{error.message}\n" \
Expand Down
4 changes: 4 additions & 0 deletions lib/outboxer/web/views/publisher.erb
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@
<h5>Settings</h5>
<table class="table table-sm">
<tbody>
<tr>
<th scope="row">Batch Size</th>
<td><%= publisher[:settings]['batch_size'] %></td>
</tr>
<tr>
<th scope="row">Buffer Size</th>
<td><%= publisher[:settings]['buffer_size'] %></td>
Expand Down
1 change: 1 addition & 0 deletions spec/fixtures/config/outboxer.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
---
:batch_size: 10
:buffer_size: 100
:concurrency: 1
:tick_interval: 0.1
Expand Down
Loading