Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ AllCops:
TargetRubyVersion: 3.1.5
SuggestExtensions: false
Metrics/AbcSize:
Max: 80
Max: 90
Metrics/CyclomaticComplexity:
Enabled: true
Max: 25
Expand Down Expand Up @@ -44,7 +44,7 @@ Metrics/ParameterLists:
Style/FrozenStringLiteralComment:
Enabled: false
Metrics/MethodLength:
Max: 80
Max: 100
Metrics/ClassLength:
Enabled: true
Max: 100
Expand Down
89 changes: 65 additions & 24 deletions lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ def create(name:, concurrency:,
"heartbeat_interval" => heartbeat_interval
},
metrics: {
"throughput" => 0,
"queued" => { "count" => 0, "throughput" => 0 },
"publishing" => { "count" => 0, "throughput" => 0 },
"published" => { "count" => 0, "throughput" => 0 },
"failed" => { "count" => 0, "throughput" => 0 },
"latency" => 0,
"cpu" => 0,
"rss " => 0,
Expand Down Expand Up @@ -274,7 +277,7 @@ def trap_signals
# @param process [Process] The process module for accessing system metrics.
# @param kernel [Kernel] The kernel module for sleeping operations.
# @return [Thread] The heartbeat thread.
def create_heartbeat_thread(id:,
def create_heartbeat_thread(id:, hostname:, process_id:,
heartbeat_interval:, tick_interval:,
logger:, time:, process:, kernel:)
Thread.new do
Expand All @@ -301,33 +304,68 @@ def create_heartbeat_thread(id:,
signal.destroy
end

throughput = Models::Message
.where(status: Message::Status::PUBLISHED)
.where(publisher_id: id)
.where("published_at >= ?", 1.second.ago)
.count
current_utc_time = time.now.utc

previous_metrics = publisher.metrics
last_updated_at = publisher.updated_at
time_delta = [current_utc_time - last_updated_at, heartbeat_interval].max

publisher_message_count = Models::Message::Count
.select(
"COALESCE(SUM(queued), 0) AS queued,
COALESCE(SUM(publishing), 0) AS publishing,
COALESCE(SUM(published), 0) AS published,
COALESCE(SUM(failed), 0) AS failed,
MAX(updated_at) AS last_updated_at"
)
.where(hostname: hostname, process_id: process_id)
.group(:hostname, :process_id)
.take

current_counts = {
"queued" => publisher_message_count&.queued || 0,
"publishing" => publisher_message_count&.publishing || 0,
"published" => publisher_message_count&.published || 0,
"failed" => publisher_message_count&.failed || 0
}

throughput_by_status =
current_counts.each_with_object({}) do |(status, count), h|
prev = previous_metrics[status]["count"] || 0
h[status] = ((count - prev) / time_delta).round(0)
end

last_published_message = Models::Message
.where(status: Message::Status::PUBLISHED)
.where(publisher_id: id)
.order(published_at: :desc)
.first
latency = 0

latency = if last_published_message.nil?
0
else
(Time.now.utc - last_published_message.published_at).to_i
end
if !publisher_message_count.nil?
latency = (current_utc_time - publisher_message_count.last_updated_at).round(0)
end

publisher.update!(
updated_at: time.now.utc,
updated_at: current_utc_time,
metrics: {
throughput: throughput,
latency: latency,
cpu: cpu,
rss: rss,
rtt: rtt
})
"queued" => {
"count" => current_counts["queued"],
"throughput" => throughput_by_status["queued"]
},
"publishing" => {
"count" => current_counts["publishing"],
"throughput" => throughput_by_status["publishing"]
},
"published" => {
"count" => current_counts["published"],
"throughput" => throughput_by_status["published"]
},
"failed" => {
"count" => current_counts["failed"],
"throughput" => throughput_by_status["failed"]
},
"latency" => latency,
"cpu" => cpu,
"rss" => rss,
"rtt" => rtt
}
)
end
end

Expand Down Expand Up @@ -429,6 +467,8 @@ def handle_signal(id:, name:, logger:)
# @yieldparam messages [Array<Hash>] An array of message hashes retrieved from the buffer.
def publish_message(
name: "#{::Socket.gethostname}:#{::Process.pid}",
hostname: Socket.gethostname,
process_id: Process.pid,
concurrency: PUBLISH_MESSAGE_DEFAULTS[:concurrency],
tick_interval: PUBLISH_MESSAGE_DEFAULTS[:tick_interval],
poll_interval: PUBLISH_MESSAGE_DEFAULTS[:poll_interval],
Expand Down Expand Up @@ -456,6 +496,7 @@ def publish_message(

heartbeat_thread = create_heartbeat_thread(
id: publisher[:id], heartbeat_interval: heartbeat_interval, tick_interval: tick_interval,
hostname: hostname, process_id: process_id,
logger: logger, time: time, process: process, kernel: kernel)

publisher_threads = Array.new(concurrency) do |index|
Expand Down
2 changes: 1 addition & 1 deletion lib/outboxer/web/views/home.erb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
<td><%= Outboxer::Web.time_ago_in_words(publisher[:created_at]) %> ago</td>
<td><%= Outboxer::Web.time_ago_in_words(publisher[:updated_at]) %> ago</td>
<td class="text-capitalize"><%= publisher[:status] %></td>
<td><%= Outboxer::Web.pretty_throughput(per_second: publisher[:metrics]['throughput']) %></td>
<td><%= Outboxer::Web.pretty_throughput(per_second: publisher[:metrics]['published']['throughput']) %></td>
<td><%= Outboxer::Web.pretty_duration_from_seconds(seconds: publisher[:metrics]['latency']) %></td>
<td><%= publisher[:metrics]['cpu'].round(0) %>%</td>
<td><%= Outboxer::Web.human_readable_size(kilobytes: publisher[:metrics]['rss']) %></td>
Expand Down
2 changes: 1 addition & 1 deletion lib/outboxer/web/views/publisher.erb
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
<tbody>
<tr>
<th scope="row">Throughput</th>
<td><%= Outboxer::Web.pretty_throughput(per_second: publisher[:metrics]['throughput']) %></td>
<td><%= Outboxer::Web.pretty_throughput(per_second: publisher[:metrics]['published']['throughput']) %></td>
</tr>
<tr>
<th scope="row">Latency</th>
Expand Down
5 changes: 4 additions & 1 deletion spec/factories/outboxer_publishers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
end
metrics do
{
"throughput" => 1000,
"queued" => { "count" => 1, "throughput" => 0 },
"publishing" => { "count" => 1, "throughput" => 0 },
"failed" => { "count" => 1, "throughput" => 0 },
"published" => { "count" => 1, "throughput" => 0 },
"latency" => 0,
"cpu" => 10.5,
"rss" => 40.95,
Expand Down