Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions db/migrate/create_outboxer_exceptions.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
class CreateOutboxerExceptions < ActiveRecord::Migration[6.1]
def up
ActiveRecord::Base.transaction do
create_table :outboxer_exceptions do |t|
t.string :class_name, limit: 255
t.text :message_text, null: false
create_table :outboxer_exceptions do |t|
t.string :class_name, limit: 255
t.text :message_text, null: false

t.datetime :created_at, null: false
t.datetime :created_at, null: false

t.references :message, foreign_key: { to_table: :outboxer_messages }, null: false
end
t.references :message, null: false,
foreign_key: { to_table: :outboxer_messages, on_delete: :cascade }
end
end

Expand Down
3 changes: 2 additions & 1 deletion db/migrate/create_outboxer_frames.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ def up
t.integer :index, null: false
t.text :text, null: false

t.references :exception, foreign_key: { to_table: :outboxer_exceptions }, null: false
t.references :exception, null: false,
foreign_key: { to_table: :outboxer_exceptions, on_delete: :cascade }

t.index [:exception_id, :index], unique: true
end
Expand Down
35 changes: 12 additions & 23 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,6 @@ def published(id:, lock_version:,
raise ActiveRecord::StaleObjectError.new(Models::Message.new(id: id), "destroy")
end

exception_ids = Models::Exception.where(message_id: id).pluck(:id)
Models::Frame.where(exception_id: exception_ids).delete_all
Models::Exception.where(message_id: id).delete_all
Models::Message.where(id: id).delete_all

Models::Thread.update_message_counts_by!(
Expand Down Expand Up @@ -456,13 +453,12 @@ def delete(id:, lock_version:,

ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
message = Models::Message.includes(exceptions: :frames).lock.find_by!(id: id)
message = Models::Message.lock.find_by!(id: id)
message.update!(lock_version: lock_version, updated_at: current_utc_time)

message.exceptions.each { |exception| exception.frames.each(&:delete) }
message.exceptions.delete_all
message.delete

Models::Message.lock.where(id: id, lock_version: lock_version).delete_all

Models::Thread.update_message_counts_by!(
hostname: hostname,
process_id: process_id,
Expand Down Expand Up @@ -645,22 +641,15 @@ def rollup_counts(time: Time)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
# 1. Ensure the historic thread exists (idempotent upsert)
begin
ActiveRecord::Base.transaction(requires_new: true) do
Models::Thread.create!(
hostname: Models::Thread::HISTORIC_HOSTNAME,
process_id: Models::Thread::HISTORIC_PROCESS_ID,
thread_id: Models::Thread::HISTORIC_THREAD_ID,
queued_message_count: 0,
publishing_message_count: 0,
published_message_count: 0,
failed_message_count: 0,
created_at: current_utc_time,
updated_at: current_utc_time)
end
rescue ActiveRecord::RecordNotUnique
# no op
end
Models::Thread.update_message_counts_by!(
hostname: Models::Thread::HISTORIC_HOSTNAME,
process_id: Models::Thread::HISTORIC_PROCESS_ID,
thread_id: Models::Thread::HISTORIC_THREAD_ID,
queued_message_count: 0,
publishing_message_count: 0,
published_message_count: 0,
failed_message_count: 0,
current_utc_time: current_utc_time)

# 2. Lock *all* rows (historic thread + thread threads)
locked_threads = Models::Thread.lock("FOR UPDATE").to_a
Expand Down
71 changes: 43 additions & 28 deletions lib/outboxer/models/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,25 @@ module Models
class Thread < ActiveRecord::Base
self.table_name = "outboxer_threads"

HISTORIC_HOSTNAME = "historic"
HISTORIC_HOSTNAME = "historic".freeze
HISTORIC_PROCESS_ID = 0
HISTORIC_THREAD_ID = 0

STATUS_COLUMNS = {
queued: ["queued_message_count", "queued_message_count_last_updated_at"],
publishing: ["publishing_message_count", "publishing_message_count_last_updated_at"],
published: ["published_message_count", "published_message_count_last_updated_at"],
failed: ["failed_message_count", "failed_message_count_last_updated_at"]
}.freeze

BASE_INSERT_COLUMNS = %w[
hostname
process_id
thread_id
created_at
updated_at
].freeze

def self.update_message_counts_by!(
hostname: Socket.gethostname,
process_id: Process.pid,
Expand All @@ -19,43 +34,43 @@ def self.update_message_counts_by!(
)
is_postgres = connection.adapter_name.downcase.include?("postgres")

insert_columns = %w[hostname process_id thread_id created_at updated_at]
insert_values = [hostname, process_id, thread_id, current_utc_time, current_utc_time]
insert_columns = BASE_INSERT_COLUMNS.dup
insert_values = [hostname, process_id, thread_id, current_utc_time, current_utc_time]

update_columns = []
update_values = []

[
[:queued, queued_message_count],
[:publishing, publishing_message_count],
[:published, published_message_count],
[:failed, failed_message_count]
]
.reject { |_name, message_count| message_count.to_i == 0 }
.each do |name, message_count|
message_count_column = "#{name}_message_count"
message_count_last_updated_at_column = "#{name}_message_count_last_updated_at"
{
queued: queued_message_count,
publishing: publishing_message_count,
published: published_message_count,
failed: failed_message_count
}.each do |name, message_count|
next if message_count.to_i == 0

insert_columns << message_count_column
insert_columns << message_count_last_updated_at_column
insert_values << message_count
insert_values << current_utc_time
message_count_column, last_updated_column = STATUS_COLUMNS.fetch(name)

if is_postgres
update_columns <<
"#{message_count_column} = #{table_name}.#{message_count_column} + " \
"EXCLUDED.#{message_count_column}"
else
update_columns << "#{message_count_column} = #{message_count_column} + ?"
update_values << message_count
end
insert_columns << message_count_column
insert_columns << last_updated_column
insert_values << message_count
insert_values << current_utc_time

update_columns << "#{message_count_last_updated_at_column} = ?"
update_values << current_utc_time
if is_postgres
update_columns <<
"#{message_count_column} = " \
"#{table_name}.#{message_count_column} + " \
"EXCLUDED.#{message_count_column}"
else
update_columns << "#{message_count_column} = #{message_count_column} + ?"
update_values << message_count
end

update_columns << "#{last_updated_column} = ?"
update_values << current_utc_time
end

update_columns << "updated_at = ?"
update_values << current_utc_time
update_values << current_utc_time

insert_sql = <<~SQL
INSERT INTO #{table_name} (#{insert_columns.join(", ")})
Expand Down
2 changes: 1 addition & 1 deletion quickstart_e2e_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ bundle exec rails new . \

bundle install
bundle exec rails db:create
echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "refactor/rename_thread_message_columns"' \
echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "optimisation/increase_throughput"' \
>> Gemfile
bundle install
bundle exec rails generate outboxer:install
Expand Down