
eventsByTag may sometimes skip some events when more than 1 database thread is used #96

Closed
WellingR opened this issue Apr 7, 2017 · 13 comments

Comments

@WellingR
Member

WellingR commented Apr 7, 2017

This issue occurred on version 2.4.17.1

In an actor system where several actors concurrently persist events with the same tag, the following issue may occur whenever multiple database threads are used:

Suppose that two database threads are concurrently writing an event. Thread 1 writes event A and thread 2 writes event B. Events A and B are written by different persistent actors, but they both have the same tag.

The database (I use Postgres) generates the sequence number for the ordering column. However, it does not guarantee that the sequence numbers become visible in numerical order.

What can happen is that event A gets sequence number 1 and event B gets sequence number 2, yet event B becomes visible in the database before event A.
eventsByTag is implemented by repeatedly querying the database. Therefore it may happen that an eventsByTag query (at offset 0) returns event B while event A has not yet been seen.

At this point the implementation of eventsByTag assumes that all events with a sequence number equal to or lower than 2 (the sequence number of event B) either do not match the tag, or have already been returned by the query. That assumption does not hold here, because event A was simply not yet visible in the database.
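To make the race concrete, here is a small self-contained sketch (not the plugin's actual code) that models the polling reader and the timeline above:

```scala
// Self-contained model of the race: event B (ordering 2) commits and becomes
// visible before event A (ordering 1), so the polling reader advances its offset
// past A and never delivers it.
object SkippedEventDemo extends App {
  final case class Row(ordering: Long, persistenceId: String, tag: String)

  // Simulated visibility per poll: at poll 1 only B is committed; at poll 2 both are.
  val visibleAtPoll: Map[Int, Seq[Row]] = Map(
    1 -> Seq(Row(2, "actor-B", "myTag")),
    2 -> Seq(Row(1, "actor-A", "myTag"), Row(2, "actor-B", "myTag"))
  )

  var offset = 0L
  for (poll <- 1 to 2) {
    // The reader asks for rows with ordering > offset, just like the tag query does.
    val batch = visibleAtPoll(poll).filter(r => r.tag == "myTag" && r.ordering > offset)
    batch.foreach(r => println(s"poll $poll delivers ordering=${r.ordering} (${r.persistenceId})"))
    if (batch.nonEmpty) offset = batch.map(_.ordering).max
  }
  // Only ordering=2 is ever delivered; ordering=1 (event A) is skipped because the
  // offset had already advanced past it before event A became visible.
}
```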

@mboogerd

mboogerd commented Apr 7, 2017

Possible solution sketch: Use a driver-controlled version-vector instead of a database-controlled sequence.

Rationale: The trouble is that the global sequence contains gaps with two indistinguishable causes.

  • Natural gaps: A query by tag naturally introduces gaps because it filters out every event not matching the tag.
  • Bad gaps: Accesses to the database sequence are not part of the writing transaction and are not serialised across threads/hosts, so identifiers can become visible out of order. Transactions may also fail, leaving a reserved identifier forever unwritten.

Given that there are valid reasons for gaps, a reader cannot interpret a gap in the identifier space as a reason to block until the event identified by it becomes available. The only solution, imo, is then to not cause 'bad gaps' in the first place.

Hence, the question is: how can bad gaps be prevented? How can the set of readers observe a sequence of events that is monotonic and contiguous, given a set of writers writing events individually? One option is to have each writer serialize its actions and establish consensus with all other writers over how its particular event sequence precedes/interleaves with/follows the events produced by others. This is both expensive and unnecessarily strong. The cheaper and oft-applied alternative is to have writers produce totally ordered event sequences, and have events be identified by both the writing host and a unique integer produced from a monotonic, contiguous sequence controlled by that writer. All writers together form a version vector.

Concrete proposal
Writing side
For each instance of the driver and journal, create a unique session identifier (random + verify with db). For each event to be persisted, generate an identifier consisting of the session-id and a monotonic+contiguous (atomic/actor-protected) integer.

Reading side
For each tagged query, we maintain a local 'seen-version' for each session, and query for events for all sessions that have an identifier higher than the 'seen-version' of the respective session.

For reasons of performance and managing complexity, I would suggest maintaining a mutable version-vector table. The writing side can then even have concurrent batch-writes, but only updates its version in this table (from x to y) once the events whose identifiers form the gapless sequence x+1 to y have been successfully written. If any concurrent transaction fails, one can either re-attempt that particular transaction, or start a new session (create a new id) without introducing inconsistency. The reading side can then poll this table to identify whether any new sessions exist, or whether known sessions wrote new events, and then retrieve the events for those sessions (using its last-known version for that session as a lower bound).
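For illustration only, a rough sketch of the proposed bookkeeping; the identifiers, the version-vector table and all names below are made up for this sketch and are not existing plugin API:

```scala
import java.util.UUID

object VersionVectorSketch {
  final case class EventId(sessionId: UUID, seqNr: Long)             // writer-generated identifier
  final case class VersionVectorRow(sessionId: UUID, maxSeqNr: Long) // one row per writer session

  // Reading side: given the freshly polled version-vector table and the versions we
  // have already seen, compute per session the (from, to) range of sequence numbers
  // that still has to be fetched from the journal.
  def pendingRanges(
      table: Seq[VersionVectorRow],
      seen: Map[UUID, Long]): Map[UUID, (Long, Long)] =
    table.flatMap { row =>
      val from = seen.getOrElse(row.sessionId, 0L) + 1
      if (from <= row.maxSeqNr) Some(row.sessionId -> ((from, row.maxSeqNr))) else None
    }.toMap
}
```

The writing side only bumps maxSeqNr once a batch whose identifiers form the gapless sequence x+1..y has committed, so a reader never learns of a version whose events are not yet visible.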

Advantages

  • No missing events
  • Does not rely on features offered by a specific JDBC implementation
  • Despite introducing additional queries to maintain the version-vector table, polling for updates may be faster as this table is expected to be orders of magnitude smaller than the log itself.

Disadvantages

  • No ordering relation between different writers (driver instances), which means that the events the query stream produces may be ordered quite differently from wall-clock time (this was significantly less likely with identifiers produced from a global sequence). However, imho, if that were important, ordering should be established by synchronising the writers (and preferably one should wonder whether these writers should have been separated into different actor systems at all...)
  • To maintain fast polling of the version-vector table, one would need to implement a pruning mechanism to merge writes of an old session with those of another one. I believe that is achievable, but the explanation would probably double the size of this comment

@dnvriend
Contributor

dnvriend commented Apr 7, 2017

@mboogerd I was kinda hoping that the serialization properties of the Postgres relational database would do the job. I'm kinda baffled that the solution misses events, which means that the issue needs to be proven. Also, the use case is one driver per journal, i.e. per table.

@mboogerd

mboogerd commented Apr 7, 2017

@dnvriend I am not sure if I get you right when you say "the issue must be proven", but the problem is not the serialization features of Postgres, it is its implementation of sequences. The Notes section in the Postgres sequence documentation should suffice as proof of that.

With respect to your statement

one driver per journal

A case of RTFM for me I guess, I was not aware of that. In that case the above solution can be simplified a bit (only one single session ever, which can safely be resumed) and the disadvantages disappear.

@dnvriend
Contributor

dnvriend commented Apr 7, 2017

@mboogerd Do I understand correctly: the sequence generator, assuming a cache size of one, issues numbers to sessions sequentially, but the issuing of a number and the actual write are not atomic? So the sequence generator issues value '1' to session 'S1', but the row is not committed before the generator issues '2' to another session 'S2'. 'S2' may then commit first, effectively containing the event with sequence 2 in the database, and only afterwards 'S1' commits, so the database now contains two events with Seq 1 and 2. From the query's point of view, however, S2 is 'seen' first and then S1. If this is the case, it would be a problem for the query.
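For illustration, the interleaving I am describing could be reproduced with two plain JDBC sessions roughly like this (the table, columns and connection details are hypothetical, not the plugin's schema):

```scala
// Two autocommit-off sessions against a hypothetical table
// `journal(ordering bigserial, payload text)`: ordering values become visible
// in a different order than they were issued by the sequence.
import java.sql.DriverManager

object OutOfOrderVisibilityDemo extends App {
  val url = "jdbc:postgresql://localhost/test" // placeholder connection details
  val s1  = DriverManager.getConnection(url, "user", "pass")
  val s2  = DriverManager.getConnection(url, "user", "pass")
  s1.setAutoCommit(false)
  s2.setAutoCommit(false)

  // S1 inserts first: nextval assigns ordering = 1, but S1 does not commit yet.
  s1.createStatement().execute("INSERT INTO journal(payload) VALUES ('event A')")

  // S2 inserts next: nextval assigns ordering = 2, and S2 commits immediately.
  s2.createStatement().execute("INSERT INTO journal(payload) VALUES ('event B')")
  s2.commit() // ordering = 2 is now visible while ordering = 1 is not

  // A reader polling at this point sees only ordering = 2 and advances its offset.
  s1.commit() // ordering = 1 finally becomes visible, but the reader has moved on

  s1.close(); s2.close()
}
```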

@mboogerd

mboogerd commented Apr 7, 2017

@dnvriend Yep, that's how I believe it works from the documentation. Credits to @WellingR though, for a thorough investigation of the issue, which led us to these findings.

@dnvriend
Contributor

dnvriend commented Apr 9, 2017

@WellingR @mboogerd please check branch OrderingSequenceAsService, maybe this quickfix will solve the issue you have now.

@WellingR
Member Author

@dnvriend I did a test; however, the issue still occurs.

If I have some time this week (no guarantees) I will see if I can create a test to reproduce this issue. It should not be too hard.

I think it should be possible to reproduce as follows (a rough sketch follows the list):

  • Configure the system to use 4 database threads (I used this number when I reproduced the issue)
  • To make the issue appear more frequently, set jdbc-read-journal.refresh-interval to 100ms (or even smaller)
  • Create an actor system where 10 persistent actors continuously persist events with the same tag
  • Create an eventsByTag query for this tag, and use some method to verify that no events are missing.
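A rough sketch of such a test, not the actual test I would commit: it assumes the Akka 2.5-era query API and that a write-side EventAdapter tags every persisted event with "myTag"; db thread count and jdbc-read-journal.refresh-interval are assumed to come from application.conf.

```scala
import akka.actor.{ActorSystem, Props}
import akka.persistence.PersistentActor
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import akka.persistence.query.{NoOffset, PersistenceQuery, Sequence}
import akka.stream.ActorMaterializer

class TaggedWriter(override val persistenceId: String) extends PersistentActor {
  private var n = 0
  override def receiveRecover: Receive = { case _ => }
  override def receiveCommand: Receive = {
    case "write" =>
      n += 1
      persist(s"event-$n")(_ => self ! "write") // keep persisting continuously
  }
}

object ReproduceSkippedEvents extends App {
  implicit val system: ActorSystem = ActorSystem("repro")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // Ten persistent actors persisting concurrently with the same tag.
  (1 to 10).foreach(i => system.actorOf(Props(new TaggedWriter(s"writer-$i"))) ! "write")

  val readJournal =
    PersistenceQuery(system).readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)

  // Verify that the Sequence offsets delivered for the tag are gapless.
  readJournal
    .eventsByTag("myTag", NoOffset)
    .map(_.offset)
    .collect { case Sequence(value) => value }
    .sliding(2)
    .runForeach {
      case Seq(prev, next) if next != prev + 1 =>
        println(s"GAP: $prev -> $next, an event was skipped (or delivered late)")
      case _ => ()
    }
}
```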

@octonato
Member

octonato commented Jun 6, 2017

Just for information, this other akka-persistence plugin has solved this issue using an extra column rowid. (see https://github.com/WegenenVerkeer/akka-persistence-postgresql#rowidupdatingstrategy)

The basic idea is that an extra column (row_id) is initially set to null and the query is based on it. As long as the event has a null row id it won't get delivered by the query.

At the same time, whenever a new row is added, another actor RowIdUpdater is notified and will fill the row_id making sure that no gaps are left. At the end, the rowId has the same value as the sequence number.
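A much-simplified sketch of that idea (not the referenced plugin's actual code; names are made up):

```scala
// A single updater numbers committed rows whose row_id is still null, so the tag
// query, which only reads rows with a non-null row_id, never observes a gap.
final case class JournalRow(id: Long, tag: String, rowId: Option[Long])

object RowIdUpdaterSketch {
  // Assign the next consecutive row ids to all visible rows that do not have one yet.
  // Because only committed (visible) rows are numbered, the numbering itself is gapless.
  def assignRowIds(rows: Vector[JournalRow], maxAssigned: Long): (Vector[JournalRow], Long) = {
    val (done, pending) = rows.partition(_.rowId.isDefined)
    val numbered = pending.sortBy(_.id).zipWithIndex.map {
      case (row, i) => row.copy(rowId = Some(maxAssigned + i + 1))
    }
    (done ++ numbered, maxAssigned + numbered.size)
  }
}
```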

@dnvriend
Contributor

dnvriend commented Jun 7, 2017

Hi Renato. Thanks for the suggestion; it is an interesting strategy. This problem needs further investigation into possible solution strategies and the pros/cons of each. If you have more ideas, keep them coming :)

@WellingR
Member Author

I created a test which is able to reproduce this issue for Postgres and MySql. The issue also occurred for Oracle in one test run; however, most of the time the test runs fine for Oracle.

WellingR@972efb0

I have an idea for a potential solution for this problem, which I will try to implement and test in the next couple of days.

@WellingR
Member Author

I have a solution which makes the test pass. See master...WellingR:eventByTag-bug for details.

The idea is a simple change to the eventsByTag stream. Every "tick" a currentEventsByTag query is used to retrieve the next batch of elements. However, instead of returning these directly, we record the max ordering and return only those elements whose ordering is smaller than or equal to the max ordering observed in the previous batch.

In other words, in the first "tick" (of the delaySource) we find out the max ordering, and we only actually retrieve and return those elements in the next "tick".
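A minimal sketch of that filtering step (not the code in the linked branch):

```scala
// Each tick delivers only events at or below the maximum ordering observed in the
// *previous* tick, giving out-of-order commits one refresh interval to become visible.
final case class Envelope(ordering: Long, event: Any)

final class DeliverOneTickLater {
  private var previousMax = 0L

  def onTick(batch: Seq[Envelope]): Seq[Envelope] = {
    val deliverable = batch.filter(_.ordering <= previousMax)
    previousMax = batch.foldLeft(previousMax)((acc, e) => math.max(acc, e.ordering))
    deliverable
  }
}
```

The query offset would only advance past the orderings that were actually delivered, so the held-back rows are simply fetched again on the next tick.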

Disadvantages of this approach:

  • The delay before an element is returned by the eventsByTag query is doubled (this could be solved with some tuning of the delay)
  • Even with the extra grace period there is no guarantee that the issue does not occur; however, since db inserts are generally executed quickly, the delay could be configured such that it is practically impossible for this issue to occur.
  • The currently linked implementation retrieves the journal rows twice from the database; I should change this so that we use a max-ordering db query combined with a query that actually retrieves the rows.

@dnvriend Let me know what you think of this.

@dnvriend
Contributor

@WellingR Thanks! This is another interesting suggestion. Just like with @rcavalcanti's suggestion, let's collect some strategies first and then determine the pros/cons of each. If you have more suggestions, white papers about the subject (for me/others to read) and so on, keep them coming!

@WellingR
Member Author

The akka-persistence plugin for Cassandra (https://github.com/akka/akka-persistence-cassandra/blob/master/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagPublisher.scala) seems to use a mechanism that allows a (configurable) delay before messages are published (the offset for that plugin is time-based though). A similar mechanism would also be doable for us: when we observe that the event with number 100 is visible in the database, we could wait for a grace period of x seconds before returning events up to it.

The Cassandra implementation is fairly complex though; it might be worth the time to dive into the implementation details to find out why certain choices have been made.
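A hedged sketch of such a grace-period gate, inspired by (not taken from) the Cassandra plugin's delayed delivery; all names here are made up:

```scala
// An ordering only becomes deliverable after it has been visible for at least
// `gracePeriod`, so late-committing lower orderings get a chance to show up first.
import scala.concurrent.duration._

final class GracePeriodGate(gracePeriod: FiniteDuration, now: () => Long = () => System.nanoTime()) {
  private var firstSeenAt = Map.empty[Long, Long] // ordering -> nanos when first observed

  def deliverable(visibleOrderings: Seq[Long]): Seq[Long] = {
    val t = now()
    firstSeenAt ++= visibleOrderings.filterNot(firstSeenAt.contains).map(_ -> t)
    val ready = visibleOrderings.filter(o => t - firstSeenAt(o) >= gracePeriod.toNanos)
    firstSeenAt --= ready // forget what has been delivered
    ready
  }
}
```

The trade-off is the same as above: a larger grace period makes skipped events less likely but adds latency to the query.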

WellingR added a commit to WellingR/akka-persistence-jdbc that referenced this issue Jul 30, 2017
WellingR added a commit to WellingR/akka-persistence-jdbc that referenced this issue Aug 13, 2017
@octonato octonato added this to the invalid/not release bound milestone Feb 13, 2020