-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-10251] Handle oversized response messages in AkkaRpcActor #6876
Conversation
e92b8af
to
e3c1155
Compare
cc @tillrohrmann At the moment it seems that I don't see the infrastructure for testing AkkaRpcActor#handleRpcInvocation? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your contribution @yanghua. I had some comments to further improve the PR.
I think we should add a test case verifying the behavior. We could for example add them to AkkaRpcActor
and create an RpcEndpoint
which creates a large response which exceeds the configured maximum frame size.
@@ -108,9 +111,11 @@ | |||
|
|||
private volatile boolean stopped; | |||
|
|||
public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) { | |||
public AkkaRpcService(final ActorSystem actorSystem, final Configuration configuration) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you pass the Configuration
here? I think it would be better to revert this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I forgot to refactor this code segment :
if (actorSystem.settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
maximumFramesize = actorSystem.settings().config().getBytes(MAXIMUM_FRAME_SIZE_PATH);
} else {
// only local communication
maximumFramesize = Long.MAX_VALUE;
}
to use
configuration.getString(AkkaOptions.FRAMESIZE);
(this suggestion from @zentol)
If I would do this, I think it's better to replace scattered config items into one Configuration object so that we can encapsulate change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes let's introduce an AkkaRpcServiceConfiguration
.
"size, result size is " + resultSize + ", maximum frame size is " + | ||
maximumFramesize + " .")); | ||
} else { | ||
promise.success(value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to send the SerializedValue
. That way we save some extra serialization work. This, of course, requires that the receiver expects a SerializedValue
.
try { | ||
resultSize = new SerializedValue(value).getByteArray().length; | ||
if (resultSize > maximumFramesize) { | ||
promise.failure(new Throwable("The result size exceeds the maximum " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's throw an AkkaRpcException
resultSize = new SerializedValue(value).getByteArray().length; | ||
if (resultSize > maximumFramesize) { | ||
promise.failure(new Throwable("The result size exceeds the maximum " + | ||
"size, result size is " + resultSize + ", maximum frame size is " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it's enough to say The result size %s exceeds the maximum frame size %s
.
promise.success(value); | ||
} | ||
} catch (IOException e) { | ||
promise.failure(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe wrap in AkkaRpcException("Failed to serialize the result for RPC call %s.", e)
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
Show resolved
Hide resolved
77e882d
to
8d3e3fb
Compare
hi @tillrohrmann There is a problem about sending serialized data. For example,
|
The problem is that you always try to serialize the result. However, it should only be serialized if the sender is remote. |
Thank you @tillrohrmann , I think I misunderstood your previous meaning about serialization results (I thought all the results were serialized). I have fixed it locally, but the Github service has failed (at least in China), and once the Github service is normal, I will push my changes. |
@tillrohrmann I have updated this PR, please review. |
@zentol Any suggestion? |
@tillrohrmann If you can give a review suggestion as soon as possible, then I can refactor it quickly so that it catches up with 1.7.0. |
@tillrohrmann Will you review this PR before the release of Flink 1.7.0? |
Hi @yanghua sorry for my unresponsiveness. I think this fix won't make it into the 1.7 release anymore. But we will include in the next bug fix release! |
Thanks for the reminder @yanghua. I'm still busy with the 1.7 release. Once it is out, I'll take a look at this PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update @yanghua. The changes go in a really good direction. I had some comments which we should address before merging.
final T rpcEndpoint, | ||
final CompletableFuture<Boolean> terminationFuture, | ||
final int version, | ||
final long maximumFramesize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please indent parameters one level deeper.
this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint"); | ||
this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint); | ||
this.terminationFuture = checkNotNull(terminationFuture); | ||
this.version = version; | ||
this.maximumFramesize = maximumFramesize; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should check that this is positive
@@ -71,7 +73,7 @@ | |||
* | |||
* @param <T> Type of the {@link RpcEndpoint} | |||
*/ | |||
class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { | |||
public class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does the AkkaRpcActor
need to be public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I created org.apache.flink.runtime.rpc.TestingRemoteAkkaRpcActor
before. Now, I will move this class into pkg org.apache.flink.runtime.rpc.akka
, then I can remove the public
keyword.
return ((SerializedValue) o).deserializeValue(getClass().getClassLoader()); | ||
} catch (IOException e) { | ||
throw new CompletionException(e); | ||
} catch (ClassNotFoundException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The two catch clause could be collapsed catch(IOException | ClassNotFoundException e)
Either<SerializedValue, AkkaRpcException> serializedResult = | ||
serializeRemoteResultAndVerifySize(result, method.getName()); | ||
if (serializedResult.isLeft()) { | ||
getSender().tell(serializedResult.left(), getSelf()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Serialized result should be wrapped in new Status.Success(result)
|
||
static class OversizedResponseRpcEndpoint extends TestRpcEndpoint implements OversizedResponseMsgRpcGateway { | ||
|
||
private volatile Byte[] bytes = new Byte[1024 * 10]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's initialize the bytes via a constructor parameter. That way it is controllable and we don't need the getters and setters.
} | ||
} | ||
|
||
static class TestingRemoteRpcService extends TestingRpcService { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for this RpcService
if we properly simulate remote communication by using a second RpcService
.
akkaRpcService1 = new AkkaRpcService(actorSystem1, timeout); | ||
akkaRpcService2 = new AkkaRpcService(actorSystem2, timeout); | ||
akkaRpcService1 = new AkkaRpcService(actorSystem1, AkkaRpcServiceConfiguration.fromConfiguration(configuration)); | ||
akkaRpcService2 = new AkkaRpcService(actorSystem2, AkkaRpcServiceConfiguration.fromConfiguration(configuration)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could reuse the first AkkaRpcServiceConfiguration
.
@@ -757,7 +758,7 @@ protected RpcService createRpcService( | |||
|
|||
final ActorSystem actorSystem = AkkaUtils.createActorSystem(effectiveAkkaConfig); | |||
|
|||
return new AkkaRpcService(actorSystem, askTimeout); | |||
return new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest to replace the askTimeout
parameter with a AkkaRpcServiceConfiguration akkaRpcServiceConfiguration
parameter and pass it to this method.
return o; | ||
} | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This block should go into the invokeRpc
method because we also need to unwrap the synchronous calls.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also add a test case that synchronous calls (Rpc's which return directly a value instead of a CompletableFuture
) work with this change as well.
@tillrohrmann I have updated this PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update @yanghua. The code looks now really good. I had some minor comments. After resolving them, we should be good to merge this PR.
try { | ||
result = ((SerializedValue) returnedResult).deserializeValue(getClass().getClassLoader()); | ||
} catch (IOException | ClassNotFoundException e) { | ||
throw new CompletionException(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be a CompletionException
. Instead I would throw a RpcException
saying that one could not deserialize a serialized payload.
try { | ||
return ((SerializedValue) o).deserializeValue(getClass().getClassLoader()); | ||
} catch (IOException | ClassNotFoundException e) { | ||
throw new CompletionException(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The same applies here. It would be good CompletionExcpetion(new RpcException("Could not deserialize...", e));
} | ||
} else { | ||
result = returnedResult; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is somewhat a code duplication with the other branch. Could we deduplicate this code by applying the deserialization logic on the future? Additionally it would be good to catch ExecutionException
when calling resultFuture.get()
and unstrip them via ExceptionUtils.unstripExecutionException
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not find ExceptionUtils#unstripExecutionException
, but exists ExceptionUtils#stripExecutionException
. Did you mean ExceptionUtils#rethrow
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @tillrohrmann I have updated the PR based on your suggestion, except this one. I did not find the method you mentioned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, I meant stripExecutionException
. I'll take a look and see whether I can do it while merging.
/** | ||
* Configuration object for {@link AkkaRpcService}. | ||
*/ | ||
public class AkkaRpcServiceConfiguration { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could also add the Configuration
to this class. Then we only need to pass in this object instead of the configuration and the object.
|
||
public AkkaRpcServiceConfiguration(Time timeout, long maximumFramesize) { | ||
this.timeout = timeout; | ||
this.maximumFramesize = maximumFramesize; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check that timeout
is not null and maximumFramesize > 0
missing.
String akkaConfigStr = String.format(SIMPLE_AKKA_CONFIG_TEMPLATE, maxFrameSizeStr); | ||
Config akkaConfig = ConfigFactory.parseString(akkaConfigStr); | ||
return akkaConfig.getBytes(MAXIMUM_FRAME_SIZE_PATH); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice :-)
private State state; | ||
|
||
AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Boolean> terminationFuture, final int version) { | ||
public AkkaRpcActor( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one is still public. Can we make it package private again?
rpcEndpoint.shutDown(); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line break not needed.
/** | ||
* RPC sync invoke test. | ||
*/ | ||
public class SyncCallsTest extends TestLogger { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice tests. Can we also add a synchronous call with an oversized message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates @yanghua. LGTM. Merging this PR.
This commit slightly refactors and cleans up the handling of over sized messages. This closes apache#6876.
This commit slightly refactors and cleans up the handling of over sized messages. This closes apache#6876.
What is the purpose of the change
This pull request handles oversized response messages in AkkaRpcActor*
Brief change log
Verifying this change
TBD
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation