Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/outboxer_publisher
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ database_config = Outboxer::Database.config(
Outboxer::Database.connect(config: database_config, logger: logger)

begin
Outboxer::Publisher.publish(
Outboxer::Publisher.publish_message(
buffer: options[:buffer],
concurrency: options[:concurrency],
tick: options[:tick],
Expand Down
16 changes: 8 additions & 8 deletions lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,11 @@ def create_publisher_threads(id:, name:,
Thread.new do
Thread.current.name = "publisher-#{index + 1}"

while (message = queue.pop)
break if message.nil?
while (buffered_message = queue.pop)
break if buffered_message.nil?

publish_message(
id: id, name: name, buffered_message: message,
publish_buffered_message(
id: id, name: name, buffered_message: buffered_message,
logger: logger, &block)
end
end
Expand Down Expand Up @@ -501,7 +501,7 @@ def handle_signal(id:, name:, logger:)
log_level: 1
}

# Publishes messages by managing multiple threads.
# Publish queued messages concurrently
# @param name [String] The name of the publisher.
# @param buffer [Integer] The buffer size.
# @param concurrency [Integer] The number of threads for concurrent publishing.
Expand All @@ -513,7 +513,7 @@ def handle_signal(id:, name:, logger:)
# @param process [Process] The process module for system metrics.
# @param kernel [Kernel] The kernel module for sleeping operations.
# @yield [Hash] A block to handle the publishing of each message.
def publish(
def publish_message(
name: "#{::Socket.gethostname}:#{::Process.pid}",
buffer: PUBLISH_DEFAULTS[:buffer],
concurrency: PUBLISH_DEFAULTS[:concurrency],
Expand Down Expand Up @@ -596,13 +596,13 @@ def publish(
logger.info "Outboxer terminated"
end

# Handles the publication process for a single message.
# Publishes a buffered message
# @param id [Integer] The ID of the publisher.
# @param name [String] The name of the publisher.
# @param buffered_message [Hash] The message data retrieved from the buffer.
# @param logger [Logger] Logger for recording the outcome of the publishing attempt.
# @yield [Hash] A block to process the publishing of the message.
def publish_message(id:, name:, buffered_message:, logger:, &block)
def publish_buffered_message(id:, name:, buffered_message:, logger:, &block)
publishing_message = Message.publishing(
id: buffered_message[:id], publisher_id: id, publisher_name: name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

module Outboxer
RSpec.describe Publisher do
describe ".publish" do
describe ".publish_message" do
let(:buffer) { 1 }
let(:poll) { 1 }
let(:tick) { 0.1 }
Expand All @@ -18,8 +18,8 @@ module Outboxer

context "when TTIN signal sent" do
it "dumps stack trace" do
publish_thread = Thread.new do
Outboxer::Publisher.publish(
publish_message_thread = Thread.new do
Outboxer::Publisher.publish_message(
buffer: buffer,
poll: poll,
tick: tick,
Expand All @@ -32,7 +32,7 @@ module Outboxer

::Process.kill("TERM", ::Process.pid)

publish_thread.join
publish_message_thread.join

expect(logger)
.to have_received(:info)
Expand All @@ -43,8 +43,8 @@ module Outboxer

context "when stopped and resumed during message publishing" do
it "stops and resumes the publishing process correctly" do
publish_thread = Thread.new do
Outboxer::Publisher.publish(
publish_message_thread = Thread.new do
Outboxer::Publisher.publish_message(
buffer: buffer,
poll: poll,
tick: tick,
Expand All @@ -60,13 +60,13 @@ module Outboxer

::Process.kill("TERM", ::Process.pid)

publish_thread.join
publish_message_thread.join
end
end

context "when message published successfully" do
it "sets the message to published" do
Publisher.publish(
Publisher.publish_message(
buffer: buffer,
poll: poll,
tick: tick,
Expand All @@ -89,7 +89,7 @@ module Outboxer
let(:standard_error) { StandardError.new("some error") }

before do
Publisher.publish(
Publisher.publish_message(
buffer: buffer,
poll: poll,
tick: tick,
Expand All @@ -113,7 +113,7 @@ module Outboxer
expect(queued_message.exceptions[0].frames[0].index).to eq(0)

expect(queued_message.exceptions[0].frames[0].text).to match(
/outboxer\/publisher\/publish_spec.rb:\d+:in `block \(6 levels\) in <module:Outboxer>'/)
/outboxer\/publisher\/publish_message_spec.rb:\d+:in `block \(6 levels\) in <module:Outboxer>'/)
end

it "logs errors" do
Expand All @@ -130,7 +130,7 @@ module Outboxer
let(:no_memory_error) { NoMemoryError.new }

before do
Publisher.publish(
Publisher.publish_message(
buffer: buffer,
poll: poll,
tick: tick,
Expand All @@ -151,7 +151,7 @@ module Outboxer

expect(queued_message.exceptions[0].frames[0].index).to eq(0)
expect(queued_message.exceptions[0].frames[0].text).to match(
/outboxer\/publisher\/publish_spec.rb:\d+:in `block \(6 levels\) in <module:Outboxer>'/)
/outboxer\/publisher\/publish_message_spec.rb:\d+:in `block \(6 levels\) in <module:Outboxer>'/)
end

it "logs errors" do
Expand Down Expand Up @@ -183,7 +183,7 @@ module Outboxer

expect(logger).to receive(:error).with(include("StandardError: queue error")).once

Publisher.publish(
Publisher.publish_message(
buffer: buffer,
poll: poll,
tick: tick,
Expand All @@ -201,7 +201,7 @@ module Outboxer
.with(include("NoMemoryError: failed to allocate memory"))
.once

Publisher.publish(
Publisher.publish_message(
buffer: buffer,
poll: poll,
tick: tick,
Expand Down