diff --git a/README.md b/README.md index b967bd5..08a03fb 100644 --- a/README.md +++ b/README.md @@ -84,27 +84,8 @@ TRANSACTION (0.2ms) COMMIT ```ruby # bin/outboxer_publisher -Outboxer::Publisher.publish_messages(batch_size: 1_000, concurrency: 10) do |publisher, messages| - begin - # TODO: publish messages here - rescue => error - Outboxer::Publisher.update_messages( - id: publisher[:id], - failed_messages: messages.map do |message| - { - id: message[:id], - exception: { - class_name: error.class.name, - message_text: error.message, - backtrace: error.backtrace.join("\n") - } - } - end) - else - Outboxer::Publisher.update_messages( - id: publisher[:id], - published_message_ids: messages.map { |message| message[:id] }) - end +Outboxer::Publisher.publish_message(concurrency: 10) do |publisher, message| + # TODO: publish messages here end ``` diff --git a/bin/outboxer_bunny_publisher b/bin/outboxer_bunny_publisher index 4eb8b82..ef36d90 100755 --- a/bin/outboxer_bunny_publisher +++ b/bin/outboxer_bunny_publisher @@ -8,7 +8,7 @@ require "json" cli_options = Outboxer::Publisher.parse_cli_options(ARGV) environment = cli_options.delete(:environment) || ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development" -options = Outboxer::Publisher::PUBLISH_MESSAGES_DEFAULTS.merge(cli_options) +options = Outboxer::Publisher::PUBLISH_MESSAGE_DEFAULTS.merge(cli_options) database_config = Outboxer::Database.config( environment: environment, @@ -28,7 +28,7 @@ exchange = channel.direct(ENV.fetch("BUNNY_EXCHANGE"), durable: true) routing_key = ENV.fetch("BUNNY_ROUTING_KEY", "") begin - Outboxer::Publisher.publish_messages( + Outboxer::Publisher.publish_message( batch_size: 1, concurrency: options[:concurrency]) do |publisher, messages| begin exchange.publish( diff --git a/bin/outboxer_kafka_publisher b/bin/outboxer_kafka_publisher index ede92fe..c9f4af9 100755 --- a/bin/outboxer_kafka_publisher +++ b/bin/outboxer_kafka_publisher @@ -7,7 +7,7 @@ require "kafka" cli_options = Outboxer::Publisher.parse_cli_options(ARGV) environment = cli_options.delete(:environment) || ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development" -options = Outboxer::Publisher::PUBLISH_MESSAGES_DEFAULTS.merge(cli_options) +options = Outboxer::Publisher::PUBLISH_MESSAGE_DEFAULTS.merge(cli_options) database_config = Outboxer::Database.config( environment: environment, diff --git a/bin/outboxer_publisher b/bin/outboxer_publisher index 2ee7647..93555ae 100755 --- a/bin/outboxer_publisher +++ b/bin/outboxer_publisher @@ -6,7 +6,7 @@ require "outboxer" cli_options = Outboxer::Publisher.parse_cli_options(ARGV) environment = cli_options.delete(:environment) || ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development" -options = Outboxer::Publisher::PUBLISH_MESSAGES_DEFAULTS.merge(cli_options) +options = Outboxer::Publisher::PUBLISH_MESSAGE_DEFAULTS.merge(cli_options) logger = Outboxer::Logger.new($stdout, level: options[:log_level]) database_config = Outboxer::Database.config( @@ -16,8 +16,7 @@ database_config = Outboxer::Database.config( Outboxer::Database.connect(config: database_config, logger: logger) begin - Outboxer::Publisher.publish_messages( - batch_size: options[:batch_size], + Outboxer::Publisher.publish_message( concurrency: options[:concurrency], tick_interval: options[:tick_interval], poll_interval: options[:poll_interval], @@ -26,27 +25,8 @@ begin sweep_retention: options[:sweep_retention], sweep_batch_size: options[:sweep_batch_size], logger: logger - ) do |publisher, messages| - begin - # TODO: publish messages here - rescue => error - Outboxer::Publisher.update_messages( - id: publisher[:id], - failed_messages: messages.map do |message| - { - id: message[:id], - exception: { - class_name: error.class.name, - message_text: error.message, - backtrace: error.backtrace.join("\n") - } - } - end) - else - Outboxer::Publisher.update_messages( - id: publisher[:id], - published_message_ids: messages.map { |message| message[:id] }) - end + ) do |publisher, message| + # TODO: publish message here end ensure Outboxer::Database.disconnect(logger: logger) diff --git a/bin/outboxer_sidekiq_push_bulk_publisher b/bin/outboxer_sidekiq_push_bulk_publisher index ed3fe28..1395737 100755 --- a/bin/outboxer_sidekiq_push_bulk_publisher +++ b/bin/outboxer_sidekiq_push_bulk_publisher @@ -5,7 +5,7 @@ require "sidekiq" cli_options = Outboxer::Publisher.parse_cli_options(ARGV) environment = cli_options.delete(:environment) || ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development" -options = Outboxer::Publisher::PUBLISH_MESSAGES_DEFAULTS.merge(cli_options) +options = Outboxer::Publisher::PUBLISH_MESSAGE_DEFAULTS.merge(cli_options) database_config = Outboxer::Database.config( environment: environment, diff --git a/bin/outboxer_sqs_send_message_batch_publisher b/bin/outboxer_sqs_send_message_batch_publisher index b145bab..0977b9e 100755 --- a/bin/outboxer_sqs_send_message_batch_publisher +++ b/bin/outboxer_sqs_send_message_batch_publisher @@ -7,7 +7,7 @@ require "aws-sdk-sqs" cli_options = Outboxer::Publisher.parse_cli_options(ARGV) environment = cli_options.delete(:environment) || ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development" -options = Outboxer::Publisher::PUBLISH_MESSAGES_DEFAULTS.merge(cli_options) +options = Outboxer::Publisher::PUBLISH_MESSAGE_DEFAULTS.merge(cli_options) database_config = Outboxer::Database.config( environment: environment, diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index fe5bcd0..b9a46dc 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -23,10 +23,6 @@ def self.parse_cli_options(args) options[:environment] = v end - opts.on("--batch-size SIZE", Integer, "Batch size") do |v| - options[:batch_size] = v - end - opts.on("--concurrency N", Integer, "Number of threads to publish messages") do |v| options[:concurrency] = v end @@ -59,10 +55,6 @@ def self.parse_cli_options(args) options[:log_level] = v end - opts.on("--config PATH", "Path to YAML config file") do |v| - options[:config] = v - end - opts.on("--version", "Print version and exit") do puts "Outboxer version #{Outboxer::VERSION}" exit @@ -79,33 +71,6 @@ def self.parse_cli_options(args) options end - CONFIG_DEFAULTS = { - path: "config/outboxer.yml", - enviroment: "development" - } - - # Loads and processes the YAML configuration for the publisher. - # @param environment [String] The application environment. - # @param path [String] The path to the configuration file. - # @return [Hash] The processed configuration data with environment-specific overrides. - def config( - environment: CONFIG_DEFAULTS[:environment], - path: CONFIG_DEFAULTS[:path] - ) - path_expanded = ::File.expand_path(path) - text = File.read(path_expanded) - erb = ERB.new(text, trim_mode: "-") - erb.filename = path_expanded - erb_result = erb.result - - yaml = YAML.safe_load(erb_result, permitted_classes: [Symbol], aliases: true) - yaml.deep_symbolize_keys! - yaml_override = yaml.fetch(environment&.to_sym, {}).slice(*PUBLISH_MESSAGES_DEFAULTS.keys) - yaml.slice(*PUBLISH_MESSAGES_DEFAULTS.keys).merge(yaml_override) - rescue Errno::ENOENT - {} - end - # Retrieves publisher data by ID including associated signals. # @param id [Integer] The ID of the publisher to find. # @return [Hash] Detailed information about the publisher including its signals. @@ -165,14 +130,13 @@ def all # Creates a new publisher with specified settings and metrics. # @param name [String] The name of the publisher. - # @param batch_size [Integer] The batch size. # @param concurrency [Integer] The number of publishing threads. # @param tick_interval [Float] The tick interval in seconds. # @param poll_interval [Float] The poll interval in seconds. # @param heartbeat_interval [Float] The heartbeat interval in seconds. # @param time [Time] The current time context for timestamping. # @return [Hash] Details of the created publisher. - def create(name:, batch_size:, concurrency:, + def create(name:, concurrency:, tick_interval:, poll_interval:, heartbeat_interval:, sweep_interval:, sweep_retention:, sweep_batch_size:, time: ::Time) @@ -184,7 +148,6 @@ def create(name:, batch_size:, concurrency:, name: name, status: Status::PUBLISHING, settings: { - "batch_size" => batch_size, "concurrency" => concurrency, "tick_interval" => tick_interval, "poll_interval" => poll_interval, @@ -460,8 +423,7 @@ def handle_signal(id:, name:, logger:) end end - PUBLISH_MESSAGES_DEFAULTS = { - batch_size: 1000, + PUBLISH_MESSAGE_DEFAULTS = { concurrency: 1, tick_interval: 0.1, poll_interval: 5.0, @@ -474,7 +436,6 @@ def handle_signal(id:, name:, logger:) # Publish queued messages concurrently # @param name [String] The name of the publisher. - # @param batch_size [Integer] The batch size. # @param concurrency [Integer] The number of publisher threads. # @param tick_interval [Float] The tick interval in seconds. # @param poll_interval [Float] The poll interval in seconds. @@ -489,17 +450,16 @@ def handle_signal(id:, name:, logger:) # @yield [publisher, messages] Yields publisher and messages to be published. # @yieldparam publisher [Hash] A hash with keys `:id` and `:name` representing the publisher. # @yieldparam messages [Array] An array of message hashes retrieved from the buffer. - def publish_messages( + def publish_message( name: "#{::Socket.gethostname}:#{::Process.pid}", - batch_size: PUBLISH_MESSAGES_DEFAULTS[:batch_size], - concurrency: PUBLISH_MESSAGES_DEFAULTS[:concurrency], - tick_interval: PUBLISH_MESSAGES_DEFAULTS[:tick_interval], - poll_interval: PUBLISH_MESSAGES_DEFAULTS[:poll_interval], - heartbeat_interval: PUBLISH_MESSAGES_DEFAULTS[:heartbeat_interval], - sweep_interval: PUBLISH_MESSAGES_DEFAULTS[:sweep_interval], - sweep_retention: PUBLISH_MESSAGES_DEFAULTS[:sweep_retention], - sweep_batch_size: PUBLISH_MESSAGES_DEFAULTS[:sweep_batch_size], - logger: Logger.new($stdout, level: PUBLISH_MESSAGES_DEFAULTS[:log_level]), + concurrency: PUBLISH_MESSAGE_DEFAULTS[:concurrency], + tick_interval: PUBLISH_MESSAGE_DEFAULTS[:tick_interval], + poll_interval: PUBLISH_MESSAGE_DEFAULTS[:poll_interval], + heartbeat_interval: PUBLISH_MESSAGE_DEFAULTS[:heartbeat_interval], + sweep_interval: PUBLISH_MESSAGE_DEFAULTS[:sweep_interval], + sweep_retention: PUBLISH_MESSAGE_DEFAULTS[:sweep_retention], + sweep_batch_size: PUBLISH_MESSAGE_DEFAULTS[:sweep_batch_size], + logger: Logger.new($stdout, level: PUBLISH_MESSAGE_DEFAULTS[:log_level]), time: ::Time, process: ::Process, kernel: ::Kernel, &block ) @@ -507,7 +467,6 @@ def publish_messages( "(#{RUBY_RELEASE_DATE} revision #{RUBY_REVISION[0, 10]}) [#{RUBY_PLATFORM}]" logger.info "Outboxer config " \ - "batch_size=#{batch_size}, " \ "concurrency=#{concurrency}, " \ "tick_interval=#{tick_interval} " \ "poll_interval=#{poll_interval}, " \ @@ -520,9 +479,10 @@ def publish_messages( Setting.create_all publisher = create( - name: name, batch_size: batch_size, + name: name, concurrency: concurrency, - tick_interval: tick_interval, poll_interval: poll_interval, + tick_interval: tick_interval, + poll_interval: poll_interval, heartbeat_interval: heartbeat_interval, sweep_interval: sweep_interval, sweep_retention: sweep_retention, @@ -545,7 +505,7 @@ def publish_messages( publisher_threads = Array.new(concurrency) do |index| create_publisher_thread( - id: publisher[:id], name: name, index: index, batch_size: batch_size, + id: publisher[:id], name: name, index: index, poll_interval: poll_interval, tick_interval: tick_interval, logger: logger, process: process, kernel: kernel, &block) end @@ -576,7 +536,6 @@ def pool(concurrency:) # @param id [Integer] Publisher id. # @param name [String] Publisher name. - # @param batch_size [Integer] Max number of messages per batch. # @param index [Integer] Zero-based thread index (used for thread name). # @param poll_interval [Numeric] Seconds to wait when no messages found. # @param tick_interval [Numeric] Seconds between signal checks during sleep. @@ -587,7 +546,7 @@ def pool(concurrency:) # e.g., `{ id: Integer, name: String }`. # @yieldparam messages [Array] Batch of messages to publish. # @return [Thread] The created publishing thread. - def create_publisher_thread(id:, name:, batch_size:, index:, + def create_publisher_thread(id:, name:, index:, poll_interval:, tick_interval:, logger:, process:, kernel:, &block) Thread.new do @@ -595,23 +554,66 @@ def create_publisher_thread(id:, name:, batch_size:, index:, Thread.current.name = "publisher-#{index + 1}" while !terminating? - begin - messages = buffer_messages(id: id, name: name, limit: batch_size) + messages = [] - if messages.any? - block.call({ id: id, name: name }, messages) - else - Publisher.sleep( - poll_interval, - tick_interval: tick_interval, - process: process, - kernel: kernel) - end + begin + messages = buffer_messages(id: id, name: name, limit: 1) rescue StandardError => error logger.error( "#{error.class}: #{error.message}\n" \ "#{error.backtrace.join("\n")}") end + + if messages.any? + begin + block.call({ id: id, name: name }, messages[0]) + rescue StandardError => error + logger.error( + "#{error.class}: #{error.message}\n" \ + "#{error.backtrace.join("\n")}") + + Publisher.update_messages( + id: id, + failed_messages: [ + { + id: messages[0][:id], + exception: { + class_name: error.class.name, + message_text: error.message, + backtrace: error.backtrace + } + } + ]) + rescue ::Exception => error + logger.fatal( + "#{error.class}: #{error.message}\n" \ + "#{error.backtrace.join("\n")}") + + Publisher.update_messages( + id: id, + failed_messages: [ + { + id: messages[0][:id], + exception: { + class_name: error.class.name, + message_text: error.message, + backtrace: error.backtrace + } + } + ]) + + terminate(id: id) + else + Outboxer::Publisher.update_messages( + id: id, published_message_ids: [messages[0][:id]]) + end + else + Publisher.sleep( + poll_interval, + tick_interval: tick_interval, + process: process, + kernel: kernel) + end end rescue ::Exception => error logger.fatal( diff --git a/spec/lib/outboxer/publisher/parse_cli_options_spec.rb b/spec/lib/outboxer/publisher/parse_cli_options_spec.rb index 3d35ca7..eb04ec9 100644 --- a/spec/lib/outboxer/publisher/parse_cli_options_spec.rb +++ b/spec/lib/outboxer/publisher/parse_cli_options_spec.rb @@ -19,10 +19,6 @@ module Outboxer end end - it "parses the batch size flag" do - expect(Publisher.parse_cli_options(["--batch-size", "100"])).to eq({ batch_size: 100 }) - end - it "parses concurrency flag" do expect(Publisher.parse_cli_options(["--concurrency", "8"])) .to eq({ concurrency: 8 }) @@ -64,12 +60,6 @@ module Outboxer ).to eq({ sweep_batch_size: 10 }) end - it "parses the config file flag" do - expect( - Publisher.parse_cli_options(["--config", "config/path.yml"]) - ).to eq({ config: "config/path.yml" }) - end - it "parses the log level flag" do expect(Publisher.parse_cli_options(["--log-level", "0"])).to eq({ log_level: 0 }) end diff --git a/spec/lib/outboxer/publisher/pool_spec.rb b/spec/lib/outboxer/publisher/pool_spec.rb new file mode 100644 index 0000000..afe24d1 --- /dev/null +++ b/spec/lib/outboxer/publisher/pool_spec.rb @@ -0,0 +1,11 @@ +require "rails_helper" + +module Outboxer + RSpec.describe Publisher do + describe ".pool" do + it "returns correct value" do + expect(Publisher.pool(concurrency: 5)).to eql(8) + end + end + end +end diff --git a/spec/lib/outboxer/publisher/publish_messages_spec.rb b/spec/lib/outboxer/publisher/publish_message_spec.rb similarity index 81% rename from spec/lib/outboxer/publisher/publish_messages_spec.rb rename to spec/lib/outboxer/publisher/publish_message_spec.rb index 2051d53..75aef78 100644 --- a/spec/lib/outboxer/publisher/publish_messages_spec.rb +++ b/spec/lib/outboxer/publisher/publish_message_spec.rb @@ -3,8 +3,7 @@ module Outboxer RSpec.describe Publisher do - describe ".publish_messages" do - let(:batch_size) { 1 } + describe ".publish_message" do let(:poll_interval) { 1 } let(:tick_interval) { 0.1 } let(:logger) { instance_double(Logger, debug: true, error: true, fatal: true, info: true, level: 1) } @@ -18,13 +17,12 @@ module Outboxer let!(:queued_message) { create(:outboxer_message, :queued, updated_at: 2.seconds.ago) } it "terminates the publisher" do - Publisher.publish_messages( - batch_size: batch_size, + Publisher.publish_message( poll_interval: poll_interval, tick_interval: tick_interval, logger: logger, kernel: kernel - ) do |publisher, _messages| + ) do |publisher, _message| Publisher.terminate(id: publisher[:id]) end end @@ -36,8 +34,7 @@ module Outboxer it "deletes the old published message" do publish_messages_thread = Thread.new do - Publisher.publish_messages( - batch_size: batch_size, + Publisher.publish_message( poll_interval: poll_interval, tick_interval: tick_interval, sweep_interval: 0.1, @@ -45,7 +42,7 @@ module Outboxer sweep_batch_size: 100, logger: logger, kernel: kernel - ) do |_publisher, messages| + ) do |_publisher, _message| # no op end end @@ -70,8 +67,7 @@ module Outboxer it "logs error" do publish_messages_thread = Thread.new do - Publisher.publish_messages( - batch_size: batch_size, + Publisher.publish_message( poll_interval: poll_interval, tick_interval: tick_interval, sweep_interval: 0.1, @@ -79,7 +75,7 @@ module Outboxer sweep_batch_size: 100, logger: logger, kernel: kernel - ) do |_publisher, messages| + ) do |_publisher, _message| # no op end end @@ -95,8 +91,7 @@ module Outboxer it "does not delete the old message" do thread = Thread.new do - Publisher.publish_messages( - batch_size: batch_size, + Publisher.publish_message( poll_interval: poll_interval, tick_interval: tick_interval, sweep_interval: 0.1, @@ -104,7 +99,7 @@ module Outboxer sweep_batch_size: 1, logger: logger, kernel: kernel - ) do |_publisher, messages| + ) do |_publisher, message| # no op end end @@ -125,8 +120,7 @@ module Outboxer .and_raise(NoMemoryError, "boom") thread = Thread.new do - Publisher.publish_messages( - batch_size: batch_size, + Publisher.publish_message( poll_interval: poll_interval, tick_interval: tick_interval, sweep_interval: 0.1, @@ -134,7 +128,7 @@ module Outboxer sweep_batch_size: 100, logger: logger, kernel: kernel - ) do |_publisher, messages| + ) do |_publisher, message| # no op end end @@ -159,12 +153,11 @@ module Outboxer it "dumps stack trace" do publish_messages_thread = Thread.new do - Publisher.publish_messages( - batch_size: batch_size, + Publisher.publish_message( poll_interval: poll_interval, tick_interval: tick_interval, logger: logger, kernel: kernel - ) do |_publisher, _messages| + ) do |_publisher, _message| ::Process.kill("TTIN", ::Process.pid) end end @@ -186,18 +179,12 @@ module Outboxer let!(:queued_message) { create(:outboxer_message, :queued, updated_at: 2.seconds.ago) } it "stops and resumes the publishing process correctly" do - Publisher.publish_messages( - batch_size: batch_size, + Publisher.publish_message( poll_interval: poll_interval, tick_interval: tick_interval, logger: logger, kernel: kernel - ) do |publisher, messages| - Publisher.update_messages( - id: publisher[:id], - published_message_ids: messages.map { |message| message[:id] } - ) - + ) do |_publisher, _message| ::Process.kill("TSTP", ::Process.pid) sleep 0.5 ::Process.kill("CONT", ::Process.pid) @@ -213,25 +200,15 @@ module Outboxer let!(:queued_message) { create(:outboxer_message, :queued, updated_at: 2.seconds.ago) } it "sets the message to published" do - Publisher.publish_messages( - batch_size: batch_size, + Publisher.publish_message( poll_interval: poll_interval, tick_interval: tick_interval, logger: logger, kernel: kernel - ) do |publisher, messages| - expect(messages.first[:id]) - .to eq(queued_message.id) - - expect(messages.first[:messageable_type]) - .to eq(queued_message.messageable_type) - - expect(messages.first[:messageable_id]) - .to eq(queued_message.messageable_id) - - Publisher.update_messages( - id: publisher[:id], - published_message_ids: messages.map { |message| message[:id] }) + ) do |_publisher, message| + expect(message[:id]).to eq(queued_message.id) + expect(message[:messageable_type]).to eq(queued_message.messageable_type) + expect(message[:messageable_id]).to eq(queued_message.messageable_id) ::Process.kill("TERM", ::Process.pid) end @@ -247,23 +224,22 @@ module Outboxer let(:standard_error) { StandardError.new("some error") } before do - Publisher.publish_messages( - batch_size: batch_size, + Publisher.publish_message( poll_interval: poll_interval, tick_interval: tick_interval, logger: logger, kernel: kernel - ) do |_publisher, _messages| + ) do |_publisher, _message| ::Process.kill("TERM", ::Process.pid) raise standard_error end end - it "does not change publishing status" do + it "changes publishing status to failed" do queued_message.reload - expect(queued_message.status).to eq(Message::Status::PUBLISHING) + expect(queued_message.status).to eq(Message::Status::FAILED) end it "logs errors" do @@ -277,21 +253,20 @@ module Outboxer let!(:queued_message) { create(:outboxer_message, :queued, updated_at: 2.seconds.ago) } before do - Publisher.publish_messages( - batch_size: batch_size, + Publisher.publish_message( poll_interval: poll_interval, tick_interval: tick_interval, logger: logger, kernel: kernel - ) do |_publisher, _messages| + ) do |_publisher, _message| raise no_memory_error end end - it "does not change publishing status" do + it "changes publishing status to failed" do queued_message.reload - expect(queued_message.status).to eq(Message::Status::PUBLISHING) + expect(queued_message.status).to eq(Message::Status::FAILED) end it "logs errors" do @@ -319,8 +294,7 @@ module Outboxer expect(logger).to receive(:error).with(include("StandardError: queue error")).once - Publisher.publish_messages( - batch_size: 2, + Publisher.publish_message( poll_interval: poll_interval, tick_interval: tick_interval, logger: logger, @@ -339,8 +313,7 @@ module Outboxer .with(include("NoMemoryError: failed to allocate memory")) .once - Publisher.publish_messages( - batch_size: batch_size, + Publisher.publish_message( poll_interval: poll_interval, tick_interval: tick_interval, logger: logger,