diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 21bb47dd0c037..d987b2af3d2e3 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit}
 
 import scala.concurrent.duration._
+import scala.io.Source
 
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
@@ -376,7 +377,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     sc.addFile(file1.getAbsolutePath)
     def getAddedFileContents(): String = {
       sc.parallelize(Seq(0)).map { _ =>
-        scala.io.Source.fromFile(SparkFiles.get("file")).mkString
+        Utils.tryWithResource(Source.fromFile(SparkFiles.get("file")))(_.mkString)
       }.first()
     }
     assert(getAddedFileContents() === "old")
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index 84fc16979925b..5d60aad615583 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -25,7 +25,7 @@ import scala.io.Source
 import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.util.SparkConfWithEnv
+import org.apache.spark.util.{SparkConfWithEnv, Utils}
 
 class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -43,7 +43,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
       assert(info.logUrlMap.nonEmpty)
       // Browse to each URL to check that it's valid
       info.logUrlMap.foreach { case (logType, logUrl) =>
-        val html = Source.fromURL(logUrl).mkString
+        val html = Utils.tryWithResource(Source.fromURL(logUrl))(_.mkString)
         assert(html.contains(s"$logType log page"))
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 022654139e8e1..3a4a125a9a470 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -48,6 +48,7 @@ import org.apache.spark.resource.{ResourceInformation, ResourceRequirement}
 import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
 import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.serializer
+import org.apache.spark.util.Utils
 
 object MockWorker {
   val counter = new AtomicInteger(10000)
@@ -327,22 +328,25 @@ class MasterSuite extends SparkFunSuite
     val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}"
     try {
       eventually(timeout(5.seconds), interval(100.milliseconds)) {
-        val json = Source.fromURL(s"$masterUrl/json").getLines().mkString("\n")
+        val json = Utils
+          .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
         val JArray(workers) = (parse(json) \ "workers")
         workers.size should be (2)
         workers.foreach { workerSummaryJson =>
           val JString(workerWebUi) = workerSummaryJson \ "webuiaddress"
-          val workerResponse = parse(Source.fromURL(s"${workerWebUi}/json")
-            .getLines().mkString("\n"))
+          val workerResponse = parse(Utils
.tryWithResource(Source.fromURL(s"$workerWebUi/json"))(_.getLines().mkString("\n"))) (workerResponse \ "cores").extract[Int] should be (2) } - val html = Source.fromURL(s"$masterUrl/").getLines().mkString("\n") + val html = Utils + .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n")) html should include ("Spark Master at spark://") val workerLinks = (WORKER_LINK_RE findAllMatchIn html).toList workerLinks.size should be (2) workerLinks foreach { case WORKER_LINK_RE(workerUrl, workerId) => - val workerHtml = Source.fromURL(workerUrl).getLines().mkString("\n") + val workerHtml = Utils + .tryWithResource(Source.fromURL(workerUrl))(_.getLines().mkString("\n")) workerHtml should include ("Spark Worker at") workerHtml should include ("Running Executors (0)") } @@ -361,8 +365,8 @@ class MasterSuite extends SparkFunSuite val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}" try { eventually(timeout(5.seconds), interval(100.milliseconds)) { - val json = Source.fromURL(s"$masterUrl/json") - .getLines().mkString("\n") + val json = Utils + .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n")) val JArray(workers) = (parse(json) \ "workers") workers.size should be (2) workers.foreach { workerSummaryJson => @@ -370,11 +374,13 @@ class MasterSuite extends SparkFunSuite // explicitly construct reverse proxy url targeting the master val JString(workerId) = workerSummaryJson \ "id" val url = s"$masterUrl/proxy/${workerId}/json" - val workerResponse = parse(Source.fromURL(url).getLines().mkString("\n")) + val workerResponse = parse( + Utils.tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n"))) (workerResponse \ "cores").extract[Int] should be (2) } - val html = Source.fromURL(s"$masterUrl/").getLines().mkString("\n") + val html = Utils + .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n")) html should include ("Spark Master at spark://") html should include ("""href="/static""") html should include ("""src="/static""") @@ -397,8 +403,8 @@ class MasterSuite extends SparkFunSuite val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}" try { eventually(timeout(5.seconds), interval(100.milliseconds)) { - val json = Source.fromURL(s"$masterUrl/json") - .getLines().mkString("\n") + val json = Utils + .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n")) val JArray(workers) = (parse(json) \ "workers") workers.size should be (2) workers.foreach { workerSummaryJson => @@ -406,7 +412,8 @@ class MasterSuite extends SparkFunSuite // explicitly construct reverse proxy url targeting the master val JString(workerId) = workerSummaryJson \ "id" val url = s"$masterUrl/proxy/${workerId}/json" - val workerResponse = parse(Source.fromURL(url).getLines().mkString("\n")) + val workerResponse = parse(Utils + .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n"))) (workerResponse \ "cores").extract[Int] should be (2) (workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl + "/") } @@ -417,7 +424,8 @@ class MasterSuite extends SparkFunSuite System.getProperty("spark.ui.proxyBase") should startWith (s"$reverseProxyUrl/proxy/worker-") System.setProperty("spark.ui.proxyBase", reverseProxyUrl) - val html = Source.fromURL(s"$masterUrl/").getLines().mkString("\n") + val html = Utils + .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n")) html should include ("Spark Master at spark://") verifyStaticResourcesServedByProxy(html, 
         verifyStaticResourcesServedByProxy(html, reverseProxyUrl)
         verifyWorkerUI(html, masterUrl, reverseProxyUrl)
@@ -439,7 +447,8 @@ class MasterSuite extends SparkFunSuite
         // construct url directly targeting the master
         val url = s"$masterUrl/proxy/$workerId/"
         System.setProperty("spark.ui.proxyBase", workerUrl)
-        val workerHtml = Source.fromURL(url).getLines().mkString("\n")
+        val workerHtml = Utils
+          .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n"))
         workerHtml should include ("Spark Worker at")
         workerHtml should include ("Running Executors (0)")
         verifyStaticResourcesServedByProxy(workerHtml, workerUrl)
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index d10260ecb291c..015f299fc6bdf 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -46,6 +46,7 @@ import org.apache.spark.internal.config.Status._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus}
+import org.apache.spark.util.Utils
 
 private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler {
 
@@ -709,8 +710,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       rdd.count()
 
       eventually(timeout(5.seconds), interval(100.milliseconds)) {
-        val stage0 = Source.fromURL(sc.ui.get.webUrl +
-          "/stages/stage/?id=0&attempt=0&expandDagViz=true").mkString
+        val stage0 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl +
+          "/stages/stage/?id=0&attempt=0&expandDagViz=true"))(_.mkString)
         assert(stage0.contains("digraph G {\n subgraph clusterstage_0 {\n " +
           "label=&quot;Stage 0&quot;;\n subgraph "))
         assert(stage0.contains("{\n label=&quot;parallelize&quot;;\n " +
@@ -720,8 +721,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
         assert(stage0.contains("{\n label=&quot;groupBy&quot;;\n " +
           "2 [labelType=&quot;html&quot; label=&quot;MapPartitionsRDD [2]"))
 
-        val stage1 = Source.fromURL(sc.ui.get.webUrl +
-          "/stages/stage/?id=1&attempt=0&expandDagViz=true").mkString
+        val stage1 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl +
+          "/stages/stage/?id=1&attempt=0&expandDagViz=true"))(_.mkString)
         assert(stage1.contains("digraph G {\n subgraph clusterstage_1 {\n " +
           "label=&quot;Stage 1&quot;;\n subgraph "))
         assert(stage1.contains("{\n label=&quot;groupBy&quot;;\n " +
@@ -731,8 +732,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
         assert(stage1.contains("{\n label=&quot;groupBy&quot;;\n " +
           "5 [labelType=&quot;html&quot; label=&quot;MapPartitionsRDD [5]"))
 
-        val stage2 = Source.fromURL(sc.ui.get.webUrl +
-          "/stages/stage/?id=2&attempt=0&expandDagViz=true").mkString
+        val stage2 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl +
+          "/stages/stage/?id=2&attempt=0&expandDagViz=true"))(_.mkString)
         assert(stage2.contains("digraph G {\n subgraph clusterstage_2 {\n " +
           "label=&quot;Stage 2&quot;;\n subgraph "))
         assert(stage2.contains("{\n label=&quot;groupBy&quot;;\n " +
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index c7e1dfe71d563..fb3015e3b08d3 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -76,7 +76,7 @@ class UISuite extends SparkFunSuite {
     withSpark(newSparkContext()) { sc =>
       // test if the ui is visible, and all the expected tabs are visible
       eventually(timeout(10.seconds), interval(50.milliseconds)) {
-        val html = Source.fromURL(sc.ui.get.webUrl).mkString
+        val html = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl))(_.mkString)
         assert(!html.contains("random data that should not be present"))
         assert(html.toLowerCase(Locale.ROOT).contains("stages"))
         assert(html.toLowerCase(Locale.ROOT).contains("storage"))
@@ -90,7 +90,7 @@ class UISuite extends SparkFunSuite {
     withSpark(newSparkContext()) { sc =>
       // test if visible from http://localhost:4040
       eventually(timeout(10.seconds), interval(50.milliseconds)) {
-        val html = Source.fromURL("http://localhost:4040").mkString
+        val html = Utils.tryWithResource(Source.fromURL("http://localhost:4040"))(_.mkString)
         assert(html.toLowerCase(Locale.ROOT).contains("stages"))
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
index 3063e79704fff..9435b5acd2224 100644
--- a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
@@ -112,9 +112,9 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
     val res1 = classLoader.getResources("resource1").asScala.toList
     assert(res1.size === 2)
     assert(classLoader.getResources("resource2").asScala.size === 1)
-
-    res1.map(scala.io.Source.fromURL(_).mkString) should contain inOrderOnly
-      ("resource1Contents-child", "resource1Contents-parent")
+    res1.map { res =>
+      Utils.tryWithResource(scala.io.Source.fromURL(res))(_.mkString)
+    } should contain inOrderOnly("resource1Contents-child", "resource1Contents-parent")
     classLoader.close()
     parentLoader.close()
   }
diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
index a73859891a860..323bab427e55e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.Utils
 
 /**
  * Simple test for reading and writing to a distributed
@@ -46,9 +47,7 @@ object DFSReadWriteTest {
   private val NPARAMS = 2
 
   private def readFile(filename: String): List[String] = {
-    val lineIter: Iterator[String] = fromFile(filename).getLines()
-    val lineList: List[String] = lineIter.toList
-    lineList
+    Utils.tryWithResource(fromFile(filename))(_.getLines().toList)
   }
 
   private def printUsage(): Unit = {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
index ef902fcab3b50..553ab42b9c8b1 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.streaming.OffsetSuite
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
 
 class KafkaSourceOffsetSuite extends OffsetSuite with SharedSparkSession {
 
@@ -99,7 +100,6 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSparkSession {
   private def readFromResource(file: String): SerializedOffset = {
     import scala.io.Source
     val input = getClass.getResource(s"/$file").toURI
-    val str = Source.fromFile(input).mkString
-    SerializedOffset(str)
+    SerializedOffset(Utils.tryWithResource(Source.fromFile(input))(_.mkString))
   }
 }
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 6a4990e1cd11e..058563dfa167d 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -167,7 +167,8 @@ class KafkaTestUtils(
    * In this method we rewrite krb5.conf to make kdc and client use the same enctypes
    */
   private def rewriteKrb5Conf(): Unit = {
-    val krb5Conf = Source.fromFile(kdc.getKrb5conf, "UTF-8").getLines()
+    val krb5Conf = Utils
+      .tryWithResource(Source.fromFile(kdc.getKrb5conf, "UTF-8"))(_.getLines().toList)
     var rewritten = false
     val addedConfig =
       addedKrb5Config("default_tkt_enctypes", "aes128-cts-hmac-sha1-96") +
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 222b24ca12dce..9bc934d246fec 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -480,7 +480,7 @@ private object YarnClusterDriver extends Logging with Matchers {
       executorInfos.foreach { info =>
         assert(info.logUrlMap.nonEmpty)
         info.logUrlMap.values.foreach { url =>
-          val log = Source.fromURL(url).mkString
+          val log = Utils.tryWithResource(Source.fromURL(url))(_.mkString)
           assert(
             !log.contains(SECRET_PASSWORD),
             s"Executor logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
@@ -499,7 +499,7 @@ private object YarnClusterDriver extends Logging with Matchers {
       assert(driverLogs.contains("stdout"))
       val urlStr = driverLogs("stderr")
       driverLogs.foreach { kv =>
-        val log = Source.fromURL(kv._2).mkString
+        val log = Utils.tryWithResource(Source.fromURL(kv._2))(_.mkString)
         assert(
           !log.contains(SECRET_PASSWORD),
           s"Driver logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index 585ce4e40471d..cf733b8f50cb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
 
 case class QueryExecutionTestRecord(
     c0: Int, c1: Int, c2: Int, c3: Int, c4: Int,
@@ -36,8 +37,9 @@ case class QueryExecutionTestRecord(
 class QueryExecutionSuite extends SharedSparkSession {
   import testImplicits._
 
-  def checkDumpedPlans(path: String, expected: Int): Unit = {
-    assert(Source.fromFile(path).getLines.toList
+  def checkDumpedPlans(path: String, expected: Int): Unit = Utils.tryWithResource(
+    Source.fromFile(path)) { source =>
+    assert(source.getLines.toList
       .takeWhile(_ != "== Whole Stage Codegen ==") == List(
       "== Parsed Logical Plan ==",
       s"Range (0, $expected, step=1, splits=Some(2))",
@@ -99,7 +101,8 @@ class QueryExecutionSuite extends SharedSparkSession {
       val path = dir.getCanonicalPath + "/plans.txt"
       val df = spark.range(0, 10)
       df.queryExecution.debug.toFile(path, explainMode = Option("formatted"))
-      assert(Source.fromFile(path).getLines.toList
+      val lines = Utils.tryWithResource(Source.fromFile(path))(_.getLines().toList)
+      assert(lines
         .takeWhile(_ != "== Whole Stage Codegen ==").map(_.replaceAll("#\\d+", "#x")) == List(
         "== Physical Plan ==",
         s"* Range (1)",
@@ -135,9 +138,10 @@ class QueryExecutionSuite extends SharedSparkSession {
         0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
         20, 21, 22, 23, 24, 25, 26)))
       ds.queryExecution.debug.toFile(path)
-      val localRelations = Source.fromFile(path).getLines().filter(_.contains("LocalRelation"))
-
-      assert(!localRelations.exists(_.contains("more fields")))
+      Utils.tryWithResource(Source.fromFile(path)) { source =>
+        val localRelations = source.getLines().filter(_.contains("LocalRelation"))
+        assert(!localRelations.exists(_.contains("more fields")))
+      }
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 6b9fa9c968fb4..ff00c474e2ef0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1532,8 +1532,10 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   private def readOffsetFromResource(file: String): SerializedOffset = {
     import scala.io.Source
-    val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
-    SerializedOffset(str.trim)
+    Utils.tryWithResource(
+      Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI)) { source =>
+      SerializedOffset(source.mkString.trim)
+    }
   }
 
   private def runTwoBatchesAndVerifyResults(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index fe87941d8c713..766edcae6e4a1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -242,8 +242,9 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils {
   // Tests the latest version of every release line.
   val testingVersions: Seq[String] = {
     import scala.io.Source
-    val versions: Seq[String] = try {
-      Source.fromURL(s"${releaseMirror}/spark").mkString
+    val versions: Seq[String] = try Utils.tryWithResource(
+      Source.fromURL(s"$releaseMirror/spark")) { source =>
+      source.mkString
         .split("\n")
         .filter(_.contains("""