diff --git a/bin/outboxer_publisher b/bin/outboxer_publisher index 098d3514..bde6cb76 100755 --- a/bin/outboxer_publisher +++ b/bin/outboxer_publisher @@ -17,19 +17,22 @@ Outboxer::Database.connect(config: database_config, logger: logger) begin Outboxer::Publisher.publish_message( + batch_size: options[:batch_size], buffer_size: options[:buffer_size], concurrency: options[:concurrency], tick_interval: options[:tick_interval], poll_interval: options[:poll_interval], heartbeat_interval: options[:heartbeat_interval], logger: logger - ) do |message| - # TODO: publish message here + ) do |messages| + # TODO: publish messages here - logger.info "Outboxer published message " \ - "id=#{message[:id]} " \ - "messageable_type=#{message[:messageable_type]} " \ - "messageable_id=#{message[:messageable_id]} " + messages.each do |message| + logger.info "Outboxer published message " \ + "id=#{message[:id]} " \ + "messageable_type=#{message[:messageable_type]} " \ + "messageable_id=#{message[:messageable_id]} " + end end ensure Outboxer::Database.disconnect(logger: logger) diff --git a/config/outboxer.yml b/config/outboxer.yml index 5cc65a7e..7bb50cd9 100644 --- a/config/outboxer.yml +++ b/config/outboxer.yml @@ -1,4 +1,5 @@ --- +:batch_size: 1 :buffer_size: 100 :concurrency: 1 :tick_interval: 0.1 diff --git a/db/migrate/create_outboxer_messages.rb b/db/migrate/create_outboxer_messages.rb index 2af3d17e..450a8f7d 100644 --- a/db/migrate/create_outboxer_messages.rb +++ b/db/migrate/create_outboxer_messages.rb @@ -29,6 +29,10 @@ def up # publisher throughput add_index :outboxer_messages, [:status, :publisher_id, :updated_at], name: "idx_outboxer_status_pub_id_updated_at" + + # bulk status + id locking + add_index :outboxer_messages, [:status, :id], + name: "idx_outboxer_status_id" end def down diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index b9a24614..7e99f967 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -672,5 +672,144 @@ def 
metrics(time: ::Time) metrics end + + # Transitions buffered messages to publishing inside a single transaction. + # @param ids [Array] The IDs of the messages to transition. + # @param publisher_id [Integer] The ID of the publisher. + # @param publisher_name [String] The name of the publisher. + # @param time [Time] The current time context for timestamping. + # @return [Array] The serialized messages with publishing status. + # @raise [ArgumentError] If the locked buffered messages do not match the given ids. + def publishing_by_ids(ids:, publisher_id: nil, publisher_name: nil, time: ::Time) + ActiveRecord::Base.connection_pool.with_connection do + ActiveRecord::Base.transaction do + messages = Models::Message + .where(status: Status::BUFFERED, id: ids) + .lock("FOR UPDATE SKIP LOCKED") + .to_a + + raise ArgumentError, "Some messages not buffered" if messages.size != ids.size + + current_utc_time = time.now.utc + + Models::Message.where(status: Status::BUFFERED, id: ids).update_all( + status: Status::PUBLISHING, + publishing_at: current_utc_time, + updated_at: current_utc_time, + publisher_id: publisher_id, + publisher_name: publisher_name) + + messages.map do |message| + Message.serialize( + id: message.id, + status: Status::PUBLISHING, + messageable_type: message.messageable_type, + messageable_id: message.messageable_id, + queued_at: message.queued_at, + buffered_at: message.buffered_at, + publishing_at: current_utc_time, + updated_at: current_utc_time, + publisher_id: publisher_id, + publisher_name: publisher_name) + end + end + end + end + + # Transitions publishing messages to published inside a single transaction. + # @param ids [Array] The IDs of the messages to transition. + # @param publisher_id [Integer] The ID of the publisher. + # @param publisher_name [String] The name of the publisher. + # @param time [Time] The current time context for timestamping. + # @return [Array] The serialized messages with published status. + # @raise [ArgumentError] If the locked publishing messages do not match the given ids. + def published_by_ids(ids:, publisher_id: nil, publisher_name: nil, time: ::Time) + ActiveRecord::Base.connection_pool.with_connection do + ActiveRecord::Base.transaction do + messages = Models::Message + .where(status: Status::PUBLISHING, id: ids) + .lock("FOR UPDATE SKIP LOCKED") + .to_a + + raise ArgumentError, "Some messages not publishing" if messages.size != ids.size + + current_utc_time = time.now.utc + + Models::Message.where(status: Status::PUBLISHING, id: ids).update_all( + status: Status::PUBLISHED, + updated_at: current_utc_time, + publisher_id: publisher_id, + publisher_name: publisher_name) + + messages.map do |message| + Message.serialize( + id: message.id, + status: Status::PUBLISHED, + messageable_type: message.messageable_type, + messageable_id: message.messageable_id, + queued_at: message.queued_at, + buffered_at: message.buffered_at, + publishing_at: message.publishing_at, 
+ updated_at: current_utc_time, + publisher_id: publisher_id, + publisher_name: publisher_name) + end + end + end + end + + # Transitions publishing messages to failed and records the exception for each message. + # @param ids [Array] The IDs of the messages to transition. + # @param exception [Exception] The exception that caused the failure. + # @param publisher_id [Integer] The ID of the publisher. + # @param publisher_name [String] The name of the publisher. + # @param time [Time] The current time context for timestamping. + # @return [Array] The serialized messages with failed status. + # @raise [ArgumentError] If the locked publishing messages do not match the given ids. + def failed_by_ids(ids:, exception:, publisher_id: nil, publisher_name: nil, time: ::Time) + ActiveRecord::Base.connection_pool.with_connection do + ActiveRecord::Base.transaction do + messages = Models::Message + .where(status: Status::PUBLISHING, id: ids) + .lock("FOR UPDATE SKIP LOCKED") + .to_a + + raise ArgumentError, "Some messages not publishing" if messages.size != ids.size + + current_utc_time = time.now.utc + + Models::Message.where(status: Status::PUBLISHING, id: ids).update_all( + status: Status::FAILED, + updated_at: current_utc_time, + publisher_id: publisher_id, + publisher_name: publisher_name) + + messages.each do |message| + outboxer_exception = message.exceptions.create!( + class_name: exception.class.name, + message_text: exception.message) + + # backtrace is nil when the exception was never raised + Array(exception.backtrace).each_with_index do |frame, index| + outboxer_exception.frames.create!(index: index, text: frame) + end + end + + messages.map do |message| + Message.serialize( + id: message.id, + status: Status::FAILED, + messageable_type: message.messageable_type, + messageable_id: message.messageable_id, + queued_at: message.queued_at, + buffered_at: message.buffered_at, + publishing_at: message.publishing_at, + updated_at: current_utc_time, + publisher_id: publisher_id, + publisher_name: publisher_name) + end + end + end + end end end diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index 9bc9fa9a..4473d133 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -23,7 +23,11 @@ def self.parse_cli_options(args) options[:environment] = v end - opts.on("-b", "--buffer-size SIZE", Integer, "Buffer size") do |v| + opts.on("-b", "--batch-size SIZE", Integer, "Batch size") do |v| + options[:batch_size] = v + end + + opts.on("-u", "--buffer-size SIZE", Integer, "Buffer size") do |v| options[:buffer_size] = v end @@ -127,7 +131,7 @@ def 
find_by_id(id:) # @param heartbeat_interval [Float] The heartbeat interval in seconds. # @param time [Time] The current time context for timestamping. # @return [Hash] Details of the created publisher. - def create(name:, buffer_size:, concurrency:, + def create(name:, batch_size:, buffer_size:, concurrency:, tick_interval:, poll_interval:, heartbeat_interval:, time: ::Time) ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do @@ -137,6 +141,7 @@ def create(name:, buffer_size:, concurrency:, name: name, status: Status::PUBLISHING, settings: { + "batch_size" => batch_size, "buffer_size" => buffer_size, "concurrency" => concurrency, "tick_interval" => tick_interval, @@ -284,11 +289,11 @@ def create_worker_threads(id:, name:, Thread.new do Thread.current.name = "publisher-#{index + 1}" - while (buffered_message = queue.pop) - break if buffered_message.nil? + while (buffered_messages = queue.pop) + break if buffered_messages.nil? - publish_buffered_message( - id: id, name: name, buffered_message: buffered_message, + publish_buffered_messages( + id: id, name: name, buffered_messages: buffered_messages, logger: logger, &block) end end @@ -299,23 +304,26 @@ def create_worker_threads(id:, name:, # @param id [Integer] The ID of the publisher. # @param name [String] The name of the publisher. # @param queue [Queue] The queue of messages to be published. - # @param buffer_size [Integer] The buffer sie. + # @param batch_size [Integer] The batch size. + # @param buffer_size [Integer] The buffer size. # @param poll_interval [Float] The poll interval in seconds. # @param tick_interval [Float] The tick interval in seconds. # @param signal_read [IO] The IO object to read signals. # @param logger [Logger] The logger to use for logging operations. # @param process [Process] The process object to use for timing. # @param kernel [Kernel] The kernel module to use for sleep operations. 
- def buffer_messages(id:, name:, queue:, buffer_size:, poll_interval:, tick_interval:, + def buffer_messages(id:, name:, queue:, batch_size:, buffer_size:, + poll_interval:, tick_interval:, signal_read:, logger:, process:, kernel:) buffer_remaining = buffer_size - queue.size if buffer_remaining > 0 buffered_messages = Message.buffer( - limit: buffer_remaining, publisher_id: id, publisher_name: name) + limit: [buffer_remaining, batch_size].min, + publisher_id: id, publisher_name: name) if buffered_messages.count > 0 - buffered_messages.each { |message| queue.push(message) } + queue.push(buffered_messages) else Publisher.sleep( poll_interval, @@ -492,6 +500,7 @@ def handle_signal(id:, name:, logger:) end PUBLISH_MESSAGE_DEFAULTS = { + batch_size: 10, buffer_size: 100, concurrency: 1, tick_interval: 0.1, @@ -502,6 +511,7 @@ def handle_signal(id:, name:, logger:) # Publish queued messages concurrently # @param name [String] The name of the publisher. + # @param batch_size [Integer] The batch size. # @param buffer_size [Integer] The buffer size. # @param concurrency [Integer] The number of threads for concurrent publishing. # @param tick_interval [Float] The tick interval in seconds. @@ -514,6 +524,7 @@ def handle_signal(id:, name:, logger:) # @yield [Hash] A block to handle the publishing of each message. 
def publish_message( name: "#{::Socket.gethostname}:#{::Process.pid}", + batch_size: PUBLISH_MESSAGE_DEFAULTS[:batch_size], buffer_size: PUBLISH_MESSAGE_DEFAULTS[:buffer_size], concurrency: PUBLISH_MESSAGE_DEFAULTS[:concurrency], tick_interval: PUBLISH_MESSAGE_DEFAULTS[:tick_interval], @@ -527,8 +538,11 @@ def publish_message( "(#{RUBY_RELEASE_DATE} revision #{RUBY_REVISION[0, 10]}) [#{RUBY_PLATFORM}]" logger.info "Outboxer config " \ - "buffer_size=#{buffer_size}, concurrency=#{concurrency}, " \ - "tick_interval=#{tick_interval}, poll_interval=#{poll_interval}, " \ + "batch_size=#{batch_size}, " \ + "buffer_size=#{buffer_size}, " \ + "concurrency=#{concurrency}, " \ + "tick_interval=#{tick_interval}, " \ + "poll_interval=#{poll_interval}, " \ "heartbeat_interval=#{heartbeat_interval}, " \ "log_level=#{logger.level}" @@ -537,7 +551,7 @@ def publish_message( queue = Queue.new publisher = create( - name: name, buffer_size: buffer_size, concurrency: concurrency, + name: name, batch_size: batch_size, buffer_size: buffer_size, concurrency: concurrency, tick_interval: tick_interval, poll_interval: poll_interval, heartbeat_interval: heartbeat_interval) id = publisher[:id] @@ -561,7 +575,7 @@ def publish_message( when Status::PUBLISHING buffer_messages( id: id, name: name, - queue: queue, buffer_size: buffer_size, + queue: queue, batch_size: batch_size, buffer_size: buffer_size, poll_interval: poll_interval, tick_interval: tick_interval, signal_read: signal_read, logger: logger, process: process, kernel: kernel) when Status::STOPPED @@ -597,36 +611,41 @@ def publish_message( logger.info "Outboxer terminated" end - # Publishes a buffered message + # Publishes buffered messages # @param id [Integer] The ID of the publisher. # @param name [String] The name of the publisher. - # @param buffered_message [Hash] The message data retrieved from the buffer. + # @param buffered_messages [Array<Hash>] The messages retrieved from the buffer. 
# @param logger [Logger] Logger for recording the outcome of the publishing attempt. # @yield [Hash] A block to process the publishing of the message. - def publish_buffered_message(id:, name:, buffered_message:, logger:, &block) - publishing_message = Message.publishing( - id: buffered_message[:id], publisher_id: id, publisher_name: name) + def publish_buffered_messages(id:, name:, buffered_messages:, logger:, &block) + buffered_message_ids = buffered_messages.map { |buffered_message| buffered_message[:id] } + + publishing_messages = Message.publishing_by_ids( + ids: buffered_message_ids, publisher_id: id, publisher_name: name) begin - block.call(publishing_message) + block.call(publishing_messages) rescue ::Exception => error - failed_message = Message.failed( - id: publishing_message[:id], exception: error, publisher_id: id, publisher_name: name) + failed_messages = Message.failed_by_ids( + ids: buffered_message_ids, exception: error, publisher_id: id, publisher_name: name) - logger.debug "Outboxer failed to publish message id=#{failed_message[:id]} " \ - "messageable=#{failed_message[:messageable_type]}::#{failed_message[:messageable_id]} " \ - "in #{(failed_message[:updated_at] - failed_message[:queued_at]).round(3)}s" + failed_messages.each do |message| + logger.debug "Outboxer failed to publish message id=#{message[:id]} " \ + "messageable=#{message[:messageable_type]}::#{message[:messageable_id]} in " \ + "#{(message[:updated_at] - message[:queued_at]).round(3)}s" + end raise end - published_message = Message.published( - id: publishing_message[:id], publisher_id: id, publisher_name: name) + published_messages = Message.published_by_ids( + ids: buffered_message_ids, publisher_id: id, publisher_name: name) - logger.debug "Outboxer published message id=#{published_message[:id]} " \ - "messageable=#{published_message[:messageable_type]}::" \ - "#{published_message[:messageable_id]} in " \ - "#{(published_message[:updated_at] - 
published_message[:queued_at]).round(3)}s" + published_messages.each do |message| + logger.debug "Outboxer published message id=#{message[:id]} " \ + "messageable=#{message[:messageable_type]}::#{message[:messageable_id]} in " \ + "#{(message[:updated_at] - message[:queued_at]).round(3)}s" + end rescue StandardError => error logger.error( "#{error.class}: #{error.message}\n" \ diff --git a/lib/outboxer/web/views/publisher.erb b/lib/outboxer/web/views/publisher.erb index 974445f1..4cf87d33 100644 --- a/lib/outboxer/web/views/publisher.erb +++ b/lib/outboxer/web/views/publisher.erb @@ -78,6 +78,10 @@
Batch Size | +<%= publisher[:settings]['batch_size'] %> | +
---|---|
Buffer Size | <%= publisher[:settings]['buffer_size'] %> | diff --git a/spec/fixtures/config/outboxer.yml b/spec/fixtures/config/outboxer.yml index 819781f9..e5a5ad81 100644 --- a/spec/fixtures/config/outboxer.yml +++ b/spec/fixtures/config/outboxer.yml @@ -1,4 +1,5 @@ --- +:batch_size: 10 :buffer_size: 100 :concurrency: 1 :tick_interval: 0.1 diff --git a/spec/lib/outboxer/message/failed_by_ids_spec.rb b/spec/lib/outboxer/message/failed_by_ids_spec.rb new file mode 100644 index 00000000..4d7aeef5 --- /dev/null +++ b/spec/lib/outboxer/message/failed_by_ids_spec.rb @@ -0,0 +1,74 @@ +require "rails_helper" + +module Outboxer + RSpec.describe Message do + describe ".failed_by_ids" do + context "when messages are publishing" do + let!(:messages) do + [ + create(:outboxer_message, :publishing), + create(:outboxer_message, :publishing) + ] + end + + let(:ids) { messages.map(&:id) } + + let(:exception) do + def raise_exception + raise StandardError, "something went wrong" + rescue StandardError => error + error + end + + raise_exception + end + + it "transitions publishing messages to failed and stores exception frames" do + result = Message.failed_by_ids(ids: ids, exception: exception) + + expect(result.count).to eq(2) + expect(result.map { |message| message[:id] }).to match_array(ids) + + expect(Models::Message.failed.pluck(:id)).to match_array(ids) + end + end + + context "when a message is queued" do + let!(:messages) do + [ + create(:outboxer_message, :queued), + create(:outboxer_message, :publishing) + ] + end + + let(:ids) { messages.map(&:id) } + + let(:exception) do + def raise_exception + raise StandardError, "something went wrong" + rescue StandardError => error + error + end + + raise_exception + end + + it "does not update any messages" do + begin + Message.failed_by_ids(ids: ids, exception: exception) + rescue ArgumentError + # no op + end + + expect(Models::Message.group(:status).count).to eq({ "queued" => 1, "publishing" => 1 }) + end + + it "raises error" do + expect do + 
Message.failed_by_ids(ids: ids, exception: exception) + end.to raise_error(ArgumentError, /not publishing/) + end + end + end + end +end diff --git a/spec/lib/outboxer/message/published_by_ids_spec.rb b/spec/lib/outboxer/message/published_by_ids_spec.rb new file mode 100644 index 00000000..ed5f0ae4 --- /dev/null +++ b/spec/lib/outboxer/message/published_by_ids_spec.rb @@ -0,0 +1,54 @@ +require "rails_helper" + +module Outboxer + RSpec.describe Message do + describe ".published_by_ids" do + context "when messages are publishing" do + let!(:messages) do + [ + create(:outboxer_message, :publishing), + create(:outboxer_message, :publishing) + ] + end + + let(:ids) { messages.map(&:id) } + + it "transitions publishing messages to published and returns serialized results" do + result = Message.published_by_ids(ids: ids) + + expect(result.count).to eq(2) + expect(result.map { |message| message[:id] }).to match_array(ids) + + expect(Models::Message.published.pluck(:id)).to match_array(ids) + end + end + + context "when a message is queued" do + let!(:messages) do + [ + create(:outboxer_message, :publishing), + create(:outboxer_message, :queued) + ] + end + + let(:ids) { messages.map(&:id) } + + it "does not update any messages" do + begin + Message.published_by_ids(ids: ids) + rescue ArgumentError + # no op + end + + expect(Models::Message.group(:status).count).to eq({ "publishing" => 1, "queued" => 1 }) + end + + it "raises error if any message is not publishing" do + expect do + Message.published_by_ids(ids: ids) + end.to raise_error(ArgumentError, /not publishing/) + end + end + end + end +end diff --git a/spec/lib/outboxer/message/publishing_by_ids_spec.rb b/spec/lib/outboxer/message/publishing_by_ids_spec.rb new file mode 100644 index 00000000..0302175e --- /dev/null +++ b/spec/lib/outboxer/message/publishing_by_ids_spec.rb @@ -0,0 +1,54 @@ +require "rails_helper" + +module Outboxer + RSpec.describe Message do + describe ".publishing_by_ids" do + context "when messages are 
buffered" do + let!(:messages) do + [ + create(:outboxer_message, :buffered), + create(:outboxer_message, :buffered) + ] + end + + let(:ids) { messages.map(&:id) } + + it "transitions buffered messages to publishing and returns serialized results" do + result = Message.publishing_by_ids(ids: ids) + + expect(result.count).to eq(2) + expect(result.map { |message| message[:id] }).to match_array(ids) + + expect(Models::Message.publishing.pluck(:id)).to match_array(ids) + end + end + + context "when a message is queued" do + let!(:messages) do + [ + create(:outboxer_message, :buffered), + create(:outboxer_message, :queued) + ] + end + + let(:ids) { messages.map(&:id) } + + it "does not update any messages" do + begin + Message.publishing_by_ids(ids: ids) + rescue ArgumentError + # no op + end + + expect(Models::Message.group(:status).count).to eq({ "buffered" => 1, "queued" => 1 }) + end + + it "raises error" do + expect do + Message.publishing_by_ids(ids: ids) + end.to raise_error(ArgumentError, /not buffered/) + end + end + end + end +end diff --git a/spec/lib/outboxer/publisher/config_spec.rb b/spec/lib/outboxer/publisher/config_spec.rb index 4faf83ec..9af72efb 100644 --- a/spec/lib/outboxer/publisher/config_spec.rb +++ b/spec/lib/outboxer/publisher/config_spec.rb @@ -9,6 +9,7 @@ module Outboxer allow(ENV).to receive(:[]).with("RAILS_MAX_THREADS").and_return("4") config = Publisher.config(environment: "test", path: path) expect(config).to eq({ + batch_size: 10, buffer_size: 100, concurrency: 4, tick_interval: 0.1, @@ -20,6 +21,7 @@ module Outboxer it "returns root values when environment not overridden" do expect(Publisher.config(path: path)).to eq({ + batch_size: 10, buffer_size: 100, concurrency: 1, tick_interval: 0.1, @@ -32,6 +34,7 @@ module Outboxer it "returns environment overrides" do expect(Publisher.config(environment: "development", path: path)) .to eq({ + batch_size: 10, buffer_size: 100, concurrency: 2, tick_interval: 0.1, diff --git 
a/spec/lib/outboxer/publisher/parse_cli_options_spec.rb b/spec/lib/outboxer/publisher/parse_cli_options_spec.rb index 8f9eb53d..e1f88c11 100644 --- a/spec/lib/outboxer/publisher/parse_cli_options_spec.rb +++ b/spec/lib/outboxer/publisher/parse_cli_options_spec.rb @@ -27,9 +27,16 @@ module Outboxer end end + context "when parsing the batch size option" do + it "parses short and long flags" do + expect(Publisher.parse_cli_options(["-b", "10"])).to eq({ batch_size: 10 }) + expect(Publisher.parse_cli_options(["--batch-size", "10"])).to eq({ batch_size: 10 }) + end + end + context "when parsing the buffer size option" do it "parses short and long flags" do - expect(Publisher.parse_cli_options(["-b", "100"])).to eq({ buffer_size: 100 }) + expect(Publisher.parse_cli_options(["-u", "100"])).to eq({ buffer_size: 100 }) expect(Publisher.parse_cli_options(["--buffer-size", "100"])).to eq({ buffer_size: 100 }) end end diff --git a/spec/lib/outboxer/publisher/publish_message_spec.rb b/spec/lib/outboxer/publisher/publish_message_spec.rb index 1d17c5f1..4a258dcd 100644 --- a/spec/lib/outboxer/publisher/publish_message_spec.rb +++ b/spec/lib/outboxer/publisher/publish_message_spec.rb @@ -4,6 +4,7 @@ module Outboxer RSpec.describe Publisher do describe ".publish_message" do + let(:batch_size) { 1 } let(:buffer_size) { 1 } let(:poll_interval) { 1 } let(:tick_interval) { 0.1 } @@ -20,6 +21,7 @@ module Outboxer it "dumps stack trace" do publish_message_thread = Thread.new do Outboxer::Publisher.publish_message( + batch_size: batch_size, buffer_size: buffer_size, poll_interval: poll_interval, tick_interval: tick_interval, @@ -45,6 +47,7 @@ module Outboxer it "stops and resumes the publishing process correctly" do publish_message_thread = Thread.new do Outboxer::Publisher.publish_message( + batch_size: batch_size, buffer_size: buffer_size, poll_interval: poll_interval, tick_interval: tick_interval, @@ -67,11 +70,14 @@ module Outboxer context "when message published successfully" do it 
"sets the message to published" do Publisher.publish_message( + batch_size: batch_size, buffer_size: buffer_size, poll_interval: poll_interval, tick_interval: tick_interval, logger: logger, - kernel: kernel) do |message| + kernel: kernel) do |messages| + message = messages.first + expect(message[:id]).to eq(queued_message.id) expect(message[:messageable_type]).to eq(queued_message.messageable_type) expect(message[:messageable_id]).to eq(queued_message.messageable_id) @@ -90,6 +96,7 @@ module Outboxer before do Publisher.publish_message( + batch_size: batch_size, buffer_size: buffer_size, poll_interval: poll_interval, tick_interval: tick_interval, @@ -131,6 +138,7 @@ module Outboxer before do Publisher.publish_message( + batch_size: batch_size, buffer_size: buffer_size, poll_interval: poll_interval, tick_interval: tick_interval, @@ -184,6 +192,7 @@ module Outboxer expect(logger).to receive(:error).with(include("StandardError: queue error")).once Publisher.publish_message( + batch_size: batch_size, buffer_size: buffer_size, poll_interval: poll_interval, tick_interval: tick_interval, @@ -202,6 +211,7 @@ module Outboxer .once Publisher.publish_message( + batch_size: batch_size, buffer_size: buffer_size, poll_interval: poll_interval, tick_interval: tick_interval,