From 0b7ba611fd3099252b927619a65901ff23efcf7f Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Mon, 10 Nov 2025 17:41:56 +1100 Subject: [PATCH 1/2] optimise query --- lib/outboxer/models/message/count.rb | 105 +++++++++++++++++---------- 1 file changed, 67 insertions(+), 38 deletions(-) diff --git a/lib/outboxer/models/message/count.rb b/lib/outboxer/models/message/count.rb index 440daee6..9823b9d4 100644 --- a/lib/outboxer/models/message/count.rb +++ b/lib/outboxer/models/message/count.rb @@ -18,47 +18,76 @@ def self.insert_or_increment_by( failed: 0, current_utc_time: Time.now.utc ) - sql = if connection.adapter_name.downcase.include?("postgres") - <<~SQL - INSERT INTO #{table_name} - (hostname, process_id, thread_id, - queued, publishing, published, failed, - created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT (hostname, process_id, thread_id) - DO UPDATE SET - queued = #{table_name}.queued + ?, - publishing = #{table_name}.publishing + ?, - published = #{table_name}.published + ?, - failed = #{table_name}.failed + ?, - updated_at = ? - SQL - else - <<~SQL - INSERT INTO #{table_name} - (hostname, process_id, thread_id, - queued, publishing, published, failed, - created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) - ON DUPLICATE KEY UPDATE - queued = queued + ?, - publishing = publishing + ?, - published = published + ?, - failed = failed + ?, - updated_at = ? - SQL - end + deltas = { + queued: queued, + publishing: publishing, + published: published, + failed: failed + }.reject { |_k, v| v == 0 } - connection.exec_query( - sanitize_sql_array([ - sql, - hostname, process_id, thread_id, - queued, publishing, published, failed, - current_utc_time, current_utc_time, - queued, publishing, published, failed, current_utc_time - ]) + table = table_name + adapter = connection.adapter_name.downcase + is_postgres = adapter.include?("postgres") + + cols = %i[hostname process_id thread_id queued publishing published failed created_at updated_at] + placeholders = (["?"] * cols.size).join(", ") + + conflict_clause = is_postgres ? + "ON CONFLICT (hostname, process_id, thread_id)" : + "ON DUPLICATE KEY" + + updates = deltas.map do |k, v| + op = v.positive? ? "+" : "-" + if is_postgres + # e.g. "queued = outboxer_message_counts.queued + 2" + "#{k} = #{table}.#{k} #{op} #{v.abs}" + else + # e.g. "queued = queued + 2" + "#{k} = #{k} #{op} #{v.abs}" + end + end + + updates << ( + is_postgres ? + # e.g. "updated_at = EXCLUDED.updated_at" + "updated_at = EXCLUDED.updated_at" : + # e.g. "updated_at = VALUES(updated_at)" + "updated_at = VALUES(updated_at)" ) + sql = <<~SQL + INSERT INTO #{table} + (#{cols.join(", ")}) + VALUES (#{placeholders}) + #{conflict_clause} + DO UPDATE SET #{updates.join(", ")} + SQL + # Example (PostgreSQL): + # INSERT INTO outboxer_message_counts + # (hostname, process_id, thread_id, queued, publishing, published, failed, created_at, updated_at) + # VALUES ('test', 111, 123, 1, 0, 0, 0, '2025-11-10 06:32:00', '2025-11-10 06:32:00') + # ON CONFLICT (hostname, process_id, thread_id) + # DO UPDATE SET queued = outboxer_message_counts.queued + 1, updated_at = EXCLUDED.updated_at + # + # Example (MySQL): + # INSERT INTO outboxer_message_counts + # (hostname, process_id, thread_id, queued, publishing, published, failed, created_at, updated_at) + # VALUES ('test', 111, 123, 1, 0, 0, 0, '2025-11-10 06:32:00', '2025-11-10 06:32:00') + # ON DUPLICATE KEY UPDATE queued = queued + 1, updated_at = VALUES(updated_at) + + values = [ + hostname, + process_id, + thread_id, + queued, + publishing, + published, + failed, + current_utc_time, + current_utc_time + ] + + connection.exec_query(sanitize_sql_array([sql, *values])) nil end end From 7ccdac4f0365cba13410ec27c093f0ce0482700f Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Mon, 10 Nov 2025 17:48:10 +1100 Subject: [PATCH 2/2] fix rubocop --- lib/outboxer/models/message/count.rb | 49 ++++++++++++++++------------ 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/lib/outboxer/models/message/count.rb b/lib/outboxer/models/message/count.rb index 9823b9d4..dabf3857 100644 --- a/lib/outboxer/models/message/count.rb +++ b/lib/outboxer/models/message/count.rb @@ -25,22 +25,25 @@ def self.insert_or_increment_by( failed: failed }.reject { |_k, v| v == 0 } - table = table_name - adapter = connection.adapter_name.downcase - is_postgres = adapter.include?("postgres") + cols = %i[ + hostname process_id thread_id + queued publishing published failed + created_at updated_at + ] - cols = %i[hostname process_id thread_id queued publishing published failed created_at updated_at] placeholders = (["?"] * cols.size).join(", ") - conflict_clause = is_postgres ? - "ON CONFLICT (hostname, process_id, thread_id)" : - "ON DUPLICATE KEY" + conflict_clause = if connection.adapter_name.downcase.include?("postgres") + "ON CONFLICT (hostname, process_id, thread_id)" + else + "ON DUPLICATE KEY" + end updates = deltas.map do |k, v| op = v.positive? ? "+" : "-" - if is_postgres + if connection.adapter_name.downcase.include?("postgres") # e.g. "queued = outboxer_message_counts.queued + 2" - "#{k} = #{table}.#{k} #{op} #{v.abs}" + "#{k} = #{table_name}.#{k} #{op} #{v.abs}" else # e.g. "queued = queued + 2" "#{k} = #{k} #{op} #{v.abs}" @@ -48,15 +51,15 @@ def self.insert_or_increment_by( end updates << ( - is_postgres ? - # e.g. "updated_at = EXCLUDED.updated_at" - "updated_at = EXCLUDED.updated_at" : - # e.g. "updated_at = VALUES(updated_at)" + if connection.adapter_name.downcase.include?("postgres") + "updated_at = EXCLUDED.updated_at" + else "updated_at = VALUES(updated_at)" + end ) sql = <<~SQL - INSERT INTO #{table} + INSERT INTO #{table_name} (#{cols.join(", ")}) VALUES (#{placeholders}) #{conflict_clause} @@ -64,16 +67,22 @@ def self.insert_or_increment_by( SQL # Example (PostgreSQL): # INSERT INTO outboxer_message_counts - # (hostname, process_id, thread_id, queued, publishing, published, failed, created_at, updated_at) - # VALUES ('test', 111, 123, 1, 0, 0, 0, '2025-11-10 06:32:00', '2025-11-10 06:32:00') + # (hostname, process_id, thread_id, queued, publishing, published, failed, + # created_at, updated_at) + # VALUES ('test', 111, 123, 1, 0, 0, 0, '2025-11-10 06:32:00', + # '2025-11-10 06:32:00') # ON CONFLICT (hostname, process_id, thread_id) - # DO UPDATE SET queued = outboxer_message_counts.queued + 1, updated_at = EXCLUDED.updated_at + # DO UPDATE SET queued = outboxer_message_counts.queued + 1, + # updated_at = EXCLUDED.updated_at # # Example (MySQL): # INSERT INTO outboxer_message_counts - # (hostname, process_id, thread_id, queued, publishing, published, failed, created_at, updated_at) - # VALUES ('test', 111, 123, 1, 0, 0, 0, '2025-11-10 06:32:00', '2025-11-10 06:32:00') - # ON DUPLICATE KEY UPDATE queued = queued + 1, updated_at = VALUES(updated_at) + # (hostname, process_id, thread_id, queued, publishing, published, failed, + # created_at, updated_at) + # VALUES ('test', 111, 123, 1, 0, 0, 0, '2025-11-10 06:32:00', + # '2025-11-10 06:32:00') + # ON DUPLICATE KEY UPDATE queued = queued + 1, + # updated_at = VALUES(updated_at) values = [ hostname,