Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions db/migrate/create_outboxer_counters.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
class CreateOutboxerCounters < ActiveRecord::Migration[6.1]
def up
create_table :outboxer_counters do |t|
t.string :hostname, limit: 255
t.integer :process_id
t.integer :thread_id
t.string :hostname, limit: 255, null: false
t.integer :process_id, null: false
t.integer :thread_id, null: false

t.integer :publisher_id

Expand Down
1 change: 1 addition & 0 deletions lib/outboxer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
require_relative "outboxer/models/signal"

require_relative "outboxer/database"
require_relative "outboxer/counter"
require_relative "outboxer/message"
require_relative "outboxer/publisher"

Expand Down
86 changes: 86 additions & 0 deletions lib/outboxer/counter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
module Outboxer
module Counter
HISTORIC_HOSTNAME = "historic"
HISTORIC_PROCESS_ID = 0
HISTORIC_THREAD_ID = 0

# Rolls up thread_counters for the given publisher_id into the historic_counter,
# then destroys rolled-up thread_counters.
#
# @param publisher_id [Integer] publisher to roll up
# @param time [Time] timestamp context for updated_at
# @return [Hash] new historic_totals after rollup
def self.rollup(publisher_id:, time:)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
Models::Counter.insert_or_increment_by(
hostname: HISTORIC_HOSTNAME,
process_id: HISTORIC_PROCESS_ID,
thread_id: HISTORIC_THREAD_ID,
time: time
)

historic_counter = Models::Counter.lock("FOR UPDATE").find_by!(
hostname: HISTORIC_HOSTNAME,
process_id: HISTORIC_PROCESS_ID,
thread_id: HISTORIC_THREAD_ID
)

thread_counters = Models::Counter.lock("FOR UPDATE")
.where("publisher_id = ? OR publisher_id IS NULL", publisher_id)
.where.not(
hostname: HISTORIC_HOSTNAME,
process_id: HISTORIC_PROCESS_ID,
thread_id: HISTORIC_THREAD_ID)

thread_counter_totals = Models::Counter
.where(id: thread_counters.select(:id))
.select(
"COALESCE(SUM(queued_count), 0) AS queued_count",
"COALESCE(SUM(publishing_count), 0) AS publishing_count",
"COALESCE(SUM(published_count), 0) AS published_count",
"COALESCE(SUM(failed_count), 0) AS failed_count"
).take.attributes.symbolize_keys.slice(
:queued_count, :publishing_count, :published_count, :failed_count)

historic_counter.update!(
queued_count: historic_counter.queued_count + thread_counter_totals[:queued_count],
publishing_count: historic_counter.publishing_count +
thread_counter_totals[:publishing_count],
published_count: historic_counter.published_count +
thread_counter_totals[:published_count],
failed_count: historic_counter.failed_count + thread_counter_totals[:failed_count],
updated_at: time
)

thread_counters.destroy_all

{
queued_count: historic_counter.queued_count,
publishing_count: historic_counter.publishing_count,
published_count: historic_counter.published_count,
failed_count: historic_counter.failed_count
}
end
end
end

# Returns total counts across the historic_counter and all thread_counters.
#
# @return [Hash] total counts across all rows
def self.total
ActiveRecord::Base.connection_pool.with_connection do
result = Models::Counter.select(
"COALESCE(SUM(queued_count), 0) AS queued_count",
"COALESCE(SUM(publishing_count), 0) AS publishing_count",
"COALESCE(SUM(published_count), 0) AS published_count",
"COALESCE(SUM(failed_count), 0) AS failed_count"
).take

result.attributes.symbolize_keys.slice(
:queued_count, :publishing_count, :published_count, :failed_count
)
end
end
end
end
216 changes: 216 additions & 0 deletions spec/lib/outboxer/counter_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
# frozen_string_literal: true

require "rails_helper"

module Outboxer
RSpec.describe Counter do
let(:time_now) { Time.utc(2025, 1, 1, 0, 0, 0) }
let(:publisher_a) { 100 }
let(:publisher_b) { 200 }

before { Models::Counter.delete_all }

describe ".rollup" do
context "when historic counter does not exist" do
let!(:thread_1) do
Models::Counter.create!(
hostname: "worker1.test.local", process_id: 111, thread_id: 21_001,
publisher_id: publisher_a,
queued_count: 1, publishing_count: 2,
published_count: 3, failed_count: 4,
created_at: time_now, updated_at: time_now)
end

let!(:thread_2) do
Models::Counter.create!(
hostname: "worker1.test.local", process_id: 111, thread_id: 21_002,
publisher_id: publisher_a,
queued_count: 10, publishing_count: 20,
published_count: 30, failed_count: 40,
created_at: time_now, updated_at: time_now)
end

it "creates the historic counter and rolls up all the thread counters" do
result = Counter.rollup(publisher_id: publisher_a, time: time_now)

expect(result).to eq(
queued_count: 11,
publishing_count: 22,
published_count: 33,
failed_count: 44
)

historic = Models::Counter.find_by!(
hostname: Counter::HISTORIC_HOSTNAME,
process_id: Counter::HISTORIC_PROCESS_ID,
thread_id: Counter::HISTORIC_THREAD_ID)

