From a722e967471491102e7396de0c52fa2fbe301236 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 7 Dec 2025 18:21:22 +1100 Subject: [PATCH 1/5] update message --- lib/outboxer/message.rb | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index d233b732..4c0c2f48 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -285,8 +285,9 @@ def published(id:, lock_version:, raise ActiveRecord::StaleObjectError.new(Models::Message.new(id: id), "destroy") end - exception_ids = Models::Exception.where(message_id: id).pluck(:id) - Models::Frame.where(exception_id: exception_ids).delete_all + Models::Frame + .where(exception_id: Models::Exception.select(:id).where(message_id: id)) + .delete_all Models::Exception.where(message_id: id).delete_all Models::Message.where(id: id).delete_all @@ -645,22 +646,15 @@ def rollup_counts(time: Time) ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do # 1. Ensure the historic thread exists (idempotent upsert) - begin - ActiveRecord::Base.transaction(requires_new: true) do - Models::Thread.create!( - hostname: Models::Thread::HISTORIC_HOSTNAME, - process_id: Models::Thread::HISTORIC_PROCESS_ID, - thread_id: Models::Thread::HISTORIC_THREAD_ID, - queued_message_count: 0, - publishing_message_count: 0, - published_message_count: 0, - failed_message_count: 0, - created_at: current_utc_time, - updated_at: current_utc_time) - end - rescue ActiveRecord::RecordNotUnique - # no op - end + Models::Thread.update_message_counts_by!( + hostname: Models::Thread::HISTORIC_HOSTNAME, + process_id: Models::Thread::HISTORIC_PROCESS_ID, + thread_id: Models::Thread::HISTORIC_THREAD_ID, + queued_message_count: 0, + publishing_message_count: 0, + published_message_count: 0, + failed_message_count: 0, + current_utc_time: current_utc_time) # 2. Lock *all* rows (historic thread + thread threads) locked_threads = Models::Thread.lock("FOR UPDATE").to_a From d6b7963fe1dde2e8401d4a79fd3b44b8d8edc6cd Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 7 Dec 2025 18:32:40 +1100 Subject: [PATCH 2/5] more optimisations --- db/migrate/create_outboxer_exceptions.rb | 13 ++++++------- db/migrate/create_outboxer_frames.rb | 3 ++- lib/outboxer/message.rb | 11 +++-------- 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/db/migrate/create_outboxer_exceptions.rb b/db/migrate/create_outboxer_exceptions.rb index d5dae1a1..8555f14c 100644 --- a/db/migrate/create_outboxer_exceptions.rb +++ b/db/migrate/create_outboxer_exceptions.rb @@ -1,14 +1,13 @@ class CreateOutboxerExceptions < ActiveRecord::Migration[6.1] def up - ActiveRecord::Base.transaction do - create_table :outboxer_exceptions do |t| - t.string :class_name, limit: 255 - t.text :message_text, null: false + create_table :outboxer_exceptions do |t| + t.string :class_name, limit: 255 + t.text :message_text, null: false - t.datetime :created_at, null: false + t.datetime :created_at, null: false - t.references :message, foreign_key: { to_table: :outboxer_messages }, null: false - end + t.references :message, null: false, + foreign_key: { to_table: :outboxer_messages, on_delete: :cascade } end end diff --git a/db/migrate/create_outboxer_frames.rb b/db/migrate/create_outboxer_frames.rb index 5aa9cd58..1b1e7bbd 100644 --- a/db/migrate/create_outboxer_frames.rb +++ b/db/migrate/create_outboxer_frames.rb @@ -4,7 +4,8 @@ def up t.integer :index, null: false t.text :text, null: false - t.references :exception, foreign_key: { to_table: :outboxer_exceptions }, null: false + t.references :exception, null: false, + foreign_key: { to_table: :outboxer_exceptions, on_delete: :cascade } t.index [:exception_id, :index], unique: true end diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index 4c0c2f48..4c276444 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -285,10 +285,6 @@ def published(id:, lock_version:, raise ActiveRecord::StaleObjectError.new(Models::Message.new(id: id), "destroy") end - Models::Frame - .where(exception_id: Models::Exception.select(:id).where(message_id: id)) - .delete_all - Models::Exception.where(message_id: id).delete_all Models::Message.where(id: id).delete_all Models::Thread.update_message_counts_by!( @@ -457,13 +453,12 @@ def delete(id:, lock_version:, ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do - message = Models::Message.includes(exceptions: :frames).lock.find_by!(id: id) + message = Models::Message.lock.find_by!(id: id) message.update!(lock_version: lock_version, updated_at: current_utc_time) - - message.exceptions.each { |exception| exception.frames.each(&:delete) } - message.exceptions.delete_all message.delete + Models::Message.lock.where(id: id, lock_version: lock_version).delete_all + Models::Thread.update_message_counts_by!( hostname: hostname, process_id: process_id, From 457b7585a2ffe75681458d8da949a61b9bacd903 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 7 Dec 2025 18:48:07 +1100 Subject: [PATCH 3/5] commit optimisations --- lib/outboxer/models/thread.rb | 72 +++++++++++++++++++++-------------- 1 file changed, 44 insertions(+), 28 deletions(-) diff --git a/lib/outboxer/models/thread.rb b/lib/outboxer/models/thread.rb index 5084c85b..6bce1d9b 100644 --- a/lib/outboxer/models/thread.rb +++ b/lib/outboxer/models/thread.rb @@ -3,10 +3,25 @@ module Models class Thread < ActiveRecord::Base self.table_name = "outboxer_threads" - HISTORIC_HOSTNAME = "historic" + HISTORIC_HOSTNAME = "historic".freeze HISTORIC_PROCESS_ID = 0 HISTORIC_THREAD_ID = 0 + STATUS_COLUMNS = { + queued: ["queued_message_count", "queued_message_count_last_updated_at"], + publishing: ["publishing_message_count", "publishing_message_count_last_updated_at"], + published: ["published_message_count", "published_message_count_last_updated_at"], + failed: ["failed_message_count", "failed_message_count_last_updated_at"] + }.freeze + + BASE_INSERT_COLUMNS = %w[ + hostname + process_id + thread_id + created_at + updated_at + ].freeze + def self.update_message_counts_by!( hostname: Socket.gethostname, process_id: Process.pid, @@ -19,43 +34,44 @@ def self.update_message_counts_by!( ) is_postgres = connection.adapter_name.downcase.include?("postgres") - insert_columns = %w[hostname process_id thread_id created_at updated_at] - insert_values = [hostname, process_id, thread_id, current_utc_time, current_utc_time] + insert_columns = BASE_INSERT_COLUMNS.dup + insert_values = [hostname, process_id, thread_id, current_utc_time, current_utc_time] update_columns = [] update_values = [] - [ - [:queued, queued_message_count], - [:publishing, publishing_message_count], - [:published, published_message_count], - [:failed, failed_message_count] - ] - .reject { |_name, message_count| message_count.to_i == 0 } - .each do |name, message_count| - message_count_column = "#{name}_message_count" - message_count_last_updated_at_column = "#{name}_message_count_last_updated_at" + { + queued: queued_message_count, + publishing: publishing_message_count, + published: published_message_count, + failed: failed_message_count + }.each do |name, message_count| + next if message_count.to_i == 0 - insert_columns << message_count_column - insert_columns << message_count_last_updated_at_column - insert_values << message_count - insert_values << current_utc_time + message_count_column, + last_updated_column = STATUS_COLUMNS.fetch(name) - if is_postgres - update_columns << - "#{message_count_column} = #{table_name}.#{message_count_column} + " \ - "EXCLUDED.#{message_count_column}" - else - update_columns << "#{message_count_column} = #{message_count_column} + ?" - update_values << message_count - end + insert_columns << message_count_column + insert_columns << last_updated_column + insert_values << message_count + insert_values << current_utc_time - update_columns << "#{message_count_last_updated_at_column} = ?" - update_values << current_utc_time + if is_postgres + update_columns << + "#{message_count_column} = " \ + "#{table_name}.#{message_count_column} + " \ + "EXCLUDED.#{message_count_column}" + else + update_columns << "#{message_count_column} = #{message_count_column} + ?" + update_values << message_count end + update_columns << "#{last_updated_column} = ?" + update_values << current_utc_time + end + update_columns << "updated_at = ?" - update_values << current_utc_time + update_values << current_utc_time insert_sql = <<~SQL INSERT INTO #{table_name} (#{insert_columns.join(", ")}) From 4b134091545f563d5625584bcfa18b93b13315b0 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 7 Dec 2025 18:49:33 +1100 Subject: [PATCH 4/5] improve CPU optimisation --- lib/outboxer/models/thread.rb | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/lib/outboxer/models/thread.rb b/lib/outboxer/models/thread.rb index 6bce1d9b..5b5c2293 100644 --- a/lib/outboxer/models/thread.rb +++ b/lib/outboxer/models/thread.rb @@ -8,10 +8,10 @@ class Thread < ActiveRecord::Base HISTORIC_THREAD_ID = 0 STATUS_COLUMNS = { - queued: ["queued_message_count", "queued_message_count_last_updated_at"], + queued: ["queued_message_count", "queued_message_count_last_updated_at"], publishing: ["publishing_message_count", "publishing_message_count_last_updated_at"], - published: ["published_message_count", "published_message_count_last_updated_at"], - failed: ["failed_message_count", "failed_message_count_last_updated_at"] + published: ["published_message_count", "published_message_count_last_updated_at"], + failed: ["failed_message_count", "failed_message_count_last_updated_at"] }.freeze BASE_INSERT_COLUMNS = %w[ @@ -41,15 +41,14 @@ def self.update_message_counts_by!( update_values = [] { - queued: queued_message_count, + queued: queued_message_count, publishing: publishing_message_count, - published: published_message_count, - failed: failed_message_count + published: published_message_count, + failed: failed_message_count }.each do |name, message_count| next if message_count.to_i == 0 - message_count_column, - last_updated_column = STATUS_COLUMNS.fetch(name) + message_count_column, last_updated_column = STATUS_COLUMNS.fetch(name) insert_columns << message_count_column insert_columns << last_updated_column From 1b5f3e1ebbb21abfbbb65c113a47e0a216b7a652 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 7 Dec 2025 19:42:27 +1100 Subject: [PATCH 5/5] fix e2e --- quickstart_e2e_tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quickstart_e2e_tests.sh b/quickstart_e2e_tests.sh index 15f05a9c..16853bbf 100755 --- a/quickstart_e2e_tests.sh +++ b/quickstart_e2e_tests.sh @@ -42,7 +42,7 @@ bundle exec rails new . \ bundle install bundle exec rails db:create -echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "refactor/rename_thread_message_columns"' \ +echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "optimisation/increase_throughput"' \ >> Gemfile bundle install bundle exec rails generate outboxer:install