From 55dffe606e6f86914ea9be0259cce14e14698cb0 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 2 Nov 2025 10:42:37 +1100 Subject: [PATCH 1/2] fix bugs --- lib/outboxer/publisher.rb | 89 ++++++++++++++++++++-------- lib/outboxer/web/views/home.erb | 2 +- lib/outboxer/web/views/publisher.erb | 2 +- 3 files changed, 67 insertions(+), 26 deletions(-) diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index e9316f6b..b2136fcf 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -140,7 +140,10 @@ def create(name:, concurrency:, "heartbeat_interval" => heartbeat_interval }, metrics: { - "throughput" => 0, + "queued" => { "count" => 0, "throughput" => 0 }, + "publishing" => { "count" => 0, "throughput" => 0 }, + "published" => { "count" => 0, "throughput" => 0 }, + "failed" => { "count" => 0, "throughput" => 0 }, "latency" => 0, "cpu" => 0, "rss " => 0, @@ -274,7 +277,7 @@ def trap_signals # @param process [Process] The process module for accessing system metrics. # @param kernel [Kernel] The kernel module for sleeping operations. # @return [Thread] The heartbeat thread. - def create_heartbeat_thread(id:, + def create_heartbeat_thread(id:, hostname:, process_id:, heartbeat_interval:, tick_interval:, logger:, time:, process:, kernel:) Thread.new do @@ -301,33 +304,68 @@ def create_heartbeat_thread(id:, signal.destroy end - throughput = Models::Message - .where(status: Message::Status::PUBLISHED) - .where(publisher_id: id) - .where("published_at >= ?", 1.second.ago) - .count + current_utc_time = time.now.utc + + previous_metrics = publisher.metrics + last_updated_at = publisher.updated_at + time_delta = [current_utc_time - last_updated_at, heartbeat_interval].max + + publisher_message_count = Models::Message::Count + .select( + "COALESCE(SUM(queued), 0) AS queued, + COALESCE(SUM(publishing), 0) AS publishing, + COALESCE(SUM(published), 0) AS published, + COALESCE(SUM(failed), 0) AS failed, + MAX(updated_at) AS last_updated_at" + ) + .where(hostname: hostname, process_id: process_id) + .group(:hostname, :process_id) + .take + + current_counts = { + "queued" => publisher_message_count&.queued || 0, + "publishing" => publisher_message_count&.publishing || 0, + "published" => publisher_message_count&.published || 0, + "failed" => publisher_message_count&.failed || 0 + } + + throughput_by_status = + current_counts.each_with_object({}) do |(status, count), h| + prev = previous_metrics[status]["count"] || 0 + h[status] = ((count - prev) / time_delta).round(0) + end - last_published_message = Models::Message - .where(status: Message::Status::PUBLISHED) - .where(publisher_id: id) - .order(published_at: :desc) - .first + latency = 0 - latency = if last_published_message.nil? - 0 - else - (Time.now.utc - last_published_message.published_at).to_i - end + if !publisher_message_count.nil? + latency = (current_utc_time - publisher_message_count.last_updated_at).round(0) + end publisher.update!( - updated_at: time.now.utc, + updated_at: current_utc_time, metrics: { - throughput: throughput, - latency: latency, - cpu: cpu, - rss: rss, - rtt: rtt - }) + "queued" => { + "count" => current_counts["queued"], + "throughput" => throughput_by_status["queued"] + }, + "publishing" => { + "count" => current_counts["publishing"], + "throughput" => throughput_by_status["publishing"] + }, + "published" => { + "count" => current_counts["published"], + "throughput" => throughput_by_status["published"] + }, + "failed" => { + "count" => current_counts["failed"], + "throughput" => throughput_by_status["failed"] + }, + "latency" => latency, + "cpu" => cpu, + "rss" => rss, + "rtt" => rtt + } + ) end end @@ -429,6 +467,8 @@ def handle_signal(id:, name:, logger:) # @yieldparam messages [Array] An array of message hashes retrieved from the buffer. def publish_message( name: "#{::Socket.gethostname}:#{::Process.pid}", + hostname: Socket.gethostname, + process_id: Process.pid, concurrency: PUBLISH_MESSAGE_DEFAULTS[:concurrency], tick_interval: PUBLISH_MESSAGE_DEFAULTS[:tick_interval], poll_interval: PUBLISH_MESSAGE_DEFAULTS[:poll_interval], @@ -456,6 +496,7 @@ def publish_message( heartbeat_thread = create_heartbeat_thread( id: publisher[:id], heartbeat_interval: heartbeat_interval, tick_interval: tick_interval, + hostname: hostname, process_id: process_id, logger: logger, time: time, process: process, kernel: kernel) publisher_threads = Array.new(concurrency) do |index| diff --git a/lib/outboxer/web/views/home.erb b/lib/outboxer/web/views/home.erb index 8760720e..f7ab571a 100644 --- a/lib/outboxer/web/views/home.erb +++ b/lib/outboxer/web/views/home.erb @@ -38,7 +38,7 @@ <%= Outboxer::Web.time_ago_in_words(publisher[:created_at]) %> ago <%= Outboxer::Web.time_ago_in_words(publisher[:updated_at]) %> ago <%= publisher[:status] %> - <%= Outboxer::Web.pretty_throughput(per_second: publisher[:metrics]['throughput']) %> + <%= Outboxer::Web.pretty_throughput(per_second: publisher[:metrics]['published']['throughput']) %> <%= Outboxer::Web.pretty_duration_from_seconds(seconds: publisher[:metrics]['latency']) %> <%= publisher[:metrics]['cpu'].round(0) %>% <%= Outboxer::Web.human_readable_size(kilobytes: publisher[:metrics]['rss']) %> diff --git a/lib/outboxer/web/views/publisher.erb b/lib/outboxer/web/views/publisher.erb index e03a6574..6ca3d4c4 100644 --- a/lib/outboxer/web/views/publisher.erb +++ b/lib/outboxer/web/views/publisher.erb @@ -102,7 +102,7 @@ Throughput - <%= Outboxer::Web.pretty_throughput(per_second: publisher[:metrics]['throughput']) %> + <%= Outboxer::Web.pretty_throughput(per_second: publisher[:metrics]['published']['throughput']) %> Latency From b7c8d1364be9d5d97b43e4b434a2562d9918d4a0 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 2 Nov 2025 11:51:52 +1100 Subject: [PATCH 2/2] add throughput and latency --- .rubocop.yml | 4 ++-- spec/factories/outboxer_publishers.rb | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/.rubocop.yml b/.rubocop.yml index a493b6ad..c4493a66 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -3,7 +3,7 @@ AllCops: TargetRubyVersion: 3.1.5 SuggestExtensions: false Metrics/AbcSize: - Max: 80 + Max: 90 Metrics/CyclomaticComplexity: Enabled: true Max: 25 @@ -44,7 +44,7 @@ Metrics/ParameterLists: Style/FrozenStringLiteralComment: Enabled: false Metrics/MethodLength: - Max: 80 + Max: 100 Metrics/ClassLength: Enabled: true Max: 100 diff --git a/spec/factories/outboxer_publishers.rb b/spec/factories/outboxer_publishers.rb index 1f2e607c..96ed70d8 100644 --- a/spec/factories/outboxer_publishers.rb +++ b/spec/factories/outboxer_publishers.rb @@ -12,7 +12,10 @@ end metrics do { - "throughput" => 1000, + "queued" => { "count" => 1, "throughput" => 0 }, + "publishing" => { "count" => 1, "throughput" => 0 }, + "failed" => { "count" => 1, "throughput" => 0 }, + "published" => { "count" => 1, "throughput" => 0 }, "latency" => 0, "cpu" => 10.5, "rss" => 40.95,