Skip to content

Commit 32b12d9

Browse files
authored
Merge 111d9a1 into d2293c9
2 parents d2293c9 + 111d9a1 commit 32b12d9

File tree

4 files changed

+38
-36
lines changed

4 files changed

+38
-36
lines changed

README.md

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,15 @@ end
5151
```ruby
5252
# bin/outboxer_publisher
5353

54-
Outboxer::Publisher.publish_message(...) do |message|
55-
# TODO: publish message here
56-
57-
logger.info "Outboxer published message " \
58-
"id=#{message[:id]} " \
59-
"messageable_type=#{message[:messageable_type]} " \
60-
"messageable_id=#{message[:messageable_id]} "
54+
Outboxer::Publisher.publish_messages do |messages|
55+
# TODO: publish messages here
56+
57+
messages.each do |message|
58+
logger.info "Outboxer published message " \
59+
"id=#{message[:id]} " \
60+
"messageable_type=#{message[:messageable_type]} " \
61+
"messageable_id=#{message[:messageable_id]} "
62+
end
6163
end
6264
```
6365

bin/outboxer_publisher

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ cli_options = Outboxer::Publisher.parse_cli_options(ARGV)
77
environment = cli_options.delete(:environment) || ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development"
88
config_path = cli_options.delete(:config) || Outboxer::Publisher::CONFIG_DEFAULTS[:path]
99
config = Outboxer::Publisher.config(environment: environment, path: config_path)
10-
options = Outboxer::Publisher::PUBLISH_MESSAGE_DEFAULTS.merge(config.merge(cli_options))
10+
options = Outboxer::Publisher::PUBLISH_MESSAGES_DEFAULTS.merge(config.merge(cli_options))
1111
logger = Outboxer::Logger.new($stdout, level: options[:log_level])
1212

