[SPARK-2889] Create Hadoop config objects consistently. #1843

Closed
wants to merge 11 commits
24 changes: 3 additions & 21 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -224,26 +224,7 @@ class SparkContext(config: SparkConf) extends Logging {
ui.bind()

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration: Configuration = {
val hadoopConf = SparkHadoopUtil.get.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
}
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
conf.getAll.foreach { case (key, value) =>
if (key.startsWith("spark.hadoop.")) {
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
}
val bufferSize = conf.get("spark.buffer.size", "65536")
hadoopConf.set("io.file.buffer.size", bufferSize)
hadoopConf
}
val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
@@ -827,7 +808,8 @@ class SparkContext(config: SparkConf) extends Logging {
addedFiles(key) = System.currentTimeMillis

// Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager)
Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
hadoopConfiguration)

logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
postEnvironmentUpdate()
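
To illustrate the consolidation above: after this change, SparkContext obtains its Hadoop configuration only through SparkHadoopUtil.get.newConfiguration(conf), which now owns the S3 credential and "spark.hadoop.*" handling. A minimal sketch of the intended behavior, assuming a Spark build that includes this patch; the configuration keys are only examples.

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil

// "spark.hadoop.foo=bar" entries are copied into the Hadoop Configuration as
// "foo=bar", and spark.buffer.size feeds io.file.buffer.size; if both
// AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set in the environment,
// the fs.s3/fs.s3n credentials are filled in as well.
val sparkConf = new SparkConf()
  .set("spark.hadoop.mapreduce.job.reduces", "5")  // example key
  .set("spark.buffer.size", "131072")

val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)

assert(hadoopConf.get("mapreduce.job.reduces") == "5")
assert(hadoopConf.get("io.file.buffer.size") == "131072")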
27 changes: 23 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,15 +24,15 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{Logging, SparkContext, SparkException}
import org.apache.spark.{Logging, SparkContext, SparkConf, SparkException}

import scala.collection.JavaConversions._

/**
* Contains util methods to interact with Hadoop from Spark.
*/
class SparkHadoopUtil extends Logging {
val conf: Configuration = newConfiguration()
val conf: Configuration = newConfiguration(new SparkConf())
UserGroupInformation.setConfiguration(conf)

/**
@@ -68,7 +68,26 @@ class SparkHadoopUtil extends Logging {
* Return an appropriate (subclass of) Configuration. Creating a config can initialize some
* Hadoop subsystems.
*/
def newConfiguration(): Configuration = new Configuration()
def newConfiguration(conf: SparkConf): Configuration = {
Contributor:

This is technically a breaking API change; we can't just do it like this. We have to add the old version.

Also, somewhat worryingly, I don't think SparkHadoopUtil was meant to be a public API, so it's weird that it gets used in our examples. We should probably mark it as @DeveloperApi and make sure that the examples don't use it.

Contributor Author:

I know the whole "deploy" package is excluded from mima checks (because I added the exclude at @pwendell's request). How is it documented that these packages are "private", if at all? Do we need explicit annotations in that case?

(http://spark.apache.org/docs/1.0.0/api/scala/#package does not list the package, so maybe that's it?)

Contributor:

It's the same as the rest of the codebase -- everything that is "private" should be marked private[spark]. Things that we need to make public for advanced developers are @DeveloperApi. In this case, this thing has been public so we can't remove it, but we could at least mark it to tell people not to depend on it.

Contributor:

BTW, in this case you should mark this class and all its methods as @DeveloperApi.

Contributor Author:

OK, I added the annotation.
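
For reference, a sketch of the annotation the reviewers are asking for, using the existing org.apache.spark.annotation.DeveloperApi marker; the exact placement and scaladoc wording in the final commit may differ.

import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi

/**
 * :: DeveloperApi ::
 * Contains util methods to interact with Hadoop from Spark.
 */
@DeveloperApi
class SparkHadoopUtil extends Logging {
  // ... existing members unchanged ...
}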

val hadoopConf = new Configuration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
}
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
conf.getAll.foreach { case (key, value) =>
if (key.startsWith("spark.hadoop.")) {
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
}
val bufferSize = conf.get("spark.buffer.size", "65536")
hadoopConf.set("io.file.buffer.size", bufferSize)
hadoopConf
}

/**
* Add any user credentials to the job conf which are necessary for running on a secure Hadoop
@@ -86,7 +105,7 @@ class SparkHadoopUtil extends Logging {

def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }

def loginUserFromKeytab(principalName: String, keytabFilename: String) {
UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
}

@@ -24,6 +24,7 @@ import scala.collection.mutable
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils
@@ -40,7 +41,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
.map { d => Utils.resolveURI(d) }
.getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") }

private val fs = Utils.getHadoopFileSystem(resolvedLogDir)
private val fs = Utils.getHadoopFileSystem(resolvedLogDir,
SparkHadoopUtil.get.newConfiguration(conf))

// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTimeMs = -1L
@@ -33,7 +33,8 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState,
SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.master.DriverState.DriverState
@@ -673,7 +674,8 @@ private[spark] class Master(
app.desc.appUiUrl = notFoundBasePath
return false
}
val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
val fileSystem = Utils.getHadoopFileSystem(eventLogDir,
SparkHadoopUtil.get.newConfiguration(conf))
val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
val eventLogPaths = eventLogInfo.logPaths
val compressionCodec = eventLogInfo.compressionCodec
@@ -28,8 +28,8 @@ import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

import org.apache.spark.Logging
import org.apache.spark.deploy.{Command, DriverDescription}
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.{Command, DriverDescription, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.master.DriverState.DriverState
@@ -39,6 +39,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState
* This is currently only used in standalone cluster deploy mode.
*/
private[spark] class DriverRunner(
val conf: SparkConf,
val driverId: String,
val workDir: File,
val sparkHome: File,
@@ -144,8 +145,8 @@ private[spark] class DriverRunner(

val jarPath = new Path(driverDesc.jarUrl)

val emptyConf = new Configuration()
val jarFileSystem = jarPath.getFileSystem(emptyConf)
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val jarFileSystem = jarPath.getFileSystem(hadoopConf)

val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
val jarFileName = jarPath.getName
@@ -154,7 +155,7 @@

if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
logInfo(s"Copying user jar $jarPath to $destPath")
FileUtil.copy(jarFileSystem, jarPath, destPath, false, emptyConf)
FileUtil.copy(jarFileSystem, jarPath, destPath, false, hadoopConf)
}

if (!localJarFile.exists()) { // Verify copy succeeded
@@ -257,7 +257,7 @@ private[spark] class Worker(
val fullId = appId + "/" + execId
if (ExecutorState.isFinished(state)) {
executors.get(fullId) match {
case Some(executor) =>
logInfo("Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
@@ -288,7 +288,7 @@ private[spark] class Worker(

case LaunchDriver(driverId, driverDesc) => {
logInfo(s"Asked to launch driver $driverId")
val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
val driver = new DriverRunner(conf, driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
drivers(driverId) = driver
driver.start()

14 changes: 9 additions & 5 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,6 +26,7 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
@@ -294,9 +295,9 @@ private[spark] class Executor(
try {
val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
.asInstanceOf[Class[_ <: ClassLoader]]
val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader],
classOf[Boolean])
constructor.newInstance(classUri, parent, userClassPathFirst)
val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],
classOf[ClassLoader], classOf[Boolean])
constructor.newInstance(conf, classUri, parent, userClassPathFirst)
} catch {
case _: ClassNotFoundException =>
logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
@@ -313,16 +314,19 @@
* SparkContext. Also adds any new JARs we fetched to the class loader.
*/
private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
synchronized {
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
hadoopConf)
currentFiles(name) = timestamp
}
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
hadoopConf)
currentJars(name) = timestamp
// Add it to our class loader
val localName = name.split("/").last
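
The reflective constructor lookup above now passes a SparkConf as the first argument, so the repl module's ExecutorClassLoader presumably changed in step. That class is not part of this diff; the signature below is only an assumed sketch of the counterpart, not the actual repl source.

import org.apache.spark.SparkConf

// Assumed shape of the matching constructor in org.apache.spark.repl (sketch only):
class ExecutorClassLoader(
    conf: SparkConf,
    classUri: String,
    parent: ClassLoader,
    userClassPathFirst: Boolean)
  extends ClassLoader {
  // Class-loading logic that can now derive a Hadoop configuration from `conf`
  // when fetching classes from the class server URI.
}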
@@ -157,7 +157,7 @@ private[spark] object CheckpointRDD extends Logging {
val sc = new SparkContext(cluster, "CheckpointRDD Test")
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
val path = new Path(hdfsPath, "temp")
val conf = SparkHadoopUtil.get.newConfiguration()
val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
val fs = path.getFileSystem(conf)
val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
sc.runJob(rdd, CheckpointRDD.writeToFile[Int](path.toString, broadcastedConf, 1024) _)
@@ -44,11 +44,14 @@ import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
private[spark] class EventLoggingListener(
appName: String,
sparkConf: SparkConf,
hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration())
hadoopConf: Configuration)
extends SparkListener with Logging {

import EventLoggingListener._

def this(appName: String, sparkConf: SparkConf) =
this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))

private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
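
The same pattern appears here as in FileLogger further down: the default hadoopConf argument is replaced by an explicit auxiliary constructor, so existing two-argument call sites keep compiling while new ones pass a Configuration directly. A sketch of both call shapes; EventLoggingListener is internal to Spark, so this is illustrative only.

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.EventLoggingListener

val sparkConf = new SparkConf()
// Old-style call sites keep working through the auxiliary constructor:
val listener = new EventLoggingListener("myApp", sparkConf)
// Callers that already hold a Hadoop Configuration pass it explicitly:
val listener2 = new EventLoggingListener("myApp", sparkConf,
  SparkHadoopUtil.get.newConfiguration(sparkConf))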
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}

import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.TaskSchedulerImpl

private[spark] class SimrSchedulerBackend(
@@ -42,7 +43,7 @@ private[spark] class SimrSchedulerBackend(
sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)

val conf = new Configuration()
val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
val fs = FileSystem.get(conf)

logInfo("Writing to HDFS file: " + driverFilePath)
@@ -61,7 +62,7 @@
}

override def stop() {
val conf = new Configuration()
val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
val fs = FileSystem.get(conf)
fs.delete(new Path(driverFilePath), false)
super.stop()
13 changes: 11 additions & 2 deletions core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -41,13 +41,22 @@ import org.apache.spark.io.CompressionCodec
private[spark] class FileLogger(
logDir: String,
sparkConf: SparkConf,
hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(),
hadoopConf: Configuration,
outputBufferSize: Int = 8 * 1024, // 8 KB
compress: Boolean = false,
overwrite: Boolean = true,
dirPermissions: Option[FsPermission] = None)
extends Logging {

def this(
logDir: String,
sparkConf: SparkConf,
compress: Boolean = false,
overwrite: Boolean = true) = {
this(logDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf), compress = compress,
overwrite = overwrite)
}

private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}
@@ -57,7 +66,7 @@ private[spark] class FileLogger(
* create unique FileSystem instance only for FileLogger
*/
private val fileSystem = {
val conf = SparkHadoopUtil.get.newConfiguration()
val conf = SparkHadoopUtil.get.newConfiguration(sparkConf)
val logUri = new URI(logDir)
val scheme = logUri.getScheme
if (scheme == "hdfs") {
20 changes: 11 additions & 9 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -34,6 +34,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.json4s._
import tachyon.client.{TachyonFile,TachyonFS}
@@ -347,7 +348,8 @@ private[spark] object Utils extends Logging {
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
*/
def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager) {
def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
hadoopConf: Configuration) {
val filename = url.split("/").last
val tempDir = getLocalDir(conf)
val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
@@ -419,7 +421,7 @@
}
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
val fs = getHadoopFileSystem(uri)
val fs = getHadoopFileSystem(uri, hadoopConf)
val in = fs.open(new Path(uri))
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, true)
@@ -899,8 +901,8 @@ private[spark] object Utils extends Logging {
*/
def getCallSite: CallSite = {
val trace = Thread.currentThread.getStackTrace()
.filterNot { ste:StackTraceElement =>
// When running under some profilers, the current stack trace might contain some bogus
// frames. This is intended to ensure that we don't crash in these situations by
// ignoring any frames that we can't examine.
(ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace"))
@@ -1216,15 +1218,15 @@
/**
* Return a Hadoop FileSystem with the scheme encoded in the given path.
*/
def getHadoopFileSystem(path: URI): FileSystem = {
FileSystem.get(path, SparkHadoopUtil.get.newConfiguration())
def getHadoopFileSystem(path: URI, conf: Configuration): FileSystem = {
FileSystem.get(path, conf)
}

/**
* Return a Hadoop FileSystem with the scheme encoded in the given path.
*/
def getHadoopFileSystem(path: String): FileSystem = {
getHadoopFileSystem(new URI(path))
def getHadoopFileSystem(path: String, conf: Configuration): FileSystem = {
getHadoopFileSystem(new URI(path), conf)
}

/**
@@ -1301,7 +1303,7 @@ private[spark] object Utils extends Logging {
}
}

/**
* Execute the given block, logging and re-throwing any uncaught exception.
* This is particularly useful for wrapping code that runs in a thread, to ensure
* that exceptions are printed, and to avoid having to catch Throwable.
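
The Utils helpers now take an explicit Hadoop Configuration for fetchFile and getHadoopFileSystem instead of creating one internally. A sketch of an updated call site under the new signatures; Utils and SecurityManager are private[spark], so code like this only compiles inside Spark itself, and the URLs and paths are placeholders.

import java.io.File
import org.apache.spark.{SparkConf, SecurityManager}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils

val conf = new SparkConf()
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val securityMgr = new SecurityManager(conf)

// Download a remote file using the caller-supplied Hadoop configuration.
Utils.fetchFile("hdfs://namenode:8020/jars/app.jar", new File("/tmp/spark-work"),
  conf, securityMgr, hadoopConf)

// Resolve a FileSystem for a URI with the same configuration.
val fs = Utils.getHadoopFileSystem("hdfs://namenode:8020/", hadoopConf)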
@@ -121,8 +121,8 @@ class JsonProtocolSuite extends FunSuite {
new SparkConf, ExecutorState.RUNNING)
}
def createDriverRunner(): DriverRunner = {
new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), createDriverDesc(),
null, "akka://worker")
new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"),
createDriverDesc(), null, "akka://worker")
}

def assertValidJson(json: JValue) {