diff --git a/.rubocop.yml b/.rubocop.yml index 4f696b6a..32a9aa72 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -23,10 +23,13 @@ Style/StringLiterals: Style/StringLiteralsInInterpolation: Enabled: true EnforcedStyle: double_quotes +Style/RescueStandardError: + Enabled: false Layout/LineLength: Max: 100 Exclude: - 'bin/outboxer_publisher' + - 'bin/outboxer_load' Layout/ArgumentAlignment: EnforcedStyle: with_fixed_indentation Style/Documentation: @@ -215,6 +218,8 @@ Style/NegatedIfElseCondition: # new in 1.2 Enabled: true Style/NegatedIf: Enabled: false +Style/NegatedWhile: + Enabled: false Style/NestedFileDirname: # new in 1.26 Enabled: true Style/NilLambda: # new in 1.3 @@ -233,6 +238,8 @@ Style/QuotedSymbols: # new in 1.16 Enabled: true Style/RedundantArgument: # new in 1.4 Enabled: true +Style/RedundantBegin: + Enabled: false Style/RedundantArrayConstructor: # new in 1.52 Enabled: true Style/RedundantConstantBase: # new in 1.40 diff --git a/README.md b/README.md index 30b184af..1fbef8ce 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ [![Coverage Status](https://coveralls.io/repos/github/fast-programmer/outboxer/badge.svg)](https://coveralls.io/github/fast-programmer/outboxer) [![Join our Discord](https://img.shields.io/badge/Discord-blue?style=flat&logo=discord&logoColor=white)](https://discord.gg/x6EUehX6vU) -**Outboxer** is an implementation of the [**transactional outbox pattern**](https://docs.aws.amazon.com/prescriptive-guidance/latest/cloud-design-patterns/transactional-outbox.html) for **Ruby on Rails** applications. +**Outboxer** is an implementation of the [transactional outbox pattern](https://docs.aws.amazon.com/prescriptive-guidance/latest/cloud-design-patterns/transactional-outbox.html) for **Ruby on Rails** applications. It helps you migrate to **event-driven architecture** with at least once delivery guarantees. @@ -60,8 +60,23 @@ end **7. Create derived event type** +```bash +bin/rails c +``` + ```ruby -Accountify::InvoiceRaisedEvent.create! +ActiveRecord::Base.logger = Logger.new(STDOUT) + +Accountify::InvoiceRaisedEvent.create!(created_at: Time.current) +``` + +**8. Observe transactional consistency** + +```log +TRANSACTION (0.2ms) BEGIN +Event Create (0.4ms) INSERT INTO "events" ... +Outboxer::Message Create (0.3ms) INSERT INTO "outboxer_messages" ... +TRANSACTION (0.2ms) COMMIT ``` **8. Publish outboxer messages** @@ -70,26 +85,30 @@ Accountify::InvoiceRaisedEvent.create! # bin/outboxer_publisher Outboxer::Publisher.publish_messages do |publisher, messages| - # TODO: publish messages here - - Outboxer::Message.published_by_ids( - ids: messages.map { |message| message[:id] }, - publisher_id: publisher[:id], - publisher_name: publisher[:name]) -rescue => error - Outboxer::Message.failed_by_ids( - ids: messages.map { |message| message[:id] }, - exception: error, - publisher_id: publisher[:id], - publisher_name: publisher[:name]) + begin + # TODO: publish messages here + rescue => error + Outboxer::Message.failed_by_ids( + ids: messages.map { |message| message[:id] }, + exception: error, + publisher_id: publisher[:id], + publisher_name: publisher[:name]) + else + Outboxer::Message.published_by_ids( + ids: messages.map { |message| message[:id] }, + publisher_id: publisher[:id], + publisher_name: publisher[:name]) + end end ``` -see https://github.com/fast-programmer/outboxer/wiki/Outboxer-publisher-block-examples +To integrate with Sidekiq, Bunny, Kafka and AWS SQS see the [publisher block examples](https://github.com/fast-programmer/outboxer/wiki/Outboxer-publisher-block-examples). # Testing -The generated `spec/bin/outboxer_publisher` adds end to end queue and publish message test coverage. +To ensure you have end to end coverage: + +`bin/rspec spec/bin/outboxer_publisher` # Monitoring diff --git a/bin/console b/bin/console index 50683cd8..f7e1716a 100755 --- a/bin/console +++ b/bin/console @@ -6,7 +6,7 @@ require "outboxer" require "irb" environment = ENV["RAILS_ENV"] || "development" -db_config = Outboxer::Database.config(environment: environment, concurrency: 1) +db_config = Outboxer::Database.config(environment: environment, pool: 1) Outboxer::Database.connect(config: db_config) ActiveRecord::Base.logger = Logger.new($stdout) diff --git a/bin/outboxer_load b/bin/outboxer_load index 1a51e5bf..e2335ff0 100755 --- a/bin/outboxer_load +++ b/bin/outboxer_load @@ -1,48 +1,25 @@ #!/usr/bin/env ruby +# frozen_string_literal: true require "bundler/setup" require "outboxer" -require "securerandom" +require "outboxer/loader" +cli_options = Outboxer::Loader.parse_cli_options(ARGV) +environment = cli_options.delete(:environment) || ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development" +options = Outboxer::Loader::LOAD_DEFAULTS.merge(cli_options) logger = Outboxer::Logger.new($stdout, level: Outboxer::Logger::INFO) -max_messages_count = ARGV.empty? ? 500_000 : ARGV[0].to_i +database_config = Outboxer::Database.config(environment: environment, pool: options[:concurrency]) -if max_messages_count <= 0 - logger.info "Please provide a positive integer for max messages count" - exit(1) -end - -environment = ENV.fetch("RAILS_ENV", "development") -db_config = Outboxer::Database.config(environment: environment, concurrency: 1) -Outboxer::Database.connect(config: db_config) - -reader, writer = IO.pipe - -Signal.trap("INT") { writer.puts("INT") } - -queued_messages_count = 0 - -while queued_messages_count < max_messages_count - break if reader.wait_readable(0) && reader.gets.strip == "INT" +Outboxer::Database.connect(config: database_config, logger: logger) - class Event - attr_accessor :id - - def initialize(id:) - @id = id - end - end - - messageable = Event.new(id: SecureRandom.hex(3)) - - message = Outboxer::Message.queue(messageable: messageable) - - logger.info "Outboxer queued message " \ - "id=#{message[:id]} " \ - "messageable_type=#{message[:messageable_type]} " \ - "messageable_id=#{message[:messageable_id]} " - queued_messages_count += 1 +begin + Outboxer::Loader.load( + size: options[:size], + batch_size: options[:batch_size], + concurrency: options[:concurrency], + logger: logger) +ensure + Outboxer::Database.disconnect(logger: logger) end - -logger.info "Outboxer finished queuing #{queued_messages_count} messages" diff --git a/bin/outboxer_published_messages_deleter b/bin/outboxer_published_messages_deleter index 6e3a2030..a6257800 100755 --- a/bin/outboxer_published_messages_deleter +++ b/bin/outboxer_published_messages_deleter @@ -42,7 +42,7 @@ environment = ENV["APP_ENV"] || "development" logger = Outboxer::Logger.new($stdout, level: log_level) -db_config = Outboxer::Database.config(environment: environment, concurrency: 1) +db_config = Outboxer::Database.config(environment: environment, pool: 1) Outboxer::Database.connect(config: db_config, logger: logger) reader, writer = IO.pipe diff --git a/bin/outboxer_publisher b/bin/outboxer_publisher index f2e324af..8c8ef791 100755 --- a/bin/outboxer_publisher +++ b/bin/outboxer_publisher @@ -10,15 +10,19 @@ config = Outboxer::Publisher.config(environment: environment, path: config_path) options = Outboxer::Publisher::PUBLISH_MESSAGES_DEFAULTS.merge(config.merge(cli_options)) logger = Outboxer::Logger.new($stdout, level: options[:log_level]) -database_config = Outboxer::Database.config( - environment: environment, concurrency: options[:concurrency]) +connection_pool_size = Outboxer::Publisher.connection_pool_size( + buffering_concurrency: options[:buffering_concurrency], + publishing_concurrency: options[:publishing_concurrency]) +database_config = Outboxer::Database.config(environment: environment, pool: connection_pool_size) Outboxer::Database.connect(config: database_config, logger: logger) begin Outboxer::Publisher.publish_messages( + batch_size: options[:batch_size], buffer_size: options[:buffer_size], - concurrency: options[:concurrency], + buffering_concurrency: options[:buffering_concurrency], + publishing_concurrency: options[:publishing_concurrency], tick_interval: options[:tick_interval], poll_interval: options[:poll_interval], heartbeat_interval: options[:heartbeat_interval], @@ -27,18 +31,22 @@ begin sweep_batch_size: options[:sweep_batch_size], logger: logger ) do |publisher, messages| - # TODO: publish messages here + begin + # TODO: publish messages here + rescue => error + Outboxer::Message.failed_by_ids( + ids: messages.map { |message| message[:id] }, + exception: error, + publisher_id: publisher[:id], + publisher_name: publisher[:name]) + else + Outboxer::Message.published_by_ids( + ids: messages.map { |message| message[:id] }, + publisher_id: publisher[:id], + publisher_name: publisher[:name]) - Outboxer::Message.published_by_ids( - ids: messages.map { |message| message[:id] }, - publisher_id: publisher[:id], - publisher_name: publisher[:name]) - rescue StandardError => error - Outboxer::Message.failed_by_ids( - ids: messages.map { |message| message[:id] }, - exception: error, - publisher_id: publisher[:id], - publisher_name: publisher[:name]) + # logger.info "#{messages.count} messages published" + end end ensure Outboxer::Database.disconnect(logger: logger) diff --git a/config/outboxer.yml b/config/outboxer.yml index b35c26ae..7dafd826 100644 --- a/config/outboxer.yml +++ b/config/outboxer.yml @@ -1,6 +1,8 @@ --- -:buffer_size: 100 -:concurrency: 1 +:batch_size: 1000 +:buffer_size: 10 +:buffering_concurrency: 1 +:publishing_concurrency: 1 :tick_interval: 0.1 :poll_interval: 5.0 :heartbeat_interval: 5.0 diff --git a/lib/outboxer/database.rb b/lib/outboxer/database.rb index 8f9b0284..e6634c95 100644 --- a/lib/outboxer/database.rb +++ b/lib/outboxer/database.rb @@ -9,21 +9,20 @@ module Database CONFIG_DEFAULTS = { environment: "development", - concurrency: 1, + buffering_concurrency: 1, + publishing_concurrency: 1, path: "config/database.yml" } # Loads the database configuration from a YAML file, processes ERB, # and merges with CONFIG_DEFAULTS. + # @param pool [Integer] the connection pool size. # @param environment [String, Symbol] the environment name to load configuration for. - # @param concurrency [Integer] the number of connections in the pool. # @param path [String] the path to the database configuration file. # @return [Hash] the database configuration with symbolized keys. # @note Extra connections are added to the pool to cover for internal threads like main # and the heartbeat thread. - def config(environment: CONFIG_DEFAULTS[:environment], - concurrency: CONFIG_DEFAULTS[:concurrency], - path: CONFIG_DEFAULTS[:path]) + def config(pool:, environment: CONFIG_DEFAULTS[:environment], path: CONFIG_DEFAULTS[:path]) path_expanded = ::File.expand_path(path) text = File.read(path_expanded) erb = ERB.new(text, trim_mode: "-") @@ -32,7 +31,7 @@ def config(environment: CONFIG_DEFAULTS[:environment], yaml = YAML.safe_load(erb_result, permitted_classes: [Symbol], aliases: true) yaml.deep_symbolize_keys! yaml = yaml[environment.to_sym] || {} - yaml[:pool] = concurrency + 3 # workers + (main + heartbeat + sweeper) + yaml[:pool] = pool yaml rescue Errno::ENOENT diff --git a/lib/outboxer/loader.rb b/lib/outboxer/loader.rb new file mode 100644 index 00000000..fa085192 --- /dev/null +++ b/lib/outboxer/loader.rb @@ -0,0 +1,152 @@ +# frozen_string_literal: true + +require "securerandom" +require "logger" +require "optparse" +require "active_support/number_helper" + +module Outboxer + module Loader + module_function + + LOAD_DEFAULTS = { + batch_size: 1_000, + concurrency: 5, + size: 1_000_000, + tick_interval: 1 + } + + def parse_cli_options(argv) + options = {} + + parser = OptionParser.new do |opts| + opts.banner = "Usage: outboxer_load [options]" + + opts.on("--environment ENV", "Environment (default: development)") do |v| + options[:environment] = v + end + + opts.on("--batch-size SIZE", Integer, "Batch size (default: 1000)") do |v| + options[:batch_size] = v + end + + opts.on("--concurrency COUNT", Integer, "Concurrency (default: 5)") do |v| + options[:concurrency] = v + end + + opts.on("--size SIZE", Integer, "Number of messages to load") do |v| + options[:size] = v + end + + opts.on("--tick-interval SECONDS", Float, "Tick interval in seconds (default: 1)") do |v| + options[:tick_interval] = v + end + + opts.on("--help", "Show this help message") do + puts opts + exit + end + end + + parser.parse!(argv) + + options + end + + def display_metrics(logger) + metrics = { + memory_kb: `ps -o rss= -p #{Process.pid}`.to_i, + cpu_percent: `ps -o %cpu= -p #{Process.pid}`.strip.to_f + } + + logger.info "[metrics] memory=#{metrics[:memory_kb]}KB cpu=#{metrics[:cpu_percent]}%" + end + + def load(batch_size: LOAD_DEFAULTS[:batch_size], + concurrency: LOAD_DEFAULTS[:concurrency], + size: LOAD_DEFAULTS[:size], + tick_interval: LOAD_DEFAULTS[:tick_interval], + logger: Outboxer::Logger.new($stdout)) + status = :loading + reader, _writer = trap_signals + queue = SizedQueue.new(concurrency) + threads = spawn_workers(concurrency, queue, logger) + + enqueued = 0 + started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + + while enqueued < size + status = read_status(reader) || status + + case status + when :terminating + break + when :stopped + sleep tick_interval + when :loading + batch = Array.new(batch_size) do + { + messageable_type: "Event", + messageable_id: SecureRandom.hex(3), + status: "queued", + queued_at: Time.current + } + end + queue << batch + enqueued += batch.size + # logger.info "[main] enqueued #{enqueued}/#{size}" if (enqueued % (batch_size * 2)).zero? + end + + # display_metrics(logger) + end + + concurrency.times { queue << nil } + threads.each(&:join) + + finished_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + elapsed = finished_at - started_at + rate = (enqueued / elapsed).round(2) + formatted_rate = ActiveSupport::NumberHelper.number_to_delimited(rate) + + logger.info "[main] duration: #{elapsed.round(2)}s" + logger.info "[main] throughput: #{formatted_rate} messages/sec" + logger.info "[main] done" + end + + def trap_signals + reader, writer = IO.pipe + + %w[INT TERM TSTP CONT].each do |signal| + Signal.trap(signal) { writer.puts(signal) } + end + + [reader, writer] + end + + def read_status(reader) + line = reader.ready? ? reader.gets&.strip : nil + + case line + when "INT", "TERM" then :terminating + when "TSTP" then :stopped + when "CONT" then :loading + end + end + + def spawn_workers(concurrency, queue, logger) + Array.new(concurrency) do |index| + Thread.new do + loop do + batch = queue.pop + break if batch.nil? + + Outboxer::Models::Message.insert_all(batch) + # logger.info "[thread-#{index}] inserted #{batch.size}" + rescue StandardError => error + logger.error "[thread-#{index}] #{error.class}: #{error.message}" + end + end + end + end + end +end diff --git a/lib/outboxer/message.rb b/lib/outboxer/message.rb index bd9b2180..5df22b40 100644 --- a/lib/outboxer/message.rb +++ b/lib/outboxer/message.rb @@ -7,8 +7,6 @@ module Message # Queues a new message. # @param messageable [Object, nil] the object associated with the message. - # @param messageable_type [String, nil] type of the polymorphic messageable model - # @param messageable_id [Integer, nil] ID of the polymorphic messageable model # @param time [Time] time context for setting timestamps. # @return [Hash] a hash with message details including IDs and timestamps. def queue(messageable:, time: ::Time) @@ -37,38 +35,47 @@ def queue(messageable:, time: ::Time) # @param time [Time] current time context used to update timestamps. # @return [Array] details of buffered messages. def buffer(limit: 1, publisher_id: nil, publisher_name: nil, time: ::Time) + current_utc_time = time.now.utc + messages = [] + + # logger = Outboxer::Logger.new($stdout) + ActiveRecord::Base.connection_pool.with_connection do ActiveRecord::Base.transaction do messages = Models::Message .where(status: Message::Status::QUEUED) .order(updated_at: :asc) - .lock("FOR UPDATE SKIP LOCKED") .limit(limit) - .select(:id, :messageable_type, :messageable_id, :queued_at) + .lock("FOR UPDATE SKIP LOCKED") + .pluck(:id, :messageable_type, :messageable_id) - current_utc_time = time.now.utc + message_ids = messages.map(&:first) - if messages.present? - Models::Message - .where(id: messages.map { |message| message[:id] }) - .update_all( - status: Message::Status::BUFFERED, - updated_at: current_utc_time, - buffered_at: current_utc_time, - publisher_id: publisher_id, - publisher_name: publisher_name) - end + # logger.info("[outboxer] .buffer > locked message ids #{message_ids.inspect}") - messages.map do |message| - serialize( - id: message.id, + updated_rows = Models::Message + .where(id: message_ids, status: Message::Status::QUEUED) + .update_all( status: Message::Status::BUFFERED, - messageable_type: message.messageable_type, - messageable_id: message.messageable_id, - updated_at: current_utc_time) - end + updated_at: current_utc_time, + buffered_at: current_utc_time, + publisher_id: publisher_id, + publisher_name: publisher_name) + + raise ArgumentError, "Some messages not buffered" if updated_rows != message_ids.size end end + + # logger.info("[outboxer] .buffer > unlocked message ids #{ids.inspect}") + + messages.map do |(id, type, mid)| + serialize( + id: id, + status: Message::Status::BUFFERED, + messageable_type: type, + messageable_id: mid, + updated_at: current_utc_time) + end end # Marks buffered messages as publishing. @@ -84,19 +91,21 @@ def publishing_by_ids(ids:, publisher_id: nil, publisher_name: nil, time: ::Time ActiveRecord::Base.transaction do messages = Models::Message .where(status: Status::BUFFERED, id: ids) - .lock("FOR UPDATE SKIP LOCKED") + .lock("FOR UPDATE") .to_a raise ArgumentError, "Some messages not buffered" if messages.size != ids.size current_utc_time = time.now.utc - Models::Message.where(status: Status::BUFFERED, id: ids).update_all( - status: Status::PUBLISHING, - updated_at: current_utc_time, - publishing_at: current_utc_time, - publisher_id: publisher_id, - publisher_name: publisher_name) + Models::Message + .where(status: Status::BUFFERED, id: ids) + .update_all( + status: Status::PUBLISHING, + updated_at: current_utc_time, + publishing_at: current_utc_time, + publisher_id: publisher_id, + publisher_name: publisher_name) messages.map do |message| Message.serialize( @@ -125,19 +134,21 @@ def published_by_ids(ids:, publisher_id: nil, publisher_name: nil, time: ::Time) ActiveRecord::Base.transaction do messages = Models::Message .where(status: Status::PUBLISHING, id: ids) - .lock("FOR UPDATE SKIP LOCKED") + .lock("FOR UPDATE") .to_a raise ArgumentError, "Some messages not publishing" if messages.size != ids.size current_utc_time = time.now.utc - Models::Message.where(status: Status::PUBLISHING, id: ids).update_all( - status: Status::PUBLISHED, - updated_at: current_utc_time, - published_at: current_utc_time, - publisher_id: publisher_id, - publisher_name: publisher_name) + Models::Message + .where(status: Status::PUBLISHING, id: ids) + .update_all( + status: Status::PUBLISHED, + updated_at: current_utc_time, + published_at: current_utc_time, + publisher_id: publisher_id, + publisher_name: publisher_name) messages.map do |message| Message.serialize( @@ -167,19 +178,21 @@ def failed_by_ids(ids:, exception:, publisher_id: nil, publisher_name: nil, time ActiveRecord::Base.transaction do messages = Models::Message .where(status: Status::PUBLISHING, id: ids) - .lock("FOR UPDATE SKIP LOCKED") + .lock("FOR UPDATE") .to_a raise ArgumentError, "Some messages not publishing" if messages.size != ids.size current_utc_time = time.now.utc - Models::Message.where(status: Status::PUBLISHING, id: ids).update_all( - status: Status::FAILED, - updated_at: current_utc_time, - failed_at: current_utc_time, - publisher_id: publisher_id, - publisher_name: publisher_name) + Models::Message + .where(status: Status::PUBLISHING, id: ids) + .update_all( + status: Status::FAILED, + updated_at: current_utc_time, + failed_at: current_utc_time, + publisher_id: publisher_id, + publisher_name: publisher_name) messages.each do |message| outboxer_exception = message.exceptions.create!( diff --git a/lib/outboxer/publisher.rb b/lib/outboxer/publisher.rb index 742aa2d7..678c33aa 100644 --- a/lib/outboxer/publisher.rb +++ b/lib/outboxer/publisher.rb @@ -5,66 +5,79 @@ module Outboxer module Publisher module_function + def terminating? + @status == Status::TERMINATING + end + # Parses command line arguments to configure the publisher. # @param args [Array] The arguments passed via the command line. # @return [Hash] The parsed options including configuration path, environment, - # buffer size, concurrency, and intervals. + # buffer size, batch_size, and intervals. def self.parse_cli_options(args) options = {} parser = ::OptionParser.new do |opts| opts.banner = "Usage: outboxer_publisher [options]" - opts.on("-e", "--environment ENV", "Application environment") do |v| + opts.on("--environment ENV", "Application environment") do |v| options[:environment] = v end - opts.on("-b", "--buffer-size SIZE", Integer, "Buffer size") do |v| + opts.on("--batch-size SIZE", Integer, "Batch size") do |v| + options[:batch_size] = v + end + + opts.on("--buffer-size SIZE", Integer, "Buffer size") do |v| options[:buffer_size] = v end - opts.on("-c", "--concurrency SIZE", Integer, "Concurrency") do |v| - options[:concurrency] = v + opts.on("--buffering-concurrency N", Integer, "Number of threads to buffer messages") do |v| + options[:buffering_concurrency] = v end - opts.on("-t", "--tick-interval SECS", Float, "Tick interval in seconds") do |v| + opts.on("--publishing-concurrency N", Integer, + "Number of threads to publish messages") do |v| + options[:publishing_concurrency] = v + end + + opts.on("--tick-interval SECS", Float, "Tick interval in seconds") do |v| options[:tick_interval] = v end - opts.on("-p", "--poll-interval SECS", Float, "Poll interval in seconds") do |v| + opts.on("--poll-interval SECS", Float, "Poll interval in seconds") do |v| options[:poll_interval] = v end - opts.on("-a", "--heartbeat-interval SECS", Float, "Heartbeat interval in seconds") do |v| + opts.on("--heartbeat-interval SECS", Float, "Heartbeat interval in seconds") do |v| options[:heartbeat_interval] = v end - opts.on("-s", "--sweep-interval SECS", Float, "Sweep interval in seconds") do |v| + opts.on("--sweep-interval SECS", Float, "Sweep interval in seconds") do |v| options[:sweep_interval] = v end - opts.on("-r", "--sweep-retention SECS", Float, "Sweep retention in seconds") do |v| + opts.on("--sweep-retention SECS", Float, "Sweep retention in seconds") do |v| options[:sweep_retention] = v end - opts.on("-w", "--sweep-batch-size SIZE", Integer, "Sweep batch size") do |v| + opts.on("--sweep-batch-size SIZE", Integer, "Sweep batch size") do |v| options[:sweep_batch_size] = v end - opts.on("-l", "--log-level LEVEL", Integer, "Log level") do |v| + opts.on("--log-level LEVEL", Integer, "Log level") do |v| options[:log_level] = v end - opts.on("-C", "--config PATH", "Path to YAML config file") do |v| + opts.on("--config PATH", "Path to YAML config file") do |v| options[:config] = v end - opts.on("-V", "--version", "Print version and exit") do + opts.on("--version", "Print version and exit") do puts "Outboxer version #{Outboxer::VERSION}" exit end - opts.on("-h", "--help", "Show this help message") do + opts.on("--help", "Show this help message") do puts opts exit end @@ -130,16 +143,48 @@ def find_by_id(id:) end end + # Retrieves all publishers including signals. + # @return [Array] A list of all publishers and their details. + def all + ActiveRecord::Base.connection_pool.with_connection do + ActiveRecord::Base.transaction do + publishers = Models::Publisher.includes(:signals).all + + publishers.map do |publisher| + { + id: publisher.id, + name: publisher.name, + status: publisher.status, + settings: publisher.settings, + metrics: publisher.metrics, + created_at: publisher.created_at.utc, + updated_at: publisher.updated_at.utc, + signals: publisher.signals.map do |signal| + { + id: signal.id, + name: signal.name, + created_at: signal.created_at.utc + } + end + } + end + end + end + end + # Creates a new publisher with specified settings and metrics. # @param name [String] The name of the publisher. + # @param batch_size [Integer] The batch size. # @param buffer_size [Integer] The buffer size. - # @param concurrency [Integer] The number of concurrent operations. + # @param buffering_concurrency [Integer] The number of buffering threads. + # @param publishing_concurrency [Integer] The number of publishing threads. # @param tick_interval [Float] The tick interval in seconds. # @param poll_interval [Float] The poll interval in seconds. # @param heartbeat_interval [Float] The heartbeat interval in seconds. # @param time [Time] The current time context for timestamping. # @return [Hash] Details of the created publisher. - def create(name:, buffer_size:, concurrency:, + def create(name:, batch_size:, buffer_size:, + buffering_concurrency:, publishing_concurrency:, tick_interval:, poll_interval:, heartbeat_interval:, sweep_interval:, sweep_retention:, sweep_batch_size:, time: ::Time) @@ -151,8 +196,10 @@ def create(name:, buffer_size:, concurrency:, name: name, status: Status::PUBLISHING, settings: { + "batch_size" => batch_size, "buffer_size" => buffer_size, - "concurrency" => concurrency, + "buffering_concurrency" => buffering_concurrency, + "publishing_concurrency" => publishing_concurrency, "tick_interval" => tick_interval, "poll_interval" => poll_interval, "heartbeat_interval" => heartbeat_interval, @@ -257,17 +304,16 @@ def signal(id:, name:, time: ::Time) end end - # Sleeps for the specified duration or until a signal is received, whichever comes first. + # Sleeps for the specified duration or until terminating, in tick-sized intervals. # @param duration [Float] The maximum duration to sleep in seconds. - # @param start_time [Time] The start time from which the duration is calculated. - # @param tick [Float] The interval in seconds to check for signals. - # @param signal_read [IO] An IO object to check for readable data. - # @param process [Process] The process module used for timing. - # @param kernel [Kernel] The kernel module used for sleeping. - def sleep(duration, start_time:, tick_interval:, signal_read:, process:, kernel:) - while (@status != Status::TERMINATING) && - ((process.clock_gettime(process::CLOCK_MONOTONIC) - start_time) < duration) && - !signal_read.wait_readable(0) + # @param tick_interval [Float] Interval between termination checks. + # @param process [Process] Module used for monotonic clock timing. + # @param kernel [Kernel] Module used for sleep operations. + def sleep(duration, tick_interval:, process:, kernel:) + start_time = process.clock_gettime(process::CLOCK_MONOTONIC) + + while !terminating? && + (process.clock_gettime(process::CLOCK_MONOTONIC) - start_time) < duration kernel.sleep(tick_interval) end end @@ -286,101 +332,23 @@ def trap_signals [signal_read, signal_write] end - # Creates and manages threads dedicated to publishing operations. - # @param id [Integer] The ID of the publisher. - # @param name [String] The name of the publisher. - # @param queue [Queue] The queue to manage publishing messages. - # @param concurrency [Integer] The number of concurrent threads. - # @param logger [Logger] The logger to use for logging operations. - # @return [Array] An array of threads managing publishing. - # @yieldparam message [Hash] A message being processed. - def create_worker_threads(id:, name:, - queue:, concurrency:, - logger:, &block) - concurrency.times.each_with_index.map do |_, index| - Thread.new do - Thread.current.name = "publisher-#{index + 1}" - - while (buffered_messages = queue.pop) - break if buffered_messages.nil? - - publish_buffered_messages( - id: id, name: name, buffered_messages: buffered_messages, - logger: logger, &block) - end - end - end - end - - # Handles the buffering of messages based on the publisher's buffer capacity. - # @param id [Integer] The ID of the publisher. - # @param name [String] The name of the publisher. - # @param queue [Queue] The queue of messages to be published. - # @param buffer_size [Integer] The buffer size. - # @param poll_interval [Float] The poll interval in seconds. - # @param tick_interval [Float] The tick interval in seconds. - # @param signal_read [IO] The IO object to read signals. - # @param logger [Logger] The logger to use for logging operations. - # @param process [Process] The process object to use for timing. - # @param kernel [Kernel] The kernel module to use for sleep operations. - def buffer_messages(id:, name:, queue:, buffer_size:, - poll_interval:, tick_interval:, - signal_read:, logger:, process:, kernel:) - buffer_remaining = buffer_size - queue.size - - if buffer_remaining > 0 - buffered_messages = Message.buffer( - limit: buffer_remaining, - publisher_id: id, publisher_name: name) - - if buffered_messages.any? - queue.push(buffered_messages) - else - Publisher.sleep( - poll_interval, - start_time: process.clock_gettime(process::CLOCK_MONOTONIC), - tick_interval: tick_interval, - signal_read: signal_read, - process: process, kernel: kernel) - end - else - Publisher.sleep( - tick_interval, - start_time: process.clock_gettime(process::CLOCK_MONOTONIC), - tick_interval: tick_interval, - signal_read: signal_read, - process: process, kernel: kernel) - end - rescue StandardError => error - logger.error( - "#{error.class}: #{error.message}\n" \ - "#{error.backtrace.join("\n")}") - rescue ::Exception => error - logger.fatal( - "#{error.class}: #{error.message}\n" \ - "#{error.backtrace.join("\n")}") - - terminate(id: id) - end - # Creates a new thread that manages heartbeat checks, providing system metrics and # handling the first signal in the queue. # @param id [Integer] The ID of the publisher. - # @param heartbeat [Float] The interval in seconds between heartbeats. - # @param tick [Float] The base interval in seconds for sleeping during the loop. - # @param signal_read [IO] An IO object for reading signals. + # @param heartbeat_interval [Float] The interval in seconds between heartbeats. + # @param tick_interval [Float] The base interval in seconds for sleeping during the loop. # @param logger [Logger] A logger for logging heartbeat and system status. # @param time [Time] Current time context. # @param process [Process] The process module for accessing system metrics. # @param kernel [Kernel] The kernel module for sleeping operations. # @return [Thread] The heartbeat thread. def create_heartbeat_thread(id:, - heartbeat_interval:, tick_interval:, signal_read:, + heartbeat_interval:, tick_interval:, logger:, time:, process:, kernel:) Thread.new do Thread.current.name = "heartbeat" - while @status != Status::TERMINATING + while !terminating? begin cpu = `ps -p #{process.pid} -o %cpu`.split("\n").last.to_f rss = `ps -p #{process.pid} -o rss`.split("\n").last.to_i @@ -433,8 +401,6 @@ def create_heartbeat_thread(id:, Publisher.sleep( heartbeat_interval, - signal_read: signal_read, - start_time: process.clock_gettime(process::CLOCK_MONOTONIC), tick_interval: tick_interval, process: process, kernel: kernel) rescue ActiveRecord::RecordNotFound => error @@ -450,10 +416,7 @@ def create_heartbeat_thread(id:, Publisher.sleep( heartbeat_interval, - signal_read: signal_read, - start_time: process.clock_gettime(process::CLOCK_MONOTONIC), - tick_interval: tick_interval, - process: process, kernel: kernel) + tick_interval: tick_interval, process: process, kernel: kernel) rescue ::Exception => error logger.fatal( "#{error.class}: #{error.message}\n" \ @@ -512,8 +475,10 @@ def handle_signal(id:, name:, logger:) end PUBLISH_MESSAGES_DEFAULTS = { - buffer_size: 100, - concurrency: 1, + batch_size: 1000, + buffer_size: 10, + buffering_concurrency: 1, + publishing_concurrency: 1, tick_interval: 0.1, poll_interval: 5.0, heartbeat_interval: 5.0, @@ -525,8 +490,10 @@ def handle_signal(id:, name:, logger:) # Publish queued messages concurrently # @param name [String] The name of the publisher. + # @param batch_size [Integer] The batch size. # @param buffer_size [Integer] The buffer size. - # @param concurrency [Integer] The number of threads for concurrent publishing. + # @param buffering_concurrency [Integer] The number of threads buffering queued messages. + # @param publishing_concurrency [Integer] The number of threads publishing buffered messages. # @param tick_interval [Float] The tick interval in seconds. # @param poll_interval [Float] The poll interval in seconds. # @param heartbeat_interval [Float] The heartbeat interval in seconds. @@ -542,8 +509,10 @@ def handle_signal(id:, name:, logger:) # @yieldparam messages [Array] An array of message hashes retrieved from the buffer. def publish_messages( name: "#{::Socket.gethostname}:#{::Process.pid}", + batch_size: PUBLISH_MESSAGES_DEFAULTS[:batch_size], buffer_size: PUBLISH_MESSAGES_DEFAULTS[:buffer_size], - concurrency: PUBLISH_MESSAGES_DEFAULTS[:concurrency], + buffering_concurrency: PUBLISH_MESSAGES_DEFAULTS[:buffering_concurrency], + publishing_concurrency: PUBLISH_MESSAGES_DEFAULTS[:publishing_concurrency], tick_interval: PUBLISH_MESSAGES_DEFAULTS[:tick_interval], poll_interval: PUBLISH_MESSAGES_DEFAULTS[:poll_interval], heartbeat_interval: PUBLISH_MESSAGES_DEFAULTS[:heartbeat_interval], @@ -558,8 +527,10 @@ def publish_messages( "(#{RUBY_RELEASE_DATE} revision #{RUBY_REVISION[0, 10]}) [#{RUBY_PLATFORM}]" logger.info "Outboxer config " \ + "batch_size=#{batch_size}, " \ "buffer_size=#{buffer_size}, " \ - "concurrency=#{concurrency}, " \ + "buffering_concurrency=#{buffering_concurrency}, " \ + "publishing_concurrency=#{publishing_concurrency}, " \ "tick_interval=#{tick_interval} " \ "poll_interval=#{poll_interval}, " \ "heartbeat_interval=#{heartbeat_interval}, " \ @@ -570,137 +541,150 @@ def publish_messages( Setting.create_all - queue = Queue.new + queue = SizedQueue.new(buffer_size) publisher = create( - name: name, buffer_size: buffer_size, concurrency: concurrency, + name: name, batch_size: batch_size, buffer_size: buffer_size, + buffering_concurrency: buffering_concurrency, + publishing_concurrency: publishing_concurrency, tick_interval: tick_interval, poll_interval: poll_interval, heartbeat_interval: heartbeat_interval, sweep_interval: sweep_interval, sweep_retention: sweep_retention, sweep_batch_size: sweep_batch_size) - id = publisher[:id] + buffering_threads = create_buffering_threads( + id: publisher[:id], name: name, + concurrency: buffering_concurrency, + queue: queue, + batch_size: batch_size, + poll_interval: poll_interval, tick_interval: tick_interval, + logger: logger, process: process, kernel: kernel) - worker_threads = create_worker_threads( - id: id, name: name, queue: queue, concurrency: concurrency, + publishing_threads = create_publishing_threads( + id: publisher[:id], name: name, queue: queue, concurrency: publishing_concurrency, logger: logger, &block) - signal_read, _signal_write = trap_signals - heartbeat_thread = create_heartbeat_thread( - id: id, - heartbeat_interval: heartbeat_interval, - tick_interval: tick_interval, - signal_read: signal_read, - logger: logger, - time: time, process: process, kernel: kernel) + id: publisher[:id], heartbeat_interval: heartbeat_interval, tick_interval: tick_interval, + logger: logger, time: time, process: process, kernel: kernel) sweeper_thread = create_sweeper_thread( - id: id, + id: publisher[:id], sweep_interval: sweep_interval, sweep_retention: sweep_retention, sweep_batch_size: sweep_batch_size, tick_interval: tick_interval, - signal_read: signal_read, logger: logger, time: time, process: process, kernel: kernel) - loop do - case @status - when Status::PUBLISHING - buffer_messages( - id: id, name: name, - queue: queue, buffer_size: buffer_size, - poll_interval: poll_interval, tick_interval: tick_interval, - signal_read: signal_read, logger: logger, process: process, kernel: kernel) - when Status::STOPPED - Publisher.sleep( - tick_interval, - start_time: process.clock_gettime(process::CLOCK_MONOTONIC), - tick_interval: tick_interval, - signal_read: signal_read, process: process, kernel: kernel) - when Status::TERMINATING - break - end - - if signal_read.wait_readable(0) - signal_name = - begin - signal_read.gets.strip - rescue StandardError - nil - end + signal_read, _signal_write = trap_signals - handle_signal(id: id, name: signal_name, logger: logger) + while !terminating? + if signal_read.wait_readable(tick_interval) + handle_signal(id: publisher[:id], name: signal_read.gets.chomp, logger: logger) end end logger.info "Outboxer terminating" - concurrency.times { queue.push(nil) } + buffering_threads.each(&:join) + + publishing_concurrency.times { queue.push(nil) } + publishing_threads.each(&:join) - worker_threads.each(&:join) heartbeat_thread.join sweeper_thread.join - delete(id: id) + delete(id: publisher[:id]) logger.info "Outboxer terminated" end - # Publishes buffered messages + def connection_pool_size(buffering_concurrency:, publishing_concurrency:) + buffering_concurrency + publishing_concurrency + 3 # (main + heartbeat + sweeper) + end + # @param id [Integer] The ID of the publisher. # @param name [String] The name of the publisher. - # @param buffered_messages [Hash] The message data retrieved from the buffer. - # @param logger [Logger] Logger for recording the outcome of the publishing attempt. - # @yield [Hash] A block to process the publishing of the message. - def publish_buffered_messages(id:, name:, buffered_messages:, logger:, &block) - buffered_message_ids = buffered_messages.map { |buffered_message| buffered_message[:id] } - - publishing_messages = Message.publishing_by_ids( - ids: buffered_message_ids, publisher_id: id, publisher_name: name) - - block.call({ id: id, name: name }, publishing_messages) - rescue StandardError => error - logger.error( - "#{error.class}: #{error.message}\n" \ - "#{error.backtrace.join("\n")}") - rescue ::Exception => error - logger.fatal( - "#{error.class}: #{error.message}\n" \ - "#{error.backtrace.join("\n")}") - - terminate(id: id) + # @param queue [Queue] A sized queue to hold buffered messages. + # @param concurrency [Integer] The number of concurrent threads. + # @param batch_size [Integer] The number of messages to buffer per batch. + # @param logger [Logger] Logger instance for logging errors or diagnostics. + # @return [Array] An array of started buffering threads. + def create_buffering_threads(id:, name:, concurrency:, queue:, batch_size:, + poll_interval:, tick_interval:, logger:, + process:, kernel:) + Array.new(concurrency) do |index| + Thread.new do + Thread.current.name = "buffering-#{index + 1}" + + while !terminating? + begin + buffered_messages = Message.buffer( + publisher_id: id, publisher_name: name, limit: batch_size) + + if buffered_messages.any? + queue.push(buffered_messages) + else + Publisher.sleep( + poll_interval, tick_interval: tick_interval, process: process, kernel: kernel) + end + rescue StandardError => error + logger.error("#{error.class}: #{error.message}\n#{error.backtrace.join("\n")}") + rescue Exception => error + logger.fatal("#{error.class}: #{error.message}\n#{error.backtrace.join("\n")}") + + terminate(id: id) + end + end + + logger.info "#{Thread.current.name} shutting down" + end + end end - # Retrieves all publishers including signals. - # @return [Array] A list of all publishers and their details. - def all - ActiveRecord::Base.connection_pool.with_connection do - ActiveRecord::Base.transaction do - publishers = Models::Publisher.includes(:signals).all + # Creates and manages threads dedicated to publishing operations. + # @param id [Integer] The ID of the publisher. + # @param name [String] The name of the publisher. + # @param queue [Queue] The queue to manage publishing messages. + # @param concurrency [Integer] The number of concurrent threads. + # @param logger [Logger] The logger to use for logging operations. + # @return [Array] An array of threads managing publishing. + # @yieldparam message [Hash] A message being processed. + def create_publishing_threads(id:, name:, queue:, concurrency:, + logger:, &block) + Array.new(concurrency) do |index| + Thread.new do + Thread.current.name = "publishing-#{index + 1}" - publishers.map do |publisher| - { - id: publisher.id, - name: publisher.name, - status: publisher.status, - settings: publisher.settings, - metrics: publisher.metrics, - created_at: publisher.created_at.utc, - updated_at: publisher.updated_at.utc, - signals: publisher.signals.map do |signal| - { - id: signal.id, - name: signal.name, - created_at: signal.created_at.utc - } - end - } + while (messages = queue.pop) + + # logger.info "#{Thread.current.name} popped from queue and size is now #{queue.size}" + + begin + message_ids = messages.map { |message| message[:id] } + + publishing_messages = Message.publishing_by_ids( + ids: message_ids, publisher_id: id, publisher_name: name) + + block.call({ id: id, name: name }, publishing_messages) + rescue StandardError => error + logger.error( + "#{error.class}: #{error.message}\n" \ + "#{error.backtrace.join("\n")}") + rescue ::Exception => error + logger.fatal( + "#{error.class}: #{error.message}\n" \ + "#{error.backtrace.join("\n")}") + + terminate(id: id) + end end + + logger.info "#{Thread.current.name} shutting down" end end end @@ -712,18 +696,17 @@ def all # @param sweep_retention [Float] Retention period in seconds for keeping published messages. # @param sweep_batch_size [Integer] Max number of messages to delete in each sweep. # @param tick_interval [Float] Time in seconds between signal checks while sleeping. - # @param signal_read [IO] IO pipe to receive signals for graceful handling. # @param logger [Logger] Logger instance to report errors and fatal issues. # @param time [Time] Module used for fetching current UTC time. # @param process [Process] Process module used for monotonic clock timings. # @param kernel [Kernel] Kernel module used for sleeping between sweeps. # @return [Thread] The thread executing the sweeping logic. def create_sweeper_thread(id:, sweep_interval:, sweep_retention:, sweep_batch_size:, - tick_interval:, signal_read:, logger:, time:, process:, kernel:) + tick_interval:, logger:, time:, process:, kernel:) Thread.new do Thread.current.name = "sweeper" - while @status != Status::TERMINATING + while !terminating? begin deleted_count = Message.delete_batch( status: Message::Status::PUBLISHED, @@ -733,9 +716,7 @@ def create_sweeper_thread(id:, sweep_interval:, sweep_retention:, sweep_batch_si if deleted_count == 0 Publisher.sleep( sweep_interval, - start_time: process.clock_gettime(process::CLOCK_MONOTONIC), tick_interval: tick_interval, - signal_read: signal_read, process: process, kernel: kernel) end diff --git a/lib/outboxer/web.rb b/lib/outboxer/web.rb index 6943b2db..64549437 100755 --- a/lib/outboxer/web.rb +++ b/lib/outboxer/web.rb @@ -22,7 +22,7 @@ environment = ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development" -config = Outboxer::Database.config(environment: environment, concurrency: 5) +config = Outboxer::Database.config(environment: environment, pool: 1) Outboxer::Database.connect(config: config) module Outboxer diff --git a/lib/outboxer/web/views/home.erb b/lib/outboxer/web/views/home.erb index b40b97f6..96b71386 100644 --- a/lib/outboxer/web/views/home.erb +++ b/lib/outboxer/web/views/home.erb @@ -34,14 +34,6 @@ href="<%= outboxer_path("/publisher/#{publisher[:id]}#{normalise_query_string(time_zone: denormalised_query_params[:time_zone])}") %>"> <%= publisher[:name] %> -
- buffer_size: <%= publisher[:settings]['buffer_size'] %>, concurrency: <%= publisher[:settings]['concurrency'] %>
- -
<%= Outboxer::Web.time_ago_in_words(publisher[:created_at]) %> ago <%= Outboxer::Web.time_ago_in_words(publisher[:updated_at]) %> ago diff --git a/lib/outboxer/web/views/publisher.erb b/lib/outboxer/web/views/publisher.erb index a364cb07..899bd377 100644 --- a/lib/outboxer/web/views/publisher.erb +++ b/lib/outboxer/web/views/publisher.erb @@ -78,13 +78,21 @@
Settings
+ + + + - - + + + + + + diff --git a/spec/fixtures/config/outboxer.yml b/spec/fixtures/config/outboxer.yml index 7f5374c9..d89ece71 100644 --- a/spec/fixtures/config/outboxer.yml +++ b/spec/fixtures/config/outboxer.yml @@ -1,14 +1,19 @@ --- -:buffer_size: 100 -:concurrency: 1 -:tick_interval: 0.1 -:poll_interval: 5.0 -:heartbeat_interval: 5.0 -:sweep_interval: 60 -:sweep_retention: 60 -:sweep_batch_size: 100 -:log_level: 1 +:batch_size: 999 +:buffer_size: 9 +:buffering_concurrency: 2 +:publishing_concurrency: 2 +:tick_interval: 0.2 +:poll_interval: 3.0 +:heartbeat_interval: 2.0 +:sweep_interval: 30 +:sweep_retention: 30 +:sweep_batch_size: 50 +:log_level: 0 development: - :concurrency: 2 + :buffering_concurrency: 3 + :publishing_concurrency: 3 test: + :buffering_concurrency: 4 + :publishing_concurrency: 4 :concurrency: <%= ENV['RAILS_MAX_THREADS'] %> diff --git a/spec/lib/outboxer/database/config_spec.rb b/spec/lib/outboxer/database/config_spec.rb index ad14e8ce..83b3bdfe 100644 --- a/spec/lib/outboxer/database/config_spec.rb +++ b/spec/lib/outboxer/database/config_spec.rb @@ -30,12 +30,12 @@ module Outboxer end it "returns correct config" do - config = Database.config(environment: :test, concurrency: 3, path: tmp_path) + config = Database.config(environment: :test, pool: 3, path: tmp_path) expect(config).to include( adapter: "postgresql", database: "override_db", - pool: 6) + pool: 3) end end @@ -48,7 +48,7 @@ module Outboxer it "returns an empty hash" do missing_path = File.join(tmp_dir, "missing_database_override.yml") - config = Database.config(environment: :test, concurrency: 2, path: missing_path) + config = Database.config(environment: :test, pool: 2, path: missing_path) expect(config).to eq({}) end @@ -56,7 +56,7 @@ module Outboxer context "when no path is specified" do it "uses the default path" do - config = Database.config(environment: :development, concurrency: 3) + config = Database.config(environment: :development, pool: 3) expect(config).to include( adapter: a_string_matching(/postgresql|mysql2/), @@ -65,7 +65,7 @@ module Outboxer username: "outboxer_developer", password: "outboxer_password", database: "outboxer_development", - pool: 6 + pool: 3 ) end end diff --git a/spec/lib/outboxer/database/connect_spec.rb b/spec/lib/outboxer/database/connect_spec.rb index 10315ab0..fdbaf086 100644 --- a/spec/lib/outboxer/database/connect_spec.rb +++ b/spec/lib/outboxer/database/connect_spec.rb @@ -6,12 +6,12 @@ module Outboxer before(:each) { Database.disconnect(logger: nil) } after(:all) do - config = Database.config(environment: "test", concurrency: 20) + config = Database.config(environment: "test", pool: 20) Database.connect(config: config, logger: nil) end context "when db config valid" do - let(:config) { Database.config(environment: "test", concurrency: 20) } + let(:config) { Database.config(environment: "test", pool: 20) } it "establishes a connection without errors" do expect do diff --git a/spec/lib/outboxer/logger_spec.rb b/spec/lib/outboxer/logger_spec.rb index 38e6201b..67bfa2cf 100644 --- a/spec/lib/outboxer/logger_spec.rb +++ b/spec/lib/outboxer/logger_spec.rb @@ -1,5 +1,4 @@ require "rails_helper" - require "stringio" module Outboxer @@ -9,17 +8,43 @@ module Outboxer let(:logger) { Logger.new(output, level: Logger.const_get(level.upcase)) } describe "logging" do - it "logs messages with correct format" do - message = "test message" - logger.info { message } + context "thread name not set" do + it "logs messages with generated tid" do + Thread.current["outboxer_tid"] = nil + Thread.current.name = nil + + message = "test message" + logger.info { message } + + log_output = output.string + log_lines = log_output.strip.split("\n") + + log_lines.each do |line| + expect(line.length).to be <= 100 + expect(line).to match( + /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z pid=\d+ tid=\w+ INFO: #{message}\n?/ + ) + end + end + end + + context "thread name set" do + it "logs messages with thread name tid" do + Thread.current["outboxer_tid"] = nil + Thread.current.name = "blah" + + message = "test message" + logger.info { message } - log_output = output.string - log_lines = log_output.strip.split("\n") + log_output = output.string + log_lines = log_output.strip.split("\n") - log_lines.each do |line| - expect(line.length).to be <= 100 - expect(line).to match( - /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z pid=\d+ tid=\w+ INFO: #{message}\n?/) + log_lines.each do |line| + expect(line.length).to be <= 100 + expect(line).to match( + /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z pid=\d+ tid=\w+ INFO: #{message}\n?/ + ) + end end end end diff --git a/spec/lib/outboxer/publisher/config_spec.rb b/spec/lib/outboxer/publisher/config_spec.rb index 3f7f3e42..7e5d9782 100644 --- a/spec/lib/outboxer/publisher/config_spec.rb +++ b/spec/lib/outboxer/publisher/config_spec.rb @@ -9,44 +9,50 @@ module Outboxer allow(ENV).to receive(:[]).with("RAILS_MAX_THREADS").and_return("4") config = Publisher.config(environment: "test", path: path) expect(config).to eq({ - buffer_size: 100, - concurrency: 4, - tick_interval: 0.1, - poll_interval: 5.0, - heartbeat_interval: 5.0, - sweep_interval: 60, - sweep_retention: 60, - sweep_batch_size: 100, - log_level: 1 + batch_size: 999, + buffer_size: 9, + buffering_concurrency: 4, + publishing_concurrency: 4, + tick_interval: 0.2, + poll_interval: 3.0, + heartbeat_interval: 2.0, + sweep_interval: 30, + sweep_retention: 30, + sweep_batch_size: 50, + log_level: 0 }) end it "returns root values when environment not overridden" do expect(Publisher.config(path: path)).to eq({ - buffer_size: 100, - concurrency: 1, - tick_interval: 0.1, - poll_interval: 5.0, - heartbeat_interval: 5.0, - sweep_interval: 60, - sweep_retention: 60, - sweep_batch_size: 100, - log_level: 1 + batch_size: 999, + buffer_size: 9, + buffering_concurrency: 2, + publishing_concurrency: 2, + tick_interval: 0.2, + poll_interval: 3.0, + heartbeat_interval: 2.0, + sweep_interval: 30, + sweep_retention: 30, + sweep_batch_size: 50, + log_level: 0 }) end it "returns environment overrides" do expect(Publisher.config(environment: "development", path: path)) .to eq({ - buffer_size: 100, - concurrency: 2, - tick_interval: 0.1, - poll_interval: 5.0, - heartbeat_interval: 5.0, - sweep_interval: 60, - sweep_retention: 60, - sweep_batch_size: 100, - log_level: 1 + batch_size: 999, + buffer_size: 9, + buffering_concurrency: 3, + publishing_concurrency: 3, + tick_interval: 0.2, + poll_interval: 3.0, + heartbeat_interval: 2.0, + sweep_interval: 30, + sweep_retention: 30, + sweep_batch_size: 50, + log_level: 0 }) end diff --git a/spec/lib/outboxer/publisher/parse_cli_options_spec.rb b/spec/lib/outboxer/publisher/parse_cli_options_spec.rb index ec86ebf6..cfa556c0 100644 --- a/spec/lib/outboxer/publisher/parse_cli_options_spec.rb +++ b/spec/lib/outboxer/publisher/parse_cli_options_spec.rb @@ -6,121 +6,91 @@ module Outboxer context "when no options are passed" do let(:argv) { [] } - it "parses correctly" do + it "returns empty hash" do expect(Publisher.parse_cli_options(argv)).to eq({}) end end - context "when parsing the concurrency option" do - it "parses short and long flags" do - expect(Publisher.parse_cli_options(["-c", "5"])).to eq({ concurrency: 5 }) - expect(Publisher.parse_cli_options(["--concurrency", "5"])).to eq({ concurrency: 5 }) + context "when options are passed" do + context "when parsing the environment option" do + it "parses the environment flag" do + expect(Publisher.parse_cli_options(["--environment", "staging"])) + .to eq({ environment: "staging" }) + end end - end - context "when parsing the environment option" do - it "parses short and long flags" do - expect(Publisher.parse_cli_options(["-e", "staging"])).to eq({ environment: "staging" }) - expect( - Publisher.parse_cli_options(["--environment", "staging"]) - ).to eq({ environment: "staging" }) + it "parses the batch size flag" do + expect(Publisher.parse_cli_options(["--batch-size", "100"])).to eq({ batch_size: 100 }) end - end - context "when parsing the buffer size option" do - it "parses short and long flags" do - expect(Publisher.parse_cli_options(["-b", "100"])).to eq({ buffer_size: 100 }) + it "parses the buffer size flag" do expect(Publisher.parse_cli_options(["--buffer-size", "100"])).to eq({ buffer_size: 100 }) end - end - context "when parsing the tick interval option" do - it "parses short and long flags" do - expect(Publisher.parse_cli_options(["-t", "0.5"])).to eq({ tick_interval: 0.5 }) + it "parses buffering concurrency flag" do + expect(Publisher.parse_cli_options(["--buffering-concurrency", "8"])) + .to eq({ buffering_concurrency: 8 }) + end + + it "parses publishing concurrency flag" do + expect(Publisher.parse_cli_options(["--publishing-concurrency", "7"])) + .to eq({ publishing_concurrency: 7 }) + end + + it "parses the tick interval flag" do expect( Publisher.parse_cli_options(["--tick-interval", "0.5"]) ).to eq({ tick_interval: 0.5 }) end - end - context "when parsing the poll interval option" do - it "parses short and long flags" do - expect(Publisher.parse_cli_options(["-p", "30"])).to eq({ poll_interval: 30 }) + it "parses the poll interval flag" do expect( Publisher.parse_cli_options(["--poll-interval", "30"]) ).to eq({ poll_interval: 30 }) end - end - context "when parsing the heartbeat interval option" do - it "parses short and long flags" do - expect(Publisher.parse_cli_options(["-a", "10"])).to eq({ heartbeat_interval: 10 }) + it "parses the heartbeat interval flag" do expect( Publisher.parse_cli_options(["--heartbeat-interval", "10"]) ).to eq({ heartbeat_interval: 10 }) end - end - context "when parsing the sweep interval option" do - it "parses short and long flags" do - expect(Publisher.parse_cli_options(["-s", "10"])).to eq({ sweep_interval: 10 }) + it "parses the sweep interval flag" do expect( Publisher.parse_cli_options(["--sweep-interval", "10"]) ).to eq({ sweep_interval: 10 }) end - end - context "when parsing the sweep retention option" do - it "parses short and long flags" do - expect(Publisher.parse_cli_options(["-r", "10"])).to eq({ sweep_retention: 10 }) + it "parses the sweep retention flag" do expect( Publisher.parse_cli_options(["--sweep-retention", "10"]) ).to eq({ sweep_retention: 10 }) end - end - context "when parsing the sweep batch size option" do - it "parses short and long flags" do - expect(Publisher.parse_cli_options(["-w", "10"])).to eq({ sweep_batch_size: 10 }) + it "parses the sweep batch size flag" do expect( Publisher.parse_cli_options(["--sweep-batch_size", "10"]) ).to eq({ sweep_batch_size: 10 }) end - end - context "when parsing the config file path option" do - it "parses short and long flags" do - expect( - Publisher.parse_cli_options(["-C", "config/path.yml"]) - ).to eq({ config: "config/path.yml" }) + it "parses the config file flag" do expect( Publisher.parse_cli_options(["--config", "config/path.yml"]) ).to eq({ config: "config/path.yml" }) end - end - context "when parsing the log level option" do - it "parses short and long flags" do - expect(Publisher.parse_cli_options(["-l", "0"])).to eq({ log_level: 0 }) + it "parses the log level flag" do expect(Publisher.parse_cli_options(["--log-level", "0"])).to eq({ log_level: 0 }) end - end - - context "when the version option is used" do - let(:argv) { ["-V"] } - it "prints version and exits" do - expect { Publisher.parse_cli_options(argv) } + it "parses the version flag, prints version and exits" do + expect { Publisher.parse_cli_options(["--version"]) } .to output(/Outboxer version/).to_stdout.and raise_error(SystemExit) end - end - - context "when the help option is used" do - let(:argv) { ["-h"] } - it "prints help and exits" do + it "parses the help flag, prints help and exits" do expect do - Publisher.parse_cli_options(argv) + Publisher.parse_cli_options(["--help"]) end.to output( /Usage: outboxer_publisher \[options\]/).to_stdout.and raise_error(SystemExit) end diff --git a/spec/lib/outboxer/publisher/publish_messages_spec.rb b/spec/lib/outboxer/publisher/publish_messages_spec.rb index 1cef16e6..463f6c55 100644 --- a/spec/lib/outboxer/publisher/publish_messages_spec.rb +++ b/spec/lib/outboxer/publisher/publish_messages_spec.rb @@ -8,7 +8,7 @@ module Outboxer let(:poll_interval) { 1 } let(:tick_interval) { 0.1 } let(:logger) { instance_double(Logger, debug: true, error: true, fatal: true, info: true, level: 1) } - let(:kernel) { class_double(Kernel, sleep: nil) } + let(:kernel) { Kernel } before do allow(logger).to receive(:level=) diff --git a/spec/lib/outboxer/publisher/terminate_spec.rb b/spec/lib/outboxer/publisher/terminate_spec.rb new file mode 100644 index 00000000..1d587245 --- /dev/null +++ b/spec/lib/outboxer/publisher/terminate_spec.rb @@ -0,0 +1,13 @@ +require "rails_helper" + +module Outboxer + RSpec.describe Publisher do + describe ".terminate" do + it "sets publisher status to TERMINATING" do + Publisher.terminate(id: 1) + + expect(Publisher.terminating?).to eq(true) + end + end + end +end diff --git a/spec/rails_helper.rb b/spec/rails_helper.rb index 33543587..e66e10e7 100644 --- a/spec/rails_helper.rb +++ b/spec/rails_helper.rb @@ -1,7 +1,11 @@ require "simplecov" require "coveralls" -SimpleCov.formatter = Coveralls::SimpleCov::Formatter +SimpleCov.formatters = SimpleCov::Formatter::MultiFormatter.new([ + SimpleCov::Formatter::HTMLFormatter, + Coveralls::SimpleCov::Formatter +]) + SimpleCov.start require "spec_helper" @@ -17,7 +21,7 @@ config.include FactoryBot::Syntax::Methods config.before(:all) do - db_config = Outboxer::Database.config(environment: "test", concurrency: 2) + db_config = Outboxer::Database.config(environment: "test", pool: 2) Outboxer::Database.connect(config: db_config, logger: nil) DatabaseCleaner.strategy = :truncation end diff --git a/tasks/database.rake b/tasks/database.rake index 0969709c..4e9767b0 100644 --- a/tasks/database.rake +++ b/tasks/database.rake @@ -7,7 +7,7 @@ namespace :outboxer do namespace :db do task :drop do environment = ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development" - db_config = Outboxer::Database.config(environment: environment, concurrency: 1) + db_config = Outboxer::Database.config(environment: environment, pool: 1) ActiveRecord::Base.establish_connection(db_config.merge(database: "postgres")) ActiveRecord::Base.connection.drop_database(db_config[:database]) @@ -16,7 +16,7 @@ namespace :outboxer do task :create do environment = ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development" - db_config = Outboxer::Database.config(environment: environment, concurrency: 1) + db_config = Outboxer::Database.config(environment: environment, pool: 1) ActiveRecord::Base.establish_connection(db_config.merge(database: "postgres")) ActiveRecord::Base.connection.create_database(db_config[:database]) @@ -25,7 +25,7 @@ namespace :outboxer do task :migrate do environment = ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development" - db_config = Outboxer::Database.config(environment: environment, concurrency: 1) + db_config = Outboxer::Database.config(environment: environment, pool: 1) ActiveRecord::Base.establish_connection(db_config) require_relative "../db/migrate/create_outboxer_settings" @@ -51,7 +51,7 @@ namespace :outboxer do task :seed do environment = ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development" - db_config = Outboxer::Database.config(environment: environment, concurrency: 1) + db_config = Outboxer::Database.config(environment: environment, pool: 1) ActiveRecord::Base.establish_connection(db_config) ActiveRecord::Base.connection.disconnect!
Batch Size<%= publisher[:settings]['batch_size'] %>
Buffer Size <%= publisher[:settings]['buffer_size'] %>
Concurrency<%= publisher[:settings]['concurrency'] %>Buffering Concurrency<%= publisher[:settings]['buffering_concurrency'] %>
Publishing Concurrency<%= publisher[:settings]['publishing_concurrency'] %>
Tick Interval