Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions db/migrate/create_outboxer_exceptions.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
class CreateOutboxerExceptions < ActiveRecord::Migration[6.1]
def up
ActiveRecord::Base.transaction do
create_table :outboxer_exceptions do |t|
t.string :class_name, limit: 255
t.text :message_text, null: false
create_table :outboxer_exceptions do |t|
t.string :class_name, limit: 255
t.text :message_text, null: false

t.datetime :created_at, null: false
t.datetime :created_at, null: false

t.references :message, foreign_key: { to_table: :outboxer_messages }, null: false
end
t.references :message, null: false,
foreign_key: { to_table: :outboxer_messages, on_delete: :cascade }
end
end

Expand Down
3 changes: 2 additions & 1 deletion db/migrate/create_outboxer_frames.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ def up
t.integer :index, null: false
t.text :text, null: false

t.references :exception, foreign_key: { to_table: :outboxer_exceptions }, null: false
t.references :exception, null: false,
foreign_key: { to_table: :outboxer_exceptions, on_delete: :cascade }

t.index [:exception_id, :index], unique: true
end
Expand Down
35 changes: 12 additions & 23 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,6 @@ def published(id:, lock_version:,
raise ActiveRecord::StaleObjectError.new(Models::Message.new(id: id), "destroy")
end

exception_ids = Models::Exception.where(message_id: id).pluck(:id)
Models::Frame.where(exception_id: exception_ids).delete_all
Models::Exception.where(message_id: id).delete_all
Models::Message.where(id: id).delete_all

Models::Thread.update_message_counts_by!(
Expand Down Expand Up @@ -456,13 +453,12 @@ def delete(id:, lock_version:,

ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
message = Models::Message.includes(exceptions: :frames).lock.find_by!(id: id)
message = Models::Message.lock.find_by!(id: id)
message.update!(lock_version: lock_version, updated_at: current_utc_time)

message.exceptions.each { |exception| exception.frames.each(&:delete) }
message.exceptions.delete_all
message.delete

Models::Message.lock.where(id: id, lock_version: lock_version).delete_all

Models::Thread.update_message_counts_by!(
hostname: hostname,
process_id: process_id,
Expand Down Expand Up @@ -645,22 +641,15 @@ def rollup_counts(time: Time)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
# 1. Ensure the historic thread exists (idempotent upsert)
begin
ActiveRecord::Base.transaction(requires_new: true) do
Models::Thread.create!(
hostname: Models::Thread::HISTORIC_HOSTNAME,
process_id: Models::Thread::HISTORIC_PROCESS_ID,
thread_id: Models::Thread::HISTORIC_THREAD_ID,
queued_message_count: 0,
publishing_message_count: 0,
published_message_count: 0,
failed_message_count: 0,
created_at: current_utc_time,
updated_at: current_utc_time)
end
rescue ActiveRecord::RecordNotUnique
# no op
end
Models::Thread.update_message_counts_by!(
hostname: Models::Thread::HISTORIC_HOSTNAME,
process_id: Models::Thread::HISTORIC_PROCESS_ID,
thread_id: Models::Thread::HISTORIC_THREAD_ID,
queued_message_count: 0,
publishing_message_count: 0,
published_message_count: 0,
failed_message_count: 0,
current_utc_time: current_utc_time)

# 2. Lock *all* rows (historic thread + thread threads)
locked_threads = Models::Thread.lock("FOR UPDATE").to_a
Expand Down
206 changes: 142 additions & 64 deletions lib/outboxer/models/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,135 @@ module Models
class Thread < ActiveRecord::Base
self.table_name = "outboxer_threads"

HISTORIC_HOSTNAME = "historic"
HISTORIC_HOSTNAME = "historic".freeze
HISTORIC_PROCESS_ID = 0
HISTORIC_THREAD_ID = 0

POSTGRES_SQL = <<~SQL.freeze
INSERT INTO outboxer_threads (
hostname,
process_id,
thread_id,
queued_message_count,
queued_message_count_last_updated_at,
publishing_message_count,
publishing_message_count_last_updated_at,
published_message_count,
published_message_count_last_updated_at,
failed_message_count,
failed_message_count_last_updated_at,
created_at,
updated_at
)
VALUES (
$1, $2, $3,
$4, $5,
$6, $7,
$8, $9,
$10, $11,
$12, $13
)
ON CONFLICT (hostname, process_id, thread_id)
DO UPDATE SET
queued_message_count =
outboxer_threads.queued_message_count +
EXCLUDED.queued_message_count,
queued_message_count_last_updated_at =
CASE
WHEN EXCLUDED.queued_message_count != 0
THEN EXCLUDED.queued_message_count_last_updated_at
ELSE outboxer_threads.queued_message_count_last_updated_at
END,
publishing_message_count =
outboxer_threads.publishing_message_count +
EXCLUDED.publishing_message_count,
publishing_message_count_last_updated_at =
CASE
WHEN EXCLUDED.publishing_message_count != 0
THEN EXCLUDED.publishing_message_count_last_updated_at
ELSE outboxer_threads.publishing_message_count_last_updated_at
END,
published_message_count =
outboxer_threads.published_message_count +
EXCLUDED.published_message_count,
published_message_count_last_updated_at =
CASE
WHEN EXCLUDED.published_message_count != 0
THEN EXCLUDED.published_message_count_last_updated_at
ELSE outboxer_threads.published_message_count_last_updated_at
END,
failed_message_count =
outboxer_threads.failed_message_count +
EXCLUDED.failed_message_count,
failed_message_count_last_updated_at =
CASE
WHEN EXCLUDED.failed_message_count != 0
THEN EXCLUDED.failed_message_count_last_updated_at
ELSE outboxer_threads.failed_message_count_last_updated_at
END,
updated_at = EXCLUDED.updated_at
SQL

MYSQL_SQL = <<~SQL.freeze
INSERT INTO outboxer_threads (
hostname,
process_id,
thread_id,
queued_message_count,
queued_message_count_last_updated_at,
publishing_message_count,
publishing_message_count_last_updated_at,
published_message_count,
published_message_count_last_updated_at,
failed_message_count,
failed_message_count_last_updated_at,
created_at,
updated_at
)
VALUES (
?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?
)
ON DUPLICATE KEY UPDATE
queued_message_count =
queued_message_count + VALUES(queued_message_count),
queued_message_count_last_updated_at =
IF(
VALUES(queued_message_count) != 0,
VALUES(queued_message_count_last_updated_at),
queued_message_count_last_updated_at
),
publishing_message_count =
publishing_message_count +
VALUES(publishing_message_count),
publishing_message_count_last_updated_at =
IF(
VALUES(publishing_message_count) != 0,
VALUES(publishing_message_count_last_updated_at),
publishing_message_count_last_updated_at
),
published_message_count =
published_message_count +
VALUES(published_message_count),
published_message_count_last_updated_at =
IF(
VALUES(published_message_count) != 0,
VALUES(published_message_count_last_updated_at),
published_message_count_last_updated_at
),
failed_message_count =
failed_message_count + VALUES(failed_message_count),
failed_message_count_last_updated_at =
IF(
VALUES(failed_message_count) != 0,
VALUES(failed_message_count_last_updated_at),
failed_message_count_last_updated_at
),
updated_at = VALUES(updated_at)
SQL

SQL_QUERY_NAME = "Outboxer::Models::Thread.update_message_counts_by!".freeze

def self.update_message_counts_by!(
hostname: Socket.gethostname,
process_id: Process.pid,
Expand All @@ -17,70 +142,23 @@ def self.update_message_counts_by!(
failed_message_count: 0,
current_utc_time: Time.now.utc
)
is_postgres = connection.adapter_name.downcase.include?("postgres")

insert_columns = %w[hostname process_id thread_id created_at updated_at]
insert_values = [hostname, process_id, thread_id, current_utc_time, current_utc_time]

update_columns = []
update_values = []
@is_postgres ||= connection.adapter_name.downcase.include?("postgres")

[
[:queued, queued_message_count],
[:publishing, publishing_message_count],
[:published, published_message_count],
[:failed, failed_message_count]
]
.reject { |_name, message_count| message_count.to_i == 0 }
.each do |name, message_count|
message_count_column = "#{name}_message_count"
message_count_last_updated_at_column = "#{name}_message_count_last_updated_at"

insert_columns << message_count_column
insert_columns << message_count_last_updated_at_column
insert_values << message_count
insert_values << current_utc_time

if is_postgres
update_columns <<
"#{message_count_column} = #{table_name}.#{message_count_column} + " \
"EXCLUDED.#{message_count_column}"
else
update_columns << "#{message_count_column} = #{message_count_column} + ?"
update_values << message_count
end

update_columns << "#{message_count_last_updated_at_column} = ?"
update_values << current_utc_time
end

update_columns << "updated_at = ?"
update_values << current_utc_time

insert_sql = <<~SQL
INSERT INTO #{table_name} (#{insert_columns.join(", ")})
VALUES (#{(["?"] * insert_columns.length).join(", ")})
SQL

sql =
if is_postgres
<<~SQL
#{insert_sql}
ON CONFLICT (hostname, process_id, thread_id)
DO UPDATE SET
#{update_columns.join(",\n ")}
SQL
else
<<~SQL
#{insert_sql}
ON DUPLICATE KEY UPDATE
#{update_columns.join(",\n ")}
SQL
end

connection.exec_query(
sanitize_sql_array([sql, *insert_values, *update_values])
)
connection.exec_update(@is_postgres ? POSTGRES_SQL : MYSQL_SQL, SQL_QUERY_NAME, [
hostname,
process_id,
thread_id,
queued_message_count,
current_utc_time,
publishing_message_count,
current_utc_time,
published_message_count,
current_utc_time,
failed_message_count,
current_utc_time,
current_utc_time,
current_utc_time
])
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion quickstart_e2e_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ bundle exec rails new . \

bundle install
bundle exec rails db:create
echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "refactor/rename_thread_message_columns"' \
echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "optimisation/no_string_allocation_upserts"' \
>> Gemfile
bundle install
bundle exec rails generate outboxer:install
Expand Down
Loading