
[SPARK-20408][SQL] Get the glob path in parallel to reduce resolve relation time #17702

Closed
wants to merge 6 commits

Conversation

xuanyuanking
Member

@xuanyuanking xuanyuanking commented Apr 20, 2017

What changes were proposed in this pull request?

This PR changes glob path resolution to run in parallel, which makes resolving complex wildcard paths faster. The main changes in detail:
1. Add a new function getGlobbedPaths in DataSource that returns all paths represented by the wildcard, following the logic of InMemoryFileIndex.bulkListLeafFiles and reusing its config.
2. Add a new function expandGlobPath in SparkHadoopUtil to expand the first directory represented by the wildcard.
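As a rough illustration of the proposed shape (a minimal sketch; the names follow the PR description, but the body is an assumption, not the actual patch):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: expand just the FIRST glob component of a pattern, e.g.
// /a/*/b/* -> /a/x/b/*, /a/y/b/*, leaving later components to be resolved
// in parallel afterwards.
object GlobSketch {
  private val globChars = "{}[]*?\\"

  def expandGlobPath(fs: FileSystem, pattern: Path): Seq[String] = {
    val segments = pattern.toString.split("/")
    val firstGlob = segments.indexWhere(_.exists(globChars.contains(_)))
    if (firstGlob < 0) {
      // No glob component at all: return the pattern unchanged.
      Seq(pattern.toString)
    } else {
      // Glob only the prefix up to and including the first glob component.
      val prefix = segments.take(firstGlob + 1).mkString("/")
      val suffix = segments.drop(firstGlob + 1).mkString("/")
      Option(fs.globStatus(new Path(prefix))).getOrElse(Array.empty).map { status =>
        if (suffix.isEmpty) status.getPath.toString
        else status.getPath.toString + "/" + suffix
      }.toSeq
    }
  }
}
```

Note that splitting the path string on "/" is exactly the simplification a reviewer later objects to (Windows paths, escape characters), so treat this purely as an illustration of the idea.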

How was this patch tested?

Existing UT.

@SparkQA

SparkQA commented Apr 20, 2017

Test build #75984 has finished for PR 17702 at commit b27ef4f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Member Author

@marmbrus Can you take a look at this? Thanks :)

@xuanyuanking
Member Author

cc @zsxwing @tdas, can you review this? I found related code of yours earlier. Thanks :)

@xuanyuanking
Member Author

@HyukjinKwon Can you help me find an appropriate reviewer for this?

@HyukjinKwon
Member

Not sure. Probably, @cloud-fan or @gatorsmile ?

@xuanyuanking
Member Author

ping @cloud-fan and @gatorsmile, could you take a look at this? Thanks :)

@@ -146,6 +146,11 @@ object SQLConf {
.longConf
.createWithDefault(Long.MaxValue)

val GLOB_PATH_IN_PARALLEL = buildConf("spark.sql.globPathInParallel")
.doc("When true, resolve the glob path in parallel, the strategy same with ")
Member

?

Member Author

Sorry for the cut-off; I added a patch to fix this.

@gatorsmile
Member

Can you show us the performance difference?

@cloud-fan
Contributor

Yea, I'm also wondering how useful it is.

@xuanyuanking
Member Author

xuanyuanking commented May 2, 2017

Thanks for your review. @gatorsmile @cloud-fan

Can you show us the performance difference?

No problem, I reproduced our online case offline as described below.

Test env:

| item | detail |
|---|---|
| Spark version | current master 2.2.0-SNAPSHOT |
| Hadoop version | 2.7.2 |
| HDFS | 8 servers (128G, 20 cores + 20 hyper-threading) |
| Test case | spark.read.text("/app/dc/test_for_ls/*/*/*/*").count() |

The first level below test_for_ls contains 96 directories, each of which has 1000 directories at the next level; the third and fourth levels have only 1 directory and 1 file each.

Without parallel resolve:

(screenshot)

With parallel resolve:

(screenshot)

Discussion:

  1. More complex scenarios with deeper directory levels benefit more; in this local test the change made resolution about 100% (2x) faster.
  2. The spark.sql.globPathInParallel config only parallelizes the resolution of the glob path.
  3. If the driver and the cluster are in different geographical regions, this improvement yields at least a 5x speedup in our scenario, because the resolving work becomes a parallel job running on the cluster.

@SparkQA

SparkQA commented May 2, 2017

Test build #76378 has finished for PR 17702 at commit 2dcf96f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Member Author

@gatorsmile @cloud-fan, do we need any other performance tests?

@xuanyuanking
Member Author

ping @cloud-fan

@SparkQA

SparkQA commented Jun 1, 2017

Test build #77634 has finished for PR 17702 at commit 897d316.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

The logic looks very similar to InMemoryFileIndex.bulkListLeafFiles, is it possible to consolidate them?

@xuanyuanking
Member Author

@cloud-fan Thanks for your reply!
It's possible to consolidate them, but maybe it's not necessary? I could consolidate them by replacing the logic in getGlobbedPaths listed below with InMemoryFileIndex.bulkListLeafFiles:

+      val expanded = sparkSession.sparkContext
+        .parallelize(paths, paths.length)
+        .map { pathString =>
+          SparkHadoopUtil.get.globPathIfNecessary(new Path(pathString)).map(_.toString)
+        }.collect()
+      expanded.flatMap(paths => paths.map(new Path(_))).toSeq

and also changing the interface of bulkListLeafFiles by adding a default parameter that takes the function globPathIfNecessary, because the glob path can be nested many levels deep.
Maybe the current logic of adding a new parallel job is better than consolidating; what do you think?

@gatorsmile
Member

retest this please

@SparkQA

SparkQA commented Jun 16, 2017

Test build #78137 has finished for PR 17702 at commit 897d316.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -389,6 +389,23 @@ case class DataSource(
}

/**
* Return all paths represented by the wildcard string.
*/
private def getGlobbedPaths(qualified: Path): Seq[Path] = {
Contributor

at least we should follow InMemoryFileIndex.bulkListLeafFiles and "pick the listing strategy adaptively depending on the number of paths to list"

Member Author

You are right.
I'll fix this and also limit the max parallelism in the next patch, reusing the config from InMemoryFileIndex.bulkListLeafFiles.

@SparkQA

SparkQA commented Jun 16, 2017

Test build #78156 has started for PR 17702 at commit a3a3509.

@xuanyuanking
Member Author

xuanyuanking commented Jun 16, 2017

The test failure may be caused by the environment? The process was terminated by signal 9 according to the Jenkins log.

@xuanyuanking
Member Author

retest this please

@SparkQA

SparkQA commented Jun 16, 2017

Test build #78171 has finished for PR 17702 at commit a3a3509.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 30, 2017

Test build #82351 has finished for PR 17702 at commit 0ee6943.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Member Author

retest this please

1 similar comment
@xuanyuanking
Member Author

retest this please

@SparkQA

SparkQA commented Oct 2, 2017

Test build #82378 has finished for PR 17702 at commit d019656.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor

cc @zsxwing

* Return all paths represented by the wildcard string.
* Follow [[InMemoryFileIndex]].bulkListLeafFile and reuse the conf.
*/
private def getGlobbedPaths(
Member

Could you move this method to object DataSource?

Member Author

Done in next commit.

@@ -246,6 +246,18 @@ class SparkHadoopUtil extends Logging {
if (isGlobPath(pattern)) globPath(fs, pattern) else Seq(pattern)
}

def expandGlobPath(fs: FileSystem, pattern: Path): Seq[String] = {
Member

Please add unit tests for this method.

Member Author

Added a UT in SparkHadoopUtilSuite.scala.
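For reference, a hypothetical shape for such a test (illustrative only; the real test lives in SparkHadoopUtilSuite.scala and may differ, and this assumes expandGlobPath expands only the first glob component, as the PR description states):

```scala
import java.nio.file.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative test sketch: expanding the first wildcard of <base>/*/f
// should produce one concrete path per matching first-level directory.
val base = Files.createTempDirectory("expand-glob-test")
Files.createDirectories(base.resolve("d1"))
Files.createDirectories(base.resolve("d2"))

val fs = FileSystem.getLocal(new Configuration())
val expanded = SparkHadoopUtil.get.expandGlobPath(fs, new Path(base.toString + "/*/f"))
// One entry per matched directory; the trailing "/f" component is kept as-is.
assert(expanded.length == 2)
```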

@SparkQA

SparkQA commented Nov 13, 2017

Test build #83783 has finished for PR 17702 at commit 660b95a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 14, 2017

Test build #83819 has finished for PR 17702 at commit ec9c1c1.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Member Author

retest this please

@SparkQA

SparkQA commented Nov 14, 2017

Test build #83822 has finished for PR 17702 at commit ec9c1c1.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Member Author

retest this please

@SparkQA

SparkQA commented Nov 14, 2017

Test build #83837 has finished for PR 17702 at commit ec9c1c1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Member Author

@zsxwing Thanks for your comments, ready for review.

@xuanyuanking
Member Author

gentle ping @zsxwing

val parallelPartitionDiscoveryParallelism =
sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)
val expanded = sparkSession.sparkContext
Contributor

Why do this using a Spark job, instead of just a local thread pool?

I see this is the same thing done by InMemoryFileIndex, but it feels unnecessarily expensive.

Member Author

@vanzin Thanks for your reply.

Why do this using a Spark job, instead of just a local thread pool?

Since the DFS is generally deployed together with the NodeManagers for better data locality, when we use client mode with the driver in a different region from the cluster, using a Spark job solves the problem of cross-region interaction in our scenario.

Contributor

NMs are part of YARN, not HDFS.

This code will talk to HDFS's NameNodes; there is generally only one of those you'll be talking to. Latency here is not an issue, throughput is, so I still don't see why this shouldn't be done with a local thread pool.

Member Author

Yep, I meant that YARN and HDFS are always deployed in the same region, but we can't control where the driver runs, because in client mode (e.g. Spark SQL or the shell) it's our customer's machine.
For example, we deploy YARN and HDFS in Beijing, CN, while a user runs Spark SQL from Shanghai, CN.
Maybe this scenario shouldn't be considered in this patch? What's your opinion, @vanzin?

Contributor

Where the driver is deployed is not of concern; first because, as I said, this is not about latency, but parallelizing multiple calls to the NN. Second, because if your driver is in a different network, it will have the same latency when talking to the executors as if it would talk directly to the NN, so you're not fixing anything, you're just adding an extra hop (= more latency).

Member Author

Thanks for your suggestion and the detailed explanation; I'll reimplement this with a local thread pool.
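A local thread pool version might look roughly like this (a hedged sketch, not the actual commit; the function name, signature, and pool size are assumptions):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import org.apache.hadoop.fs.{FileSystem, Path}

// Resolve several glob patterns concurrently against the NameNode using a
// driver-local thread pool instead of launching a Spark job.
def globInParallel(fs: FileSystem, paths: Seq[Path], numThreads: Int): Seq[Path] = {
  val pool = Executors.newFixedThreadPool(numThreads)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
  try {
    val futures = paths.map { p =>
      // globStatus may return null for a non-existent, non-glob path.
      Future { Option(fs.globStatus(p)).map(_.map(_.getPath).toSeq).getOrElse(Seq.empty) }
    }
    Await.result(Future.sequence(futures), Duration.Inf).flatten
  } finally {
    pool.shutdown()
  }
}
```

This keeps the parallel NameNode calls (the throughput concern raised above) without the extra driver-to-executor hop.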

Member Author

Sorry for the late reply; finished in the next commit.

@SparkQA

SparkQA commented Jan 23, 2018

Test build #86517 has finished for PR 17702 at commit dc373ae.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Member Author

retest this please

@SparkQA

SparkQA commented Jan 24, 2018

Test build #86558 has finished for PR 17702 at commit dc373ae.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Member Author

retest this please

@SparkQA

SparkQA commented Jan 24, 2018

Test build #86573 has finished for PR 17702 at commit dc373ae.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Member Author

retest this please

@SparkQA

SparkQA commented Jan 24, 2018

Test build #86583 has finished for PR 17702 at commit dc373ae.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Member Author

ping @vanzin

@@ -252,6 +252,18 @@ class SparkHadoopUtil extends Logging {
if (isGlobPath(pattern)) globPath(fs, pattern) else Seq(pattern)
}

def expandGlobPath(fs: FileSystem, pattern: Path): Seq[String] = {
val arr = pattern.toString.split("/")
Contributor

we should not parse the path string ourselves, it's too risky, we may miss some special cases like windows path, escape character, etc. Let's take a look at org.apache.hadoop.fs.Globber and see if we can reuse some parser API there.

Member Author

Thanks for your reply; I agree with you.

@cloud-fan
Contributor

This approach only works if the first level glob pattern matches a lot of directories, e.g. /my_path/*/*. Otherwise, we can't apply it, e.g. /my_path/{ab, cd}/*.

My proposal: think about how glob works:

  1. Split the path into parts, e.g. /a/*/* -> a, *, *.
  2. For each path part, expand it if it's a glob pattern, then flatMap the expanded results and expand the next path part; repeat until the last path part.

Step by step, we first expand /a/*/* to /a/b1/*; /a/b2/*, and then to /a/b1/c1; /a/b1/c2; /a/b2/c1; /a/b2/c2. Theoretically, we can add a check at each step: if the current to-be-expanded list is above a threshold, do the next expansion in parallel.

Maybe we should just fork the Hadoop Globber and improve it to run in parallel.
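The level-by-level idea can be sketched abstractly (an illustration only, not Globber itself; listMatches is a hypothetical stand-in for matching one glob component against a directory listing):

```scala
// Sketch of step-by-step glob expansion: expand one path component at a
// time, switching to parallel expansion once the frontier of to-be-expanded
// paths grows beyond a threshold.
// (.par is built into Scala 2.11/2.12; Scala 2.13 needs the
// scala-parallel-collections module.)
def expandStepByStep(
    parts: Seq[String],
    listMatches: (String, String) => Seq[String], // (parent, component) => matching child paths
    threshold: Int = 32): Seq[String] = {
  parts.foldLeft(Seq("")) { (frontier, component) =>
    if (frontier.size > threshold) {
      // Above the threshold, expand the next level in parallel.
      frontier.par.flatMap(p => listMatches(p, component)).seq
    } else {
      frontier.flatMap(p => listMatches(p, component))
    }
  }
}
```

With parts = Seq("a", "*", "*") this walks the frontier from "" to /a, then /a/b1 and /a/b2, then all four /a/bN/cN paths, mirroring the expansion steps described above.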

@xuanyuanking
Member Author

This approach only works if the first level glob pattern matches a lot of directories.

Yep. Actually, in our internal usage we leave the problem to users: they should use the first wildcard to match most of the files.

Maybe we should just fork the Hadoop Globber and improve it to run in parallel.

Thanks for your detailed explanation and guidance; I'll reconsider this and open another PR.
