From 732afdbfebd8b75bbbea9423bc07c3fa61cc6941 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 26 Oct 2025 11:42:20 +1100 Subject: [PATCH 1/5] add thread counter --- db/migrate/create_outboxer_thread_counters.rb | 28 ++++++ lib/outboxer.rb | 1 + lib/outboxer/models/thread_counter.rb | 58 ++++++++++++ .../outboxer/models/thread_counter_spec.rb | 88 +++++++++++++++++++ tasks/database.rake | 3 + 5 files changed, 178 insertions(+) create mode 100644 db/migrate/create_outboxer_thread_counters.rb create mode 100644 lib/outboxer/models/thread_counter.rb create mode 100644 spec/lib/outboxer/models/thread_counter_spec.rb diff --git a/db/migrate/create_outboxer_thread_counters.rb b/db/migrate/create_outboxer_thread_counters.rb new file mode 100644 index 00000000..1f06709d --- /dev/null +++ b/db/migrate/create_outboxer_thread_counters.rb @@ -0,0 +1,28 @@ +class CreateOutboxerThreadCounters < ActiveRecord::Migration[6.1] + def up + create_table :outboxer_thread_counters do |t| + t.string :hostname, limit: 255, null: false + t.integer :process_id, null: false + t.integer :thread_id, null: false + + t.integer :publisher_id + + t.integer :queued_count, null: false + t.integer :publishing_count, null: false + t.integer :published_count, null: false + t.integer :failed_count, null: false + + t.timestamps + end + + add_index :outboxer_thread_counters, [:hostname, :process_id, :thread_id], + unique: true, name: "idx_outboxer_thread_identity" + + add_index :outboxer_thread_counters, + :updated_at, name: "idx_outboxer_thread_counters_updated_at" + end + + def down + drop_table :outboxer_thread_counters if table_exists?(:outboxer_thread_counters) + end +end diff --git a/lib/outboxer.rb b/lib/outboxer.rb index 432720ac..3d5bdeb0 100644 --- a/lib/outboxer.rb +++ b/lib/outboxer.rb @@ -10,6 +10,7 @@ require_relative "outboxer/models/message" require_relative "outboxer/models/message_count" require_relative "outboxer/models/message_total" +require_relative "outboxer/models/thread_counter" require_relative "outboxer/models/publisher" require_relative "outboxer/models/signal" diff --git a/lib/outboxer/models/thread_counter.rb b/lib/outboxer/models/thread_counter.rb new file mode 100644 index 00000000..4c5de076 --- /dev/null +++ b/lib/outboxer/models/thread_counter.rb @@ -0,0 +1,58 @@ +module Outboxer + module Models + class ThreadCounter < ::ActiveRecord::Base + self.table_name = "outboxer_thread_counters" + + def self.insert_or_increment(hostname: Socket.gethostname, + process_id: Process.pid, + thread_id: Thread.current.object_id, + queued_count: 0, publishing_count: 0, + published_count: 0, failed_count: 0, + time: Time.now.utc) + adapter = ActiveRecord::Base.connection.adapter_name.downcase + now = time.utc + + sql = if adapter.include?("postgres") + <<~SQL + INSERT INTO #{table_name} + (hostname, process_id, thread_id, + queued_count, publishing_count, published_count, failed_count, + created_at, updated_at) + VALUES (?, ?, ?, #{queued_count}, #{publishing_count}, #{published_count}, #{failed_count}, ?, ?) + ON CONFLICT (hostname, process_id, thread_id) + DO UPDATE SET + queued_count = #{table_name}.queued_count + #{queued_count}, + publishing_count = #{table_name}.publishing_count + #{publishing_count}, + published_count = #{table_name}.published_count + #{published_count}, + failed_count = #{table_name}.failed_count + #{failed_count}, + updated_at = EXCLUDED.updated_at + RETURNING id + SQL + else + <<~SQL + INSERT INTO #{table_name} + (hostname, process_id, thread_id, + queued_count, publishing_count, published_count, failed_count, + created_at, updated_at) + VALUES (?, ?, ?, #{queued_count}, #{publishing_count}, #{published_count}, #{failed_count}, ?, ?) + ON DUPLICATE KEY UPDATE + queued_count = queued_count + #{queued_count}, + publishing_count = publishing_count + #{publishing_count}, + published_count = published_count + #{published_count}, + failed_count = failed_count + #{failed_count}, + updated_at = VALUES(updated_at) + RETURNING id + SQL + end + + result = ActiveRecord::Base.connection.exec_query( + ActiveRecord::Base.sanitize_sql_array( + [sql, hostname, process_id, thread_id, now, now] + ) + ) + + { id: result.first["id"] } + end + end + end +end diff --git a/spec/lib/outboxer/models/thread_counter_spec.rb b/spec/lib/outboxer/models/thread_counter_spec.rb new file mode 100644 index 00000000..c02564b3 --- /dev/null +++ b/spec/lib/outboxer/models/thread_counter_spec.rb @@ -0,0 +1,88 @@ +require "rails_helper" + +module Outboxer + module Models + RSpec.describe ThreadCounter, type: :model do + describe ".insert_or_increment" do + let(:hostname) { "test-host" } + let(:process_id) { 12_345 } + let(:thread_id) { 999 } + let(:now) { Time.now.utc } + + it "inserts a new row and returns its id" do + thread_counter = ThreadCounter.insert_or_increment( + hostname: hostname, + process_id: process_id, + thread_id: thread_id, + queued_count: 1, + time: now + ) + + expect(thread_counter).to be_a(Hash) + expect(thread_counter).to include(:id) + expect(thread_counter[:id]).to be_present + + row = ThreadCounter.find(thread_counter[:id]) + expect(row.hostname).to eq(hostname) + expect(row.queued_count).to eq(1) + end + + it "increments existing counts if called again" do + first = ThreadCounter.insert_or_increment( + hostname: hostname, + process_id: process_id, + thread_id: thread_id, + queued_count: 1 + ) + + second = ThreadCounter.insert_or_increment( + hostname: hostname, + process_id: process_id, + thread_id: thread_id, + queued_count: 2 + ) + + expect(second[:id]).to eq(first[:id]) + + row = ThreadCounter.find(second[:id]) + expect(row.queued_count).to eq(3) + end + + it "updates multiple counters atomically" do + thread_counter = ThreadCounter.insert_or_increment( + hostname: hostname, + process_id: process_id, + thread_id: thread_id, + publishing_count: 1, + published_count: 2, + failed_count: 3 + ) + + row = ThreadCounter.find(thread_counter[:id]) + expect(row.publishing_count).to eq(1) + expect(row.published_count).to eq(2) + expect(row.failed_count).to eq(3) + end + + it "does not create duplicate rows on repeated calls" do + 3.times do + ThreadCounter.insert_or_increment( + hostname: hostname, + process_id: process_id, + thread_id: thread_id, + queued_count: 1 + ) + end + + count = ThreadCounter.where( + hostname: hostname, + process_id: process_id, + thread_id: thread_id + ).count + + expect(count).to eq(1) + end + end + end + end +end diff --git a/tasks/database.rake b/tasks/database.rake index 7397a1d2..f3e0b3c1 100644 --- a/tasks/database.rake +++ b/tasks/database.rake @@ -28,6 +28,9 @@ namespace :outboxer do db_config = Outboxer::Database.config(environment: environment, pool: 1) ActiveRecord::Base.establish_connection(db_config) + require_relative "../db/migrate/create_outboxer_thread_counters" + CreateOutboxerThreadCounters.new.up + require_relative "../db/migrate/create_outboxer_messages" CreateOutboxerMessages.new.up From 15163ae301af424ff56fa016cdd0ec64742e4752 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 26 Oct 2025 11:45:15 +1100 Subject: [PATCH 2/5] fix line formatting --- lib/outboxer/models/thread_counter.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/outboxer/models/thread_counter.rb b/lib/outboxer/models/thread_counter.rb index 4c5de076..ec9ec0a3 100644 --- a/lib/outboxer/models/thread_counter.rb +++ b/lib/outboxer/models/thread_counter.rb @@ -18,7 +18,8 @@ def self.insert_or_increment(hostname: Socket.gethostname, (hostname, process_id, thread_id, queued_count, publishing_count, published_count, failed_count, created_at, updated_at) - VALUES (?, ?, ?, #{queued_count}, #{publishing_count}, #{published_count}, #{failed_count}, ?, ?) + VALUES (?, ?, ?, #{queued_count}, #{publishing_count}, + #{published_count}, #{failed_count}, ?, ?) ON CONFLICT (hostname, process_id, thread_id) DO UPDATE SET queued_count = #{table_name}.queued_count + #{queued_count}, @@ -34,7 +35,8 @@ def self.insert_or_increment(hostname: Socket.gethostname, (hostname, process_id, thread_id, queued_count, publishing_count, published_count, failed_count, created_at, updated_at) - VALUES (?, ?, ?, #{queued_count}, #{publishing_count}, #{published_count}, #{failed_count}, ?, ?) + VALUES (?, ?, ?, #{queued_count}, #{publishing_count}, + #{published_count}, #{failed_count}, ?, ?) ON DUPLICATE KEY UPDATE queued_count = queued_count + #{queued_count}, publishing_count = publishing_count + #{publishing_count}, From 8284c651a45281e5926bf03a4cc5ad089f5e2b6c Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 26 Oct 2025 11:56:05 +1100 Subject: [PATCH 3/5] fix build --- lib/outboxer/models/thread_counter.rb | 6 +-- .../outboxer/models/thread_counter_spec.rb | 45 ++++++++++++------- 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/lib/outboxer/models/thread_counter.rb b/lib/outboxer/models/thread_counter.rb index ec9ec0a3..398a885d 100644 --- a/lib/outboxer/models/thread_counter.rb +++ b/lib/outboxer/models/thread_counter.rb @@ -27,7 +27,6 @@ def self.insert_or_increment(hostname: Socket.gethostname, published_count = #{table_name}.published_count + #{published_count}, failed_count = #{table_name}.failed_count + #{failed_count}, updated_at = EXCLUDED.updated_at - RETURNING id SQL else <<~SQL @@ -43,17 +42,16 @@ def self.insert_or_increment(hostname: Socket.gethostname, published_count = published_count + #{published_count}, failed_count = failed_count + #{failed_count}, updated_at = VALUES(updated_at) - RETURNING id SQL end - result = ActiveRecord::Base.connection.exec_query( + ActiveRecord::Base.connection.exec_query( ActiveRecord::Base.sanitize_sql_array( [sql, hostname, process_id, thread_id, now, now] ) ) - { id: result.first["id"] } + nil end end end diff --git a/spec/lib/outboxer/models/thread_counter_spec.rb b/spec/lib/outboxer/models/thread_counter_spec.rb index c02564b3..a30b7979 100644 --- a/spec/lib/outboxer/models/thread_counter_spec.rb +++ b/spec/lib/outboxer/models/thread_counter_spec.rb @@ -9,47 +9,53 @@ module Models let(:thread_id) { 999 } let(:now) { Time.now.utc } - it "inserts a new row and returns its id" do - thread_counter = ThreadCounter.insert_or_increment( + it "inserts a new row successfully" do + expect do + ThreadCounter.insert_or_increment( + hostname: hostname, + process_id: process_id, + thread_id: thread_id, + queued_count: 1, + time: now + ) + end.to change(ThreadCounter, :count).by(1) + + row = ThreadCounter.find_by( hostname: hostname, process_id: process_id, - thread_id: thread_id, - queued_count: 1, - time: now + thread_id: thread_id ) - expect(thread_counter).to be_a(Hash) - expect(thread_counter).to include(:id) - expect(thread_counter[:id]).to be_present - - row = ThreadCounter.find(thread_counter[:id]) - expect(row.hostname).to eq(hostname) + expect(row).not_to be_nil expect(row.queued_count).to eq(1) end it "increments existing counts if called again" do - first = ThreadCounter.insert_or_increment( + ThreadCounter.insert_or_increment( hostname: hostname, process_id: process_id, thread_id: thread_id, queued_count: 1 ) - second = ThreadCounter.insert_or_increment( + ThreadCounter.insert_or_increment( hostname: hostname, process_id: process_id, thread_id: thread_id, queued_count: 2 ) - expect(second[:id]).to eq(first[:id]) + row = ThreadCounter.find_by( + hostname: hostname, + process_id: process_id, + thread_id: thread_id + ) - row = ThreadCounter.find(second[:id]) expect(row.queued_count).to eq(3) end it "updates multiple counters atomically" do - thread_counter = ThreadCounter.insert_or_increment( + ThreadCounter.insert_or_increment( hostname: hostname, process_id: process_id, thread_id: thread_id, @@ -58,7 +64,12 @@ module Models failed_count: 3 ) - row = ThreadCounter.find(thread_counter[:id]) + row = ThreadCounter.find_by( + hostname: hostname, + process_id: process_id, + thread_id: thread_id + ) + expect(row.publishing_count).to eq(1) expect(row.published_count).to eq(2) expect(row.failed_count).to eq(3) From 46b3f03d4f26fb82ee6f460708ede1edc924996b Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 26 Oct 2025 12:11:05 +1100 Subject: [PATCH 4/5] update method --- lib/outboxer/models/thread_counter.rb | 12 ++++++------ spec/lib/outboxer/models/thread_counter_spec.rb | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/outboxer/models/thread_counter.rb b/lib/outboxer/models/thread_counter.rb index 398a885d..834efc74 100644 --- a/lib/outboxer/models/thread_counter.rb +++ b/lib/outboxer/models/thread_counter.rb @@ -3,12 +3,12 @@ module Models class ThreadCounter < ::ActiveRecord::Base self.table_name = "outboxer_thread_counters" - def self.insert_or_increment(hostname: Socket.gethostname, - process_id: Process.pid, - thread_id: Thread.current.object_id, - queued_count: 0, publishing_count: 0, - published_count: 0, failed_count: 0, - time: Time.now.utc) + def self.insert_or_increment_by(hostname: Socket.gethostname, + process_id: Process.pid, + thread_id: Thread.current.object_id, + queued_count: 0, publishing_count: 0, + published_count: 0, failed_count: 0, + time: Time.now.utc) adapter = ActiveRecord::Base.connection.adapter_name.downcase now = time.utc diff --git a/spec/lib/outboxer/models/thread_counter_spec.rb b/spec/lib/outboxer/models/thread_counter_spec.rb index a30b7979..427f4e8e 100644 --- a/spec/lib/outboxer/models/thread_counter_spec.rb +++ b/spec/lib/outboxer/models/thread_counter_spec.rb @@ -3,7 +3,7 @@ module Outboxer module Models RSpec.describe ThreadCounter, type: :model do - describe ".insert_or_increment" do + describe ".insert_or_increment_by" do let(:hostname) { "test-host" } let(:process_id) { 12_345 } let(:thread_id) { 999 } @@ -11,7 +11,7 @@ module Models it "inserts a new row successfully" do expect do - ThreadCounter.insert_or_increment( + ThreadCounter.insert_or_increment_by( hostname: hostname, process_id: process_id, thread_id: thread_id, @@ -31,14 +31,14 @@ module Models end it "increments existing counts if called again" do - ThreadCounter.insert_or_increment( + ThreadCounter.insert_or_increment_by( hostname: hostname, process_id: process_id, thread_id: thread_id, queued_count: 1 ) - ThreadCounter.insert_or_increment( + ThreadCounter.insert_or_increment_by( hostname: hostname, process_id: process_id, thread_id: thread_id, @@ -55,7 +55,7 @@ module Models end it "updates multiple counters atomically" do - ThreadCounter.insert_or_increment( + ThreadCounter.insert_or_increment_by( hostname: hostname, process_id: process_id, thread_id: thread_id, @@ -77,7 +77,7 @@ module Models it "does not create duplicate rows on repeated calls" do 3.times do - ThreadCounter.insert_or_increment( + ThreadCounter.insert_or_increment_by( hostname: hostname, process_id: process_id, thread_id: thread_id, From 174db93861e120fc9625140acd65cd417ca783e6 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 26 Oct 2025 12:27:10 +1100 Subject: [PATCH 5/5] update schema --- db/migrate/create_outboxer_thread_counters.rb | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/db/migrate/create_outboxer_thread_counters.rb b/db/migrate/create_outboxer_thread_counters.rb index 1f06709d..527f028b 100644 --- a/db/migrate/create_outboxer_thread_counters.rb +++ b/db/migrate/create_outboxer_thread_counters.rb @@ -16,10 +16,7 @@ def up end add_index :outboxer_thread_counters, [:hostname, :process_id, :thread_id], - unique: true, name: "idx_outboxer_thread_identity" - - add_index :outboxer_thread_counters, - :updated_at, name: "idx_outboxer_thread_counters_updated_at" + unique: true, name: "idx_outboxer_thread_counters_identity" end def down