
[SPARK-27369][CORE] Setup resources when Standalone Worker starts up #24841

Closed · wants to merge 7 commits · 7 participants

@Ngone51 (Contributor) commented Jun 11, 2019

What changes were proposed in this pull request?

To support GPU-aware scheduling in Standalone (cluster mode), the Worker should have the ability to set up resources (e.g. GPU/FPGA) when it starts up.

Similar to what the driver/executor do, the Worker has two ways (resourceFile & resourceDiscoveryScript) to set up resources when it starts up. Users can use SPARK_WORKER_OPTS to apply resource configs to the Worker in the form of "-Dx=y". For example,

SPARK_WORKER_OPTS="-Dspark.worker.resource.gpu.amount=2 \
                   -Dspark.worker.resource.fpga.amount=1 \
                   -Dspark.worker.resource.fpga.discoveryScript=/Users/wuyi/tmp/getFPGAResources.sh \
                   -Dspark.worker.resourcesFile=/Users/wuyi/tmp/worker-resource-file"
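A discovery script such as the getFPGAResources.sh referenced above is expected to print a single ResourceInformation-style JSON object (a "name" plus an "addresses" array, as discussed later in this thread) to stdout. A minimal sketch, with hard-coded addresses standing in for real hardware probing:

```shell
#!/usr/bin/env bash
# Illustrative discovery script: report two FPGA addresses.
# A real script would probe the hardware instead of hard-coding them.
echo '{"name": "fpga", "addresses": ["0", "1"]}'
```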

How was this patch tested?

Tested manually locally in Standalone mode:

  • Worker starts up normally when no resources are configured

  • Worker fails to start up when an exception is thrown during resource setup (e.g. unknown directory, parse failure)

  • Worker can set up resources from a resource file

  • Worker can set up resources from discovery scripts

  • Worker sets up resources from both the resource file and discovery scripts when both are configured

@SparkQA commented Jun 11, 2019

Test build #106387 has finished for PR 24841 at commit 92c0b52.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@Ngone51 (Contributor, Author) commented Jun 11, 2019

@tgravescs

@tgravescs (Contributor) commented Jun 11, 2019

PR #24821 is adding an abstraction layer for the resource handling. I think these will conflict, so we need to decide which one to put in first.

@Ngone51 (Contributor, Author) commented Jun 11, 2019

Yeah, I noticed that but haven't gone through it yet. In this PR, the Worker sets up resources with behavior similar to the driver/executor (roughly: resource file first, discovery script second). So I guess merging this PR into #24821 may just need some similar modifications?

And I'm also OK with waiting for #24821, since this PR is smaller and easier to update.
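The precedence described above ("resource file first, discovery script second") can be sketched as follows; setupWorkerResources, parseFile, and runScript are illustrative names, not the PR's actual helpers:

```scala
// Hypothetical sketch of the worker's resource-setup precedence:
// the resources file wins when configured, otherwise each configured
// discovery script is consulted, otherwise the worker has no resources.
def setupWorkerResources(
    resourcesFile: Option[String],
    discoveryScripts: Map[String, String],
    parseFile: String => Map[String, Seq[String]],
    runScript: String => Seq[String]): Map[String, Seq[String]] = {
  resourcesFile
    .map(parseFile)                     // resource file first
    .getOrElse {
      discoveryScripts.map { case (name, script) =>
        name -> runScript(script)       // discovery script second
      }
    }
}
```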

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala Outdated
@@ -220,6 +225,38 @@ private[deploy] class Worker(
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}

// TODO if we're starting up multi workers under the same host, discovery script won't work.

@jiangxb1987 (Contributor) commented Jun 12, 2019

How would the resourceFile work under your approach in the scenario of multiple workers launched on the same host? We need to prevent multiple workers from accessing the same external resource address.

@Ngone51 (Author, Contributor) commented Jun 12, 2019

For the initial version, my base assumption is that the user is responsible for configuring the right resourceFile for each of the multiple workers on the same host.

@tgravescs (Contributor) commented Jun 12, 2019

Technically a discovery script could work, but the writer of that script would have to handle that case inside the script and assign each worker different resources. Either way, we should be sure to document it.

@Ngone51 (Author, Contributor) commented Jun 13, 2019

Yeah, technically we can, but I think it may be more troublesome to handle in a discovery script than with a resourceFile. Documenting it is good; done in ade97c2.

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala Outdated
resources = resourceFile.map { rFile =>
ResourceDiscoverer.parseAllocatedFromJsonFile(rFile)
}.getOrElse {
if (resourceDiscoveryScript.isEmpty) {

@jiangxb1987 (Contributor) commented Jun 12, 2019

We don't want to reuse the discoveryScript config?

@Ngone51 (Author, Contributor) commented Jun 12, 2019

Not sure what you actually mean here, so let me explain my understanding. resourceDiscoveryScript is configured from SPARK_WORKER_RESOURCE_DISCOVERY_SCRIPT or --resource-script, which are specific to the Worker. And the application (driver/executor) discoveryScript config hasn't been set up yet while the Worker is starting up.

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala Outdated
ResourceDiscoverer.parseAllocatedFromJsonFile(rFile)
}.getOrElse {
if (resourceDiscoveryScript.isEmpty) {
logWarning(s"Neither resourceFile nor resourceDiscoveryScript found for worker. " +

@tgravescs (Contributor) commented Jun 12, 2019

I'm not sure you want to warn here. If users just aren't configuring Spark to use other resources, there is no reason to warn.

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala Outdated
}
} catch {
case t: Throwable =>
logWarning("Failed to setup worker resources: ", t)

@tgravescs (Contributor) commented Jun 12, 2019

logError

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala Outdated
}
}
} catch {
case t: Throwable =>

@tgravescs (Contributor) commented Jun 12, 2019

Is there a reason we catch all Throwable here? I think just Exception makes more sense.


core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala Outdated
resourceFile = Some(System.getenv("SPARK_WORKER_RESOURCE_FILE"))
}
if (System.getenv("SPARK_WORKER_RESOURCE_DISCOVERY_SCRIPT") != null) {
parseResourceDiscoveryScript(conf.getenv("SPARK_WORKER_RESOURCE_DISCOVERY_SCRIPT"))

@tgravescs (Contributor) commented Jun 12, 2019

Might be easier to read if we have parseResourceDiscoveryScript return a Map and set it here.

core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala Outdated
@@ -36,6 +36,8 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
var memory = inferDefaultMemory()
var masters: Array[String] = null
var workDir: String = null
var resourceFile: Option[String] = None
var resourceDiscoveryScript: Map[String, String] = Map()

@tgravescs (Contributor) commented Jun 12, 2019

= Map.empty might make it clearer that this is immutable.

core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala Outdated
@@ -60,7 +60,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, securityMgr)
_worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"Worker", "/tmp", conf, securityMgr, shuffleServiceSupplier)
"Worker", "/tmp", conf, securityMgr, None, Map.empty, shuffleServiceSupplier)

@tgravescs (Contributor) commented Jun 12, 2019

Can we add more tests that actually get the resources?

@tgravescs (Contributor) commented Jun 12, 2019

If it's easier to do with the next JIRA for assignments, that is fine too.

@Ngone51 (Author, Contributor) commented Jun 13, 2019

We need to bring a Master into the test to check whether the Worker sets up the resources correctly, since a Worker cannot start up without a living Master. Maybe we could cover this in the following JIRA task, which will share resource info between Master and Worker.

@tgravescs (Contributor) commented Jun 13, 2019

sounds fine

@viirya (Contributor) left a comment

Not sure if I understand correctly, but from a quick look at the description, the executor will also set up resources? Doesn't the executor set them up again if the Worker has already set them up?

@Ngone51 (Contributor, Author) commented Jun 13, 2019
commented Jun 13, 2019

@viirya IIUC, the executor sets up the resources from what the worker assigned to it. For example, the worker could "split" its own resources into separate resource files according to the Master's requirements for executors. Then, the executor could set up from the corresponding resource file when it starts up.

@SparkQA commented Jun 13, 2019

Test build #106450 has finished for PR 24841 at commit 14203f5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@viirya (Contributor) commented Jun 13, 2019

@Ngone51 Sounds reasonable. So the splitting logic will be implemented in a follow-up?

@Ngone51 (Contributor, Author) commented Jun 13, 2019

@viirya Yeah, I think so.

@jiangxb1987 (Contributor) commented Jun 19, 2019

@Ngone51 Could you please rebase the PR so we can continue working on this?

@Ngone51 (Contributor, Author) commented Jun 20, 2019

@jiangxb1987 sure, will do it.

@Ngone51 force-pushed the Ngone51:dev-worker-resources-setup branch to a1cd955 on Jun 21, 2019

@SparkQA commented Jun 21, 2019

Test build #106750 has finished for PR 24841 at commit a1cd955.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA commented Jun 21, 2019

Test build #106752 has finished for PR 24841 at commit 0960bcf.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.
@Ngone51 (Contributor, Author) commented Jun 21, 2019

Jenkins, retest this please.

@SparkQA commented Jun 21, 2019

Test build #106759 has finished for PR 24841 at commit 0960bcf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
ConfigBuilder("spark.worker.resourcesFile")
.internal()
.doc("Path to a file containing the resources allocated to the worker. " +
"The file should be formatted as a JSON array of ResourceInformation objects. " +

@tgravescs (Contributor) commented Jun 24, 2019

I don't think this is true anymore; I think the JSON has to be formatted like a ResourceAllocation.
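For reference, a worker resources file in ResourceAllocation form might look like the sketch below; the componentName/resourceName/addresses values are illustrative, and the exact field names should be checked against Spark's ResourceAllocation class:

```json
[
  {
    "id": {"componentName": "spark.worker", "resourceName": "gpu"},
    "addresses": ["0", "1"]
  }
]
```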

@Ngone51 (Author, Contributor) commented Jun 24, 2019

Thanks for catching this.

@SparkQA commented Jun 24, 2019

Test build #106842 has finished for PR 24841 at commit ba62e35.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@jiangxb1987 (Contributor) left a comment

Looks good. This is a straightforward change that enables the Worker to load resource information. Syncing resource information between the master and workers, as well as splitting resources among different workers on the same node, will be addressed by a separate PR.

@@ -41,6 +41,8 @@ import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.internal.config.UI._
import org.apache.spark.internal.config.Worker._
import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.resource.ResourceUtils.getOrDiscoverAllResources

@jiangxb1987 (Contributor) commented Jun 24, 2019

nit: I would prefer import org.apache.spark.resource.ResourceUtils._.


@SparkQA commented Jun 25, 2019

Test build #106858 has finished for PR 24841 at commit 41a70bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@WeichenXu123 (Contributor) left a comment

Two issues:

  • If the discovery script output is in the format {"name": "gpu", "addresses": ["0","1"]}, how do we avoid launching multiple workers on the same worker node (which would cause resource conflicts)? We should add some code to handle this case.
  • If the discovery script dynamically allocates resources for the launching worker, should we require the script to allocate exactly the requested amount? Allocating more than requested would be wasted, because we may launch multiple workers on the same node.
@Ngone51 (Contributor, Author) commented Jun 26, 2019

how do we avoid launching multiple workers on the same worker node (which would cause resource conflicts)?

The current plan is to let the Master play the role of coordinator to avoid resource conflicts. So multiple workers are still allowed to launch on the same node, but if the available resources can't meet a worker's requested amount, it will fail to launch.

should we require the script to allocate exactly the requested amount? Allocating more than requested would be wasted, because we may launch multiple workers on the same node.

If the Master is a coordinator, then the superfluous resources could be allocated to other workers launched on the same node. If no more workers launch, those resources could be wasted.

Another possibility is setting the requested amount to the number of resources found by the script when the user does not configure the resource amount explicitly (similar to what cpu and memory do). That way, once a worker launches on a node, no other worker (using the same script) could launch on the same node, since the first worker has already grabbed all the resources. This might achieve the goal of issue 1.

@WeichenXu123 (Contributor) commented Jun 26, 2019

@Ngone51

Oh, let me clarify my issues:

Issue 1:
Suppose that on machine node A we have provided a discovery script that outputs {"name": "gpu", "addresses": ["0","1"]}. If we then launch 2 workers on node A, what will happen? Both workers are allocated gpu-0 and gpu-1. Where's the code to detect this case and raise an error?

Issue 2:
For example, suppose we have configured the worker's requested gpu amount to be 2, but the discovery script reports more than 2 gpus when the worker launches; is that a good discovery script? I think the discovery script should report exactly the gpu amount the worker requests, so I suggest the verification in assertAllResourceAllocationsMeetRequests be changed to "numAllocatedByDiscoveryScript == numRequested" (the current verification is "numAllocatedByDiscoveryScript >= numRequested").

Thanks :)

@tgravescs (Contributor) commented Jun 26, 2019

  1. If you do this, then your script has to handle this case. It's not impossible, it's just left up to the user. Or you could have the Master coordinate, but you would need to make sure there isn't some sort of isolation going on, because then the gpu ids could potentially overlap. Really, if you do this, I would recommend the other method of providing the resource file, and then you can split the GPUs between your workers. The script would just give you an automated way to do that.
  2. This can run in lots of different environments; as long as you have at least the number you requested, it's fine. The discovery script isn't "allocating" anything: it's discovering what's there. If you happen to have some hosts with 6 GPUs but only want to run with 4, this allows it. Originally the function did have a strict check, but really that would just be a nice-to-have for users, and I can see some people not wanting it for experimentation/testing. It doesn't functionally matter if the container has more.
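The check being debated here (at least as many discovered addresses as requested, rather than exactly as many) can be sketched as follows; the case classes and the function name are illustrative stand-ins for the real ResourceUtils code:

```scala
// Hypothetical sketch of the ">= vs ==" verification under discussion.
case class ResourceRequest(resourceName: String, amount: Int)
case class ResourceAllocation(resourceName: String, addresses: Seq[String])

def allocationsMeetRequests(
    allocations: Seq[ResourceAllocation],
    requests: Seq[ResourceRequest]): Boolean = {
  val byName = allocations.map(a => a.resourceName -> a).toMap
  requests.forall { req =>
    // ">=" tolerates discovering more devices than requested; Spark then
    // only hands out the requested number downstream.
    byName.get(req.resourceName).exists(_.addresses.size >= req.amount)
  }
}
```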
@jiangxb1987 (Contributor) commented Jun 26, 2019

Just my 2 cents: we may rely on the discovery script to discover all the available resources on the host, but that doesn't mean the worker shall occupy them all. My idea is that the Worker shall send the information about all available resources to the Master on register, and then the Master shall be responsible for coordination and tell the Worker which addresses it shall use.

One tricky thing here is when to recycle the resources. Ideally we shall recycle them when a Worker is lost, but as Tom mentioned, a lost Worker may still be running, so there is a possibility that the resource addresses are still being used. Instead, we shall make the Worker explicitly send a ReleaseResource message to the Master to release the resource addresses it no longer needs. This way we can completely avoid resource address collisions, but when a Worker dies silently, we may never recycle the resources allocated to it.

Issue 1 (what happens if we launch 2 workers on node A with the same discovery script?):

The Master will keep track of which addresses have been allocated to each Worker, so when a new Worker joins, the Master will detect which addresses are free and make sure to allocate only the free addresses to it.

Issue 2 (should the discovery script report exactly the requested amount?):

We don't require the number of addresses detected by the discovery script to be exactly the same as the request, because that assumption is too strong, and the hardware on a node may also change over time (we may add new devices, and devices may break). So we only require that the number of discovered addresses is greater than or equal to the request, and Spark will ensure we only use the requested number of devices.

@Ngone51 (Contributor, Author) commented Jun 27, 2019

Where's the code to detect this case and then raise error ?

This part is planned to be done in a following PR :) @WeichenXu123

@jiangxb1987 (Contributor) commented Jun 27, 2019

Thanks, merged to master!
