Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OutboxTableConfig #3

Closed
jkears opened this issue Aug 17, 2022 · 13 comments
Closed

OutboxTableConfig #3

jkears opened this issue Aug 17, 2022 · 13 comments
Assignees
Labels
documentation Improvements or additions to documentation enhancement New feature or request question Further information is requested

Comments

@jkears
Copy link

jkears commented Aug 17, 2022

How do we alter the schema name in OutboxTableConfig?

@cajuncoding
Copy link
Owner

cajuncoding commented Aug 17, 2022

Hey John... Thanks again for the Coffee! I truly appreciate it!

Great question, it's quite easy . . . the OutboxTableConfig class actually implements the interface ISqlTransactionalOutboxTableConfig which can be injected as an optional dependency when constructing the DefaultSqlServerOutboxRepository (or it's base class SqlServerOutboxRepository if you are customizing any other elements)...

Though the default OutboxTableConfig is readonly (on purpose as a good habit), all you have to do is implement your own class with the interface and pass it in, and you can inherit from the default OutboxTableConfig so that you don't have to implement all properties.... so here's an example:

public class CustomSchemaNameOutboxTableConfig : OutboxTableConfig, ISqlTransactionalOutboxTableConfig
{
	public CustomSchemaNameOutboxTableConfig(string customOutboxSchemaName)
	{
		TransactionalOutboxSchemaName = customOutboxSchemaName;
	}

	public new string TransactionalOutboxSchemaName { get; }
}

Then you can specify it as an optional dependency when you construct the DefaultSqlServerOutboxRepository:

var customOutboxTableConfig = new CustomSchemaOutboxTableConfig("sqloutbox");

var sqlTransaction = (SqlTransaction)await sqlConnection.BeginTransactionAsync().ConfigureAwait(false);

var sqlOutboxRepository = new DefaultSqlServerOutboxRepository<string>(
	sqlTransaction,
	outboxTableConfig: customOutboxTableConfig //<== Inject your Custom Configuration!
);

/// . . . Carry on with the Outbox processing . . .

@cajuncoding cajuncoding added the question Further information is requested label Aug 17, 2022
@cajuncoding cajuncoding self-assigned this Aug 17, 2022
@cajuncoding
Copy link
Owner

cajuncoding commented Aug 17, 2022

I also just realized that the properties of OutboxTableConfig should be virtual to be a little cleaner, so I've updated that to be included in a future incremental update.

But that doesn't affect the above solution which should work just fine since the interface is key and there is no other polymorphic access to the base class.

To validate and illustrate, I added a Unit Test which illustrates the above example in the project:

@jkears
Copy link
Author

jkears commented Aug 17, 2022

Thank you @cajuncoding (Brandon).

I am implementing a background task within a microservice that pushes domain events to the service bus from the TransactionalOutBoxQueue at a defined interval using the code below.

It appears that I will have to create my own SQLServerOutboxRepository as there appears no other means to inject the ISqlTransactionalOutboxTableConfig using the OTB classes.

  protected virtual async Task ExecuteSqlTransactionalOutboxProcessingInternalAsync()
        {
            try{
          
                  //************************************************************
                  //*** Execute processing of the Transactional Outbox...
                  //************************************************************
                  await using var sqlConnection = new Microsoft.Data.SqlClient.SqlConnection(SqlConnectionString);
                  await sqlConnection.OpenAsync().ConfigureAwait(false);
                  await sqlConnection.ProcessPendingOutboxItemsAsync(OutboxPublisher, SqlTransactionalOutboxProcessingOptions).ConfigureAwait(false);
       
                  //************************************************************
                  //*** If specified then Execute Cleanup of Historical Outbox Data...
                  //************************************************************
                  if (OutboxHistoryToKeepTimeSpan > TimeSpan.Zero)
                  {
                       
                      await sqlConnection.CleanupHistoricalOutboxItemsAsync(OutboxHistoryToKeepTimeSpan).ConfigureAwait(false);
                  }
            }
            catch (Exception e)
            {
                _logger.LogError($"  ERROR => {e.GetMessagesRecursively()}");
                throw e;
            }
        }

Once again I want to thank you for providing this solution. I do have one other question that I am concerned about, which I am hoping you can help me to better understand if I will have an issue or not.

As mentioned, each microservice will have a background processor task, who's primary role is to process domain events from the `TransactionalOutBoxQueue table and where, each microservice will own it's own TransactionalOutBoxQueue table. I need to ensure that if there are multiple instances of a microservice and as such, competing consumers to those events, that the background task processor(s) will read and process each block of events to the Azure service bus in the order (and at most one) that they were added to the event table. It seems like that this will work using your library, but I just wanted to be absolutely certain it will.

Thanks for your support!
John

@jkears
Copy link
Author

jkears commented Aug 17, 2022

I am really confused as to how I would integrate with your library. I fully understand that I need to create a custom outbox config however what class am I placing the code below into?

var customOutboxTableConfig = new CustomSchemaOutboxTableConfig("sqloutbox");

