diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index 7f97b7cf..f77e68f7 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -5,6 +5,8 @@ module Message Status = Models::Message::Status + class Error < StandardError; end + # Queues a new message. # @param messageable [Object, nil] the object associated with the message. # @param time [Time] time context for setting timestamps. @@ -27,6 +29,197 @@ def queue(messageable:, time: ::Time) updated_at: message.updated_at) end + # Publishes the next queued message by yielding it to the caller’s block. + # + # Transitions the selected message from **queued** → **publishing**, + # yields it for processing, then marks it **published** on success or **failed** on error. + # + # - Logs informational, error, or fatal output if a logger is provided. + # - Rescues `StandardError` (marks failed, logs as error) and continues. + # - Rescues `Exception` (marks failed, logs as fatal) and re-raises. + # - Returns the message hash with its identifiers, or `nil` if no queued message exists. + # + # @param logger [#info, #error, #fatal, nil] optional logger for lifecycle logging + # @param time [Time, #now] a time source; must respond to `now` + # @yield [message] yields the message hash for publishing + # @yieldparam message [Hash] the message data with: + # - `:id` [Integer] + # - `:messageable_type` [String] + # - `:messageable_id` [Integer, String] + # @yieldreturn [Object] the result of the block, ignored by this method + # @return [Hash, nil] the yielded message hash or `nil` if no queued message was found + # @raise [Exception] re-raises any non-StandardError exceptions after marking failed + # @example + # Outboxer::Message.publish(logger: logger) do |message| + # TODO: # publish message to broker + # end + def publish(logger: nil, time: ::Time) + publishing_started_at = time.now.utc + message = Message.publishing(time: time) + + if message.nil? + nil + else + logger&.info( + "Outboxer message publishing id=#{message[:id]}" \ + "messageable_type=#{message[:messageable_type]} " \ + "messageable_id=#{message[:messageable_id]}") + + begin + yield message + rescue StandardError => error + publishing_failed(id: message[:id], error: error, time: time) + + logger&.error( + "Outboxer message publishing failed id=#{message[:id]} " \ + "duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}\n" \ + "#{error.class}: #{error.message}\n" \ + "#{error.backtrace.join("\n")}") + rescue Exception => error + publishing_failed(id: message[:id], error: error, time: time) + + logger&.fatal( + "Outboxer message publishing failed id=#{message[:id]} " \ + "duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}\n" \ + "#{error.class}: #{error.message}\n" \ + "#{error.backtrace.join("\n")}") + + raise + else + published(id: message[:id], time: time) + + logger&.info( + "Outboxer message published id=#{message[:id]} " \ + "duration_ms=#{((time.now.utc - publishing_started_at) * 1000).to_i}") + end + + message + end + end + + # Selects and locks the next available message in **queued** status + # and transitions it to **publishing**. + # + # Uses `FOR UPDATE SKIP LOCKED` to safely publish one message from the queue + # across concurrent processes. Updates `publishing_at` and `updated_at` + # to the current UTC time before returning. + # + # @param time [Time, #now] a time source; must respond to `now` + # @return [Hash, nil] a hash with: + # - `:id` [Integer] + # - `:messageable_type` [String] + # - `:messageable_id` [Integer, String] + # or `nil` if no queued message was available + # @note Runs inside a database transaction and uses the ActiveRecord connection pool. + # @example + # message = Outboxer::Message.publishing(time: Time) + def publishing(time: ::Time) + current_utc_time = time.now.utc + + ActiveRecord::Base.connection_pool.with_connection do + ActiveRecord::Base.transaction do + message = Models::Message + .select(:id, :messageable_type, :messageable_id) + .where(status: Status::QUEUED) + .order(:id) + .limit(1) + .lock("FOR UPDATE SKIP LOCKED") + .first + + if message.nil? + nil + else + message.update!( + status: Status::PUBLISHING, + updated_at: current_utc_time, + publishing_at: current_utc_time) + + { + id: message[:id], + messageable_type: message[:messageable_type], + messageable_id: message[:messageable_id] + } + end + end + end + end + + # Marks a message in **publishing** status as **published**. + # + # Ensures that the message is currently in the **publishing** state + # before transitioning. Updates `published_at` and `updated_at` + # to the current UTC time. + # + # @param id [Integer] the ID of the message to mark as published + # @param time [Time, #now] a time source; must respond to `now` + # @return [nil] + # @raise [Outboxer::Message::Error] if the message is not in publishing state + # @note Executes within a transaction using a `SELECT ... FOR UPDATE` lock. + # @example + # Outboxer::Message.published(id: message[:id], time: Time) + def published(id:, time: ::Time) + current_utc_time = time.now.utc + + ActiveRecord::Base.connection_pool.with_connection do + ActiveRecord::Base.transaction do + message = Models::Message.lock.find_by!(id: id) + + if message.status != Models::Message::Status::PUBLISHING + raise Error, "Status must be publishing" + end + + message.update!( + status: Status::PUBLISHED, + updated_at: current_utc_time, published_at: current_utc_time) + + nil + end + end + end + + # Marks a message in **publishing** status as **failed** and records the exception details. + # + # Sets `failed_at` and `updated_at` to the current UTC time, + # persists the error’s class, message, and backtrace (if present). + # Ensures the message is currently in **publishing** state before updating. + # + # @param id [Integer] the ID of the message to mark as failed + # @param error [Exception, nil] optional error to persist (class name, message text, backtrace) + # @param time [Time, #now] a time source; must respond to `now` + # @return [nil] + # @raise [Outboxer::Message::Error] if the message is not in publishing state + # @note Runs inside a transaction and uses `SELECT ... FOR UPDATE` locking. + # @example + # Outboxer::Message.publishing_failed(id: message[:id], time: Time) + def publishing_failed(id:, error: nil, time: ::Time) + current_utc_time = time.now.utc + + ActiveRecord::Base.connection_pool.with_connection do + ActiveRecord::Base.transaction do + message = Models::Message.lock.find_by!(id: id) + + if message.status != Models::Message::Status::PUBLISHING + raise Error, "Status must be publishing" + end + + message.update!( + status: Status::FAILED, + updated_at: current_utc_time, failed_at: current_utc_time) + + if error + exception = message.exceptions.create!( + class_name: error.class.name, message_text: error.message) + + Array(error.backtrace).each_with_index do |backtrace_line, index| + exception.frames.create!(index: index, text: backtrace_line) + end + end + + nil + end + end + end + # Serializes message attributes into a hash. # # @param id [Integer] message ID. diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index b9a46dce..ac054adc 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -505,7 +505,7 @@ def publish_message( publisher_threads = Array.new(concurrency) do |index| create_publisher_thread( - id: publisher[:id], name: name, index: index, + id: publisher[:id], index: index, poll_interval: poll_interval, tick_interval: tick_interval, logger: logger, process: process, kernel: kernel, &block) end @@ -535,7 +535,6 @@ def pool(concurrency:) end # @param id [Integer] Publisher id. - # @param name [String] Publisher name. # @param index [Integer] Zero-based thread index (used for thread name). # @param poll_interval [Numeric] Seconds to wait when no messages found. # @param tick_interval [Numeric] Seconds between signal checks during sleep. @@ -546,73 +545,31 @@ def pool(concurrency:) # e.g., `{ id: Integer, name: String }`. # @yieldparam messages [Array] Batch of messages to publish. # @return [Thread] The created publishing thread. - def create_publisher_thread(id:, name:, index:, + def create_publisher_thread(id:, index:, poll_interval:, tick_interval:, logger:, process:, kernel:, &block) Thread.new do begin Thread.current.name = "publisher-#{index + 1}" + # Thread.current.report_on_exception = true while !terminating? - messages = [] - begin - messages = buffer_messages(id: id, name: name, limit: 1) + published_message = Message.publish(logger: logger) do |message| + block.call({ id: id }, message) + end rescue StandardError => error logger.error( "#{error.class}: #{error.message}\n" \ "#{error.backtrace.join("\n")}") - end - - if messages.any? - begin - block.call({ id: id, name: name }, messages[0]) - rescue StandardError => error - logger.error( - "#{error.class}: #{error.message}\n" \ - "#{error.backtrace.join("\n")}") - - Publisher.update_messages( - id: id, - failed_messages: [ - { - id: messages[0][:id], - exception: { - class_name: error.class.name, - message_text: error.message, - backtrace: error.backtrace - } - } - ]) - rescue ::Exception => error - logger.fatal( - "#{error.class}: #{error.message}\n" \ - "#{error.backtrace.join("\n")}") - - Publisher.update_messages( - id: id, - failed_messages: [ - { - id: messages[0][:id], - exception: { - class_name: error.class.name, - message_text: error.message, - backtrace: error.backtrace - } - } - ]) - - terminate(id: id) - else - Outboxer::Publisher.update_messages( - id: id, published_message_ids: [messages[0][:id]]) - end else - Publisher.sleep( - poll_interval, - tick_interval: tick_interval, - process: process, - kernel: kernel) + if published_message.nil? + Publisher.sleep( + poll_interval, + tick_interval: tick_interval, + process: process, + kernel: kernel) + end end end rescue ::Exception => error @@ -672,119 +629,5 @@ def create_sweeper_thread(id:, sweep_interval:, sweep_retention:, sweep_batch_si end end end - - # Marks queued messages as buffered. - # - # @param id [Integer] the ID of the publisher. - # @param name [String] the name of the publisher. - # @param limit [Integer] the number of messages to buffer. - # @param time [Time] current time context used to update timestamps. - # @return [Array] buffered messages. - def buffer_messages(id:, name:, limit: 1, time: ::Time) - current_utc_time = time.now.utc - messages = [] - - ActiveRecord::Base.connection_pool.with_connection do - ActiveRecord::Base.transaction do - messages = Models::Message - .where(status: Message::Status::QUEUED) - .order(updated_at: :asc) - .limit(limit) - .lock("FOR UPDATE SKIP LOCKED") - .pluck(:id, :messageable_type, :messageable_id) - - message_ids = messages.map(&:first) - - updated_rows = Models::Message - .where(id: message_ids, status: Message::Status::QUEUED) - .update_all( - status: Message::Status::PUBLISHING, - updated_at: current_utc_time, - buffered_at: current_utc_time, - publisher_id: id, - publisher_name: name) - - if updated_rows != message_ids.size - raise ArgumentError, "Some messages not buffered" - end - end - end - - messages.map do |message_id, messageable_type, messageable_id| - { - id: message_id, - messageable_type: messageable_type, - messageable_id: messageable_id - } - end - end - - # Updates messages as published or failed. - # - # @param id [Integer] - # @param published_message_ids [Array] - # @param failed_messages [Array] Array of failed message hashes: - # [ - # { - # id: Integer, - # exception: { - # class_name: String, - # message_text: String, - # backtrace: Array - # } - # } - # ] - # @param time [Time] - # @return [nil] - def update_messages(id:, published_message_ids: [], failed_messages: [], - time: ::Time) - current_utc_time = time.now.utc - - ActiveRecord::Base.connection_pool.with_connection do - ActiveRecord::Base.transaction do - if published_message_ids.any? - messages = Models::Message - .where(id: published_message_ids, status: Message::Status::PUBLISHING) - .lock("FOR UPDATE") - .pluck(:id) - - if messages.size != published_message_ids.size - raise ArgumentError, "Some messages publishing not locked for update" - end - - updated_rows = Models::Message - .where(status: Status::PUBLISHING, id: published_message_ids) - .update_all( - status: Message::Status::PUBLISHED, - updated_at: current_utc_time, - published_at: current_utc_time, - publisher_id: id) - - if updated_rows != published_message_ids.size - raise ArgumentError, "Some messages publishing not updated to published" - end - end - - failed_messages.each do |failed_message| - message = Models::Message - .lock("FOR UPDATE") - .find_by!(id: failed_message[:id], status: Message::Status::PUBLISHING) - - message.update!(status: Message::Status::FAILED, updated_at: current_utc_time) - - exception = message.exceptions.create!( - class_name: failed_message[:exception][:class_name], - message_text: failed_message[:exception][:message_text], - created_at: current_utc_time) - - (failed_message[:exception][:backtrace] || []).each_with_index do |frame, index| - exception.frames.create!(index: index, text: frame) - end - end - end - end - - nil - end end end diff --git a/spec/lib/outboxer/message/publish_spec.rb b/spec/lib/outboxer/message/publish_spec.rb new file mode 100644 index 00000000..57c40759 --- /dev/null +++ b/spec/lib/outboxer/message/publish_spec.rb @@ -0,0 +1,139 @@ +require "rails_helper" + +module Outboxer + RSpec.describe Message, type: :service do + describe ".publish" do + context "without a logger" do + context "when a queued message exists" do + let!(:queued_message) do + Models::Message.create!( + status: Models::Message::Status::QUEUED, + messageable_type: "Event", + messageable_id: "1", + queued_at: ::Time.now.utc + ) + end + + it "yields { id: ... } and marks record published when block succeeds" do + yielded_id = nil + + result = Message.publish do |message| + yielded_id = message[:id] + # publish to broker + end + + expect(yielded_id).to eq(queued_message.id) + expect(result).to eq({ + id: queued_message.id, + messageable_type: queued_message.messageable_type, + messageable_id: queued_message.messageable_id + }) + expect(queued_message.reload.status) + .to eq(Models::Message::Status::PUBLISHED) + end + + it "returns { id: ... } and marks as failed on StandardError" do + result = Message.publish do |_message| + raise StandardError, "temporary failure" + end + + expect(result).to eq({ + id: queued_message.id, + messageable_type: queued_message.messageable_type, + messageable_id: queued_message.messageable_id + }) + expect(queued_message.reload.status) + .to eq(Models::Message::Status::FAILED) + end + + it "marks failed and re-raises on fatal Exception" do + expect do + Message.publish do |_message| + raise Exception, "fatal crash" # rubocop:disable Lint/RaiseException + end + end.to raise_error(Exception, "fatal crash") + + expect(queued_message.reload.status) + .to eq(Models::Message::Status::FAILED) + end + end + + context "when no queued messages exist" do + it "returns nil" do + result = Message.publish { raise "block should not be called" } + + expect(result).to be_nil + expect( + Models::Message.where(status: Models::Message::Status::QUEUED).count + ).to eq(0) + end + end + end + + context "with a logger" do + let(:logger) { instance_double(Logger, info: nil, error: nil, fatal: nil) } + + context "when a queued message exists" do + let!(:queued_message) do + Models::Message.create!( + status: Models::Message::Status::QUEUED, + messageable_type: "Event", + messageable_id: "1", + queued_at: ::Time.now.utc + ) + end + + it "logs the 'publishing' line" do + Message.publish(logger: logger) { |_message| } # rubocop:disable Lint/EmptyBlock + + expect(logger).to have_received(:info).with( + a_string_matching(/Outboxer message publishing id=\d+/) + ) + end + + it "logs 'publishing failed' on StandardError" do + Message.publish(logger: logger) do |_message| + raise StandardError, "temporary failure" + end + + expect(logger).to have_received(:error).with( + a_string_matching(/Outboxer message publishing failed id=\d+/) + ) + end + + it "logs 'fatal' on fatal Exception and re-raises" do + expect do + Message.publish(logger: logger) do |_message| + raise Exception, "fatal crash" # rubocop:disable Lint/RaiseException + end + end.to raise_error(Exception, "fatal crash") + + expect(logger).to have_received(:fatal).with( + a_string_matching(/Outboxer message publishing failed id=\d+/) + ) + end + end + + context "when no queued messages exist" do + let!(:queued_message) do + Models::Message.create!( + status: Models::Message::Status::PUBLISHED, + messageable_type: "Event", + messageable_id: "1", + queued_at: ::Time.now.utc + ) + end + + it "returns nil and does not log" do + result = Message.publish(logger: logger) { raise "should not run" } + + expect(result).to be_nil + expect(logger).not_to have_received(:info) + expect(logger).not_to have_received(:error) + expect(logger).not_to have_received(:fatal) + end + end + end + end + end +end diff --git a/spec/lib/outboxer/message/requeue_spec.rb b/spec/lib/outboxer/message/requeue_spec.rb index 6fc315c3..29bb4d8c 100644 --- a/spec/lib/outboxer/message/requeue_spec.rb +++ b/spec/lib/outboxer/message/requeue_spec.rb @@ -31,18 +31,6 @@ module Outboxer expect(queued_message.status).to eq(Message::Status::QUEUED) expect(queued_message.updated_at).to be > updated_at end - - it "does not delete queued message" do - begin - Publisher.update_messages( - id: 1, published_message_ids: [queued_message.id]) - rescue ArgumentError - # ignore - end - - expect(Models::Message.count).to eq(1) - expect(Models::Message.first).to eq(queued_message) - end end end end diff --git a/spec/lib/outboxer/publisher/buffer_messages_spec.rb b/spec/lib/outboxer/publisher/buffer_messages_spec.rb deleted file mode 100644 index b86a95cc..00000000 --- a/spec/lib/outboxer/publisher/buffer_messages_spec.rb +++ /dev/null @@ -1,39 +0,0 @@ -require "rails_helper" - -module Outboxer - RSpec.describe Publisher do - let(:id) { 1 } - let(:name) { "test" } - - describe ".buffer_messages" do - context "when there are 2 queued messages" do - let!(:queued_messages) do - [ - create(:outboxer_message, :queued, updated_at: 2.minutes.ago), - create(:outboxer_message, :queued, updated_at: 1.minute.ago) - ] - end - - context "when limit is 1" do - let!(:buffered_messages) { Publisher.buffer_messages(id: id, name: name, limit: 1) } - - it "returns first buffered message" do - expect(buffered_messages.count).to eq(1) - - buffered_message = buffered_messages.first - expect(buffered_message[:id]).to eq(queued_messages[0].id) - end - - it "keeps last queued message" do - remaining_messages = Models::Message.where(status: Message::Status::QUEUED) - - expect(remaining_messages.count).to eq(1) - - remaining_message = remaining_messages.last - expect(remaining_message).to eq(queued_messages.last) - end - end - end - end - end -end diff --git a/spec/lib/outboxer/publisher/publish_message_spec.rb b/spec/lib/outboxer/publisher/publish_message_spec.rb index 75aef789..75ad0aa2 100644 --- a/spec/lib/outboxer/publisher/publish_message_spec.rb +++ b/spec/lib/outboxer/publisher/publish_message_spec.rb @@ -271,15 +271,15 @@ module Outboxer it "logs errors" do expect(logger).to have_received(:fatal).with( - a_string_matching("#{no_memory_error.class}: #{no_memory_error.message}")).once + a_string_matching("#{no_memory_error.class}: #{no_memory_error.message}")).twice end end - context "when buffer_messages raises a StandardError" do + context "when Message.publishing raises a StandardError" do it "logs the error and continues processing" do call_count = 0 - allow(Publisher).to receive(:buffer_messages) do + allow(Message).to receive(:publishing) do call_count += 1 case call_count @@ -288,7 +288,7 @@ module Outboxer else ::Process.kill("TERM", ::Process.pid) - [] + nil end end @@ -304,9 +304,9 @@ module Outboxer end end - context "when buffer_messages raises an Exception" do + context "when Message.publishing raises an Exception" do it "logs the exception and shuts down" do - allow(Publisher).to receive(:buffer_messages) + allow(Message).to receive(:publishing) .and_raise(NoMemoryError, "failed to allocate memory") expect(logger).to receive(:fatal) diff --git a/spec/lib/outboxer/publisher/update_messages_spec.rb b/spec/lib/outboxer/publisher/update_messages_spec.rb deleted file mode 100644 index eb50083d..00000000 --- a/spec/lib/outboxer/publisher/update_messages_spec.rb +++ /dev/null @@ -1,85 +0,0 @@ -require "rails_helper" - -module Outboxer - RSpec.describe Publisher do - let(:id) { 1 } - - describe ".update_messages" do - context "when messages transition from publishing" do - let!(:message_to_mark_failed) { create(:outboxer_message, :publishing) } - let!(:message_to_mark_published) { create(:outboxer_message, :publishing) } - - let(:backtrace) do - ["a.rb:1:in `a'", "b.rb:2:in `b'", "c.rb:3:in `c'"] - end - - let(:failed_messages) do - [{ - id: message_to_mark_failed.id, - exception: { - class_name: "Sidekiq::ClientError", - message_text: "Job not enqueued (blocked by client middleware)", - backtrace: backtrace - } - }] - end - - before do - Publisher.update_messages( - id: id, - published_message_ids: [message_to_mark_published.id], - failed_messages: failed_messages) - end - - it "marks the published message as published" do - expect(Models::Message.published.pluck(:id)) - .to eq([message_to_mark_published.id]) - end - - it "marks the failed message as failed" do - expect(Models::Message.failed.pluck(:id)) - .to eq([message_to_mark_failed.id]) - end - - it "writes the exception row for the failed message" do - exception = Models::Exception.find_by(message_id: message_to_mark_failed.id) - - expect(exception).to be_present - expect(exception.class_name).to eq("Sidekiq::ClientError") - expect(exception.message_text) - .to eq("Job not enqueued (blocked by client middleware)") - end - - it "writes frames in order for the backtrace" do - exception = Models::Exception.find_by(message_id: message_to_mark_failed.id) - - expect(exception.frames.count).to eq(backtrace.size) - expect(exception.frames.order(:index).pluck(:text)) - .to eq(backtrace) - expect(exception.frames.order(:index).pluck(:index)) - .to eq([0, 1, 2]) - end - end - - context "when exception has no backtrace" do - let!(:message_to_mark_failed) { create(:outboxer_message, :publishing) } - - it "creates exception without frames" do - Publisher.update_messages( - id: id, - failed_messages: [{ - id: message_to_mark_failed.id, - exception: { class_name: "RuntimeError", message_text: "oops" } - }], - time: Time - ) - - exception = Models::Exception.find_by(message_id: message_to_mark_failed.id) - - expect(exception).to be_present - expect(exception.frames).to be_empty - end - end - end - end -end