diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index b2136fcf..5fd90ce7 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -9,6 +9,10 @@ def terminating? @status == Status::TERMINATING end + def publishing? + @status == Status::PUBLISHING + end + # Parses command line arguments to configure the publisher. # @param args [Array] The arguments passed via the command line. # @return [Hash] The parsed options. @@ -549,28 +553,33 @@ def create_publisher_thread(id:, index:, # Thread.current.report_on_exception = true while !terminating? - begin - published_message = Message.publish(logger: logger) do |message| - block.call({ id: id }, message) - end - rescue StandardError => error - logger.error( - "#{error.class}: #{error.message}\n" \ - "#{error.backtrace.join("\n")}") - - Publisher.sleep( - poll_interval, - tick_interval: tick_interval, - process: process, - kernel: kernel) - else - if published_message.nil? + if publishing? + begin + published_message = Message.publish(logger: logger) do |message| + block.call({ id: id }, message) + end + rescue StandardError => error + logger.error( + "#{error.class}: #{error.message}\n" \ + "#{error.backtrace.join("\n")}") + Publisher.sleep( poll_interval, tick_interval: tick_interval, process: process, kernel: kernel) + else + if published_message.nil? + Publisher.sleep( + poll_interval, + tick_interval: tick_interval, + process: process, + kernel: kernel) + end end + else + Publisher.sleep(tick_interval, tick_interval: tick_interval, + process: process, kernel: kernel) end end rescue ::Exception => error