From 84b920cd3f3acfb37b38649487d0831186807bdc Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 2 Nov 2025 09:07:08 +1100 Subject: [PATCH 1/2] commit latency --- lib/outboxer/message.rb | 30 ++++++++++ lib/outboxer/models/message.rb | 8 +-- lib/outboxer/publisher.rb | 59 ++++++++++++++----- lib/outboxer/web.rb | 24 ++++---- lib/outboxer/web/views/home.erb | 8 +-- lib/outboxer/web/views/layout.erb | 6 +- quickstart_e2e_tests.sh | 4 +- spec/bin/outboxer_publisher_spec.rb | 4 +- spec/lib/outboxer/message/delete_spec.rb | 4 +- ...atus_spec.rb => metrics_by_status_spec.rb} | 13 ++-- 10 files changed, 110 insertions(+), 50 deletions(-) rename spec/lib/outboxer/message/{count_by_status_spec.rb => metrics_by_status_spec.rb} (71%) diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index 70691695..7fe78c92 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -724,5 +724,35 @@ def count_by_status ).transform_values(&:to_i) end end + + def metrics_by_status(time: Time) + ActiveRecord::Base.connection_pool.with_connection do + ActiveRecord::Base.transaction(isolation: :repeatable_read) do + counts = Models::Message::Count.select( + "COALESCE(SUM(queued), 0) AS queued", + "COALESCE(SUM(publishing), 0) AS publishing", + "COALESCE(SUM(published), 0) AS published", + "COALESCE(SUM(failed), 0) AS failed", + "COALESCE(SUM(queued + publishing + published + failed), 0) AS total" + ).take + + min_times = Models::Message.group(:status).minimum(:updated_at) + min_total = min_times.values.compact.min + + now = time.now.utc + latencies = min_times.transform_values { |t| t ? (now - t).to_i : 0 } + latencies["total"] = min_total ? (now - min_total).to_i : 0 + + counts_hash = counts.attributes.symbolize_keys.transform_values(&:to_i) + + [:queued, :publishing, :published, :failed, :total].each_with_object({}) do |status, h| + h[status] = { + count: counts_hash[status] || 0, + latency: latencies[status.to_s] || 0 + } + end + end + end + end end end diff --git a/lib/outboxer/models/message.rb b/lib/outboxer/models/message.rb index 5d4c7401..f68fdf8a 100644 --- a/lib/outboxer/models/message.rb +++ b/lib/outboxer/models/message.rb @@ -6,15 +6,15 @@ class Message < ::ActiveRecord::Base module Status QUEUED = "queued" PUBLISHING = "publishing" - PUBLISHED = "published" FAILED = "failed" + PUBLISHED = "published" end STATUSES = [ Status::QUEUED, Status::PUBLISHING, - Status::PUBLISHED, - Status::FAILED + Status::FAILED, + Status::PUBLISHED ] # @!attribute [rw] status @@ -46,8 +46,8 @@ module Status scope :queued, -> { where(status: Status::QUEUED) } scope :publishing, -> { where(status: Status::PUBLISHING) } - scope :published, -> { where(status: Status::PUBLISHED) } scope :failed, -> { where(status: Status::FAILED) } + scope :published, -> { where(status: Status::PUBLISHED) } # @!method messageable # @return [ActiveRecord::Base] Polymorphic association to an event like model. diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index e9316f6b..8bbc2ce3 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -274,7 +274,7 @@ def trap_signals # @param process [Process] The process module for accessing system metrics. # @param kernel [Kernel] The kernel module for sleeping operations. # @return [Thread] The heartbeat thread. - def create_heartbeat_thread(id:, + def create_heartbeat_thread(id:, hostname:, process_id:, heartbeat_interval:, tick_interval:, logger:, time:, process:, kernel:) Thread.new do @@ -301,22 +301,46 @@ def create_heartbeat_thread(id:, signal.destroy end - throughput = Models::Message - .where(status: Message::Status::PUBLISHED) - .where(publisher_id: id) - .where("published_at >= ?", 1.second.ago) - .count - - last_published_message = Models::Message - .where(status: Message::Status::PUBLISHED) - .where(publisher_id: id) - .order(published_at: :desc) - .first - - latency = if last_published_message.nil? - 0 + row = Models::Message::Count + .select( + "COALESCE(SUM(outboxer_message_counts.queued), 0) AS queued, + COALESCE(SUM(outboxer_message_counts.publishing), 0) AS publishing, + COALESCE(SUM(outboxer_message_counts.published), 0) AS published, + COALESCE(SUM(outboxer_message_counts.failed), 0) AS failed, + COALESCE(SUM( + outboxer_message_counts.queued + + outboxer_message_counts.publishing + + outboxer_message_counts.published + + outboxer_message_counts.failed + ), 0) AS total, + MAX(outboxer_message_counts.updated_at) AS last_message_updated_at" + ) + .where( + "outboxer_message_counts.hostname = ? AND + outboxer_message_counts.process_id = ?", + hostname, process_id + ) + .group( + "outboxer_message_counts.hostname, + outboxer_message_counts.process_id" + ) + .take + + published = row&.published.to_i + + prev_metrics = publisher.metrics.is_a?(String) ? JSON.parse(publisher.metrics) : publisher.metrics + last_published = prev_metrics["last_published"].to_i rescue 0 + last_updated = Time.parse(prev_metrics["last_updated_at"].to_s) rescue (time.now.utc - heartbeat_interval) + + delta_pub = published - last_published + delta_t = [time.now.utc - last_updated, heartbeat_interval].max + throughput = (delta_pub / delta_t).round(0) + + last_message_updated_at = row&.last_message_updated_at + latency = if last_message_updated_at + (time.now.utc - last_message_updated_at).round(1) else - (Time.now.utc - last_published_message.published_at).to_i + 0 end publisher.update!( @@ -429,6 +453,8 @@ def handle_signal(id:, name:, logger:) # @yieldparam messages [Array] An array of message hashes retrieved from the buffer. def publish_message( name: "#{::Socket.gethostname}:#{::Process.pid}", + hostname: Socket.gethostname, + process_id: Process.pid, concurrency: PUBLISH_MESSAGE_DEFAULTS[:concurrency], tick_interval: PUBLISH_MESSAGE_DEFAULTS[:tick_interval], poll_interval: PUBLISH_MESSAGE_DEFAULTS[:poll_interval], @@ -456,6 +482,7 @@ def publish_message( heartbeat_thread = create_heartbeat_thread( id: publisher[:id], heartbeat_interval: heartbeat_interval, tick_interval: tick_interval, + hostname: hostname, process_id: process_id, logger: logger, time: time, process: process, kernel: kernel) publisher_threads = Array.new(concurrency) do |index| diff --git a/lib/outboxer/web.rb b/lib/outboxer/web.rb index 5c6fb213..1b1f672d 100755 --- a/lib/outboxer/web.rb +++ b/lib/outboxer/web.rb @@ -215,12 +215,12 @@ def time_ago_in_words(time) per_page: denormalised_query_params[:per_page], time_zone: denormalised_query_params[:time_zone]) - message_count_by_status = Message.count_by_status + message_metrics_by_status = Message.metrics_by_status publishers = Publisher.all erb :home, locals: { - message_count_by_status: message_count_by_status, + message_metrics_by_status: message_metrics_by_status, denormalised_query_params: denormalised_query_params, normalised_query_params: normalised_query_params, normalised_query_string: normalised_query_string, @@ -253,12 +253,12 @@ def time_ago_in_words(time) per_page: denormalised_query_params[:per_page], time_zone: denormalised_query_params[:time_zone]) - message_count_by_status = Message.count_by_status + message_metrics_by_status = Message.metrics_by_status publishers = Publisher.all erb :home, locals: { - message_count_by_status: message_count_by_status, + message_metrics_by_status: message_metrics_by_status, denormalised_query_params: denormalised_query_params, normalised_query_params: normalised_query_params, normalised_query_string: normalised_query_string, @@ -311,7 +311,7 @@ def time_ago_in_words(time) per_page: denormalised_query_params[:per_page], time_zone: denormalised_query_params[:time_zone]) - message_count_by_status = Message.count_by_status + message_metrics_by_status = Message.metrics_by_status paginated_messages = Message.list( status: denormalised_query_params[:status], @@ -327,7 +327,7 @@ def time_ago_in_words(time) denormalised_query_params: denormalised_query_params) erb :messages, locals: { - message_count_by_status: message_count_by_status, + message_metrics_by_status: message_metrics_by_status, messages: paginated_messages[:messages], denormalised_query_params: denormalised_query_params, normalised_query_params: normalised_query_params, @@ -627,7 +627,7 @@ def normalise_query_string(status: Message::LIST_STATUS_DEFAULT, per_page: denormalised_query_params[:per_page], time_zone: denormalised_query_params[:time_zone]) - message_count_by_status = Message.count_by_status + message_metrics_by_status = Message.metrics_by_status message = Message.find_by_id(id: params[:id]) @@ -638,7 +638,7 @@ def normalise_query_string(status: Message::LIST_STATUS_DEFAULT, denormalised_query_params: denormalised_query_params, normalised_query_params: normalised_query_params, normalised_query_string: normalised_query_string, - message_count_by_status: message_count_by_status, + message_metrics_by_status: message_metrics_by_status, message: message, messageable: messageable } @@ -654,7 +654,7 @@ def normalise_query_string(status: Message::LIST_STATUS_DEFAULT, time_zone: params[:time_zone]) message = Message.find_by_id(id: params[:id]) - message_count_by_status = Message.count_by_status + message_metrics_by_status = Message.metrics_by_status messageable_class = message[:messageable_type]&.safe_constantize messageable = messageable_class&.find_by_id(id: message[:messageable_id]) @@ -662,7 +662,7 @@ def normalise_query_string(status: Message::LIST_STATUS_DEFAULT, erb :messageable, locals: { message: message, messageable: messageable, - message_count_by_status: message_count_by_status, + message_metrics_by_status: message_metrics_by_status, denormalised_query_params: denormalised_query_params } end @@ -740,10 +740,10 @@ def normalise_query_string(status: Message::LIST_STATUS_DEFAULT, publisher = Publisher.find_by_id(id: params[:id]) - message_count_by_status = Message.count_by_status + message_metrics_by_status = Message.metrics_by_status erb :publisher, locals: { - message_count_by_status: message_count_by_status, + message_metrics_by_status: message_metrics_by_status, denormalised_query_params: denormalised_query_params, normalised_query_params: normalised_query_params, normalised_query_string: normalised_query_string, diff --git a/lib/outboxer/web/views/home.erb b/lib/outboxer/web/views/home.erb index 708d803c..8760720e 100644 --- a/lib/outboxer/web/views/home.erb +++ b/lib/outboxer/web/views/home.erb @@ -111,21 +111,19 @@ Status Count - Throughput Latency - <% ['queued', 'publishing', 'published', 'failed'].each do |status| %> + <% Outboxer::Message::STATUSES.each do |status| %> "> <%= status %> - <%= Outboxer::Web.pretty_number(number: message_count_by_status[status.to_sym]) %> - <%= Outboxer::Web.pretty_throughput(per_second: 0) %> - <%= Outboxer::Web.pretty_duration_from_seconds(seconds: 0) %> + <%= Outboxer::Web.pretty_number(number: message_metrics_by_status[status.to_sym][:count]) %> + <%= Outboxer::Web.pretty_duration_from_seconds(seconds: message_metrics_by_status[status.to_sym][:latency]) %> <% end %> diff --git a/lib/outboxer/web/views/layout.erb b/lib/outboxer/web/views/layout.erb index 83e46732..9ebfb294 100644 --- a/lib/outboxer/web/views/layout.erb +++ b/lib/outboxer/web/views/layout.erb @@ -55,7 +55,7 @@
Outboxer - <%= Outboxer::Web.pretty_number(number: message_count_by_status[:published]) %> + <%= Outboxer::Web.pretty_number(number: message_metrics_by_status[:published][:count]) %>