diff --git a/lib/outboxer/models/message/count.rb b/lib/outboxer/models/message/count.rb index 440daee6..dabf3857 100644 --- a/lib/outboxer/models/message/count.rb +++ b/lib/outboxer/models/message/count.rb @@ -18,47 +18,85 @@ def self.insert_or_increment_by( failed: 0, current_utc_time: Time.now.utc ) - sql = if connection.adapter_name.downcase.include?("postgres") - <<~SQL - INSERT INTO #{table_name} - (hostname, process_id, thread_id, - queued, publishing, published, failed, - created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT (hostname, process_id, thread_id) - DO UPDATE SET - queued = #{table_name}.queued + ?, - publishing = #{table_name}.publishing + ?, - published = #{table_name}.published + ?, - failed = #{table_name}.failed + ?, - updated_at = ? - SQL - else - <<~SQL - INSERT INTO #{table_name} - (hostname, process_id, thread_id, - queued, publishing, published, failed, - created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) - ON DUPLICATE KEY UPDATE - queued = queued + ?, - publishing = publishing + ?, - published = published + ?, - failed = failed + ?, - updated_at = ? - SQL - end + deltas = { + queued: queued, + publishing: publishing, + published: published, + failed: failed + }.reject { |_k, v| v == 0 } - connection.exec_query( - sanitize_sql_array([ - sql, - hostname, process_id, thread_id, - queued, publishing, published, failed, - current_utc_time, current_utc_time, - queued, publishing, published, failed, current_utc_time - ]) + cols = %i[ + hostname process_id thread_id + queued publishing published failed + created_at updated_at + ] + + placeholders = (["?"] * cols.size).join(", ") + + conflict_clause = if connection.adapter_name.downcase.include?("postgres") + "ON CONFLICT (hostname, process_id, thread_id)" + else + "ON DUPLICATE KEY" + end + + updates = deltas.map do |k, v| + op = v.positive? ? "+" : "-" + if connection.adapter_name.downcase.include?("postgres") + # e.g. "queued = outboxer_message_counts.queued + 2" + "#{k} = #{table_name}.#{k} #{op} #{v.abs}" + else + # e.g. "queued = queued + 2" + "#{k} = #{k} #{op} #{v.abs}" + end + end + + updates << ( + if connection.adapter_name.downcase.include?("postgres") + "updated_at = EXCLUDED.updated_at" + else + "updated_at = VALUES(updated_at)" + end ) + sql = <<~SQL + INSERT INTO #{table_name} + (#{cols.join(", ")}) + VALUES (#{placeholders}) + #{conflict_clause} + DO UPDATE SET #{updates.join(", ")} + SQL + # Example (PostgreSQL): + # INSERT INTO outboxer_message_counts + # (hostname, process_id, thread_id, queued, publishing, published, failed, + # created_at, updated_at) + # VALUES ('test', 111, 123, 1, 0, 0, 0, '2025-11-10 06:32:00', + # '2025-11-10 06:32:00') + # ON CONFLICT (hostname, process_id, thread_id) + # DO UPDATE SET queued = outboxer_message_counts.queued + 1, + # updated_at = EXCLUDED.updated_at + # + # Example (MySQL): + # INSERT INTO outboxer_message_counts + # (hostname, process_id, thread_id, queued, publishing, published, failed, + # created_at, updated_at) + # VALUES ('test', 111, 123, 1, 0, 0, 0, '2025-11-10 06:32:00', + # '2025-11-10 06:32:00') + # ON DUPLICATE KEY UPDATE queued = queued + 1, + # updated_at = VALUES(updated_at) + + values = [ + hostname, + process_id, + thread_id, + queued, + publishing, + published, + failed, + current_utc_time, + current_utc_time + ] + + connection.exec_query(sanitize_sql_array([sql, *values])) nil end end