From f6e3a26dad2769c92f2be4ddf8a48a88e654b091 Mon Sep 17 00:00:00 2001 From: bedrock-adam Date: Sat, 12 Apr 2025 17:37:07 +1000 Subject: [PATCH] commit changes --- bin/outboxer_publisher | 2 +- lib/outboxer/publisher.rb | 16 +++++------ ...ublish_spec.rb => publish_message_spec.rb} | 28 +++++++++---------- 3 files changed, 23 insertions(+), 23 deletions(-) rename spec/lib/outboxer/publisher/{publish_spec.rb => publish_message_spec.rb} (90%) diff --git a/bin/outboxer_publisher b/bin/outboxer_publisher index 2964204d..1cd1901d 100755 --- a/bin/outboxer_publisher +++ b/bin/outboxer_publisher @@ -23,7 +23,7 @@ database_config = Outboxer::Database.config( Outboxer::Database.connect(config: database_config, logger: logger) begin - Outboxer::Publisher.publish( + Outboxer::Publisher.publish_message( buffer: options[:buffer], concurrency: options[:concurrency], tick: options[:tick], diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index 099ff49f..9da4cfef 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -284,11 +284,11 @@ def create_publisher_threads(id:, name:, Thread.new do Thread.current.name = "publisher-#{index + 1}" - while (message = queue.pop) - break if message.nil? + while (buffered_message = queue.pop) + break if buffered_message.nil? - publish_message( - id: id, name: name, buffered_message: message, + publish_buffered_message( + id: id, name: name, buffered_message: buffered_message, logger: logger, &block) end end @@ -501,7 +501,7 @@ def handle_signal(id:, name:, logger:) log_level: 1 } - # Publishes messages by managing multiple threads. + # Publish queued messages concurrently # @param name [String] The name of the publisher. # @param buffer [Integer] The buffer size. # @param concurrency [Integer] The number of threads for concurrent publishing. @@ -513,7 +513,7 @@ def handle_signal(id:, name:, logger:) # @param process [Process] The process module for system metrics. # @param kernel [Kernel] The kernel module for sleeping operations. # @yield [Hash] A block to handle the publishing of each message. - def publish( + def publish_message( name: "#{::Socket.gethostname}:#{::Process.pid}", buffer: PUBLISH_DEFAULTS[:buffer], concurrency: PUBLISH_DEFAULTS[:concurrency], @@ -596,13 +596,13 @@ def publish( logger.info "Outboxer terminated" end - # Handles the publication process for a single message. + # Publishes a buffered message # @param id [Integer] The ID of the publisher. # @param name [String] The name of the publisher. # @param buffered_message [Hash] The message data retrieved from the buffer. # @param logger [Logger] Logger for recording the outcome of the publishing attempt. # @yield [Hash] A block to process the publishing of the message. - def publish_message(id:, name:, buffered_message:, logger:, &block) + def publish_buffered_message(id:, name:, buffered_message:, logger:, &block) publishing_message = Message.publishing( id: buffered_message[:id], publisher_id: id, publisher_name: name) diff --git a/spec/lib/outboxer/publisher/publish_spec.rb b/spec/lib/outboxer/publisher/publish_message_spec.rb similarity index 90% rename from spec/lib/outboxer/publisher/publish_spec.rb rename to spec/lib/outboxer/publisher/publish_message_spec.rb index 00ee3a9e..5662de32 100644 --- a/spec/lib/outboxer/publisher/publish_spec.rb +++ b/spec/lib/outboxer/publisher/publish_message_spec.rb @@ -3,7 +3,7 @@ module Outboxer RSpec.describe Publisher do - describe ".publish" do + describe ".publish_message" do let(:buffer) { 1 } let(:poll) { 1 } let(:tick) { 0.1 } @@ -18,8 +18,8 @@ module Outboxer context "when TTIN signal sent" do it "dumps stack trace" do - publish_thread = Thread.new do - Outboxer::Publisher.publish( + publish_message_thread = Thread.new do + Outboxer::Publisher.publish_message( buffer: buffer, poll: poll, tick: tick, @@ -32,7 +32,7 @@ module Outboxer ::Process.kill("TERM", ::Process.pid) - publish_thread.join + publish_message_thread.join expect(logger) .to have_received(:info) @@ -43,8 +43,8 @@ module Outboxer context "when stopped and resumed during message publishing" do it "stops and resumes the publishing process correctly" do - publish_thread = Thread.new do - Outboxer::Publisher.publish( + publish_message_thread = Thread.new do + Outboxer::Publisher.publish_message( buffer: buffer, poll: poll, tick: tick, @@ -60,13 +60,13 @@ module Outboxer ::Process.kill("TERM", ::Process.pid) - publish_thread.join + publish_message_thread.join end end context "when message published successfully" do it "sets the message to published" do - Publisher.publish( + Publisher.publish_message( buffer: buffer, poll: poll, tick: tick, @@ -89,7 +89,7 @@ module Outboxer let(:standard_error) { StandardError.new("some error") } before do - Publisher.publish( + Publisher.publish_message( buffer: buffer, poll: poll, tick: tick, @@ -113,7 +113,7 @@ module Outboxer expect(queued_message.exceptions[0].frames[0].index).to eq(0) expect(queued_message.exceptions[0].frames[0].text).to match( - /outboxer\/publisher\/publish_spec.rb:\d+:in `block \(6 levels\) in '/) + /outboxer\/publisher\/publish_message_spec.rb:\d+:in `block \(6 levels\) in '/) end it "logs errors" do @@ -130,7 +130,7 @@ module Outboxer let(:no_memory_error) { NoMemoryError.new } before do - Publisher.publish( + Publisher.publish_message( buffer: buffer, poll: poll, tick: tick, @@ -151,7 +151,7 @@ module Outboxer expect(queued_message.exceptions[0].frames[0].index).to eq(0) expect(queued_message.exceptions[0].frames[0].text).to match( - /outboxer\/publisher\/publish_spec.rb:\d+:in `block \(6 levels\) in '/) + /outboxer\/publisher\/publish_message_spec.rb:\d+:in `block \(6 levels\) in '/) end it "logs errors" do @@ -183,7 +183,7 @@ module Outboxer expect(logger).to receive(:error).with(include("StandardError: queue error")).once - Publisher.publish( + Publisher.publish_message( buffer: buffer, poll: poll, tick: tick, @@ -201,7 +201,7 @@ module Outboxer .with(include("NoMemoryError: failed to allocate memory")) .once - Publisher.publish( + Publisher.publish_message( buffer: buffer, poll: poll, tick: tick,