Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ end
```ruby
# bin/outboxer_publisher

Outboxer::Publisher.publish_messages do |messages|
Outboxer::Publisher.publish_messages do |publisher, messages|
# TODO: publish messages here

messages.each do |message|
Expand Down
4 changes: 2 additions & 2 deletions bin/outboxer_publisher
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ begin
sweep_retention: options[:sweep_retention],
sweep_batch_size: options[:sweep_batch_size],
logger: logger
) do |messages|
) do |publisher, messages|
# TODO: publish messages here

messages.each do |message|
logger.info "Outboxer published message " \
logger.info "Outboxer publisher id=#{publisher[:id]} published message " \
"id=#{message[:id]} " \
"messageable_type=#{message[:messageable_type]} " \
"messageable_id=#{message[:messageable_id]} "
Expand Down
6 changes: 4 additions & 2 deletions lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,9 @@ def handle_signal(id:, name:, logger:)
# @param time [Time] The current time context.
# @param process [Process] The process module for system metrics.
# @param kernel [Kernel] The kernel module for sleeping operations.
# @yield [Hash] A block to handle the publishing of each message.
# @yield [publisher, messages] Yields publisher and messages to be published.
# @yieldparam publisher [Hash] A hash with keys `:id` and `:name` representing the publisher.
# @yieldparam messages [Array<Hash>] An array of message hashes retrieved from the buffer.
def publish_messages(
name: "#{::Socket.gethostname}:#{::Process.pid}",
buffer_size: PUBLISH_MESSAGES_DEFAULTS[:buffer_size],
Expand Down Expand Up @@ -661,7 +663,7 @@ def publish_buffered_messages(id:, name:, buffered_messages:, logger:, &block)
ids: buffered_message_ids, publisher_id: id, publisher_name: name)

begin
block.call(publishing_messages)
block.call({ id: id, name: name }, publishing_messages)
rescue ::Exception => error
failed_messages = Message.failed_by_ids(
ids: buffered_message_ids, exception: error, publisher_id: id, publisher_name: name)
Expand Down
6 changes: 3 additions & 3 deletions spec/bin/outboxer_publisher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
# no output yet
end

break if output.include?("Outboxer published message")
break if output.match(/published message/)

sleep delay
warn "Outboxer published message not found. Retrying (attempt #{attempt}/#{max_attempts})..."
Expand All @@ -38,8 +38,8 @@
Process.wait(publisher_pid)
read_io.close

expect(output).to include(
"Outboxer published message " \
expect(output).to match(
"published message " \
"id=#{message[:id]} " \
"messageable_type=#{message[:messageable_type]} " \
"messageable_id=#{message[:messageable_id]} "
Expand Down
18 changes: 9 additions & 9 deletions spec/lib/outboxer/publisher/publish_messages_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module Outboxer
sweep_batch_size: 100,
logger: logger,
kernel: kernel
) do |_messages| # no op
) do |_publisher, _messages| # no op
end
end

Expand Down Expand Up @@ -62,7 +62,7 @@ module Outboxer
sweep_batch_size: 100,
logger: logger,
kernel: kernel
) do |_messages| # no op
) do |_publisher, _messages| # no op
end
end

Expand All @@ -86,7 +86,7 @@ module Outboxer
sweep_batch_size: 1,
logger: logger,
kernel: kernel
) do |_messages| # not called
) do |_publisher, _messages| # no op
end
end

Expand Down Expand Up @@ -115,7 +115,7 @@ module Outboxer
sweep_batch_size: 100,
logger: logger,
kernel: kernel
) do |_messages| # no op
) do |_publisher, _messages| # no op
end
end

Expand Down Expand Up @@ -143,7 +143,7 @@ module Outboxer
buffer_size: buffer_size,
poll_interval: poll_interval,
tick_interval: tick_interval,
logger: logger, kernel: kernel) do |_messages|
logger: logger, kernel: kernel) do |_publisher, _messages|
::Process.kill("TTIN", ::Process.pid)
end
end
Expand All @@ -169,7 +169,7 @@ module Outboxer
poll_interval: poll_interval,
tick_interval: tick_interval,
logger: logger,
kernel: kernel) do |_messages|
kernel: kernel) do |_publisher, _messages|
::Process.kill("TSTP", ::Process.pid)
end
end
Expand All @@ -193,7 +193,7 @@ module Outboxer
poll_interval: poll_interval,
tick_interval: tick_interval,
logger: logger,
kernel: kernel) do |messages|
kernel: kernel) do |_publisher, messages|
message = messages.first

expect(message[:id]).to eq(queued_message.id)
Expand All @@ -220,7 +220,7 @@ module Outboxer
poll_interval: poll_interval,
tick_interval: tick_interval,
logger: logger,
kernel: kernel) do |_message|
kernel: kernel) do |_publisher, _message|
::Process.kill("TERM", ::Process.pid)

raise standard_error
Expand Down Expand Up @@ -262,7 +262,7 @@ module Outboxer
poll_interval: poll_interval,
tick_interval: tick_interval,
logger: logger,
kernel: kernel) do |_buffer_sizeed_message|
kernel: kernel) do |_publisher, _buffer_sized_message|
raise no_memory_error
end
end
Expand Down