
SPARK-5199. FS read metrics should support CombineFileSplits and track bytes from all FSs #4050

Closed
wants to merge 4 commits

Conversation

@sryza (Contributor) commented Jan 14, 2015

...mbineFileSplits

@SparkQA commented Jan 14, 2015

Test build #25567 has started for PR 4050 at commit 9962dd0.

  • This patch merges cleanly.

@@ -219,6 +220,9 @@ class HadoopRDD[K, V](
    val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
      SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
        split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
    } else if (split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
      SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
        split.inputSplit.value.asInstanceOf[CombineFileSplit].getPath(0), jobConf)


Can you push this logic down into SparkHadoopUtil so that we don't duplicate it in two places (HadoopRDD and NewHadoopRDD)?

sryza (Contributor Author)

The issue is that those are actually two different classes. There's a CombineFileSplit for the old MR API (used by HadoopRDD) and a CombineFileSplit for the new one (used by NewHadoopRDD).


Yes, SparkHadoopUtil can check for those classes. It can have a matcher on the 4 classes (2 new and 2 old), so the call from HadoopRDD would be something like:
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.inputSplit, jobConf)
Not a big deal I guess, since SparkHadoopUtil will then have four cases, but at least the logic is centralized.
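
A rough sketch of that kind of centralization, for illustration only (the helper name firstPathOfSplit is hypothetical, and this is not the code that was ultimately merged):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileSplit => OldFileSplit}
import org.apache.hadoop.mapred.lib.{CombineFileSplit => OldCombineFileSplit}
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit => NewCombineFileSplit, FileSplit => NewFileSplit}

// One matcher over the four split classes (old- and new-API FileSplit and
// CombineFileSplit). The old-API cases are listed first so they win if the
// class hierarchies overlap.
def firstPathOfSplit(split: AnyRef): Option[Path] = split match {
  case s: OldCombineFileSplit => Some(s.getPath(0))
  case s: OldFileSplit        => Some(s.getPath)
  case s: NewCombineFileSplit => Some(s.getPath(0))
  case s: NewFileSplit        => Some(s.getPath)
  case _                      => None
}

With something like this, HadoopRDD and NewHadoopRDD would only pass their raw split to SparkHadoopUtil instead of repeating the instanceOf checks.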

@ksakellis

This mostly LGTM. My only concern is the proliferation of copy-pasted code between HadoopRDD and NewHadoopRDD.

@SparkQA commented Jan 14, 2015

Test build #25567 has finished for PR 4050 at commit 9962dd0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25567/

@@ -219,6 +220,9 @@ class HadoopRDD[K, V](
    val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
      SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
        split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
    } else if (split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
      SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(

Contributor


Is it guaranteed that all paths in the CombineFileSplit have the same filesystem?

Also, one related question after digging around a bit more: Hadoop's FileSystem.getAllStatistics() returns a list in which one filesystem can only be distinguished from another by its scheme. What happens if two different hdfs:// filesystems are read from within the same thread (for instance, if two HadoopRDDs are coalesced)? Is the assumption that this will never happen?
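
For reference, the scheme-based lookup under discussion amounts to something like the sketch below (the helper name statsForPath is mine; it just mirrors the filtering SparkHadoopUtil does at this point in the PR):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.JavaConverters._

// Statistics entries are keyed only by URI scheme, so two distinct hdfs://
// filesystems read on the same thread would both match the single "hdfs"
// entry, and their byte counts could not be told apart.
def statsForPath(path: Path, conf: Configuration): Seq[FileSystem.Statistics] = {
  val scheme = path.getFileSystem(conf).makeQualified(path).toUri.getScheme
  FileSystem.getAllStatistics.asScala.filter(_.getScheme == scheme)
}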

@SparkQA commented Jan 20, 2015

Test build #25848 has started for PR 4050 at commit ff8a4cb.

  • This patch merges cleanly.

@SparkQA commented Jan 20, 2015

Test build #25848 has finished for PR 4050 at commit ff8a4cb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25848/

@shenh062326 (Contributor)

If we use an InputFormat whose splits are not instances of org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}, then we can't get any input metrics.

    case _ => None
    val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
      val inputSplit = split.inputSplit.value
      if (inputSplit.isInstanceOf[FileSplit] || inputSplit.isInstanceOf[CombineFileSplit]) {
Contributor

This is fine as is, but FYI you can do the same thing with a pattern match:

split.inputSplit.value match {
   case _: FileSplit | _: CombineFileSplit => SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(jobConf)
   case _ => None
}

sryza (Contributor Author)

Ah, yours looks prettier, will switch it.

@sryza (Contributor Author) commented Jan 26, 2015

If we use an InputFormat whose splits are not instances of org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}, then we can't get any input metrics.

This is the desired behavior. The input metrics are currently only able to track the bytes read from a Hadoop-compatible file system. Many InputFormats (e.g. DBInputFormat) don't read from Hadoop-compatible file systems, so reporting "bytes read" would be misleading.

@SparkQA commented Jan 26, 2015

Test build #26109 has started for PR 4050 at commit 0d504f1.

  • This patch merges cleanly.

    val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
    val scheme = qualifiedPath.toUri().getScheme()
    val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
  private def getFileSystemThreadStatistics(conf: Configuration): Seq[AnyRef] = {
Contributor

Does this need to take a conf object anymore? Can we just remove the confs throughout this call stack?
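
As a minimal sketch of where that leads (the function name is mine, not the patch's), once the per-scheme filtering is gone the lookup no longer needs a Path, and arguably no Configuration either:

import org.apache.hadoop.fs.FileSystem
import scala.collection.JavaConverters._

// Sum the bytes read on the current thread across every registered
// FileSystem. Statistics.getThreadStatistics() only exists on newer Hadoop
// releases (later in this thread: 2.5 has it, 2.3 does not), so real code
// has to guard the call.
def bytesReadOnThread(): Long =
  FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum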

@SparkQA commented Jan 26, 2015

Test build #26109 has finished for PR 4050 at commit 0d504f1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26109/

@pwendell (Contributor)

Looking good, added some comments.

One thing: could we change the title here to reflect the actual change (maybe we could even open a new JIRA or something)? This is now a broader change, something like "Track Hadoop FS reads from all filesystems accessed inside a task".

Second, we don't actually add a test here for the case that was reported, namely the use of CombineFileSplits. Can we explicitly test that to make sure we don't regress behavior?

sryza changed the title from "SPARK-5199. Input metrics should show up for InputFormats that return Co..." to "SPARK-5199. FS read metrics should support CombineFileSplits and track bytes from all FSs" on Jan 26, 2015
@sryza (Contributor Author) commented Jan 26, 2015

Edited the JIRA title and added tests for the CombineFileSplits. Tested both against Hadoop 2.3 (which doesn't support getFSBytesReadCallback) and Hadoop 2.5 (which does).
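
Since both Hadoop 2.3 and 2.5 are supported by a single build, the per-thread statistics API has to be reached via reflection (the getFileSystemThreadStatisticsMethod frame in the stack trace further down shows this). A rough, hypothetical sketch of such a probe, with my own names rather than the actual SparkHadoopUtil code:

import java.lang.reflect.Method
import org.apache.hadoop.fs.FileSystem
import scala.util.Try

// Probe for FileSystem.Statistics#getThreadStatistics, which Hadoop 2.5
// provides but Hadoop 2.3 does not. Returns None on older versions, so a
// caller can simply skip installing the bytes-read callback there.
def threadStatsMethod: Option[Method] =
  Try(classOf[FileSystem.Statistics].getDeclaredMethod("getThreadStatistics")).toOption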

@SparkQA commented Jan 26, 2015

Test build #26124 has started for PR 4050 at commit 864514b.

  • This patch merges cleanly.

@SparkQA commented Jan 27, 2015

Test build #26124 has finished for PR 4050 at commit 864514b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26124/

@pwendell (Contributor)

Cool - thanks Sandy!

asfgit closed this in b1b35ca on Jan 27, 2015
@rxin (Contributor) commented Feb 2, 2015

Hey FYI, I think this patch is causing some problems. I got the following exception today:

15/02/02 01:12:20 WARN scheduler.TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3, ip-172-31-13-57.us-west-2.compute.internal): java.lang.ClassNotFoundException: org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:191)
    at org.apache.spark.deploy.SparkHadoopUtil.getFileSystemThreadStatisticsMethod(SparkHadoopUtil.scala:180)
    at org.apache.spark.deploy.SparkHadoopUtil.getFSBytesReadOnThreadCallback(SparkHadoopUtil.scala:139)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1$$anonfun$2.apply(NewHadoopRDD.scala:120)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1$$anonfun$2.apply(NewHadoopRDD.scala:118)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:117)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:281)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:248)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:281)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:248)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:281)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:248)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

@sryza (Contributor Author) commented Feb 2, 2015

@rxin this should be fixed by SPARK-5492
