Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -724,5 +724,35 @@ def count_by_status
).transform_values(&:to_i)
end
end

def metrics_by_status(time: Time)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction(isolation: :repeatable_read) do
counts = Models::Message::Count.select(
"COALESCE(SUM(queued), 0) AS queued",
"COALESCE(SUM(publishing), 0) AS publishing",
"COALESCE(SUM(published), 0) AS published",
"COALESCE(SUM(failed), 0) AS failed",
"COALESCE(SUM(queued + publishing + published + failed), 0) AS total"
).take

min_times = Models::Message.group(:status).minimum(:updated_at)
min_total = min_times.values.compact.min

now = time.now.utc
latencies = min_times.transform_values { |t| t ? (now - t).to_i : 0 }
latencies["total"] = min_total ? (now - min_total).to_i : 0

counts_hash = counts.attributes.symbolize_keys.transform_values(&:to_i)

[:queued, :publishing, :published, :failed, :total].each_with_object({}) do |status, h|
h[status] = {
count: counts_hash[status] || 0,
latency: latencies[status.to_s] || 0
}
end
end
end
end
end
end
8 changes: 4 additions & 4 deletions lib/outboxer/models/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ class Message < ::ActiveRecord::Base
module Status
QUEUED = "queued"
PUBLISHING = "publishing"
PUBLISHED = "published"
FAILED = "failed"
PUBLISHED = "published"
end

STATUSES = [
Status::QUEUED,
Status::PUBLISHING,
Status::PUBLISHED,
Status::FAILED
Status::FAILED,
Status::PUBLISHED
]

# @!attribute [rw] status
Expand Down Expand Up @@ -46,8 +46,8 @@ module Status

scope :queued, -> { where(status: Status::QUEUED) }
scope :publishing, -> { where(status: Status::PUBLISHING) }
scope :published, -> { where(status: Status::PUBLISHED) }
scope :failed, -> { where(status: Status::FAILED) }
scope :published, -> { where(status: Status::PUBLISHED) }

# @!method messageable
# @return [ActiveRecord::Base] Polymorphic association to an event like model.
Expand Down
24 changes: 12 additions & 12 deletions lib/outboxer/web.rb
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,12 @@ def time_ago_in_words(time)
per_page: denormalised_query_params[:per_page],
time_zone: denormalised_query_params[:time_zone])

message_count_by_status = Message.count_by_status
message_metrics_by_status = Message.metrics_by_status

publishers = Publisher.all

erb :home, locals: {
message_count_by_status: message_count_by_status,
message_metrics_by_status: message_metrics_by_status,
denormalised_query_params: denormalised_query_params,
normalised_query_params: normalised_query_params,
normalised_query_string: normalised_query_string,
Expand Down Expand Up @@ -253,12 +253,12 @@ def time_ago_in_words(time)
per_page: denormalised_query_params[:per_page],
time_zone: denormalised_query_params[:time_zone])

message_count_by_status = Message.count_by_status
message_metrics_by_status = Message.metrics_by_status

publishers = Publisher.all

erb :home, locals: {
message_count_by_status: message_count_by_status,
message_metrics_by_status: message_metrics_by_status,
denormalised_query_params: denormalised_query_params,
normalised_query_params: normalised_query_params,
normalised_query_string: normalised_query_string,
Expand Down Expand Up @@ -311,7 +311,7 @@ def time_ago_in_words(time)
per_page: denormalised_query_params[:per_page],
time_zone: denormalised_query_params[:time_zone])

message_count_by_status = Message.count_by_status
message_metrics_by_status = Message.metrics_by_status

paginated_messages = Message.list(
status: denormalised_query_params[:status],
Expand All @@ -327,7 +327,7 @@ def time_ago_in_words(time)
denormalised_query_params: denormalised_query_params)

