Navigation Menu

Skip to content

Commit

Permalink
Add unit test and handle the case when a scheme is specified in spark.eventLog.dir
Browse files Browse the repository at this point in the history
  • Loading branch information
yudovin committed Nov 30, 2019
1 parent f4c5917 commit 622f90d
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 2 deletions.
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Expand Up @@ -507,7 +507,10 @@ class SparkContext(config: SparkConf) extends Logging {
val defaultFS = if (defaultFSProperty == null) "" else defaultFSProperty

val unresolvedDir = s"$defaultFS${conf.get(EVENT_LOG_DIR).stripSuffix("/")}"
Some(Utils.resolveURI(unresolvedDir))

val fs = new Path(unresolvedDir).getFileSystem(_hadoopConfiguration)
val qualifiedPath = fs.makeQualified(new Path(unresolvedDir))
Some(qualifiedPath.toUri)
} else {
None
}
Expand Down
36 changes: 35 additions & 1 deletion core/src/test/scala/org/apache/spark/SparkContextSuite.scala
Expand Up @@ -26,14 +26,15 @@ import scala.concurrent.duration._

import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
import org.json4s.{DefaultFormats, Extraction}
import org.scalatest.Matchers._
import org.scalatest.concurrent.Eventually

import org.apache.spark.FakeFileSystem._
import org.apache.spark.TestUtils._
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
Expand Down Expand Up @@ -882,6 +883,39 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
}
}

// Configures FakeFileSystem as the default filesystem, points `spark.eventLog.dir` at a
// scheme-less absolute path, and checks that the context's event log URI keeps that path
// while carrying the fake scheme.
test("Resolve scheme-less event log directory relative to default filesystem") {
  withTempDir { tempDir =>
    val schemelessDir = tempDir.getAbsolutePath

    // Register the fake filesystem under its scheme and make it the default filesystem.
    val sparkConf = new SparkConf()
      .setAppName("test")
      .setMaster("local")
      .set(s"spark.hadoop.fs.$scheme.impl", classOf[FakeFileSystem].getName)
      .set("spark.hadoop.fs.defaultFS", s"$scheme:///")
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", schemelessDir)
    sc = new SparkContext(sparkConf)

    val resolvedDir = sc.eventLogDir
    assert(resolvedDir.isDefined)
    assert(resolvedDir.get.getPath == schemelessDir)
    assert(resolvedDir.get.getScheme == scheme)
  }
}
}

/**
 * A [[RawLocalFileSystem]] whose reported URI uses the fake scheme defined in the
 * companion object (`<scheme>:///`), so that tests can register it via
 * `spark.hadoop.fs.<scheme>.impl` and treat it as a distinct, non-local-scheme filesystem.
 */
class FakeFileSystem extends RawLocalFileSystem {
  override def getUri: URI = URI.create(s"$scheme:///")
}

/** Holds the URI scheme that [[FakeFileSystem]] reports and under which tests register it. */
object FakeFileSystem {
  val scheme: String = "fake"
}

object SparkContextSuite {
Expand Down

0 comments on commit 622f90d

Please sign in to comment.