
[SPARK-27232][SQL] Ignore file locality in InMemoryFileIndex if spark.locality.wait is set to zero #24175

Closed

Conversation

WangGuangxin
Contributor

What changes were proposed in this pull request?

InMemoryFileIndex requests file block location information so that TaskSetManager can do locality-aware scheduling.

This is usually a time-consuming task. For example, in our production environment one table has 24 partitions, with 149,925 files totaling 83 TB. It takes about 10 minutes to request the file block locations before the Spark job is even submitted. Even with spark.sql.sources.parallelPartitionDiscovery.threshold set to 24 to parallelize the discovery, it still takes 2 minutes.

This work is wasted if we don't care about file locality (for example, when storage and compute are separated).

So there should be a conf controlling whether we send getFileBlockLocations requests to the HDFS NameNode at all. If the user sets spark.locality.wait to 0, file block location information is meaningless.

In this PR, if spark.locality.wait is set to 0, InMemoryFileIndex no longer requests file block location information, which saves anywhere from several seconds to several minutes.
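The idea can be sketched roughly as follows. This is a hypothetical, simplified stand-in for the real InMemoryFileIndex code: `BlockLocation` here is a local case class rather than Hadoop's, and `lookup` stands in for the `fs.getFileBlockLocations` RPC to the NameNode.

```scala
// Simplified sketch of the PR's idea (not the actual Spark code):
// when spark.locality.wait is 0, skip the expensive block-location RPC
// and return an empty array instead.
case class BlockLocation(hosts: Seq[String], offset: Long, length: Long)

def blockLocations(
    lookup: () => Array[BlockLocation], // stands in for fs.getFileBlockLocations
    localityWaitMs: Long): Array[BlockLocation] =
  if (localityWaitMs == 0L) Array.empty[BlockLocation] // locality ignored: skip the RPC
  else lookup()

val remote = () => Array(BlockLocation(Seq("host1"), 0L, 134217728L))
println(blockLocations(remote, localityWaitMs = 0L).length)    // 0: no lookup made
println(blockLocations(remote, localityWaitMs = 3000L).length) // 1
```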

How was this patch tested?

Tested manually.

@AmplabJenkins

Can one of the admins verify this patch?

@WangGuangxin
Contributor Author

@srowen Could you please help review this patch?

Member

@srowen srowen left a comment


Seems plausible, but I don't really know this code well enough to endorse it. Your best bet is to contact people who may have worked on this code most recently. @peter-toth @adrian-ionescu do you have any thoughts? Maybe @cloud-fan for good measure.

@peter-toth
Contributor

peter-toth commented Mar 25, 2019

Sorry, I don't know this part of the code either. But it looks like PartitionedFileUtil.getBlockLocations can return an empty array even without this change, so this looks viable. I can't judge whether it should be tied to spark.locality.wait, though.

Contributor

@adrian-ionescu adrian-ionescu left a comment


I'm not very familiar with this code either, but at a high-level I'd say: let's try to come up with some micro-benchmark that can quantify the perf improvement this brings.

@@ -168,10 +168,12 @@ object InMemoryFileIndex extends Logging {
      filter: PathFilter,
      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {

    val ignoreFileLocality = sparkSession.sparkContext.conf.get[Long](config.LOCALITY_WAIT) == 0L
Contributor


I think it'd be safer to check the more specific confs as well and only perform this optimization if they're all 0.

private[spark] val LOCALITY_WAIT_PROCESS = ConfigBuilder("spark.locality.wait.process")
  .fallbackConf(LOCALITY_WAIT)

private[spark] val LOCALITY_WAIT_NODE = ConfigBuilder("spark.locality.wait.node")
  .fallbackConf(LOCALITY_WAIT)

private[spark] val LOCALITY_WAIT_RACK = ConfigBuilder("spark.locality.wait.rack")
  .fallbackConf(LOCALITY_WAIT)
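The reviewer's suggestion could look something like the sketch below. It uses a plain `Map[String, String]` as a hypothetical stand-in for SparkConf (the real patch would read typed config entries); the fallback behavior mirrors the `fallbackConf(LOCALITY_WAIT)` definitions quoted above.

```scala
// Hypothetical sketch: skip the locality lookup only if the process-, node-,
// and rack-level waits are all zero. Each specific conf falls back to
// spark.locality.wait when unset, matching the ConfigBuilder definitions.
def ignoreFileLocality(conf: Map[String, String]): Boolean = {
  val default = conf.getOrElse("spark.locality.wait", "3s")
  Seq("spark.locality.wait.process",
      "spark.locality.wait.node",
      "spark.locality.wait.rack")
    .map(key => conf.getOrElse(key, default))
    .forall(v => v == "0" || v == "0s")
}

println(ignoreFileLocality(Map("spark.locality.wait" -> "0")))  // true
println(ignoreFileLocality(Map(
  "spark.locality.wait" -> "0",
  "spark.locality.wait.node" -> "3s")))                         // false
```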

fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>
  // Store BlockLocation objects to consume less memory
  if (loc.getClass == classOf[BlockLocation]) {
    loc
  } else {
    new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength)
  }
}
Contributor


This part doesn't change in this PR. The new thing here is that we don't look up the block locations, but return an empty array instead.

Contributor


Oh, sorry, you're right about that. Please disregard.
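For context, the optimization the diff snippet shows can be sketched as follows. These are simplified, hypothetical stand-in types, not Hadoop's: HDFS can return a BlockLocation subclass that holds extra references, and copying into a plain BlockLocation keeps only the fields locality scheduling needs.

```scala
// Sketch of the existing memory optimization (stand-in types, not Hadoop's):
// a subclass may carry extra references; a plain copy drops them.
class BlockLocation(val hosts: Array[String], val offset: Long, val length: Long)
class HdfsBlockLocation(hosts: Array[String], offset: Long, length: Long,
                        val extra: AnyRef) // e.g. a reference to richer HDFS metadata
  extends BlockLocation(hosts, offset, length)

def slim(loc: BlockLocation): BlockLocation =
  if (loc.getClass == classOf[BlockLocation]) loc // already plain: reuse as-is
  else new BlockLocation(loc.hosts, loc.offset, loc.length) // drop the extras

val plain = new BlockLocation(Array("h1"), 0L, 128L)
val rich  = new HdfsBlockLocation(Array("h1"), 0L, 128L, new Object)
println(slim(plain) eq plain)                          // true: same object reused
println(slim(rich).getClass == classOf[BlockLocation]) // true: slimmed copy
```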


@squito
Contributor

squito commented Apr 1, 2019

@LantaoJin just pointed me at this based on some discussion in #23951. I totally understand the use case for this, but it needs to use a new config. Even with locality wait set to 0, Spark still tries to schedule tasks to take advantage of locality; it just means Spark won't wait for an offer with better locality. In fact, I regularly recommend users set locality wait to 0 even on colocated clusters.

Furthermore, even on disaggregated clusters, you don't necessarily want to set all locality waits to 0, right? You might still want to wait for locality for persisted data from cached RDDs.

#23951 pointed out a case for skipping rack resolution entirely on disaggregated clusters. This is another good case. I'm not entirely sure they should be controlled by the same thing... I wonder if there is some HDFS-specific setting that might be appropriate here. E.g. you might have "semi" disaggregated clusters with most data living remotely but some small local HDFS. I'm not sure there is an easy way to figure this out.

@WangGuangxin
Contributor Author

Closing this since there is a better solution: #24672.
