-
Notifications
You must be signed in to change notification settings - Fork 12
feat: Add database-backed event processing with Outbox pattern #56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add database-backed event processing with Outbox pattern #56
Conversation
Extract Kinesis client creation and batch sending logic into reusable components. Introduce a DeliveryAdapter interface to support alternative event delivery mechanisms while maintaining backward compatibility with the existing ActiveJob-based delivery. Changes: - Extract KinesisClientFactory for centralized Kinesis client creation - Extract KinesisBatchSender for shared batch sending logic - Introduce DeliveryAdapter base class defining adapter interface - Create ActiveJobAdapter wrapping existing ActiveJob delivery behavior - Refactor Writer to use delivery_adapter.deliver() instead of direct job enqueueing - Refactor Connection to delegate to delivery_adapter.transaction_connection - Update Engine to use adapter.validate_configuration! instead of detect_queue_adapter! - Update DeliveryJob to use KinesisClientFactory All existing behavior is preserved through the ActiveJobAdapter, which is the default adapter. No breaking changes for existing users.
Add PostgreSQL gem as a development dependency and configure CI to run tests against both SQLite and PostgreSQL databases. This ensures the database-backed event processing works across both database systems. Changes: - Add PostgreSQL service container to GitHub Actions workflow - Configure database matrix to run tests against sqlite3 and postgresql - Add pg gem to Gemfile and Rails-specific gemfiles - Update test database configuration with PostgreSQL connection details
dabc0cc to
e25b6c6
Compare
README.md
Outdated
| Journaled.worker_max_attempts = 3 # Retries before moving to DLQ | ||
| ``` | ||
|
|
||
| **Note:** When using the Outbox adapter, you do **not** need to configure an ActiveJob queue adapter (skip step 1 of Installation). The Outbox adapter uses the `journaled_events` table for event storage and its own worker daemons for processing, making it independent of ActiveJob. Transactional batching still works seamlessly with the Outbox adapter. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is better than enqueuing a "drain the outbox" job we could enqueue with a little bit of a debounce whenever journaled entries are flowing in and not have to manage a daemon?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair question! I think we can further decouple the storing of events in an outbox table and the way we process the events to allow both implementations. For now, I'd like to experiment with managing a separate daemon to get a sense of how difficult that is vs keeping the processing in the job queue.
README.md
Outdated
| ```ruby | ||
| # Find failed events | ||
| Journaled::Outbox::DeadLetter.where(stream_name: 'my_stream') | ||
| Journaled::Outbox::DeadLetter.failed_since(1.hour.ago) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it might be attractive to block whole shards on failure to enable in order delivery rather than have a deadletter concept.
|
Some changes i'll be implementing after talking with @smudge :
|
9d0f399 to
f2f42cd
Compare
63d0a8a to
b28d747
Compare
smudge
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TAFN -- left some thoughts, but I think the main thing is ensuring that we don't fail events for reasons that have nothing to do with the actual event payload.
b28d747 to
77497e8
Compare
af3c103 to
b64f68e
Compare
b64f68e to
7b380de
Compare
7b380de to
1c4d40d
Compare
Co-authored-by: Nathan G <nathan@ngriffith.com>
smudge
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TAFN - nothing major, just leaving this here so I can get a ping for re-review.
…t instead of custom case stement, use fewer queries to emit metrics
|
|
||
| records = events.map do |event| | ||
| # Exclude the application-level id - the database will generate its own using uuid_generate_v7() | ||
| event_data = event.journaled_attributes.except(:id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Confirmed that we're injecting the true PK ID into the payload later & that tests cover that, but I'm wondering what else might even care about the id prior to record insertion -- maybe we can set id to nil or remove it from the Event API surface entirely if Journaled is configured to use the outbox adapter... 🤔
| break if shutdown_requested | ||
|
|
||
| # Only sleep if no events were processed to prevent excessive polling on empty table | ||
| sleep(Journaled.worker_poll_interval) if events_processed.zero? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
smudge
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
domain LGTM && platform LGTM!
Summary
This PR introduces a proof of concept for an optional database-backed event processing system using the Outbox pattern as an alternative to ActiveJob-based delivery.
Configuration Example