diff --git a/db/migrate/create_outboxer_messages.rb b/db/migrate/create_outboxer_messages.rb index 3ba45ab1..be4ad8ba 100644 --- a/db/migrate/create_outboxer_messages.rb +++ b/db/migrate/create_outboxer_messages.rb @@ -1,6 +1,8 @@ class CreateOutboxerMessages < ActiveRecord::Migration[6.1] def up create_table :outboxer_messages do |t| + t.integer :lock_version, null: false, default: 0 + t.string :status, limit: 255, null: false t.string :messageable_id, limit: 255, null: false diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index 958b0aa7..601a8f83 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -119,10 +119,7 @@ def queue(messageable: nil, messageable_type: nil, messageable_id: nil, { id: message.id, - status: message.status, - messageable_type: message.messageable_type, - messageable_id: message.messageable_id, - updated_at: message.updated_at + lock_version: message.lock_version } end rescue MissingPartition @@ -203,7 +200,9 @@ def publish(logger: nil, time: ::Time) begin yield message rescue StandardError => error - publishing_failed(id: message[:id], error: error, time: time) + publishing_failed( + id: message[:id], lock_version: message[:lock_version], + error: error, time: time) logger&.error( "Outboxer message publishing failed id=#{message[:id]} " \ @@ -211,7 +210,8 @@ def publish(logger: nil, time: ::Time) "#{error.class}: #{error.message}\n" \ "#{error.backtrace.join("\n")}") rescue Exception => error - publishing_failed(id: message[:id], error: error, time: time) + publishing_failed( + id: message[:id], lock_version: message[:lock_version], error: error, time: time) logger&.fatal( "Outboxer message publishing failed id=#{message[:id]} " \ @@ -221,7 +221,7 @@ def publish(logger: nil, time: ::Time) raise else - published(id: message[:id], time: time) + published(id: message[:id], lock_version: message[:lock_version], time: time) logger&.info( "Outboxer message published id=#{message[:id]} " \ @@ -254,7 +254,7 @@ def publishing(time: ::Time) ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do message = Models::Message - .select(:id, :messageable_type, :messageable_id) + .select(:id, :lock_version, :messageable_type, :messageable_id) .where(status: Status::QUEUED) .order(:id) .limit(1) @@ -265,6 +265,7 @@ def publishing(time: ::Time) nil else message.update!( + lock_version: message.lock_version, status: Status::PUBLISHING, updated_at: current_utc_time, publishing_at: current_utc_time) @@ -284,9 +285,10 @@ def publishing(time: ::Time) .update_all(["value = value + ?, updated_at = ?", 1, current_utc_time]) { - id: message[:id], - messageable_type: message[:messageable_type], - messageable_id: message[:messageable_id] + id: message.id, + lock_version: message.lock_version, + messageable_type: message.messageable_type, + messageable_id: message.messageable_id } end end @@ -306,7 +308,7 @@ def publishing(time: ::Time) # @note Executes within a transaction using a `SELECT ... FOR UPDATE` lock. # @example # Outboxer::Message.published(id: message[:id], time: Time) - def published(id:, time: ::Time) + def published(id:, lock_version:, time: ::Time) current_utc_time = time.now.utc ActiveRecord::Base.connection_pool.with_connection do @@ -317,6 +319,8 @@ def published(id:, time: ::Time) raise Error, "Status must be publishing" end + message.update!(lock_version: lock_version, updated_at: Time.now.utc) + message.exceptions.each { |exception| exception.frames.each(&:delete) } message.exceptions.delete_all message.delete @@ -335,7 +339,7 @@ def published(id:, time: ::Time) .where(status: Status::PUBLISHED, partition: partition) .update_all(["value = value + ?, updated_at = ?", 1, current_utc_time]) - nil + {} end end end @@ -354,7 +358,7 @@ def published(id:, time: ::Time) # @note Runs inside a transaction and uses `SELECT ... FOR UPDATE` locking. # @example # Outboxer::Message.publishing_failed(id: message[:id], time: Time) - def publishing_failed(id:, error: nil, time: ::Time) + def publishing_failed(id:, lock_version:, error: nil, time: ::Time) current_utc_time = time.now.utc ActiveRecord::Base.connection_pool.with_connection do @@ -366,6 +370,7 @@ def publishing_failed(id:, error: nil, time: ::Time) end message.update!( + lock_version: lock_version, status: Status::FAILED, updated_at: current_utc_time, failed_at: current_utc_time) @@ -392,7 +397,9 @@ def publishing_failed(id:, error: nil, time: ::Time) .where(status: Status::FAILED, partition: partition) .update_all(["value = value + ?, updated_at = ?", 1, current_utc_time]) - nil + { + lock_version: message.lock_version + } end end end @@ -400,6 +407,7 @@ def publishing_failed(id:, error: nil, time: ::Time) # Serializes message attributes into a hash. # # @param id [Integer] message ID. + # @param lock_version [Integer] message lock_version. # @param status [String] message status. # @param messageable_type [String] type of the messageable entity. # @param messageable_id [String] ID of the messageable entity. @@ -411,11 +419,12 @@ def publishing_failed(id:, error: nil, time: ::Time) # @param publisher_id [Integer, nil] optional publisher ID. # @param publisher_name [String, nil] optional publisher name. # @return [Hash] serialized message details. - def serialize(id:, status:, messageable_type:, messageable_id:, updated_at:, + def serialize(id:, lock_version:, status:, messageable_type:, messageable_id:, updated_at:, queued_at: nil, publishing_at: nil, published_at: nil, failed_at: nil, publisher_id: nil, publisher_name: nil) { id: id, + lock_version: lock_version, status: status, messageable_type: messageable_type, messageable_id: messageable_id, @@ -445,6 +454,7 @@ def find_by_id(id:) { id: message.id, + lock_version: message.lock_version, status: message.status, messageable_type: message.messageable_type, messageable_id: message.messageable_id, @@ -479,12 +489,13 @@ def find_by_id(id:) # Deletes a message by ID. # @param id [Integer] the ID of the message to delete. # @return [Hash] details of the deleted message. - def delete(id:, time: ::Time) + def delete(id:, lock_version:, time: ::Time) current_utc_time = time.now.utc ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do message = Models::Message.includes(exceptions: :frames).lock.find_by!(id: id) + message.update!(lock_version: lock_version, updated_at: current_utc_time) message.exceptions.each { |exception| exception.frames.each(&:delete) } message.exceptions.delete_all @@ -516,7 +527,7 @@ def can_requeue?(status:) # @param publisher_name [String, nil] the name of the publisher. # @param time [Time] current time context used to update timestamps. # @return [Hash] updated message details. - def requeue(id:, publisher_id: nil, publisher_name: nil, + def requeue(id:, lock_version:, publisher_id: nil, publisher_name: nil, time: ::Time) current_utc_time = time.now.utc @@ -526,11 +537,10 @@ def requeue(id:, publisher_id: nil, publisher_name: nil, partition = calculate_partition(id: message.id) - Models::MessageCount - .where(status: message.status, partition: partition) - .update_all(["value = value - ?, updated_at = ?", 1, current_utc_time]) + original_status = message.status message.update!( + lock_version: lock_version, status: Message::Status::QUEUED, updated_at: current_utc_time, queued_at: current_utc_time, @@ -541,11 +551,15 @@ def requeue(id:, publisher_id: nil, publisher_name: nil, publisher_name: publisher_name) Models::MessageCount - .where(status: Status::QUEUED, partition: partition) + .where(status: original_status, partition: partition) + .update_all(["value = value - ?, updated_at = ?", 1, current_utc_time]) + + Models::MessageCount + .where(status: message.status, partition: partition) .update_all(["value = value + ?, updated_at = ?", 1, current_utc_time]) Models::MessageTotal - .where(status: Status::QUEUED, partition: partition) + .where(status: message.status, partition: partition) .update_all(["value = value + ?, updated_at = ?", 1, current_utc_time]) { id: id } @@ -631,6 +645,7 @@ def list(status: LIST_STATUS_DEFAULT, messages: paginated_messages.map do |message| { id: message.id, + lock_version: message.lock_version, status: message.status.to_sym, messageable_type: message.messageable_type, messageable_id: message.messageable_id, diff --git a/lib/outboxer/web.rb b/lib/outboxer/web.rb index c0394447..005edd67 100755 --- a/lib/outboxer/web.rb +++ b/lib/outboxer/web.rb @@ -494,56 +494,68 @@ def normalise_query_string(status: Message::LIST_STATUS_DEFAULT, end post "/messages/update" do - ids = params[:selected_ids].map(&:to_i) flash = {} + selected_messages = params.fetch("selected_messages", []).map do |pair| + id, lock_version = pair.split(":").map(&:to_i) + { id: id, lock_version: lock_version } + end + case params[:action] when "requeue_by_ids" - requeued_count = 0 - failed_count = 0 + requeued_message_count = 0 + failed_message_count = 0 - ids.each do |id| - Outboxer::Message.requeue(id: id) - requeued_count += 1 + selected_messages.each do |selected_message| + Outboxer::Message.requeue( + id: selected_message[:id], + lock_version: selected_message[:lock_version] + ) + requeued_message_count += 1 rescue StandardError => error settings.logger.error( - "[Outboxer::Web] Failed to requeue message id=#{id}\n" \ + "[Outboxer::Web] Failed to requeue message id=#{selected_message[:id]}\n" \ "error_class=#{error.class}\n" \ "error_message=#{error.message.inspect}" ) - failed_count += 1 + failed_message_count += 1 end - if requeued_count.positive? - flash[:success] = "Requeued #{pluralise(requeued_count, "message")}" + if requeued_message_count.positive? + flash[:success] = "Requeued #{pluralise(requeued_message_count, "message")}" end - if failed_count.positive? - flash[:danger] = "Requeue failed for #{pluralise(failed_count, "message")}" + if failed_message_count.positive? + flash[:danger] = "Requeue failed for #{pluralise(failed_message_count, "message")}" end + when "delete_by_ids" - deleted_count = 0 - failed_count = 0 + deleted_message_count = 0 + failed_message_count = 0 - ids.each do |id| - Outboxer::Message.delete(id: id) - deleted_count += 1 + selected_messages.each do |selected_message| + Outboxer::Message.delete( + id: selected_message[:id], + lock_version: selected_message[:lock_version] + ) + deleted_message_count += 1 rescue StandardError => error settings.logger.error( - "[Outboxer::Web] Failed to delete message id=#{id}\n" \ + "[Outboxer::Web] Failed to delete message id=#{selected_message[:id]}\n" \ "error_class=#{error.class}\n" \ "error_message=#{error.message.inspect}" ) - failed_count += 1 + failed_message_count += 1 end - if deleted_count.positive? - flash[:success] = "Deleted #{pluralise(deleted_count, "message")}" + if deleted_message_count.positive? + flash[:success] = "Deleted #{pluralise(deleted_message_count, "message")}" end - if failed_count.positive? - flash[:danger] = "Delete failed for #{pluralise(failed_count, "message")}" + if failed_message_count.positive? + flash[:danger] = "Delete failed for #{pluralise(failed_message_count, "message")}" end + else raise "Unknown action: #{params[:action]}" end @@ -656,7 +668,7 @@ def normalise_query_string(status: Message::LIST_STATUS_DEFAULT, end post "/message/:id/requeue" do - Message.requeue(id: params[:id]) + Message.requeue(id: params[:id], lock_version: params[:lock_version]) denormalised_query_params = denormalise_query_params( status: params[:status], @@ -679,7 +691,7 @@ def normalise_query_string(status: Message::LIST_STATUS_DEFAULT, end post "/message/:id/delete" do - Message.delete(id: params[:id]) + Message.delete(id: params[:id], lock_version: params[:lock_version]) denormalised_query_params = denormalise_query_params( status: params[:status], diff --git a/lib/outboxer/web/views/message.erb b/lib/outboxer/web/views/message.erb index 055d3c1d..119bf69a 100644 --- a/lib/outboxer/web/views/message.erb +++ b/lib/outboxer/web/views/message.erb @@ -4,25 +4,33 @@