[SPARK-27823][CORE] Refactor resource handling code #24856
mengxr wants to merge 28 commits into apache:master
Conversation
Test build #106432 has finished for PR 24856 at commit

Test build #106433 has finished for PR 24856 at commit

@dongjoon-hyun this is not a new feature.

Test build #106435 has finished for PR 24856 at commit

Thank you for the change, @mengxr!

Test build #106483 has finished for PR 24856 at commit

Test build #106484 has finished for PR 24856 at commit

Test build #106485 has finished for PR 24856 at commit

Test build #106487 has finished for PR 24856 at commit

Test build #106488 has finished for PR 24856 at commit

Test build #106489 has finished for PR 24856 at commit
    }
    } else {
      throw new SparkException(s"User is expecting to use resource: $resourceName but " +
        "didn't specify a discovery script!")

Review comment: "but neither allocated by resources file nor specified a discovery script!" ?

Reply: The resources file is provided by the cluster manager via a discovery script, not directly by the user, so the error message is correct.
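The fallback being discussed can be sketched as follows. This is a simplified, self-contained Scala sketch, not the PR's actual code: `runDiscoveryScript` is a hypothetical stand-in for invoking the script, and a plain `IllegalArgumentException` stands in for `SparkException`.

```scala
// Simplified sketch of the allocation fallback discussed above; not the PR's code.
case class ResourceInformation(name: String, addresses: Seq[String])

// Hypothetical stand-in for actually running the discovery script.
def runDiscoveryScript(script: String): ResourceInformation =
  ResourceInformation("gpu", Seq("0")) // stub for illustration

def acquireResource(
    resourceName: String,
    fromResourcesFile: Option[ResourceInformation],
    discoveryScript: Option[String]): ResourceInformation = {
  // Prefer the allocation from the cluster manager's resources file;
  // otherwise fall back to the user-specified discovery script.
  fromResourcesFile.getOrElse {
    discoveryScript match {
      case Some(script) => runDiscoveryScript(script)
      case None => throw new IllegalArgumentException( // SparkException in the real code
        s"User is expecting to use resource: $resourceName but didn't specify a discovery script!")
    }
  }
}
```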
        "didn't specify a discovery script!")
    }
    if (!result.name.equals(resourceName)) {
      throw new SparkException("Error running the resource discovery script, script returned " +

Review comment: "Error running the resource discovery script $scriptFile, script ...." ?
    val executorResourcesAndCounts = sc.conf.getAllWithPrefixAndSuffix(
      SPARK_EXECUTOR_RESOURCE_PREFIX, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
    val taskResourceRequirements = parseTaskResourceRequirements(sc.conf)
    val executorResourcesAndCounts =

Review comment: Shall we unify the executor's "count" to "amount"?
    if (execCount < taskReq.amount) {
      throw new SparkException("The executor resource config: " +
        ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
        s" = $execCount has to be >= the task config: " +

Review comment: Just for symmetry: "executor resource config" <-> "task resource config".
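The check in the quoted hunk can be sketched in a self-contained form. This is a simplification: the config key strings are spelled out inline rather than built via `ResourceID`, and `IllegalArgumentException` stands in for `SparkException`.

```scala
// Sketch of the executor-vs-task resource amount validation discussed above.
case class TaskResourceRequirement(resourceName: String, amount: Int)

def validateTaskResources(
    execAmounts: Map[String, Int],
    taskReqs: Seq[TaskResourceRequirement]): Unit = {
  taskReqs.foreach { taskReq =>
    val execCount = execAmounts.getOrElse(taskReq.resourceName, 0)
    // Each executor must hold at least as much of a resource as one task needs,
    // otherwise no task could ever be scheduled.
    if (execCount < taskReq.amount) {
      throw new IllegalArgumentException( // SparkException in the real code
        s"The executor resource config: spark.executor.resource.${taskReq.resourceName}.amount" +
        s" = $execCount has to be >= the task resource config: " +
        s"spark.task.resource.${taskReq.resourceName}.amount = ${taskReq.amount}")
    }
  }
}
```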
    logWarning(s"The configuration of resource: ${taskReq.resourceName} " +
      s"(limits tasks to $resourceNumSlots) will result in wasted resources of resource " +
      s"${limitingResourceName} (would allow for $numSlots tasks). " +
      "Please adjust your configuration.")

Review comment: I know it's the previous logic, but isn't this warning a little redundant compared to the following warning below?
    request = parseResourceRequest(conf, DRIVER_GPU_ID)
    assert(request.id.resourceName === GPU, "should only have GPU for resource")
    assert(request.amount === 2, "GPU count should be 2")
    assert(request.discoveryScript.get === discoveryScript, "discovery script should be empty")

Review comment: The message should be "discovery script should be discoveryScriptGPU".

Reply: Changed it to "should get discovery script"; putting discoveryScriptGPU in the message would duplicate info.
    assert(request.id.resourceName === GPU, "should only have GPU for resource")
    assert(request.amount === 2, "GPU count should be 2")
    assert(request.discoveryScript.get === discoveryScript, "discovery script should be empty")
    assert(request.vendor.get === vendor, "vendor should be empty")
      parse(json).extract[ResourceInformationJson].toResourceInformation
    } catch {
      case NonFatal(e) =>
        throw new SparkException(s"Error parsing JSON into ResourceInformation:\n$json\n", e)

Review comment: Maybe give the user a tip about what the right JSON format is?
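For reference, the JSON shape that maps onto a `ResourceInformation` looks like `{"name": "gpu", "addresses": ["0", "1"]}`. The hand-rolled serializer below only illustrates that shape; the PR itself uses json4s for parsing, which is not reproduced here.

```scala
// Illustrates the JSON shape a ResourceInformation corresponds to, e.g.
//   {"name": "gpu", "addresses": ["0", "1"]}
// Hand-rolled serializer for illustration only; the PR parses with json4s.
case class ResourceInformation(name: String, addresses: Seq[String]) {
  def toJson: String = {
    val addrs = addresses.map(a => "\"" + a + "\"").mkString("[", ", ", "]")
    s"""{"name": "$name", "addresses": $addrs}"""
  }
}
```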
import org.apache.spark.resource.TestResourceIDs._
import org.apache.spark.util.Utils

class ResourceUtilsSuite extends SparkFunSuite

Review comment: Could we add a case for the combination of resources file and discovery script?

Reply: Added one test. I will leave improving test coverage to a follow-up PR since the refactoring work is quite big already.
  def parseJson(json: String): ResourceInformation = {
    implicit val formats = DefaultFormats
    try {
      parse(json).extract[ResourceInformationJson].toResourceInformation

Review comment: Shall we check for duplicate addresses for the user?

Reply: We can add that check later. It is beyond the scope of this refactor.
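The duplicate-address check suggested above (deferred to a follow-up) could look roughly like this hypothetical sketch; it is not part of this PR.

```scala
// Hypothetical duplicate-address check, not part of this PR (deferred to a follow-up).
// Returns the addresses that appear more than once in the parsed resource info.
def duplicateAddresses(addresses: Seq[String]): Seq[String] =
  addresses.groupBy(identity).collect { case (addr, group) if group.size > 1 => addr }.toSeq
```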
jiangxb1987 left a comment: Generally looks good.
  def confPrefix: String = s"$componentName.resource.$resourceName." // with ending dot
  def amountConf: String = s"$confPrefix${ResourceUtils.AMOUNT}"
  def discoveryScriptConf: String = s"$confPrefix${ResourceUtils.DISCOVERY_SCRIPT}"
  def vendorConf: String = s"$confPrefix${ResourceUtils.VENDOR}"

Review comment: This is k8s specific, do we want to move it to the k8s package?

Reply: Currently it's only used by k8s, but I didn't want to make it k8s-specific. I can see others using it in the future, and it's just another suffix on spark.{executor/driver}.resource.{resourceName}. If we made it k8s-specific, I don't think it would be as user friendly, since users would then have to know which prefix to use.
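The key-building scheme in the quoted hunk can be reproduced as a self-contained sketch, with the suffix constants ("amount", "discoveryScript", "vendor") inlined from `ResourceUtils`:

```scala
// Self-contained sketch of the conf-key builders quoted above; the suffix
// constants are inlined here instead of referencing ResourceUtils.
case class ResourceID(componentName: String, resourceName: String) {
  def confPrefix: String = s"$componentName.resource.$resourceName." // with ending dot
  def amountConf: String = s"${confPrefix}amount"
  def discoveryScriptConf: String = s"${confPrefix}discoveryScript"
  def vendorConf: String = s"${confPrefix}vendor"
}
```

So `ResourceID("spark.executor", "gpu").amountConf` yields the familiar key `spark.executor.resource.gpu.amount`, and the vendor suffix is just one more key under the same prefix, which is the author's point about not making it k8s-specific.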
  def parseAllResourceRequests(
      sparkConf: SparkConf,
      componentName: String): Seq[ResourceRequest] = {
    listResourceIds(sparkConf, componentName).map { id =>

Review comment: This would call sparkConf.getAllWithPrefix multiple times; we could reduce it to a single call and manually turn the Map[String, String] into a Seq[ResourceRequest]. I understand this is a tradeoff between code readability and performance; I'm just worried the number of configs can be big, so we would want to reduce the calls to getAllWithPrefix. I'd prefer to leave a TODO here and consider the improvement in the future.

Reply: This is a one-time-only call for each service. I think we should optimize SparkConf and getAllWithPrefix instead.
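The reviewer's single-pass alternative could look roughly like the hypothetical sketch below, operating on a plain `Map[String, String]` rather than a real `SparkConf`, and returning raw per-resource key/value groups instead of `ResourceRequest` objects:

```scala
// Hypothetical single-pass variant discussed above: scan the conf once and
// group entries by resource name, instead of calling getAllWithPrefix per resource.
def parseAllResourceEntries(
    conf: Map[String, String],
    componentName: String): Map[String, Map[String, String]] = {
  val prefix = componentName + ".resource."
  conf
    // Keep only this component's resource entries, e.g. "gpu.amount" -> "2".
    .collect { case (k, v) if k.startsWith(prefix) => (k.stripPrefix(prefix), v) }
    // Group by the resource name, i.e. the segment before the first dot.
    .groupBy { case (k, _) => k.takeWhile(_ != '.') }
    // Strip the resource name, leaving suffix -> value, e.g. "amount" -> "2".
    .map { case (name, entries) =>
      name -> entries.map { case (k, v) => (k.drop(name.length + 1), v) }
    }
}
```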
Test build #106632 has finished for PR 24856 at commit

Test build #106635 has finished for PR 24856 at commit

test this please

The failed test succeeded on my local machine. @jiangxb1987 Could you take a look? https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/106635/testReport/org.apache.spark/SparkContextSuite/test_resource_scheduling_under_local_cluster_mode/

Test build #106640 has finished for PR 24856 at commit

@mengxr It's a known flaky test, but I haven't figured out the root cause.

Thanks, merged to master
What changes were proposed in this pull request?

Continue the work from #24821. Refactor resource handling code to make the code more readable. Major changes:

- Moved resource-related code to spark.resource from spark.
- Added TestResourceIDs to reference commonly used resource IDs in tests, like spark.executor.resource.gpu.

cc: @tgravescs @jiangxb1987 @Ngone51

How was this patch tested?

Unit tests for added utils and existing unit tests.