var sqlTransaction = (SqlTransaction)await sqlConnection.BeginTransactionAsync().ConfigureAwait(false);

var sqlOutboxRepository = new DefaultSqlServerOutboxRepository<string>(
	sqlTransaction,
	outboxTableConfig: customOutboxTableConfig //<== Inject your Custom Configuration!
);

... such that I can easily call through the SqlClientOutboxProcessingCustomExtensions. ProcessPendingOutboxItemsAsync method from within my background task.

Can you please help me with the classes I need to implement.

@cajuncoding
Copy link
Owner

Regarding customizing the OutboxTableConfig and using the Sql Connection/Transaction custom extensions.... yeah I see your point now. The custom extensions are very helpful and handle a bunch of logistics for transaction, rollback, etc. And ideally you wouldn't have to copy/paste that 🤔

Ok, I've got an idea or two.... but let me think on it and I'll take a closer look tomorrow.

Ideally I think it'd be good offer an easy to use way to change the OutboxTableConfig... in an unobtrusive way (globally) so that all Custom Extensions will use the values... Without having to add optional parameters to clutter up the custom extensions...

@cajuncoding
Copy link
Owner

cajuncoding commented Aug 18, 2022

Regarding your more involved question above:

As mentioned, each microservice will have a background processor task, who's primary role is to process domain events from the `TransactionalOutBoxQueue table and where, each microservice will own it's own TransactionalOutBoxQueue table. I need to ensure that if there are multiple instances of a microservice and as such, competing consumers to those events, that the background task processor(s) will read and process each block of events to the Azure service bus in the order (and at most one) that they were added to the event table. It seems like that this will work using your library, but I just wanted to be absolutely certain it will.

Yes I'm pretty sure that the library will accomplish what you are looking to do... but just to make sure I understand the nuances of the question here are some thoughts off the top of my head...

  • Yes, by design many instances of the Outbox library can attempt to run at the same time -- as it was intentionally developed to work reliably in Azure Functions which could have dozens or hundreds of instances spin up and attempt ot process the outbox. It does this by either:
    • A) letting them all grab a batch and process, knowing that some messages could be sent multiple times (but guaranteeing at-least-once delivery). This will provide maximum throughput but does not guarantee order, or that a given message might not be processed multiple times due to race condition before SQL updates the record.
    • B) Using a distributed Mutex Lock (via Sql Server database lock) so that one and only one instance can process the outbox at any givent time. All other instances will attemp to acquire a lock and skip processing for that attempt if they are unable to get a lock... this is controlled by the Options that you can pass in OutboxProcessingOptions.FifoEnforcedPublishingEnabled = true
      • It sounds like you def. want to use option (B)... which is also the primary mode I use also.
      • The default time that a single instance will wait to acquire a lock is 1 second by default... but that is also a parameter that is customizeable . . . but not from the Custom Extensions, so this would also be included in what mentioned in my comment above 👆 .
  • You also called out "(and at most once)" in your question... in general that is very likely to hold true, but it cannot be guaranteed. This is an implication of the Outbox pattern in that it only guarantees 'at-least-once' delivery... though it will rarely if ever duplicate.
    • This is because once the event if fired off on an external message bus, then it will attempt to clear it in the Outbox Queue, but there is always a residual risk that a network glitch, Cloud DB outate, etc. could cause that followup update to fail. This means that the outbox item is left in a pending state and will be picked up again . . . eventually when the outage or network hiccup is no longer an issue. But, the item was never lost, and was safely left pending, so it will be processed again . . . proving out the guarantee of 'at-least-once' delivery (which is a fundamental element of the Outbox pattern).
      • Now in reality the likelihood of this will be closely related to your infrastructure, and other details, but in most cases will none-the-less be rare (if ever) in reality.

@jkears
Copy link
Author

jkears commented Aug 18, 2022

Hi and thank you @cajuncoding for your detailed response, most awesome!

From review of your code and stepping through of your console sample I was thinking it worked as described above, for which I am super happy to learn that it works that way.

To provide a bit more perspective as to who we are and what we are doing, my company is NextWare Group and we are a very small tech-start-up.

We have developed an in-house Domain modeling tool that is a VS Code extension that we use to capture Business Domains for some up-coming product ideas we are working on.

We are following the key principals of Domain Driven Design, in which we utilize Event Storming to capture any business domain and then use our Domain Tooling to graphically capture each related Bounded Context per the domain, including each related Aggregate schema(s), API Command(s) and Domain Event(s) in a drag-n-drop diagram editor as seen below.

We have a separate CLI tool that utilizes MS Roslyn to code-generate each domain model into the actual microservices.

A domain model contains 1 or more Bounded context(s) which code-generates to a ASP.Net Web API microservice and each Bounded Context has 1 or more Domain Aggregates. We are deploying to Azure Container Apps.

The domain model screen shot below is from our ERP platform we are currently working on. Here, I am showing just two aggregates (JournalConfiguration and JournalBatch) within the GeneralLedgerServices microservice.

On the left side in the VS Code Explorer you will see the code that is generated by the CLI tooling from this model. Currently we have 20 domain models in our ERP domain, consisting of 40+ microservices and 500+ aggregates.

image

Below is a test model and some code-generated ChangeFeedBackGroundService.cs task, we will use to process MS SQL domain events using your SQLTransactionalOutbox pattern.

image

Our tooling code-generates either to MS SQL (EF Core), or MS CosmosDB.

With CosmosDB we utilize the built-in change-feed mechanism to publish the domain events to the service bus as well as project to MongoDB, via a code-generated background task processor.

We generate REST, gRPC and GraphQL API integrations for each microservice (configurable).

We have an integrated Business Rules Engine (Code Effects), as such we rarely need to write any code, although we can if we need some special business logic not able to be captured via Business Rules.

I would love to hook up and provide you a demo of this if you have time, and then we can chat more about what we are doing with your library.

Please let me know if you have time to join a Zoom call, I can be reached via LinkedIn.

Cheers
John Kears
CTO NextWare Group LLC.

@cajuncoding
Copy link
Owner

John,

That's alot of details to process 🤯😁

I've just merged in the PR to provide full configuration at initialization... added some details to the README for how to use it during app startup.... it's very straightforward bootstrapping/initialization feature now added. All unit tests are passing, so it's pushed to an updated Nuget Package v1.0.1 now 👍

For your case -- if all you need to change is the Schema name -- then you'll just add this to your Startup (application root):

    //This is the global SqlTransactionalOutbox initializer that allows configuring custom settings to be used...
    //NOTE: Not all values need to be specified, any values that are not specified (e.g. or are set to null)
    //      will retain the default values.
    SqlTransactionalOutboxInitializer.Configure(config =>
    {
        config.WithOutboxTableConfig(new OutboxTableConfig(
            transactionalOutboxSchemaName: "YourCustomSchemaName"
        ));
    });

@cajuncoding cajuncoding added bug Something isn't working documentation Improvements or additions to documentation enhancement New feature or request and removed bug Something isn't working labels Aug 19, 2022
@jkears
Copy link
Author

jkears commented Aug 22, 2022

Hi @cajuncoding (Brandon), that worked, however I am had to make one change, which I am certain I shouldn't have to do.

I changed the following line in the base initializers within the DefaultSqlServerOutboxRepository....

outboxTableConfig: outboxTableConfig ?? SqlTransactionalOutboxDefaults.OutboxTableConfig,

namespace SqlTransactionalOutbox.SqlServer.MicrosoftDataNS
{
    public class DefaultSqlServerOutboxRepository<TPayload> : SqlServerOutboxRepository<Guid, TPayload>
    {
        public DefaultSqlServerOutboxRepository(
            SqlTransaction sqlTransaction,
            ISqlTransactionalOutboxTableConfig outboxTableConfig = null,
            ISqlTransactionalOutboxItemFactory<Guid, TPayload> outboxItemFactory = null,
            int? distributedMutexAcquisitionTimeoutSeconds = null
        ) 
        : base (
            sqlTransaction: sqlTransaction,
            outboxTableConfig: outboxTableConfig ?? SqlTransactionalOutboxDefaults.OutboxTableConfig,
            outboxItemFactory: outboxItemFactory ?? new DefaultOutboxItemFactory<TPayload>(),
            distributedMutexAcquisitionTimeoutSeconds
        )
        {
            //Preconfigured defaults are used in base constructor above...
        }
    }
}

Is that the correct way to bring the new configuration in, or is there a proper way that I need to follow?

@jkears jkears closed this as completed Aug 22, 2022
@cajuncoding
Copy link
Owner

cajuncoding commented Aug 22, 2022

@jkears Yes, good catch, that is a bug that I missed in the DefaultSqlServerOutboxRepository().... I'm pushing a fix to Nuget now 👍

v1.0.2 should now resolve this as expected ✅

PR: #5

@jkears
Copy link
Author

jkears commented Aug 22, 2022

Awesome! I will try now.

@jkears
Copy link
Author

jkears commented Aug 22, 2022

Perfection! Many, many thanks!

Here is a quick test model that a created to test this. The UpdateAgg1Name command takes in a string and applies that to the name property of the the Aggregate.

image

GraphQL of that aggregate prior to the change...

image

Make a change...

image

This will generate the Agg1NameHasChanged domain event which is pushed into the TransactionalOutBoxQueue table for that service, which is subsequently pushed out by a background task running in the same service to Azure Service Bus (session based)...

image

Which arrives nicely as a topic in ASB ...

image

And the changes is applied to the aggregate...

image

@jkears
Copy link
Author

jkears commented Aug 22, 2022

Thank you so very much (@cajuncoding) for both this wonderful community contribution as well as helping me out. We will continue testing against real domain models and will let you know if we find any issues.

Cheers
John

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
documentation Improvements or additions to documentation enhancement New feature or request question Further information is requested
Projects
None yet
Development

No branches or pull requests

2 participants