Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,13 @@ class SparkHadoopUtil extends Logging {
/**
 * Collects, for every Hadoop `FileSystem` statistics entry whose scheme equals the scheme of
 * `path` (after qualifying it against its file system), the value obtained by invoking
 * "getThreadStatistics" on that entry via `Utils.invoke` (presumably a reflective call).
 *
 * NOTE(review): this is a diff rendering (hunk -175,8 +175,13). The `val stats` / `stats.map`
 * pair appears to be the pre-change body, and the `if (scheme == null)` expression appears to be
 * its replacement. A review comment is interleaved before the final `.map` -- confirm against
 * the actual file.
 *
 * @param path path whose qualified URI scheme selects the statistics entries
 * @param conf Hadoop configuration used to look up the file system of `path`
 * @return one "getThreadStatistics" result per matching entry; in the post-change body, an
 *         empty sequence when the qualified path has no scheme
 */
private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
val scheme = qualifiedPath.toUri().getScheme()
// Pre-change lines (presumably removed by this diff): `equals` is called on each entry's
// getScheme() result, so an entry registered with a null scheme would throw here.
val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
// Post-change lines: a path without a scheme cannot match any entry, so return nothing.
if (scheme == null) {
Seq.empty
} else {
FileSystem.getAllStatistics
// The receiver is the local `scheme`, known to be non-null in this branch, so entries whose
// getScheme() is null compare unequal and are filtered out instead of throwing.
.filter { stats => scheme.equals(stats.getScheme()) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tiny nit: be consistent with parentheses for methods with empty args (getAllStatistics missing them, getScheme and toUri have them)
Tiny nit: do you think (scheme.equals(_.getScheme())) would be less clear?

.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
}
}

private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.io.{FileWriter, PrintWriter, File}
import org.apache.spark.SharedSparkContext
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.{SparkListenerTaskEnd, SparkListener}
import org.apache.spark.util.Utils

import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
Expand Down Expand Up @@ -106,4 +107,23 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext with Shou
}
}
}

test("getFileSystemThreadStatistics should guard against null schemes (SPARK-8062)") {
  // Regression test for SPARK-8062: a statistics entry with a null scheme in Hadoop's global
  // table must not break the write/read path below.
  val scratchDir = Utils.createTempDir()
  val target = new File(scratchDir, "outfile").toString

  // Asking for statistics with a null scheme deliberately registers a null-scheme entry in
  // Hadoop's global `FileSystem.statisticsTable`.
  FileSystem.getStatistics(null, classOf[FileSystem])

  // Before the SPARK-8062 fix, the job below died with a NullPointerException raised from
  // SparkHadoopUtil.getFileSystemThreadStatistics. Completing without an exception is the
  // pass condition.
  try {
    sc.parallelize(Array("a", "b", "c", "d"), 2).saveAsTextFile(target)
    sc.textFile(target).count()
  } finally {
    // Always remove the scratch directory, even if the job above fails.
    Utils.deleteRecursively(scratchDir)
  }
}
}