Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions db/migrate/create_outboxer_messages.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
class CreateOutboxerMessages < ActiveRecord::Migration[6.1]
def up
create_table :outboxer_messages do |t|
t.integer :lock_version, null: false, default: 0

t.string :status, limit: 255, null: false

t.string :messageable_id, limit: 255, null: false
Expand Down
61 changes: 38 additions & 23 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,7 @@ def queue(messageable: nil, messageable_type: nil, messageable_id: nil,

{
id: message.id,
status: message.status,
messageable_type: message.messageable_type,
messageable_id: message.messageable_id,
updated_at: message.updated_at
lock_version: message.lock_version
}
end
rescue MissingPartition
Expand Down Expand Up @@ -203,15 +200,18 @@ def publish(logger: nil, time: ::Time)
begin
yield message
rescue StandardError => error
publishing_failed(id: message[:id], error: error, time: time)
publishing_failed(
id: message[:id], lock_version: message[:lock_version],
error: error, time: time)

logger&.error(
"Outboxer message publishing failed id=#{message[:id]} " \
"duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}\n" \
"#{error.class}: #{error.message}\n" \
"#{error.backtrace.join("\n")}")
rescue Exception => error
publishing_failed(id: message[:id], error: error, time: time)
publishing_failed(
id: message[:id], lock_version: message[:lock_version], error: error, time: time)

logger&.fatal(
"Outboxer message publishing failed id=#{message[:id]} " \
Expand All @@ -221,7 +221,7 @@ def publish(logger: nil, time: ::Time)

raise
else
published(id: message[:id], time: time)
published(id: message[:id], lock_version: message[:lock_version], time: time)

logger&.info(
"Outboxer message published id=#{message[:id]} " \
Expand Down Expand Up @@ -254,7 +254,7 @@ def publishing(time: ::Time)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
message = Models::Message
.select(:id, :messageable_type, :messageable_id)
.select(:id, :lock_version, :messageable_type, :messageable_id)
.where(status: Status::QUEUED)
.order(:id)
.limit(1)
Expand All @@ -265,6 +265,7 @@ def publishing(time: ::Time)
nil
else
message.update!(
lock_version: message.lock_version,
status: Status::PUBLISHING,
updated_at: current_utc_time,
publishing_at: current_utc_time)
Expand All @@ -284,9 +285,10 @@ def publishing(time: ::Time)
.update_all(["value = value + ?, updated_at = ?", 1, current_utc_time])

{
id: message[:id],
messageable_type: message[:messageable_type],
messageable_id: message[:messageable_id]
id: message.id,
lock_version: message.lock_version,
messageable_type: message.messageable_type,
messageable_id: message.messageable_id
}
end
end
Expand All @@ -306,7 +308,7 @@ def publishing(time: ::Time)
# @note Executes within a transaction using a `SELECT ... FOR UPDATE` lock.
# @example
# Outboxer::Message.published(id: message[:id], time: Time)
def published(id:, time: ::Time)
def published(id:, lock_version:, time: ::Time)
current_utc_time = time.now.utc

ActiveRecord::Base.connection_pool.with_connection do
Expand All @@ -317,6 +319,8 @@ def published(id:, time: ::Time)
raise Error, "Status must be publishing"
end

message.update!(lock_version: lock_version, updated_at: Time.now.utc)

message.exceptions.each { |exception| exception.frames.each(&:delete) }
message.exceptions.delete_all
message.delete
Expand All @@ -335,7 +339,7 @@ def published(id:, time: ::Time)
.where(status: Status::PUBLISHED, partition: partition)
.update_all(["value = value + ?, updated_at = ?", 1, current_utc_time])

nil
{}
end
end
end
Expand All @@ -354,7 +358,7 @@ def published(id:, time: ::Time)
# @note Runs inside a transaction and uses `SELECT ... FOR UPDATE` locking.
# @example
# Outboxer::Message.publishing_failed(id: message[:id], time: Time)
def publishing_failed(id:, error: nil, time: ::Time)
def publishing_failed(id:, lock_version:, error: nil, time: ::Time)
current_utc_time = time.now.utc

ActiveRecord::Base.connection_pool.with_connection do
Expand All @@ -366,6 +370,7 @@ def publishing_failed(id:, error: nil, time: ::Time)
end

message.update!(
lock_version: lock_version,
status: Status::FAILED,
updated_at: current_utc_time, failed_at: current_utc_time)

Expand All @@ -392,14 +397,17 @@ def publishing_failed(id:, error: nil, time: ::Time)
.where(status: Status::FAILED, partition: partition)
.update_all(["value = value + ?, updated_at = ?", 1, current_utc_time])

nil
{
lock_version: message.lock_version
}
end
end
end

# Serializes message attributes into a hash.
#
# @param id [Integer] message ID.
# @param lock_version [Integer] message lock_version.
# @param status [String] message status.
# @param messageable_type [String] type of the messageable entity.
# @param messageable_id [String] ID of the messageable entity.
Expand All @@ -411,11 +419,12 @@ def publishing_failed(id:, error: nil, time: ::Time)
# @param publisher_id [Integer, nil] optional publisher ID.
# @param publisher_name [String, nil] optional publisher name.
# @return [Hash] serialized message details.
def serialize(id:, status:, messageable_type:, messageable_id:, updated_at:,
def serialize(id:, lock_version:, status:, messageable_type:, messageable_id:, updated_at:,
queued_at: nil, publishing_at: nil, published_at: nil, failed_at: nil,
publisher_id: nil, publisher_name: nil)
{
id: id,
lock_version: lock_version,
status: status,
messageable_type: messageable_type,
messageable_id: messageable_id,
Expand Down Expand Up @@ -445,6 +454,7 @@ def find_by_id(id:)

{
id: message.id,
lock_version: message.lock_version,
status: message.status,
messageable_type: message.messageable_type,
messageable_id: message.messageable_id,
Expand Down Expand Up @@ -479,12 +489,13 @@ def find_by_id(id:)
# Deletes a message by ID.
# @param id [Integer] the ID of the message to delete.
# @return [Hash] details of the deleted message.
def delete(id:, time: ::Time)
def delete(id:, lock_version:, time: ::Time)
current_utc_time = time.now.utc

ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
message = Models::Message.includes(exceptions: :frames).lock.find_by!(id: id)
message.update!(lock_version: lock_version, updated_at: current_utc_time)

message.exceptions.each { |exception| exception.frames.each(&:delete) }
message.exceptions.delete_all
Expand Down Expand Up @@ -516,7 +527,7 @@ def can_requeue?(status:)
# @param publisher_name [String, nil] the name of the publisher.
# @param time [Time] current time context used to update timestamps.
# @return [Hash] updated message details.
def requeue(id:, publisher_id: nil, publisher_name: nil,
def requeue(id:, lock_version:, publisher_id: nil, publisher_name: nil,
time: ::Time)
current_utc_time = time.now.utc

Expand All @@ -526,11 +537,10 @@ def requeue(id:, publisher_id: nil, publisher_name: nil,

partition = calculate_partition(id: message.id)

Models::MessageCount
.where(status: message.status, partition: partition)
.update_all(["value = value - ?, updated_at = ?", 1, current_utc_time])
original_status = message.status

message.update!(
lock_version: lock_version,
status: Message::Status::QUEUED,
updated_at: current_utc_time,
queued_at: current_utc_time,
Expand All @@ -541,11 +551,15 @@ def requeue(id:, publisher_id: nil, publisher_name: nil,
publisher_name: publisher_name)

Models::MessageCount
.where(status: Status::QUEUED, partition: partition)
.where(status: original_status, partition: partition)
.update_all(["value = value - ?, updated_at = ?", 1, current_utc_time])

Models::MessageCount
.where(status: message.status, partition: partition)
.update_all(["value = value + ?, updated_at = ?", 1, current_utc_time])

Models::MessageTotal
.where(status: Status::QUEUED, partition: partition)
.where(status: message.status, partition: partition)
.update_all(["value = value + ?, updated_at = ?", 1, current_utc_time])

{ id: id }
Expand Down Expand Up @@ -631,6 +645,7 @@ def list(status: LIST_STATUS_DEFAULT,
messages: paginated_messages.map do |message|
{
id: message.id,
lock_version: message.lock_version,
status: message.status.to_sym,
messageable_type: message.messageable_type,
messageable_id: message.messageable_id,
Expand Down
62 changes: 37 additions & 25 deletions lib/outboxer/web.rb
Original file line number Diff line number Diff line change
Expand Up @@ -494,56 +494,68 @@ def normalise_query_string(status: Message::LIST_STATUS_DEFAULT,
end

post "/messages/update" do
ids = params[:selected_ids].map(&:to_i)
flash = {}

selected_messages = params.fetch("selected_messages", []).map do |pair|
id, lock_version = pair.split(":").map(&:to_i)
{ id: id, lock_version: lock_version }
end

case params[:action]
when "requeue_by_ids"
requeued_count = 0
failed_count = 0
requeued_message_count = 0
failed_message_count = 0

ids.each do |id|
Outboxer::Message.requeue(id: id)
requeued_count += 1
selected_messages.each do |selected_message|
Outboxer::Message.requeue(
id: selected_message[:id],
lock_version: selected_message[:lock_version]
)
requeued_message_count += 1
rescue StandardError => error
settings.logger.error(
"[Outboxer::Web] Failed to requeue message id=#{id}\n" \
"[Outboxer::Web] Failed to requeue message id=#{selected_message[:id]}\n" \
"error_class=#{error.class}\n" \
"error_message=#{error.message.inspect}"
)
failed_count += 1
failed_message_count += 1
end

if requeued_count.positive?
flash[:success] = "Requeued #{pluralise(requeued_count, "message")}"
if requeued_message_count.positive?
flash[:success] = "Requeued #{pluralise(requeued_message_count, "message")}"
end

if failed_count.positive?
flash[:danger] = "Requeue failed for #{pluralise(failed_count, "message")}"
if failed_message_count.positive?
flash[:danger] = "Requeue failed for #{pluralise(failed_message_count, "message")}"
end

when "delete_by_ids"
deleted_count = 0
failed_count = 0
deleted_message_count = 0
failed_message_count = 0

ids.each do |id|
Outboxer::Message.delete(id: id)
deleted_count += 1
selected_messages.each do |selected_message|
Outboxer::Message.delete(
id: selected_message[:id],
lock_version: selected_message[:lock_version]
)
deleted_message_count += 1
rescue StandardError => error
settings.logger.error(
"[Outboxer::Web] Failed to delete message id=#{id}\n" \
"[Outboxer::Web] Failed to delete message id=#{selected_message[:id]}\n" \
"error_class=#{error.class}\n" \
"error_message=#{error.message.inspect}"
)
failed_count += 1
failed_message_count += 1
end

if deleted_count.positive?
flash[:success] = "Deleted #{pluralise(deleted_count, "message")}"
if deleted_message_count.positive?
flash[:success] = "Deleted #{pluralise(deleted_message_count, "message")}"
end

if failed_count.positive?
flash[:danger] = "Delete failed for #{pluralise(failed_count, "message")}"
if failed_message_count.positive?
flash[:danger] = "Delete failed for #{pluralise(failed_message_count, "message")}"
end

else
raise "Unknown action: #{params[:action]}"
end
Expand Down Expand Up @@ -656,7 +668,7 @@ def normalise_query_string(status: Message::LIST_STATUS_DEFAULT,
end

post "/message/:id/requeue" do
Message.requeue(id: params[:id])
Message.requeue(id: params[:id], lock_version: params[:lock_version])

denormalised_query_params = denormalise_query_params(
status: params[:status],
Expand All @@ -679,7 +691,7 @@ def normalise_query_string(status: Message::LIST_STATUS_DEFAULT,
end

post "/message/:id/delete" do
Message.delete(id: params[:id])
Message.delete(id: params[:id], lock_version: params[:lock_version])

denormalised_query_params = denormalise_query_params(
status: params[:status],
Expand Down
38 changes: 23 additions & 15 deletions lib/outboxer/web/views/message.erb
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,33 @@
<h3 class="mb-0">Message <%= message[:id] %></h3>
<div class="card-header-buttons">
<!-- Requeue Form -->
<form action="<%= outboxer_path("/message/#{message[:id]}/requeue") %>" method="post" style="display: inline;">
<% normalised_query_params.each do |key, param| %>
<input type="hidden" name="<%= key %>" value="<%= param %>">
<% end %>
<button type="submit" class="btn btn-sm btn-outline-secondary me-2" aria-label="Requeue"
<%= 'disabled' unless Outboxer::Message.can_requeue?(status: message[:status]) %>>
<i class="bi bi-arrow-clockwise"></i> Requeue
</button>
<form action="<%= outboxer_path("/message/#{message[:id]}/requeue") %>" method="post"
style="display: inline;">
<% normalised_query_params.each do |key, param| %>
<input type="hidden" name="<%= key %>" value="<%= param %>">
<% end %>
<input type="hidden" name="lock_version" value="<%= message[:lock_version] %>">

<button type="submit" class="btn btn-sm btn-outline-secondary me-2"
aria-label="Requeue"
<%= 'disabled' unless Outboxer::Message
.can_requeue?(status: message[:status]) %>>
<i class="bi bi-arrow-clockwise"></i> Requeue
</button>
</form>

<!-- Delete Form -->
<form action="<%= outboxer_path("/message/#{message[:id]}/delete") %>" method="post" style="display: inline;"
<form action="<%= outboxer_path("/message/#{message[:id]}/delete") %>" method="post"
style="display: inline;"
onsubmit="return confirm('Are you sure you want to delete this message?');">
<% normalised_query_params.each do |key, param| %>
<input type="hidden" name="<%= key %>" value="<%= param %>">
<% end %>
<input type="hidden" name="lock_version" value="<%= message[:lock_version] %>">

<% normalised_query_params.each do |key, param| %>
<input type="hidden" name="<%= key %>" value="<%= param %>">
<% end %>
<button type="submit" class="btn btn-sm btn-outline-danger" aria-label="Delete">
<i class="bi bi-trash"></i> Delete
</button>
<button type="submit" class="btn btn-sm btn-outline-danger" aria-label="Delete">
<i class="bi bi-trash"></i> Delete
</button>
</form>
</div>
</div>
Expand Down
Loading