diff --git a/db/migrate/create_outboxer_thread_counters.rb b/db/migrate/create_outboxer_thread_counters.rb new file mode 100644 index 00000000..527f028b --- /dev/null +++ b/db/migrate/create_outboxer_thread_counters.rb @@ -0,0 +1,25 @@ +class CreateOutboxerThreadCounters < ActiveRecord::Migration[6.1] + def up + create_table :outboxer_thread_counters do |t| + t.string :hostname, limit: 255, null: false + t.integer :process_id, null: false + t.integer :thread_id, null: false + + t.integer :publisher_id + + t.integer :queued_count, null: false + t.integer :publishing_count, null: false + t.integer :published_count, null: false + t.integer :failed_count, null: false + + t.timestamps + end + + add_index :outboxer_thread_counters, [:hostname, :process_id, :thread_id], + unique: true, name: "idx_outboxer_thread_counters_identity" + end + + def down + drop_table :outboxer_thread_counters if table_exists?(:outboxer_thread_counters) + end +end diff --git a/lib/outboxer.rb b/lib/outboxer.rb index 432720ac..3d5bdeb0 100644 --- a/lib/outboxer.rb +++ b/lib/outboxer.rb @@ -10,6 +10,7 @@ require_relative "outboxer/models/message" require_relative "outboxer/models/message_count" require_relative "outboxer/models/message_total" +require_relative "outboxer/models/thread_counter" require_relative "outboxer/models/publisher" require_relative "outboxer/models/signal" diff --git a/lib/outboxer/models/thread_counter.rb b/lib/outboxer/models/thread_counter.rb new file mode 100644 index 00000000..834efc74 --- /dev/null +++ b/lib/outboxer/models/thread_counter.rb @@ -0,0 +1,58 @@ +module Outboxer + module Models + class ThreadCounter < ::ActiveRecord::Base + self.table_name = "outboxer_thread_counters" + + def self.insert_or_increment_by(hostname: Socket.gethostname, + process_id: Process.pid, + thread_id: Thread.current.object_id, + queued_count: 0, publishing_count: 0, + published_count: 0, failed_count: 0, + time: Time.now.utc) + adapter = ActiveRecord::Base.connection.adapter_name.downcase + now = time.utc + + sql = if adapter.include?("postgres") + <<~SQL + INSERT INTO #{table_name} + (hostname, process_id, thread_id, + queued_count, publishing_count, published_count, failed_count, + created_at, updated_at) + VALUES (?, ?, ?, #{queued_count}, #{publishing_count}, + #{published_count}, #{failed_count}, ?, ?) + ON CONFLICT (hostname, process_id, thread_id) + DO UPDATE SET + queued_count = #{table_name}.queued_count + #{queued_count}, + publishing_count = #{table_name}.publishing_count + #{publishing_count}, + published_count = #{table_name}.published_count + #{published_count}, + failed_count = #{table_name}.failed_count + #{failed_count}, + updated_at = EXCLUDED.updated_at + SQL + else + <<~SQL + INSERT INTO #{table_name} + (hostname, process_id, thread_id, + queued_count, publishing_count, published_count, failed_count, + created_at, updated_at) + VALUES (?, ?, ?, #{queued_count}, #{publishing_count}, + #{published_count}, #{failed_count}, ?, ?) + ON DUPLICATE KEY UPDATE + queued_count = queued_count + #{queued_count}, + publishing_count = publishing_count + #{publishing_count}, + published_count = published_count + #{published_count}, + failed_count = failed_count + #{failed_count}, + updated_at = VALUES(updated_at) + SQL + end + + ActiveRecord::Base.connection.exec_query( + ActiveRecord::Base.sanitize_sql_array( + [sql, hostname, process_id, thread_id, now, now] + ) + ) + + nil + end + end + end +end diff --git a/spec/lib/outboxer/models/thread_counter_spec.rb b/spec/lib/outboxer/models/thread_counter_spec.rb new file mode 100644 index 00000000..427f4e8e --- /dev/null +++ b/spec/lib/outboxer/models/thread_counter_spec.rb @@ -0,0 +1,99 @@ +require "rails_helper" + +module Outboxer + module Models + RSpec.describe ThreadCounter, type: :model do + describe ".insert_or_increment_by" do + let(:hostname) { "test-host" } + let(:process_id) { 12_345 } + let(:thread_id) { 999 } + let(:now) { Time.now.utc } + + it "inserts a new row successfully" do + expect do + ThreadCounter.insert_or_increment_by( + hostname: hostname, + process_id: process_id, + thread_id: thread_id, + queued_count: 1, + time: now + ) + end.to change(ThreadCounter, :count).by(1) + + row = ThreadCounter.find_by( + hostname: hostname, + process_id: process_id, + thread_id: thread_id + ) + + expect(row).not_to be_nil + expect(row.queued_count).to eq(1) + end + + it "increments existing counts if called again" do + ThreadCounter.insert_or_increment_by( + hostname: hostname, + process_id: process_id, + thread_id: thread_id, + queued_count: 1 + ) + + ThreadCounter.insert_or_increment_by( + hostname: hostname, + process_id: process_id, + thread_id: thread_id, + queued_count: 2 + ) + + row = ThreadCounter.find_by( + hostname: hostname, + process_id: process_id, + thread_id: thread_id + ) + + expect(row.queued_count).to eq(3) + end + + it "updates multiple counters atomically" do + ThreadCounter.insert_or_increment_by( + hostname: hostname, + process_id: process_id, + thread_id: thread_id, + publishing_count: 1, + published_count: 2, + failed_count: 3 + ) + + row = ThreadCounter.find_by( + hostname: hostname, + process_id: process_id, + thread_id: thread_id + ) + + expect(row.publishing_count).to eq(1) + expect(row.published_count).to eq(2) + expect(row.failed_count).to eq(3) + end + + it "does not create duplicate rows on repeated calls" do + 3.times do + ThreadCounter.insert_or_increment_by( + hostname: hostname, + process_id: process_id, + thread_id: thread_id, + queued_count: 1 + ) + end + + count = ThreadCounter.where( + hostname: hostname, + process_id: process_id, + thread_id: thread_id + ).count + + expect(count).to eq(1) + end + end + end + end +end diff --git a/tasks/database.rake b/tasks/database.rake index 7397a1d2..f3e0b3c1 100644 --- a/tasks/database.rake +++ b/tasks/database.rake @@ -28,6 +28,9 @@ namespace :outboxer do db_config = Outboxer::Database.config(environment: environment, pool: 1) ActiveRecord::Base.establish_connection(db_config) + require_relative "../db/migrate/create_outboxer_thread_counters" + CreateOutboxerThreadCounters.new.up + require_relative "../db/migrate/create_outboxer_messages" CreateOutboxerMessages.new.up