Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions db/migrate/create_outboxer_messages.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ def up
add_index :outboxer_messages, :status, name: "idx_outboxer_status"

# messages by status latency
add_index :outboxer_messages, [:status, :queued_at],
name: "idx_outboxer_status_queued_at"

# messages by status age
add_index :outboxer_messages, [:status, :updated_at],
name: "idx_outboxer_status_updated_at"

Expand Down
37 changes: 25 additions & 12 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,8 @@ def count_by_status
def metrics_by_status(time: Time)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction(isolation: :repeatable_read) do
now = time.now.utc

counts = Models::Thread.select(
"COALESCE(SUM(queued_message_count), 0) AS queued",
"COALESCE(SUM(publishing_message_count), 0) AS publishing",
Expand All @@ -746,21 +748,32 @@ def metrics_by_status(time: Time)
"published_message_count + failed_message_count), 0) AS total"
).take

min_times = Models::Message.group(:status).minimum(:updated_at)
min_total = min_times.values.compact.min

now = time.now.utc
latencies = min_times.transform_values { |t| t ? (now - t).to_i : 0 }
latencies["total"] = min_total ? (now - min_total).to_i : 0

counts_hash = counts.attributes.symbolize_keys.transform_values(&:to_i)

[:queued, :publishing, :published, :failed, :total].each_with_object({}) do |status, h|
h[status] = {
count: counts_hash[status] || 0,
latency: latencies[status.to_s] || 0
last_queued_at = Models::Thread.maximum(:queued_message_count_last_updated_at)
last_publishing_at = Models::Thread.maximum(:publishing_message_count_last_updated_at)
last_failed_at = Models::Thread.maximum(:failed_message_count_last_updated_at)
last_published_at = Models::Thread.maximum(:published_message_count_last_updated_at)
oldest_queued_at = Models::Message.where(status: "queued").minimum(:queued_at)
queued_latency = oldest_queued_at ? (now - oldest_queued_at).to_i : 0

{
queued: {
count: counts_hash[:queued], last_update: last_queued_at, latency: queued_latency
},
publishing: {
count: counts_hash[:publishing], last_update: last_publishing_at, latency: nil
},
published: {
count: counts_hash[:published], last_update: last_published_at, latency: nil
},
failed: {
count: counts_hash[:failed], last_update: last_failed_at, latency: nil
},
total: {
count: counts_hash[:total], last_update: nil, latency: nil
}
end
}
end
end
end
Expand Down
8 changes: 4 additions & 4 deletions lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -562,13 +562,13 @@ def create_publisher_thread(id:, index:,
process_id: process_id,
thread_id: Thread.current.object_id,
queued_message_count: 0,
queued_message_count_last_updated_at: current_utc_time,
queued_message_count_last_updated_at: nil,
publishing_message_count: 0,
publishing_message_count_last_updated_at: current_utc_time,
publishing_message_count_last_updated_at: nil,
published_message_count: 0,
published_message_count_last_updated_at: current_utc_time,
published_message_count_last_updated_at: nil,
failed_message_count: 0,
failed_message_count_last_updated_at: current_utc_time,
failed_message_count_last_updated_at: nil,
created_at: current_utc_time,
updated_at: current_utc_time)
end
Expand Down
36 changes: 23 additions & 13 deletions lib/outboxer/web.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def pretty_duration_from_period(start_time:,
end

def pretty_duration_from_seconds(seconds:)
return "-" if seconds <= 0
return "-" if seconds.nil? || seconds <= 0

# Units for sub-second durations
sub_second_units = [
Expand Down Expand Up @@ -161,23 +161,33 @@ def human_readable_size(kilobytes:)
end

def time_ago_in_words(time)
return "-" if time.nil?

seconds = (::Time.now - time).to_i
future = seconds.negative?
seconds = seconds.abs

if seconds < 60
"#{seconds} #{seconds == 1 ? "second" : "seconds"}"
else
prefix = seconds.negative? ? "from now" : ""
seconds = seconds.abs
# return "-" if seconds == 0

quantity, unit =
case seconds
when 0..59 then "#{seconds} seconds #{prefix}"
when 60..3599 then "#{seconds / 60} minutes #{prefix}"
when 3600..86_399 then "#{seconds / 3600} hours #{prefix}"
when 86_400..2_591_999 then "#{seconds / 86_400} days #{prefix}"
when 2_592_000..31_103_999 then "#{seconds / 2_592_000} months #{prefix}"
else "#{seconds / 31_104_000} years #{prefix}"
when 0..59
[seconds, "second"]
when 60..3_599
[seconds / 60, "minute"]
when 3_600..86_399
[seconds / 3_600, "hour"]
when 86_400..2_591_999
[seconds / 86_400, "day"]
when 2_592_000..31_103_999
[seconds / 2_592_000, "month"]
else
[seconds / 31_104_000, "year"]
end
end

unit += "s" unless quantity == 1

future ? "#{quantity} #{unit} from now" : "#{quantity} #{unit} ago"
end
end

Expand Down
83 changes: 51 additions & 32 deletions lib/outboxer/web/views/home.erb
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
<%= publisher[:name] %>
</a>
</td>
<td><%= Outboxer::Web.time_ago_in_words(publisher[:created_at]) %> ago</td>
<td><%= Outboxer::Web.time_ago_in_words(publisher[:updated_at]) %> ago</td>
<td><%= Outboxer::Web.time_ago_in_words(publisher[:created_at]) %></td>
<td><%= Outboxer::Web.time_ago_in_words(publisher[:updated_at]) %></td>
<td class="text-capitalize"><%= publisher[:status] %></td>
<td><%= Outboxer::Web.pretty_throughput(per_second: publisher[:metrics]['published']['throughput']) %></td>
<td><%= Outboxer::Web.pretty_duration_from_seconds(seconds: publisher[:metrics]['latency']) %></td>
Expand Down Expand Up @@ -100,35 +100,54 @@
</div>

<div class="container mt-4">
<div class="card">
<div class="card-header d-flex justify-content-between align-items-center">
<h3 class="mb-0">Messages</h3>
</div>
<div class="card-body">
<div class="table-responsive">
<table class="table" style="table-layout: fixed;">
<thead>
<tr>
<th scope="col">Status</th>
<th scope="col">Count</th>
<th scope="col">Latency</th>
</tr>
</thead>
<tbody>
<% Outboxer::Message::STATUSES.each do |status| %>
<tr>
<td class="text-capitalize">
<a class="custom-link" href="<%= outboxer_path("/messages?status=#{status}") %>">
<%= status %>
</a>
</td>
<td><%= Outboxer::Web.pretty_number(number: message_metrics_by_status[status.to_sym][:count]) %></td>
<td><%= Outboxer::Web.pretty_duration_from_seconds(seconds: message_metrics_by_status[status.to_sym][:latency]) %></td>
</tr>
<% end %>
</tbody>
</table>
</div>
</div>
<div class="card">
<div class="card-header d-flex justify-content-between align-items-center">
<h3 class="mb-0">Messages</h3>
</div>

<div class="card-body">
<div class="table-responsive">
<table class="table" style="table-layout: fixed;">
<thead>
<tr>
<th scope="col">Status</th>
<th scope="col">Count</th>
<th scope="col">Last Update</th>
<th scope="col">Latency</th>
</tr>
</thead>

<tbody>
<% Outboxer::Message::STATUSES.each do |status| %>
<% metrics = message_metrics_by_status[status.to_sym] %>

<tr>
<td class="text-capitalize">
<% if status != Outboxer::Message::Status::PUBLISHED %>
<a class="custom-link"
href="<%= outboxer_path("/messages?status=#{status}") %>">
<%= status %>
</a>
<% else %>
<%= status %>
<% end %>
</td>

<td><%= Outboxer::Web.pretty_number(number: metrics[:count]) %></td>
<td><%= Outboxer::Web.time_ago_in_words(metrics[:last_update]) %></td>

<td>
<% if metrics[:latency] %>
<%= Outboxer::Web.pretty_duration_from_seconds(seconds: metrics[:latency]) %>
<% else %>
-
<% end %>
</td>
</tr>
<% end %>
</tbody>
</table>
</div>
</div>
</div>
</div>
4 changes: 2 additions & 2 deletions lib/outboxer/web/views/message.erb
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@
<tr>
<th scope="row">Queued</th>
<td title="<%= message[:queued_at].in_time_zone(denormalised_query_params[:time_zone]) %>">
<%= Outboxer::Web.time_ago_in_words(message[:queued_at]) %> ago
<%= Outboxer::Web.time_ago_in_words(message[:queued_at]) %>
</td>
</tr>
<tr>
<th scope="row">Updated</th>
<td title="<%= message[:updated_at].in_time_zone(denormalised_query_params[:time_zone]) %>">
<%= Outboxer::Web.time_ago_in_words(message[:updated_at]) %> ago
<%= Outboxer::Web.time_ago_in_words(message[:updated_at]) %>
</td>
</tr>
<tr>
Expand Down
4 changes: 2 additions & 2 deletions lib/outboxer/web/views/messages.erb
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,10 @@
</a>
</td>
<td title="<%= message[:queued_at].in_time_zone(denormalised_query_params[:time_zone]) %>">
<%= Outboxer::Web.time_ago_in_words(message[:queued_at]) %> ago
<%= Outboxer::Web.time_ago_in_words(message[:queued_at]) %>
</td>
<td title="<%= message[:updated_at].in_time_zone(denormalised_query_params[:time_zone]) %>">
<%= Outboxer::Web.time_ago_in_words(message[:updated_at]) %> ago
<%= Outboxer::Web.time_ago_in_words(message[:updated_at]) %>
</td>
<td>
<% if message[:publisher_exists] %>
Expand Down
4 changes: 2 additions & 2 deletions lib/outboxer/web/views/publisher.erb
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@
<tr>
<th scope="row">Created</th>
<td title="<%= publisher[:created_at].in_time_zone(denormalised_query_params[:time_zone]) %>">
<%= Outboxer::Web.time_ago_in_words(publisher[:created_at]) %> ago
<%= Outboxer::Web.time_ago_in_words(publisher[:created_at]) %>
</td>
</tr>
<tr>
<th scope="row">Updated</th>
<td title="<%= publisher[:updated_at].in_time_zone(denormalised_query_params[:time_zone]) %>">
<%= Outboxer::Web.time_ago_in_words(publisher[:updated_at]) %> ago
<%= Outboxer::Web.time_ago_in_words(publisher[:updated_at]) %>
</td>
</tr>
</tbody>
Expand Down
28 changes: 16 additions & 12 deletions spec/lib/outboxer/message/metrics_by_status_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ module Outboxer
describe ".metrics_by_status" do
let(:current_utc_time) { Time.utc(2025, 1, 1, 0, 0, 0) }

before { Models::Thread.delete_all }

before do
create(
:outboxer_thread,
Expand All @@ -15,8 +13,10 @@ module Outboxer
publishing_message_count: 20,
published_message_count: 30,
failed_message_count: 40,
created_at: current_utc_time,
updated_at: current_utc_time
queued_message_count_last_updated_at: current_utc_time,
publishing_message_count_last_updated_at: current_utc_time,
published_message_count_last_updated_at: current_utc_time,
failed_message_count_last_updated_at: current_utc_time
)

create(
Expand All @@ -28,18 +28,22 @@ module Outboxer
publishing_message_count: 6,
published_message_count: 7,
failed_message_count: 8,
created_at: current_utc_time,
updated_at: current_utc_time
queued_message_count_last_updated_at: current_utc_time,
publishing_message_count_last_updated_at: current_utc_time,
published_message_count_last_updated_at: current_utc_time,
failed_message_count_last_updated_at: current_utc_time
)
end

it "returns metrics by status" do
expect(Message.metrics_by_status).to eq(
queued: { count: 15, latency: 0 },
publishing: { count: 26, latency: 0 },
published: { count: 37, latency: 0 },
failed: { count: 48, latency: 0 },
total: { count: 126, latency: 0 }
result = Message.metrics_by_status(time: double(now: current_utc_time))

expect(result).to eq(
queued: { count: 15, last_update: current_utc_time, latency: 0 },
publishing: { count: 26, last_update: current_utc_time, latency: nil },
published: { count: 37, last_update: current_utc_time, latency: nil },
failed: { count: 48, last_update: current_utc_time, latency: nil },
total: { count: 126, last_update: nil, latency: nil }
)
end
end
Expand Down