MultiStreamProjection - as async projection does not execute apply method when a lot of pending events #2885

Open
czadokonradnetigate opened this issue Dec 29, 2023 · 4 comments

@czadokonradnetigate

I encountered an issue with MultiStreamProjection.

I have a projection defined like this:

public class MyViewProjection: MultiStreamProjection<MyView, Guid>
{
    public MyViewProjection()
    {
        Identity<ParentCreated>(e => e.Id);
        Identity<ChildCreatedEvent>(e => e.ParentId);
        Identity<ChildSomeEvent1>(e => e.ParentId);
    }

    public MyView Create(ParentCreated e) => new() { Id = e.Id, TenantId = e.TenantId.ToString() };
    
    public void Apply(ChildCreatedEvent _, MyView view) => view.IncSomeCounter();

    public void Apply(ChildSomeEvent1 _, MyView view) => view.IncSomeOtherCounter();

}

View

public sealed record MyView
{
    public Guid Id { get; set; }
    public string TenantId { get; set; } = null!;
    
    public int SomeCounter { get; set; }
    public int SomeOtherCounter { get; set; }

    public void IncSomeCounter()
    {
        SomeCounter++;
    }

    public void IncSomeOtherCounter()
    {
        SomeOtherCounter++;
    }
}

It is defined as an async projection.

For<MyView>()
    .Index(z => z.Id)
    .MultiTenanted();

options.Events.AddEventType<ParentCreated>();
options.Events.AddEventType<ChildCreatedEvent>();
options.Events.AddEventType<ChildSomeEvent1>();

options.Projections.Add(new MyViewProjection(), ProjectionLifecycle.Async);

It works fine most of the time: when a single SaveChangesAsync appends about 100-300 events, the counter values are always stored correctly.
The problem appears once 1000 or more child entities are created, so that 1000 or more events are inserted at once.
In those cases Apply(ChildCreatedEvent _, MyView view) is not invoked and the counters are not incremented.

In mt_event_progression, last_seq_id for the projection (MyView:All) matches last_seq_id for HighWaterMark.
No errors are thrown, and the logs report that the updates for each event range were executed:

{"Timestamp": 12/29/2023 6:36:40 PM, "CategoryName": Marten.Events.Daemon.AsyncProjectionHostedService, "LogLevel": Information,"FormattedMessage": Shard 'MyView:All': Executed updates for Event range of 'Identity: MyView:All', 0 to 1, "Exception":  }
{"Timestamp":"2023-12-29T19:17:07.5460649Z","TraceFlags":0,"CategoryName":"Marten.Events.Daemon.AsyncProjectionHostedService","LogLevel":"Information","EventId":{"Id":0,"Name":null},"FormattedMessage":"Shard \u0027MyView:All\u0027: Executed updates for Event range of \u0027Identity: MyView:All\u0027, 1 to 501","Body":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}","Attributes":[{"Key":"ProjectionShardIdentity","Value":"MyView:All"},{"Key":"Range","Value":{"ShardName":{"ProjectionName":"MyView","Key":"All","Identity":"MyView:All"},"SequenceFloor":1,"SequenceCeiling":501,"Events":[],"Size":0}},{"Key":"{OriginalFormat}","Value":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}"}],"Exception":null,"Scope":{}}
{"Timestamp":"2023-12-29T19:17:07.5486838Z","TraceFlags":0,"CategoryName":"Marten.Events.Daemon.AsyncProjectionHostedService","LogLevel":"Information","EventId":{"Id":0,"Name":null},"FormattedMessage":"Shard \u0027MyView:All\u0027: Executed updates for Event range of \u0027Identity: MyView:All\u0027, 501 to 1001","Body":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}","Attributes":[{"Key":"ProjectionShardIdentity","Value":"MyView:All"},{"Key":"Range","Value":{"ShardName":{"ProjectionName":"MyView","Key":"All","Identity":"MyView:All"},"SequenceFloor":501,"SequenceCeiling":1001,"Events":[],"Size":0}},{"Key":"{OriginalFormat}","Value":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}"}],"Exception":null,"Scope":{}}
{"Timestamp":"2023-12-29T19:17:07.5506023Z","TraceFlags":0,"CategoryName":"Marten.Events.Daemon.AsyncProjectionHostedService","LogLevel":"Information","EventId":{"Id":0,"Name":null},"FormattedMessage":"Shard \u0027MyView:All\u0027: Executed updates for Event range of \u0027Identity: MyView:All\u0027, 1001 to 1501","Body":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}","Attributes":[{"Key":"ProjectionShardIdentity","Value":"MyView:All"},{"Key":"Range","Value":{"ShardName":{"ProjectionName":"MyView","Key":"All","Identity":"MyView:All"},"SequenceFloor":1001,"SequenceCeiling":1501,"Events":[],"Size":0}},{"Key":"{OriginalFormat}","Value":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}"}],"Exception":null,"Scope":{}}
{"Timestamp":"2023-12-29T19:17:07.5524935Z","TraceFlags":0,"CategoryName":"Marten.Events.Daemon.AsyncProjectionHostedService","LogLevel":"Information","EventId":{"Id":0,"Name":null},"FormattedMessage":"Shard \u0027MyView:All\u0027: Executed updates for Event range of \u0027Identity: MyView:All\u0027, 1501 to 2001","Body":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}","Attributes":[{"Key":"ProjectionShardIdentity","Value":"MyView:All"},{"Key":"Range","Value":{"ShardName":{"ProjectionName":"MyView","Key":"All","Identity":"MyView:All"},"SequenceFloor":1501,"SequenceCeiling":2001,"Events":[],"Size":0}},{"Key":"{OriginalFormat}","Value":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}"}],"Exception":null,"Scope":{}}
{"Timestamp":"2023-12-29T19:17:07.5545154Z","TraceFlags":0,"CategoryName":"Marten.Events.Daemon.AsyncProjectionHostedService","LogLevel":"Information","EventId":{"Id":0,"Name":null},"FormattedMessage":"Shard \u0027MyView:All\u0027: Executed updates for Event range of \u0027Identity: MyView:All\u0027, 2001 to 2002","Body":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}","Attributes":[{"Key":"ProjectionShardIdentity","Value":"MyView:All"},{"Key":"Range","Value":{"ShardName":{"ProjectionName":"MyView","Key":"All","Identity":"MyView:All"},"SequenceFloor":2001,"SequenceCeiling":2002,"Events":[],"Size":0}},{"Key":"{OriginalFormat}","Value":"Shard \u0027{ProjectionShardIdentity}\u0027: Executed updates for {Range}"}],"Exception":null,"Scope":{}}

Marten version: 6.4.1
.NET version: .NET 8

Or maybe I am missing something?

@ElanHasson
Contributor

I am seeing the exact same behavior. Funny enough I'm also trying to count instances of streams :D

@ElanHasson
Contributor

I found a fix.

You should remove the void return type and return your model.
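
A minimal sketch of what that change might look like when applied to the projection from the original post. The event records are placeholders, since the issue does not show their definitions, and the Guid type for TenantId is an assumption. The only intended difference is that the Apply methods return the view instead of void; this is the workaround reported in this thread, not a confirmed explanation of the root cause.

```csharp
using System;
using Marten.Events.Projections;

// Placeholder event shapes (assumed; the issue does not show the real ones).
public record ParentCreated(Guid Id, Guid TenantId);
public record ChildCreatedEvent(Guid ParentId);
public record ChildSomeEvent1(Guid ParentId);

// Same view as in the original post.
public sealed record MyView
{
    public Guid Id { get; set; }
    public string TenantId { get; set; } = null!;

    public int SomeCounter { get; set; }
    public int SomeOtherCounter { get; set; }

    public void IncSomeCounter() => SomeCounter++;
    public void IncSomeOtherCounter() => SomeOtherCounter++;
}

public class MyViewProjection : MultiStreamProjection<MyView, Guid>
{
    public MyViewProjection()
    {
        // Route every event to the view identified by the parent id.
        Identity<ParentCreated>(e => e.Id);
        Identity<ChildCreatedEvent>(e => e.ParentId);
        Identity<ChildSomeEvent1>(e => e.ParentId);
    }

    public MyView Create(ParentCreated e) =>
        new() { Id = e.Id, TenantId = e.TenantId.ToString() };

    // Changed from void: mutate the view, then return it.
    public MyView Apply(ChildCreatedEvent _, MyView view)
    {
        view.IncSomeCounter();
        return view;
    }

    // Changed from void: mutate the view, then return it.
    public MyView Apply(ChildSomeEvent1 _, MyView view)
    {
        view.IncSomeOtherCounter();
        return view;
    }
}
```

The registration from the original post (options.Projections.Add(new MyViewProjection(), ProjectionLifecycle.Async)) would stay the same under this sketch.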

@czadokonradnetigate
Author

> I found a fix.
>
> You should remove the void return type and return your model.

Thanks. I will try this out :)
Weird that it works for Inline projections without any issues, and even the docs sample returns void.

@ElanHasson
Contributor

@czadokonradnetigate ya, can't speak to that, I just know I tried like 10 permutations before I got it

@jeremydmiller jeremydmiller added this to the 7.0.1 milestone Feb 28, 2024