erb :messages, locals: {
message_count_by_status: message_count_by_status,
message_metrics_by_status: message_metrics_by_status,
messages: paginated_messages[:messages],
denormalised_query_params: denormalised_query_params,
normalised_query_params: normalised_query_params,
Expand Down Expand Up @@ -627,7 +627,7 @@ def normalise_query_string(status: Message::LIST_STATUS_DEFAULT,
per_page: denormalised_query_params[:per_page],
time_zone: denormalised_query_params[:time_zone])

message_count_by_status = Message.count_by_status
message_metrics_by_status = Message.metrics_by_status

message = Message.find_by_id(id: params[:id])

Expand All @@ -638,7 +638,7 @@ def normalise_query_string(status: Message::LIST_STATUS_DEFAULT,
denormalised_query_params: denormalised_query_params,
normalised_query_params: normalised_query_params,
normalised_query_string: normalised_query_string,
message_count_by_status: message_count_by_status,
message_metrics_by_status: message_metrics_by_status,
message: message,
messageable: messageable
}
Expand All @@ -654,15 +654,15 @@ def normalise_query_string(status: Message::LIST_STATUS_DEFAULT,
time_zone: params[:time_zone])

message = Message.find_by_id(id: params[:id])
message_count_by_status = Message.count_by_status
message_metrics_by_status = Message.metrics_by_status

messageable_class = message[:messageable_type]&.safe_constantize
messageable = messageable_class&.find_by_id(id: message[:messageable_id])

erb :messageable, locals: {
message: message,
messageable: messageable,
message_count_by_status: message_count_by_status,
message_metrics_by_status: message_metrics_by_status,
denormalised_query_params: denormalised_query_params
}
end
Expand Down Expand Up @@ -740,10 +740,10 @@ def normalise_query_string(status: Message::LIST_STATUS_DEFAULT,

publisher = Publisher.find_by_id(id: params[:id])

message_count_by_status = Message.count_by_status
message_metrics_by_status = Message.metrics_by_status

erb :publisher, locals: {
message_count_by_status: message_count_by_status,
message_metrics_by_status: message_metrics_by_status,
denormalised_query_params: denormalised_query_params,
normalised_query_params: normalised_query_params,
normalised_query_string: normalised_query_string,
Expand Down
8 changes: 3 additions & 5 deletions lib/outboxer/web/views/home.erb
Original file line number Diff line number Diff line change
Expand Up @@ -111,21 +111,19 @@
<tr>
<th scope="col">Status</th>
<th scope="col">Count</th>
<th scope="col">Throughput</th>
<th scope="col">Latency</th>
</tr>
</thead>
<tbody>
<% ['queued', 'publishing', 'published', 'failed'].each do |status| %>
<% Outboxer::Message::STATUSES.each do |status| %>
<tr>
<td class="text-capitalize">
<a class="custom-link" href="<%= outboxer_path("/messages?status=#{status}") %>">
<%= status %>
</a>
</td>
<td><%= Outboxer::Web.pretty_number(number: message_count_by_status[status.to_sym]) %></td>
<td><%= Outboxer::Web.pretty_throughput(per_second: 0) %></td>
<td><%= Outboxer::Web.pretty_duration_from_seconds(seconds: 0) %></td>
<td><%= Outboxer::Web.pretty_number(number: message_metrics_by_status[status.to_sym][:count]) %></td>
<td><%= Outboxer::Web.pretty_duration_from_seconds(seconds: message_metrics_by_status[status.to_sym][:latency]) %></td>
</tr>
<% end %>
</tbody>
Expand Down
6 changes: 3 additions & 3 deletions lib/outboxer/web/views/layout.erb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
<div class="container-fluid">
<a class="navbar-brand" href="<%= outboxer_path('') %>">
<i class="bi bi-envelope-open-fill"></i> Outboxer
<span class="badge bg-secondary"><%= Outboxer::Web.pretty_number(number: message_count_by_status[:published]) %></span>
<span class="badge bg-secondary"><%= Outboxer::Web.pretty_number(number: message_metrics_by_status[:published][:count]) %></span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarNav" aria-controls="navbarNav" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
Expand All @@ -71,15 +71,15 @@
<li class="nav-item">
<a class="nav-link <%= 'active' if denormalised_query_params[:status] == nil %>"
href="<%= outboxer_path("/messages#{normalise_query_string(status: nil, time_zone: denormalised_query_params[:time_zone])}") %>">
All (<%= Outboxer::Web.pretty_number(number: message_count_by_status[:total]) %>)
All (<%= Outboxer::Web.pretty_number(number: message_metrics_by_status[:total][:count]) %>)
</a>
</li>

<% statuses.each do |status| %>
<li class="nav-item">
<a class="nav-link <%= 'active' if denormalised_query_params[:status] == status[:key] %>"
href="<%= outboxer_path("/messages#{normalise_query_string(status: status[:key], time_zone: params[:time_zone])}") %>">
<%= status[:name] %> (<%= Outboxer::Web.pretty_number(number: message_count_by_status[status[:key].to_sym]) %>)
<%= status[:name] %> (<%= Outboxer::Web.pretty_number(number: message_metrics_by_status[status[:key].to_sym][:count]) %>)
</a>
</li>
<% end %>
Expand Down
4 changes: 2 additions & 2 deletions quickstart_e2e_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ attempt = 1
max_attempts = 10
delay = 1

published_count = Outboxer::Message.count_by_status[:published]
published_count = Outboxer::Message.metrics_by_status[:published][:count]

while (attempt <= max_attempts) && published_count.zero?
warn "Outboxer not published yet (#{attempt}/#{max_attempts})..."
sleep delay
attempt += 1
published_count = Outboxer::Message.count_by_status[:published]
published_count = Outboxer::Message.metrics_by_status[:published][:count]
end

Process.kill("TERM", publisher_pid)
Expand Down
4 changes: 2 additions & 2 deletions spec/bin/outboxer_publisher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@

it "publishes message" do
attempt = 1
published_count = Outboxer::Message.count_by_status[:published]
published_count = Outboxer::Message.metrics_by_status[:published][:count]

while (attempt <= max_attempts) && published_count.zero?
warn "Outboxer message not published yet. Retrying (#{attempt}/#{max_attempts})..."
sleep delay
attempt += 1
published_count = Outboxer::Message.count_by_status[:published]
published_count = Outboxer::Message.metrics_by_status[:published][:count]
end

Process.kill("TERM", publisher_pid)
Expand Down
4 changes: 2 additions & 2 deletions spec/lib/outboxer/message/delete_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module Outboxer
end

it "increments the total failed count metric" do
expect(Message.count_by_status[:failed]).to eq(29)
expect(Message.metrics_by_status[:failed][:count]).to eq(29)
end

it "returns the message id" do
Expand Down Expand Up @@ -52,7 +52,7 @@ module Outboxer
end

it "increments the total published count metric" do
expect(Message.count_by_status[:published]).to eq(29)
expect(Message.metrics_by_status[:published][:count]).to eq(29)
end

it "returns the message id" do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

module Outboxer
RSpec.describe Message do
describe ".count_by_status" do
describe ".metrics_by_status" do
let(:current_utc_time) { Time.utc(2025, 1, 1, 0, 0, 0) }

before { Models::Message::Count.delete_all }
Expand All @@ -25,9 +25,14 @@ module Outboxer
)
end

it "returns total counts by status" do
expect(Message.count_by_status).to eq(
queued: 15, publishing: 26, published: 37, failed: 48, total: 126)
it "returns metrics by status" do
expect(Message.metrics_by_status).to eq({
queued: { count: 15, latency: 0 },
publishing: { count: 26, latency: 0 },
published: { count: 37, latency: 0 },
failed: { count: 48, latency: 0 },
total: { count: 126, latency: 0 }
})
end
end
end
Expand Down
Loading