From f9a34ba8de20c9f600f08518b77eb9477fb70d93 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 25 Jan 2021 19:20:14 +0800 Subject: [PATCH 01/13] fix not closed --- .../org/apache/spark/SparkContextSuite.scala | 4 +- .../spark/deploy/LogUrlsStandaloneSuite.scala | 8 +-- .../spark/deploy/master/MasterSuite.scala | 50 ++++++++++----- .../org/apache/spark/ui/UISeleniumSuite.scala | 19 ++++-- .../scala/org/apache/spark/ui/UISuite.scala | 8 ++- .../util/MutableURLClassLoaderSuite.scala | 8 ++- .../sql/kafka010/KafkaSourceOffsetSuite.scala | 8 ++- .../spark/sql/kafka010/KafkaTestUtils.scala | 4 +- .../spark/deploy/yarn/YarnClusterSuite.scala | 8 ++- .../sql/execution/QueryExecutionSuite.scala | 62 ++++++++++--------- .../sql/streaming/FileStreamSourceSuite.scala | 7 ++- .../HiveExternalCatalogVersionsSuite.scala | 14 +++-- .../spark/streaming/util/RawTextSender.scala | 8 +-- 13 files changed, 132 insertions(+), 76 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 21bb47dd0c037..91fc357ae1e3d 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -376,7 +376,9 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu sc.addFile(file1.getAbsolutePath) def getAddedFileContents(): String = { sc.parallelize(Seq(0)).map { _ => - scala.io.Source.fromFile(SparkFiles.get("file")).mkString + Utils.tryWithResource(scala.io.Source.fromFile(SparkFiles.get("file"))) { data => + data.mkString + } }.first() } assert(getAddedFileContents() === "old") diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala index 84fc16979925b..f6aaadf51b918 100644 --- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala @@ -18,14 +18,12 @@ package org.apache.spark.deploy import java.net.URL - import scala.collection.mutable import scala.io.Source - import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite} import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded} import org.apache.spark.scheduler.cluster.ExecutorInfo -import org.apache.spark.util.SparkConfWithEnv +import org.apache.spark.util.{SparkConfWithEnv, Utils} class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext { @@ -43,7 +41,9 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext { assert(info.logUrlMap.nonEmpty) // Browse to each URL to check that it's valid info.logUrlMap.foreach { case (logType, logUrl) => - val html = Source.fromURL(logUrl).mkString + val html = Utils.tryWithResource(Source.fromURL(logUrl)) { data => + data.mkString + } assert(html.contains(s"$logType log page")) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 022654139e8e1..3a0dfe6059d57 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -48,6 +48,7 @@ import org.apache.spark.resource.{ResourceInformation, ResourceRequirement} import org.apache.spark.resource.ResourceUtils.{FPGA, GPU} import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv} import 
org.apache.spark.serializer +import org.apache.spark.util.Utils object MockWorker { val counter = new AtomicInteger(10000) @@ -327,22 +328,31 @@ class MasterSuite extends SparkFunSuite val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}" try { eventually(timeout(5.seconds), interval(100.milliseconds)) { - val json = Source.fromURL(s"$masterUrl/json").getLines().mkString("\n") + val json = Utils.tryWithResource( + Source.fromURL(s"$masterUrl/json")) { data => + data.getLines().mkString("\n") + } val JArray(workers) = (parse(json) \ "workers") workers.size should be (2) workers.foreach { workerSummaryJson => val JString(workerWebUi) = workerSummaryJson \ "webuiaddress" - val workerResponse = parse(Source.fromURL(s"${workerWebUi}/json") - .getLines().mkString("\n")) + val workerResponse = Utils.tryWithResource( + Source.fromURL(s"$workerWebUi/json")) { data => + parse(data.getLines().mkString("\n")) + } (workerResponse \ "cores").extract[Int] should be (2) } - val html = Source.fromURL(s"$masterUrl/").getLines().mkString("\n") + val html = Utils.tryWithResource(Source.fromURL(s"$masterUrl/")) { data => + data.getLines().mkString("\n") + } html should include ("Spark Master at spark://") val workerLinks = (WORKER_LINK_RE findAllMatchIn html).toList workerLinks.size should be (2) workerLinks foreach { case WORKER_LINK_RE(workerUrl, workerId) => - val workerHtml = Source.fromURL(workerUrl).getLines().mkString("\n") + val workerHtml = Utils.tryWithResource(Source.fromURL(workerUrl)) { data => + data.getLines().mkString("\n") + } workerHtml should include ("Spark Worker at") workerHtml should include ("Running Executors (0)") } @@ -361,8 +371,9 @@ class MasterSuite extends SparkFunSuite val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}" try { eventually(timeout(5.seconds), interval(100.milliseconds)) { - val json = Source.fromURL(s"$masterUrl/json") - .getLines().mkString("\n") + val json = Utils.tryWithResource(Source.fromURL(s"$masterUrl/json")) { data => + data.getLines().mkString("\n") + } val JArray(workers) = (parse(json) \ "workers") workers.size should be (2) workers.foreach { workerSummaryJson => @@ -370,11 +381,15 @@ class MasterSuite extends SparkFunSuite // explicitly construct reverse proxy url targeting the master val JString(workerId) = workerSummaryJson \ "id" val url = s"$masterUrl/proxy/${workerId}/json" - val workerResponse = parse(Source.fromURL(url).getLines().mkString("\n")) + val workerResponse = Utils.tryWithResource(Source.fromURL(url)) { data => + parse(data.getLines().mkString("\n")) + } (workerResponse \ "cores").extract[Int] should be (2) } - val html = Source.fromURL(s"$masterUrl/").getLines().mkString("\n") + val html = Utils.tryWithResource(Source.fromURL(s"$masterUrl/")) { data => + data.getLines().mkString("\n") + } html should include ("Spark Master at spark://") html should include ("""href="/static""") html should include ("""src="/static""") @@ -397,8 +412,9 @@ class MasterSuite extends SparkFunSuite val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}" try { eventually(timeout(5.seconds), interval(100.milliseconds)) { - val json = Source.fromURL(s"$masterUrl/json") - .getLines().mkString("\n") + val json = Utils.tryWithResource( Source.fromURL(s"$masterUrl/json")) { data => + data.getLines().mkString("\n") + } val JArray(workers) = (parse(json) \ "workers") workers.size should be (2) workers.foreach { workerSummaryJson => @@ -406,7 +422,9 @@ class MasterSuite extends SparkFunSuite // explicitly construct reverse 
proxy url targeting the master val JString(workerId) = workerSummaryJson \ "id" val url = s"$masterUrl/proxy/${workerId}/json" - val workerResponse = parse(Source.fromURL(url).getLines().mkString("\n")) + val workerResponse = Utils.tryWithResource(Source.fromURL(url)) { data => + parse(data.getLines().mkString("\n")) + } (workerResponse \ "cores").extract[Int] should be (2) (workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl + "/") } @@ -417,7 +435,9 @@ class MasterSuite extends SparkFunSuite System.getProperty("spark.ui.proxyBase") should startWith (s"$reverseProxyUrl/proxy/worker-") System.setProperty("spark.ui.proxyBase", reverseProxyUrl) - val html = Source.fromURL(s"$masterUrl/").getLines().mkString("\n") + val html = Utils.tryWithResource(Source.fromURL(s"$masterUrl/")) { data => + data.mkString("\n") + } html should include ("Spark Master at spark://") verifyStaticResourcesServedByProxy(html, reverseProxyUrl) verifyWorkerUI(html, masterUrl, reverseProxyUrl) @@ -439,7 +459,9 @@ class MasterSuite extends SparkFunSuite // construct url directly targeting the master val url = s"$masterUrl/proxy/$workerId/" System.setProperty("spark.ui.proxyBase", workerUrl) - val workerHtml = Source.fromURL(url).getLines().mkString("\n") + val workerHtml = Utils.tryWithResource(Source.fromURL(url)) { data => + data.getLines().mkString("\n") + } workerHtml should include ("Spark Worker at") workerHtml should include ("Running Executors (0)") verifyStaticResourcesServedByProxy(workerHtml, workerUrl) diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index d10260ecb291c..4d9d6f23653c1 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -46,6 +46,7 @@ import org.apache.spark.internal.config.Status._ import org.apache.spark.internal.config.UI._ import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus} +import org.apache.spark.util.Utils private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler { @@ -709,8 +710,10 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B rdd.count() eventually(timeout(5.seconds), interval(100.milliseconds)) { - val stage0 = Source.fromURL(sc.ui.get.webUrl + - "/stages/stage/?id=0&attempt=0&expandDagViz=true").mkString + val stage0 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl + + "/stages/stage/?id=0&attempt=0&expandDagViz=true")) { data => + data.mkString + } assert(stage0.contains("digraph G {\n subgraph clusterstage_0 {\n " + "label="Stage 0";\n subgraph ")) assert(stage0.contains("{\n label="parallelize";\n " + @@ -720,8 +723,10 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B assert(stage0.contains("{\n label="groupBy";\n " + "2 [labelType="html" label="MapPartitionsRDD [2]")) - val stage1 = Source.fromURL(sc.ui.get.webUrl + - "/stages/stage/?id=1&attempt=0&expandDagViz=true").mkString + val stage1 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl + + "/stages/stage/?id=1&attempt=0&expandDagViz=true")) { data => + data.mkString + } assert(stage1.contains("digraph G {\n subgraph clusterstage_1 {\n " + "label="Stage 1";\n subgraph ")) assert(stage1.contains("{\n label="groupBy";\n " + @@ -731,8 +736,10 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B 
assert(stage1.contains("{\n label="groupBy";\n " + "5 [labelType="html" label="MapPartitionsRDD [5]")) - val stage2 = Source.fromURL(sc.ui.get.webUrl + - "/stages/stage/?id=2&attempt=0&expandDagViz=true").mkString + val stage2 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl + + "/stages/stage/?id=2&attempt=0&expandDagViz=true")) { data => + data.mkString + } assert(stage2.contains("digraph G {\n subgraph clusterstage_2 {\n " + "label="Stage 2";\n subgraph ")) assert(stage2.contains("{\n label="groupBy";\n " + diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index c7e1dfe71d563..1fd0de5479fa8 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -76,7 +76,9 @@ class UISuite extends SparkFunSuite { withSpark(newSparkContext()) { sc => // test if the ui is visible, and all the expected tabs are visible eventually(timeout(10.seconds), interval(50.milliseconds)) { - val html = Source.fromURL(sc.ui.get.webUrl).mkString + val html = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl)) { data => + data.mkString + } assert(!html.contains("random data that should not be present")) assert(html.toLowerCase(Locale.ROOT).contains("stages")) assert(html.toLowerCase(Locale.ROOT).contains("storage")) @@ -90,7 +92,9 @@ class UISuite extends SparkFunSuite { withSpark(newSparkContext()) { sc => // test if visible from http://localhost:4040 eventually(timeout(10.seconds), interval(50.milliseconds)) { - val html = Source.fromURL("http://localhost:4040").mkString + val html = Utils.tryWithResource(Source.fromURL("http://localhost:4040")) { data => + data.mkString + } assert(html.toLowerCase(Locale.ROOT).contains("stages")) } } diff --git a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala index 3063e79704fff..ad54a9d570550 100644 --- a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala @@ -112,9 +112,11 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers { val res1 = classLoader.getResources("resource1").asScala.toList assert(res1.size === 2) assert(classLoader.getResources("resource2").asScala.size === 1) - - res1.map(scala.io.Source.fromURL(_).mkString) should contain inOrderOnly - ("resource1Contents-child", "resource1Contents-parent") + res1.map { res => + Utils.tryWithResource(scala.io.Source.fromURL(res)) { data => + data.mkString + } + } should contain inOrderOnly("resource1Contents-child", "resource1Contents-parent") classLoader.close() parentLoader.close() } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala index ef902fcab3b50..c2c7535ee94e0 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala @@ -18,10 +18,10 @@ package org.apache.spark.sql.kafka010 import java.io.File - import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.streaming.OffsetSuite import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.util.Utils class KafkaSourceOffsetSuite extends OffsetSuite 
with SharedSparkSession { @@ -99,7 +99,9 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSparkSession { private def readFromResource(file: String): SerializedOffset = { import scala.io.Source val input = getClass.getResource(s"/$file").toURI - val str = Source.fromFile(input).mkString - SerializedOffset(str) + Utils.tryWithResource(Source.fromFile(input)) { data => + val str = data.mkString + SerializedOffset(str) + } } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 6a4990e1cd11e..d5cfb40812934 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -167,7 +167,9 @@ class KafkaTestUtils( * In this method we rewrite krb5.conf to make kdc and client use the same enctypes */ private def rewriteKrb5Conf(): Unit = { - val krb5Conf = Source.fromFile(kdc.getKrb5conf, "UTF-8").getLines() + val krb5Conf = Utils.tryWithResource(Source.fromFile(kdc.getKrb5conf, "UTF-8")) { data => + data.getLines() + } var rewritten = false val addedConfig = addedKrb5Config("default_tkt_enctypes", "aes128-cts-hmac-sha1-96") + diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 222b24ca12dce..d455bbf941636 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -480,7 +480,9 @@ private object YarnClusterDriver extends Logging with Matchers { executorInfos.foreach { info => assert(info.logUrlMap.nonEmpty) info.logUrlMap.values.foreach { url => - val log = Source.fromURL(url).mkString + val log = Utils.tryWithResource(Source.fromURL(url)) { data => + data.mkString + } assert( !log.contains(SECRET_PASSWORD), s"Executor logs contain sensitive info (${SECRET_PASSWORD}): \n${log} " @@ -499,7 +501,9 @@ private object YarnClusterDriver extends Logging with Matchers { assert(driverLogs.contains("stdout")) val urlStr = driverLogs("stderr") driverLogs.foreach { kv => - val log = Source.fromURL(kv._2).mkString + val log = Utils.tryWithResource(Source.fromURL(kv._2)) { data => + data.mkString + } assert( !log.contains(SECRET_PASSWORD), s"Driver logs contain sensitive info (${SECRET_PASSWORD}): \n${log} " diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index 585ce4e40471d..df7dfb690234b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -17,13 +17,13 @@ package org.apache.spark.sql.execution import scala.io.Source - import org.apache.spark.sql.{AnalysisException, FastOperator} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.util.Utils case class QueryExecutionTestRecord( c0: Int, c1: Int, c2: Int, c3: Int, c4: Int, 
@@ -37,21 +37,23 @@ class QueryExecutionSuite extends SharedSparkSession { import testImplicits._ def checkDumpedPlans(path: String, expected: Int): Unit = { - assert(Source.fromFile(path).getLines.toList - .takeWhile(_ != "== Whole Stage Codegen ==") == List( - "== Parsed Logical Plan ==", - s"Range (0, $expected, step=1, splits=Some(2))", - "", - "== Analyzed Logical Plan ==", - "id: bigint", - s"Range (0, $expected, step=1, splits=Some(2))", - "", - "== Optimized Logical Plan ==", - s"Range (0, $expected, step=1, splits=Some(2))", - "", - "== Physical Plan ==", - s"*(1) Range (0, $expected, step=1, splits=2)", - "")) + Utils.tryWithResource(Source.fromFile(path)) { data => + assert(data.getLines.toList + .takeWhile(_ != "== Whole Stage Codegen ==") == List( + "== Parsed Logical Plan ==", + s"Range (0, $expected, step=1, splits=Some(2))", + "", + "== Analyzed Logical Plan ==", + "id: bigint", + s"Range (0, $expected, step=1, splits=Some(2))", + "", + "== Optimized Logical Plan ==", + s"Range (0, $expected, step=1, splits=Some(2))", + "", + "== Physical Plan ==", + s"*(1) Range (0, $expected, step=1, splits=2)", + "")) + } } test("dumping query execution info to a file") { @@ -99,17 +101,19 @@ class QueryExecutionSuite extends SharedSparkSession { val path = dir.getCanonicalPath + "/plans.txt" val df = spark.range(0, 10) df.queryExecution.debug.toFile(path, explainMode = Option("formatted")) - assert(Source.fromFile(path).getLines.toList - .takeWhile(_ != "== Whole Stage Codegen ==").map(_.replaceAll("#\\d+", "#x")) == List( - "== Physical Plan ==", - s"* Range (1)", - "", - "", - s"(1) Range [codegen id : 1]", - "Output [1]: [id#xL]", - s"Arguments: Range (0, 10, step=1, splits=Some(2))", - "", - "")) + Utils.tryWithResource(Source.fromFile(path)) { data => + assert(data.getLines.toList + .takeWhile(_ != "== Whole Stage Codegen ==").map(_.replaceAll("#\\d+", "#x")) == List( + "== Physical Plan ==", + s"* Range (1)", + "", + "", + s"(1) Range [codegen id : 1]", + "Output [1]: [id#xL]", + s"Arguments: Range (0, 10, step=1, splits=Some(2))", + "", + "")) + } } } @@ -135,7 +139,9 @@ class QueryExecutionSuite extends SharedSparkSession { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26))) ds.queryExecution.debug.toFile(path) - val localRelations = Source.fromFile(path).getLines().filter(_.contains("LocalRelation")) + val localRelations = Utils.tryWithResource(Source.fromFile(path)) { data => + data.getLines().filter(_.contains("LocalRelation")) + } assert(!localRelations.exists(_.contains("more fields"))) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 6b9fa9c968fb4..4beaeae130db7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1532,8 +1532,11 @@ class FileStreamSourceSuite extends FileStreamSourceTest { private def readOffsetFromResource(file: String): SerializedOffset = { import scala.io.Source - val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString - SerializedOffset(str.trim) + Utils.tryWithResource( + Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI)) { data => + val str = data.mkString + SerializedOffset(str.trim) + } } private def runTwoBatchesAndVerifyResults( diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala index fe87941d8c713..05b5439056007 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala @@ -243,12 +243,14 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils { val testingVersions: Seq[String] = { import scala.io.Source val versions: Seq[String] = try { - Source.fromURL(s"${releaseMirror}/spark").mkString - .split("\n") - .filter(_.contains("""""".r.findFirstMatchIn(_).get.group(1)) - .filter(_ < org.apache.spark.SPARK_VERSION) + Utils.tryWithResource(Source.fromURL(s"$releaseMirror/spark")) { bufferedSource => + bufferedSource.mkString + .split("\n") + .filter(_.contains("""""".r.findFirstMatchIn(_).get.group(1)) + .filter(_ < org.apache.spark.SPARK_VERSION) + } } catch { // do not throw exception during object initialization. case NonFatal(_) => Seq("3.0.1", "2.4.7") // A temporary fallback to use a specific version diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala index 5d4fcf8bd1596..0473524802503 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala @@ -20,13 +20,11 @@ package org.apache.spark.streaming.util import java.io.{ByteArrayOutputStream, IOException} import java.net.ServerSocket import java.nio.ByteBuffer - import scala.io.Source - import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.serializer.KryoSerializer -import org.apache.spark.util.IntParam +import org.apache.spark.util.{IntParam, Utils} /** * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a @@ -45,7 +43,9 @@ object RawTextSender extends Logging { val Array(IntParam(port), file, IntParam(blockSize), IntParam(bytesPerSec)) = args // Repeat the input data multiple times to fill in a buffer - val lines = Source.fromFile(file).getLines().toArray + val lines = Utils.tryWithResource(Source.fromFile(file)) { data => + data.getLines().toArray + } val bufferStream = new ByteArrayOutputStream(blockSize + 1000) val ser = new KryoSerializer(new SparkConf()).newInstance() val serStream = ser.serializeStream(bufferStream) From b6bd1f19ede73e2b8038811483a1c40fe6e2abc5 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 25 Jan 2021 21:26:42 +0800 Subject: [PATCH 02/13] rename var --- .../org/apache/spark/SparkContextSuite.scala | 5 ++- .../spark/deploy/LogUrlsStandaloneSuite.scala | 6 ++- .../spark/deploy/master/MasterSuite.scala | 40 +++++++++---------- .../org/apache/spark/ui/UISeleniumSuite.scala | 12 +++--- .../scala/org/apache/spark/ui/UISuite.scala | 8 ++-- .../util/MutableURLClassLoaderSuite.scala | 4 +- .../spark/examples/DFSReadWriteTest.scala | 6 ++- .../sql/kafka010/KafkaSourceOffsetSuite.scala | 5 ++- .../spark/sql/kafka010/KafkaTestUtils.scala | 4 +- .../spark/deploy/yarn/YarnClusterSuite.scala | 8 ++-- .../sql/execution/QueryExecutionSuite.scala | 4 +- .../sql/streaming/FileStreamSourceSuite.scala | 4 +- .../HiveExternalCatalogVersionsSuite.scala | 4 +- .../spark/streaming/util/RawTextSender.scala | 6 ++- 14 files changed, 63 insertions(+), 53 deletions(-) diff --git 
a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 91fc357ae1e3d..e640a89216be9 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit} import scala.concurrent.duration._ +import scala.io.Source import com.google.common.io.Files import org.apache.hadoop.conf.Configuration @@ -376,8 +377,8 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu sc.addFile(file1.getAbsolutePath) def getAddedFileContents(): String = { sc.parallelize(Seq(0)).map { _ => - Utils.tryWithResource(scala.io.Source.fromFile(SparkFiles.get("file"))) { data => - data.mkString + Utils.tryWithResource(Source.fromFile(SparkFiles.get("file"))) { source => + source.mkString } }.first() } diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala index f6aaadf51b918..370e68945115b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala @@ -18,8 +18,10 @@ package org.apache.spark.deploy import java.net.URL + import scala.collection.mutable import scala.io.Source + import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite} import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded} import org.apache.spark.scheduler.cluster.ExecutorInfo @@ -41,8 +43,8 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext { assert(info.logUrlMap.nonEmpty) // Browse to each URL to check that it's valid info.logUrlMap.foreach { case (logType, logUrl) => - val html = Utils.tryWithResource(Source.fromURL(logUrl)) { data => - data.mkString + val html = Utils.tryWithResource(Source.fromURL(logUrl)) { source => + source.mkString } assert(html.contains(s"$logType log page")) } diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 3a0dfe6059d57..eb854cc3e2a91 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -329,29 +329,29 @@ class MasterSuite extends SparkFunSuite try { eventually(timeout(5.seconds), interval(100.milliseconds)) { val json = Utils.tryWithResource( - Source.fromURL(s"$masterUrl/json")) { data => - data.getLines().mkString("\n") + Source.fromURL(s"$masterUrl/json")) { source => + source.getLines().mkString("\n") } val JArray(workers) = (parse(json) \ "workers") workers.size should be (2) workers.foreach { workerSummaryJson => val JString(workerWebUi) = workerSummaryJson \ "webuiaddress" val workerResponse = Utils.tryWithResource( - Source.fromURL(s"$workerWebUi/json")) { data => - parse(data.getLines().mkString("\n")) + Source.fromURL(s"$workerWebUi/json")) { source => + parse(source.getLines().mkString("\n")) } (workerResponse \ "cores").extract[Int] should be (2) } - val html = Utils.tryWithResource(Source.fromURL(s"$masterUrl/")) { data => - data.getLines().mkString("\n") + val html = Utils.tryWithResource(Source.fromURL(s"$masterUrl/")) { source => + source.getLines().mkString("\n") } html should include ("Spark Master at spark://") val workerLinks = 
(WORKER_LINK_RE findAllMatchIn html).toList workerLinks.size should be (2) workerLinks foreach { case WORKER_LINK_RE(workerUrl, workerId) => - val workerHtml = Utils.tryWithResource(Source.fromURL(workerUrl)) { data => - data.getLines().mkString("\n") + val workerHtml = Utils.tryWithResource(Source.fromURL(workerUrl)) { source => + source.getLines().mkString("\n") } workerHtml should include ("Spark Worker at") workerHtml should include ("Running Executors (0)") @@ -371,8 +371,8 @@ class MasterSuite extends SparkFunSuite val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}" try { eventually(timeout(5.seconds), interval(100.milliseconds)) { - val json = Utils.tryWithResource(Source.fromURL(s"$masterUrl/json")) { data => - data.getLines().mkString("\n") + val json = Utils.tryWithResource(Source.fromURL(s"$masterUrl/json")) { source => + source.getLines().mkString("\n") } val JArray(workers) = (parse(json) \ "workers") workers.size should be (2) @@ -381,14 +381,14 @@ class MasterSuite extends SparkFunSuite // explicitly construct reverse proxy url targeting the master val JString(workerId) = workerSummaryJson \ "id" val url = s"$masterUrl/proxy/${workerId}/json" - val workerResponse = Utils.tryWithResource(Source.fromURL(url)) { data => - parse(data.getLines().mkString("\n")) + val workerResponse = Utils.tryWithResource(Source.fromURL(url)) { source => + parse(source.getLines().mkString("\n")) } (workerResponse \ "cores").extract[Int] should be (2) } - val html = Utils.tryWithResource(Source.fromURL(s"$masterUrl/")) { data => - data.getLines().mkString("\n") + val html = Utils.tryWithResource(Source.fromURL(s"$masterUrl/")) { source => + source.getLines().mkString("\n") } html should include ("Spark Master at spark://") html should include ("""href="/static""") @@ -412,8 +412,8 @@ class MasterSuite extends SparkFunSuite val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}" try { eventually(timeout(5.seconds), interval(100.milliseconds)) { - val json = Utils.tryWithResource( Source.fromURL(s"$masterUrl/json")) { data => - data.getLines().mkString("\n") + val json = Utils.tryWithResource( Source.fromURL(s"$masterUrl/json")) { source => + source.getLines().mkString("\n") } val JArray(workers) = (parse(json) \ "workers") workers.size should be (2) @@ -422,8 +422,8 @@ class MasterSuite extends SparkFunSuite // explicitly construct reverse proxy url targeting the master val JString(workerId) = workerSummaryJson \ "id" val url = s"$masterUrl/proxy/${workerId}/json" - val workerResponse = Utils.tryWithResource(Source.fromURL(url)) { data => - parse(data.getLines().mkString("\n")) + val workerResponse = Utils.tryWithResource(Source.fromURL(url)) { source => + parse(source.getLines().mkString("\n")) } (workerResponse \ "cores").extract[Int] should be (2) (workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl + "/") @@ -459,8 +459,8 @@ class MasterSuite extends SparkFunSuite // construct url directly targeting the master val url = s"$masterUrl/proxy/$workerId/" System.setProperty("spark.ui.proxyBase", workerUrl) - val workerHtml = Utils.tryWithResource(Source.fromURL(url)) { data => - data.getLines().mkString("\n") + val workerHtml = Utils.tryWithResource(Source.fromURL(url)) { source => + source.getLines().mkString("\n") } workerHtml should include ("Spark Worker at") workerHtml should include ("Running Executors (0)") diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index 4d9d6f23653c1..9c61c5e27508e 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -711,8 +711,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B eventually(timeout(5.seconds), interval(100.milliseconds)) { val stage0 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl + - "/stages/stage/?id=0&attempt=0&expandDagViz=true")) { data => - data.mkString + "/stages/stage/?id=0&attempt=0&expandDagViz=true")) { source => + source.mkString } assert(stage0.contains("digraph G {\n subgraph clusterstage_0 {\n " + "label="Stage 0";\n subgraph ")) @@ -724,8 +724,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B "2 [labelType="html" label="MapPartitionsRDD [2]")) val stage1 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl + - "/stages/stage/?id=1&attempt=0&expandDagViz=true")) { data => - data.mkString + "/stages/stage/?id=1&attempt=0&expandDagViz=true")) { source => + source.mkString } assert(stage1.contains("digraph G {\n subgraph clusterstage_1 {\n " + "label="Stage 1";\n subgraph ")) @@ -737,8 +737,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B "5 [labelType="html" label="MapPartitionsRDD [5]")) val stage2 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl + - "/stages/stage/?id=2&attempt=0&expandDagViz=true")) { data => - data.mkString + "/stages/stage/?id=2&attempt=0&expandDagViz=true")) { source => + source.mkString } assert(stage2.contains("digraph G {\n subgraph clusterstage_2 {\n " + "label="Stage 2";\n subgraph ")) diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 1fd0de5479fa8..3b67489409699 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -76,8 +76,8 @@ class UISuite extends SparkFunSuite { withSpark(newSparkContext()) { sc => // test if the ui is visible, and all the expected tabs are visible eventually(timeout(10.seconds), interval(50.milliseconds)) { - val html = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl)) { data => - data.mkString + val html = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl)) { source => + source.mkString } assert(!html.contains("random data that should not be present")) assert(html.toLowerCase(Locale.ROOT).contains("stages")) @@ -92,8 +92,8 @@ class UISuite extends SparkFunSuite { withSpark(newSparkContext()) { sc => // test if visible from http://localhost:4040 eventually(timeout(10.seconds), interval(50.milliseconds)) { - val html = Utils.tryWithResource(Source.fromURL("http://localhost:4040")) { data => - data.mkString + val html = Utils.tryWithResource(Source.fromURL("http://localhost:4040")) { source => + source.mkString } assert(html.toLowerCase(Locale.ROOT).contains("stages")) } diff --git a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala index ad54a9d570550..eea90bc825260 100644 --- a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala @@ -113,8 +113,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers { assert(res1.size === 2) 
assert(classLoader.getResources("resource2").asScala.size === 1) res1.map { res => - Utils.tryWithResource(scala.io.Source.fromURL(res)) { data => - data.mkString + Utils.tryWithResource(scala.io.Source.fromURL(res)) { source => + source.mkString } } should contain inOrderOnly("resource1Contents-child", "resource1Contents-parent") classLoader.close() diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index a73859891a860..30bde99f8ae78 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.spark.sql.SparkSession +import org.apache.spark.util.Utils /** * Simple test for reading and writing to a distributed @@ -46,7 +47,10 @@ object DFSReadWriteTest { private val NPARAMS = 2 private def readFile(filename: String): List[String] = { - val lineIter: Iterator[String] = fromFile(filename).getLines() + val lineIter: Iterator[String] = + Utils.tryWithResource(fromFile(filename)) { source => + source.getLines() + } val lineList: List[String] = lineIter.toList lineList } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala index c2c7535ee94e0..ebf432d917c3c 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.kafka010 import java.io.File + import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.streaming.OffsetSuite import org.apache.spark.sql.test.SharedSparkSession @@ -99,8 +100,8 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSparkSession { private def readFromResource(file: String): SerializedOffset = { import scala.io.Source val input = getClass.getResource(s"/$file").toURI - Utils.tryWithResource(Source.fromFile(input)) { data => - val str = data.mkString + Utils.tryWithResource(Source.fromFile(input)) { source => + val str = source.mkString SerializedOffset(str) } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index d5cfb40812934..b0f8ed41d8088 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -167,8 +167,8 @@ class KafkaTestUtils( * In this method we rewrite krb5.conf to make kdc and client use the same enctypes */ private def rewriteKrb5Conf(): Unit = { - val krb5Conf = Utils.tryWithResource(Source.fromFile(kdc.getKrb5conf, "UTF-8")) { data => - data.getLines() + val krb5Conf = Utils.tryWithResource(Source.fromFile(kdc.getKrb5conf, "UTF-8")) { source => + source.getLines() } var rewritten = false val addedConfig = diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index d455bbf941636..57875ec72cda9 100644 --- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -480,8 +480,8 @@ private object YarnClusterDriver extends Logging with Matchers { executorInfos.foreach { info => assert(info.logUrlMap.nonEmpty) info.logUrlMap.values.foreach { url => - val log = Utils.tryWithResource(Source.fromURL(url)) { data => - data.mkString + val log = Utils.tryWithResource(Source.fromURL(url)) { source => + source.mkString } assert( !log.contains(SECRET_PASSWORD), @@ -501,8 +501,8 @@ private object YarnClusterDriver extends Logging with Matchers { assert(driverLogs.contains("stdout")) val urlStr = driverLogs("stderr") driverLogs.foreach { kv => - val log = Utils.tryWithResource(Source.fromURL(kv._2)) { data => - data.mkString + val log = Utils.tryWithResource(Source.fromURL(kv._2)) { source => + source.mkString } assert( !log.contains(SECRET_PASSWORD), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index df7dfb690234b..e48e5d7d59c1f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -37,8 +37,8 @@ class QueryExecutionSuite extends SharedSparkSession { import testImplicits._ def checkDumpedPlans(path: String, expected: Int): Unit = { - Utils.tryWithResource(Source.fromFile(path)) { data => - assert(data.getLines.toList + Utils.tryWithResource(Source.fromFile(path)) { source => + assert(source.getLines.toList .takeWhile(_ != "== Whole Stage Codegen ==") == List( "== Parsed Logical Plan ==", s"Range (0, $expected, step=1, splits=Some(2))", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 4beaeae130db7..d1013c61f67b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1533,8 +1533,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest { private def readOffsetFromResource(file: String): SerializedOffset = { import scala.io.Source Utils.tryWithResource( - Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI)) { data => - val str = data.mkString + Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI)) { source => + val str = source.mkString SerializedOffset(str.trim) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala index 05b5439056007..18a823dd72dcf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala @@ -243,8 +243,8 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils { val testingVersions: Seq[String] = { import scala.io.Source val versions: Seq[String] = try { - Utils.tryWithResource(Source.fromURL(s"$releaseMirror/spark")) { bufferedSource => - bufferedSource.mkString + Utils.tryWithResource(Source.fromURL(s"$releaseMirror/spark")) { source => + source.mkString .split("\n") .filter(_.contains(""" - 
data.getLines().toArray + val lines = Utils.tryWithResource(Source.fromFile(file)) { source => + source.getLines().toArray } val bufferStream = new ByteArrayOutputStream(blockSize + 1000) val ser = new KryoSerializer(new SparkConf()).newInstance() From bc30422e0ac8d0c4ec6a46ae77a6fcfd038e9dc0 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 25 Jan 2021 21:31:43 +0800 Subject: [PATCH 03/13] fix format --- .../org/apache/spark/deploy/master/MasterSuite.scala | 4 ++-- .../apache/spark/sql/execution/QueryExecutionSuite.scala | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index eb854cc3e2a91..37ded10d756c2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -435,8 +435,8 @@ class MasterSuite extends SparkFunSuite System.getProperty("spark.ui.proxyBase") should startWith (s"$reverseProxyUrl/proxy/worker-") System.setProperty("spark.ui.proxyBase", reverseProxyUrl) - val html = Utils.tryWithResource(Source.fromURL(s"$masterUrl/")) { data => - data.mkString("\n") + val html = Utils.tryWithResource(Source.fromURL(s"$masterUrl/")) { source => + source.mkString("\n") } html should include ("Spark Master at spark://") verifyStaticResourcesServedByProxy(html, reverseProxyUrl) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index e48e5d7d59c1f..7616a3f205e3a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution import scala.io.Source + import org.apache.spark.sql.{AnalysisException, FastOperator} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} @@ -101,8 +102,8 @@ class QueryExecutionSuite extends SharedSparkSession { val path = dir.getCanonicalPath + "/plans.txt" val df = spark.range(0, 10) df.queryExecution.debug.toFile(path, explainMode = Option("formatted")) - Utils.tryWithResource(Source.fromFile(path)) { data => - assert(data.getLines.toList + Utils.tryWithResource(Source.fromFile(path)) { source => + assert(source.getLines.toList .takeWhile(_ != "== Whole Stage Codegen ==").map(_.replaceAll("#\\d+", "#x")) == List( "== Physical Plan ==", s"* Range (1)", @@ -139,8 +140,8 @@ class QueryExecutionSuite extends SharedSparkSession { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26))) ds.queryExecution.debug.toFile(path) - val localRelations = Utils.tryWithResource(Source.fromFile(path)) { data => - data.getLines().filter(_.contains("LocalRelation")) + val localRelations = Utils.tryWithResource(Source.fromFile(path)) { source => + source.getLines().filter(_.contains("LocalRelation")) } assert(!localRelations.exists(_.contains("more fields"))) From a03934f0a9f390e65e8293cdfc96e229d0f6a102 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 25 Jan 2021 21:50:43 +0800 Subject: [PATCH 04/13] fix some formats --- .../spark/deploy/LogUrlsStandaloneSuite.scala | 4 +- .../spark/deploy/master/MasterSuite.scala | 39 ++++++++----------- 2 files changed, 17 insertions(+), 26 deletions(-) diff --git 
a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala index 370e68945115b..5d60aad615583 100644 --- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala @@ -43,9 +43,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext { assert(info.logUrlMap.nonEmpty) // Browse to each URL to check that it's valid info.logUrlMap.foreach { case (logType, logUrl) => - val html = Utils.tryWithResource(Source.fromURL(logUrl)) { source => - source.mkString - } + val html = Utils.tryWithResource(Source.fromURL(logUrl))(_.mkString) assert(html.contains(s"$logType log page")) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 37ded10d756c2..17a250905e266 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -328,24 +328,21 @@ class MasterSuite extends SparkFunSuite val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}" try { eventually(timeout(5.seconds), interval(100.milliseconds)) { - val json = Utils.tryWithResource( - Source.fromURL(s"$masterUrl/json")) { source => - source.getLines().mkString("\n") - } + val json = Utils + .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n")) val JArray(workers) = (parse(json) \ "workers") workers.size should be (2) workers.foreach { workerSummaryJson => val JString(workerWebUi) = workerSummaryJson \ "webuiaddress" - val workerResponse = Utils.tryWithResource( - Source.fromURL(s"$workerWebUi/json")) { source => - parse(source.getLines().mkString("\n")) - } + val workerResponse = Utils + .tryWithResource(Source.fromURL(s"$workerWebUi/json")) { source => + parse(source.getLines().mkString("\n")) + } (workerResponse \ "cores").extract[Int] should be (2) } - val html = Utils.tryWithResource(Source.fromURL(s"$masterUrl/")) { source => - source.getLines().mkString("\n") - } + val html = Utils + .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n")) html should include ("Spark Master at spark://") val workerLinks = (WORKER_LINK_RE findAllMatchIn html).toList workerLinks.size should be (2) @@ -371,9 +368,8 @@ class MasterSuite extends SparkFunSuite val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}" try { eventually(timeout(5.seconds), interval(100.milliseconds)) { - val json = Utils.tryWithResource(Source.fromURL(s"$masterUrl/json")) { source => - source.getLines().mkString("\n") - } + val json = Utils + .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n")) val JArray(workers) = (parse(json) \ "workers") workers.size should be (2) workers.foreach { workerSummaryJson => @@ -387,9 +383,8 @@ class MasterSuite extends SparkFunSuite (workerResponse \ "cores").extract[Int] should be (2) } - val html = Utils.tryWithResource(Source.fromURL(s"$masterUrl/")) { source => - source.getLines().mkString("\n") - } + val html = Utils + .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n")) html should include ("Spark Master at spark://") html should include ("""href="/static""") html should include ("""src="/static""") @@ -435,9 +430,8 @@ class MasterSuite extends SparkFunSuite System.getProperty("spark.ui.proxyBase") should startWith 
(s"$reverseProxyUrl/proxy/worker-") System.setProperty("spark.ui.proxyBase", reverseProxyUrl) - val html = Utils.tryWithResource(Source.fromURL(s"$masterUrl/")) { source => - source.mkString("\n") - } + val html = Utils + .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.mkString("\n")) html should include ("Spark Master at spark://") verifyStaticResourcesServedByProxy(html, reverseProxyUrl) verifyWorkerUI(html, masterUrl, reverseProxyUrl) @@ -459,9 +453,8 @@ class MasterSuite extends SparkFunSuite // construct url directly targeting the master val url = s"$masterUrl/proxy/$workerId/" System.setProperty("spark.ui.proxyBase", workerUrl) - val workerHtml = Utils.tryWithResource(Source.fromURL(url)) { source => - source.getLines().mkString("\n") - } + val workerHtml = Utils + .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n")) workerHtml should include ("Spark Worker at") workerHtml should include ("Running Executors (0)") verifyStaticResourcesServedByProxy(workerHtml, workerUrl) From 5b2b96e530ee2bbc14a9892bf4a418cad84103b2 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 25 Jan 2021 22:05:39 +0800 Subject: [PATCH 05/13] re fix --- .../org/apache/spark/deploy/master/MasterSuite.scala | 10 ++++------ .../scala/org/apache/spark/ui/UISeleniumSuite.scala | 12 +++--------- .../src/test/scala/org/apache/spark/ui/UISuite.scala | 8 ++------ .../spark/util/MutableURLClassLoaderSuite.scala | 4 +--- .../org/apache/spark/examples/DFSReadWriteTest.scala | 7 +------ .../spark/sql/kafka010/KafkaSourceOffsetSuite.scala | 3 +-- .../apache/spark/sql/kafka010/KafkaTestUtils.scala | 5 ++--- .../apache/spark/deploy/yarn/YarnClusterSuite.scala | 8 ++------ .../spark/sql/execution/QueryExecutionSuite.scala | 7 ++++--- .../spark/sql/streaming/FileStreamSourceSuite.scala | 3 +-- .../apache/spark/streaming/util/RawTextSender.scala | 4 +--- 11 files changed, 22 insertions(+), 49 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 17a250905e266..e1e43fddfb167 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -347,9 +347,8 @@ class MasterSuite extends SparkFunSuite val workerLinks = (WORKER_LINK_RE findAllMatchIn html).toList workerLinks.size should be (2) workerLinks foreach { case WORKER_LINK_RE(workerUrl, workerId) => - val workerHtml = Utils.tryWithResource(Source.fromURL(workerUrl)) { source => - source.getLines().mkString("\n") - } + val workerHtml = Utils + .tryWithResource(Source.fromURL(workerUrl))(_.getLines().mkString("\n")) workerHtml should include ("Spark Worker at") workerHtml should include ("Running Executors (0)") } @@ -407,9 +406,8 @@ class MasterSuite extends SparkFunSuite val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}" try { eventually(timeout(5.seconds), interval(100.milliseconds)) { - val json = Utils.tryWithResource( Source.fromURL(s"$masterUrl/json")) { source => - source.getLines().mkString("\n") - } + val json = Utils + .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n")) val JArray(workers) = (parse(json) \ "workers") workers.size should be (2) workers.foreach { workerSummaryJson => diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index 9c61c5e27508e..015f299fc6bdf 100644 --- 
a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -711,9 +711,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B eventually(timeout(5.seconds), interval(100.milliseconds)) { val stage0 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl + - "/stages/stage/?id=0&attempt=0&expandDagViz=true")) { source => - source.mkString - } + "/stages/stage/?id=0&attempt=0&expandDagViz=true"))(_.mkString) assert(stage0.contains("digraph G {\n subgraph clusterstage_0 {\n " + "label="Stage 0";\n subgraph ")) assert(stage0.contains("{\n label="parallelize";\n " + @@ -724,9 +722,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B "2 [labelType="html" label="MapPartitionsRDD [2]")) val stage1 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl + - "/stages/stage/?id=1&attempt=0&expandDagViz=true")) { source => - source.mkString - } + "/stages/stage/?id=1&attempt=0&expandDagViz=true"))(_.mkString) assert(stage1.contains("digraph G {\n subgraph clusterstage_1 {\n " + "label="Stage 1";\n subgraph ")) assert(stage1.contains("{\n label="groupBy";\n " + @@ -737,9 +733,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B "5 [labelType="html" label="MapPartitionsRDD [5]")) val stage2 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl + - "/stages/stage/?id=2&attempt=0&expandDagViz=true")) { source => - source.mkString - } + "/stages/stage/?id=2&attempt=0&expandDagViz=true"))(_.mkString) assert(stage2.contains("digraph G {\n subgraph clusterstage_2 {\n " + "label="Stage 2";\n subgraph ")) assert(stage2.contains("{\n label="groupBy";\n " + diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 3b67489409699..fb3015e3b08d3 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -76,9 +76,7 @@ class UISuite extends SparkFunSuite { withSpark(newSparkContext()) { sc => // test if the ui is visible, and all the expected tabs are visible eventually(timeout(10.seconds), interval(50.milliseconds)) { - val html = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl)) { source => - source.mkString - } + val html = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl))(_.mkString) assert(!html.contains("random data that should not be present")) assert(html.toLowerCase(Locale.ROOT).contains("stages")) assert(html.toLowerCase(Locale.ROOT).contains("storage")) @@ -92,9 +90,7 @@ class UISuite extends SparkFunSuite { withSpark(newSparkContext()) { sc => // test if visible from http://localhost:4040 eventually(timeout(10.seconds), interval(50.milliseconds)) { - val html = Utils.tryWithResource(Source.fromURL("http://localhost:4040")) { source => - source.mkString - } + val html = Utils.tryWithResource(Source.fromURL("http://localhost:4040"))(_.mkString) assert(html.toLowerCase(Locale.ROOT).contains("stages")) } } diff --git a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala index eea90bc825260..9435b5acd2224 100644 --- a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala @@ -113,9 +113,7 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers { assert(res1.size === 2) 
assert(classLoader.getResources("resource2").asScala.size === 1) res1.map { res => - Utils.tryWithResource(scala.io.Source.fromURL(res)) { source => - source.mkString - } + Utils.tryWithResource(scala.io.Source.fromURL(res))(_.mkString) } should contain inOrderOnly("resource1Contents-child", "resource1Contents-parent") classLoader.close() parentLoader.close() diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index 30bde99f8ae78..c3ca85639ec10 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -47,12 +47,7 @@ object DFSReadWriteTest { private val NPARAMS = 2 private def readFile(filename: String): List[String] = { - val lineIter: Iterator[String] = - Utils.tryWithResource(fromFile(filename)) { source => - source.getLines() - } - val lineList: List[String] = lineIter.toList - lineList + Utils.tryWithResource(fromFile(filename))(_.getLines()).toList } private def printUsage(): Unit = { diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala index ebf432d917c3c..3f3fc883ff650 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala @@ -101,8 +101,7 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSparkSession { import scala.io.Source val input = getClass.getResource(s"/$file").toURI Utils.tryWithResource(Source.fromFile(input)) { source => - val str = source.mkString - SerializedOffset(str) + SerializedOffset(source.mkString) } } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index b0f8ed41d8088..0fa4bcb5aaed0 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -167,9 +167,8 @@ class KafkaTestUtils( * In this method we rewrite krb5.conf to make kdc and client use the same enctypes */ private def rewriteKrb5Conf(): Unit = { - val krb5Conf = Utils.tryWithResource(Source.fromFile(kdc.getKrb5conf, "UTF-8")) { source => - source.getLines() - } + val krb5Conf = Utils + .tryWithResource(Source.fromFile(kdc.getKrb5conf, "UTF-8"))(_.getLines()) var rewritten = false val addedConfig = addedKrb5Config("default_tkt_enctypes", "aes128-cts-hmac-sha1-96") + diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 57875ec72cda9..9bc934d246fec 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -480,9 +480,7 @@ private object YarnClusterDriver extends Logging with Matchers { executorInfos.foreach { info => assert(info.logUrlMap.nonEmpty) info.logUrlMap.values.foreach { url => - val log = Utils.tryWithResource(Source.fromURL(url)) { source => - source.mkString - } + val log = 
Utils.tryWithResource(Source.fromURL(url))(_.mkString) assert( !log.contains(SECRET_PASSWORD), s"Executor logs contain sensitive info (${SECRET_PASSWORD}): \n${log} " @@ -501,9 +499,7 @@ private object YarnClusterDriver extends Logging with Matchers { assert(driverLogs.contains("stdout")) val urlStr = driverLogs("stderr") driverLogs.foreach { kv => - val log = Utils.tryWithResource(Source.fromURL(kv._2)) { source => - source.mkString - } + val log = Utils.tryWithResource(Source.fromURL(kv._2))(_.mkString) assert( !log.contains(SECRET_PASSWORD), s"Driver logs contain sensitive info (${SECRET_PASSWORD}): \n${log} " diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index 7616a3f205e3a..93c6b56b27685 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -140,9 +140,10 @@ class QueryExecutionSuite extends SharedSparkSession { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26))) ds.queryExecution.debug.toFile(path) - val localRelations = Utils.tryWithResource(Source.fromFile(path)) { source => - source.getLines().filter(_.contains("LocalRelation")) - } + val localRelations = Utils + .tryWithResource(Source.fromFile(path)) { source => + source.getLines().filter(_.contains("LocalRelation")) + } assert(!localRelations.exists(_.contains("more fields"))) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index d1013c61f67b6..ff00c474e2ef0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1534,8 +1534,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { import scala.io.Source Utils.tryWithResource( Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI)) { source => - val str = source.mkString - SerializedOffset(str.trim) + SerializedOffset(source.mkString.trim) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala index 702accf268e38..a63d50f1334fe 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala @@ -45,9 +45,7 @@ object RawTextSender extends Logging { val Array(IntParam(port), file, IntParam(blockSize), IntParam(bytesPerSec)) = args // Repeat the input data multiple times to fill in a buffer - val lines = Utils.tryWithResource(Source.fromFile(file)) { source => - source.getLines().toArray - } + val lines = Utils.tryWithResource(Source.fromFile(file))(_.getLines().toArray) val bufferStream = new ByteArrayOutputStream(blockSize + 1000) val ser = new KryoSerializer(new SparkConf()).newInstance() val serStream = ser.serializeStream(bufferStream) From 17fe2836f3bda49ba3e144d9c379f455d87a3add Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 25 Jan 2021 22:13:30 +0800 Subject: [PATCH 06/13] re fix --- .../apache/spark/deploy/master/MasterSuite.scala | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index e1e43fddfb167..88d3cec7e6525 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -334,10 +334,8 @@ class MasterSuite extends SparkFunSuite workers.size should be (2) workers.foreach { workerSummaryJson => val JString(workerWebUi) = workerSummaryJson \ "webuiaddress" - val workerResponse = Utils - .tryWithResource(Source.fromURL(s"$workerWebUi/json")) { source => - parse(source.getLines().mkString("\n")) - } + val workerResponse = parse(Utils + .tryWithResource(Source.fromURL(s"$workerWebUi/json"))(_.getLines().mkString("\n"))) (workerResponse \ "cores").extract[Int] should be (2) } @@ -376,9 +374,8 @@ class MasterSuite extends SparkFunSuite // explicitly construct reverse proxy url targeting the master val JString(workerId) = workerSummaryJson \ "id" val url = s"$masterUrl/proxy/${workerId}/json" - val workerResponse = Utils.tryWithResource(Source.fromURL(url)) { source => - parse(source.getLines().mkString("\n")) - } + val workerResponse = parse( + Utils.tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n"))) (workerResponse \ "cores").extract[Int] should be (2) } @@ -415,9 +412,8 @@ class MasterSuite extends SparkFunSuite // explicitly construct reverse proxy url targeting the master val JString(workerId) = workerSummaryJson \ "id" val url = s"$masterUrl/proxy/${workerId}/json" - val workerResponse = Utils.tryWithResource(Source.fromURL(url)) { source => - parse(source.getLines().mkString("\n")) - } + val workerResponse = parse(Utils + .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n"))) (workerResponse \ "cores").extract[Int] should be (2) (workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl + "/") } From 4a51d204f8e22c5a8c2a133a06540b9f13abb63c Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 25 Jan 2021 22:19:18 +0800 Subject: [PATCH 07/13] re fix --- .../apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala index 3f3fc883ff650..553ab42b9c8b1 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala @@ -100,8 +100,6 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSparkSession { private def readFromResource(file: String): SerializedOffset = { import scala.io.Source val input = getClass.getResource(s"/$file").toURI - Utils.tryWithResource(Source.fromFile(input)) { source => - SerializedOffset(source.mkString) - } + SerializedOffset(Utils.tryWithResource(Source.fromFile(input))(_.mkString)) } } From 40d7f6c042fa2388d0741c322764cda0aa370a53 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 26 Jan 2021 11:36:03 +0800 Subject: [PATCH 08/13] fix test failed --- .../org/apache/spark/SparkContextSuite.scala | 4 +- .../spark/deploy/master/MasterSuite.scala | 2 +- .../spark/sql/kafka010/KafkaTestUtils.scala | 43 ++++++++++--------- .../sql/execution/QueryExecutionSuite.scala | 10 ++--- 4 files changed, 28 insertions(+), 31 deletions(-) diff --git 
a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index e640a89216be9..d987b2af3d2e3 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -377,9 +377,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu sc.addFile(file1.getAbsolutePath) def getAddedFileContents(): String = { sc.parallelize(Seq(0)).map { _ => - Utils.tryWithResource(Source.fromFile(SparkFiles.get("file"))) { source => - source.mkString - } + Utils.tryWithResource(Source.fromFile(SparkFiles.get("file")))(_.mkString) }.first() } assert(getAddedFileContents() === "old") diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 88d3cec7e6525..3a4a125a9a470 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -425,7 +425,7 @@ class MasterSuite extends SparkFunSuite (s"$reverseProxyUrl/proxy/worker-") System.setProperty("spark.ui.proxyBase", reverseProxyUrl) val html = Utils - .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.mkString("\n")) + .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n")) html should include ("Spark Master at spark://") verifyStaticResourcesServedByProxy(html, reverseProxyUrl) verifyWorkerUI(html, masterUrl, reverseProxyUrl) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 0fa4bcb5aaed0..d0c74bf16b7d3 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -167,30 +167,31 @@ class KafkaTestUtils( * In this method we rewrite krb5.conf to make kdc and client use the same enctypes */ private def rewriteKrb5Conf(): Unit = { - val krb5Conf = Utils - .tryWithResource(Source.fromFile(kdc.getKrb5conf, "UTF-8"))(_.getLines()) - var rewritten = false - val addedConfig = - addedKrb5Config("default_tkt_enctypes", "aes128-cts-hmac-sha1-96") + - addedKrb5Config("default_tgs_enctypes", "aes128-cts-hmac-sha1-96") - val rewriteKrb5Conf = krb5Conf.map(s => - if (s.contains("libdefaults")) { - rewritten = true - s + addedConfig + Utils.tryWithResource(Source.fromFile(kdc.getKrb5conf, "UTF-8")) { source => + val krb5Conf = source.getLines() + var rewritten = false + val addedConfig = + addedKrb5Config("default_tkt_enctypes", "aes128-cts-hmac-sha1-96") + + addedKrb5Config("default_tgs_enctypes", "aes128-cts-hmac-sha1-96") + val rewriteKrb5Conf = krb5Conf.map(s => + if (s.contains("libdefaults")) { + rewritten = true + s + addedConfig + } else { + s + }).filter(!_.trim.startsWith("#")).mkString(System.lineSeparator()) + + val krb5confStr = if (!rewritten) { + "[libdefaults]" + addedConfig + System.lineSeparator() + + System.lineSeparator() + rewriteKrb5Conf } else { - s - }).filter(!_.trim.startsWith("#")).mkString(System.lineSeparator()) + rewriteKrb5Conf + } - val krb5confStr = if (!rewritten) { - "[libdefaults]" + addedConfig + System.lineSeparator() + - System.lineSeparator() + rewriteKrb5Conf - } else { - rewriteKrb5Conf + kdc.getKrb5conf.delete() + Files.write(krb5confStr, kdc.getKrb5conf, 
StandardCharsets.UTF_8) + logDebug(s"krb5.conf file content: $krb5confStr") } - - kdc.getKrb5conf.delete() - Files.write(krb5confStr, kdc.getKrb5conf, StandardCharsets.UTF_8) - logDebug(s"krb5.conf file content: $krb5confStr") } private def addedKrb5Config(key: String, value: String): String = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index 93c6b56b27685..69be3b677e923 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -140,12 +140,10 @@ class QueryExecutionSuite extends SharedSparkSession { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26))) ds.queryExecution.debug.toFile(path) - val localRelations = Utils - .tryWithResource(Source.fromFile(path)) { source => - source.getLines().filter(_.contains("LocalRelation")) - } - - assert(!localRelations.exists(_.contains("more fields"))) + Utils.tryWithResource(Source.fromFile(path)) { source => + val localRelations = source.getLines().filter(_.contains("LocalRelation")) + assert(!localRelations.exists(_.contains("more fields"))) + } } } From 8dfc2b46455c7397e5e073abe788957fa9193cbb Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 26 Jan 2021 12:21:54 +0800 Subject: [PATCH 09/13] use mkstring instead of getlines().mkstring('\n') --- .../spark/deploy/master/MasterSuite.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 3a4a125a9a470..851707fe1c920 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -329,24 +329,24 @@ class MasterSuite extends SparkFunSuite try { eventually(timeout(5.seconds), interval(100.milliseconds)) { val json = Utils - .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n")) + .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.mkString) val JArray(workers) = (parse(json) \ "workers") workers.size should be (2) workers.foreach { workerSummaryJson => val JString(workerWebUi) = workerSummaryJson \ "webuiaddress" val workerResponse = parse(Utils - .tryWithResource(Source.fromURL(s"$workerWebUi/json"))(_.getLines().mkString("\n"))) + .tryWithResource(Source.fromURL(s"$workerWebUi/json"))(_.mkString)) (workerResponse \ "cores").extract[Int] should be (2) } val html = Utils - .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n")) + .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.mkString) html should include ("Spark Master at spark://") val workerLinks = (WORKER_LINK_RE findAllMatchIn html).toList workerLinks.size should be (2) workerLinks foreach { case WORKER_LINK_RE(workerUrl, workerId) => val workerHtml = Utils - .tryWithResource(Source.fromURL(workerUrl))(_.getLines().mkString("\n")) + .tryWithResource(Source.fromURL(workerUrl))(_.mkString) workerHtml should include ("Spark Worker at") workerHtml should include ("Running Executors (0)") } @@ -366,7 +366,7 @@ class MasterSuite extends SparkFunSuite try { eventually(timeout(5.seconds), interval(100.milliseconds)) { val json = Utils - .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n")) + 
.tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.mkString) val JArray(workers) = (parse(json) \ "workers") workers.size should be (2) workers.foreach { workerSummaryJson => @@ -375,12 +375,12 @@ class MasterSuite extends SparkFunSuite val JString(workerId) = workerSummaryJson \ "id" val url = s"$masterUrl/proxy/${workerId}/json" val workerResponse = parse( - Utils.tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n"))) + Utils.tryWithResource(Source.fromURL(url))(_.mkString)) (workerResponse \ "cores").extract[Int] should be (2) } val html = Utils - .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n")) + .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.mkString) html should include ("Spark Master at spark://") html should include ("""href="/static""") html should include ("""src="/static""") @@ -404,7 +404,7 @@ class MasterSuite extends SparkFunSuite try { eventually(timeout(5.seconds), interval(100.milliseconds)) { val json = Utils - .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n")) + .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.mkString) val JArray(workers) = (parse(json) \ "workers") workers.size should be (2) workers.foreach { workerSummaryJson => @@ -413,7 +413,7 @@ class MasterSuite extends SparkFunSuite val JString(workerId) = workerSummaryJson \ "id" val url = s"$masterUrl/proxy/${workerId}/json" val workerResponse = parse(Utils - .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n"))) + .tryWithResource(Source.fromURL(url))(_.mkString)) (workerResponse \ "cores").extract[Int] should be (2) (workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl + "/") } @@ -425,7 +425,7 @@ class MasterSuite extends SparkFunSuite (s"$reverseProxyUrl/proxy/worker-") System.setProperty("spark.ui.proxyBase", reverseProxyUrl) val html = Utils - .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n")) + .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.mkString) html should include ("Spark Master at spark://") verifyStaticResourcesServedByProxy(html, reverseProxyUrl) verifyWorkerUI(html, masterUrl, reverseProxyUrl) @@ -448,7 +448,7 @@ class MasterSuite extends SparkFunSuite val url = s"$masterUrl/proxy/$workerId/" System.setProperty("spark.ui.proxyBase", workerUrl) val workerHtml = Utils - .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n")) + .tryWithResource(Source.fromURL(url))(_.mkString) workerHtml should include ("Spark Worker at") workerHtml should include ("Running Executors (0)") verifyStaticResourcesServedByProxy(workerHtml, workerUrl) From 8e5b51cb3ac0a48ad568a05dac7c06e8db700327 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 26 Jan 2021 13:28:27 +0800 Subject: [PATCH 10/13] revert change of MasterSuite.scala --- .../spark/deploy/master/MasterSuite.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 851707fe1c920..3a4a125a9a470 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -329,24 +329,24 @@ class MasterSuite extends SparkFunSuite try { eventually(timeout(5.seconds), interval(100.milliseconds)) { val json = Utils - .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.mkString) + 
.tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n")) val JArray(workers) = (parse(json) \ "workers") workers.size should be (2) workers.foreach { workerSummaryJson => val JString(workerWebUi) = workerSummaryJson \ "webuiaddress" val workerResponse = parse(Utils - .tryWithResource(Source.fromURL(s"$workerWebUi/json"))(_.mkString)) + .tryWithResource(Source.fromURL(s"$workerWebUi/json"))(_.getLines().mkString("\n"))) (workerResponse \ "cores").extract[Int] should be (2) } val html = Utils - .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.mkString) + .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n")) html should include ("Spark Master at spark://") val workerLinks = (WORKER_LINK_RE findAllMatchIn html).toList workerLinks.size should be (2) workerLinks foreach { case WORKER_LINK_RE(workerUrl, workerId) => val workerHtml = Utils - .tryWithResource(Source.fromURL(workerUrl))(_.mkString) + .tryWithResource(Source.fromURL(workerUrl))(_.getLines().mkString("\n")) workerHtml should include ("Spark Worker at") workerHtml should include ("Running Executors (0)") } @@ -366,7 +366,7 @@ class MasterSuite extends SparkFunSuite try { eventually(timeout(5.seconds), interval(100.milliseconds)) { val json = Utils - .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.mkString) + .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n")) val JArray(workers) = (parse(json) \ "workers") workers.size should be (2) workers.foreach { workerSummaryJson => @@ -375,12 +375,12 @@ class MasterSuite extends SparkFunSuite val JString(workerId) = workerSummaryJson \ "id" val url = s"$masterUrl/proxy/${workerId}/json" val workerResponse = parse( - Utils.tryWithResource(Source.fromURL(url))(_.mkString)) + Utils.tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n"))) (workerResponse \ "cores").extract[Int] should be (2) } val html = Utils - .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.mkString) + .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n")) html should include ("Spark Master at spark://") html should include ("""href="/static""") html should include ("""src="/static""") @@ -404,7 +404,7 @@ class MasterSuite extends SparkFunSuite try { eventually(timeout(5.seconds), interval(100.milliseconds)) { val json = Utils - .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.mkString) + .tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n")) val JArray(workers) = (parse(json) \ "workers") workers.size should be (2) workers.foreach { workerSummaryJson => @@ -413,7 +413,7 @@ class MasterSuite extends SparkFunSuite val JString(workerId) = workerSummaryJson \ "id" val url = s"$masterUrl/proxy/${workerId}/json" val workerResponse = parse(Utils - .tryWithResource(Source.fromURL(url))(_.mkString)) + .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n"))) (workerResponse \ "cores").extract[Int] should be (2) (workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl + "/") } @@ -425,7 +425,7 @@ class MasterSuite extends SparkFunSuite (s"$reverseProxyUrl/proxy/worker-") System.setProperty("spark.ui.proxyBase", reverseProxyUrl) val html = Utils - .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.mkString) + .tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n")) html should include ("Spark Master at spark://") verifyStaticResourcesServedByProxy(html, reverseProxyUrl) verifyWorkerUI(html, masterUrl, reverseProxyUrl) @@ -448,7 +448,7 
@@ class MasterSuite extends SparkFunSuite val url = s"$masterUrl/proxy/$workerId/" System.setProperty("spark.ui.proxyBase", workerUrl) val workerHtml = Utils - .tryWithResource(Source.fromURL(url))(_.mkString) + .tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n")) workerHtml should include ("Spark Worker at") workerHtml should include ("Running Executors (0)") verifyStaticResourcesServedByProxy(workerHtml, workerUrl) From 12d4957edd899eb4f43c0785fb07672b6e406b6d Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 26 Jan 2021 13:36:12 +0800 Subject: [PATCH 11/13] reduce changes --- .../spark/sql/kafka010/KafkaTestUtils.scala | 43 +++++++------ .../sql/execution/QueryExecutionSuite.scala | 60 +++++++++---------- 2 files changed, 50 insertions(+), 53 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index d0c74bf16b7d3..058563dfa167d 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -167,31 +167,30 @@ class KafkaTestUtils( * In this method we rewrite krb5.conf to make kdc and client use the same enctypes */ private def rewriteKrb5Conf(): Unit = { - Utils.tryWithResource(Source.fromFile(kdc.getKrb5conf, "UTF-8")) { source => - val krb5Conf = source.getLines() - var rewritten = false - val addedConfig = - addedKrb5Config("default_tkt_enctypes", "aes128-cts-hmac-sha1-96") + - addedKrb5Config("default_tgs_enctypes", "aes128-cts-hmac-sha1-96") - val rewriteKrb5Conf = krb5Conf.map(s => - if (s.contains("libdefaults")) { - rewritten = true - s + addedConfig - } else { - s - }).filter(!_.trim.startsWith("#")).mkString(System.lineSeparator()) - - val krb5confStr = if (!rewritten) { - "[libdefaults]" + addedConfig + System.lineSeparator() + - System.lineSeparator() + rewriteKrb5Conf + val krb5Conf = Utils + .tryWithResource(Source.fromFile(kdc.getKrb5conf, "UTF-8"))(_.getLines().toList) + var rewritten = false + val addedConfig = + addedKrb5Config("default_tkt_enctypes", "aes128-cts-hmac-sha1-96") + + addedKrb5Config("default_tgs_enctypes", "aes128-cts-hmac-sha1-96") + val rewriteKrb5Conf = krb5Conf.map(s => + if (s.contains("libdefaults")) { + rewritten = true + s + addedConfig } else { - rewriteKrb5Conf - } + s + }).filter(!_.trim.startsWith("#")).mkString(System.lineSeparator()) - kdc.getKrb5conf.delete() - Files.write(krb5confStr, kdc.getKrb5conf, StandardCharsets.UTF_8) - logDebug(s"krb5.conf file content: $krb5confStr") + val krb5confStr = if (!rewritten) { + "[libdefaults]" + addedConfig + System.lineSeparator() + + System.lineSeparator() + rewriteKrb5Conf + } else { + rewriteKrb5Conf } + + kdc.getKrb5conf.delete() + Files.write(krb5confStr, kdc.getKrb5conf, StandardCharsets.UTF_8) + logDebug(s"krb5.conf file content: $krb5confStr") } private def addedKrb5Config(key: String, value: String): String = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index 69be3b677e923..cf733b8f50cb5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -37,24 +37,23 @@ case class QueryExecutionTestRecord( class QueryExecutionSuite extends 
SharedSparkSession { import testImplicits._ - def checkDumpedPlans(path: String, expected: Int): Unit = { - Utils.tryWithResource(Source.fromFile(path)) { source => - assert(source.getLines.toList - .takeWhile(_ != "== Whole Stage Codegen ==") == List( - "== Parsed Logical Plan ==", - s"Range (0, $expected, step=1, splits=Some(2))", - "", - "== Analyzed Logical Plan ==", - "id: bigint", - s"Range (0, $expected, step=1, splits=Some(2))", - "", - "== Optimized Logical Plan ==", - s"Range (0, $expected, step=1, splits=Some(2))", - "", - "== Physical Plan ==", - s"*(1) Range (0, $expected, step=1, splits=2)", - "")) - } + def checkDumpedPlans(path: String, expected: Int): Unit = Utils.tryWithResource( + Source.fromFile(path)) { source => + assert(source.getLines.toList + .takeWhile(_ != "== Whole Stage Codegen ==") == List( + "== Parsed Logical Plan ==", + s"Range (0, $expected, step=1, splits=Some(2))", + "", + "== Analyzed Logical Plan ==", + "id: bigint", + s"Range (0, $expected, step=1, splits=Some(2))", + "", + "== Optimized Logical Plan ==", + s"Range (0, $expected, step=1, splits=Some(2))", + "", + "== Physical Plan ==", + s"*(1) Range (0, $expected, step=1, splits=2)", + "")) } test("dumping query execution info to a file") { @@ -102,19 +101,18 @@ class QueryExecutionSuite extends SharedSparkSession { val path = dir.getCanonicalPath + "/plans.txt" val df = spark.range(0, 10) df.queryExecution.debug.toFile(path, explainMode = Option("formatted")) - Utils.tryWithResource(Source.fromFile(path)) { source => - assert(source.getLines.toList - .takeWhile(_ != "== Whole Stage Codegen ==").map(_.replaceAll("#\\d+", "#x")) == List( - "== Physical Plan ==", - s"* Range (1)", - "", - "", - s"(1) Range [codegen id : 1]", - "Output [1]: [id#xL]", - s"Arguments: Range (0, 10, step=1, splits=Some(2))", - "", - "")) - } + val lines = Utils.tryWithResource(Source.fromFile(path))(_.getLines().toList) + assert(lines + .takeWhile(_ != "== Whole Stage Codegen ==").map(_.replaceAll("#\\d+", "#x")) == List( + "== Physical Plan ==", + s"* Range (1)", + "", + "", + s"(1) Range [codegen id : 1]", + "Output [1]: [id#xL]", + s"Arguments: Range (0, 10, step=1, splits=Some(2))", + "", + "")) } } From 9f3c0d51b8b9e8e5eee8bd2eb3e070ab8dde0cde Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 26 Jan 2021 13:42:38 +0800 Subject: [PATCH 12/13] fix HiveExternalCatalogVersionsSuite --- .../hive/HiveExternalCatalogVersionsSuite.scala | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala index 18a823dd72dcf..766edcae6e4a1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala @@ -242,15 +242,14 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils { // Tests the latest version of every release line. 
val testingVersions: Seq[String] = { import scala.io.Source - val versions: Seq[String] = try { - Utils.tryWithResource(Source.fromURL(s"$releaseMirror/spark")) { source => - source.mkString - .split("\n") - .filter(_.contains("""""".r.findFirstMatchIn(_).get.group(1)) - .filter(_ < org.apache.spark.SPARK_VERSION) - } + val versions: Seq[String] = try Utils.tryWithResource( + Source.fromURL(s"$releaseMirror/spark")) { source => + source.mkString + .split("\n") + .filter(_.contains("""""".r.findFirstMatchIn(_).get.group(1)) + .filter(_ < org.apache.spark.SPARK_VERSION) } catch { // do not throw exception during object initialization. case NonFatal(_) => Seq("3.0.1", "2.4.7") // A temporary fallback to use a specific version From 245d8446e831043a140b8c16a31a9710328a67a7 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 26 Jan 2021 13:44:26 +0800 Subject: [PATCH 13/13] fix /DFSReadWriteTest --- .../main/scala/org/apache/spark/examples/DFSReadWriteTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index c3ca85639ec10..323bab427e55e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -47,7 +47,7 @@ object DFSReadWriteTest { private val NPARAMS = 2 private def readFile(filename: String): List[String] = { - Utils.tryWithResource(fromFile(filename))(_.getLines()).toList + Utils.tryWithResource(fromFile(filename))(_.getLines().toList) } private def printUsage(): Unit = {
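
Note on the pattern behind PATCH 06-13: scala.io.Source.getLines() returns a lazy Iterator backed by the open stream, so returning that iterator from a Utils.tryWithResource block and consuming it after the block has closed the Source fails or yields nothing; the content has to be materialized (mkString, toList) while the Source is still open, which is what the later commits in the series (PATCH 08, 11, and 13) converge on. The sketch below illustrates the pitfall with a local tryWithResource helper standing in for org.apache.spark.util.Utils.tryWithResource; the helper, the object name, and the sample file path are illustrative assumptions, not code from the patches.

    import java.io.Closeable
    import scala.io.Source

    object TryWithResourceSketch {
      // Simplified stand-in for Utils.tryWithResource: apply f to the resource,
      // then close the resource whether or not f throws.
      def tryWithResource[R <: Closeable, T](create: => R)(f: R => T): T = {
        val resource = create
        try f(resource) finally resource.close()
      }

      def main(args: Array[String]): Unit = {
        val path = args.headOption.getOrElse("/etc/hosts") // any readable text file

        // Leaky shape fixed in the later patches: the lazy iterator escapes the
        // block, the Source is closed first, and consuming it afterwards fails.
        //   val leaked = tryWithResource(Source.fromFile(path))(_.getLines())
        //   leaked.toList // stream already closed at this point

        // Safe shape: materialize the content inside the block.
        val lines = tryWithResource(Source.fromFile(path))(_.getLines().toList)
        val text  = tryWithResource(Source.fromFile(path))(_.mkString)

        println(s"read ${lines.size} lines, ${text.length} characters")
      }
    }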