Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-6602][Core] Update Master, Worker, Client, AppClient and related classes to use RpcEndpoint #5392

Closed
wants to merge 24 commits into from

Conversation

zsxwing
Copy link
Member

@zsxwing zsxwing commented Apr 7, 2015

This PR updates the rest Actors in core to RpcEndpoint.

Because there is no ActorSelection in RpcEnv, I changes the logic of registerWithMaster in Worker and AppClient to avoid blocking the message loop. These changes need to be reviewed carefully.

@zsxwing
Copy link
Member Author

zsxwing commented Apr 7, 2015

cc @rxin

@SparkQA
Copy link

SparkQA commented Apr 7, 2015

Test build #29794 has finished for PR 5392 at commit b776817.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
    • case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
    • case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
    • case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
    • case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)
  • This patch does not change any dependencies.

with Logging {

var master: Option[RpcEndpointRef] = None
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can the above two vars be private?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added private.

@vanzin
Copy link
Contributor

vanzin commented Apr 7, 2015

So I took a quick look, but I'm not familiar enough with the original code to provide good, informed feedback. The original protocol seems really weird to me, and I guess that leaks into the new code too. But fixing the protocol is probably out of the scope of this change...

@zsxwing
Copy link
Member Author

zsxwing commented Apr 8, 2015

I used the following codes to handle errors for the new codes in Client but kept the old codes untouched.

logError(e.getMessage, e)
System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)

@SparkQA
Copy link

SparkQA commented Apr 8, 2015

Test build #29860 has finished for PR 5392 at commit 4d1633b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
    • case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
    • case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
    • case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
    • case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)
  • This patch does not change any dependencies.

@SparkQA
Copy link

SparkQA commented Apr 8, 2015

Test build #29861 has finished for PR 5392 at commit e8ad0a5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
    • case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
    • case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
    • case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
    • case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)
  • This patch does not change any dependencies.

@SparkQA
Copy link

SparkQA commented Apr 8, 2015

Test build #29867 has finished for PR 5392 at commit 7fdee0e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
    • case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
    • case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
    • case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
    • case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)
  • This patch does not change any dependencies.

Conflicts:
	core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
	core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
	core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@SparkQA
Copy link

SparkQA commented Apr 14, 2015

Test build #30230 has finished for PR 5392 at commit 6637e3c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@zsxwing
Copy link
Member Author

zsxwing commented Apr 14, 2015

@vanzin I agree that this PR should focus on following the previous protocol instead of fixing issues of the protocol. Do you have other comments?

@SparkQA
Copy link

SparkQA commented Apr 14, 2015

Test build #30233 has finished for PR 5392 at commit fadbb9e.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
    • case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
    • case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
    • case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
    • case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)
  • This patch does not change any dependencies.

@zsxwing
Copy link
Member Author

zsxwing commented Apr 14, 2015

retest this please.

@SparkQA
Copy link

SparkQA commented Apr 14, 2015

Test build #30236 has finished for PR 5392 at commit fadbb9e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
    • case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
    • case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
    • case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
    • case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)
  • This patch does not change any dependencies.

@vanzin
Copy link
Contributor

vanzin commented Apr 14, 2015

@zsxwing not really, I'm not familiar enough with the code to comment. Maybe @rxin or @andrewor14?

private var master: Option[RpcEndpointRef] = None
// To avoid calling listener.disconnected() multiple times
private var alreadyDisconnected = false
@volatile private var alreadyDead = false // To avoid calling listener.dead() multiple times
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not generally true that ThreadSafeRpcEndpoints require their mutable state to be volatile, right? Perhaps this is just being modified from a separate thread pool?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be updated in a separate thread pool or the message loop of ClientEndpoint, so it's volatile.

@aarondav
Copy link
Contributor

The core logic all looks good to me, just had some nits.

@SparkQA
Copy link

SparkQA commented Jun 30, 2015

Test build #36121 has finished for PR 5392 at commit 2de7bed.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
    • case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
    • case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
    • case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
    • case class SubmitDriverResponse(
    • case class KillDriverResponse(
    • case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)

@zsxwing
Copy link
Member Author

zsxwing commented Jun 30, 2015

retest this please

@SparkQA
Copy link

SparkQA commented Jun 30, 2015

Test build #36147 has finished for PR 5392 at commit 2de7bed.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
    • case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
    • case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
    • case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
    • case class SubmitDriverResponse(
    • case class KillDriverResponse(
    • case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)

@zsxwing
Copy link
Member Author

zsxwing commented Jun 30, 2015

retest this please

@SparkQA
Copy link

SparkQA commented Jun 30, 2015

Test build #36148 has finished for PR 5392 at commit 2de7bed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
    • case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
    • case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
    • case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
    • case class SubmitDriverResponse(
    • case class KillDriverResponse(
    • case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)

@@ -504,6 +518,7 @@ private[master] class Master(
}

private def completeRecovery() {
// TODO Why synchronized
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was due to an earlier state in the code when this method could be invoked from a different thread. It can be safely removed now.

@aarondav
Copy link
Contributor

Just one comment thanks to your proactive TODOifying :) LGTM, feel free to merge after.

@rxin
Copy link
Contributor

rxin commented Jul 1, 2015

Alright I'm going to merge this. @zsxwing please submit a separate PR to address the TODO.

@asfgit asfgit closed this in 3bee0f1 Jul 1, 2015
asfgit pushed a commit that referenced this pull request Jul 1, 2015
A follow-up pr to address #5392 (comment)

Author: zsxwing <zsxwing@gmail.com>

Closes #7141 from zsxwing/pr5392-follow-up and squashes the following commits:

fcf7b50 [zsxwing] Remove unnecessary synchronized
@zsxwing zsxwing deleted the rpc-rewrite-part3 branch December 23, 2015 23:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
6 participants