[Feature Request]: Honor typed handler lifetime #142

Open
seruminar opened this issue Nov 10, 2023 · 2 comments
Labels
enhancement New feature or request

Comments

@seruminar

Is your request related to a problem you have?

Please let me know if this has already been discussed somewhere 🙂

It appears that a consumer configuration similar to the following is not supported:

middlewareConfiguration
    .RetryForever((retryDefinition) =>
        retryDefinition
            .HandleAnyException()
            .WithTimeBetweenTriesPlan(
                TimeSpan.FromSeconds(10)
                )
        );

middlewareConfiguration
    .AddTypedHandlers(handlerConfiguration => handlerConfiguration
        .WithHandlerLifetime(InstanceLifetime.Transient)  // <----------------------------------------
        .AddHandlersFromAssemblyOf(assembly.DefinedTypes.First())
        );

Specifically, a non-singleton InstanceLifetime value does not cause a new handler to be created for the retry when an exception is thrown (deep) inside a handler and bubbles all the way up to Polly. I can observe this by altering a handler dependency (for example, setting a property value), watching the retry occur, and then inspecting the altered property on the retried attempt.

This tends to cause problems with disposed DB connections 😅

Describe the solution you'd like

Does this make sense? Is there a workaround? Should I try to minimally reproduce in case something else is going on?

Ideally, the retry middleware would be placed so that each retry creates the handler again, as if a new message had arrived.
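To illustrate the behavior I mean, here is a minimal sketch that does not use KafkaFlow or Polly at all. `SketchHandler` and `RetryWithFreshHandler` are hypothetical names invented for this example, and the plain loop only stands in for the retry policy. The point is that the handler factory is called inside the retry loop, so state left behind by a failed attempt never reaches the next one.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical handler: the first two instances fail, the third succeeds.
public sealed class SketchHandler
{
    private readonly int instanceId;
    private bool tainted; // stands in for e.g. a disposed DB connection

    public SketchHandler(int instanceId) => this.instanceId = instanceId;

    public Task Handle(string message)
    {
        if (tainted)
        {
            throw new InvalidOperationException("Handler state was reused.");
        }

        if (instanceId < 3)
        {
            tainted = true; // a failed attempt leaves the instance unusable
            throw new NotSupportedException("Simulated failure.");
        }

        Console.WriteLine($"Handled '{message}' with handler #{instanceId}");
        return Task.CompletedTask;
    }
}

public static class RetrySketch
{
    // Plain retry loop (not Polly): resolves a new handler for every attempt.
    public static async Task<int> RetryWithFreshHandler(
        Func<SketchHandler> handlerFactory,
        string message,
        TimeSpan delay,
        int maxAttempts)
    {
        for (var attempt = 1; ; attempt++)
        {
            var handler = handlerFactory(); // fresh instance, as for a new message
            try
            {
                await handler.Handle(message);
                return attempt;
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                await Task.Delay(delay); // wait before the next attempt
            }
        }
    }

    public static async Task Main()
    {
        var created = 0;
        var attempts = await RetryWithFreshHandler(
            () => new SketchHandler(++created),
            "test1",
            TimeSpan.FromMilliseconds(10),
            maxAttempts: 5);
        Console.WriteLine($"Succeeded after {attempts} attempt(s)");
    }
}
```

If the factory call were moved above the loop, the second attempt would hit the tainted instance, which is roughly what I am seeing with the retry middleware.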

Are you able to help bring it to life and contribute with a Pull Request?

No

Additional context

It may not be possible to do this with middlewares based on the implementation of https://github.com/Farfetch/kafkaflow/blob/master/src/KafkaFlow/MiddlewareExecutor.cs

@seruminar seruminar added the enhancement New feature or request label Nov 10, 2023
@luispfgarces
Contributor

Hi @seruminar,

From your description, it seems to be a bug, but we'll check this out and get back to you with further details.
Can you provide us with a minimal repro for us to look into it?

@seruminar
Author

Hi @luispfgarces, thank you for the response!

Attached are a csproj and Program.cs for .NET 6, KafkaFlow 2.5.0, and KafkaFlow.Retry 2.1.3.

KafkaflowRetryScopeIssue1.zip

Assuming there is Kafka running at localhost:9092, for example from https://hub.docker.com/r/confluentinc/cp-kafka, my console output is as follows:

Hello, World!
Starting host

KafkaFlow: An error occurred creating topic {Topic}: {Reason} | Data: {"Topic":"topic1","Reason":"Topic \u0027topic1\u0027 already exists."}
info: Microsoft.Hosting.Lifetime[0]
      Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
      Hosting environment: Production
info: Microsoft.Hosting.Lifetime[0]
      Content root path: C:\S\s\KafkaflowRetryScopeIssue1\KafkaflowRetryScopeIssue1\bin\Debug\net6.0
Starting test
Producing message

KafkaFlow: Partitions assigned | Data: {"GroupId":"KafkaflowRetryScopeIssue1_topic1","ConsumerName":"f8e87759-9de4-4f86-94da-11d33325acce","Topics":[{"Topic":"topic1","PartitionsCount":1,"Partitions":[0]}]}
Message with a new scope every time: test1

KafkaFlow: Consumer paused by retry process | Data: {"ConsumerGroup":"KafkaflowRetryScopeIssue1_topic1","ConsumerName":"f8e87759-9de4-4f86-94da-11d33325acce","Worker":0}

KafkaFlow: Exception captured by RetryForeverMiddleware. Retry in process. | Data: {"AttemptNumber":1,"WaitMilliseconds":2000,"PartitionNumber":0,"Worker":0,"ExceptionType":"System.NotSupportedException"} | Exception: {"Type":"System.NotSupportedException","Message":"Specified method is not supported.","StackTrace":"   at TestMessageHandler.Handle(IMessageContext context, TestMessage message) in C:\\S\\s\\KafkaflowRetryScopeIssue1\\KafkaflowRetryScopeIssue1\\Program.cs:line 186\r\n   at KafkaFlow.TypedHandler.TypedHandlerMiddleware.\u003C\u003Ec__DisplayClass3_0.\u003CInvoke\u003Eb__0(Type handler)\r\n   at System.Linq.Enumerable.SelectListIterator\u00602.MoveNext()\r\n   at System.Threading.Tasks.Task.WhenAll(IEnumerable\u00601 tasks)\r\n   at KafkaFlow.TypedHandler.TypedHandlerMiddleware.Invoke(IMessageContext context, MiddlewareDelegate next)\r\n   at Polly.AsyncPolicy.\u003C\u003Ec__DisplayClass40_0.\u003C\u003CImplementationAsync\u003Eb__0\u003Ed.MoveNext()\r\n--- End of stack trace from previous location ---\r\n   at Polly.Retry.AsyncRetryEngine.ImplementationAsync[TResult](Func\u00603 action, Context context, CancellationToken cancellationToken, ExceptionPredicates shouldRetryExceptionPredicates, ResultPredicates\u00601 shouldRetryResultPredicates, Func\u00605 onRetryAsync, Int32 permittedRetryCount, IEnumerable\u00601 sleepDurationsEnumerable, Func\u00604 sleepDurationProvider, Boolean continueOnCapturedContext)"}
Message with a reused scope: test1

KafkaFlow: Consumer resumed by retry process | Data: {"ConsumerGroup":"KafkaflowRetryScopeIssue1_topic1","ConsumerName":"f8e87759-9de4-4f86-94da-11d33325acce","Worker":0}

KafkaFlow: Offsets committed | Data: {"Offsets":[{"Topic":"topic1","Partitions":[{"Partition":0,"Offset":9}]}]}

I expect to see only the line "Message with a new scope every time: test1", followed by an exception and a retry, over and over. Instead, the output shows that line once, and the retry then completes with "Message with a reused scope: test1".
