diff --git a/db/migrate/create_outboxer_counters.rb b/db/migrate/create_outboxer_counters.rb index 37200725..618d6d50 100644 --- a/db/migrate/create_outboxer_counters.rb +++ b/db/migrate/create_outboxer_counters.rb @@ -1,9 +1,9 @@ class CreateOutboxerCounters < ActiveRecord::Migration[6.1] def up create_table :outboxer_counters do |t| - t.string :hostname, limit: 255 - t.integer :process_id - t.integer :thread_id + t.string :hostname, limit: 255, null: false + t.integer :process_id, null: false + t.integer :thread_id, null: false t.integer :publisher_id diff --git a/lib/outboxer.rb b/lib/outboxer.rb index 48d14579..390da366 100644 --- a/lib/outboxer.rb +++ b/lib/outboxer.rb @@ -16,6 +16,7 @@ require_relative "outboxer/models/signal" require_relative "outboxer/database" +require_relative "outboxer/counter" require_relative "outboxer/message" require_relative "outboxer/publisher" diff --git a/lib/outboxer/counter.rb b/lib/outboxer/counter.rb new file mode 100644 index 00000000..0f4c9e46 --- /dev/null +++ b/lib/outboxer/counter.rb @@ -0,0 +1,86 @@ +module Outboxer + module Counter + HISTORIC_HOSTNAME = "historic" + HISTORIC_PROCESS_ID = 0 + HISTORIC_THREAD_ID = 0 + + # Rolls up thread_counters for the given publisher_id into the historic_counter, + # then destroys rolled-up thread_counters. + # + # @param publisher_id [Integer] publisher to roll up + # @param time [Time] timestamp context for updated_at + # @return [Hash] new historic_totals after rollup + def self.rollup(publisher_id:, time:) + ActiveRecord::Base.connection_pool.with_connection do + ActiveRecord::Base.transaction do + Models::Counter.insert_or_increment_by( + hostname: HISTORIC_HOSTNAME, + process_id: HISTORIC_PROCESS_ID, + thread_id: HISTORIC_THREAD_ID, + time: time + ) + + historic_counter = Models::Counter.lock("FOR UPDATE").find_by!( + hostname: HISTORIC_HOSTNAME, + process_id: HISTORIC_PROCESS_ID, + thread_id: HISTORIC_THREAD_ID + ) + + thread_counters = Models::Counter.lock("FOR UPDATE") + .where("publisher_id = ? OR publisher_id IS NULL", publisher_id) + .where.not( + hostname: HISTORIC_HOSTNAME, + process_id: HISTORIC_PROCESS_ID, + thread_id: HISTORIC_THREAD_ID) + + thread_counter_totals = Models::Counter + .where(id: thread_counters.select(:id)) + .select( + "COALESCE(SUM(queued_count), 0) AS queued_count", + "COALESCE(SUM(publishing_count), 0) AS publishing_count", + "COALESCE(SUM(published_count), 0) AS published_count", + "COALESCE(SUM(failed_count), 0) AS failed_count" + ).take.attributes.symbolize_keys.slice( + :queued_count, :publishing_count, :published_count, :failed_count) + + historic_counter.update!( + queued_count: historic_counter.queued_count + thread_counter_totals[:queued_count], + publishing_count: historic_counter.publishing_count + + thread_counter_totals[:publishing_count], + published_count: historic_counter.published_count + + thread_counter_totals[:published_count], + failed_count: historic_counter.failed_count + thread_counter_totals[:failed_count], + updated_at: time + ) + + thread_counters.destroy_all + + { + queued_count: historic_counter.queued_count, + publishing_count: historic_counter.publishing_count, + published_count: historic_counter.published_count, + failed_count: historic_counter.failed_count + } + end + end + end + + # Returns total counts across the historic_counter and all thread_counters. + # + # @return [Hash] total counts across all rows + def self.total + ActiveRecord::Base.connection_pool.with_connection do + result = Models::Counter.select( + "COALESCE(SUM(queued_count), 0) AS queued_count", + "COALESCE(SUM(publishing_count), 0) AS publishing_count", + "COALESCE(SUM(published_count), 0) AS published_count", + "COALESCE(SUM(failed_count), 0) AS failed_count" + ).take + + result.attributes.symbolize_keys.slice( + :queued_count, :publishing_count, :published_count, :failed_count + ) + end + end + end +end diff --git a/spec/lib/outboxer/counter_spec.rb b/spec/lib/outboxer/counter_spec.rb new file mode 100644 index 00000000..6217b177 --- /dev/null +++ b/spec/lib/outboxer/counter_spec.rb @@ -0,0 +1,216 @@ +# frozen_string_literal: true + +require "rails_helper" + +module Outboxer + RSpec.describe Counter do + let(:time_now) { Time.utc(2025, 1, 1, 0, 0, 0) } + let(:publisher_a) { 100 } + let(:publisher_b) { 200 } + + before { Models::Counter.delete_all } + + describe ".rollup" do + context "when historic counter does not exist" do + let!(:thread_1) do + Models::Counter.create!( + hostname: "worker1.test.local", process_id: 111, thread_id: 21_001, + publisher_id: publisher_a, + queued_count: 1, publishing_count: 2, + published_count: 3, failed_count: 4, + created_at: time_now, updated_at: time_now) + end + + let!(:thread_2) do + Models::Counter.create!( + hostname: "worker1.test.local", process_id: 111, thread_id: 21_002, + publisher_id: publisher_a, + queued_count: 10, publishing_count: 20, + published_count: 30, failed_count: 40, + created_at: time_now, updated_at: time_now) + end + + it "creates the historic counter and rolls up all the thread counters" do + result = Counter.rollup(publisher_id: publisher_a, time: time_now) + + expect(result).to eq( + queued_count: 11, + publishing_count: 22, + published_count: 33, + failed_count: 44 + ) + + historic = Models::Counter.find_by!( + hostname: Counter::HISTORIC_HOSTNAME, + process_id: Counter::HISTORIC_PROCESS_ID, + thread_id: Counter::HISTORIC_THREAD_ID) + + expect(historic.queued_count).to eq(11) + expect(historic.publishing_count).to eq(22) + expect(historic.published_count).to eq(33) + expect(historic.failed_count).to eq(44) + + expect(Models::Counter.where.not(hostname: Counter::HISTORIC_HOSTNAME)).to be_empty + end + end + + context "when historic counter already exists" do + let!(:historic) do + Models::Counter.create!( + hostname: Counter::HISTORIC_HOSTNAME, + process_id: Counter::HISTORIC_PROCESS_ID, + thread_id: Counter::HISTORIC_THREAD_ID, + queued_count: 100, publishing_count: 200, + published_count: 300, failed_count: 400, + created_at: time_now, updated_at: time_now) + end + + let!(:thread) do + Models::Counter.create!( + hostname: "worker2.test.local", process_id: 222, thread_id: 22_001, + publisher_id: publisher_a, + queued_count: 1, publishing_count: 1, + published_count: 1, failed_count: 1, + created_at: time_now, updated_at: time_now) + end + + it "increments existing historic counters" do + result = Counter.rollup(publisher_id: publisher_a, time: time_now) + + expect(result).to eq( + queued_count: 101, + publishing_count: 201, + published_count: 301, + failed_count: 401 + ) + + historic.reload + + expect(historic.queued_count).to eq(101) + expect(historic.publishing_count).to eq(201) + expect(historic.published_count).to eq(301) + expect(historic.failed_count).to eq(401) + end + end + + context "when thread counters belong to a different publisher" do + let!(:thread) do + Models::Counter.create!( + hostname: "worker3.test.local", process_id: 333, thread_id: 23_001, + publisher_id: publisher_b, + queued_count: 5, publishing_count: 6, + published_count: 7, failed_count: 8, + created_at: time_now, updated_at: time_now) + end + + it "creates the historic counter but does not include other publishers" do + result = Counter.rollup(publisher_id: publisher_a, time: time_now) + + expect(result).to eq( + queued_count: 0, + publishing_count: 0, + published_count: 0, + failed_count: 0 + ) + + historic = Models::Counter.find_by!( + hostname: Counter::HISTORIC_HOSTNAME, + process_id: Counter::HISTORIC_PROCESS_ID, + thread_id: Counter::HISTORIC_THREAD_ID) + + expect(historic.queued_count).to eq(0) + expect(historic.publishing_count).to eq(0) + expect(historic.published_count).to eq(0) + expect(historic.failed_count).to eq(0) + + expect(Models::Counter.where(publisher_id: publisher_b).count).to eq(1) + end + end + + context "when thread counters have nil publisher_id" do + let!(:thread) do + Models::Counter.create!( + hostname: "worker4.test.local", process_id: 444, thread_id: 24_001, + publisher_id: nil, + queued_count: 2, publishing_count: 3, + published_count: 4, failed_count: 5, + created_at: time_now, updated_at: time_now) + end + + it "includes nil publisher rows in the rollup" do + result = Counter.rollup(publisher_id: publisher_a, time: time_now) + + expect(result).to eq( + queued_count: 2, + publishing_count: 3, + published_count: 4, + failed_count: 5 + ) + + historic = Models::Counter.find_by!( + hostname: Counter::HISTORIC_HOSTNAME, + process_id: Counter::HISTORIC_PROCESS_ID, + thread_id: Counter::HISTORIC_THREAD_ID) + + expect(historic.queued_count).to eq(2) + expect(historic.publishing_count).to eq(3) + expect(historic.published_count).to eq(4) + expect(historic.failed_count).to eq(5) + end + end + + context "when no matching counters exist" do + it "creates a historic counter with all zero counts" do + result = Counter.rollup(publisher_id: publisher_a, time: time_now) + + expect(result).to eq( + queued_count: 0, + publishing_count: 0, + published_count: 0, + failed_count: 0 + ) + + historic = Models::Counter.find_by!( + hostname: Counter::HISTORIC_HOSTNAME, + process_id: Counter::HISTORIC_PROCESS_ID, + thread_id: Counter::HISTORIC_THREAD_ID) + + expect(historic.queued_count).to eq(0) + expect(historic.publishing_count).to eq(0) + expect(historic.published_count).to eq(0) + expect(historic.failed_count).to eq(0) + end + end + end + + describe ".total" do + before do + Models::Counter.create!( + hostname: Counter::HISTORIC_HOSTNAME, + process_id: Counter::HISTORIC_PROCESS_ID, + thread_id: Counter::HISTORIC_THREAD_ID, + queued_count: 10, publishing_count: 20, + published_count: 30, failed_count: 40, + created_at: time_now, updated_at: time_now) + + Models::Counter.create!( + hostname: "worker5.test.local", process_id: 555, thread_id: 25_001, + publisher_id: publisher_a, + queued_count: 5, publishing_count: 6, + published_count: 7, failed_count: 8, + created_at: time_now, updated_at: time_now) + end + + it "returns total counts across all counters" do + totals = Counter.total + + expect(totals).to eq( + queued_count: 15, + publishing_count: 26, + published_count: 37, + failed_count: 48 + ) + end + end + end +end