-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Typed: Un-nest supervision that overlaps #25269
Changes from all commits
2f514bb
8bcfd45
64a591f
35310b3
5b37dae
92304ff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,8 +11,9 @@ import akka.actor.DeadLetterSuppression | |
import akka.actor.typed.SupervisorStrategy._ | ||
import akka.actor.typed.scaladsl.Behaviors | ||
import akka.annotation.InternalApi | ||
import akka.util.OptionVal | ||
import akka.util.{ OptionVal, PrettyDuration } | ||
|
||
import scala.annotation.tailrec | ||
import scala.concurrent.duration.{ Deadline, FiniteDuration } | ||
import scala.reflect.ClassTag | ||
import scala.util.control.Exception.Catcher | ||
|
@@ -44,13 +45,46 @@ import scala.util.control.NonFatal | |
supervisor.init(ctx) | ||
} | ||
|
||
// find supervision that is superflous by having the same exception handled closer to the behavior | ||
// and remove those instances. Needs to be called in the wrap method of all supervisor classes | ||
def deduplicate[T, Thr <: Throwable: ClassTag](supervisor: Supervisor[T, Thr]): Supervisor[T, Thr] = { | ||
|
||
// side effecting to avoid allocating a tuple per loop | ||
var seenSupervised = Set.empty[Class[_]] | ||
|
||
// can't be tailrec, but should be ok since hierarchies shouldn't be _that_ deep given that they | ||
// are deduplicated for every wrap | ||
def loop(behavior: Behavior[T]): Behavior[T] = { | ||
behavior match { | ||
case s: Supervisor[T, _] ⇒ | ||
val inner = loop(s.behavior) | ||
if (seenSupervised.contains(s.throwableClass)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we could discard subclasses also but makes it a bit more complicated and the main purpose is to remove the accidental wrapping of same There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I thought about that, but wasn't sure it is worth because it isn't very realistic that you add more general and more general supervision over time. |
||
// the exception this supervision cover is already covered closer to | ||
// the actual behavior so s will never be invoked, lets' remove it | ||
inner | ||
else { | ||
seenSupervised += s.throwableClass | ||
if (inner eq s.behavior) s | ||
else s.wrap(inner, false) | ||
} | ||
case b ⇒ b | ||
} | ||
} | ||
|
||
// we know that the result is either the original outermost or another outermost supervision | ||
// but the type system doesn't | ||
loop(supervisor).asInstanceOf[Supervisor[T, Thr]] | ||
} | ||
|
||
} | ||
|
||
/** | ||
* INTERNAL API | ||
*/ | ||
@InternalApi private[akka] abstract class Supervisor[T, Thr <: Throwable: ClassTag] extends ExtensibleBehavior[T] { | ||
|
||
private[akka] def throwableClass = implicitly[ClassTag[Thr]].runtimeClass | ||
|
||
protected def loggingEnabled: Boolean | ||
|
||
/** | ||
|
@@ -99,7 +133,7 @@ import scala.util.control.NonFatal | |
|
||
protected def log(ctx: ActorContext[T], ex: Thr): Unit = { | ||
if (loggingEnabled) | ||
ctx.asScala.log.error(ex, ex.getMessage) | ||
ctx.asScala.log.error(ex, "Supervisor [{}] saw failure: {}", this, ex.getMessage) | ||
} | ||
} | ||
|
||
|
@@ -120,8 +154,9 @@ import scala.util.control.NonFatal | |
} | ||
|
||
override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr] = | ||
new Resumer[T, Thr](nextBehavior, loggingEnabled) | ||
Supervisor.deduplicate(new Resumer[T, Thr](nextBehavior, loggingEnabled)) | ||
|
||
override def toString = "resume" | ||
} | ||
|
||
/** | ||
|
@@ -140,7 +175,9 @@ import scala.util.control.NonFatal | |
} | ||
|
||
override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr] = | ||
new Stopper[T, Thr](nextBehavior, loggingEnabled) | ||
Supervisor.deduplicate(new Stopper[T, Thr](nextBehavior, loggingEnabled)) | ||
|
||
override def toString = "stop" | ||
|
||
} | ||
|
||
|
@@ -162,7 +199,9 @@ import scala.util.control.NonFatal | |
} | ||
|
||
override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr] = | ||
new Restarter[T, Thr](initialBehavior, nextBehavior, loggingEnabled) | ||
Supervisor.deduplicate(new Restarter[T, Thr](initialBehavior, nextBehavior, loggingEnabled)) | ||
|
||
override def toString = "restart" | ||
} | ||
|
||
/** | ||
|
@@ -200,14 +239,18 @@ import scala.util.control.NonFatal | |
} | ||
|
||
override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr] = { | ||
if (afterException) { | ||
val restarter = if (afterException) { | ||
val timeLeft = deadlineHasTimeLeft | ||
val newRetries = if (timeLeft) retries + 1 else 1 | ||
val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + strategy.withinTimeRange) | ||
new LimitedRestarter[T, Thr](initialBehavior, nextBehavior, strategy, newRetries, newDeadline) | ||
} else | ||
new LimitedRestarter[T, Thr](initialBehavior, nextBehavior, strategy, retries, deadline) | ||
|
||
Supervisor.deduplicate(restarter) | ||
} | ||
|
||
override def toString = s"restartWithLimit(${strategy.maxNrOfRetries}, ${PrettyDuration.format(strategy.withinTimeRange)})" | ||
} | ||
|
||
/** | ||
|
@@ -308,7 +351,9 @@ import scala.util.control.NonFatal | |
if (afterException) | ||
throw new IllegalStateException("wrap not expected afterException in BackoffRestarter") | ||
else | ||
new BackoffRestarter[T, Thr](initialBehavior, nextBehavior, strategy, restartCount, blackhole) | ||
Supervisor.deduplicate(new BackoffRestarter[T, Thr](initialBehavior, nextBehavior, strategy, restartCount, blackhole)) | ||
} | ||
|
||
override def toString = s"restartWithBackoff(${PrettyDuration.format(strategy.minBackoff)}, ${PrettyDuration.format(strategy.maxBackoff)}, ${strategy.randomFactor})" | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"hadouken!"
// totally fine to keep it, just reminded me of the old pic... ;-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pyramid of supervision doom!