Skip to content

Commit 0e45be7

Browse files
authored
Merge e919d51 into 1513c71
2 parents 1513c71 + e919d51 commit 0e45be7

File tree

4 files changed

+38
-36
lines changed

4 files changed

+38
-36
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ end
6464
Accountify::InvoiceRaisedEvent.create!(created_at: Time.current)
6565
```
6666

67-
**8. Publish messages**
67+
**8. Publish outboxer messages**
6868

6969
```ruby
7070
# bin/outboxer_publisher

db/migrate/create_outboxer_messages.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ def up
2626
name: "idx_outboxer_status_updated_at"
2727

2828
# publisher latency
29-
add_index :outboxer_messages, [:publisher_id, :updated_at],
30-
name: "idx_outboxer_pub_id_updated_at"
29+
add_index :outboxer_messages, [:publisher_id, :published_at],
30+
name: "idx_outboxer_pub_id_published_at"
3131

3232
# publisher throughput
33-
add_index :outboxer_messages, [:status, :publisher_id, :updated_at],
34-
name: "idx_outboxer_status_pub_id_updated_at"
33+
add_index :outboxer_messages, [:status, :publisher_id, :published_at],
34+
name: "idx_outboxer_status_pub_id_published_at"
3535

3636
# bulk status + id locking
3737
add_index :outboxer_messages, [:status, :id],

lib/outboxer/publisher.rb

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -404,18 +404,19 @@ def create_heartbeat_thread(id:,
404404
throughput = Models::Message
405405
.where(status: Message::Status::PUBLISHED)
406406
.where(publisher_id: id)
407-
.where("updated_at >= ?", 1.second.ago)
407+
.where("published_at >= ?", 1.second.ago)
408408
.count
409409

410-
last_updated_message = Models::Message
410+
last_published_message = Models::Message
411+
.where(status: Message::Status::PUBLISHED)
411412
.where(publisher_id: id)
412-
.order(updated_at: :desc)
413+
.order(published_at: :desc)
413414
.first
414415

415-
latency = if last_updated_message.nil?
416+
latency = if last_published_message.nil?
416417
0
417418
else
418-
(Time.now.utc - last_updated_message.updated_at).to_i
419+
(Time.now.utc - last_published_message.published_at).to_i
419420
end
420421

421422
publisher.update!(

quickstart_e2e_tests.sh

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -53,44 +53,45 @@ bundle exec ruby -pi -e \
5353
"sub(/class Event < ApplicationRecord/, \"class Event < ApplicationRecord\\n after_create { Outboxer::Message.queue(messageable: self) }\")" \
5454
app/models/event.rb
5555

56-
bundle exec rails runner 'Event.create!'
56+
bundle exec ruby - <<'RUBY'
57+
require_relative "config/environment"
5758
58-
bundle exec ruby - <<RUBY
59-
publisher_cmd = File.join(Dir.pwd, "bin", "outboxer_publisher")
60-
read_io, write_io = IO.pipe
59+
event = Event.create!
6160
62-
pid = spawn("ruby", publisher_cmd, out: write_io, err: write_io)
63-
write_io.close
61+
env = { "RAILS_ENV" => ENV["RAILS_ENV"] }
62+
publisher_cmd = File.join(Dir.pwd, "bin", "outboxer_publisher")
63+
publisher_pid = spawn(env, "ruby", publisher_cmd)
6464
65-
output = +""
6665
attempt = 1
67-
max_attempts = 20
66+
max_attempts = 10
6867
delay = 1
6968
70-
while attempt <= max_attempts
71-
begin
72-
partial = read_io.read_nonblock(1024)
73-
output << partial if partial
74-
rescue IO::WaitReadable, EOFError
75-
end
69+
messageable_was_published = false
7670
77-
break if output.include?("published message")
71+
published_messages = Outboxer::Message.list(status: :published)[:messages]
7872
79-
sleep delay
80-
attempt += 1
73+
messageable_was_published = published_messages.any? do |published_message|
74+
published_message[:messageable_type] == event.class.name &&
75+
published_message[:messageable_id] == event.id.to_s
8176
end
8277
83-
Process.kill("TERM", pid)
84-
Process.wait(pid)
85-
read_io.close
78+
while (attempt <= max_attempts) && !messageable_was_published
79+
warn "Outboxer message not published yet. Retrying (#{attempt}/#{max_attempts})..."
80+
sleep delay
81+
attempt += 1
8682
87-
if output.include?("published message")
88-
puts "Outboxer published message found"
89-
exit 0
90-
else
91-
puts "Outboxer published message not found after #{max_attempts} attempts"
92-
exit 1
83+
published_messages = Outboxer::Message.list(status: :published)[:messages]
84+
85+
messageable_was_published = published_messages.any? do |published_message|
86+
published_message[:messageable_type] == event.class.name &&
87+
published_message[:messageable_id] == event.id.to_s
88+
end
9389
end
90+
91+
Process.kill("TERM", publisher_pid)
92+
Process.wait(publisher_pid)
93+
94+
exit(messageable_was_published ? 0 : 1)
9495
RUBY
9596

9697
# TARGET_RUBY_VERSION=3.2.2 TARGET_RAILS_VERSION=7.1.5.1 TARGET_DATABASE_ADAPTER=postgresql ./quickstart_e2e_tests.sh

0 commit comments

Comments
 (0)