From 47c08969cee9fe10141373987c57c9cd903b75a4 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Mon, 27 Oct 2025 20:49:22 +1100 Subject: [PATCH 01/17] update message to use new counters --- lib/outboxer/message.rb | 332 +++++++++++++++++----------------------- 1 file changed, 140 insertions(+), 192 deletions(-) diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index e4e56655..d62aaa70 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -7,65 +7,26 @@ module Message Status = Models::Message::Status class Error < StandardError; end - class MissingPartition < Error; end - - PARTITION_COUNT = Integer(ENV.fetch("OUTBOXER_MESSAGE_PARTITION_COUNT", 64)) - PARTITIONS = (0...PARTITION_COUNT) - - # Seed partitions. - # - # This method is idempotent and can be safely re-run. - # - # @param logger [#info, #error, #fatal, nil] optional logger - # @param time [Time] time source for created_at / updated_at. - # @return [Integer] number of rows ensured per table. - def seed_partitions(logger: nil, time: ::Time) - current_utc_time = time.now.utc - - [Models::MessageCount, Models::MessageTotal].each do |model| - PARTITIONS.each do |partition| - STATUSES.each do |status| - model.create_or_find_by!(status: status, partition: partition) do |record| - record.value = 0 - record.created_at = current_utc_time - record.updated_at = current_utc_time - end - end - end - end - - logger&.info "Seeded #{PARTITIONS.size} partitions" - - nil - end - - # Calculates the partition number for a given message ID. - # - # @param id [Integer] the unique message ID used to determine the partition. - # @param partition_count [Integer] the total number of partitions to divide messages across. - # Defaults to PARTITION_COUNT. - # @return [Integer] the partition number derived by taking the ID modulo partition_count. - # - # @example - # calculate_partition(id: 12345) - # # => 17 - def calculate_partition(id:, partition_count: PARTITION_COUNT) - id % partition_count - end # Queues a new message. # - # @overload queue(messageable:, attempt: 1, logger: nil, time: ::Time) - # @param messageable [Object] the associated object; must respond to `id` and `class.name`. - # @param attempt [Integer] call attempt counter; first call should leave as default (1). - # @param logger [#info, #error, #fatal, nil] optional logger - # @param time [Time] time context for setting timestamps. - # - # @overload queue(messageable_type:, messageable_id:, attempt: 1, logger: nil, time: ::Time) - # @param messageable_type [String] the associated object type name. - # @param messageable_id [Integer, String] the associated object identifier. - # @param attempt [Integer] call attempt counter; first call should leave as default (1). - # @param logger [#info, #error, #fatal, nil] optional logger + # @overload queue( + # messageable: nil, + # messageable_type: nil, + # messageable_id: nil, + # hostname: Socket.gethostname, + # process_id: Process.pid, + # thread_id: Thread.current.object_id, + # logger: nil, + # time: ::Time + # ) + # @param messageable [Object, nil] associated object; must respond to `id` and `class.name`. + # @param messageable_type [String, nil] the associated object type name. + # @param messageable_id [Integer, String, nil] the associated object identifier. + # @param hostname [String] name of the host (defaults to `Socket.gethostname`). + # @param process_id [Integer] current process ID (defaults to `Process.pid`). + # @param thread_id [Integer] current thread ID (defaults to `Thread.current.object_id`). + # @param logger [#info, #error, #fatal, nil] optional logger. # @param time [Time] time context for setting timestamps. # # @return [Hash] a serialized message hash with keys: @@ -76,12 +37,26 @@ def calculate_partition(id:, partition_count: PARTITION_COUNT) # - `:updated_at` [Time] # @raise [Outboxer::Message::MissingPartition] when partition seed absent and `attempt` > 1. # @raise [Outboxer::Message::Error] on other failures during queuing. + # # @example Using an object # Outboxer::Message.queue(messageable: event) + # # @example Using explicit type and id # Outboxer::Message.queue(messageable_type: "Event", messageable_id: 42) + # + # @example Overriding context explicitly + # Outboxer::Message.queue( + # messageable_type: "Event", + # messageable_id: 42, + # hostname: "worker1", + # process_id: 1234, + # thread_id: 5678 + # ) def queue(messageable: nil, messageable_type: nil, messageable_id: nil, - attempt: 1, logger: nil, time: ::Time) + hostname: Socket.gethostname, + process_id: Process.pid, + thread_id: Thread.current.object_id, + logger: nil, time: ::Time) current_utc_time = time.now.utc type, id = @@ -103,40 +78,16 @@ def queue(messageable: nil, messageable_type: nil, messageable_id: nil, updated_at: current_utc_time ) - partition = calculate_partition(id: message.id) - - updated_count = Models::MessageCount - .where(status: Status::QUEUED, partition: partition) - .update_all(["value = value + ?, updated_at = ?", 1, current_utc_time]) - - updated_total = Models::MessageTotal - .where(status: Status::QUEUED, partition: partition) - .update_all(["value = value + ?, updated_at = ?", 1, current_utc_time]) - - if updated_count.zero? || updated_total.zero? - raise MissingPartition, "Missing partition" - end + Models::Counter.insert_or_increment_by( + hostname: hostname, process_id: process_id, thread_id: thread_id, + queued_count: 1, + time: current_utc_time) { id: message.id, lock_version: message.lock_version } end - rescue MissingPartition - if attempt == 1 - Message.seed_partitions(time: time, logger: logger) - - Message.queue( - messageable: messageable, - messageable_type: messageable_type, - messageable_id: messageable_id, - attempt: attempt + 1, - logger: logger, - time: time - ) - else - raise - end end # Publishes the next queued message. @@ -186,9 +137,15 @@ def queue(messageable: nil, messageable_type: nil, messageable_id: nil, # Outboxer::Message.publish(logger: logger) do |message| # # Publish message to a broker here # end - def publish(logger: nil, time: ::Time) + def publish(hostname: Socket.gethostname, + process_id: Process.pid, + thread_id: Thread.current.object_id, + logger: nil, time: ::Time) publishing_started_at = time.now.utc - message = Message.publishing(time: time) + + message = Message.publishing( + hostname: hostname, process_id: process_id, thread_id: thread_id, time: time) + return if message.nil? logger&.info( @@ -202,7 +159,9 @@ def publish(logger: nil, time: ::Time) rescue StandardError => error publishing_failed( id: message[:id], lock_version: message[:lock_version], - error: error, time: time) + error: error, + hostname: hostname, process_id: process_id, thread_id: thread_id, + time: time) logger&.error( "Outboxer message publishing failed id=#{message[:id]} " \ @@ -211,7 +170,9 @@ def publish(logger: nil, time: ::Time) "#{error.backtrace.join("\n")}") rescue Exception => error publishing_failed( - id: message[:id], lock_version: message[:lock_version], error: error, time: time) + id: message[:id], lock_version: message[:lock_version], error: error, + hostname: hostname, process_id: process_id, thread_id: thread_id, + time: time) logger&.fatal( "Outboxer message publishing failed id=#{message[:id]} " \ @@ -221,7 +182,9 @@ def publish(logger: nil, time: ::Time) raise else - published(id: message[:id], lock_version: message[:lock_version], time: time) + published(id: message[:id], lock_version: message[:lock_version], + hostname: hostname, process_id: process_id, thread_id: thread_id, + time: time) logger&.info( "Outboxer message published id=#{message[:id]} " \ @@ -248,7 +211,11 @@ def publish(logger: nil, time: ::Time) # @note Runs inside a database transaction and uses the ActiveRecord connection pool. # @example # message = Outboxer::Message.publishing(time: Time) - def publishing(time: ::Time) + + def publishing(hostname: Socket.gethostname, + process_id: Process.pid, + thread_id: Thread.current.object_id, + time: ::Time) current_utc_time = time.now.utc ActiveRecord::Base.connection_pool.with_connection do @@ -270,19 +237,14 @@ def publishing(time: ::Time) updated_at: current_utc_time, publishing_at: current_utc_time) - partition = calculate_partition(id: message.id) - - Models::MessageCount - .where(status: Status::QUEUED, partition: partition) - .update_all(["value = value - ?, updated_at = ?", 1, current_utc_time]) - - Models::MessageCount - .where(status: Status::PUBLISHING, partition: partition) - .update_all(["value = value + ?, updated_at = ?", 1, current_utc_time]) - - Models::MessageTotal - .where(status: Status::PUBLISHING, partition: partition) - .update_all(["value = value + ?, updated_at = ?", 1, current_utc_time]) + Models::Counter.insert_or_increment_by( + hostname: hostname, + process_id: process_id, + thread_id: thread_id, + queued_count: -1, + publishing_count: 1, + time: current_utc_time + ) { id: message.id, @@ -308,7 +270,11 @@ def publishing(time: ::Time) # @note Executes within a transaction using a `SELECT ... FOR UPDATE` lock. # @example # Outboxer::Message.published(id: message[:id], time: Time) - def published(id:, lock_version:, time: ::Time) + def published(id:, lock_version:, + hostname: Socket.gethostname, + process_id: Process.pid, + thread_id: Thread.object_id, + time: ::Time) current_utc_time = time.now.utc ActiveRecord::Base.connection_pool.with_connection do @@ -325,19 +291,14 @@ def published(id:, lock_version:, time: ::Time) message.exceptions.delete_all message.delete - partition = calculate_partition(id: message.id) - - Models::MessageCount - .where(status: Status::PUBLISHING, partition: partition) - .update_all(["value = value - ?, updated_at = ?", 1, current_utc_time]) - - Models::MessageCount - .where(status: Status::PUBLISHED, partition: partition) - .update_all(["value = value + ?, updated_at = ?", 1, current_utc_time]) - - Models::MessageTotal - .where(status: Status::PUBLISHED, partition: partition) - .update_all(["value = value + ?, updated_at = ?", 1, current_utc_time]) + Models::Counter.insert_or_increment_by( + hostname: hostname, + process_id: process_id, + thread_id: thread_id, + publishing_count: -1, + published_count: 1, + time: current_utc_time + ) { id: message.id @@ -360,7 +321,11 @@ def published(id:, lock_version:, time: ::Time) # @note Runs inside a transaction and uses `SELECT ... FOR UPDATE` locking. # @example # Outboxer::Message.publishing_failed(id: message[:id], time: Time) - def publishing_failed(id:, lock_version:, error: nil, time: ::Time) + def publishing_failed(id:, lock_version:, error: nil, + hostname: Socket.gethostname, + process_id: Process.pid, + thread_id: Thread.object_id, + time: ::Time) current_utc_time = time.now.utc ActiveRecord::Base.connection_pool.with_connection do @@ -385,19 +350,13 @@ def publishing_failed(id:, lock_version:, error: nil, time: ::Time) end end - partition = calculate_partition(id: message.id) - - Models::MessageCount - .where(status: Status::PUBLISHING, partition: partition) - .update_all(["value = value - ?, updated_at = ?", 1, current_utc_time]) - - Models::MessageCount - .where(status: Status::FAILED, partition: partition) - .update_all(["value = value + ?, updated_at = ?", 1, current_utc_time]) - - Models::MessageTotal - .where(status: Status::FAILED, partition: partition) - .update_all(["value = value + ?, updated_at = ?", 1, current_utc_time]) + Models::Counter.insert_or_increment_by( + hostname: hostname, + process_id: process_id, + thread_id: thread_id, + publishing_count: -1, + failed_count: 1, + time: current_utc_time) { id: message.id, @@ -504,11 +463,15 @@ def delete(id:, lock_version:, time: ::Time) message.exceptions.delete_all message.delete - partition = calculate_partition(id: message.id) - - Models::MessageCount - .where(status: message.status, partition: partition) - .update_all(["value = value - ?, updated_at = ?", 1, current_utc_time]) + Models::Counter.insert_or_increment_by( + hostname: hostname, + process_id: process_id, + thread_id: thread_id, + queued_count: (message.status == Status::QUEUED ? -1 : 0), + publishing_count: (message.status == Status::PUBLISHING ? -1 : 0), + published_count: (message.status == Status::PUBLISHED ? -1 : 0), + failed_count: (message.status == Status::FAILED ? -1 : 0), + time: current_utc_time) { id: id } end @@ -538,8 +501,6 @@ def requeue(id:, lock_version:, publisher_id: nil, publisher_name: nil, ActiveRecord::Base.transaction do message = Models::Message.lock.find_by!(id: id) - partition = calculate_partition(id: message.id) - original_status = message.status message.update!( @@ -553,17 +514,15 @@ def requeue(id:, lock_version:, publisher_id: nil, publisher_name: nil, publisher_id: publisher_id, publisher_name: publisher_name) - Models::MessageCount - .where(status: original_status, partition: partition) - .update_all(["value = value - ?, updated_at = ?", 1, current_utc_time]) - - Models::MessageCount - .where(status: message.status, partition: partition) - .update_all(["value = value + ?, updated_at = ?", 1, current_utc_time]) - - Models::MessageTotal - .where(status: message.status, partition: partition) - .update_all(["value = value + ?, updated_at = ?", 1, current_utc_time]) + Models::Counter.insert_or_increment_by( + hostname: hostname, + process_id: process_id, + thread_id: thread_id, + queued_count: 1, + publishing_count: (original_status == Status::PUBLISHING ? -1 : 0), + published_count: (original_status == Status::PUBLISHED ? -1 : 0), + failed_count: (original_status == Status::FAILED ? -1 : 0), + time: current_utc_time) { id: id, @@ -672,64 +631,53 @@ def list(status: LIST_STATUS_DEFAULT, } end - # Retrieves and calculates metrics related to message statuses, including counts and totals. - # Latency and throughput are placeholders (0) until partitioned metrics tables are implemented. + # Retrieves and calculates metrics related to message statuses, + # using the unified Counter.total for all counts. + # Latency and throughput are placeholders (0) until time-series metrics are implemented. + # # @return [Hash] detailed metrics across various message statuses. def metrics - metrics = { all: { count: { current: 0 } } } + totals = Counter.total + + metrics = { + all: { + count: { + current: totals.values.sum + } + } + } - Models::Message::STATUSES.each do |status| - metrics[status.to_sym] = { - count: { current: 0 }, + { + queued: :queued_count, + publishing: :publishing_count, + published: :published_count, + failed: :failed_count + }.each do |status, field| + metrics[status] = { + count: { current: totals[field].to_i }, latency: 0, throughput: 0 } end - counts_by_status = count_by_status - totals_by_status = total_by_status - - Models::Message::STATUSES.each do |status| - status_key = status.to_s - status_symbol = status.to_sym - - current_count = counts_by_status[status_key] - metrics[status_symbol][:count][:current] = current_count - metrics[status_symbol][:count][:total] = totals_by_status[status_key] - metrics[:all][:count][:current] += current_count - - metrics[status_symbol][:latency] = 0 - metrics[status_symbol][:throughput] = 0 - end - - metrics[:published][:count][:total] = totals_by_status["published"].to_i - metrics[:failed][:count][:total] = totals_by_status["failed"].to_i - metrics end - # Returns the current counts per status aggregated across all partitions. - # - # @return [Hash{String=>Integer}] map of status name (string) to current count. - # @example - # Outboxer::Message.count_by_status - # # => { "queued" => 123, "publishing" => 4, "published" => 987, "failed" => 2 } - def count_by_status - Message::STATUSES - .index_with { 0 } - .merge(Models::MessageCount.group(:status).sum(:value).transform_keys(&:to_s)) - end - - # Returns the total (lifetime) counts per status aggregated across all partitions. + # Returns the total (lifetime) counts per status aggregated across all counters. # # @return [Hash{String=>Integer}] map of status name (string) to total count. # @example - # Outboxer::Message.total_by_status + # Message.total_by_status # # => { "queued" => 1200, "publishing" => 400, "published" => 100_000, "failed" => 250 } def total_by_status - Message::STATUSES - .index_with { 0 } - .merge(Models::MessageTotal.group(:status).sum(:value).transform_keys(&:to_s)) + totals = Counter.total + + { + "queued" => totals[:queued_count], + "publishing" => totals[:publishing_count], + "published" => totals[:published_count], + "failed" => totals[:failed_count] + } end end end From 0fc0199031b7e7598b6cee4212d0a26e174f618f Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sat, 1 Nov 2025 13:29:15 +1100 Subject: [PATCH 02/17] make query sql safe --- lib/outboxer/models/counter.rb | 59 ++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/lib/outboxer/models/counter.rb b/lib/outboxer/models/counter.rb index c32e0c39..ec64c4b4 100644 --- a/lib/outboxer/models/counter.rb +++ b/lib/outboxer/models/counter.rb @@ -1,32 +1,34 @@ module Outboxer module Models - class Counter < ::ActiveRecord::Base + class Counter < ActiveRecord::Base self.table_name = "outboxer_counters" - def self.insert_or_increment_by(hostname: Socket.gethostname, - process_id: Process.pid, - thread_id: Thread.current.object_id, - queued_count: 0, publishing_count: 0, - published_count: 0, failed_count: 0, - time: Time.now.utc) - adapter = ActiveRecord::Base.connection.adapter_name.downcase + def self.insert_or_increment_by( + hostname: Socket.gethostname, + process_id: Process.pid, + thread_id: Thread.current.object_id, + queued_count: 0, + publishing_count: 0, + published_count: 0, + failed_count: 0, + time: Time.now.utc + ) now = time.utc - sql = if adapter.include?("postgres") + sql = if connection.adapter_name.downcase.include?("postgres") <<~SQL INSERT INTO #{table_name} (hostname, process_id, thread_id, queued_count, publishing_count, published_count, failed_count, created_at, updated_at) - VALUES (?, ?, ?, #{queued_count}, #{publishing_count}, - #{published_count}, #{failed_count}, ?, ?) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (hostname, process_id, thread_id) DO UPDATE SET - queued_count = #{table_name}.queued_count + #{queued_count}, - publishing_count = #{table_name}.publishing_count + #{publishing_count}, - published_count = #{table_name}.published_count + #{published_count}, - failed_count = #{table_name}.failed_count + #{failed_count}, - updated_at = EXCLUDED.updated_at + queued_count = #{table_name}.queued_count + ?, + publishing_count = #{table_name}.publishing_count + ?, + published_count = #{table_name}.published_count + ?, + failed_count = #{table_name}.failed_count + ?, + updated_at = ? SQL else <<~SQL @@ -34,21 +36,24 @@ def self.insert_or_increment_by(hostname: Socket.gethostname, (hostname, process_id, thread_id, queued_count, publishing_count, published_count, failed_count, created_at, updated_at) - VALUES (?, ?, ?, #{queued_count}, #{publishing_count}, - #{published_count}, #{failed_count}, ?, ?) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE - queued_count = queued_count + #{queued_count}, - publishing_count = publishing_count + #{publishing_count}, - published_count = published_count + #{published_count}, - failed_count = failed_count + #{failed_count}, - updated_at = VALUES(updated_at) + queued_count = queued_count + ?, + publishing_count = publishing_count + ?, + published_count = published_count + ?, + failed_count = failed_count + ?, + updated_at = ? SQL end - ActiveRecord::Base.connection.exec_query( - ActiveRecord::Base.sanitize_sql_array( - [sql, hostname, process_id, thread_id, now, now] - ) + connection.exec_query( + sanitize_sql_array([ + sql, + hostname, process_id, thread_id, + queued_count, publishing_count, published_count, failed_count, + now, now, + queued_count, publishing_count, published_count, failed_count, now + ]) ) nil From afb4157c3f87fce82da14bff8236d2ed2acc3eaa Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sat, 1 Nov 2025 13:32:20 +1100 Subject: [PATCH 03/17] fix spec --- lib/outboxer/models/counter.rb | 8 ++++---- spec/lib/outboxer/models/counter_spec.rb | 4 +--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/lib/outboxer/models/counter.rb b/lib/outboxer/models/counter.rb index ec64c4b4..3367688b 100644 --- a/lib/outboxer/models/counter.rb +++ b/lib/outboxer/models/counter.rb @@ -11,9 +11,9 @@ def self.insert_or_increment_by( publishing_count: 0, published_count: 0, failed_count: 0, - time: Time.now.utc + time: Time ) - now = time.utc + current_utc_time = time.now.utc sql = if connection.adapter_name.downcase.include?("postgres") <<~SQL @@ -51,8 +51,8 @@ def self.insert_or_increment_by( sql, hostname, process_id, thread_id, queued_count, publishing_count, published_count, failed_count, - now, now, - queued_count, publishing_count, published_count, failed_count, now + current_utc_time, current_utc_time, + queued_count, publishing_count, published_count, failed_count, current_utc_time ]) ) diff --git a/spec/lib/outboxer/models/counter_spec.rb b/spec/lib/outboxer/models/counter_spec.rb index e3b99682..6b693dbe 100644 --- a/spec/lib/outboxer/models/counter_spec.rb +++ b/spec/lib/outboxer/models/counter_spec.rb @@ -7,7 +7,6 @@ module Models let(:hostname) { "test-host" } let(:process_id) { 12_345 } let(:thread_id) { 999 } - let(:now) { Time.now.utc } it "inserts a new row successfully" do expect do @@ -15,8 +14,7 @@ module Models hostname: hostname, process_id: process_id, thread_id: thread_id, - queued_count: 1, - time: now + queued_count: 1 ) end.to change(Counter, :count).by(1) From 59ea895a124e28b225b9e6686621e729875ef7b0 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sat, 1 Nov 2025 14:56:50 +1100 Subject: [PATCH 04/17] fix bugs --- lib/outboxer/loader.rb | 56 ++++++++++--------------------- lib/outboxer/message.rb | 12 +++---- lib/outboxer/models/counter.rb | 4 +-- lib/outboxer/web/views/layout.erb | 2 +- 4 files changed, 25 insertions(+), 49 deletions(-) diff --git a/lib/outboxer/loader.rb b/lib/outboxer/loader.rb index 7e664a61..92585d43 100644 --- a/lib/outboxer/loader.rb +++ b/lib/outboxer/loader.rb @@ -13,7 +13,7 @@ module Loader LOAD_DEFAULTS = { batch_size: 1_000, concurrency: 5, - size: 1_000_000, + size: 1, tick_interval: 1 } @@ -35,7 +35,7 @@ def parse_cli_options(argv) options[:concurrency] = v end - opts.on("--size SIZE", Integer, "Number of messages to load (default: 1M)") do |v| + opts.on("--size SIZE", Integer, "Number of messages to load (default: 1)") do |v| options[:size] = v end @@ -66,7 +66,12 @@ def load(batch_size: LOAD_DEFAULTS[:batch_size], tick_interval: LOAD_DEFAULTS[:tick_interval], logger: Outboxer::Logger.new($stdout)) status = :loading - reader, _writer = trap_signals + + Signal.trap("INT") { status = :terminating } + Signal.trap("TERM") { status = :terminating } + Signal.trap("TSTP") { status = :stopped } + Signal.trap("CONT") { status = :loading } + queue = Queue.new threads = spawn_workers(concurrency, queue, logger) @@ -74,8 +79,6 @@ def load(batch_size: LOAD_DEFAULTS[:batch_size], started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) while enqueued < size - status = read_status(reader) || status - case status when :terminating break @@ -91,12 +94,11 @@ def load(batch_size: LOAD_DEFAULTS[:batch_size], # logger.info "[main] enqueued #{enqueued}/#{size}" if (enqueued % (batch_size * 2)).zero? end - - # display_metrics(logger) end queue.close - threads.each(&:join) + + threads.each(&:kill) finished_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) elapsed = finished_at - started_at @@ -108,41 +110,17 @@ def load(batch_size: LOAD_DEFAULTS[:batch_size], logger.info "[main] done" end - def trap_signals - reader, writer = IO.pipe - - %w[INT TERM TSTP CONT].each do |signal| - Signal.trap(signal) { writer.puts(signal) } - end - - [reader, writer] - end - - def read_status(reader) - line = reader.ready? ? reader.gets&.strip : nil - - case line - when "INT", "TERM" then :terminating - when "TSTP" then :stopped - when "CONT" then :loading - end - end - def spawn_workers(concurrency, queue, logger) Array.new(concurrency) do |index| Thread.new do - begin - while (messageables = queue.pop) - messageables.each do |messageable| - Outboxer::Message.queue(messageable: messageable) - rescue StandardError => error - logger.error "[thread-#{index}] #{error.class}: #{error.message}" - - sleep 1 - end + while (messageables = queue.pop) + messageables.each do |messageable| + Outboxer::Message.queue(messageable: messageable) + rescue StandardError => error + logger.error "[thread-#{index}] #{error.class}: #{error.message}" + + sleep 1 end - rescue ClosedQueueError - # Queue closed and empty — exit gracefully end end end diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index d62aaa70..2790cdc2 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -81,7 +81,7 @@ def queue(messageable: nil, messageable_type: nil, messageable_id: nil, Models::Counter.insert_or_increment_by( hostname: hostname, process_id: process_id, thread_id: thread_id, queued_count: 1, - time: current_utc_time) + current_utc_time: current_utc_time) { id: message.id, @@ -243,7 +243,7 @@ def publishing(hostname: Socket.gethostname, thread_id: thread_id, queued_count: -1, publishing_count: 1, - time: current_utc_time + current_utc_time: current_utc_time ) { @@ -297,7 +297,7 @@ def published(id:, lock_version:, thread_id: thread_id, publishing_count: -1, published_count: 1, - time: current_utc_time + current_utc_time: current_utc_time ) { @@ -356,7 +356,7 @@ def publishing_failed(id:, lock_version:, error: nil, thread_id: thread_id, publishing_count: -1, failed_count: 1, - time: current_utc_time) + current_utc_time: current_utc_time) { id: message.id, @@ -471,7 +471,7 @@ def delete(id:, lock_version:, time: ::Time) publishing_count: (message.status == Status::PUBLISHING ? -1 : 0), published_count: (message.status == Status::PUBLISHED ? -1 : 0), failed_count: (message.status == Status::FAILED ? -1 : 0), - time: current_utc_time) + current_utc_time: current_utc_time) { id: id } end @@ -522,7 +522,7 @@ def requeue(id:, lock_version:, publisher_id: nil, publisher_name: nil, publishing_count: (original_status == Status::PUBLISHING ? -1 : 0), published_count: (original_status == Status::PUBLISHED ? -1 : 0), failed_count: (original_status == Status::FAILED ? -1 : 0), - time: current_utc_time) + current_utc_time: current_utc_time) { id: id, diff --git a/lib/outboxer/models/counter.rb b/lib/outboxer/models/counter.rb index 3367688b..fe48314a 100644 --- a/lib/outboxer/models/counter.rb +++ b/lib/outboxer/models/counter.rb @@ -11,10 +11,8 @@ def self.insert_or_increment_by( publishing_count: 0, published_count: 0, failed_count: 0, - time: Time + current_utc_time: Time.now.utc ) - current_utc_time = time.now.utc - sql = if connection.adapter_name.downcase.include?("postgres") <<~SQL INSERT INTO #{table_name} diff --git a/lib/outboxer/web/views/layout.erb b/lib/outboxer/web/views/layout.erb index 48d6923f..e899952f 100644 --- a/lib/outboxer/web/views/layout.erb +++ b/lib/outboxer/web/views/layout.erb @@ -55,7 +55,7 @@
Outboxer - <%= Outboxer::Web.pretty_number(number: messages_metrics[:published][:count][:total]) %> + <%= Outboxer::Web.pretty_number(number: messages_metrics[:published][:count][:current]) %>