3 changes: 2 additions & 1 deletion core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit}

import scala.concurrent.duration._
import scala.io.Source

import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
@@ -376,7 +377,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
sc.addFile(file1.getAbsolutePath)
def getAddedFileContents(): String = {
sc.parallelize(Seq(0)).map { _ =>
scala.io.Source.fromFile(SparkFiles.get("file")).mkString
Utils.tryWithResource(Source.fromFile(SparkFiles.get("file")))(_.mkString)
}.first()
}
assert(getAddedFileContents() === "old")
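Review note: every hunk in this PR applies the same fix, wrapping a `scala.io.Source` in `Utils.tryWithResource` so the underlying stream is closed even when the read throws. For reference, a loan-pattern helper of this kind is roughly the following sketch (the actual `org.apache.spark.util.Utils.tryWithResource` may differ in details):

```scala
import java.io.Closeable
import scala.io.Source

// Loan pattern: build the resource, hand it to f, and close it even if f throws.
// Sketch only -- not necessarily byte-for-byte what Utils.tryWithResource does.
def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
  val resource = createResource
  try f(resource) finally resource.close()
}

// Usage mirroring the change above: the Source is closed as soon as mkString returns.
def readFully(path: String): String =
  tryWithResource(Source.fromFile(path))(_.mkString)
```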
@@ -25,7 +25,7 @@ import scala.io.Source
import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.SparkConfWithEnv
import org.apache.spark.util.{SparkConfWithEnv, Utils}

class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {

@@ -43,7 +43,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
assert(info.logUrlMap.nonEmpty)
// Browse to each URL to check that it's valid
info.logUrlMap.foreach { case (logType, logUrl) =>
val html = Source.fromURL(logUrl).mkString
val html = Utils.tryWithResource(Source.fromURL(logUrl))(_.mkString)
assert(html.contains(s"$logType log page"))
}
}
@@ -48,6 +48,7 @@ import org.apache.spark.resource.{ResourceInformation, ResourceRequirement}
import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.serializer
import org.apache.spark.util.Utils

object MockWorker {
val counter = new AtomicInteger(10000)
@@ -327,22 +328,25 @@ class MasterSuite extends SparkFunSuite
val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}"
try {
eventually(timeout(5.seconds), interval(100.milliseconds)) {
val json = Source.fromURL(s"$masterUrl/json").getLines().mkString("\n")
val json = Utils
.tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
val JArray(workers) = (parse(json) \ "workers")
workers.size should be (2)
workers.foreach { workerSummaryJson =>
val JString(workerWebUi) = workerSummaryJson \ "webuiaddress"
val workerResponse = parse(Source.fromURL(s"${workerWebUi}/json")
.getLines().mkString("\n"))
val workerResponse = parse(Utils
.tryWithResource(Source.fromURL(s"$workerWebUi/json"))(_.getLines().mkString("\n")))
(workerResponse \ "cores").extract[Int] should be (2)
}

val html = Source.fromURL(s"$masterUrl/").getLines().mkString("\n")
val html = Utils
.tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
html should include ("Spark Master at spark://")
val workerLinks = (WORKER_LINK_RE findAllMatchIn html).toList
workerLinks.size should be (2)
workerLinks foreach { case WORKER_LINK_RE(workerUrl, workerId) =>
val workerHtml = Source.fromURL(workerUrl).getLines().mkString("\n")
val workerHtml = Utils
.tryWithResource(Source.fromURL(workerUrl))(_.getLines().mkString("\n"))
workerHtml should include ("Spark Worker at")
workerHtml should include ("Running Executors (0)")
}
@@ -361,20 +365,22 @@ class MasterSuite extends SparkFunSuite
val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}"
try {
eventually(timeout(5.seconds), interval(100.milliseconds)) {
val json = Source.fromURL(s"$masterUrl/json")
.getLines().mkString("\n")
val json = Utils
.tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
val JArray(workers) = (parse(json) \ "workers")
workers.size should be (2)
workers.foreach { workerSummaryJson =>
// the webuiaddress intentionally points to the local web ui.
// explicitly construct reverse proxy url targeting the master
val JString(workerId) = workerSummaryJson \ "id"
val url = s"$masterUrl/proxy/${workerId}/json"
val workerResponse = parse(Source.fromURL(url).getLines().mkString("\n"))
val workerResponse = parse(
Utils.tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n")))
(workerResponse \ "cores").extract[Int] should be (2)
}

val html = Source.fromURL(s"$masterUrl/").getLines().mkString("\n")
val html = Utils
.tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
html should include ("Spark Master at spark://")
html should include ("""href="/static""")
html should include ("""src="/static""")
@@ -397,16 +403,17 @@ class MasterSuite extends SparkFunSuite
val masterUrl = s"http://localhost:${localCluster.masterWebUIPort}"
try {
eventually(timeout(5.seconds), interval(100.milliseconds)) {
val json = Source.fromURL(s"$masterUrl/json")
.getLines().mkString("\n")
val json = Utils
.tryWithResource(Source.fromURL(s"$masterUrl/json"))(_.getLines().mkString("\n"))
val JArray(workers) = (parse(json) \ "workers")
workers.size should be (2)
workers.foreach { workerSummaryJson =>
// the webuiaddress intentionally points to the local web ui.
// explicitly construct reverse proxy url targeting the master
val JString(workerId) = workerSummaryJson \ "id"
val url = s"$masterUrl/proxy/${workerId}/json"
val workerResponse = parse(Source.fromURL(url).getLines().mkString("\n"))
val workerResponse = parse(Utils
.tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n")))
(workerResponse \ "cores").extract[Int] should be (2)
(workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl + "/")
}
@@ -417,7 +424,8 @@ class MasterSuite extends SparkFunSuite
System.getProperty("spark.ui.proxyBase") should startWith
(s"$reverseProxyUrl/proxy/worker-")
System.setProperty("spark.ui.proxyBase", reverseProxyUrl)
val html = Source.fromURL(s"$masterUrl/").getLines().mkString("\n")
val html = Utils
.tryWithResource(Source.fromURL(s"$masterUrl/"))(_.getLines().mkString("\n"))
html should include ("Spark Master at spark://")
verifyStaticResourcesServedByProxy(html, reverseProxyUrl)
verifyWorkerUI(html, masterUrl, reverseProxyUrl)
@@ -439,7 +447,8 @@ class MasterSuite extends SparkFunSuite
// construct url directly targeting the master
val url = s"$masterUrl/proxy/$workerId/"
System.setProperty("spark.ui.proxyBase", workerUrl)
val workerHtml = Source.fromURL(url).getLines().mkString("\n")
val workerHtml = Utils
.tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n"))
workerHtml should include ("Spark Worker at")
workerHtml should include ("Running Executors (0)")
verifyStaticResourcesServedByProxy(workerHtml, workerUrl)
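Review note: the MasterSuite hunks repeat the same wrap-and-read incantation for several URLs, which pushes most call sites onto two lines. A small suite-local helper (hypothetical, not part of this patch) would keep them as one-liners:

```scala
import scala.io.Source
import org.apache.spark.util.Utils

object UrlTestHelpers {
  // Hypothetical helper (not in this patch): fetch a URL as one string and let
  // Utils.tryWithResource close the Source. The name is illustrative only.
  def fetchAsString(url: String): String =
    Utils.tryWithResource(Source.fromURL(url))(_.getLines().mkString("\n"))
}

// Call sites in MasterSuite would then read, e.g.:
//   val json = UrlTestHelpers.fetchAsString(s"$masterUrl/json")
//   val html = UrlTestHelpers.fetchAsString(s"$masterUrl/")
```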
13 changes: 7 additions & 6 deletions core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -46,6 +46,7 @@ import org.apache.spark.internal.config.Status._
import org.apache.spark.internal.config.UI._
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus}
import org.apache.spark.util.Utils

private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler {

@@ -709,8 +710,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
rdd.count()

eventually(timeout(5.seconds), interval(100.milliseconds)) {
val stage0 = Source.fromURL(sc.ui.get.webUrl +
"/stages/stage/?id=0&attempt=0&expandDagViz=true").mkString
val stage0 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl +
"/stages/stage/?id=0&attempt=0&expandDagViz=true"))(_.mkString)
assert(stage0.contains("digraph G {\n subgraph clusterstage_0 {\n " +
"label="Stage 0";\n subgraph "))
assert(stage0.contains("{\n label="parallelize";\n " +
@@ -720,8 +721,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
assert(stage0.contains("{\n label="groupBy";\n " +
"2 [labelType="html" label="MapPartitionsRDD [2]"))

val stage1 = Source.fromURL(sc.ui.get.webUrl +
"/stages/stage/?id=1&attempt=0&expandDagViz=true").mkString
val stage1 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl +
"/stages/stage/?id=1&attempt=0&expandDagViz=true"))(_.mkString)
assert(stage1.contains("digraph G {\n subgraph clusterstage_1 {\n " +
"label="Stage 1";\n subgraph "))
assert(stage1.contains("{\n label="groupBy";\n " +
@@ -731,8 +732,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
assert(stage1.contains("{\n label="groupBy";\n " +
"5 [labelType="html" label="MapPartitionsRDD [5]"))

val stage2 = Source.fromURL(sc.ui.get.webUrl +
"/stages/stage/?id=2&attempt=0&expandDagViz=true").mkString
val stage2 = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl +
"/stages/stage/?id=2&attempt=0&expandDagViz=true"))(_.mkString)
assert(stage2.contains("digraph G {\n subgraph clusterstage_2 {\n " +
"label="Stage 2";\n subgraph "))
assert(stage2.contains("{\n label="groupBy";\n " +
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -76,7 +76,7 @@ class UISuite extends SparkFunSuite {
withSpark(newSparkContext()) { sc =>
// test if the ui is visible, and all the expected tabs are visible
eventually(timeout(10.seconds), interval(50.milliseconds)) {
val html = Source.fromURL(sc.ui.get.webUrl).mkString
val html = Utils.tryWithResource(Source.fromURL(sc.ui.get.webUrl))(_.mkString)
assert(!html.contains("random data that should not be present"))
assert(html.toLowerCase(Locale.ROOT).contains("stages"))
assert(html.toLowerCase(Locale.ROOT).contains("storage"))
@@ -90,7 +90,7 @@ class UISuite extends SparkFunSuite {
withSpark(newSparkContext()) { sc =>
// test if visible from http://localhost:4040
eventually(timeout(10.seconds), interval(50.milliseconds)) {
val html = Source.fromURL("http://localhost:4040").mkString
val html = Utils.tryWithResource(Source.fromURL("http://localhost:4040"))(_.mkString)
assert(html.toLowerCase(Locale.ROOT).contains("stages"))
}
}
@@ -112,9 +112,9 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
val res1 = classLoader.getResources("resource1").asScala.toList
assert(res1.size === 2)
assert(classLoader.getResources("resource2").asScala.size === 1)

res1.map(scala.io.Source.fromURL(_).mkString) should contain inOrderOnly
("resource1Contents-child", "resource1Contents-parent")
res1.map { res =>
Utils.tryWithResource(scala.io.Source.fromURL(res))(_.mkString)
} should contain inOrderOnly("resource1Contents-child", "resource1Contents-parent")
classLoader.close()
parentLoader.close()
}
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.Utils

/**
* Simple test for reading and writing to a distributed
@@ -46,9 +47,7 @@ object DFSReadWriteTest {
private val NPARAMS = 2

private def readFile(filename: String): List[String] = {
val lineIter: Iterator[String] = fromFile(filename).getLines()
val lineList: List[String] = lineIter.toList
lineList
Utils.tryWithResource(fromFile(filename))(_.getLines().toList)
}

private def printUsage(): Unit = {
@@ -22,6 +22,7 @@ import java.io.File
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.streaming.OffsetSuite
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils

class KafkaSourceOffsetSuite extends OffsetSuite with SharedSparkSession {

@@ -99,7 +100,6 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSparkSession {
private def readFromResource(file: String): SerializedOffset = {
import scala.io.Source
val input = getClass.getResource(s"/$file").toURI
val str = Source.fromFile(input).mkString
SerializedOffset(str)
SerializedOffset(Utils.tryWithResource(Source.fromFile(input))(_.mkString))
}
}
@@ -167,7 +167,8 @@ class KafkaTestUtils(
* In this method we rewrite krb5.conf to make kdc and client use the same enctypes
*/
private def rewriteKrb5Conf(): Unit = {
val krb5Conf = Source.fromFile(kdc.getKrb5conf, "UTF-8").getLines()
val krb5Conf = Utils
.tryWithResource(Source.fromFile(kdc.getKrb5conf, "UTF-8"))(_.getLines().toList)
var rewritten = false
val addedConfig =
addedKrb5Config("default_tkt_enctypes", "aes128-cts-hmac-sha1-96") +
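Review note: this hunk (and the `.toArray` in RawTextSender further down) also changes `getLines()` from a lazy `Iterator` to a materialized collection. That is required: once `tryWithResource` closes the `Source`, a still-unconsumed iterator can no longer read from it. A minimal sketch of the pitfall and the fix (the file path is a placeholder):

```scala
import scala.io.Source
import org.apache.spark.util.Utils

// Risky: getLines() is lazy, so the Source is already closed by the time the
// iterator is consumed outside the block, and the read typically fails.
// val lines = Utils.tryWithResource(Source.fromFile("krb5.conf"))(_.getLines())
// lines.foreach(println)

// Safe: materialize inside the block, as the patch does with .toList / .toArray.
def readLines(path: String): List[String] =
  Utils.tryWithResource(Source.fromFile(path))(_.getLines().toList)
```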
@@ -480,7 +480,7 @@ private object YarnClusterDriver extends Logging with Matchers {
executorInfos.foreach { info =>
assert(info.logUrlMap.nonEmpty)
info.logUrlMap.values.foreach { url =>
val log = Source.fromURL(url).mkString
val log = Utils.tryWithResource(Source.fromURL(url))(_.mkString)
assert(
!log.contains(SECRET_PASSWORD),
s"Executor logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
@@ -499,7 +499,7 @@ private object YarnClusterDriver extends Logging with Matchers {
assert(driverLogs.contains("stdout"))
val urlStr = driverLogs("stderr")
driverLogs.foreach { kv =>
val log = Source.fromURL(kv._2).mkString
val log = Utils.tryWithResource(Source.fromURL(kv._2))(_.mkString)
assert(
!log.contains(SECRET_PASSWORD),
s"Driver logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils

case class QueryExecutionTestRecord(
c0: Int, c1: Int, c2: Int, c3: Int, c4: Int,
@@ -36,8 +37,9 @@ class QueryExecutionSuite extends SharedSparkSession {
class QueryExecutionSuite extends SharedSparkSession {
import testImplicits._

def checkDumpedPlans(path: String, expected: Int): Unit = {
assert(Source.fromFile(path).getLines.toList
def checkDumpedPlans(path: String, expected: Int): Unit = Utils.tryWithResource(
Source.fromFile(path)) { source =>
assert(source.getLines.toList
.takeWhile(_ != "== Whole Stage Codegen ==") == List(
"== Parsed Logical Plan ==",
s"Range (0, $expected, step=1, splits=Some(2))",
@@ -99,7 +101,8 @@ class QueryExecutionSuite extends SharedSparkSession {
val path = dir.getCanonicalPath + "/plans.txt"
val df = spark.range(0, 10)
df.queryExecution.debug.toFile(path, explainMode = Option("formatted"))
assert(Source.fromFile(path).getLines.toList
val lines = Utils.tryWithResource(Source.fromFile(path))(_.getLines().toList)
assert(lines
.takeWhile(_ != "== Whole Stage Codegen ==").map(_.replaceAll("#\\d+", "#x")) == List(
"== Physical Plan ==",
s"* Range (1)",
@@ -135,9 +138,10 @@
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26)))
ds.queryExecution.debug.toFile(path)
val localRelations = Source.fromFile(path).getLines().filter(_.contains("LocalRelation"))

assert(!localRelations.exists(_.contains("more fields")))
Utils.tryWithResource(Source.fromFile(path)) { source =>
val localRelations = source.getLines().filter(_.contains("LocalRelation"))
assert(!localRelations.exists(_.contains("more fields")))
}
}
}

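Review note: two equivalent shapes of the fix appear in this file: a block body when several checks run against the open source, and a one-liner that materializes the lines first and asserts on the resulting `List`. A minimal illustration (the path is a placeholder):

```scala
import scala.io.Source
import org.apache.spark.util.Utils

val path = "/tmp/plans.txt" // placeholder path for illustration

// Block form: keep the Source open for the duration of the checks.
Utils.tryWithResource(Source.fromFile(path)) { source =>
  assert(source.getLines().exists(_.startsWith("== Physical Plan ==")))
}

// Expression form: materialize once, close immediately, then assert on the List.
val lines = Utils.tryWithResource(Source.fromFile(path))(_.getLines().toList)
assert(lines.nonEmpty)
```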
@@ -1532,8 +1532,10 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

private def readOffsetFromResource(file: String): SerializedOffset = {
import scala.io.Source
val str = Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI).mkString
SerializedOffset(str.trim)
Utils.tryWithResource(
Source.fromFile(getClass.getResource(s"/structured-streaming/$file").toURI)) { source =>
SerializedOffset(source.mkString.trim)
}
}

private def runTwoBatchesAndVerifyResults(
@@ -242,8 +242,9 @@ object PROCESS_TABLES extends QueryTest with SQLTestUtils {
// Tests the latest version of every release line.
val testingVersions: Seq[String] = {
import scala.io.Source
val versions: Seq[String] = try {
Source.fromURL(s"${releaseMirror}/spark").mkString
val versions: Seq[String] = try Utils.tryWithResource(
Source.fromURL(s"$releaseMirror/spark")) { source =>
source.mkString
.split("\n")
.filter(_.contains("""<a href="spark-"""))
.filterNot(_.contains("preview"))
@@ -26,7 +26,7 @@ import scala.io.Source
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.IntParam
import org.apache.spark.util.{IntParam, Utils}

/**
* A helper program that sends blocks of Kryo-serialized text strings out on a socket at a
@@ -45,7 +45,7 @@ object RawTextSender extends Logging {
val Array(IntParam(port), file, IntParam(blockSize), IntParam(bytesPerSec)) = args

// Repeat the input data multiple times to fill in a buffer
val lines = Source.fromFile(file).getLines().toArray
val lines = Utils.tryWithResource(Source.fromFile(file))(_.getLines().toArray)
val bufferStream = new ByteArrayOutputStream(blockSize + 1000)
val ser = new KryoSerializer(new SparkConf()).newInstance()
val serStream = ser.serializeStream(bufferStream)