diff --git a/bin/outboxer_publisher b/bin/outboxer_publisher index 93555aef..68cd6035 100755 --- a/bin/outboxer_publisher +++ b/bin/outboxer_publisher @@ -21,9 +21,6 @@ begin tick_interval: options[:tick_interval], poll_interval: options[:poll_interval], heartbeat_interval: options[:heartbeat_interval], - sweep_interval: options[:sweep_interval], - sweep_retention: options[:sweep_retention], - sweep_batch_size: options[:sweep_batch_size], logger: logger ) do |publisher, message| # TODO: publish message here diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index 9c513049..5ae4f36f 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -39,18 +39,6 @@ def self.parse_cli_options(args) options[:heartbeat_interval] = v end - opts.on("--sweep-interval SECS", Float, "Sweep interval in seconds") do |v| - options[:sweep_interval] = v - end - - opts.on("--sweep-retention SECS", Float, "Sweep retention in seconds") do |v| - options[:sweep_retention] = v - end - - opts.on("--sweep-batch-size SIZE", Integer, "Sweep batch size") do |v| - options[:sweep_batch_size] = v - end - opts.on("--log-level LEVEL", Integer, "Log level") do |v| options[:log_level] = v end @@ -138,7 +126,6 @@ def all # @return [Hash] Details of the created publisher. def create(name:, concurrency:, tick_interval:, poll_interval:, heartbeat_interval:, - sweep_interval:, sweep_retention:, sweep_batch_size:, time: ::Time) ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do @@ -151,10 +138,7 @@ def create(name:, concurrency:, "concurrency" => concurrency, "tick_interval" => tick_interval, "poll_interval" => poll_interval, - "heartbeat_interval" => heartbeat_interval, - "sweep_interval" => sweep_interval, - "sweep_retention" => sweep_retention, - "sweep_batch_size" => sweep_batch_size + "heartbeat_interval" => heartbeat_interval }, metrics: { "throughput" => 0, @@ -428,9 +412,6 @@ def handle_signal(id:, name:, logger:) tick_interval: 0.1, poll_interval: 5.0, heartbeat_interval: 5.0, - sweep_interval: 60, - sweep_retention: 60, - sweep_batch_size: 100, log_level: 1 } @@ -440,9 +421,6 @@ def handle_signal(id:, name:, logger:) # @param tick_interval [Float] The tick interval in seconds. # @param poll_interval [Float] The poll interval in seconds. # @param heartbeat_interval [Float] The heartbeat interval in seconds. - # @param sweep_interval [Float] The interval in seconds between sweeper runs. - # @param sweep_retention [Float] The retention period in seconds for published messages. - # @param sweep_batch_size [Integer] The maximum number of messages to delete per batch. # @param logger [Logger] Logger for recording publishing activities. # @param time [Time] The current time context. # @param process [Process] The process module for system metrics. @@ -456,9 +434,6 @@ def publish_message( tick_interval: PUBLISH_MESSAGE_DEFAULTS[:tick_interval], poll_interval: PUBLISH_MESSAGE_DEFAULTS[:poll_interval], heartbeat_interval: PUBLISH_MESSAGE_DEFAULTS[:heartbeat_interval], - sweep_interval: PUBLISH_MESSAGE_DEFAULTS[:sweep_interval], - sweep_retention: PUBLISH_MESSAGE_DEFAULTS[:sweep_retention], - sweep_batch_size: PUBLISH_MESSAGE_DEFAULTS[:sweep_batch_size], logger: Logger.new($stdout, level: PUBLISH_MESSAGE_DEFAULTS[:log_level]), time: ::Time, process: ::Process, kernel: ::Kernel, &block @@ -471,9 +446,6 @@ def publish_message( "tick_interval=#{tick_interval} " \ "poll_interval=#{poll_interval}, " \ "heartbeat_interval=#{heartbeat_interval}, " \ - "sweep_interval=#{sweep_interval}, " \ - "sweep_retention=#{sweep_retention}, " \ - "sweep_batch_size=#{sweep_batch_size}, " \ "log_level=#{logger.level}" Setting.create_all @@ -483,26 +455,12 @@ def publish_message( concurrency: concurrency, tick_interval: tick_interval, poll_interval: poll_interval, - heartbeat_interval: heartbeat_interval, - sweep_interval: sweep_interval, - sweep_retention: sweep_retention, - sweep_batch_size: sweep_batch_size) + heartbeat_interval: heartbeat_interval) heartbeat_thread = create_heartbeat_thread( id: publisher[:id], heartbeat_interval: heartbeat_interval, tick_interval: tick_interval, logger: logger, time: time, process: process, kernel: kernel) - sweeper_thread = create_sweeper_thread( - id: publisher[:id], - sweep_interval: sweep_interval, - sweep_retention: sweep_retention, - sweep_batch_size: sweep_batch_size, - tick_interval: tick_interval, - logger: logger, - time: time, - process: process, - kernel: kernel) - publisher_threads = Array.new(concurrency) do |index| create_publisher_thread( id: publisher[:id], index: index, @@ -523,7 +481,6 @@ def publish_message( publisher_threads.each(&:join) heartbeat_thread.join - sweeper_thread.join delete(id: publisher[:id]) @@ -531,7 +488,7 @@ def publish_message( end def pool(concurrency:) - concurrency + 3 # (main + heartbeat + sweeper) + concurrency + 2 # (main + heartbeat) end # @param id [Integer] Publisher id. @@ -589,51 +546,5 @@ def create_publisher_thread(id:, index:, end end end - - # Creates a sweeper thread to periodically delete old published messages for a publisher. - # - # @param id [Integer] The ID of the publisher. - # @param sweep_interval [Float] Time in seconds between each sweep run. - # @param sweep_retention [Float] Retention period in seconds for keeping published messages. - # @param sweep_batch_size [Integer] Max number of messages to delete in each sweep. - # @param tick_interval [Float] Time in seconds between signal checks while sleeping. - # @param logger [Logger] Logger instance to report errors and fatal issues. - # @param time [Time] Module used for fetching current UTC time. - # @param process [Process] Process module used for monotonic clock timings. - # @param kernel [Kernel] Kernel module used for sleeping between sweeps. - # @return [Thread] The thread executing the sweeping logic. - def create_sweeper_thread(id:, sweep_interval:, sweep_retention:, sweep_batch_size:, - tick_interval:, logger:, time:, process:, kernel:) - Thread.new do - Thread.current.name = "sweeper" - - while !terminating? - begin - deleted_count = Message.delete_batch( - status: Message::Status::PUBLISHED, - older_than: time.now.utc - sweep_retention, - batch_size: sweep_batch_size)[:deleted_count] - - if deleted_count == 0 - Publisher.sleep( - sweep_interval, - tick_interval: tick_interval, - process: process, - kernel: kernel) - end - rescue StandardError => error - logger.error( - "#{error.class}: #{error.message}\n" \ - "#{error.backtrace.join("\n")}") - rescue ::Exception => error - logger.fatal( - "#{error.class}: #{error.message}\n" \ - "#{error.backtrace.join("\n")}") - - terminate(id: id) - end - end - end - end end end diff --git a/lib/outboxer/web/views/publisher.erb b/lib/outboxer/web/views/publisher.erb index 82756896..a7679de4 100644 --- a/lib/outboxer/web/views/publisher.erb +++ b/lib/outboxer/web/views/publisher.erb @@ -98,21 +98,6 @@