From 91ecf62d356ca89ac66bfbc413b280d61472a6ae Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sat, 11 Oct 2025 18:52:15 +1100 Subject: [PATCH] remove sweeper --- bin/outboxer_publisher | 3 - lib/outboxer/publisher.rb | 95 +------------- lib/outboxer/web/views/publisher.erb | 15 --- quickstart_e2e_tests.sh | 2 +- .../publisher/parse_cli_options_spec.rb | 18 --- spec/lib/outboxer/publisher/pool_spec.rb | 2 +- .../publisher/publish_message_spec.rb | 120 ------------------ 7 files changed, 5 insertions(+), 250 deletions(-) diff --git a/bin/outboxer_publisher b/bin/outboxer_publisher index 93555aef..68cd6035 100755 --- a/bin/outboxer_publisher +++ b/bin/outboxer_publisher @@ -21,9 +21,6 @@ begin tick_interval: options[:tick_interval], poll_interval: options[:poll_interval], heartbeat_interval: options[:heartbeat_interval], - sweep_interval: options[:sweep_interval], - sweep_retention: options[:sweep_retention], - sweep_batch_size: options[:sweep_batch_size], logger: logger ) do |publisher, message| # TODO: publish message here diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index 9c513049..5ae4f36f 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -39,18 +39,6 @@ def self.parse_cli_options(args) options[:heartbeat_interval] = v end - opts.on("--sweep-interval SECS", Float, "Sweep interval in seconds") do |v| - options[:sweep_interval] = v - end - - opts.on("--sweep-retention SECS", Float, "Sweep retention in seconds") do |v| - options[:sweep_retention] = v - end - - opts.on("--sweep-batch-size SIZE", Integer, "Sweep batch size") do |v| - options[:sweep_batch_size] = v - end - opts.on("--log-level LEVEL", Integer, "Log level") do |v| options[:log_level] = v end @@ -138,7 +126,6 @@ def all # @return [Hash] Details of the created publisher. def create(name:, concurrency:, tick_interval:, poll_interval:, heartbeat_interval:, - sweep_interval:, sweep_retention:, sweep_batch_size:, time: ::Time) ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do @@ -151,10 +138,7 @@ def create(name:, concurrency:, "concurrency" => concurrency, "tick_interval" => tick_interval, "poll_interval" => poll_interval, - "heartbeat_interval" => heartbeat_interval, - "sweep_interval" => sweep_interval, - "sweep_retention" => sweep_retention, - "sweep_batch_size" => sweep_batch_size + "heartbeat_interval" => heartbeat_interval }, metrics: { "throughput" => 0, @@ -428,9 +412,6 @@ def handle_signal(id:, name:, logger:) tick_interval: 0.1, poll_interval: 5.0, heartbeat_interval: 5.0, - sweep_interval: 60, - sweep_retention: 60, - sweep_batch_size: 100, log_level: 1 } @@ -440,9 +421,6 @@ def handle_signal(id:, name:, logger:) # @param tick_interval [Float] The tick interval in seconds. # @param poll_interval [Float] The poll interval in seconds. # @param heartbeat_interval [Float] The heartbeat interval in seconds. - # @param sweep_interval [Float] The interval in seconds between sweeper runs. - # @param sweep_retention [Float] The retention period in seconds for published messages. - # @param sweep_batch_size [Integer] The maximum number of messages to delete per batch. # @param logger [Logger] Logger for recording publishing activities. # @param time [Time] The current time context. # @param process [Process] The process module for system metrics. @@ -456,9 +434,6 @@ def publish_message( tick_interval: PUBLISH_MESSAGE_DEFAULTS[:tick_interval], poll_interval: PUBLISH_MESSAGE_DEFAULTS[:poll_interval], heartbeat_interval: PUBLISH_MESSAGE_DEFAULTS[:heartbeat_interval], - sweep_interval: PUBLISH_MESSAGE_DEFAULTS[:sweep_interval], - sweep_retention: PUBLISH_MESSAGE_DEFAULTS[:sweep_retention], - sweep_batch_size: PUBLISH_MESSAGE_DEFAULTS[:sweep_batch_size], logger: Logger.new($stdout, level: PUBLISH_MESSAGE_DEFAULTS[:log_level]), time: ::Time, process: ::Process, kernel: ::Kernel, &block @@ -471,9 +446,6 @@ def publish_message( "tick_interval=#{tick_interval} " \ "poll_interval=#{poll_interval}, " \ "heartbeat_interval=#{heartbeat_interval}, " \ - "sweep_interval=#{sweep_interval}, " \ - "sweep_retention=#{sweep_retention}, " \ - "sweep_batch_size=#{sweep_batch_size}, " \ "log_level=#{logger.level}" Setting.create_all @@ -483,26 +455,12 @@ def publish_message( concurrency: concurrency, tick_interval: tick_interval, poll_interval: poll_interval, - heartbeat_interval: heartbeat_interval, - sweep_interval: sweep_interval, - sweep_retention: sweep_retention, - sweep_batch_size: sweep_batch_size) + heartbeat_interval: heartbeat_interval) heartbeat_thread = create_heartbeat_thread( id: publisher[:id], heartbeat_interval: heartbeat_interval, tick_interval: tick_interval, logger: logger, time: time, process: process, kernel: kernel) - sweeper_thread = create_sweeper_thread( - id: publisher[:id], - sweep_interval: sweep_interval, - sweep_retention: sweep_retention, - sweep_batch_size: sweep_batch_size, - tick_interval: tick_interval, - logger: logger, - time: time, - process: process, - kernel: kernel) - publisher_threads = Array.new(concurrency) do |index| create_publisher_thread( id: publisher[:id], index: index, @@ -523,7 +481,6 @@ def publish_message( publisher_threads.each(&:join) heartbeat_thread.join - sweeper_thread.join delete(id: publisher[:id]) @@ -531,7 +488,7 @@ def publish_message( end def pool(concurrency:) - concurrency + 3 # (main + heartbeat + sweeper) + concurrency + 2 # (main + heartbeat) end # @param id [Integer] Publisher id. @@ -589,51 +546,5 @@ def create_publisher_thread(id:, index:, end end end - - # Creates a sweeper thread to periodically delete old published messages for a publisher. - # - # @param id [Integer] The ID of the publisher. - # @param sweep_interval [Float] Time in seconds between each sweep run. - # @param sweep_retention [Float] Retention period in seconds for keeping published messages. - # @param sweep_batch_size [Integer] Max number of messages to delete in each sweep. - # @param tick_interval [Float] Time in seconds between signal checks while sleeping. - # @param logger [Logger] Logger instance to report errors and fatal issues. - # @param time [Time] Module used for fetching current UTC time. - # @param process [Process] Process module used for monotonic clock timings. - # @param kernel [Kernel] Kernel module used for sleeping between sweeps. - # @return [Thread] The thread executing the sweeping logic. - def create_sweeper_thread(id:, sweep_interval:, sweep_retention:, sweep_batch_size:, - tick_interval:, logger:, time:, process:, kernel:) - Thread.new do - Thread.current.name = "sweeper" - - while !terminating? - begin - deleted_count = Message.delete_batch( - status: Message::Status::PUBLISHED, - older_than: time.now.utc - sweep_retention, - batch_size: sweep_batch_size)[:deleted_count] - - if deleted_count == 0 - Publisher.sleep( - sweep_interval, - tick_interval: tick_interval, - process: process, - kernel: kernel) - end - rescue StandardError => error - logger.error( - "#{error.class}: #{error.message}\n" \ - "#{error.backtrace.join("\n")}") - rescue ::Exception => error - logger.fatal( - "#{error.class}: #{error.message}\n" \ - "#{error.backtrace.join("\n")}") - - terminate(id: id) - end - end - end - end end end diff --git a/lib/outboxer/web/views/publisher.erb b/lib/outboxer/web/views/publisher.erb index 82756896..a7679de4 100644 --- a/lib/outboxer/web/views/publisher.erb +++ b/lib/outboxer/web/views/publisher.erb @@ -98,21 +98,6 @@ Heartbeat Interval <%= publisher[:settings]['heartbeat_interval'] %> seconds - - - Sweep Interval - <%= publisher[:settings]['sweep_interval'] %> seconds - - - - Sweep Retention - <%= publisher[:settings]['sweep_retention'] %> seconds - - - - Sweep Batch Size - <%= publisher[:settings]['sweep_batch_size'] %> - diff --git a/quickstart_e2e_tests.sh b/quickstart_e2e_tests.sh index 393dcbad..8369090f 100755 --- a/quickstart_e2e_tests.sh +++ b/quickstart_e2e_tests.sh @@ -42,7 +42,7 @@ bundle exec rails new . \ bundle install bundle exec rails db:create -echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "refactor/delete_published_outboxer_messages"' \ +echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "master"' \ >> Gemfile bundle install bundle exec rails generate outboxer:install diff --git a/spec/lib/outboxer/publisher/parse_cli_options_spec.rb b/spec/lib/outboxer/publisher/parse_cli_options_spec.rb index eb04ec90..e06f0d73 100644 --- a/spec/lib/outboxer/publisher/parse_cli_options_spec.rb +++ b/spec/lib/outboxer/publisher/parse_cli_options_spec.rb @@ -42,24 +42,6 @@ module Outboxer ).to eq({ heartbeat_interval: 10 }) end - it "parses the sweep interval flag" do - expect( - Publisher.parse_cli_options(["--sweep-interval", "10"]) - ).to eq({ sweep_interval: 10 }) - end - - it "parses the sweep retention flag" do - expect( - Publisher.parse_cli_options(["--sweep-retention", "10"]) - ).to eq({ sweep_retention: 10 }) - end - - it "parses the sweep batch size flag" do - expect( - Publisher.parse_cli_options(["--sweep-batch_size", "10"]) - ).to eq({ sweep_batch_size: 10 }) - end - it "parses the log level flag" do expect(Publisher.parse_cli_options(["--log-level", "0"])).to eq({ log_level: 0 }) end diff --git a/spec/lib/outboxer/publisher/pool_spec.rb b/spec/lib/outboxer/publisher/pool_spec.rb index afe24d17..a0aa1880 100644 --- a/spec/lib/outboxer/publisher/pool_spec.rb +++ b/spec/lib/outboxer/publisher/pool_spec.rb @@ -4,7 +4,7 @@ module Outboxer RSpec.describe Publisher do describe ".pool" do it "returns correct value" do - expect(Publisher.pool(concurrency: 5)).to eql(8) + expect(Publisher.pool(concurrency: 5)).to eql(7) end end end diff --git a/spec/lib/outboxer/publisher/publish_message_spec.rb b/spec/lib/outboxer/publisher/publish_message_spec.rb index d96683b7..1330d265 100644 --- a/spec/lib/outboxer/publisher/publish_message_spec.rb +++ b/spec/lib/outboxer/publisher/publish_message_spec.rb @@ -28,126 +28,6 @@ module Outboxer end end - context "when sweeper deletes a message" do - let!(:message_1) { create(:outboxer_message, :published, updated_at: 2.seconds.ago) } - let!(:message_2) { create(:outboxer_message, :published, updated_at: 2.seconds.ago) } - - it "deletes the old published message" do - publish_messages_thread = Thread.new do - Publisher.publish_message( - poll_interval: poll_interval, - tick_interval: tick_interval, - sweep_interval: 0.1, - sweep_retention: 0.1, - sweep_batch_size: 100, - logger: logger, - kernel: kernel - ) do |_publisher, _message| - # no op - end - end - - sleep 0.3 - - ::Process.kill("TERM", ::Process.pid) - - publish_messages_thread.join - - expect(Models::Message.count).to be(0) - end - end - - context "when sweeper raises StandardError" do - let!(:old_message) { create(:outboxer_message, :published, updated_at: 2.seconds.ago) } - - before do - allow(Message).to receive(:delete_batch) - .and_raise(StandardError, "sweep fail") - end - - it "logs error" do - publish_messages_thread = Thread.new do - Publisher.publish_message( - poll_interval: poll_interval, - tick_interval: tick_interval, - sweep_interval: 0.1, - sweep_retention: 0.1, - sweep_batch_size: 100, - logger: logger, - kernel: kernel - ) do |_publisher, _message| - # no op - end - end - - sleep 0.3 - ::Process.kill("TERM", ::Process.pid) - publish_messages_thread.join - - expect(logger).to have_received(:error) - .with(include("StandardError: sweep fail")) - .at_least(:once) - end - - it "does not delete the old message" do - thread = Thread.new do - Publisher.publish_message( - poll_interval: poll_interval, - tick_interval: tick_interval, - sweep_interval: 0.1, - sweep_retention: 1, - sweep_batch_size: 1, - logger: logger, - kernel: kernel - ) do |_publisher, message| - # no op - end - end - - sleep 0.3 - ::Process.kill("TERM", ::Process.pid) - thread.join - - expect(Models::Message.exists?(id: old_message.id)).to be(true) - end - end - - context "when sweeper raises critical error" do - let!(:old_message) { create(:outboxer_message, :published, updated_at: 2.seconds.ago) } - - before do - allow(Message).to receive(:delete_batch) - .and_raise(NoMemoryError, "boom") - - thread = Thread.new do - Publisher.publish_message( - poll_interval: poll_interval, - tick_interval: tick_interval, - sweep_interval: 0.1, - sweep_retention: 0.1, - sweep_batch_size: 100, - logger: logger, - kernel: kernel - ) do |_publisher, message| - # no op - end - end - - sleep 0.3 - ::Process.kill("TERM", ::Process.pid) - thread.join - end - - it "logs fatal error" do - expect(logger).to have_received(:fatal) - .with(include("NoMemoryError: boom")) - end - - it "does not delete the old message" do - expect(Models::Message.exists?(id: old_message.id)).to be(true) - end - end - context "when TTIN signal sent" do let!(:old_message) { create(:outboxer_message, :queued, updated_at: 2.seconds.ago) }