diff --git a/db/migrate/create_outboxer_exceptions.rb b/db/migrate/create_outboxer_exceptions.rb index d5dae1a1..8555f14c 100644 --- a/db/migrate/create_outboxer_exceptions.rb +++ b/db/migrate/create_outboxer_exceptions.rb @@ -1,14 +1,13 @@ class CreateOutboxerExceptions < ActiveRecord::Migration[6.1] def up - ActiveRecord::Base.transaction do - create_table :outboxer_exceptions do |t| - t.string :class_name, limit: 255 - t.text :message_text, null: false + create_table :outboxer_exceptions do |t| + t.string :class_name, limit: 255 + t.text :message_text, null: false - t.datetime :created_at, null: false + t.datetime :created_at, null: false - t.references :message, foreign_key: { to_table: :outboxer_messages }, null: false - end + t.references :message, null: false, + foreign_key: { to_table: :outboxer_messages, on_delete: :cascade } end end diff --git a/db/migrate/create_outboxer_frames.rb b/db/migrate/create_outboxer_frames.rb index 5aa9cd58..1b1e7bbd 100644 --- a/db/migrate/create_outboxer_frames.rb +++ b/db/migrate/create_outboxer_frames.rb @@ -4,7 +4,8 @@ def up t.integer :index, null: false t.text :text, null: false - t.references :exception, foreign_key: { to_table: :outboxer_exceptions }, null: false + t.references :exception, null: false, + foreign_key: { to_table: :outboxer_exceptions, on_delete: :cascade } t.index [:exception_id, :index], unique: true end diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index d233b732..4c276444 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -285,9 +285,6 @@ def published(id:, lock_version:, raise ActiveRecord::StaleObjectError.new(Models::Message.new(id: id), "destroy") end - exception_ids = Models::Exception.where(message_id: id).pluck(:id) - Models::Frame.where(exception_id: exception_ids).delete_all - Models::Exception.where(message_id: id).delete_all Models::Message.where(id: id).delete_all Models::Thread.update_message_counts_by!( @@ -456,13 +453,12 @@ def delete(id:, lock_version:, ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do - message = Models::Message.includes(exceptions: :frames).lock.find_by!(id: id) + message = Models::Message.lock.find_by!(id: id) message.update!(lock_version: lock_version, updated_at: current_utc_time) - - message.exceptions.each { |exception| exception.frames.each(&:delete) } - message.exceptions.delete_all message.delete + Models::Message.lock.where(id: id, lock_version: lock_version).delete_all + Models::Thread.update_message_counts_by!( hostname: hostname, process_id: process_id, @@ -645,22 +641,15 @@ def rollup_counts(time: Time) ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do # 1. Ensure the historic thread exists (idempotent upsert) - begin - ActiveRecord::Base.transaction(requires_new: true) do - Models::Thread.create!( - hostname: Models::Thread::HISTORIC_HOSTNAME, - process_id: Models::Thread::HISTORIC_PROCESS_ID, - thread_id: Models::Thread::HISTORIC_THREAD_ID, - queued_message_count: 0, - publishing_message_count: 0, - published_message_count: 0, - failed_message_count: 0, - created_at: current_utc_time, - updated_at: current_utc_time) - end - rescue ActiveRecord::RecordNotUnique - # no op - end + Models::Thread.update_message_counts_by!( + hostname: Models::Thread::HISTORIC_HOSTNAME, + process_id: Models::Thread::HISTORIC_PROCESS_ID, + thread_id: Models::Thread::HISTORIC_THREAD_ID, + queued_message_count: 0, + publishing_message_count: 0, + published_message_count: 0, + failed_message_count: 0, + current_utc_time: current_utc_time) # 2. Lock *all* rows (historic thread + thread threads) locked_threads = Models::Thread.lock("FOR UPDATE").to_a diff --git a/lib/outboxer/models/thread.rb b/lib/outboxer/models/thread.rb index 5084c85b..9fa13692 100644 --- a/lib/outboxer/models/thread.rb +++ b/lib/outboxer/models/thread.rb @@ -3,10 +3,135 @@ module Models class Thread < ActiveRecord::Base self.table_name = "outboxer_threads" - HISTORIC_HOSTNAME = "historic" + HISTORIC_HOSTNAME = "historic".freeze HISTORIC_PROCESS_ID = 0 HISTORIC_THREAD_ID = 0 + POSTGRES_SQL = <<~SQL.freeze + INSERT INTO outboxer_threads ( + hostname, + process_id, + thread_id, + queued_message_count, + queued_message_count_last_updated_at, + publishing_message_count, + publishing_message_count_last_updated_at, + published_message_count, + published_message_count_last_updated_at, + failed_message_count, + failed_message_count_last_updated_at, + created_at, + updated_at + ) + VALUES ( + $1, $2, $3, + $4, $5, + $6, $7, + $8, $9, + $10, $11, + $12, $13 + ) + ON CONFLICT (hostname, process_id, thread_id) + DO UPDATE SET + queued_message_count = + outboxer_threads.queued_message_count + + EXCLUDED.queued_message_count, + queued_message_count_last_updated_at = + CASE + WHEN EXCLUDED.queued_message_count != 0 + THEN EXCLUDED.queued_message_count_last_updated_at + ELSE outboxer_threads.queued_message_count_last_updated_at + END, + publishing_message_count = + outboxer_threads.publishing_message_count + + EXCLUDED.publishing_message_count, + publishing_message_count_last_updated_at = + CASE + WHEN EXCLUDED.publishing_message_count != 0 + THEN EXCLUDED.publishing_message_count_last_updated_at + ELSE outboxer_threads.publishing_message_count_last_updated_at + END, + published_message_count = + outboxer_threads.published_message_count + + EXCLUDED.published_message_count, + published_message_count_last_updated_at = + CASE + WHEN EXCLUDED.published_message_count != 0 + THEN EXCLUDED.published_message_count_last_updated_at + ELSE outboxer_threads.published_message_count_last_updated_at + END, + failed_message_count = + outboxer_threads.failed_message_count + + EXCLUDED.failed_message_count, + failed_message_count_last_updated_at = + CASE + WHEN EXCLUDED.failed_message_count != 0 + THEN EXCLUDED.failed_message_count_last_updated_at + ELSE outboxer_threads.failed_message_count_last_updated_at + END, + updated_at = EXCLUDED.updated_at + SQL + + MYSQL_SQL = <<~SQL.freeze + INSERT INTO outboxer_threads ( + hostname, + process_id, + thread_id, + queued_message_count, + queued_message_count_last_updated_at, + publishing_message_count, + publishing_message_count_last_updated_at, + published_message_count, + published_message_count_last_updated_at, + failed_message_count, + failed_message_count_last_updated_at, + created_at, + updated_at + ) + VALUES ( + ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ? + ) + ON DUPLICATE KEY UPDATE + queued_message_count = + queued_message_count + VALUES(queued_message_count), + queued_message_count_last_updated_at = + IF( + VALUES(queued_message_count) != 0, + VALUES(queued_message_count_last_updated_at), + queued_message_count_last_updated_at + ), + publishing_message_count = + publishing_message_count + + VALUES(publishing_message_count), + publishing_message_count_last_updated_at = + IF( + VALUES(publishing_message_count) != 0, + VALUES(publishing_message_count_last_updated_at), + publishing_message_count_last_updated_at + ), + published_message_count = + published_message_count + + VALUES(published_message_count), + published_message_count_last_updated_at = + IF( + VALUES(published_message_count) != 0, + VALUES(published_message_count_last_updated_at), + published_message_count_last_updated_at + ), + failed_message_count = + failed_message_count + VALUES(failed_message_count), + failed_message_count_last_updated_at = + IF( + VALUES(failed_message_count) != 0, + VALUES(failed_message_count_last_updated_at), + failed_message_count_last_updated_at + ), + updated_at = VALUES(updated_at) + SQL + + SQL_QUERY_NAME = "Outboxer::Models::Thread.update_message_counts_by!".freeze + def self.update_message_counts_by!( hostname: Socket.gethostname, process_id: Process.pid, @@ -17,70 +142,23 @@ def self.update_message_counts_by!( failed_message_count: 0, current_utc_time: Time.now.utc ) - is_postgres = connection.adapter_name.downcase.include?("postgres") - - insert_columns = %w[hostname process_id thread_id created_at updated_at] - insert_values = [hostname, process_id, thread_id, current_utc_time, current_utc_time] - - update_columns = [] - update_values = [] + @is_postgres ||= connection.adapter_name.downcase.include?("postgres") - [ - [:queued, queued_message_count], - [:publishing, publishing_message_count], - [:published, published_message_count], - [:failed, failed_message_count] - ] - .reject { |_name, message_count| message_count.to_i == 0 } - .each do |name, message_count| - message_count_column = "#{name}_message_count" - message_count_last_updated_at_column = "#{name}_message_count_last_updated_at" - - insert_columns << message_count_column - insert_columns << message_count_last_updated_at_column - insert_values << message_count - insert_values << current_utc_time - - if is_postgres - update_columns << - "#{message_count_column} = #{table_name}.#{message_count_column} + " \ - "EXCLUDED.#{message_count_column}" - else - update_columns << "#{message_count_column} = #{message_count_column} + ?" - update_values << message_count - end - - update_columns << "#{message_count_last_updated_at_column} = ?" - update_values << current_utc_time - end - - update_columns << "updated_at = ?" - update_values << current_utc_time - - insert_sql = <<~SQL - INSERT INTO #{table_name} (#{insert_columns.join(", ")}) - VALUES (#{(["?"] * insert_columns.length).join(", ")}) - SQL - - sql = - if is_postgres - <<~SQL - #{insert_sql} - ON CONFLICT (hostname, process_id, thread_id) - DO UPDATE SET - #{update_columns.join(",\n ")} - SQL - else - <<~SQL - #{insert_sql} - ON DUPLICATE KEY UPDATE - #{update_columns.join(",\n ")} - SQL - end - - connection.exec_query( - sanitize_sql_array([sql, *insert_values, *update_values]) - ) + connection.exec_update(@is_postgres ? POSTGRES_SQL : MYSQL_SQL, SQL_QUERY_NAME, [ + hostname, + process_id, + thread_id, + queued_message_count, + current_utc_time, + publishing_message_count, + current_utc_time, + published_message_count, + current_utc_time, + failed_message_count, + current_utc_time, + current_utc_time, + current_utc_time + ]) end end end diff --git a/quickstart_e2e_tests.sh b/quickstart_e2e_tests.sh index 15f05a9c..6e96fe17 100755 --- a/quickstart_e2e_tests.sh +++ b/quickstart_e2e_tests.sh @@ -42,7 +42,7 @@ bundle exec rails new . \ bundle install bundle exec rails db:create -echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "refactor/rename_thread_message_columns"' \ +echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "optimisation/no_string_allocation_upserts"' \ >> Gemfile bundle install bundle exec rails generate outboxer:install