diff --git a/db/migrate/create_outboxer_messages.rb b/db/migrate/create_outboxer_messages.rb index 178be840..6f2eb6e6 100644 --- a/db/migrate/create_outboxer_messages.rb +++ b/db/migrate/create_outboxer_messages.rb @@ -23,6 +23,10 @@ def up add_index :outboxer_messages, :status, name: "idx_outboxer_status" # messages by status latency + add_index :outboxer_messages, [:status, :queued_at], + name: "idx_outboxer_status_queued_at" + + # messages by status age add_index :outboxer_messages, [:status, :updated_at], name: "idx_outboxer_status_updated_at" diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index 25c87a37..65bf669e 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -736,6 +736,8 @@ def count_by_status def metrics_by_status(time: Time) ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction(isolation: :repeatable_read) do + now = time.now.utc + counts = Models::Thread.select( "COALESCE(SUM(queued_message_count), 0) AS queued", "COALESCE(SUM(publishing_message_count), 0) AS publishing", @@ -746,21 +748,32 @@ def metrics_by_status(time: Time) "published_message_count + failed_message_count), 0) AS total" ).take - min_times = Models::Message.group(:status).minimum(:updated_at) - min_total = min_times.values.compact.min - - now = time.now.utc - latencies = min_times.transform_values { |t| t ? (now - t).to_i : 0 } - latencies["total"] = min_total ? (now - min_total).to_i : 0 - counts_hash = counts.attributes.symbolize_keys.transform_values(&:to_i) - [:queued, :publishing, :published, :failed, :total].each_with_object({}) do |status, h| - h[status] = { - count: counts_hash[status] || 0, - latency: latencies[status.to_s] || 0 + last_queued_at = Models::Thread.maximum(:queued_message_count_last_updated_at) + last_publishing_at = Models::Thread.maximum(:publishing_message_count_last_updated_at) + last_failed_at = Models::Thread.maximum(:failed_message_count_last_updated_at) + last_published_at = Models::Thread.maximum(:published_message_count_last_updated_at) + oldest_queued_at = Models::Message.where(status: "queued").minimum(:queued_at) + queued_latency = oldest_queued_at ? (now - oldest_queued_at).to_i : 0 + + { + queued: { + count: counts_hash[:queued], last_update: last_queued_at, latency: queued_latency + }, + publishing: { + count: counts_hash[:publishing], last_update: last_publishing_at, latency: nil + }, + published: { + count: counts_hash[:published], last_update: last_published_at, latency: nil + }, + failed: { + count: counts_hash[:failed], last_update: last_failed_at, latency: nil + }, + total: { + count: counts_hash[:total], last_update: nil, latency: nil } - end + } end end end diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index bd24a3c6..4f7a836c 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -562,13 +562,13 @@ def create_publisher_thread(id:, index:, process_id: process_id, thread_id: Thread.current.object_id, queued_message_count: 0, - queued_message_count_last_updated_at: current_utc_time, + queued_message_count_last_updated_at: nil, publishing_message_count: 0, - publishing_message_count_last_updated_at: current_utc_time, + publishing_message_count_last_updated_at: nil, published_message_count: 0, - published_message_count_last_updated_at: current_utc_time, + published_message_count_last_updated_at: nil, failed_message_count: 0, - failed_message_count_last_updated_at: current_utc_time, + failed_message_count_last_updated_at: nil, created_at: current_utc_time, updated_at: current_utc_time) end diff --git a/lib/outboxer/web.rb b/lib/outboxer/web.rb index 1b1f672d..fef3f003 100755 --- a/lib/outboxer/web.rb +++ b/lib/outboxer/web.rb @@ -76,7 +76,7 @@ def pretty_duration_from_period(start_time:, end def pretty_duration_from_seconds(seconds:) - return "-" if seconds <= 0 + return "-" if seconds.nil? || seconds <= 0 # Units for sub-second durations sub_second_units = [ @@ -161,23 +161,33 @@ def human_readable_size(kilobytes:) end def time_ago_in_words(time) + return "-" if time.nil? + seconds = (::Time.now - time).to_i + future = seconds.negative? + seconds = seconds.abs - if seconds < 60 - "#{seconds} #{seconds == 1 ? "second" : "seconds"}" - else - prefix = seconds.negative? ? "from now" : "" - seconds = seconds.abs + # return "-" if seconds == 0 + quantity, unit = case seconds - when 0..59 then "#{seconds} seconds #{prefix}" - when 60..3599 then "#{seconds / 60} minutes #{prefix}" - when 3600..86_399 then "#{seconds / 3600} hours #{prefix}" - when 86_400..2_591_999 then "#{seconds / 86_400} days #{prefix}" - when 2_592_000..31_103_999 then "#{seconds / 2_592_000} months #{prefix}" - else "#{seconds / 31_104_000} years #{prefix}" + when 0..59 + [seconds, "second"] + when 60..3_599 + [seconds / 60, "minute"] + when 3_600..86_399 + [seconds / 3_600, "hour"] + when 86_400..2_591_999 + [seconds / 86_400, "day"] + when 2_592_000..31_103_999 + [seconds / 2_592_000, "month"] + else + [seconds / 31_104_000, "year"] end - end + + unit += "s" unless quantity == 1 + + future ? "#{quantity} #{unit} from now" : "#{quantity} #{unit} ago" end end diff --git a/lib/outboxer/web/views/home.erb b/lib/outboxer/web/views/home.erb index f7ab571a..3fdeb04f 100644 --- a/lib/outboxer/web/views/home.erb +++ b/lib/outboxer/web/views/home.erb @@ -35,8 +35,8 @@ <%= publisher[:name] %> - <%= Outboxer::Web.time_ago_in_words(publisher[:created_at]) %> ago - <%= Outboxer::Web.time_ago_in_words(publisher[:updated_at]) %> ago + <%= Outboxer::Web.time_ago_in_words(publisher[:created_at]) %> + <%= Outboxer::Web.time_ago_in_words(publisher[:updated_at]) %> <%= publisher[:status] %> <%= Outboxer::Web.pretty_throughput(per_second: publisher[:metrics]['published']['throughput']) %> <%= Outboxer::Web.pretty_duration_from_seconds(seconds: publisher[:metrics]['latency']) %> @@ -100,35 +100,54 @@
-
-
-