expect(historic.queued_count).to eq(11)
expect(historic.publishing_count).to eq(22)
expect(historic.published_count).to eq(33)
expect(historic.failed_count).to eq(44)

expect(Models::Counter.where.not(hostname: Counter::HISTORIC_HOSTNAME)).to be_empty
end
end

context "when historic counter already exists" do
let!(:historic) do
Models::Counter.create!(
hostname: Counter::HISTORIC_HOSTNAME,
process_id: Counter::HISTORIC_PROCESS_ID,
thread_id: Counter::HISTORIC_THREAD_ID,
queued_count: 100, publishing_count: 200,
published_count: 300, failed_count: 400,
created_at: time_now, updated_at: time_now)
end

let!(:thread) do
Models::Counter.create!(
hostname: "worker2.test.local", process_id: 222, thread_id: 22_001,
publisher_id: publisher_a,
queued_count: 1, publishing_count: 1,
published_count: 1, failed_count: 1,
created_at: time_now, updated_at: time_now)
end

it "increments existing historic counters" do
result = Counter.rollup(publisher_id: publisher_a, time: time_now)

expect(result).to eq(
queued_count: 101,
publishing_count: 201,
published_count: 301,
failed_count: 401
)

historic.reload

expect(historic.queued_count).to eq(101)
expect(historic.publishing_count).to eq(201)
expect(historic.published_count).to eq(301)
expect(historic.failed_count).to eq(401)
end
end

context "when thread counters belong to a different publisher" do
let!(:thread) do
Models::Counter.create!(
hostname: "worker3.test.local", process_id: 333, thread_id: 23_001,
publisher_id: publisher_b,
queued_count: 5, publishing_count: 6,
published_count: 7, failed_count: 8,
created_at: time_now, updated_at: time_now)
end

it "creates the historic counter but does not include other publishers" do
result = Counter.rollup(publisher_id: publisher_a, time: time_now)

expect(result).to eq(
queued_count: 0,
publishing_count: 0,
published_count: 0,
failed_count: 0
)

historic = Models::Counter.find_by!(
hostname: Counter::HISTORIC_HOSTNAME,
process_id: Counter::HISTORIC_PROCESS_ID,
thread_id: Counter::HISTORIC_THREAD_ID)

expect(historic.queued_count).to eq(0)
expect(historic.publishing_count).to eq(0)
expect(historic.published_count).to eq(0)
expect(historic.failed_count).to eq(0)

expect(Models::Counter.where(publisher_id: publisher_b).count).to eq(1)
end
end

context "when thread counters have nil publisher_id" do
let!(:thread) do
Models::Counter.create!(
hostname: "worker4.test.local", process_id: 444, thread_id: 24_001,
publisher_id: nil,
queued_count: 2, publishing_count: 3,
published_count: 4, failed_count: 5,
created_at: time_now, updated_at: time_now)
end

it "includes nil publisher rows in the rollup" do
result = Counter.rollup(publisher_id: publisher_a, time: time_now)

expect(result).to eq(
queued_count: 2,
publishing_count: 3,
published_count: 4,
failed_count: 5
)

historic = Models::Counter.find_by!(
hostname: Counter::HISTORIC_HOSTNAME,
process_id: Counter::HISTORIC_PROCESS_ID,
thread_id: Counter::HISTORIC_THREAD_ID)

expect(historic.queued_count).to eq(2)
expect(historic.publishing_count).to eq(3)
expect(historic.published_count).to eq(4)
expect(historic.failed_count).to eq(5)
end
end

context "when no matching counters exist" do
it "creates a historic counter with all zero counts" do
result = Counter.rollup(publisher_id: publisher_a, time: time_now)

expect(result).to eq(
queued_count: 0,
publishing_count: 0,
published_count: 0,
failed_count: 0
)

historic = Models::Counter.find_by!(
hostname: Counter::HISTORIC_HOSTNAME,
process_id: Counter::HISTORIC_PROCESS_ID,
thread_id: Counter::HISTORIC_THREAD_ID)

expect(historic.queued_count).to eq(0)
expect(historic.publishing_count).to eq(0)
expect(historic.published_count).to eq(0)
expect(historic.failed_count).to eq(0)
end
end
end

describe ".total" do
before do
Models::Counter.create!(
hostname: Counter::HISTORIC_HOSTNAME,
process_id: Counter::HISTORIC_PROCESS_ID,
thread_id: Counter::HISTORIC_THREAD_ID,
queued_count: 10, publishing_count: 20,
published_count: 30, failed_count: 40,
created_at: time_now, updated_at: time_now)

Models::Counter.create!(
hostname: "worker5.test.local", process_id: 555, thread_id: 25_001,
publisher_id: publisher_a,
queued_count: 5, publishing_count: 6,
published_count: 7, failed_count: 8,
created_at: time_now, updated_at: time_now)
end

it "returns total counts across all counters" do
totals = Counter.total

expect(totals).to eq(
queued_count: 15,
publishing_count: 26,
published_count: 37,
failed_count: 48
)
end
end
end
end