[SPARK-5124][Core] A standard RPC interface and an Akka implementation #4588
Test build #27440 has started for PR 4588 at commit
Test build #27440 has finished for PR 4588 at commit
Test PASSed.
@@ -45,30 +42,37 @@ private[spark] class WorkerWatcher(workerUrl: String)
private var isTesting = false
Can you document what this does?
Added some docs to explain it.
Test build #28002 has started for PR 4588 at commit
I will update this PR as per discussion in https://issues.apache.org/jira/browse/SPARK-5124
Test build #28002 has finished for PR 4588 at commit
Test PASSed.
 * Retrieve the [[RpcEndpointRef]] represented by `systemName`, `address` and `endpointName`
 */
def setupEndpointRef(
    systemName: String, address: RpcAddress, endpointName: String): RpcEndpointRef
Nitpick: I think the indentation is off by one here.
Test build #28264 has started for PR 4588 at commit
Test build #28264 has finished for PR 4588 at commit
Test PASSed.
 * @param address the remote address of the connection which this error happens on.
 * @param cause the cause of the network error.
 */
private[spark] case class NetworkErrorEvent(address: RpcAddress, cause: Throwable)
I actually like your old design better: NetworkErrorEvent is not a message, but rather just a function that can be overridden. I think that makes it clearer how errors are handled. The same goes for AssociatedEvent and DisassociatedEvent, by the way.
Do you mean you like adding the methods in the previous NetworkRpcEndpoint to RpcEndpoint?
Yes
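To make the two designs in this thread concrete, here is a rough sketch of the difference. Everything in it is a simplified, hypothetical stand-in: the trait names, the callback names (`onConnected`, `onDisconnected`, `onNetworkError`) and the `LoggingEndpoint` class are assumptions for illustration, not the definitions in this PR.

```scala
// Hypothetical, simplified stand-ins; not the definitions from this PR.
final case class RpcAddress(host: String, port: Int)

// Style A: network events arrive as ordinary messages through `receive`.
sealed trait NetworkEvent
final case class AssociatedEvent(address: RpcAddress) extends NetworkEvent
final case class DisassociatedEvent(address: RpcAddress) extends NetworkEvent
final case class NetworkErrorEvent(address: RpcAddress, cause: Throwable) extends NetworkEvent

trait MessageStyleEndpoint {
  // Endpoints must pattern match on NetworkEvent alongside their own messages.
  def receive: PartialFunction[Any, Unit]
}

// Style B: network events are callbacks with no-op defaults,
// so an endpoint overrides only the ones it cares about.
trait CallbackStyleEndpoint {
  def receive: PartialFunction[Any, Unit]
  def onConnected(remoteAddress: RpcAddress): Unit = ()
  def onDisconnected(remoteAddress: RpcAddress): Unit = ()
  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = ()
}

// Example endpoint that records network errors and ignores everything else.
final class LoggingEndpoint extends CallbackStyleEndpoint {
  val errors = scala.collection.mutable.ArrayBuffer.empty[String]

  override def receive: PartialFunction[Any, Unit] = { case _ => () }

  override def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit =
    errors += s"${remoteAddress.host}:${remoteAddress.port} -> ${cause.getMessage}"
}
```

With the callback style, error handling is visible in the endpoint's method signatures rather than buried in a `receive` match, which seems to be the clarity argument made above.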
private[deploy] var isShutDown = false
private[deploy] def setTesting(testing: Boolean) = isTesting = testing
private var isTesting = false

// Lets us filter events only from the worker's actor system
private val expectedHostPort = AddressFromURIString(workerUrl).hostPort
private def isWorker(address: Address) = address.hostPort == expectedHostPort
private val expectedHostPort = new java.net.URI(workerUrl)
Perhaps RpcAddress.fromUriString(workerUrl) and use of the == method?
Updated to use RpcAddress.fromUriString
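For readers following along, the suggestion amounts to parsing the worker URL into an address value once and comparing addresses with `==`, instead of comparing host/port strings. A minimal sketch, assuming a simplified case class: `SimpleRpcAddress`, its `fromUriString` helper, `WorkerFilterExample` and the sample URL are all hypothetical and may differ from the `RpcAddress` in this PR.

```scala
import java.net.URI

// Hypothetical simplified address type; the real RpcAddress may differ.
final case class SimpleRpcAddress(host: String, port: Int) {
  def hostPort: String = s"$host:$port"
}

object SimpleRpcAddress {
  // Parse the host and port out of a URI string using java.net.URI.
  def fromUriString(uri: String): SimpleRpcAddress = {
    val parsed = new URI(uri)
    SimpleRpcAddress(parsed.getHost, parsed.getPort)
  }
}

object WorkerFilterExample {
  // Sample worker URL, made up for illustration.
  private val expectedAddress =
    SimpleRpcAddress.fromUriString("akka.tcp://sparkWorker@worker1:7078/user/Worker")

  // Case-class equality compares host and port structurally.
  def isWorker(address: SimpleRpcAddress): Boolean = address == expectedAddress
}
```

Because case classes get structural equality for free, the filter no longer depends on how the host and port happen to be formatted as a string.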
Just a few mainly minor comments, otherwise LGTM. We can iterate on some of the API moving forward as we get more endpoints using it.
Conflicts: project/MimaExcludes.scala
Test build #29378 has started for PR 4588 at commit
Test build #29379 has started for PR 4588 at commit
lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging {

assert(endpointRef != null)
registerEndpoint(endpoint, endpointRef)
Ah, I think there was a miscommunication on these lines. I was not worried about require vs assert; I was wondering if we could just invoke registerEndpoint(endpoint, endpointRef) right before endpointRef.init(), on L158, since endpointRef is only used inside this call inside the constructor.
Ah, sorry. Yes, it can be moved. Done.
Test build #29380 has started for PR 4588 at commit
Test build #29378 has finished for PR 4588 at commit
Test PASSed.
Test build #29379 has finished for PR 4588 at commit
Test PASSed.
Test build #29380 has finished for PR 4588 at commit
Test PASSed.
Thanks. I've merged this in master. Let's start replacing the direct use of actors with this API. Great work!
@rxin @CodingCat @vanzin @aarondav Thanks for reviewing this PR.
Fixed my mistake in apache#4588

Author: zsxwing <zsxwing@gmail.com>

Closes apache#5529 from zsxwing/SPARK-6934 and squashes the following commits:

9890b2d [zsxwing] Use 'spark.akka.askTimeout' for the ask timeout
This PR added a standard internal RPC interface for Spark and an Akka implementation. See the design document for more details.
I will split the whole work into multiple PRs to make code review easier. This is the first PR, and it avoids touching too many files.