diff --git a/README.md b/README.md index 0d07722d..20bbdbf1 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ end ```ruby # bin/outboxer_publisher -Outboxer::Publisher.publish_messages do |messages| +Outboxer::Publisher.publish_messages do |publisher, messages| # TODO: publish messages here messages.each do |message| diff --git a/bin/outboxer_publisher b/bin/outboxer_publisher index 4fcbc9c0..f35013c2 100755 --- a/bin/outboxer_publisher +++ b/bin/outboxer_publisher @@ -26,11 +26,11 @@ begin sweep_retention: options[:sweep_retention], sweep_batch_size: options[:sweep_batch_size], logger: logger - ) do |messages| + ) do |publisher, messages| # TODO: publish messages here messages.each do |message| - logger.info "Outboxer published message " \ + logger.info "Outboxer publisher id=#{publisher[:id]} published message " \ "id=#{message[:id]} " \ "messageable_type=#{message[:messageable_type]} " \ "messageable_id=#{message[:messageable_id]} " diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index 0ea11e87..8f58e22c 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -536,7 +536,9 @@ def handle_signal(id:, name:, logger:) # @param time [Time] The current time context. # @param process [Process] The process module for system metrics. # @param kernel [Kernel] The kernel module for sleeping operations. - # @yield [Hash] A block to handle the publishing of each message. + # @yield [publisher, messages] Yields publisher and messages to be published. + # @yieldparam publisher [Hash] A hash with keys `:id` and `:name` representing the publisher. + # @yieldparam messages [Array] An array of message hashes retrieved from the buffer. def publish_messages( name: "#{::Socket.gethostname}:#{::Process.pid}", buffer_size: PUBLISH_MESSAGES_DEFAULTS[:buffer_size], @@ -661,7 +663,7 @@ def publish_buffered_messages(id:, name:, buffered_messages:, logger:, &block) ids: buffered_message_ids, publisher_id: id, publisher_name: name) begin - block.call(publishing_messages) + block.call({ id: id, name: name }, publishing_messages) rescue ::Exception => error failed_messages = Message.failed_by_ids( ids: buffered_message_ids, exception: error, publisher_id: id, publisher_name: name) diff --git a/spec/bin/outboxer_publisher_spec.rb b/spec/bin/outboxer_publisher_spec.rb index b27be14e..9be15b20 100644 --- a/spec/bin/outboxer_publisher_spec.rb +++ b/spec/bin/outboxer_publisher_spec.rb @@ -27,7 +27,7 @@ # no output yet end - break if output.include?("Outboxer published message") + break if output.match(/published message/) sleep delay warn "Outboxer published message not found. Retrying (attempt #{attempt}/#{max_attempts})..." @@ -38,8 +38,8 @@ Process.wait(publisher_pid) read_io.close - expect(output).to include( - "Outboxer published message " \ + expect(output).to match( + "published message " \ "id=#{message[:id]} " \ "messageable_type=#{message[:messageable_type]} " \ "messageable_id=#{message[:messageable_id]} " diff --git a/spec/lib/outboxer/publisher/publish_messages_spec.rb b/spec/lib/outboxer/publisher/publish_messages_spec.rb index d7139df4..d2ca4394 100644 --- a/spec/lib/outboxer/publisher/publish_messages_spec.rb +++ b/spec/lib/outboxer/publisher/publish_messages_spec.rb @@ -29,7 +29,7 @@ module Outboxer sweep_batch_size: 100, logger: logger, kernel: kernel - ) do |_messages| # no op + ) do |_publisher, _messages| # no op end end @@ -62,7 +62,7 @@ module Outboxer sweep_batch_size: 100, logger: logger, kernel: kernel - ) do |_messages| # no op + ) do |_publisher, _messages| # no op end end @@ -86,7 +86,7 @@ module Outboxer sweep_batch_size: 1, logger: logger, kernel: kernel - ) do |_messages| # not called + ) do |_publisher, _messages| # no op end end @@ -115,7 +115,7 @@ module Outboxer sweep_batch_size: 100, logger: logger, kernel: kernel - ) do |_messages| # no op + ) do |_publisher, _messages| # no op end end @@ -143,7 +143,7 @@ module Outboxer buffer_size: buffer_size, poll_interval: poll_interval, tick_interval: tick_interval, - logger: logger, kernel: kernel) do |_messages| + logger: logger, kernel: kernel) do |_publisher, _messages| ::Process.kill("TTIN", ::Process.pid) end end @@ -169,7 +169,7 @@ module Outboxer poll_interval: poll_interval, tick_interval: tick_interval, logger: logger, - kernel: kernel) do |_messages| + kernel: kernel) do |_publisher, _messages| ::Process.kill("TSTP", ::Process.pid) end end @@ -193,7 +193,7 @@ module Outboxer poll_interval: poll_interval, tick_interval: tick_interval, logger: logger, - kernel: kernel) do |messages| + kernel: kernel) do |_publisher, messages| message = messages.first expect(message[:id]).to eq(queued_message.id) @@ -220,7 +220,7 @@ module Outboxer poll_interval: poll_interval, tick_interval: tick_interval, logger: logger, - kernel: kernel) do |_message| + kernel: kernel) do |_publisher, _message| ::Process.kill("TERM", ::Process.pid) raise standard_error @@ -262,7 +262,7 @@ module Outboxer poll_interval: poll_interval, tick_interval: tick_interval, logger: logger, - kernel: kernel) do |_buffer_sizeed_message| + kernel: kernel) do |_publisher, _buffer_sized_message| raise no_memory_error end end