1313
database_config = Outboxer::Database.config(
@@ -16,7 +16,7 @@ database_config = Outboxer::Database.config(
1616
Outboxer::Database.connect(config: database_config, logger: logger)
1717

1818
begin
19-
Outboxer::Publisher.publish_message(
19+
Outboxer::Publisher.publish_messages(
2020
batch_size: options[:batch_size],
2121
buffer_size: options[:buffer_size],
2222
concurrency: options[:concurrency],

lib/outboxer/publisher.rb

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ def config(
8888

8989
yaml = YAML.safe_load(erb_result, permitted_classes: [Symbol], aliases: true)
9090
yaml.deep_symbolize_keys!
91-
yaml_override = yaml.fetch(environment&.to_sym, {}).slice(*PUBLISH_MESSAGE_DEFAULTS.keys)
92-
yaml.slice(*PUBLISH_MESSAGE_DEFAULTS.keys).merge(yaml_override)
91+
yaml_override = yaml.fetch(environment&.to_sym, {}).slice(*PUBLISH_MESSAGES_DEFAULTS.keys)
92+
yaml.slice(*PUBLISH_MESSAGES_DEFAULTS.keys).merge(yaml_override)
9393
rescue Errno::ENOENT
9494
{}
9595
end
@@ -499,7 +499,7 @@ def handle_signal(id:, name:, logger:)
499499
end
500500
end
501501

502-
PUBLISH_MESSAGE_DEFAULTS = {
502+
PUBLISH_MESSAGES_DEFAULTS = {
503503
batch_size: 10,
504504
buffer_size: 100,
505505
concurrency: 1,
@@ -522,15 +522,15 @@ def handle_signal(id:, name:, logger:)
522522
# @param process [Process] The process module for system metrics.
523523
# @param kernel [Kernel] The kernel module for sleeping operations.
524524
# @yield [Hash] A block to handle the publishing of each message.
525-
def publish_message(
525+
def publish_messages(
526526
name: "#{::Socket.gethostname}:#{::Process.pid}",
527-
batch_size: PUBLISH_MESSAGE_DEFAULTS[:batch_size],
528-
buffer_size: PUBLISH_MESSAGE_DEFAULTS[:buffer_size],
529-
concurrency: PUBLISH_MESSAGE_DEFAULTS[:concurrency],
530-
tick_interval: PUBLISH_MESSAGE_DEFAULTS[:tick_interval],
531-
poll_interval: PUBLISH_MESSAGE_DEFAULTS[:poll_interval],
532-
heartbeat_interval: PUBLISH_MESSAGE_DEFAULTS[:heartbeat_interval],
533-
logger: Logger.new($stdout, level: PUBLISH_MESSAGE_DEFAULTS[:log_level]),
527+
batch_size: PUBLISH_MESSAGES_DEFAULTS[:batch_size],
528+
buffer_size: PUBLISH_MESSAGES_DEFAULTS[:buffer_size],
529+
concurrency: PUBLISH_MESSAGES_DEFAULTS[:concurrency],
530+
tick_interval: PUBLISH_MESSAGES_DEFAULTS[:tick_interval],
531+
poll_interval: PUBLISH_MESSAGES_DEFAULTS[:poll_interval],
532+
heartbeat_interval: PUBLISH_MESSAGES_DEFAULTS[:heartbeat_interval],
533+
logger: Logger.new($stdout, level: PUBLISH_MESSAGES_DEFAULTS[:log_level]),
534534
time: ::Time, process: ::Process, kernel: ::Kernel,
535535
&block
536536
)

spec/lib/outboxer/publisher/publish_message_spec.rb renamed to spec/lib/outboxer/publisher/publish_messages_spec.rb

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
module Outboxer
55
RSpec.describe Publisher do
6-
describe ".publish_message" do
6+
describe ".publish_messages" do
77
let(:batch_size) { 1 }
88
let(:buffer_size) { 1 }
99
let(:poll_interval) { 1 }
@@ -19,13 +19,13 @@ module Outboxer
1919

2020
context "when TTIN signal sent" do
2121
it "dumps stack trace" do
22-
publish_message_thread = Thread.new do
23-
Outboxer::Publisher.publish_message(
22+
publish_messages_thread = Thread.new do
23+
Outboxer::Publisher.publish_messages(
2424
batch_size: batch_size,
2525
buffer_size: buffer_size,
2626
poll_interval: poll_interval,
2727
tick_interval: tick_interval,
28-
logger: logger, kernel: kernel) do |_message|
28+
logger: logger, kernel: kernel) do |_messages|
2929
::Process.kill("TTIN", ::Process.pid)
3030
end
3131
end
@@ -34,7 +34,7 @@ module Outboxer
3434

3535
::Process.kill("TERM", ::Process.pid)
3636

37-
publish_message_thread.join
37+
publish_messages_thread.join
3838

3939
expect(logger)
4040
.to have_received(:info)
@@ -45,14 +45,14 @@ module Outboxer
4545

4646
context "when stopped and resumed during message publishing" do
4747
it "stops and resumes the publishing process correctly" do
48-
publish_message_thread = Thread.new do
49-
Outboxer::Publisher.publish_message(
48+
publish_messages_thread = Thread.new do
49+
Outboxer::Publisher.publish_messages(
5050
batch_size: batch_size,
5151
buffer_size: buffer_size,
5252
poll_interval: poll_interval,
5353
tick_interval: tick_interval,
5454
logger: logger,
55-
kernel: kernel) do |_message|
55+
kernel: kernel) do |_messages|
5656
::Process.kill("TSTP", ::Process.pid)
5757
end
5858
end
@@ -63,13 +63,13 @@ module Outboxer
6363

6464
::Process.kill("TERM", ::Process.pid)
6565

66-
publish_message_thread.join
66+
publish_messages_thread.join
6767
end
6868
end
6969

7070
context "when message published successfully" do
7171
it "sets the message to published" do
72-
Publisher.publish_message(
72+
Publisher.publish_messages(
7373
batch_size: batch_size,
7474
buffer_size: buffer_size,
7575
poll_interval: poll_interval,
@@ -95,7 +95,7 @@ module Outboxer
9595
let(:standard_error) { StandardError.new("some error") }
9696

9797
before do
98-
Publisher.publish_message(
98+
Publisher.publish_messages(
9999
batch_size: batch_size,
100100
buffer_size: buffer_size,
101101
poll_interval: poll_interval,
@@ -120,7 +120,7 @@ module Outboxer
120120
expect(queued_message.exceptions[0].frames[0].index).to eq(0)
121121

122122
expect(queued_message.exceptions[0].frames[0].text).to match(
123-
/outboxer\/publisher\/publish_message_spec.rb:\d+:in `block \(6 levels\) in <module:Outboxer>'/)
123+
/outboxer\/publisher\/publish_messages_spec.rb:\d+:in `block \(6 levels\) in <module:Outboxer>'/)
124124
end
125125

126126
it "logs errors" do
@@ -137,7 +137,7 @@ module Outboxer
137137
let(:no_memory_error) { NoMemoryError.new }
138138

139139
before do
140-
Publisher.publish_message(
140+
Publisher.publish_messages(
141141
batch_size: batch_size,
142142
buffer_size: buffer_size,
143143
poll_interval: poll_interval,
@@ -159,7 +159,7 @@ module Outboxer
159159

160160
expect(queued_message.exceptions[0].frames[0].index).to eq(0)
161161
expect(queued_message.exceptions[0].frames[0].text).to match(
162-
/outboxer\/publisher\/publish_message_spec.rb:\d+:in `block \(6 levels\) in <module:Outboxer>'/)
162+
/outboxer\/publisher\/publish_messages_spec.rb:\d+:in `block \(6 levels\) in <module:Outboxer>'/)
163163
end
164164

165165
it "logs errors" do
@@ -191,7 +191,7 @@ module Outboxer
191191

192192
expect(logger).to receive(:error).with(include("StandardError: queue error")).once
193193

194-
Publisher.publish_message(
194+
Publisher.publish_messages(
195195
batch_size: batch_size,
196196
buffer_size: buffer_size,
197197
poll_interval: poll_interval,
@@ -210,7 +210,7 @@ module Outboxer
210210
.with(include("NoMemoryError: failed to allocate memory"))
211211
.once
212212

213-
Publisher.publish_message(
213+
Publisher.publish_messages(
214214
batch_size: batch_size,
215215
buffer_size: buffer_size,
216216
poll_interval: poll_interval,

0 commit comments

Comments
 (0)