Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions generators/install_generator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ def copy_migrations
"db/migrate/create_outboxer_messages.rb",
"db/migrate/create_outboxer_messages.rb")

migration_template(
"db/migrate/create_outboxer_message_counts.rb",
"db/migrate/create_outboxer_message_counts.rb")

migration_template(
"db/migrate/create_outboxer_message_totals.rb",
"db/migrate/create_outboxer_message_totals.rb")

migration_template(
"db/migrate/create_outboxer_exceptions.rb",
"db/migrate/create_outboxer_exceptions.rb")
Expand Down
10 changes: 5 additions & 5 deletions lib/outboxer/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -258,15 +258,15 @@ def published(id:, time: ::Time)

ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
message = Models::Message.lock.find_by!(id: id)
message = Models::Message.includes(exceptions: :frames).lock.find_by!(id: id)

if message.status != Models::Message::Status::PUBLISHING
raise Error, "Status must be publishing"
end

message.update!(
status: Status::PUBLISHED,
updated_at: current_utc_time, published_at: current_utc_time)
message.exceptions.each { |exception| exception.frames.each(&:delete) }
message.exceptions.delete_all
message.delete

partition = calculate_partition(id: message.id)

Expand Down Expand Up @@ -500,7 +500,7 @@ def requeue(id:, publisher_id: nil, publisher_name: nil,
end
end

LIST_STATUS_OPTIONS = [nil, :queued, :publishing, :published, :failed]
LIST_STATUS_OPTIONS = [nil, :queued, :publishing, :failed]
LIST_STATUS_DEFAULT = nil

LIST_SORT_OPTIONS = [:id, :status, :messageable, :queued_at, :updated_at, :publisher_name]
Expand Down
1 change: 0 additions & 1 deletion lib/outboxer/web/views/layout.erb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
<% statuses = [
{ name: 'Queued', key: 'queued' },
{ name: 'Publishing', key: 'publishing' },
{ name: 'Published', key: 'published' },
{ name: 'Failed', key: 'failed' }
] %>

Expand Down
29 changes: 8 additions & 21 deletions quickstart_e2e_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ bundle exec rails new . \

bundle install
bundle exec rails db:create
echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "master"' \
echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "refactor/delete_published_outboxer_messages"' \
>> Gemfile
bundle install
bundle exec rails generate outboxer:install
Expand Down Expand Up @@ -76,32 +76,19 @@ attempt = 1
max_attempts = 10
delay = 1

messageable_was_published = false
published_count = Outboxer::Message.count_by_status["published"]

published_messages = Outboxer::Message.list(status: :published)[:messages]

messageable_was_published = published_messages.any? do |published_message|
published_message[:messageable_type] == event.class.name &&
published_message[:messageable_id] == event.id.to_s
end

while (attempt <= max_attempts) && !messageable_was_published
warn "Outboxer message not published yet. Retrying (#{attempt}/#{max_attempts})..."
sleep delay
attempt += 1

published_messages = Outboxer::Message.list(status: :published)[:messages]

messageable_was_published = published_messages.any? do |published_message|
published_message[:messageable_type] == event.class.name &&
published_message[:messageable_id] == event.id.to_s
end
while (attempt <= max_attempts) && published_count.zero?
warn "Outboxer not published yet (#{attempt}/#{max_attempts})..."
sleep delay
attempt += 1
published_count = Outboxer::Message.count_by_status["published"]
end

Process.kill("TERM", publisher_pid)
Process.wait(publisher_pid)

exit(messageable_was_published ? 0 : 1)
exit(published_count.positive? ? 0 : 1)
RUBY

# TARGET_RUBY_VERSION=3.2.2 TARGET_RAILS_VERSION=7.1.5.1 TARGET_DATABASE_ADAPTER=postgresql ./quickstart_e2e_tests.sh
16 changes: 4 additions & 12 deletions spec/bin/outboxer_publisher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,18 @@

it "publishes message" do
attempt = 1
published_count = Outboxer::Message.count_by_status["published"]

was_published = Outboxer::Message
.list(status: :published)
.fetch(:messages)
.any? { |published_message| published_message[:id] == message[:id] }

while (attempt <= max_attempts) && !was_published
while (attempt <= max_attempts) && published_count.zero?
warn "Outboxer message not published yet. Retrying (#{attempt}/#{max_attempts})..."
sleep delay
attempt += 1

was_published = Outboxer::Message
.list(status: :published)
.fetch(:messages)
.any? { |published_message| published_message[:id] == message[:id] }
published_count = Outboxer::Message.count_by_status["published"]
end

Process.kill("TERM", publisher_pid)
Process.wait(publisher_pid)

expect(was_published).to be true
expect(published_count).to be > 0
end
end
4 changes: 2 additions & 2 deletions spec/lib/outboxer/message/publish_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ module Outboxer
messageable_type: queued_message.messageable_type,
messageable_id: queued_message.messageable_id
})
expect(queued_message.reload.status)
.to eq(Models::Message::Status::PUBLISHED)

expect(Models::Message.published.count).to eql(0)
end

it "returns { id: ... } and marks as failed on StandardError" do
Expand Down
4 changes: 2 additions & 2 deletions spec/lib/outboxer/publisher/publish_message_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ module Outboxer
::Process.kill("TERM", ::Process.pid)
end

expect(Models::Message.published.count).to eq(1)
expect(Models::Message.published.count).to eq(0)
end
end

Expand All @@ -213,7 +213,7 @@ module Outboxer
::Process.kill("TERM", ::Process.pid)
end

expect(Models::Message.published.count).to eq(1)
expect(Models::Message.published.count).to eq(0)
end
end

Expand Down