-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Recovery strategies when handler fails to process, #17 #95
Conversation
case (offset, envelope) => | ||
offsetStore | ||
.saveOffset(projectionId, offset) | ||
.flatMap(_ => applyUserRecovery(envelope, () => handler.process(envelope))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was wrong because the async boundary may cause more than one offset to be saved before processing
|
||
// TODO: add a LogSource for projection when we have a name and key | ||
val logger = Logging(systemProvider.classicSystem, this.getClass) | ||
|
||
val futDone = Future.successful(Done) | ||
|
||
def applyUserRecovery(envelope: Envelope)(futureCallback: () => Future[Done]) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this was moved to core (and improved)
} | ||
} | ||
|
||
def aroundUserHandler(env: Envelope)(handlerFunc: Envelope => slick.dbio.DBIO[Done]) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed this. I think the purpose was to deal with thrown exceptions, but that is now handled in same way as if the handler returned Future.failed. There were no tests for this and I don't think it was completely correct anyway.
.transactionally | ||
|
||
applyUserRecovery(env) { () => | ||
databaseConfig.db.run(txDBIO).map(_ => Done) | ||
} | ||
} | ||
|
||
def processEnvelopeAndStoreOffsetInSeparateTransactions(env: Envelope): Future[Done] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed this, and using future composition instead, see the AtLeastOnce(1, _)
case below. Strange if offsetStore fails, then we would retry and run the handler transaction again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
A few change request here and there though
def applyUserRecovery[Offset, Envelope]( | ||
handler: HandlerRecovery[Envelope], | ||
envelope: Envelope, | ||
sourceProvider: SourceProvider[Offset, Envelope], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here we can pass just the extracted offset
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good
@@ -17,6 +29,6 @@ import akka.Done | |||
* guarantees between the invocations are handled automatically, i.e. no volatile or | |||
* other concurrency primitives are needed for managing the state. | |||
*/ | |||
trait Handler[Envelope] { | |||
abstract class Handler[Envelope] extends HandlerRecovery[Envelope] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ApiMayChange like in Scala?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I'll also add a ticket so that we take a look at all of it before "final"
retryAndSkip(retries, delay.asScala) | ||
|
||
/** | ||
* INTERNAL API: placed here instead of the `internal` package because of sealed trait |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
retied | ||
|
||
case RetryAndSkip(retries, delay) => | ||
logger.error( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning since we are skipping
akka-projection-core/src/main/scala/akka/projection/internal/HandlerRecoveryImpl.scala
Show resolved
Hide resolved
"fail" in { | ||
val handler = new FailHandler(HandlerRecoveryStrategy.fail, failOnOffset = 3) | ||
val result = | ||
HandlerRecoveryImpl.applyUserRecovery(handler, env3, sourceProvider, logger, () => handler.process(env3)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we pass only the offset, we don't even need to hack a SourceProvider here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
The projection impl will have access to the Envelope
and SourceProvider
already so it can extract the offset then.
|
||
class ConcatHandlerFail4(recoveryStrategy: HandlerRecoveryStrategy = HandlerRecoveryStrategy.fail) | ||
extends SlickHandler[Envelope] { | ||
private val _attempts = new AtomicInteger() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍, I didn't think about making this atomic on first impl. Good catch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reason is that it is accessed from the outside of the handler, from the test
changed so that retries are not used for atMostOnce because it would be more than once |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really nice 👍
|
||
// retries - 1 because retry() is based on attempts | ||
// first attempt is performed immediately and therefore we must first delay | ||
val retied = after(delay, scheduler)(retry(tryFutureCallback, retries - 1, delay)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: typo
futDone | ||
|
||
case RetryAndFail(retries, delay) => | ||
logger.error( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be a warning since we're going to retry?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, I think so
akka-projection-core/src/main/scala/akka/projection/internal/HandlerRecoveryImpl.scala
Show resolved
Hide resolved
"fail" in { | ||
val handler = new FailHandler(HandlerRecoveryStrategy.fail, failOnOffset = 3) | ||
val result = | ||
HandlerRecoveryImpl.applyUserRecovery(handler, env3, sourceProvider, logger, () => handler.process(env3)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
The projection impl will have access to the Envelope
and SourceProvider
already so it can extract the offset then.
4707a78
to
d74431d
Compare
* moved recovery strategies and implementation to core * used in CassandraProjection (and SlickProjection) * new unit test of the HandlerRecoveryImpl * also found and fixed a few bugs * problem with at-least-once after 1 in Slick * problem with at-most-once in Cassandra, async boundary may cause more than one offset to be saved before processing
a6c58b1
to
3b38393
Compare
incorporated feedback |
some test failure, I'll look into it |
* simple search and replace refactoring so not much too review
offset to be saved before processing
References #17
Draft because on top of other PR, but ready for review.