Subscriptions sometimes skips events #222

Open
fbjerggaard opened this issue May 31, 2023 · 24 comments
Labels
help wanted (Extra attention is needed), postgres (PostgreSQL-specific issue)

Comments

@fbjerggaard
Contributor

I have a funny problem with my setup, which runs the PostgreSQL EventStore (using Npgsql 7) and MongoDB EventHandlers (using MongoDB.Entities). The handlers are really simple: they basically only upsert data, without any weird data modification along the way.

However, sometimes when loading a bunch of data (e.g. when importing data from a legacy application), the checkpoint store has moved past a specific event but the EventHandler has never been run for it. If I then reset the checkpoint to a lower value and restart the application, the EventHandlers run fine and everything is up to date.

Any ideas on how to debug this further are much appreciated. I am working on a PoC sample to see if it can be replicated outside our environment.

@alexeyzimarev
Contributor

Have you confirmed that the handler hasn't fired, like with logs or something?

@fbjerggaard
Contributor Author

Yes (sorta), I am logging every MongoDB call, and the one to upsert the offending documents was never made.

However, digging deeper into the logs, I noticed my container was getting OOMKilled, which might explain it. That possibly raises another issue, though: is the checkpoint updated before the handlers have run?

@alexeyzimarev
Contributor

No, it can't be. The checkpoint commit is downstream from the projector. Are you sure you are doing SaveAsync or ExecuteAsync and it's not being delayed in any way by the library you use?

@fbjerggaard
Contributor Author

This is a part of the handler that doesn't always run:

public class RegistrationProjections : EventHandler
{
    public RegistrationProjections()
    {
        On<V1.RegistrationReceived>(
            async ctx =>
                await new Registration
                {
                    ID = ctx.Message.Id.ToString(),
                    [... Other properties ...]
                }.SaveExceptAsync(x => new {x.Barcodes})
        );
    }
}

Barcodes are being updated in another event, which is why they are ignored here.

The handler is being registered like this:

services.AddSubscription<PostgresAllStreamSubscription, PostgresAllStreamSubscriptionOptions>(
    "RegistrationProjections",
    builder => builder.Configure(x => x.MaxPageSize = 256).AddEventHandler<RegistrationProjections>()
);

I tried lowering the MaxPageSize to 256 to see if it had any effect, but that doesn't seem to have done anything.

@alexeyzimarev
Contributor

alexeyzimarev commented May 31, 2023

What if you try using Eventuous MongoDB tools for that upsert? Or the MongoDB driver native API? I just want to remove the possibility that it's a third-party dependency causing the issue.

@fbjerggaard
Contributor Author

I understand, however that would require quite the refactor in my handlers.

I will try to add some more debug logging and try to dig deeper into it, and maybe create a simple handler using the Eventuous MongoDB tools to see if I can replicate it there.

@fbjerggaard
Contributor Author

I tried putting some more logging into the handler:

public class RegistrationProjections : EventHandler
{
    public RegistrationProjections(ILogger<RegistrationProjections> logger)
    {
        On<V1.RegistrationReceived>(
            async ctx =>
                {
                    logger.LogDebug(
                                    "Running projection for {EventType} with ID {RegistrationID}",
                                    typeof(V1.RegistrationReceived).FullName,
                                    ctx.Message.Id
                                );

                    await new Registration
                    {
                        ID = ctx.Message.Id.ToString(),
                        [... Other properties ...]
                    }.SaveExceptAsync(x => new {x.Barcodes});
                }
        );
    }
}

Most of the events log it fine, but some are skipped. The event streams do exist in the database, and this time there were no OOM kills of the container.

@alexeyzimarev
Contributor

Can you add global position to the logs? Also, how do you produce these migration events? Do you commit from multiple processes/threads, or is it a linear fetch-produce?

@fbjerggaard
Contributor Author

Here is an excerpt from the Eventuous debug logs; the Position property is the one highlighted in blue.

Don't mind that the event handler names etc. are different from above: the above is merely a sample, whereas the below is from real code.

[image: log excerpt]
Note that positions 246197, 246201 and 246202 are missing even though they exist in the database:

[image: database rows]
(The IDs are off by 1 because of #163)

I have one service that discovers what data should be migrated and pushes a bunch of messages, using MassTransit, to a second service, which uses the command service to apply the events. A third service generates the projections; the logs are from that third service. So the commits are multi-threaded from multiple containers (running the same code), but the projections are guaranteed to be run by only one instance at a time.

@alexeyzimarev
Contributor

I see that the global position appears out of order in the logs, do you partition the subscription?

For missing events, my suspicion is that it's the sequence issue. With frequent concurrent writes, one service might get its sequence value allocated earlier but commit later than another service. As a result, you can get events with a lower sequence number committed after events with a higher sequence number. The subscription then receives the events with the higher number first, the next call asks for "more than the one I have", and the late events with lower numbers are skipped.

I know that only in theory as I am not an expert in Postgres, but I have read about it somewhere. Will try to find more about it, you can spend some time googling too...
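
To make the suspected race concrete, here is a minimal simulation in Python (chosen only because it is compact; it does not touch Postgres or Eventuous). It assumes the subscription polls for committed rows with a position greater than its checkpoint and then moves the checkpoint to the highest position it saw. The names `poll` and `run_subscription` are made up for illustration.

```python
def poll(visible, checkpoint):
    """Return the committed positions above the checkpoint, in ascending order."""
    return sorted(p for p in visible if p > checkpoint)


def run_subscription(commit_batches):
    """Replay a sequence of commits, polling once after each batch.

    commit_batches lists, per poll, the positions that became visible
    (were committed) since the previous poll.
    """
    visible = set()    # rows committed so far
    checkpoint = 0     # highest position the subscription has seen
    handled = []       # positions passed to the event handler
    for batch in commit_batches:
        visible.update(batch)
        page = poll(visible, checkpoint)
        handled.extend(page)
        if page:
            checkpoint = page[-1]
    missed = sorted(visible - set(handled))
    return handled, checkpoint, missed


# Writer A is allocated position 2 but commits late; writer B is allocated
# position 3 afterwards and commits first.
handled, checkpoint, missed = run_subscription([[1, 3], [2, 4]])
print(handled, checkpoint, missed)  # [1, 3, 4] 4 [2]
```

Position 2 is in the table once everything has committed, yet the handler never sees it, which matches the symptom reported in this thread.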

@alexeyzimarev
Contributor

Unexpected results might be obtained if a cache setting greater than one is used for a sequence object that will be used concurrently by multiple sessions. Each session will allocate and cache successive sequence values during one access to the sequence object and increase the sequence object's last_value accordingly. Then, the next cache-1 uses of nextval within that session simply return the preallocated values without touching the sequence object. So, any numbers allocated but not used within a session will be lost when that session ends, resulting in "holes" in the sequence.

Furthermore, although multiple sessions are guaranteed to allocate distinct sequence values, the values might be generated out of sequence when all the sessions are considered. For example, with a cache setting of 10, session A might reserve values 1..10 and return nextval=1, then session B might reserve values 11..20 and return nextval=11 before session A has generated nextval=2. Thus, with a cache setting of one it is safe to assume that nextval values are generated sequentially; with a cache setting greater than one you should only assume that the nextval values are all distinct, not that they are generated purely sequentially. Also, last_value will reflect the latest value reserved by any session, whether or not it has yet been returned by nextval.

@alexeyzimarev
Contributor

Basically, what the docs say is to use a cache setting of one to guarantee order in the sequence.

@alexeyzimarev
Contributor

As we don't use sequences explicitly, I am wondering what the sequence settings are for the unique auto-incremented id...

@fbjerggaard
Contributor Author

fbjerggaard commented Jun 1, 2023

> do you partition the subscription?

Partitioning is not enabled

--

A quick search around suggests that the cache values are only really a thing for a sequence, not for the identity. I will try searching deeper

@alexeyzimarev
Contributor

Ok, looks like I am spamming here, but still.

Basically, what I am trying to say is that the issue is not that the subscription skips events. It's most probably caused by events with a higher global position being committed to the database before events with a lower global position. I am not exactly sure how to solve it.

I thought of the following:

  • Don't use autogenerated id
  • Query the max id value after the transaction is opened
  • Assign incrementing values to global position when inserting
  • Commit the transaction
  • It might fail in case of optimistic concurrency, then retry

It will probably slow down the appends, but should solve the issue.

@fbjerggaard
Contributor Author

No worries about the spamming. It is actually quite an interesting problem to solve.

I dug around my Postgres instance and found the following for the global position:
[image: sequence settings for the global position]
So it seems like the cache is already 1 for the autogenerated id.

I will try and dig deeper for a solution

@alexeyzimarev
Contributor

alexeyzimarev commented Jun 1, 2023

> Partitioning is not enabled

Ok, what about concurrency? I clearly see from your logs that events are being processed out of order, which should not happen if you have the default concurrency.

Also, auto-generated identity still uses a sequence. I think you can even find it, but I am not sure.

@fbjerggaard
Contributor Author

Concurrency isn't enabled either. The registration of the handler is referenced here: #222 (comment) - bog standard registration.

How do you conclude that they are processed out of order? The way I read the logs, they start at 246192, process up to 246196, "skip" 246197, process up to 246200, "skip" 246201 and 246202, process 246203 and then store a new checkpoint.

The auto-generated identity sequence is exactly the one (I think) I found above, messages_global_position_seq. The "seq" part is what makes me believe that.

@alexeyzimarev
Contributor

Ah, it's the screenshot from Seq, right? So, it's sorted by time and new entries are on the top? Then indeed it looks fine.

@fbjerggaard
Contributor Author

Exactly :) Sorry that wasn't clearer before

@fbjerggaard
Contributor Author

A small update:

I tried adding some logging in PostgresSubscriptionBase to output a list of GlobalPositions received back from the db call - and I think our suspicion is correct - some events are missing:
[image: logged list of GlobalPositions]

I am still working on the "why" part of it - since they are in the database when looking manually afterwards

@alexeyzimarev
Contributor

I think it is what I wrote: transactions competing for the sequence. The order in which numbers are allocated doesn't match the commit order.

@fbjerggaard
Contributor Author

A small update from here:

I have tried a lot of different things, including setting a lock on the table when appending events (as it seems that is the way MartenDB does it). However, that did not seem to make much of a difference.

It might be my pgbouncer that complicates things even further.

We have now decided to switch to EventStoreDB, and so far that seems like a much better choice. Our PostgreSQL instance is also not getting slammed as much now.

I still think this issue should be kept open since it might be an issue for others.

@alexeyzimarev
Contributor

For anyone interested, the issue is well known in frameworks that support Postgres as an event store, such as Axon and Marten. They have a complex gap-detection process to resolve it. MessageDB, on the other hand, locks the "category", but they avoid the concept of a global log (and do whatever it takes to defend this decision by throwing arguments).
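
As a rough illustration of what gap detection means here, the Python sketch below only hands the handler the positions that follow the checkpoint without a hole, and holds back everything after the first gap. It is not taken from Axon or Marten, whose actual mechanisms are more involved. Because rolled-back transactions leave permanent holes in a sequence, a real implementation also needs some rule for giving up on a gap; that is reduced here to a hypothetical `gap_expired` flag. The function name `next_batch` and the starting checkpoint of 246191 are assumptions as well.

```python
def next_batch(positions, checkpoint, gap_expired=False):
    """Pick the positions that are safe to process after `checkpoint`."""
    pending = sorted(p for p in positions if p > checkpoint)
    if gap_expired:
        # Assumption: the caller waited long enough to treat holes as permanent.
        return pending
    batch = []
    expected = checkpoint + 1
    for pos in pending:
        if pos != expected:
            break  # first gap: a lower position may still be committed later
        batch.append(pos)
        expected += 1
    return batch


# Positions from the logs in this thread: 246197, 246201 and 246202 are not
# visible yet when the page is read.
page = [246192, 246193, 246194, 246195, 246196,
        246198, 246199, 246200, 246203]
print(next_batch(page, 246191))
# [246192, 246193, 246194, 246195, 246196]
```

The checkpoint would then stop at 246196, so 246197 is still picked up on a later poll once its transaction commits.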

I checked with Oskar, who maintains Marten, and he knows two solutions:

My proposal was a bit different. I thought of doing the following in the append stored procedure:

  • Get max(global_position)
  • Use it with +1 to assign to events in the batch
  • global_position should remain identity but not generated
  • A competing append committed in parallel would cause a PK constraint violation
  • Detect it and retry the whole thing

I have no idea what the performance will be though, but it will guarantee that appends happen with incrementing global position, and the subscription issue should vanish.
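
The proposal above can be simulated without a database. The Python sketch below is only an illustration, not the Eventuous stored procedure: `MessagesTable`, `DuplicateKey`, `append` and the `before_commit` hook are hypothetical names, and the in-memory dictionary stands in for a table whose primary key is `global_position`.

```python
class DuplicateKey(Exception):
    """Stands in for a primary key constraint violation."""


class MessagesTable:
    def __init__(self):
        self.rows = {}  # global_position -> payload

    def max_position(self):
        return max(self.rows, default=0)

    def insert_all(self, rows):
        # Atomic like a transaction: reject the whole batch on any conflict.
        if any(pos in self.rows for pos, _ in rows):
            raise DuplicateKey()
        self.rows.update(rows)


def append(table, payloads, before_commit=None, max_retries=3):
    """Assign max(global_position) + 1.. to the batch, retrying on conflict."""
    for _ in range(max_retries):
        start = table.max_position()
        rows = [(start + i + 1, p) for i, p in enumerate(payloads)]
        if before_commit:
            before_commit()  # test hook: lets a competing writer commit first
        try:
            table.insert_all(rows)
            return [pos for pos, _ in rows]
        except DuplicateKey:
            continue  # someone else took our positions; re-read max and retry
    raise RuntimeError("append failed after retries")


table = MessagesTable()
print(append(table, ["a", "b"]))  # [1, 2]

calls = []
def rival():
    # Competing writer that commits position 3 once, before our first commit.
    if not calls:
        calls.append(1)
        table.insert_all([(3, "x")])

print(append(table, ["c"], before_commit=rival))  # [4]
```

In this simulation the losing writer retries and lands on the next free position, so positions only ever become visible in increasing order. The cost is the retry loop under contention, which is the performance concern raised above.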

If anyone is interested in implementing any of the approaches, please feel free to do so.

@alexeyzimarev added the help wanted and postgres labels on Aug 23, 2023