Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions bin/outboxer_publisher
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ begin
tick_interval: options[:tick_interval],
poll_interval: options[:poll_interval],
heartbeat_interval: options[:heartbeat_interval],
sweep_interval: options[:sweep_interval],
sweep_retention: options[:sweep_retention],
sweep_batch_size: options[:sweep_batch_size],
logger: logger
) do |publisher, message|
# TODO: publish message here
Expand Down
95 changes: 3 additions & 92 deletions lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,6 @@ def self.parse_cli_options(args)
options[:heartbeat_interval] = v
end

opts.on("--sweep-interval SECS", Float, "Sweep interval in seconds") do |v|
options[:sweep_interval] = v
end

opts.on("--sweep-retention SECS", Float, "Sweep retention in seconds") do |v|
options[:sweep_retention] = v
end

opts.on("--sweep-batch-size SIZE", Integer, "Sweep batch size") do |v|
options[:sweep_batch_size] = v
end

opts.on("--log-level LEVEL", Integer, "Log level") do |v|
options[:log_level] = v
end
Expand Down Expand Up @@ -138,7 +126,6 @@ def all
# @return [Hash] Details of the created publisher.
def create(name:, concurrency:,
tick_interval:, poll_interval:, heartbeat_interval:,
sweep_interval:, sweep_retention:, sweep_batch_size:,
time: ::Time)
ActiveRecord::Base.connection_pool.with_connection do
ActiveRecord::Base.transaction do
Expand All @@ -151,10 +138,7 @@ def create(name:, concurrency:,
"concurrency" => concurrency,
"tick_interval" => tick_interval,
"poll_interval" => poll_interval,
"heartbeat_interval" => heartbeat_interval,
"sweep_interval" => sweep_interval,
"sweep_retention" => sweep_retention,
"sweep_batch_size" => sweep_batch_size
"heartbeat_interval" => heartbeat_interval
},
metrics: {
"throughput" => 0,
Expand Down Expand Up @@ -428,9 +412,6 @@ def handle_signal(id:, name:, logger:)
tick_interval: 0.1,
poll_interval: 5.0,
heartbeat_interval: 5.0,
sweep_interval: 60,
sweep_retention: 60,
sweep_batch_size: 100,
log_level: 1
}

