Skip to content

Commit

Permalink
improve the error message in actor handler
Browse files Browse the repository at this point in the history
  • Loading branch information
kailuowang committed Mar 10, 2017
1 parent a8752fe commit 081b8f8
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
15 changes: 10 additions & 5 deletions core/src/main/scala/kanaloa/handler/GeneralActorRefHandler.scala
Expand Up @@ -27,7 +27,7 @@ class GeneralActorRefHandler[TResp, TError](
)(resultChecker: ResultChecker[TResp, TError]) extends Handler[Any] {
import GeneralActorRefHandler._

override type Error = Option[TError]
override type Error = ActorError
override type Resp = TResp

val maxConcurrentHandling = 100000000 //the actual max concurrent handling is controlled by the kanaloa, this number is just a hard limit, not suggesting that the concurrent handling should be allowed to go there.
Expand All @@ -47,9 +47,9 @@ class GeneralActorRefHandler[TResp, TError](

override val result: Future[Result[Resp, Error]] = promise.future.map {
case Left(TargetTerminated)
Result(Left(None), Some(Terminate))
case Left(Cancelled) Result(Left(None), None) //Note: this result is going to be ignored by the worker since it cancels it. find a way to
case Right(m) Result(resultChecker(m), None)
Result(Left(TargetTerminated), Some(Terminate))
case Left(Cancelled) Result(Left(Cancelled), None) //Note: this result is going to be ignored by the worker since it cancels it. find a way to
case Right(m) Result(resultChecker(m).left.map(CustomResultError(_)), None)
}

override val cancellable: Option[Cancellable] = Some(new Cancellable {
Expand All @@ -65,6 +65,11 @@ class GeneralActorRefHandler[TResp, TError](
}

object GeneralActorRefHandler {

sealed abstract class ActorError extends Product with Serializable

case class CustomResultError[T](e: T) extends ActorError

type ResultChecker[TResp, TError] = Any Either[Option[TError], TResp]

private class HandlerActor(promise: Promise[Either[Interrupted, Any]], target: ActorRef, cancelled: AtomicBoolean) extends Actor {
Expand Down Expand Up @@ -92,7 +97,7 @@ object GeneralActorRefHandler {

private case object Cancel

sealed abstract class Interrupted extends Product with Serializable
sealed abstract class Interrupted extends ActorError

case object TargetTerminated extends Interrupted
case object Cancelled extends Interrupted
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kanaloa/queue/Worker.scala
Expand Up @@ -133,6 +133,7 @@ private[queue] class Worker[T](

log.warning(s"$message, work abandoned")
outstanding.fail(WorkFailed(message))
onComplete
}

}
Expand Down

0 comments on commit 081b8f8

Please sign in to comment.