
Spawning entities fails continually with JournalFailureException when PreparedStatement initialization fails #103

Open
negokaz opened this issue Oct 5, 2023 · 8 comments
Labels
bug Something isn't working
Milestone
1.1.0

Comments

@negokaz

negokaz commented Oct 5, 2023

scala: 2.13.12
pekko: 1.0.1
pekko-persistence-cassandra: 0.0.0-1120-5b7555fe-SNAPSHOT

CassandraJournal caches each Future[PreparedStatement] in a lazy val. If PreparedStatement initialization fails, the failed Future is cached as well.
https://github.com/apache/incubator-pekko-persistence-cassandra/blob/7741ba0ab3be3ab0c132dcfc866d14c25e2c08de/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala#L117-L152
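A minimal sketch of why a lazy val pins the failure, assuming a hypothetical `prepare` that stands in for the driver's prepare call (it returns a `Future[String]` instead of a real `PreparedStatement` and fails only on its first call, like a timeout during startup). This is an illustration of the pattern, not the journal's actual code:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object LazyValPitfall {
  val attempts = new AtomicInteger(0)

  // Hypothetical stand-in for session.prepare(...): the first call fails, as a
  // DriverTimeoutException might during startup; later calls would succeed.
  def prepare(cql: String): Future[String] =
    if (attempts.incrementAndGet() == 1) Future.failed(new RuntimeException("Query timed out"))
    else Future.successful(s"prepared: $cql")

  // Same shape as the cached statements: evaluated once, on first access, and the
  // resulting Future (failed or not) is kept for the lifetime of the object.
  lazy val preparedInsert: Future[String] = prepare("INSERT ...")

  def main(args: Array[String]): Unit = {
    for (i <- 1 to 3) {
      val result = Await.ready(preparedInsert, 1.second).value.get
      println(s"attempt $i: $result")
    }
    println(s"prepare called ${attempts.get} times")
  }
}
```

Every access returns the same failed Future and `prepare` is never called again, even though a second call would have succeeded.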
This failure can occur when Cassandra or the network is unstable while the app is starting up.

Once the failed Future is cached, every subsequent attempt to read events from the journal fails:

[2023-10-02 06:43:27,936] [ERROR] [com.example.Counter$] [system-pekko.actor.default-dispatcher-14] [] - Supervisor StopSupervisor saw failure: Exception during recovery. Last known sequence number [0]. PersistenceId [example2], due to: Exception during recovery. Last known sequence number [0]. PersistenceId [example2], due to: Query timed out after PT0.5S
org.apache.pekko.persistence.typed.internal.JournalFailureException: Exception during recovery. Last known sequence number [0]. PersistenceId [example2], due to: Exception during recovery. Last known sequence number [0]. PersistenceId [example2], due to: Query timed out after PT0.5S
        at org.apache.pekko.persistence.typed.internal.ReplayingEvents.onRecoveryFailure(ReplayingEvents.scala:278)
        at org.apache.pekko.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:211)
        at org.apache.pekko.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:109)
        at org.apache.pekko.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:84)
        at org.apache.pekko.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:93)
        at org.apache.pekko.actor.typed.Behavior$.interpret(Behavior.scala:283)
        at org.apache.pekko.actor.typed.Behavior$.interpretMessage(Behavior.scala:239)
        at org.apache.pekko.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:67)
        at org.apache.pekko.persistence.typed.internal.EventSourcedBehaviorImpl$$anon$1.aroundReceive(EventSourcedBehaviorImpl.scala:222)
        at org.apache.pekko.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:95)
        at org.apache.pekko.actor.typed.Behavior$.interpret(Behavior.scala:283)
        at org.apache.pekko.actor.typed.Behavior$.interpretMessage(Behavior.scala:239)
        at org.apache.pekko.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:67)
        at org.apache.pekko.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:141)
        at org.apache.pekko.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:95)
        at org.apache.pekko.actor.typed.Behavior$.interpret(Behavior.scala:283)
        at org.apache.pekko.actor.typed.Behavior$.interpretMessage(Behavior.scala:239)
        at org.apache.pekko.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:141)
        at org.apache.pekko.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:117)
        at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
        at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
        at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
        at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
        at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.pekko.persistence.typed.internal.JournalFailureException: Exception during recovery. Last known sequence number [0]. PersistenceId [example2], due to: Query timed out after PT0.5S
        at org.apache.pekko.persistence.typed.internal.ReplayingEvents.onRecoveryFailure(ReplayingEvents.scala:278)
        at org.apache.pekko.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:201)
        ... 27 common frames omitted
Caused by: com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT0.5S
        at com.datastax.oss.driver.internal.core.cql.CqlPrepareHandler.lambda$scheduleTimeout$1(CqlPrepareHandler.java:164)
        at io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715)
        at io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34)
        at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703)
        at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790)
        at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)

We can reproduce the problem with the following sample project.

@pjfanning
Contributor

@danischroeter @nvollmar I looked into why the lazy vals were introduced in CassandraJournal, and the two of you were involved in those changes (akka/akka-persistence-cassandra#816).

It seems we need to retry when the Future[PreparedStatement] ends up in a failed state.

@mdedetrich
Contributor

Then the solution would be pretty easy, i.e. change them back to defs?
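A sketch of what switching to a def would change, again using a hypothetical `prepare` stand-in (not the real driver API) that fails only on its first call. With a def nothing is cached, so a transient failure does not stick, but `prepare` is called again on every access:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future

object DefVsLazyVal {
  val attempts = new AtomicInteger(0)

  // Hypothetical stand-in for session.prepare(...): fails on the first call only.
  def prepare(cql: String): Future[String] =
    if (attempts.incrementAndGet() == 1) Future.failed(new RuntimeException("Query timed out"))
    else Future.successful(s"prepared: $cql")

  // A def re-evaluates on every access: the first failure is not remembered,
  // but prepare runs again each time the statement is needed.
  def preparedInsert: Future[String] = prepare("INSERT ...")

  def main(args: Array[String]): Unit = {
    val results = (1 to 3).map(_ => preparedInsert.value.get)
    results.foreach(println)
    println(s"prepare called ${attempts.get} times")
  }
}
```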

@pjfanning
Contributor

@mdedetrich the problem code originally used defs, which were changed to lazy vals to fix other issues; see the conversation in akka/akka-persistence-cassandra#816. We may be better off introducing code that recovers from the failure than reverting to defs, but both approaches are worth considering.
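One possible shape for "recovering from failure" while keeping the cache, sketched under assumptions: `RetryingLazyFuture` is a hypothetical helper written for illustration (it is not the LazyFutureEval from #106), and `prepare` is the same stand-in for the driver call. It caches a pending or successful Future, but drops a failed one so the next caller starts a fresh attempt:

```scala
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Hypothetical helper, NOT the LazyFutureEval from #106.
final class RetryingLazyFuture[T](create: () => Future[T])(implicit ec: ExecutionContext) {
  private val ref = new AtomicReference[Future[T]](null)

  def get(): Future[T] = {
    val cached = ref.get()
    if (cached != null) cached
    else {
      val promise = Promise[T]()
      val fut = promise.future
      if (ref.compareAndSet(null, fut)) {
        // This caller won the race: run the real initialization once.
        val attempt = try create() catch { case NonFatal(e) => Future.failed(e) }
        attempt.onComplete { result =>
          // On failure, clear the cache before completing the promise, so a
          // caller that observes the failure and calls get() again retries.
          if (result.isFailure) ref.compareAndSet(fut, null)
          promise.complete(result)
        }
        fut
      } else get() // another caller installed a Future first; read it again
    }
  }
}

object RetryDemo {
  import ExecutionContext.Implicits.global

  val attempts = new AtomicInteger(0)

  // Hypothetical stand-in for session.prepare(...): fails on the first call only.
  def prepare(cql: String): Future[String] =
    if (attempts.incrementAndGet() == 1) Future.failed(new RuntimeException("Query timed out"))
    else Future.successful(s"prepared: $cql")

  val preparedInsert = new RetryingLazyFuture(() => prepare("INSERT ..."))

  def main(args: Array[String]): Unit = {
    for (i <- 1 to 3) {
      val result = Await.ready(preparedInsert.get(), 1.second).value.get
      println(s"attempt $i: $result")
    }
    println(s"prepare called ${attempts.get} times")
  }
}
```

In this sketch the first attempt fails, the second succeeds, and later calls reuse the cached successful Future, so `prepare` runs twice. Unlike a def, a healthy statement is prepared only once; unlike a lazy val, a failure is not kept forever.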

@pjfanning pjfanning added the bug Something isn't working label Oct 5, 2023
@pjfanning
Contributor

I'm going to press on with the 1.0.0 release and list this as a known issue in the release notes. Hopefully we can release a patch that fixes it in the next few weeks.

@pjfanning
Contributor

@negokaz I've released v1.0.0 of this lib with this as a known issue. I have a speculative PR (#106) but would like to get some feedback on it. If you are in a position to build your own jar based on this PR and see if it helps in your scenario, that would be great.

@negokaz
Author

negokaz commented Oct 10, 2023

@pjfanning
Thank you for addressing the issue.
I'll see if your PR (#106) can fix my problem.

@negokaz
Author

negokaz commented Oct 11, 2023

Based on the conversation in akka/akka-persistence-cassandra#816, should we also change CassandraSnapshotStore to use something like the LazyFutureEval from #106?
CassandraSnapshotStore currently creates its PreparedStatements with defs:

https://github.com/apache/incubator-pekko-persistence-cassandra/blob/51af9dbb244e1154142acb5c5e9f641e2a3094d9/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala#L68-L77

@pjfanning
Contributor

@negokaz LazyFutureEval is designed to fix the issue where a lazy val stores a failed future. A def won't store a failed future. I'm not fully against changing those defs that you highlighted but I think that could be treated as a separate issue. In my mind, changing those defs is a potential performance optimisation.

@pjfanning pjfanning added this to the 1.1.0 milestone Mar 7, 2024
3 participants