diff --git a/Gemfile b/Gemfile index 86b6a54..b18f3e6 100644 --- a/Gemfile +++ b/Gemfile @@ -3,6 +3,8 @@ git_source(:github) { |repo| "https://github.com/#{repo}.git" } ruby "3.1.6" +gem "concurrent-ruby", "1.3.4" + gem "rails", "7.0.8.6" gem "sinatra" @@ -14,6 +16,8 @@ gem "activerecord" gem "sidekiq" +gem "outboxer", git: 'https://github.com/fast-programmer/outboxer.git', branch: "master" + group :development do end diff --git a/Gemfile.lock b/Gemfile.lock index 7da0472..8d2b8e9 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,3 +1,11 @@ +GIT + remote: https://github.com/fast-programmer/outboxer.git + revision: 0322c17658d7a1a0448c95a4570985d8d0b3158a + branch: master + specs: + outboxer (1.0.0.pre.beta) + activerecord (>= 7.0.8.6) + GEM remote: https://rubygems.org/ specs: @@ -70,7 +78,7 @@ GEM builder (3.2.4) byebug (11.1.3) coderay (1.1.3) - concurrent-ruby (1.2.3) + concurrent-ruby (1.3.4) connection_pool (2.4.1) crass (1.0.6) database_cleaner-active_record (2.2.0) @@ -87,7 +95,7 @@ GEM railties (>= 5.0.0) globalid (1.2.1) activesupport (>= 6.1) - i18n (1.14.4) + i18n (1.14.7) concurrent-ruby (~> 1.0) loofah (2.22.0) crass (~> 1.0.2) @@ -101,7 +109,7 @@ GEM method_source (1.0.0) mini_mime (1.1.5) mini_portile2 (2.8.5) - minitest (5.22.3) + minitest (5.25.4) mustermann (3.0.3) ruby2_keywords (~> 0.0.1) net-imap (0.5.1) @@ -210,8 +218,10 @@ PLATFORMS DEPENDENCIES activerecord + concurrent-ruby (= 1.3.4) database_cleaner-active_record factory_bot_rails + outboxer! pg pry-byebug puma diff --git a/app/jobs/outboxer_integration/complete_test_job.rb b/app/jobs/outboxer_integration/complete_test_job.rb new file mode 100644 index 0000000..dcf7268 --- /dev/null +++ b/app/jobs/outboxer_integration/complete_test_job.rb @@ -0,0 +1,9 @@ +module OutboxerIntegration + class CompleteTestJob + include Sidekiq::Job + + def perform(args) + TestService.complete(event_id: args["event_id"]) + end + end +end diff --git a/app/jobs/outboxer_integration/publish_message_job.rb b/app/jobs/outboxer_integration/publish_message_job.rb new file mode 100644 index 0000000..e748705 --- /dev/null +++ b/app/jobs/outboxer_integration/publish_message_job.rb @@ -0,0 +1,17 @@ +module OutboxerIntegration + class PublishMessageJob + include Sidekiq::Job + + def perform(args) + job_class_name = to_job_class_name(messageable_type: args["messageable_type"]) + job_class_name&.safe_constantize&.perform_async("event_id" => args["messageable_id"]) + end + + def to_job_class_name(messageable_type:) + captures = messageable_type.match(/\A(?:(\w+)::)?(\w+)Event\z/)&.captures + return if captures.nil? + + captures[0] ? "#{captures[0]}::#{captures[1]}Job" : "#{captures[1]}Job" + end + end +end diff --git a/app/jobs/outboxer_integration/test_started_job.rb b/app/jobs/outboxer_integration/test_started_job.rb new file mode 100644 index 0000000..399b748 --- /dev/null +++ b/app/jobs/outboxer_integration/test_started_job.rb @@ -0,0 +1,9 @@ +module OutboxerIntegration + class TestStartedJob + include Sidekiq::Job + + def perform(args) + CompleteTestJob.perform_async({ "event_id" => args["event_id"] }) + end + end +end diff --git a/app/models/event.rb b/app/models/event.rb index 99104be..544a429 100644 --- a/app/models/event.rb +++ b/app/models/event.rb @@ -1,5 +1,5 @@ class Event < ApplicationRecord - self.table_name = 'events' + self.table_name = "events" # associations @@ -7,13 +7,11 @@ class Event < ApplicationRecord # validations - validates :user_id, presence: true - validates :tenant_id, presence: true - validates :type, presence: true, length: { maximum: 255 } - validates :eventable_type, presence: true, length: { maximum: 255 }, - if: -> { eventable_id.present? } + # callbacks - validates :eventable_id, presence: true, if: -> { eventable_type.present? } + after_create do |event| + Outboxer::Message.queue(messageable: event) + end end diff --git a/app/models/outboxer_integration/test.rb b/app/models/outboxer_integration/test.rb new file mode 100644 index 0000000..c06f8d5 --- /dev/null +++ b/app/models/outboxer_integration/test.rb @@ -0,0 +1,7 @@ +module OutboxerIntegration + class Test < ApplicationRecord + self.table_name = "outboxer_integration_tests" + + has_many :events, as: :eventable + end +end diff --git a/app/models/outboxer_integration/test_completed_event.rb b/app/models/outboxer_integration/test_completed_event.rb new file mode 100644 index 0000000..8750e5e --- /dev/null +++ b/app/models/outboxer_integration/test_completed_event.rb @@ -0,0 +1,4 @@ +module OutboxerIntegration + class TestCompletedEvent < Event + end +end diff --git a/app/models/outboxer_integration/test_started_event.rb b/app/models/outboxer_integration/test_started_event.rb new file mode 100644 index 0000000..5a80692 --- /dev/null +++ b/app/models/outboxer_integration/test_started_event.rb @@ -0,0 +1,4 @@ +module OutboxerIntegration + class TestStartedEvent < Event + end +end diff --git a/app/services/outboxer_integration/test_service.rb b/app/services/outboxer_integration/test_service.rb new file mode 100644 index 0000000..9c26727 --- /dev/null +++ b/app/services/outboxer_integration/test_service.rb @@ -0,0 +1,39 @@ +module OutboxerIntegration + module TestService + module_function + + def start + ActiveRecord::Base.transaction do + test = Test.create! + + event = TestStartedEvent.create!( + eventable: test, + body: { + "test" => { + "id" => test.id + } + }) + + { id: test.id, events: [{ id: event.id, type: event.type }] } + end + end + + def complete(event_id:) + ActiveRecord::Base.transaction do + started_event = TestStartedEvent.find(event_id) + test = started_event.eventable + test.touch + + event = TestCompletedEvent.create!( + eventable: test, + body: { + "test" => { + "id" => test.id + } + }) + + { id: test.id, events: [{ id: event.id, type: event.type }] } + end + end + end +end diff --git a/bin/outboxer_publisher b/bin/outboxer_publisher new file mode 100755 index 0000000..afed543 --- /dev/null +++ b/bin/outboxer_publisher @@ -0,0 +1,14 @@ +#!/usr/bin/env ruby + +require "bundler/setup" +require "sidekiq" +require "outboxer" +require_relative "../app/jobs/outboxer_integration/publish_message_job" + +Outboxer::Publisher.publish do |message| + OutboxerIntegration::PublishMessageJob.perform_async({ + "message_id" => message[:id], + "messageable_id" => message[:messageable_id], + "messageable_type" => message[:messageable_type] + }) +end diff --git a/config/sidekiq.rb b/config/sidekiq.rb index 8b6999c..e56cb70 100644 --- a/config/sidekiq.rb +++ b/config/sidekiq.rb @@ -6,7 +6,7 @@ config.concurrency = 10 - config.queues = ['events', 'reporting'] + config.queues = ['default', 'events', 'reporting'] end Sidekiq.configure_client do |config| diff --git a/db/migrate/20240114120424_create_events.rb b/db/migrate/20240114120424_create_events.rb index b7b82a5..085bd23 100644 --- a/db/migrate/20240114120424_create_events.rb +++ b/db/migrate/20240114120424_create_events.rb @@ -1,16 +1,31 @@ class CreateEvents < ActiveRecord::Migration[7.0] - def change + def up create_table :events do |t| - t.bigint :user_id, null: false - t.bigint :tenant_id, null: false + t.bigint :user_id + t.bigint :tenant_id + + t.string :eventable_type, limit: 255 + t.bigint :eventable_id + t.index [:eventable_type, :eventable_id] t.string :type, null: false, limit: 255 - t.jsonb :body + t.send(json_column_type, :body) t.datetime :created_at, null: false + end + end - t.string :eventable_type, null: false, limit: 255 - t.bigint :eventable_id, null: false - t.index [:eventable_type, :eventable_id] + def down + drop_table :events if table_exists?(:events) + end + + private + + def json_column_type + case ActiveRecord::Base.connection.adapter_name + when /PostgreSQL/ + :jsonb + else + :json end end end diff --git a/db/migrate/20240706053510_create_accountify_invoice_status_summary.rb b/db/migrate/20240706053510_create_accountify_invoice_status_summary.rb index 593265b..285e56e 100644 --- a/db/migrate/20240706053510_create_accountify_invoice_status_summary.rb +++ b/db/migrate/20240706053510_create_accountify_invoice_status_summary.rb @@ -18,6 +18,8 @@ def change t.timestamps end - add_index :accountify_invoice_status_summaries, [:tenant_id, :organisation_id], unique: true + add_index :accountify_invoice_status_summaries, [:tenant_id, :organisation_id], + unique: true, + name: 'index_accountify_invoice_status_summaries_on_tenant_id_org_id' end end diff --git a/db/migrate/20250126124842_create_outboxer_settings.rb b/db/migrate/20250126124842_create_outboxer_settings.rb new file mode 100644 index 0000000..e1e43c3 --- /dev/null +++ b/db/migrate/20250126124842_create_outboxer_settings.rb @@ -0,0 +1,18 @@ +class CreateOutboxerSettings < ActiveRecord::Migration[6.1] + def up + ActiveRecord::Base.transaction do + create_table :outboxer_settings do |t| + t.string :name, limit: 255, null: false + t.string :value, limit: 255, null: false + + t.timestamps + end + + add_index :outboxer_settings, :name, unique: true + end + end + + def down + drop_table :outboxer_settings + end +end diff --git a/db/migrate/20250126124843_create_outboxer_messages.rb b/db/migrate/20250126124843_create_outboxer_messages.rb new file mode 100644 index 0000000..371e456 --- /dev/null +++ b/db/migrate/20250126124843_create_outboxer_messages.rb @@ -0,0 +1,40 @@ +class CreateOutboxerMessages < ActiveRecord::Migration[6.1] + def up + create_table :outboxer_messages do |t| + t.string :status, limit: 255, null: false + + t.string :messageable_id, limit: 255, null: false + t.string :messageable_type, limit: 255, null: false + + t.datetime :queued_at, precision: 6, null: false + + t.datetime :buffered_at, precision: 6 + + t.datetime :publishing_at, precision: 6 + + t.datetime :updated_at, precision: 6, null: false + + t.bigint :publisher_id + t.string :publisher_name, limit: 263 # 255 (hostname) + 1 (colon) + 7 (pid) + end + + # messages by status count + add_index :outboxer_messages, :status, name: "idx_outboxer_status" + + # messages by status latency + add_index :outboxer_messages, [:status, :updated_at], + name: "idx_outboxer_status_updated_at" + + # publisher latency + add_index :outboxer_messages, [:publisher_id, :updated_at], + name: "idx_outboxer_pub_id_updated_at" + + # publisher throughput + add_index :outboxer_messages, [:status, :publisher_id, :updated_at], + name: "idx_outboxer_status_pub_id_updated_at" + end + + def down + drop_table :outboxer_messages if table_exists?(:outboxer_messages) + end +end diff --git a/db/migrate/20250126124844_create_outboxer_exceptions.rb b/db/migrate/20250126124844_create_outboxer_exceptions.rb new file mode 100644 index 0000000..369215b --- /dev/null +++ b/db/migrate/20250126124844_create_outboxer_exceptions.rb @@ -0,0 +1,20 @@ +class CreateOutboxerExceptions < ActiveRecord::Migration[6.1] + def up + ActiveRecord::Base.transaction do + create_table :outboxer_exceptions do |t| + t.references :message, foreign_key: { to_table: :outboxer_messages }, null: false + + t.string :class_name, limit: 255, null: false + t.text :message_text, null: false + + t.timestamps + end + + remove_column :outboxer_exceptions, :updated_at + end + end + + def down + drop_table :outboxer_exceptions if table_exists?(:outboxer_exceptions) + end +end diff --git a/db/migrate/20250126124845_create_outboxer_frames.rb b/db/migrate/20250126124845_create_outboxer_frames.rb new file mode 100644 index 0000000..1e7f7ec --- /dev/null +++ b/db/migrate/20250126124845_create_outboxer_frames.rb @@ -0,0 +1,16 @@ +class CreateOutboxerFrames < ActiveRecord::Migration[6.1] + def up + create_table :outboxer_frames do |t| + t.references :exception, foreign_key: { to_table: :outboxer_exceptions }, null: false + + t.integer :index, null: false + t.text :text, null: false + + t.index [:exception_id, :index], unique: true + end + end + + def down + drop_table :outboxer_frames if table_exists?(:outboxer_frames) + end +end diff --git a/db/migrate/20250126124846_create_outboxer_publishers.rb b/db/migrate/20250126124846_create_outboxer_publishers.rb new file mode 100644 index 0000000..d57f47c --- /dev/null +++ b/db/migrate/20250126124846_create_outboxer_publishers.rb @@ -0,0 +1,16 @@ +class CreateOutboxerPublishers < ActiveRecord::Migration[6.1] + def up + create_table :outboxer_publishers do |t| + t.string :name, limit: 263, null: false # 255 (hostname) + 1 (colon) + 7 (pid) + t.string :status, limit: 255, null: false + t.json :settings, null: false + t.json :metrics, null: false + + t.timestamps + end + end + + def down + drop_table :outboxer_publishers + end +end diff --git a/db/migrate/20250126124847_create_outboxer_signals.rb b/db/migrate/20250126124847_create_outboxer_signals.rb new file mode 100644 index 0000000..747d475 --- /dev/null +++ b/db/migrate/20250126124847_create_outboxer_signals.rb @@ -0,0 +1,16 @@ +class CreateOutboxerSignals < ActiveRecord::Migration[6.1] + def up + ActiveRecord::Base.transaction do + create_table :outboxer_signals do |t| + t.string :name, limit: 9, null: false + t.references :publisher, foreign_key: { to_table: :outboxer_publishers }, null: false + + t.datetime :created_at, null: false + end + end + end + + def down + drop_table :outboxer_signals + end +end diff --git a/db/migrate/20250126124848_create_outboxer_integration_tests.rb b/db/migrate/20250126124848_create_outboxer_integration_tests.rb new file mode 100644 index 0000000..6c0b5b9 --- /dev/null +++ b/db/migrate/20250126124848_create_outboxer_integration_tests.rb @@ -0,0 +1,13 @@ +class CreateOutboxerIntegrationTests < ActiveRecord::Migration[7.0] + def up + create_table :outboxer_integration_tests do |t| + t.integer :lock_version, default: 0, null: false + + t.timestamps + end + end + + def down + drop_table :outboxer_integration_tests + end +end diff --git a/db/schema.rb b/db/schema.rb index 4c88e5a..f27453d 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.0].define(version: 2024_07_06_053510) do +ActiveRecord::Schema[7.0].define(version: 2025_01_26_124848) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" @@ -50,7 +50,7 @@ t.datetime "created_at", null: false t.datetime "updated_at", null: false t.index ["organisation_id"], name: "index_accountify_invoice_status_summaries_on_organisation_id" - t.index ["tenant_id", "organisation_id"], name: "idx_on_tenant_id_organisation_id_33a11db97a", unique: true + t.index ["tenant_id", "organisation_id"], name: "index_accountify_invoice_status_summaries_on_tenant_id_org_id", unique: true end create_table "accountify_invoices", force: :cascade do |t| @@ -84,19 +84,84 @@ end create_table "events", force: :cascade do |t| - t.bigint "user_id", null: false - t.bigint "tenant_id", null: false - t.text "type", null: false - t.text "eventable_type", null: false - t.bigint "eventable_id", null: false + t.bigint "user_id" + t.bigint "tenant_id" + t.string "eventable_type", limit: 255 + t.bigint "eventable_id" + t.string "type", limit: 255, null: false t.jsonb "body" t.datetime "created_at", null: false t.index ["eventable_type", "eventable_id"], name: "index_events_on_eventable_type_and_eventable_id" end + create_table "outboxer_exceptions", force: :cascade do |t| + t.bigint "message_id", null: false + t.string "class_name", limit: 255, null: false + t.text "message_text", null: false + t.datetime "created_at", null: false + t.index ["message_id"], name: "index_outboxer_exceptions_on_message_id" + end + + create_table "outboxer_frames", force: :cascade do |t| + t.bigint "exception_id", null: false + t.integer "index", null: false + t.text "text", null: false + t.index ["exception_id", "index"], name: "index_outboxer_frames_on_exception_id_and_index", unique: true + t.index ["exception_id"], name: "index_outboxer_frames_on_exception_id" + end + + create_table "outboxer_integration_tests", force: :cascade do |t| + t.integer "lock_version", default: 0, null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + end + + create_table "outboxer_messages", force: :cascade do |t| + t.string "status", limit: 255, null: false + t.string "messageable_id", limit: 255, null: false + t.string "messageable_type", limit: 255, null: false + t.datetime "queued_at", null: false + t.datetime "buffered_at" + t.datetime "publishing_at" + t.datetime "updated_at", null: false + t.bigint "publisher_id" + t.string "publisher_name", limit: 263 + t.index ["publisher_id", "updated_at"], name: "idx_outboxer_pub_id_updated_at" + t.index ["status", "publisher_id", "updated_at"], name: "idx_outboxer_status_pub_id_updated_at" + t.index ["status", "updated_at"], name: "idx_outboxer_status_updated_at" + t.index ["status"], name: "idx_outboxer_status" + end + + create_table "outboxer_publishers", force: :cascade do |t| + t.string "name", limit: 263, null: false + t.string "status", limit: 255, null: false + t.json "settings", null: false + t.json "metrics", null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + end + + create_table "outboxer_settings", force: :cascade do |t| + t.string "name", limit: 255, null: false + t.string "value", limit: 255, null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["name"], name: "index_outboxer_settings_on_name", unique: true + end + + create_table "outboxer_signals", force: :cascade do |t| + t.string "name", limit: 9, null: false + t.bigint "publisher_id", null: false + t.datetime "created_at", precision: nil, null: false + t.index ["publisher_id"], name: "index_outboxer_signals_on_publisher_id" + end + add_foreign_key "accountify_contacts", "accountify_organisations", column: "organisation_id" add_foreign_key "accountify_invoice_line_items", "accountify_invoices", column: "invoice_id" add_foreign_key "accountify_invoice_status_summaries", "accountify_organisations", column: "organisation_id" add_foreign_key "accountify_invoices", "accountify_contacts", column: "contact_id" add_foreign_key "accountify_invoices", "accountify_organisations", column: "organisation_id" + add_foreign_key "outboxer_exceptions", "outboxer_messages", column: "message_id" + add_foreign_key "outboxer_frames", "outboxer_exceptions", column: "exception_id" + add_foreign_key "outboxer_signals", "outboxer_publishers", column: "publisher_id" end diff --git a/spec/bin/outboxer_publisher_spec.rb b/spec/bin/outboxer_publisher_spec.rb new file mode 100644 index 0000000..e4d7496 --- /dev/null +++ b/spec/bin/outboxer_publisher_spec.rb @@ -0,0 +1,57 @@ +require "rails_helper" + +require_relative "../../app/models/application_record" +require_relative "../../app/models/event" +require_relative "../../app/models/outboxer_integration/test" +require_relative "../../app/models/outboxer_integration/test_started_event" +require_relative "../../app/models/outboxer_integration/test_completed_event" + +require_relative "../../app/services/outboxer_integration/test_service" + +RSpec.describe "bin/outboxer_publisher" do + it "performs event job handler async" do + Sidekiq::Testing.disable! + + test, _events = OutboxerIntegration::TestService.start + + env = { + "RAILS_ENV" => "test", + "REDIS_URL" => "redis://localhost:6379/0" + } + + outboxer_publisher_cmd = File.join(Dir.pwd, "bin", "outboxer_publisher") + outboxer_publisher_pid = spawn(env, outboxer_publisher_cmd) + + sidekiq_cmd = "bundle exec sidekiq -r ./config/sidekiq.rb" + sidekiq_pid = spawn(env, sidekiq_cmd) + + max_attempts = 10 + + test_completed_event = nil + + max_attempts.times do |attempt| + test = OutboxerIntegration::Test.find(test[:id]) + test_completed_event = OutboxerIntegration::TestCompletedEvent.last + break if test_completed_event && (test.events.last == test_completed_event) + + sleep 1 + + Sidekiq.logger.warn "OutboxerIntegration::TestCompletedEvent not found. " \ + "Retrying (attempt #{attempt + 1}/#{max_attempts})..." + end + + expect(test_completed_event.body["test"]["id"]).to eql(test.id) + ensure + if sidekiq_pid + Process.kill("TERM", sidekiq_pid) + Process.wait(sidekiq_pid) + end + + if outboxer_publisher_pid + Process.kill("TERM", outboxer_publisher_pid) + Process.wait(outboxer_publisher_pid) + end + + Sidekiq::Testing.fake! + end +end