Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ Style/StringLiterals:
Style/StringLiteralsInInterpolation:
Enabled: true
EnforcedStyle: double_quotes
Style/RescueStandardError:
Enabled: false
Layout/LineLength:
Max: 100
Exclude:
- 'bin/outboxer_publisher'
- 'bin/outboxer_load'
Layout/ArgumentAlignment:
EnforcedStyle: with_fixed_indentation
Style/Documentation:
Expand Down Expand Up @@ -215,6 +218,8 @@ Style/NegatedIfElseCondition: # new in 1.2
Enabled: true
Style/NegatedIf:
Enabled: false
Style/NegatedWhile:
Enabled: false
Style/NestedFileDirname: # new in 1.26
Enabled: true
Style/NilLambda: # new in 1.3
Expand All @@ -233,6 +238,8 @@ Style/QuotedSymbols: # new in 1.16
Enabled: true
Style/RedundantArgument: # new in 1.4
Enabled: true
Style/RedundantBegin:
Enabled: false
Style/RedundantArrayConstructor: # new in 1.52
Enabled: true
Style/RedundantConstantBase: # new in 1.40
Expand Down
51 changes: 35 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[![Coverage Status](https://coveralls.io/repos/github/fast-programmer/outboxer/badge.svg)](https://coveralls.io/github/fast-programmer/outboxer)
[![Join our Discord](https://img.shields.io/badge/Discord-blue?style=flat&logo=discord&logoColor=white)](https://discord.gg/x6EUehX6vU)

**Outboxer** is an implementation of the [**transactional outbox pattern**](https://docs.aws.amazon.com/prescriptive-guidance/latest/cloud-design-patterns/transactional-outbox.html) for **Ruby on Rails** applications.
**Outboxer** is an implementation of the [transactional outbox pattern](https://docs.aws.amazon.com/prescriptive-guidance/latest/cloud-design-patterns/transactional-outbox.html) for **Ruby on Rails** applications.

It helps you migrate to **event-driven architecture** with at least once delivery guarantees.

Expand Down Expand Up @@ -60,8 +60,23 @@ end

**7. Create derived event type**

```bash
bin/rails c
```

```ruby
Accountify::InvoiceRaisedEvent.create!
ActiveRecord::Base.logger = Logger.new(STDOUT)

Accountify::InvoiceRaisedEvent.create!(created_at: Time.current)
```

**8. Observe transactional consistency**

```log
TRANSACTION (0.2ms) BEGIN
Event Create (0.4ms) INSERT INTO "events" ...
Outboxer::Message Create (0.3ms) INSERT INTO "outboxer_messages" ...
TRANSACTION (0.2ms) COMMIT
```

**8. Publish outboxer messages**
Expand All @@ -70,26 +85,30 @@ Accountify::InvoiceRaisedEvent.create!
# bin/outboxer_publisher

Outboxer::Publisher.publish_messages do |publisher, messages|
# TODO: publish messages here

Outboxer::Message.published_by_ids(
ids: messages.map { |message| message[:id] },
publisher_id: publisher[:id],
publisher_name: publisher[:name])
rescue => error
Outboxer::Message.failed_by_ids(
ids: messages.map { |message| message[:id] },
exception: error,
publisher_id: publisher[:id],
publisher_name: publisher[:name])
begin
# TODO: publish messages here
rescue => error
Outboxer::Message.failed_by_ids(
ids: messages.map { |message| message[:id] },
exception: error,
publisher_id: publisher[:id],
publisher_name: publisher[:name])
else
Outboxer::Message.published_by_ids(
ids: messages.map { |message| message[:id] },
publisher_id: publisher[:id],
publisher_name: publisher[:name])
end
end
```

see https://github.com/fast-programmer/outboxer/wiki/Outboxer-publisher-block-examples
To integrate with Sidekiq, Bunny, Kafka and AWS SQS see the [publisher block examples](https://github.com/fast-programmer/outboxer/wiki/Outboxer-publisher-block-examples).

# Testing

The generated `spec/bin/outboxer_publisher` adds end to end queue and publish message test coverage.
To ensure you have end to end coverage:

`bin/rspec spec/bin/outboxer_publisher`

# Monitoring

Expand Down
2 changes: 1 addition & 1 deletion bin/console
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require "outboxer"
require "irb"

environment = ENV["RAILS_ENV"] || "development"
db_config = Outboxer::Database.config(environment: environment, concurrency: 1)
db_config = Outboxer::Database.config(environment: environment, pool: 1)
Outboxer::Database.connect(config: db_config)

ActiveRecord::Base.logger = Logger.new($stdout)
Expand Down
53 changes: 15 additions & 38 deletions bin/outboxer_load
Original file line number Diff line number Diff line change
@@ -1,48 +1,25 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

require "bundler/setup"
require "outboxer"
require "securerandom"
require "outboxer/loader"

cli_options = Outboxer::Loader.parse_cli_options(ARGV)
environment = cli_options.delete(:environment) || ENV["APP_ENV"] || ENV["RAILS_ENV"] || "development"
options = Outboxer::Loader::LOAD_DEFAULTS.merge(cli_options)
logger = Outboxer::Logger.new($stdout, level: Outboxer::Logger::INFO)

max_messages_count = ARGV.empty? ? 500_000 : ARGV[0].to_i
database_config = Outboxer::Database.config(environment: environment, pool: options[:concurrency])

if max_messages_count <= 0
logger.info "Please provide a positive integer for max messages count"
exit(1)
end

environment = ENV.fetch("RAILS_ENV", "development")
db_config = Outboxer::Database.config(environment: environment, concurrency: 1)
Outboxer::Database.connect(config: db_config)

reader, writer = IO.pipe

Signal.trap("INT") { writer.puts("INT") }

queued_messages_count = 0

while queued_messages_count < max_messages_count
break if reader.wait_readable(0) && reader.gets.strip == "INT"
Outboxer::Database.connect(config: database_config, logger: logger)

class Event
attr_accessor :id

def initialize(id:)
@id = id
end
end

messageable = Event.new(id: SecureRandom.hex(3))

message = Outboxer::Message.queue(messageable: messageable)

logger.info "Outboxer queued message " \
"id=#{message[:id]} " \
"messageable_type=#{message[:messageable_type]} " \
"messageable_id=#{message[:messageable_id]} "
queued_messages_count += 1
begin
Outboxer::Loader.load(
size: options[:size],
batch_size: options[:batch_size],
concurrency: options[:concurrency],
logger: logger)
ensure
Outboxer::Database.disconnect(logger: logger)
end

logger.info "Outboxer finished queuing #{queued_messages_count} messages"
2 changes: 1 addition & 1 deletion bin/outboxer_published_messages_deleter
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ environment = ENV["APP_ENV"] || "development"

logger = Outboxer::Logger.new($stdout, level: log_level)

db_config = Outboxer::Database.config(environment: environment, concurrency: 1)
db_config = Outboxer::Database.config(environment: environment, pool: 1)
Outboxer::Database.connect(config: db_config, logger: logger)

reader, writer = IO.pipe
Expand Down
36 changes: 22 additions & 14 deletions bin/outboxer_publisher
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@ config = Outboxer::Publisher.config(environment: environment, path: config_path)
options = Outboxer::Publisher::PUBLISH_MESSAGES_DEFAULTS.merge(config.merge(cli_options))
logger = Outboxer::Logger.new($stdout, level: options[:log_level])

database_config = Outboxer::Database.config(
environment: environment, concurrency: options[:concurrency])
connection_pool_size = Outboxer::Publisher.connection_pool_size(
buffering_concurrency: options[:buffering_concurrency],
publishing_concurrency: options[:publishing_concurrency])

database_config = Outboxer::Database.config(environment: environment, pool: connection_pool_size)
Outboxer::Database.connect(config: database_config, logger: logger)

begin
Outboxer::Publisher.publish_messages(
batch_size: options[:batch_size],
buffer_size: options[:buffer_size],
concurrency: options[:concurrency],
buffering_concurrency: options[:buffering_concurrency],
publishing_concurrency: options[:publishing_concurrency],
tick_interval: options[:tick_interval],
poll_interval: options[:poll_interval],
heartbeat_interval: options[:heartbeat_interval],
Expand All @@ -27,18 +31,22 @@ begin
sweep_batch_size: options[:sweep_batch_size],
logger: logger
) do |publisher, messages|
# TODO: publish messages here
begin
# TODO: publish messages here
rescue => error
Outboxer::Message.failed_by_ids(
ids: messages.map { |message| message[:id] },
exception: error,
publisher_id: publisher[:id],
publisher_name: publisher[:name])
else
Outboxer::Message.published_by_ids(
ids: messages.map { |message| message[:id] },
publisher_id: publisher[:id],
publisher_name: publisher[:name])

Outboxer::Message.published_by_ids(
ids: messages.map { |message| message[:id] },
publisher_id: publisher[:id],
publisher_name: publisher[:name])
rescue StandardError => error
Outboxer::Message.failed_by_ids(
ids: messages.map { |message| message[:id] },
exception: error,
publisher_id: publisher[:id],
publisher_name: publisher[:name])
# logger.info "#{messages.count} messages published"
end
end
ensure
Outboxer::Database.disconnect(logger: logger)
Expand Down
6 changes: 4 additions & 2 deletions config/outboxer.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
---
:buffer_size: 100
:concurrency: 1
:batch_size: 1000
:buffer_size: 10
:buffering_concurrency: 1
:publishing_concurrency: 1
:tick_interval: 0.1
:poll_interval: 5.0
:heartbeat_interval: 5.0
Expand Down
11 changes: 5 additions & 6 deletions lib/outboxer/database.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,20 @@ module Database

CONFIG_DEFAULTS = {
environment: "development",
concurrency: 1,
buffering_concurrency: 1,
publishing_concurrency: 1,
path: "config/database.yml"
}

# Loads the database configuration from a YAML file, processes ERB,
# and merges with CONFIG_DEFAULTS.
# @param pool [Integer] the connection pool size.
# @param environment [String, Symbol] the environment name to load configuration for.
# @param concurrency [Integer] the number of connections in the pool.
# @param path [String] the path to the database configuration file.
# @return [Hash] the database configuration with symbolized keys.
# @note Extra connections are added to the pool to cover for internal threads like main
# and the heartbeat thread.
def config(environment: CONFIG_DEFAULTS[:environment],
concurrency: CONFIG_DEFAULTS[:concurrency],
path: CONFIG_DEFAULTS[:path])
def config(pool:, environment: CONFIG_DEFAULTS[:environment], path: CONFIG_DEFAULTS[:path])
path_expanded = ::File.expand_path(path)
text = File.read(path_expanded)
erb = ERB.new(text, trim_mode: "-")
Expand All @@ -32,7 +31,7 @@ def config(environment: CONFIG_DEFAULTS[:environment],
yaml = YAML.safe_load(erb_result, permitted_classes: [Symbol], aliases: true)
yaml.deep_symbolize_keys!
yaml = yaml[environment.to_sym] || {}
yaml[:pool] = concurrency + 3 # workers + (main + heartbeat + sweeper)
yaml[:pool] = pool

yaml
rescue Errno::ENOENT
Expand Down
Loading