-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
=con #17670 Fix potential ReceivePipeline MatchError #17678
Conversation
Can one of the repo owners verify this patch? |
aroundCache = Some((receive, zipped)) | ||
zipped | ||
} | ||
super.aroundReceive(around, msg) | ||
around(msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pretty sure we must call super.aroundReceive
somewhere, is this change safe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed. Have re-jigged to use super.aroundReceive as the zero for the fold to pick up the unhandled msg case
OK TO TEST |
Test PASSed. |
@@ -38,10 +38,13 @@ trait ReceivePipeline extends Actor { | |||
val around = aroundCache match { | |||
case Some((`receive`, cached)) ⇒ cached | |||
case _ ⇒ | |||
val zipped = pipeline.foldRight[Receive](receive)((outer, inner) ⇒ outer(inner).orElse[Any, Unit](inner)) | |||
val zipped = pipeline.foldRight[Receive] { | |||
case msg => super.aroundReceive(receive, msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if this is the exact semantics we need here, is it ok to call super.aroundReceive
multiple times? Seems a bit weird.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right @ktoso, in a mixin context super.aroundReceive
might not be just the default impl, and if any of the interceptors intentionally discard messages super.aroundReceive
won't be called.
We should still use the original receive
as zero but with fallback in unhandled
. receive.orElse(PartialFunction(unhandled))
I saw that this was your first approach for fixing it @jeremystone. What was wrong with it that you changed it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When @ktoso suggested that super.aroundReceive
should be called somewhere, I agreed: what right has ReceivePipeline
to enforce the specific use of unhandled
as the fallback when the base actor might deliberately have overridden aroundReceive
with some other behaviour.
Placing the super.aroundReceive
call in the middle of the fold makes the fallback less invasive. But now as you point out, @ktonga, it might not get called at all. Clearly, calling super.aroundReceive
at the end of the method as well will not do as then it will then often be called twice.
My gut feeling is that it is almost as if ReceivePipeline
is ceding too much power over the flow control to the interceptors (allowing them to call each other directly). An alternative would be to have each interceptor as a PartialFunction[Any, InterceptorResult]
with something like:
sealed trait InterceptorResult
case class Inner(transformedMsg: Any) extends InterceptorResult
case object HandledCompletely extends InterceptorResult
The ReceivePipeline would construct a Receive
from these interceptors (so that it would be responsible for the message flow between interceptors) and just pass this to super.aroundReceive
along with the original message at the end (as per the original implementation).
Of course this would be an API changer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the idea @jeremystone, seems to be a definitive solution for the flow control problem. I don't think it is a problem to change the API since it is in the contrib module.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I'd be happy to knock something up for this - and see how it looks.
Would be nice @ktonga if you could have a look in this PR as well, since you authored the initial impl - thanks a lot in advance! |
So if I understand the discussion and code above correctly this only needs a rebase and is ready to be merged, right? (Would like to avoid the |
Test PASSed. |
Just got round to pushing change addressing flow control as discussed, so
|
private def toReceive(handler: Handler) = new Receive { | ||
// Cache the result locally to avoid evaluating potentially | ||
// side-effecting code twice in isDefinedAt/apply... | ||
var resultCache = Option.empty[(Any, HandlerResult)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resultCache
is never cleaned, it could unnecessarily prevent the message to be GCed. Maybe apply
should empty it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point.
Perhaps better approach is to not cache the result at all and simply override applyOrElse - as per recommendations for non-literal PFs. (If we can safely assume that the caller is aware of this and does not separately call isDefinedAt and then apply.) Namely:
private def toReceive(handler: Handler) = new Receive {
def isDefinedAt(m: Any): Boolean = evaluate(m) != Undefined
def apply(m: Any): Unit = evaluate(m)
override def applyOrElse[A1 <: Any, B1 >: Unit](m: A1, default: A1 => B1): B1 = {
val result = handler(m)
if (result == Undefined) default(m)
}
private def evaluate(m: Any) = handler(m)
}
Before we go any further with this, one thing that is missing (that could be achieved before) is the ability to do something within an interceptor after the message has been processed by the actor's receive (e.g. for the documented use case of timing message processing). Looks like this could be provided by altering the
along with some appropriate convenience factory methods providing defaults. Then inside the outerInterceptor.andThen {
case Inner(msg, after) =>
val result = innerHandler(msg)
after()
result
case HandledCompletely => Done
} Any thoughts on this? |
needs a |
Test PASSed. |
AFAICS this is ready right? If so, could you please squash it into one commit titling it |
Yes...so long as everyone happy with alterations to API. Will not be in a There's also some documentation to do. Do you want this as part of the same
|
Docs in the same commit would be excellent :) Thanks! |
Test PASSed. |
for the messages of your interest and at some point delegate on the inner :class:`Receive` | ||
you get by parameter. We will talk about ignored and unhandled messages later. | ||
Multiple interceptors can be added to actors that mixin the :class:`ReceivePipeline` trait. | ||
These interceptors can be thought of as layers: each outer interceptor advising the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure we should use the advice
word, the API does not expose any "advice" in methods/types.
I know it's AOP wording and am familiar with it, but would like to avoid it leaking into the docs, since the API does not use such words.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. Could use decorate/decoration/delegate terminology as before, though the interceptor at the API level is now a more functional "description of the advice" than it was - so not sure this is right either unless this fact is made clear in the docs.
I will try to come up with an alternative.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before I commit anything, how about the following which uses the decorator/delegate terminology and renames InterceptorResult to Delegation.
(Internally the code uses advice/advisor terminology - albeit privately - so will rename to line up and lessen any confusion here, too.)
Interceptors
Multiple interceptors can be added to actors that mixin the ReceivePipeline
trait.
These interceptors internally define layers of decorators around the actor's behavior. The first interceptor
defines an outer decorator which delegates to a decorator corresponding to the second interceptor and so on,
until the last interceptor which defines a decorator for the actor's Receive
.
[etc etc as before...]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, and please excuse the delayed response!
Yes this sounds good to me, thanks!
Test FAILed. |
Test PASSed. |
This is LGTM AFAICS, though needs to be squashed into one commit with title |
Test PASSed. |
LGTM |
Yes I think so, been LGTMed a lot ;-) |
=con #17670 Fix potential ReceivePipeline MatchError
Cool!
|
Patch that should hopefully fix #17670