Messages

-
-
-
- - - - - - - - - - <% Outboxer::Message::STATUSES.each do |status| %> - - - - - - <% end %> - -
StatusCountLatency
- "> - <%= status %> - - <%= Outboxer::Web.pretty_number(number: message_metrics_by_status[status.to_sym][:count]) %><%= Outboxer::Web.pretty_duration_from_seconds(seconds: message_metrics_by_status[status.to_sym][:latency]) %>
-
-
+
+
+

Messages

+
+ +
+
+ + + + + + + + + + + + <% Outboxer::Message::STATUSES.each do |status| %> + <% metrics = message_metrics_by_status[status.to_sym] %> + + + + + + + + + + <% end %> + +
StatusCountLast UpdateLatency
+ <% if status != Outboxer::Message::Status::PUBLISHED %> + "> + <%= status %> + + <% else %> + <%= status %> + <% end %> + <%= Outboxer::Web.pretty_number(number: metrics[:count]) %><%= Outboxer::Web.time_ago_in_words(metrics[:last_update]) %> + <% if metrics[:latency] %> + <%= Outboxer::Web.pretty_duration_from_seconds(seconds: metrics[:latency]) %> + <% else %> + - + <% end %> +
+
+
diff --git a/lib/outboxer/web/views/message.erb b/lib/outboxer/web/views/message.erb index 119bf69a..afa412ca 100644 --- a/lib/outboxer/web/views/message.erb +++ b/lib/outboxer/web/views/message.erb @@ -56,13 +56,13 @@ Queued - <%= Outboxer::Web.time_ago_in_words(message[:queued_at]) %> ago + <%= Outboxer::Web.time_ago_in_words(message[:queued_at]) %> Updated - <%= Outboxer::Web.time_ago_in_words(message[:updated_at]) %> ago + <%= Outboxer::Web.time_ago_in_words(message[:updated_at]) %> diff --git a/lib/outboxer/web/views/messages.erb b/lib/outboxer/web/views/messages.erb index 32581d73..4bfb09f7 100644 --- a/lib/outboxer/web/views/messages.erb +++ b/lib/outboxer/web/views/messages.erb @@ -150,10 +150,10 @@ - <%= Outboxer::Web.time_ago_in_words(message[:queued_at]) %> ago + <%= Outboxer::Web.time_ago_in_words(message[:queued_at]) %> - <%= Outboxer::Web.time_ago_in_words(message[:updated_at]) %> ago + <%= Outboxer::Web.time_ago_in_words(message[:updated_at]) %> <% if message[:publisher_exists] %> diff --git a/lib/outboxer/web/views/publisher.erb b/lib/outboxer/web/views/publisher.erb index 6ca3d4c4..2f2c5f38 100644 --- a/lib/outboxer/web/views/publisher.erb +++ b/lib/outboxer/web/views/publisher.erb @@ -63,13 +63,13 @@ Created - <%= Outboxer::Web.time_ago_in_words(publisher[:created_at]) %> ago + <%= Outboxer::Web.time_ago_in_words(publisher[:created_at]) %> Updated - <%= Outboxer::Web.time_ago_in_words(publisher[:updated_at]) %> ago + <%= Outboxer::Web.time_ago_in_words(publisher[:updated_at]) %> diff --git a/spec/lib/outboxer/message/metrics_by_status_spec.rb b/spec/lib/outboxer/message/metrics_by_status_spec.rb index f7202475..ae6c78de 100644 --- a/spec/lib/outboxer/message/metrics_by_status_spec.rb +++ b/spec/lib/outboxer/message/metrics_by_status_spec.rb @@ -5,8 +5,6 @@ module Outboxer describe ".metrics_by_status" do let(:current_utc_time) { Time.utc(2025, 1, 1, 0, 0, 0) } - before { Models::Thread.delete_all } - before do create( :outboxer_thread, @@ -15,8 +13,10 @@ module Outboxer publishing_message_count: 20, published_message_count: 30, failed_message_count: 40, - created_at: current_utc_time, - updated_at: current_utc_time + queued_message_count_last_updated_at: current_utc_time, + publishing_message_count_last_updated_at: current_utc_time, + published_message_count_last_updated_at: current_utc_time, + failed_message_count_last_updated_at: current_utc_time ) create( @@ -28,18 +28,22 @@ module Outboxer publishing_message_count: 6, published_message_count: 7, failed_message_count: 8, - created_at: current_utc_time, - updated_at: current_utc_time + queued_message_count_last_updated_at: current_utc_time, + publishing_message_count_last_updated_at: current_utc_time, + published_message_count_last_updated_at: current_utc_time, + failed_message_count_last_updated_at: current_utc_time ) end it "returns metrics by status" do - expect(Message.metrics_by_status).to eq( - queued: { count: 15, latency: 0 }, - publishing: { count: 26, latency: 0 }, - published: { count: 37, latency: 0 }, - failed: { count: 48, latency: 0 }, - total: { count: 126, latency: 0 } + result = Message.metrics_by_status(time: double(now: current_utc_time)) + + expect(result).to eq( + queued: { count: 15, last_update: current_utc_time, latency: 0 }, + publishing: { count: 26, last_update: current_utc_time, latency: nil }, + published: { count: 37, last_update: current_utc_time, latency: nil }, + failed: { count: 48, last_update: current_utc_time, latency: nil }, + total: { count: 126, last_update: nil, latency: nil } ) end end