Persisted events are not always reported via EventsByTagQuery #383

Closed
bvingerh opened this issue Aug 16, 2018 · 12 comments

@bvingerh

We use Akka in a two-node cluster. On the one side we have 40 persistent actors, with persistence IDs 'Sender-01' up to 'Sender-40', that persist various types of events in Cassandra, each tagged 'BCE'.
On the other side we have a set of 11 persistent actors, with persistence IDs 'Receiver-01' up to 'Receiver-11', that start a stream using an EventsByTagQuery. Each receiver actor is configured with a set of event types which it should process; this is used as a filter on the tagged event stream.

For the purposes of testing/debugging this problem, both Akka nodes and a single Cassandra node run on one physical machine (i.e., no real network traffic, VMs or containers are involved).

The problem we encounter is that not all tagged events are seen by each of the 11 receiver actors (even before filtering).
Using additional logging and analyzing Cassandra tables we've reconstructed the situation:

  • Actor 'Sender-25' persists an event of a certain type (let's say 'X')

  • This event is saved in Cassandra with sequence number 206, timestamp bffcad70-a130-11e8-9cab-6ba70b39e234, timebucket 1534406400000, and writer UUID a4e385a5-e28a-46e7-961d-02ca51a8b5d7

  • This event is of interest to two receivers: 9 and 11. All receivers except one encounter the event and filter it accordingly; receiver 9 receives and processes the event, but unfortunately receiver 11 happens to be the one that doesn't encounter the event on its stream, so it can't process it (which is a problem in our application)

  • Our application continues and actor 'Sender-25' persists another event, this time of type 'Y'

  • This event is saved in Cassandra with sequence number 207, timestamp c0151770-a130-11e8-9cab-6ba70b39e234, timebucket 1534406400000 (the same), and writer UUID a4e385a5-e28a-46e7-961d-02ca51a8b5d7 (the same)

  • This event is of interest to only one receiver: Receiver-01; all receivers except Receiver-11 see this event

  • Next, 'Sender-25' persists another event of type 'X'

  • This event is saved in Cassandra with sequence number 208, timestamp c1c649e0-a130-11e8-9cab-6ba70b39e234, timebucket 1534406400000 (the same), and writer UUID a4e385a5-e28a-46e7-961d-02ca51a8b5d7 (the same)

  • This event is seen by all receivers except 9 and 11, exactly the two receivers who are interested in events of this type (more work gets stuck in our application)
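
As a cross-check on the timestamps and timebuckets quoted above, the sketch below decodes a version 1 (time-based) UUID using only the JDK. It assumes that the timebucket value is the start of the UTC hour containing the event's timestamp, in epoch milliseconds, which is consistent with the values in this report. The class and method names are made up for illustration and are not part of akka-persistence-cassandra.

```java
import java.time.Instant;
import java.util.UUID;

// Hypothetical helper, not part of any Akka library.
final class TimeUuidBuckets {
    // Distance between the UUID epoch (1582-10-15T00:00:00Z) and the Unix epoch, in 100 ns units.
    private static final long UUID_EPOCH_OFFSET_100NS = 0x01B21DD213814000L;
    private static final long HOUR_MILLIS = 3_600_000L;

    /** Unix epoch milliseconds encoded in a version 1 (time-based) UUID. */
    static long epochMillis(UUID timeUuid) {
        // UUID.timestamp() throws UnsupportedOperationException for non-version-1 UUIDs.
        return (timeUuid.timestamp() - UUID_EPOCH_OFFSET_100NS) / 10_000L;
    }

    /** Assumed bucket rule: start of the UTC hour containing the UUID's timestamp. */
    static long hourBucket(UUID timeUuid) {
        return Math.floorDiv(epochMillis(timeUuid), HOUR_MILLIS) * HOUR_MILLIS;
    }

    public static void main(String[] args) {
        // Timestamp of the event with sequence number 206 from this report.
        UUID event206 = UUID.fromString("bffcad70-a130-11e8-9cab-6ba70b39e234");
        System.out.println(Instant.ofEpochMilli(epochMillis(event206))
                + " bucket=" + hourBucket(event206));
    }
}
```

For the event with sequence number 206 this yields bucket 1534406400000, matching the timebucket stored in Cassandra.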

In fact the last event that Receiver-11 has seen from Sender-25 is that with sequence number 205 (received on 2018-08-16 10:45:39.522), which was filtered because Receiver-11 was not interested in events of that type.
The last event that Receiver-11 has seen from any sender is one sent by Sender-08 (received on 2018-08-16 10:47:00.514).
So we can assume that the Receiver-11 actor and associated EventsByTagQuery stream are still live. Cassandra also contains all the necessary event information, but somehow some streams refuse to pick up new messages of certain senders.
As a side note: all 11 receivers have seen the event with sequence number 205 sent by Sender-25.

All Receiver actors are cluster singletons that reside on Akka node 1. The Sender actors are distributed over the two Akka nodes, but Sender-25 and Sender-08 also happen to reside on Akka node 1.

Now our application idles for some time while I analyse logs and write the text above. Then I launch some more work.

I expect Sender-25 to be invoked at some point to persist an event with sequence number 209. This is indeed the case.
Between timestamps 2018-08-16 12:42:23.558 and 2018-08-16 12:52:16.897, Receivers 1 to 10 (including receiver 9) see the event and act accordingly. Then I get the following log:
2018-08-16 12:53:37.515 INFO - akka.persistence.cassandra.query.EventsByTagStage - BCE: Missing event for new persistence id: Sender-25. Expected sequence nr: 1, actual: 209.
2018-08-16 12:53:37.515 INFO - akka.persistence.cassandra.query.EventsByTagStage - BCE: Executing query to look for missing. Timebucket: TimeBucket(1534410000000, Hour, true, false). From: c1672800-a132-11e8-8080-808080808080 (2018-08-16 09:00:00:000). To: 0e6c39c0-a141-11e8-9cab-6ba70b39e234 (2018-08-16 10:42:22:172)
2018-08-16 12:53:37.518 INFO - akka.persistence.cassandra.query.EventsByTagStage - BCE: Still looking for missing. Some(LookingForMissing{previousOffset=c1672800-a132-11e8-8080-808080808080 bucket=TimeBucket(1534410000000, Hour, true, false) queryPrevious=false maxOffset=0e6c39c0-a141-11e8-9cab-6ba70b39e234 persistenceId=Sender-25 maxSequenceNr=209 missing=Set(69, 138, 101, 88, 170, 115, 5, 120, 202, 10, 56, 142, 153, 174, 185, 42, 24, 37, 25, 52, 14, 184, 110, 125, 196, 157, 189, 20, 46, 93, 152, 57, 78, 29, 164, 179, 106, 121, 84, 147, 61, 132, 89, 133, 116, 1, 74, 206, 6, 60, 117, 85, 201, 102, 28, 38, 160, 70, 192, 21, 137, 165, 33, 92, 197, 65, 97, 156, 9, 188, 53, 169, 141, 109, 124, 77, 193, 96, 173, 13, 129, 41, 134, 73, 128, 105, 2, 205, 166, 32, 34, 148, 45, 161, 64, 180, 17, 149, 176, 191, 22, 44, 59, 118, 204, 27, 71, 12, 54, 144, 49, 181, 86, 159, 187, 172, 113, 81, 76, 7, 39, 98, 208, 103, 140, 91, 66, 155, 198, 108, 130, 135, 3, 80, 167, 35, 162, 112, 123, 194, 145, 48, 63, 18, 150, 95, 50, 67, 199, 177, 182, 16, 127, 31, 154, 11, 72, 175, 143, 43, 99, 87, 203, 104, 40, 26, 158, 186, 55, 114, 171, 139, 23, 8, 75, 119, 58, 207, 82, 151, 36, 168, 146, 30, 51, 190, 183, 19, 107, 4, 126, 136, 79, 195, 94, 131, 47, 15, 163, 200, 68, 62, 178, 90, 111, 122, 83, 100) deadline=Deadline(707031245806532 nanoseconds) failIfNotFound=false). Waiting for next poll.
2018-08-16 12:53:40.524 INFO - akka.persistence.cassandra.query.EventsByTagStage - BCE: Failed to find missing sequence nr: Some(LookingForMissing{previousOffset=c1672800-a132-11e8-8080-808080808080 bucket=TimeBucket(1534413600000, Hour, false, true) queryPrevious=true maxOffset=0e6c39c0-a141-11e8-9cab-6ba70b39e234 persistenceId=Sender-25 maxSequenceNr=209 missing=Set(69, 138, 101, 88, 170, 115, 5, 120, 202, 10, 56, 142, 153, 174, 185, 42, 24, 37, 25, 52, 14, 184, 110, 125, 196, 157, 189, 20, 46, 93, 152, 57, 78, 29, 164, 179, 106, 121, 84, 147, 61, 132, 89, 133, 116, 1, 74, 206, 6, 60, 117, 85, 201, 102, 28, 38, 160, 70, 192, 21, 137, 165, 33, 92, 197, 65, 97, 156, 9, 188, 53, 169, 141, 109, 124, 77, 193, 96, 173, 13, 129, 41, 134, 73, 128, 105, 2, 205, 166, 32, 34, 148, 45, 161, 64, 180, 17, 149, 176, 191, 22, 44, 59, 118, 204, 27, 71, 12, 54, 144, 49, 181, 86, 159, 187, 172, 113, 81, 76, 7, 39, 98, 208, 103, 140, 91, 66, 155, 198, 108, 130, 135, 3, 80, 167, 35, 162, 112, 123, 194, 145, 48, 63, 18, 150, 95, 50, 67, 199, 177, 182, 16, 127, 31, 154, 11, 72, 175, 143, 43, 99, 87, 203, 104, 40, 26, 158, 186, 55, 114, 171, 139, 23, 8, 75, 119, 58, 207, 82, 151, 36, 168, 146, 30, 51, 190, 183, 19, 107, 4, 126, 136, 79, 195, 94, 131, 47, 15, 163, 200, 68, 62, 178, 90, 111, 122, 83, 100) deadline=Deadline(707031245806532 nanoseconds) failIfNotFound=false)
2018-08-16 12:53:40.524 INFO - akka.persistence.cassandra.query.EventsByTagStage - No more missing events. Sending buffered events. BufferedEvents(List(UUIDRow(Sender-25,209,0e6c39c0-a141-11e8-9cab-6ba70b39e234,209,Row[BCE, 1534413600000, 0e6c39c0-a141-11e8-9cab-6ba70b39e234, Sender-25, 209, java.nio.HeapByteBuffer[pos=0 lim=139 cap=139], , NULL, NULL, NULL, 209, 135421337, cx, a4e385a5-e28a-46e7-961d-02ca51a8b5d7])))
And then at timestamp 2018-08-16 12:53:40.524 finally Receiver-11 sees event 209 by Sender-25.

For some reason 'Sender-25' is suddenly seen as a new persistence ID, while at least sequence numbers 206-208 have been generated (and been seen by some Receivers) during the same application run.
Also, sequence numbers 206-208 are still not picked up by Receiver-11. So these events seem to be lost forever.

Some questions that come to mind:

  • Why do some EventsByTagQuery streams suddenly start to 'ignore' events that are persisted by some actors? (but not events that are persisted by other actors)
  • Is there a limit on the number of concurrent identical EventsByTagQuery streams you can create? (we have 11)
  • Why does this happen only using multiple Akka nodes in a cluster, but not with a single-node cluster? (even though all relevant actors happen to reside on the same node in this case)
  • Why are previously known persistence IDs suddenly considered 'new'? (this is a minor conundrum as it happens only after one or more events have already been lost)

We recently upgraded from Akka persistence Cassandra version 0.60 to 0.87 in order to have multiple tag support, but if event delivery reliability is problematic, we'll have to downgrade again.
The above analysis was performed with version 0.88, so the problem we encountered in 0.87 is still present.

@chbatey chbatey added the bug label Aug 17, 2018
@chbatey
Member

chbatey commented Aug 20, 2018

I'm going to investigate this now. I'll start by putting in some multi-JVM tests with https://doc.akka.io/docs/akka/2.5/multi-jvm-testing.html

@chbatey chbatey self-assigned this Aug 20, 2018
@chbatey
Member

chbatey commented Aug 20, 2018

Couple of initial questions @bvingerh

  • Reading through your scenario am I correct in saying that each tagged event is received by at least one of the receivers? If you were to look in the tag_views table.
  • Are you wrapping your events by tag queries in a restart stage? If so does it definitely log if it restarts?
  • How are you handling resuming the stream if it fails?

Do you have full logs for it that I can take a look at?

@bvingerh
Author

  • I didn't do the analysis for each and every tagged event; I assume that each tagged event is received by at least one of the receivers, yes. But it may be possible that our application didn't run long enough for all of the listeners to stop working...
  • No, we don't use a restart stage as far as I know
  • We rely on the actor supervisor to restart the actor holding the stream after a backoff period. The actor holding the stream persists offsets in the stream so that, on recovery, the EventsByTagQuery starts at the last persisted offset

I have logs and cassandra dumps, but the logs are at INFO level and contain very little about akka (the only akka-specific log lines are similar to the ones above: 'EventsByTagStage' lines about missing events).

@toerni

toerni commented Aug 21, 2018

We tried to provide a reproduction case ourselves. The result can be found below in the zip file as a Java 8 Maven project.

The setup is as follows

  • the test assumes you have a cassandra running on localhost, no login needed
  • the test creates an Akka cluster consisting of 2 nodes, running in 1 VM (the JUnit test in this case)
  • the test drops the akka persistence related keyspaces in cassandra (akka and akka_snapshots)
  • the test creates a batch of broadcast actors on each node. This is a persistent actor, configured to persist to cassandra
  • the test uses an Akka config that tags all saved messages with the tag "reproducecaseTag"
  • the test starts a queryByTag on each node (obviously for the tag mentioned above). Using akka streams, each message read by queryByTags is dumped in a concurrent list which is created in the test code. This list will be used to assert that all messages were picked up by the queryByTag
  • Once the queryByTag "listeners" are running, the String "START" is sent to all the broadcaster actors on both nodes that were created previously. The broadcaster actor reacts by starting to send a configurable number of messages to itself. Every message received by a broadcaster is persisted to Cassandra (and tagged with the "reproducecaseTag").
  • Since the queryByTag is already started and producing an infinite stream of messages with the correct tag, the concurrent list is being filled up with all the messages saved and read from cassandra
  • Finally, once the broadcasters are pumping out and persisting messages, we use awaitility to assert that the number of messages in the concurrent lists is equal to (numberOfNodes) * (numberOfBroadcasters) * (numberOfMessagesSentPerBroadcaster). Awaitility will perform this check every 10 seconds. The intermediate result is also printed to sysout.
  • Note that in the implementation below, awaitility will wait forever for the results to arrive. So the test will stall instead of fail. The same log-lines will be seen over and over in the console

What is the observed behavior:

  • depending on the number of broadcasters and the number of messages, we see that too few messages are received. With fewer broadcasters and fewer messages, the chances of missing events are lower.
  • for example, a run with 50 broadcasters and 10_000 messages sent per broadcaster typically fails (however, we have observed cases where no messages were lost, with even higher number of broadcasters and higher number of messages)
  • So it is important to retry multiple times and with varying numbers. With the numbers mentioned above, we were able to reproduce the case quite consistently on our dev laptop.

What is the setup:

  • we ran this test on a Thinkpad P50 with 32Gig of ram and SSD
  • OS: ubuntu 18.04
  • Cassandra was running as a docker container
  • The test failures were observed both from eclipse and from maven

Note that the zip file below also contains a folder logs which contains the output of the test for both a successful and failed run.

Let us know if more info is required.

reproducecase.zip

@Reeebuuk

I'm seeing the same issue with 0.90

Persist and tag 100 events (100 hits locally on 2 node cluster via gatling over 10 seconds)
Query the tag_views and 100 events are there.
eventsByTag replays only 95-99 events.

I create one manually and it gets replayed.

Another batch of 100. Persisted and tagged.
Again 95-99 replayed BUT some events are from previous batch so now I'm not missing any events from first batch but I miss potentially 10 events from the second one.

This is the only setting being set atm:

cassandra-query-journal.refresh-interval = 250ms

Can we get any official answer on this?

@SirMullich

SirMullich commented Sep 17, 2018

I'm struggling with the same issue. I have 5-6 akka-clusters that use akka-persistence. Events are persisted to Cassandra but queryByTag in projector (that projects from Cassandra to Elasticsearch) does not project all events. Projector opens 10 streams that query by tag.
I restart the projectors and it temporarily solves the problem. The akka-persistence-cassandra version in use is 0.87.

@Reeebuuk

For me the restart doesn't solve the problem, most likely because I persist the offset of a newer event and the stream resumes from that offset after the restart.

@chbatey
Member

chbatey commented Sep 27, 2018

@toerni I ran your reproducer and it does fail sometimes. However, when I changed it to store the message number per persistenceId, using a final Map<String, Integer> countPerPid = new ConcurrentHashMap<>(); so that I could dig into where the event went, I couldn't get it to fail. That said, I can't see an issue with using a ConcurrentLinkedQueue, as it is unbounded and thread safe.

Also, when running with a very large number of messages/broadcasters, storing all the messages requires a large heap, so I tried it with an AtomicInteger and can't get that to fail either.

If anyone can provide debug logs along with which events were missed that would be very helpful.
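
One way to report which events were missed, rather than only how many, is to record the sequence numbers seen per persistence id on the consuming side and list the gaps afterwards. The sketch below is a minimal, JDK-only illustration of that idea. The class name SeenSequenceNrs and its methods are hypothetical and are not taken from the reproducer or from akka-persistence-cassandra, and it assumes every persistence id is expected to reach the same maximum sequence number.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical diagnostic helper for a test harness.
final class SeenSequenceNrs {
    private final Map<String, NavigableSet<Long>> seenPerPid = new ConcurrentHashMap<>();

    /** Record one event delivered by a tagged stream; safe to call from several threads. */
    void record(String persistenceId, long sequenceNr) {
        seenPerPid.computeIfAbsent(persistenceId, pid -> new ConcurrentSkipListSet<>())
                  .add(sequenceNr);
    }

    /** Sequence numbers in 1..expectedMax that were never recorded, keyed by persistence id. */
    Map<String, List<Long>> missing(long expectedMax) {
        Map<String, List<Long>> result = new TreeMap<>();
        seenPerPid.forEach((pid, seen) -> {
            List<Long> gaps = new ArrayList<>();
            for (long nr = 1; nr <= expectedMax; nr++) {
                if (!seen.contains(nr)) {
                    gaps.add(nr);
                }
            }
            if (!gaps.isEmpty()) {
                result.put(pid, gaps);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        SeenSequenceNrs tracker = new SeenSequenceNrs();
        for (long nr = 1; nr <= 5; nr++) {
            tracker.record("broadcaster-1", nr);
            if (nr != 3) {
                tracker.record("broadcaster-2", nr); // simulate one lost event
            }
        }
        System.out.println(tracker.missing(5));
    }
}
```

Note that this keeps one entry per delivered event, so it has the same heap cost as storing all messages, and a persistence id from which no event at all was delivered does not appear in the result.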

@chbatey
Member

chbatey commented Sep 27, 2018

@Reeebuuk to guarantee that resuming never misses a delayed event, we'd need to implement #263

@chbatey
Member

chbatey commented Oct 1, 2018

I've had a modified version of the reproducer fail. In this case it was when events were delayed due to the Cassandra/Akka JVMs being overloaded at the end of a time bucket. The stage then goes on to the next bucket and doesn't look back in the previous bucket, as no new events come for that tag/pid combination. If others have different scenarios with logs, please attach them. The solution could also be #263

@chbatey
Member

chbatey commented Oct 5, 2018

Found another possible issue: as Cassandra responds more slowly, the tag writes buffer up, meaning that live eventsByTag queries will have moved past their offset by the time they've been written. I've added some extra visibility for this while implementing #263, with a log like:

Buffer for tagged events is getting too large (701), is Cassandra responsive? Are writes failing? If events are buffered for longer than the eventual-consistency-delay they won't be picked up by live queries. The oldest event in the buffer is offset: 8e4286d1-c87a-11e8-96aa-8d077a7fae38 (2018-10-05 08:42:13:309)

For most use cases the tag write is more efficient than the normal message writes as they are batched via their partition key. The batch is 150 by default meaning there's only 1 tag write per 150 event writes. But if you have a lot of persistenceIds all writing the same tag this balance will shift.

Raised #406 as a possible improvement.

@chbatey
Member

chbatey commented Oct 15, 2018

0.91 has been released with an eventual consistency delay. I have run a modified version of the reproducer (not storing the events in memory, so it can use larger numbers, and using a minute time bucket to make bucket-changing issues more likely) 100s of times.
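
The failure mode discussed in this thread (a live query advancing past an offset before the corresponding tag write lands) and the effect of an eventual consistency delay can be illustrated with a toy simulation. This is only a sketch under simplifying assumptions: offsets are plain millisecond timestamps, the query polls at a fixed interval, and it only reads events whose offset is at least the delay in the past. It does not reproduce the actual EventsByTagStage logic, and all names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Toy model of a polling tag query; not the real stage implementation.
final class LiveTagQuerySimulation {
    /** A tagged event: its offset (write timestamp) and the time its tag write becomes readable. */
    static final class TaggedEvent {
        final String name;
        final long offsetMillis;
        final long visibleAtMillis;

        TaggedEvent(String name, long offsetMillis, long visibleAtMillis) {
            this.name = name;
            this.offsetMillis = offsetMillis;
            this.visibleAtMillis = visibleAtMillis;
        }
    }

    /** Poll until endMillis and return the names of the delivered events, in delivery order. */
    static List<String> run(List<TaggedEvent> events, long pollIntervalMillis,
                            long delayMillis, long endMillis) {
        List<String> delivered = new ArrayList<>();
        long lastOffset = Long.MIN_VALUE;
        for (long now = pollIntervalMillis; now <= endMillis; now += pollIntervalMillis) {
            long upperBound = now - delayMillis; // never read closer to "now" than the delay
            List<TaggedEvent> batch = new ArrayList<>();
            for (TaggedEvent e : events) {
                boolean afterLastOffset = e.offsetMillis > lastOffset;
                boolean oldEnough = e.offsetMillis <= upperBound;
                boolean written = e.visibleAtMillis <= now;
                if (afterLastOffset && oldEnough && written) {
                    batch.add(e);
                }
            }
            batch.sort(Comparator.comparingLong((TaggedEvent e) -> e.offsetMillis));
            for (TaggedEvent e : batch) {
                delivered.add(e.name);
                lastOffset = e.offsetMillis; // the query never looks behind this offset again
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<TaggedEvent> events = Arrays.asList(
                new TaggedEvent("A", 100, 100),
                new TaggedEvent("B", 110, 400), // tag write lands 290 ms late
                new TaggedEvent("C", 120, 120));
        System.out.println("no delay:     " + run(events, 250, 0, 1000));
        System.out.println("300 ms delay: " + run(events, 250, 300, 1000));
    }
}
```

Without a delay the query delivers A and C at the first poll and never sees B, because B's offset is already behind the query's position when its write lands. With a 300 ms delay all three events are delivered in offset order. In this model an event whose write is delayed for longer than the configured delay would still be missed.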
