Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 2 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,27 +84,8 @@ TRANSACTION (0.2ms) COMMIT
```ruby
# bin/outboxer_publisher

Outboxer::Publisher.publish_messages(batch_size: 1_000, concurrency: 10) do |publisher, messages|
begin
# TODO: publish messages here
rescue => error
Outboxer::Publisher.update_messages(
id: publisher[:id],
failed_messages: messages.map do |message|
{
id: message[:id],
exception: {
class_name: error.class.name,
message_text: error.message,
backtrace: error.backtrace.join("\n")
}
}
end)
else
Outboxer::Publisher.update_messages(
id: publisher[:id],
published_message_ids: messages.map { |message| message[:id] })
end
Outboxer::Publisher.publish_message(concurrency: 10) do |publisher, message|
# TODO: publish messages here
end
```

Expand Down
4 changes: 2 additions & 2 deletions bin/outboxer_bunny_publisher
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require "json"
cli_options = Outboxer::Publisher.parse_cli_options(ARGV)
environment = cli_options.delete(:environment) ||
ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development"
options = Outboxer::Publisher::PUBLISH_MESSAGES_DEFAULTS.merge(cli_options)
options = Outboxer::Publisher::PUBLISH_MESSAGE_DEFAULTS.merge(cli_options)

database_config = Outboxer::Database.config(
environment: environment,
Expand All @@ -28,7 +28,7 @@ exchange = channel.direct(ENV.fetch("BUNNY_EXCHANGE"), durable: true)
routing_key = ENV.fetch("BUNNY_ROUTING_KEY", "")

begin
Outboxer::Publisher.publish_messages(
Outboxer::Publisher.publish_message(
batch_size: 1, concurrency: options[:concurrency]) do |publisher, messages|
begin
exchange.publish(
Expand Down
2 changes: 1 addition & 1 deletion bin/outboxer_kafka_publisher
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require "kafka"
cli_options = Outboxer::Publisher.parse_cli_options(ARGV)
environment = cli_options.delete(:environment) ||
ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development"
options = Outboxer::Publisher::PUBLISH_MESSAGES_DEFAULTS.merge(cli_options)
options = Outboxer::Publisher::PUBLISH_MESSAGE_DEFAULTS.merge(cli_options)

database_config = Outboxer::Database.config(
environment: environment,
Expand Down
28 changes: 4 additions & 24 deletions bin/outboxer_publisher
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require "outboxer"
cli_options = Outboxer::Publisher.parse_cli_options(ARGV)
environment = cli_options.delete(:environment) ||
ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development"
options = Outboxer::Publisher::PUBLISH_MESSAGES_DEFAULTS.merge(cli_options)
options = Outboxer::Publisher::PUBLISH_MESSAGE_DEFAULTS.merge(cli_options)
logger = Outboxer::Logger.new($stdout, level: options[:log_level])

database_config = Outboxer::Database.config(
Expand All @@ -16,8 +16,7 @@ database_config = Outboxer::Database.config(
Outboxer::Database.connect(config: database_config, logger: logger)

begin
Outboxer::Publisher.publish_messages(
batch_size: options[:batch_size],
Outboxer::Publisher.publish_message(
concurrency: options[:concurrency],
tick_interval: options[:tick_interval],
poll_interval: options[:poll_interval],
Expand All @@ -26,27 +25,8 @@ begin
sweep_retention: options[:sweep_retention],
sweep_batch_size: options[:sweep_batch_size],
logger: logger
) do |publisher, messages|
begin
# TODO: publish messages here
rescue => error
Outboxer::Publisher.update_messages(
id: publisher[:id],
failed_messages: messages.map do |message|
{
id: message[:id],
exception: {
class_name: error.class.name,
message_text: error.message,
backtrace: error.backtrace.join("\n")
}
}
end)
else
Outboxer::Publisher.update_messages(
id: publisher[:id],
published_message_ids: messages.map { |message| message[:id] })
end
) do |publisher, message|
# TODO: publish message here
end
ensure
Outboxer::Database.disconnect(logger: logger)
Expand Down
2 changes: 1 addition & 1 deletion bin/outboxer_sidekiq_push_bulk_publisher
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ require "sidekiq"
cli_options = Outboxer::Publisher.parse_cli_options(ARGV)
environment = cli_options.delete(:environment) ||
ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development"
options = Outboxer::Publisher::PUBLISH_MESSAGES_DEFAULTS.merge(cli_options)
options = Outboxer::Publisher::PUBLISH_MESSAGE_DEFAULTS.merge(cli_options)

database_config = Outboxer::Database.config(
environment: environment,
Expand Down
2 changes: 1 addition & 1 deletion bin/outboxer_sqs_send_message_batch_publisher
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require "aws-sdk-sqs"
cli_options = Outboxer::Publisher.parse_cli_options(ARGV)
environment = cli_options.delete(:environment) ||
ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development"
options = Outboxer::Publisher::PUBLISH_MESSAGES_DEFAULTS.merge(cli_options)
options = Outboxer::Publisher::PUBLISH_MESSAGE_DEFAULTS.merge(cli_options)

database_config = Outboxer::Database.config(
environment: environment,
Expand Down
138 changes: 70 additions & 68 deletions lib/outboxer/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ def self.parse_cli_options(args)
options[:environment] = v
end

opts.on("--batch-size SIZE", Integer, "Batch size") do |v|
options[:batch_size] = v
end

opts.on("--concurrency N", Integer, "Number of threads to publish messages") do |v|
options[:concurrency] = v
end
Expand Down Expand Up @@ -59,10 +55,6 @@ def self.parse_cli_options(args)
options[:log_level] = v
end

opts.on("--config PATH", "Path to YAML config file") do |v|
options[:config] = v
end

opts.on("--version", "Print version and exit") do
puts "Outboxer version #{Outboxer::VERSION}"
exit
Expand All @@ -79,33 +71,6 @@ def self.parse_cli_options(args)
options
end

CONFIG_DEFAULTS = {
path: "config/outboxer.yml",
enviroment: "development"
}

# Loads and processes the YAML configuration for the publisher.
# @param environment [String] The application environment.
# @param path [String] The path to the configuration file.
# @return [Hash] The processed configuration data with environment-specific overrides.
def config(
environment: CONFIG_DEFAULTS[:environment],
path: CONFIG_DEFAULTS[:path]
)
path_expanded = ::File.expand_path(path)
text = File.read(path_expanded)
erb = ERB.new(text, trim_mode: "-")
erb.filename = path_expanded
erb_result = erb.result

yaml = YAML.safe_load(erb_result, permitted_classes: [Symbol], aliases: true)
yaml.deep_symbolize_keys!
yaml_override = yaml.fetch(environment&.to_sym, {}).slice(*PUBLISH_MESSAGES_DEFAULTS.keys)
yaml.slice(*PUBLISH_MESSAGES_DEFAULTS.keys).merge(yaml_override)
rescue Errno::ENOENT
{}
end

# Retrieves publisher data by ID including associated signals.
# @param id [Integer] The ID of the publisher to find.
# @return [Hash] Detailed information about the publisher including its signals.
Expand Down Expand Up @@ -165,14 +130,13 @@ def all

# Creates a new publisher with specified settings and metrics.
# @param name [String] The name of the publisher.
# @param batch_size [Integer] The batch size.
# @param concurrency [Integer] The number of publishing threads.
# @param tick_interval [Float] The tick interval in seconds.
# @param poll_interval [Float] The poll interval in seconds.
# @param heartbeat_interval [Float] The heartbeat interval in seconds.
# @param time [Time] The current time context for timestamping.
# @return [Hash] Details of the created publisher.
def create(name:, batch_size:, concurrency:,
def create(name:, concurrency:,
tick_interval:, poll_interval:, heartbeat_interval:,
sweep_interval:, sweep_retention:, sweep_batch_size:,
time: ::Time)
Expand All @@ -184,7 +148,6 @@ def create(name:, batch_size:, concurrency:,
name: name,
status: Status::PUBLISHING,
settings: {
"batch_size" => batch_size,
"concurrency" => concurrency,
"tick_interval" => tick_interval,
"poll_interval" => poll_interval,
Expand Down Expand Up @@ -460,8 +423,7 @@ def handle_signal(id:, name:, logger:)
end
end

PUBLISH_MESSAGES_DEFAULTS = {
batch_size: 1000,
PUBLISH_MESSAGE_DEFAULTS = {
concurrency: 1,
tick_interval: 0.1,
poll_interval: 5.0,
Expand All @@ -474,7 +436,6 @@ def handle_signal(id:, name:, logger:)

# Publish queued messages concurrently
# @param name [String] The name of the publisher.
# @param batch_size [Integer] The batch size.
# @param concurrency [Integer] The number of publisher threads.
# @param tick_interval [Float] The tick interval in seconds.
# @param poll_interval [Float] The poll interval in seconds.
Expand All @@ -489,25 +450,23 @@ def handle_signal(id:, name:, logger:)
# @yield [publisher, messages] Yields publisher and messages to be published.
# @yieldparam publisher [Hash] A hash with keys `:id` and `:name` representing the publisher.
# @yieldparam messages [Array<Hash>] An array of message hashes retrieved from the buffer.
def publish_messages(
def publish_message(
name: "#{::Socket.gethostname}:#{::Process.pid}",
batch_size: PUBLISH_MESSAGES_DEFAULTS[:batch_size],
concurrency: PUBLISH_MESSAGES_DEFAULTS[:concurrency],
tick_interval: PUBLISH_MESSAGES_DEFAULTS[:tick_interval],
poll_interval: PUBLISH_MESSAGES_DEFAULTS[:poll_interval],
heartbeat_interval: PUBLISH_MESSAGES_DEFAULTS[:heartbeat_interval],
sweep_interval: PUBLISH_MESSAGES_DEFAULTS[:sweep_interval],
sweep_retention: PUBLISH_MESSAGES_DEFAULTS[:sweep_retention],
sweep_batch_size: PUBLISH_MESSAGES_DEFAULTS[:sweep_batch_size],
logger: Logger.new($stdout, level: PUBLISH_MESSAGES_DEFAULTS[:log_level]),
concurrency: PUBLISH_MESSAGE_DEFAULTS[:concurrency],
tick_interval: PUBLISH_MESSAGE_DEFAULTS[:tick_interval],
poll_interval: PUBLISH_MESSAGE_DEFAULTS[:poll_interval],
heartbeat_interval: PUBLISH_MESSAGE_DEFAULTS[:heartbeat_interval],
sweep_interval: PUBLISH_MESSAGE_DEFAULTS[:sweep_interval],
sweep_retention: PUBLISH_MESSAGE_DEFAULTS[:sweep_retention],
sweep_batch_size: PUBLISH_MESSAGE_DEFAULTS[:sweep_batch_size],
logger: Logger.new($stdout, level: PUBLISH_MESSAGE_DEFAULTS[:log_level]),
time: ::Time, process: ::Process, kernel: ::Kernel,
&block
)
logger.info "Outboxer v#{Outboxer::VERSION} running in ruby #{RUBY_VERSION} " \
"(#{RUBY_RELEASE_DATE} revision #{RUBY_REVISION[0, 10]}) [#{RUBY_PLATFORM}]"

logger.info "Outboxer config " \
"batch_size=#{batch_size}, " \
"concurrency=#{concurrency}, " \
"tick_interval=#{tick_interval} " \
"poll_interval=#{poll_interval}, " \
Expand All @@ -520,9 +479,10 @@ def publish_messages(
Setting.create_all

publisher = create(
name: name, batch_size: batch_size,
name: name,
concurrency: concurrency,
tick_interval: tick_interval, poll_interval: poll_interval,
tick_interval: tick_interval,
poll_interval: poll_interval,
heartbeat_interval: heartbeat_interval,
sweep_interval: sweep_interval,
sweep_retention: sweep_retention,
Expand All @@ -545,7 +505,7 @@ def publish_messages(

publisher_threads = Array.new(concurrency) do |index|
create_publisher_thread(
id: publisher[:id], name: name, index: index, batch_size: batch_size,
id: publisher[:id], name: name, index: index,
poll_interval: poll_interval, tick_interval: tick_interval,
logger: logger, process: process, kernel: kernel, &block)
end
Expand Down Expand Up @@ -576,7 +536,6 @@ def pool(concurrency:)

# @param id [Integer] Publisher id.
# @param name [String] Publisher name.
# @param batch_size [Integer] Max number of messages per batch.
# @param index [Integer] Zero-based thread index (used for thread name).
# @param poll_interval [Numeric] Seconds to wait when no messages found.
# @param tick_interval [Numeric] Seconds between signal checks during sleep.
Expand All @@ -587,31 +546,74 @@ def pool(concurrency:)
# e.g., `{ id: Integer, name: String }`.
# @yieldparam messages [Array<Hash>] Batch of messages to publish.
# @return [Thread] The created publishing thread.
def create_publisher_thread(id:, name:, batch_size:, index:,
def create_publisher_thread(id:, name:, index:,
poll_interval:, tick_interval:,
logger:, process:, kernel:, &block)
Thread.new do
begin
Thread.current.name = "publisher-#{index + 1}"

while !terminating?
begin
messages = buffer_messages(id: id, name: name, limit: batch_size)
messages = []

if messages.any?
block.call({ id: id, name: name }, messages)
else
Publisher.sleep(
poll_interval,
tick_interval: tick_interval,
process: process,
kernel: kernel)
end
begin
messages = buffer_messages(id: id, name: name, limit: 1)
rescue StandardError => error
logger.error(
"#{error.class}: #{error.message}\n" \
"#{error.backtrace.join("\n")}")
end

if messages.any?
begin
block.call({ id: id, name: name }, messages[0])
rescue StandardError => error
logger.error(
"#{error.class}: #{error.message}\n" \
"#{error.backtrace.join("\n")}")

Publisher.update_messages(
id: id,
failed_messages: [
{
id: messages[0][:id],
exception: {
class_name: error.class.name,
message_text: error.message,
backtrace: error.backtrace
}
}
])
rescue ::Exception => error
logger.fatal(
"#{error.class}: #{error.message}\n" \
"#{error.backtrace.join("\n")}")

Publisher.update_messages(
id: id,
failed_messages: [
{
id: messages[0][:id],
exception: {
class_name: error.class.name,
message_text: error.message,
backtrace: error.backtrace
}
}
])

terminate(id: id)
else
Outboxer::Publisher.update_messages(
id: id, published_message_ids: [messages[0][:id]])
end
else
Publisher.sleep(
poll_interval,
tick_interval: tick_interval,
process: process,
kernel: kernel)
end
end
rescue ::Exception => error
logger.fatal(
Expand Down
10 changes: 0 additions & 10 deletions spec/lib/outboxer/publisher/parse_cli_options_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ module Outboxer
end
end

it "parses the batch size flag" do
expect(Publisher.parse_cli_options(["--batch-size", "100"])).to eq({ batch_size: 100 })
end

it "parses concurrency flag" do
expect(Publisher.parse_cli_options(["--concurrency", "8"]))
.to eq({ concurrency: 8 })
Expand Down Expand Up @@ -64,12 +60,6 @@ module Outboxer
).to eq({ sweep_batch_size: 10 })
end

it "parses the config file flag" do
expect(
Publisher.parse_cli_options(["--config", "config/path.yml"])
).to eq({ config: "config/path.yml" })
end

it "parses the log level flag" do
expect(Publisher.parse_cli_options(["--log-level", "0"])).to eq({ log_level: 0 })
end
Expand Down
Loading