Expand All @@ -440,9 +421,6 @@ def handle_signal(id:, name:, logger:)
# @param tick_interval [Float] The tick interval in seconds.
# @param poll_interval [Float] The poll interval in seconds.
# @param heartbeat_interval [Float] The heartbeat interval in seconds.
# @param sweep_interval [Float] The interval in seconds between sweeper runs.
# @param sweep_retention [Float] The retention period in seconds for published messages.
# @param sweep_batch_size [Integer] The maximum number of messages to delete per batch.
# @param logger [Logger] Logger for recording publishing activities.
# @param time [Time] The current time context.
# @param process [Process] The process module for system metrics.
Expand All @@ -456,9 +434,6 @@ def publish_message(
tick_interval: PUBLISH_MESSAGE_DEFAULTS[:tick_interval],
poll_interval: PUBLISH_MESSAGE_DEFAULTS[:poll_interval],
heartbeat_interval: PUBLISH_MESSAGE_DEFAULTS[:heartbeat_interval],
sweep_interval: PUBLISH_MESSAGE_DEFAULTS[:sweep_interval],
sweep_retention: PUBLISH_MESSAGE_DEFAULTS[:sweep_retention],
sweep_batch_size: PUBLISH_MESSAGE_DEFAULTS[:sweep_batch_size],
logger: Logger.new($stdout, level: PUBLISH_MESSAGE_DEFAULTS[:log_level]),
time: ::Time, process: ::Process, kernel: ::Kernel,
&block
Expand All @@ -471,9 +446,6 @@ def publish_message(
"tick_interval=#{tick_interval} " \
"poll_interval=#{poll_interval}, " \
"heartbeat_interval=#{heartbeat_interval}, " \
"sweep_interval=#{sweep_interval}, " \
"sweep_retention=#{sweep_retention}, " \
"sweep_batch_size=#{sweep_batch_size}, " \
"log_level=#{logger.level}"

Setting.create_all
Expand All @@ -483,26 +455,12 @@ def publish_message(
concurrency: concurrency,
tick_interval: tick_interval,
poll_interval: poll_interval,
heartbeat_interval: heartbeat_interval,
sweep_interval: sweep_interval,
sweep_retention: sweep_retention,
sweep_batch_size: sweep_batch_size)
heartbeat_interval: heartbeat_interval)

heartbeat_thread = create_heartbeat_thread(
id: publisher[:id], heartbeat_interval: heartbeat_interval, tick_interval: tick_interval,
logger: logger, time: time, process: process, kernel: kernel)

sweeper_thread = create_sweeper_thread(
id: publisher[:id],
sweep_interval: sweep_interval,
sweep_retention: sweep_retention,
sweep_batch_size: sweep_batch_size,
tick_interval: tick_interval,
logger: logger,
time: time,
process: process,
kernel: kernel)

publisher_threads = Array.new(concurrency) do |index|
create_publisher_thread(
id: publisher[:id], index: index,
Expand All @@ -523,15 +481,14 @@ def publish_message(
publisher_threads.each(&:join)

heartbeat_thread.join
sweeper_thread.join

delete(id: publisher[:id])

logger.info "Outboxer terminated"
end

def pool(concurrency:)
concurrency + 3 # (main + heartbeat + sweeper)
concurrency + 2 # (main + heartbeat)
end

# @param id [Integer] Publisher id.
Expand Down Expand Up @@ -589,51 +546,5 @@ def create_publisher_thread(id:, index:,
end
end
end

# Creates a sweeper thread to periodically delete old published messages for a publisher.
#
# @param id [Integer] The ID of the publisher.
# @param sweep_interval [Float] Time in seconds between each sweep run.
# @param sweep_retention [Float] Retention period in seconds for keeping published messages.
# @param sweep_batch_size [Integer] Max number of messages to delete in each sweep.
# @param tick_interval [Float] Time in seconds between signal checks while sleeping.
# @param logger [Logger] Logger instance to report errors and fatal issues.
# @param time [Time] Module used for fetching current UTC time.
# @param process [Process] Process module used for monotonic clock timings.
# @param kernel [Kernel] Kernel module used for sleeping between sweeps.
# @return [Thread] The thread executing the sweeping logic.
def create_sweeper_thread(id:, sweep_interval:, sweep_retention:, sweep_batch_size:,
tick_interval:, logger:, time:, process:, kernel:)
Thread.new do
Thread.current.name = "sweeper"

while !terminating?
begin
deleted_count = Message.delete_batch(
status: Message::Status::PUBLISHED,
older_than: time.now.utc - sweep_retention,
batch_size: sweep_batch_size)[:deleted_count]

if deleted_count == 0
Publisher.sleep(
sweep_interval,
tick_interval: tick_interval,
process: process,
kernel: kernel)
end
rescue StandardError => error
logger.error(
"#{error.class}: #{error.message}\n" \
"#{error.backtrace.join("\n")}")
rescue ::Exception => error
logger.fatal(
"#{error.class}: #{error.message}\n" \
"#{error.backtrace.join("\n")}")

terminate(id: id)
end
end
end
end
end
end
15 changes: 0 additions & 15 deletions lib/outboxer/web/views/publisher.erb
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,6 @@
<th scope="row">Heartbeat Interval</th>
<td><%= publisher[:settings]['heartbeat_interval'] %> seconds</td>
</tr>

<tr>
<th scope="row">Sweep Interval</th>
<td><%= publisher[:settings]['sweep_interval'] %> seconds</td>
</tr>

<tr>
<th scope="row">Sweep Retention</th>
<td><%= publisher[:settings]['sweep_retention'] %> seconds</td>
</tr>

<tr>
<th scope="row">Sweep Batch Size</th>
<td><%= publisher[:settings]['sweep_batch_size'] %></td>
</tr>
</tbody>
</table>

Expand Down
2 changes: 1 addition & 1 deletion quickstart_e2e_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ bundle exec rails new . \

bundle install
bundle exec rails db:create
echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "refactor/delete_published_outboxer_messages"' \
echo 'gem "outboxer", git: "https://github.com/fast-programmer/outboxer.git", branch: "master"' \
>> Gemfile
bundle install
bundle exec rails generate outboxer:install
Expand Down
18 changes: 0 additions & 18 deletions spec/lib/outboxer/publisher/parse_cli_options_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,6 @@ module Outboxer
).to eq({ heartbeat_interval: 10 })
end

it "parses the sweep interval flag" do
expect(
Publisher.parse_cli_options(["--sweep-interval", "10"])
).to eq({ sweep_interval: 10 })
end

it "parses the sweep retention flag" do
expect(
Publisher.parse_cli_options(["--sweep-retention", "10"])
).to eq({ sweep_retention: 10 })
end

it "parses the sweep batch size flag" do
expect(
Publisher.parse_cli_options(["--sweep-batch_size", "10"])
).to eq({ sweep_batch_size: 10 })
end

it "parses the log level flag" do
expect(Publisher.parse_cli_options(["--log-level", "0"])).to eq({ log_level: 0 })
end
Expand Down
2 changes: 1 addition & 1 deletion spec/lib/outboxer/publisher/pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Outboxer
RSpec.describe Publisher do
describe ".pool" do
it "returns correct value" do
expect(Publisher.pool(concurrency: 5)).to eql(8)
expect(Publisher.pool(concurrency: 5)).to eql(7)
end
end
end
Expand Down
120 changes: 0 additions & 120 deletions spec/lib/outboxer/publisher/publish_message_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,126 +28,6 @@ module Outboxer
end
end

context "when sweeper deletes a message" do
let!(:message_1) { create(:outboxer_message, :published, updated_at: 2.seconds.ago) }
let!(:message_2) { create(:outboxer_message, :published, updated_at: 2.seconds.ago) }

it "deletes the old published message" do
publish_messages_thread = Thread.new do
Publisher.publish_message(
poll_interval: poll_interval,
tick_interval: tick_interval,
sweep_interval: 0.1,
sweep_retention: 0.1,
sweep_batch_size: 100,
logger: logger,
kernel: kernel
) do |_publisher, _message|
# no op
end
end

sleep 0.3

::Process.kill("TERM", ::Process.pid)

publish_messages_thread.join

expect(Models::Message.count).to be(0)
end
end

context "when sweeper raises StandardError" do
let!(:old_message) { create(:outboxer_message, :published, updated_at: 2.seconds.ago) }

before do
allow(Message).to receive(:delete_batch)
.and_raise(StandardError, "sweep fail")
end

it "logs error" do
publish_messages_thread = Thread.new do
Publisher.publish_message(
poll_interval: poll_interval,
tick_interval: tick_interval,
sweep_interval: 0.1,
sweep_retention: 0.1,
sweep_batch_size: 100,
logger: logger,
kernel: kernel
) do |_publisher, _message|
# no op
end
end

sleep 0.3
::Process.kill("TERM", ::Process.pid)
publish_messages_thread.join

expect(logger).to have_received(:error)
.with(include("StandardError: sweep fail"))
.at_least(:once)
end

it "does not delete the old message" do
thread = Thread.new do
Publisher.publish_message(
poll_interval: poll_interval,
tick_interval: tick_interval,
sweep_interval: 0.1,
sweep_retention: 1,
sweep_batch_size: 1,
logger: logger,
kernel: kernel
) do |_publisher, message|
# no op
end
end

sleep 0.3
::Process.kill("TERM", ::Process.pid)
thread.join

expect(Models::Message.exists?(id: old_message.id)).to be(true)
end
end

context "when sweeper raises critical error" do
let!(:old_message) { create(:outboxer_message, :published, updated_at: 2.seconds.ago) }

before do
allow(Message).to receive(:delete_batch)
.and_raise(NoMemoryError, "boom")

thread = Thread.new do
Publisher.publish_message(
poll_interval: poll_interval,
tick_interval: tick_interval,
sweep_interval: 0.1,
sweep_retention: 0.1,
sweep_batch_size: 100,
logger: logger,
kernel: kernel
) do |_publisher, message|
# no op
end
end

sleep 0.3
::Process.kill("TERM", ::Process.pid)
thread.join
end

it "logs fatal error" do
expect(logger).to have_received(:fatal)
.with(include("NoMemoryError: boom"))
end

it "does not delete the old message" do
expect(Models::Message.exists?(id: old_message.id)).to be(true)
end
end

context "when TTIN signal sent" do
let!(:old_message) { create(:outboxer_message, :queued, updated_at: 2.seconds.ago) }

Expand Down