From a722e967471491102e7396de0c52fa2fbe301236 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 7 Dec 2025 18:21:22 +1100 Subject: [PATCH 1/8] update message --- lib/outboxer/message.rb | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index d233b732..4c0c2f48 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -285,8 +285,9 @@ def published(id:, lock_version:, raise ActiveRecord::StaleObjectError.new(Models::Message.new(id: id), "destroy") end - exception_ids = Models::Exception.where(message_id: id).pluck(:id) - Models::Frame.where(exception_id: exception_ids).delete_all + Models::Frame + .where(exception_id: Models::Exception.select(:id).where(message_id: id)) + .delete_all Models::Exception.where(message_id: id).delete_all Models::Message.where(id: id).delete_all @@ -645,22 +646,15 @@ def rollup_counts(time: Time) ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do # 1. Ensure the historic thread exists (idempotent upsert) - begin - ActiveRecord::Base.transaction(requires_new: true) do - Models::Thread.create!( - hostname: Models::Thread::HISTORIC_HOSTNAME, - process_id: Models::Thread::HISTORIC_PROCESS_ID, - thread_id: Models::Thread::HISTORIC_THREAD_ID, - queued_message_count: 0, - publishing_message_count: 0, - published_message_count: 0, - failed_message_count: 0, - created_at: current_utc_time, - updated_at: current_utc_time) - end - rescue ActiveRecord::RecordNotUnique - # no op - end + Models::Thread.update_message_counts_by!( + hostname: Models::Thread::HISTORIC_HOSTNAME, + process_id: Models::Thread::HISTORIC_PROCESS_ID, + thread_id: Models::Thread::HISTORIC_THREAD_ID, + queued_message_count: 0, + publishing_message_count: 0, + published_message_count: 0, + failed_message_count: 0, + current_utc_time: current_utc_time) # 2. Lock *all* rows (historic thread + thread threads) locked_threads = Models::Thread.lock("FOR UPDATE").to_a From d6b7963fe1dde2e8401d4a79fd3b44b8d8edc6cd Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 7 Dec 2025 18:32:40 +1100 Subject: [PATCH 2/8] more optimisations --- db/migrate/create_outboxer_exceptions.rb | 13 ++++++------- db/migrate/create_outboxer_frames.rb | 3 ++- lib/outboxer/message.rb | 11 +++-------- 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/db/migrate/create_outboxer_exceptions.rb b/db/migrate/create_outboxer_exceptions.rb index d5dae1a1..8555f14c 100644 --- a/db/migrate/create_outboxer_exceptions.rb +++ b/db/migrate/create_outboxer_exceptions.rb @@ -1,14 +1,13 @@ class CreateOutboxerExceptions < ActiveRecord::Migration[6.1] def up - ActiveRecord::Base.transaction do - create_table :outboxer_exceptions do |t| - t.string :class_name, limit: 255 - t.text :message_text, null: false + create_table :outboxer_exceptions do |t| + t.string :class_name, limit: 255 + t.text :message_text, null: false - t.datetime :created_at, null: false + t.datetime :created_at, null: false - t.references :message, foreign_key: { to_table: :outboxer_messages }, null: false - end + t.references :message, null: false, + foreign_key: { to_table: :outboxer_messages, on_delete: :cascade } end end diff --git a/db/migrate/create_outboxer_frames.rb b/db/migrate/create_outboxer_frames.rb index 5aa9cd58..1b1e7bbd 100644 --- a/db/migrate/create_outboxer_frames.rb +++ b/db/migrate/create_outboxer_frames.rb @@ -4,7 +4,8 @@ def up t.integer :index, null: false t.text :text, null: false - t.references :exception, foreign_key: { to_table: :outboxer_exceptions }, null: false + t.references :exception, null: false, + foreign_key: { to_table: :outboxer_exceptions, on_delete: :cascade } t.index [:exception_id, :index], unique: true end diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index 4c0c2f48..4c276444 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -285,10 +285,6 @@ def published(id:, lock_version:, raise ActiveRecord::StaleObjectError.new(Models::Message.new(id: id), "destroy") end - Models::Frame - .where(exception_id: Models::Exception.select(:id).where(message_id: id)) - .delete_all - Models::Exception.where(message_id: id).delete_all Models::Message.where(id: id).delete_all Models::Thread.update_message_counts_by!( @@ -457,13 +453,12 @@ def delete(id:, lock_version:, ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do - message = Models::Message.includes(exceptions: :frames).lock.find_by!(id: id) + message = Models::Message.lock.find_by!(id: id) message.update!(lock_version: lock_version, updated_at: current_utc_time) - - message.exceptions.each { |exception| exception.frames.each(&:delete) } - message.exceptions.delete_all message.delete + Models::Message.lock.where(id: id, lock_version: lock_version).delete_all + Models::Thread.update_message_counts_by!( hostname: hostname, process_id: process_id, From 457b7585a2ffe75681458d8da949a61b9bacd903 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 7 Dec 2025 18:48:07 +1100 Subject: [PATCH 3/8] commit optimisations --- lib/outboxer/models/thread.rb | 72 +++++++++++++++++++++-------------- 1 file changed, 44 insertions(+), 28 deletions(-) diff --git a/lib/outboxer/models/thread.rb b/lib/outboxer/models/thread.rb index 5084c85b..6bce1d9b 100644 --- a/lib/outboxer/models/thread.rb +++ b/lib/outboxer/models/thread.rb @@ -3,10 +3,25 @@ module Models class Thread < ActiveRecord::Base self.table_name = "outboxer_threads" - HISTORIC_HOSTNAME = "historic" + HISTORIC_HOSTNAME = "historic".freeze HISTORIC_PROCESS_ID = 0 HISTORIC_THREAD_ID = 0 + STATUS_COLUMNS = { + queued: ["queued_message_count", "queued_message_count_last_updated_at"], + publishing: ["publishing_message_count", "publishing_message_count_last_updated_at"], + published: ["published_message_count", "published_message_count_last_updated_at"], + failed: ["failed_message_count", "failed_message_count_last_updated_at"] + }.freeze + + BASE_INSERT_COLUMNS = %w[ + hostname + process_id + thread_id + created_at + updated_at + ].freeze + def self.update_message_counts_by!( hostname: Socket.gethostname, process_id: Process.pid, @@ -19,43 +34,44 @@ def self.update_message_counts_by!( ) is_postgres = connection.adapter_name.downcase.include?("postgres") - insert_columns = %w[hostname process_id thread_id created_at updated_at] - insert_values = [hostname, process_id, thread_id, current_utc_time, current_utc_time] + insert_columns = BASE_INSERT_COLUMNS.dup + insert_values = [hostname, process_id, thread_id, current_utc_time, current_utc_time] update_columns = [] update_values = [] - [ - [:queued, queued_message_count], - [:publishing, publishing_message_count], - [:published, published_message_count], - [:failed, failed_message_count] - ] - .reject { |_name, message_count| message_count.to_i == 0 } - .each do |name, message_count| - message_count_column = "#{name}_message_count" - message_count_last_updated_at_column = "#{name}_message_count_last_updated_at" + { + queued: queued_message_count, + publishing: publishing_message_count, + published: published_message_count, + failed: failed_message_count + }.each do |name, message_count| + next if message_count.to_i == 0 - insert_columns << message_count_column - insert_columns << message_count_last_updated_at_column - insert_values << message_count - insert_values << current_utc_time + message_count_column, + last_updated_column = STATUS_COLUMNS.fetch(name) - if is_postgres - update_columns << - "#{message_count_column} = #{table_name}.#{message_count_column} + " \ - "EXCLUDED.#{message_count_column}" - else - update_columns << "#{message_count_column} = #{message_count_column} + ?" - update_values << message_count - end + insert_columns << message_count_column + insert_columns << last_updated_column + insert_values << message_count + insert_values << current_utc_time - update_columns << "#{message_count_last_updated_at_column} = ?" - update_values << current_utc_time + if is_postgres + update_columns << + "#{message_count_column} = " \ + "#{table_name}.#{message_count_column} + " \ + "EXCLUDED.#{message_count_column}" + else + update_columns << "#{message_count_column} = #{message_count_column} + ?" + update_values << message_count end + update_columns << "#{last_updated_column} = ?" + update_values << current_utc_time + end + update_columns << "updated_at = ?" - update_values << current_utc_time + update_values << current_utc_time insert_sql = <<~SQL INSERT INTO #{table_name} (#{insert_columns.join(", ")}) From 4b134091545f563d5625584bcfa18b93b13315b0 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 7 Dec 2025 18:49:33 +1100 Subject: [PATCH 4/8] improve CPU optimisation --- lib/outboxer/models/thread.rb | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/lib/outboxer/models/thread.rb b/lib/outboxer/models/thread.rb index 6bce1d9b..5b5c2293 100644 --- a/lib/outboxer/models/thread.rb +++ b/lib/outboxer/models/thread.rb @@ -8,10 +8,10 @@ class Thread < ActiveRecord::Base HISTORIC_THREAD_ID = 0 STATUS_COLUMNS = { - queued: ["queued_message_count", "queued_message_count_last_updated_at"], + queued: ["queued_message_count", "queued_message_count_last_updated_at"], publishing: ["publishing_message_count", "publishing_message_count_last_updated_at"], - published: ["published_message_count", "published_message_count_last_updated_at"], - failed: ["failed_message_count", "failed_message_count_last_updated_at"] + published: ["published_message_count", "published_message_count_last_updated_at"], + failed: ["failed_message_count", "failed_message_count_last_updated_at"] }.freeze BASE_INSERT_COLUMNS = %w[ @@ -41,15 +41,14 @@ def self.update_message_counts_by!( update_values = [] { - queued: queued_message_count, + queued: queued_message_count, publishing: publishing_message_count, - published: published_message_count, - failed: failed_message_count + published: published_message_count, + failed: failed_message_count }.each do |name, message_count| next if message_count.to_i == 0 - message_count_column, - last_updated_column = STATUS_COLUMNS.fetch(name) + message_count_column, last_updated_column = STATUS_COLUMNS.fetch(name) insert_columns << message_count_column insert_columns << last_updated_column From 919acbe28ebe0b11dfbfb7b03ca90ee67ce12685 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 7 Dec 2025 18:55:47 +1100 Subject: [PATCH 5/8] commit --- lib/outboxer/models/thread.rb | 209 +++++++++++++++++++++++----------- 1 file changed, 140 insertions(+), 69 deletions(-) diff --git a/lib/outboxer/models/thread.rb b/lib/outboxer/models/thread.rb index 5b5c2293..086a3910 100644 --- a/lib/outboxer/models/thread.rb +++ b/lib/outboxer/models/thread.rb @@ -7,20 +7,128 @@ class Thread < ActiveRecord::Base HISTORIC_PROCESS_ID = 0 HISTORIC_THREAD_ID = 0 - STATUS_COLUMNS = { - queued: ["queued_message_count", "queued_message_count_last_updated_at"], - publishing: ["publishing_message_count", "publishing_message_count_last_updated_at"], - published: ["published_message_count", "published_message_count_last_updated_at"], - failed: ["failed_message_count", "failed_message_count_last_updated_at"] - }.freeze + POSTGRES_SQL = <<~SQL.freeze + INSERT INTO outboxer_threads ( + hostname, + process_id, + thread_id, + queued_message_count, + queued_message_count_last_updated_at, + publishing_message_count, + publishing_message_count_last_updated_at, + published_message_count, + published_message_count_last_updated_at, + failed_message_count, + failed_message_count_last_updated_at, + created_at, + updated_at + ) + VALUES ( + $1, $2, $3, + $4, $5, + $6, $7, + $8, $9, + $10, $11, + $12, $13 + ) + ON CONFLICT (hostname, process_id, thread_id) + DO UPDATE SET + queued_message_count = + outboxer_threads.queued_message_count + + EXCLUDED.queued_message_count, + queued_message_count_last_updated_at = + CASE + WHEN EXCLUDED.queued_message_count != 0 + THEN EXCLUDED.queued_message_count_last_updated_at + ELSE outboxer_threads.queued_message_count_last_updated_at + END, + publishing_message_count = + outboxer_threads.publishing_message_count + + EXCLUDED.publishing_message_count, + publishing_message_count_last_updated_at = + CASE + WHEN EXCLUDED.publishing_message_count != 0 + THEN EXCLUDED.publishing_message_count_last_updated_at + ELSE outboxer_threads.publishing_message_count_last_updated_at + END, + published_message_count = + outboxer_threads.published_message_count + + EXCLUDED.published_message_count, + published_message_count_last_updated_at = + CASE + WHEN EXCLUDED.published_message_count != 0 + THEN EXCLUDED.published_message_count_last_updated_at + ELSE outboxer_threads.published_message_count_last_updated_at + END, + failed_message_count = + outboxer_threads.failed_message_count + + EXCLUDED.failed_message_count, + failed_message_count_last_updated_at = + CASE + WHEN EXCLUDED.failed_message_count != 0 + THEN EXCLUDED.failed_message_count_last_updated_at + ELSE outboxer_threads.failed_message_count_last_updated_at + END, + updated_at = EXCLUDED.updated_at + SQL - BASE_INSERT_COLUMNS = %w[ - hostname - process_id - thread_id - created_at - updated_at - ].freeze + MYSQL_SQL = <<~SQL.freeze + INSERT INTO outboxer_threads ( + hostname, + process_id, + thread_id, + queued_message_count, + queued_message_count_last_updated_at, + publishing_message_count, + publishing_message_count_last_updated_at, + published_message_count, + published_message_count_last_updated_at, + failed_message_count, + failed_message_count_last_updated_at, + created_at, + updated_at + ) + VALUES ( + ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ? + ) + ON DUPLICATE KEY UPDATE + queued_message_count = + queued_message_count + VALUES(queued_message_count), + queued_message_count_last_updated_at = + IF( + VALUES(queued_message_count) != 0, + VALUES(queued_message_count_last_updated_at), + queued_message_count_last_updated_at + ), + publishing_message_count = + publishing_message_count + + VALUES(publishing_message_count), + publishing_message_count_last_updated_at = + IF( + VALUES(publishing_message_count) != 0, + VALUES(publishing_message_count_last_updated_at), + publishing_message_count_last_updated_at + ), + published_message_count = + published_message_count + + VALUES(published_message_count), + published_message_count_last_updated_at = + IF( + VALUES(published_message_count) != 0, + VALUES(published_message_count_last_updated_at), + published_message_count_last_updated_at + ), + failed_message_count = + failed_message_count + VALUES(failed_message_count), + failed_message_count_last_updated_at = + IF( + VALUES(failed_message_count) != 0, + VALUES(failed_message_count_last_updated_at), + failed_message_count_last_updated_at + ), + updated_at = VALUES(updated_at) + SQL def self.update_message_counts_by!( hostname: Socket.gethostname, @@ -32,70 +140,33 @@ def self.update_message_counts_by!( failed_message_count: 0, current_utc_time: Time.now.utc ) - is_postgres = connection.adapter_name.downcase.include?("postgres") + adapter_name = connection.adapter_name.downcase + is_postgres = adapter_name.include?("postgres") - insert_columns = BASE_INSERT_COLUMNS.dup - insert_values = [hostname, process_id, thread_id, current_utc_time, current_utc_time] + values = [ + hostname, + process_id, + thread_id, - update_columns = [] - update_values = [] + queued_message_count, + queued_message_count.to_i != 0 ? current_utc_time : nil, - { - queued: queued_message_count, - publishing: publishing_message_count, - published: published_message_count, - failed: failed_message_count - }.each do |name, message_count| - next if message_count.to_i == 0 + publishing_message_count, + publishing_message_count.to_i != 0 ? current_utc_time : nil, - message_count_column, last_updated_column = STATUS_COLUMNS.fetch(name) + published_message_count, + published_message_count.to_i != 0 ? current_utc_time : nil, - insert_columns << message_count_column - insert_columns << last_updated_column - insert_values << message_count - insert_values << current_utc_time + failed_message_count, + failed_message_count.to_i != 0 ? current_utc_time : nil, - if is_postgres - update_columns << - "#{message_count_column} = " \ - "#{table_name}.#{message_count_column} + " \ - "EXCLUDED.#{message_count_column}" - else - update_columns << "#{message_count_column} = #{message_count_column} + ?" - update_values << message_count - end + current_utc_time, + current_utc_time + ] - update_columns << "#{last_updated_column} = ?" - update_values << current_utc_time - end + sql = is_postgres ? POSTGRES_SQL : MYSQL_SQL - update_columns << "updated_at = ?" - update_values << current_utc_time - - insert_sql = <<~SQL - INSERT INTO #{table_name} (#{insert_columns.join(", ")}) - VALUES (#{(["?"] * insert_columns.length).join(", ")}) - SQL - - sql = - if is_postgres - <<~SQL - #{insert_sql} - ON CONFLICT (hostname, process_id, thread_id) - DO UPDATE SET - #{update_columns.join(",\n ")} - SQL - else - <<~SQL - #{insert_sql} - ON DUPLICATE KEY UPDATE - #{update_columns.join(",\n ")} - SQL - end - - connection.exec_query( - sanitize_sql_array([sql, *insert_values, *update_values]) - ) + connection.exec_query(sql, "Outboxer::Thread", values) end end end From b3271d0a47c501148aec5fbe24f55b2447fee57d Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 7 Dec 2025 19:03:52 +1100 Subject: [PATCH 6/8] further optimisations --- lib/outboxer/models/thread.rb | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/lib/outboxer/models/thread.rb b/lib/outboxer/models/thread.rb index 086a3910..1a4af879 100644 --- a/lib/outboxer/models/thread.rb +++ b/lib/outboxer/models/thread.rb @@ -147,19 +147,14 @@ def self.update_message_counts_by!( hostname, process_id, thread_id, - queued_message_count, - queued_message_count.to_i != 0 ? current_utc_time : nil, - + current_utc_time, publishing_message_count, - publishing_message_count.to_i != 0 ? current_utc_time : nil, - + current_utc_time, published_message_count, - published_message_count.to_i != 0 ? current_utc_time : nil, - + current_utc_time, failed_message_count, - failed_message_count.to_i != 0 ? current_utc_time : nil, - + current_utc_time, current_utc_time, current_utc_time ] From 91848c383b7b0f38c0400154e0f8b444189e5b61 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 7 Dec 2025 19:16:13 +1100 Subject: [PATCH 7/8] commit optimisations --- lib/outboxer/models/thread.rb | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/lib/outboxer/models/thread.rb b/lib/outboxer/models/thread.rb index 1a4af879..9fa13692 100644 --- a/lib/outboxer/models/thread.rb +++ b/lib/outboxer/models/thread.rb @@ -130,6 +130,8 @@ class Thread < ActiveRecord::Base updated_at = VALUES(updated_at) SQL + SQL_QUERY_NAME = "Outboxer::Models::Thread.update_message_counts_by!".freeze + def self.update_message_counts_by!( hostname: Socket.gethostname, process_id: Process.pid, @@ -140,10 +142,9 @@ def self.update_message_counts_by!( failed_message_count: 0, current_utc_time: Time.now.utc ) - adapter_name = connection.adapter_name.downcase - is_postgres = adapter_name.include?("postgres") + @is_postgres ||= connection.adapter_name.downcase.include?("postgres") - values = [ + connection.exec_update(@is_postgres ? POSTGRES_SQL : MYSQL_SQL, SQL_QUERY_NAME, [ hostname, process_id, thread_id, @@ -157,11 +158,7 @@ def self.update_message_counts_by!( current_utc_time, current_utc_time, current_utc_time - ] - - sql = is_postgres ? POSTGRES_SQL : MYSQL_SQL - - connection.exec_query(sql, "Outboxer::Thread", values) + ]) end end end From 0be722f09f1ad7242ddb728436e5886891770210 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sun, 7 Dec 2025 19:40:26 +1100 Subject: [PATCH 8/8] update e2e --- quickstart_e2e_tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quickstart_e2e_tests.sh b/quickstart_e2e_tests.sh index 15f05a9c..6e96fe17 100755 --- a/quickstart_e2e_tests.sh +++ b/quickstart_e2e_tests.sh @@ -42,7 +42,7 @@ bundle exec rails new . \ bundle install bundle exec rails db:create -echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "refactor/rename_thread_message_columns"' \ +echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "optimisation/no_string_allocation_upserts"' \ >> Gemfile bundle install bundle exec rails generate outboxer:install