From 592cdb3d711295465dd1822bd80b580a33f29c28 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Fri, 11 Apr 2014 13:12:34 -0700
Subject: [PATCH] Honor default fs name when initializing event logger.

This is related to SPARK-1459 / PR #375. Without this fix,
FileLogger.createLogDir() may try to create the log dir on HDFS,
while createWriter() will try to open the log file on the local
file system, leading to confusing, hard-to-diagnose failures: the
directory is created on one file system while the file is opened
on another.
---
 .../scala/org/apache/spark/SparkContext.scala      | 48 +++++++++----------
 .../scheduler/EventLoggingListener.scala           |  9 +++-
 .../org/apache/spark/util/FileLogger.scala         | 17 ++++---
 3 files changed, 41 insertions(+), 33 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 25ca650a3a37e..c14dce8273bc1 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -216,10 +216,33 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val ui = new SparkUI(this)
   ui.bind()
 
+  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
+  val hadoopConfiguration: Configuration = {
+    val env = SparkEnv.get
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
+    // Explicitly check for S3 environment variables
+    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+      hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+      hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+      hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+      hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+    }
+    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
+    conf.getAll.foreach { case (key, value) =>
+      if (key.startsWith("spark.hadoop.")) {
+        hadoopConf.set(key.substring("spark.hadoop.".length), value)
+      }
+    }
+    val bufferSize = conf.get("spark.buffer.size", "65536")
+    hadoopConf.set("io.file.buffer.size", bufferSize)
+    hadoopConf
+  }
+
   // Optionally log Spark events
   private[spark] val eventLogger: Option[EventLoggingListener] = {
     if (conf.getBoolean("spark.eventLog.enabled", false)) {
-      val logger = new EventLoggingListener(appName, conf)
+      val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
       logger.start()
       listenerBus.addListener(logger)
       Some(logger)
@@ -294,29 +317,6 @@ class SparkContext(config: SparkConf) extends Logging {
   postEnvironmentUpdate()
   postApplicationStart()
 
-  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
-  val hadoopConfiguration: Configuration = {
-    val env = SparkEnv.get
-    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
-    // Explicitly check for S3 environment variables
-    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
-        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
-      hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-      hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-    }
-    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-    conf.getAll.foreach { case (key, value) =>
-      if (key.startsWith("spark.hadoop.")) {
-        hadoopConf.set(key.substring("spark.hadoop.".length), value)
-      }
-    }
-    val bufferSize = conf.get("spark.buffer.size", "65536")
-    hadoopConf.set("io.file.buffer.size", bufferSize)
-    hadoopConf
-  }
-
   private[spark] var checkpointDir: Option[String] = None
 
   // Thread Local variable that can be used by users to pass information down the stack
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index b983c16af14f4..2fe65cd944b67 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
 
 import scala.collection.mutable
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.json4s.jackson.JsonMethods._
 
@@ -36,7 +37,10 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
  *   spark.eventLog.dir - Path to the directory in which events are logged.
  *   spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
  */
-private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
+private[spark] class EventLoggingListener(
+    appName: String,
+    conf: SparkConf,
+    hadoopConfiguration: Configuration)
   extends SparkListener with Logging {
 
   import EventLoggingListener._
@@ -49,7 +53,8 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
   val logDir = logBaseDir + "/" + name
 
   private val logger =
-    new FileLogger(logDir, conf, outputBufferSize, shouldCompress, shouldOverwrite)
+    new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
+      shouldOverwrite)
 
   /**
    * Begin logging events.
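
[Reviewer note, not part of the patch: a minimal standalone sketch of the
mismatch the commit message describes, under the assumption that the default
file system points at a hypothetical hdfs://namenode:8020 (the key shown is
the Hadoop 2.x fs.defaultFS; Hadoop 1.x uses fs.default.name). A scheme-less
path resolves to HDFS through the FileSystem API but to local disk through
plain java.io:

    import java.io.File
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object FsMismatchSketch {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        conf.set("fs.defaultFS", "hdfs://namenode:8020") // hypothetical cluster

        val logDir = "/tmp/spark-events" // deliberately no scheme
        // createLogDir()-style resolution: the path is qualified against the
        // default fs, so the directory lands on HDFS.
        val fs = FileSystem.get(conf)
        println(fs.makeQualified(new Path(logDir)))
        // createWriter()-style access before this patch: plain java.io
        // interprets the same string as a local path.
        println(new File(logDir).getAbsolutePath)
      }
    }

This is why the listener now receives the SparkContext's hadoopConfiguration
instead of building its own view of the default file system.]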
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala index 7d58d1c765180..7d47b2a72aff7 100644 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala @@ -22,7 +22,8 @@ import java.net.URI import java.text.SimpleDateFormat import java.util.Date -import org.apache.hadoop.fs.{FSDataOutputStream, Path} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} import org.apache.spark.{Logging, SparkConf} import org.apache.spark.io.CompressionCodec @@ -37,7 +38,8 @@ import org.apache.spark.io.CompressionCodec */ private[spark] class FileLogger( logDir: String, - conf: SparkConf = new SparkConf, + conf: SparkConf, + hadoopConfiguration: Configuration, outputBufferSize: Int = 8 * 1024, // 8 KB compress: Boolean = false, overwrite: Boolean = true) @@ -85,19 +87,20 @@ private[spark] class FileLogger( private def createWriter(fileName: String): PrintWriter = { val logPath = logDir + "/" + fileName val uri = new URI(logPath) + val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme + val isDefaultLocal = (defaultFs == null || defaultFs == "file") /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). * Therefore, for local files, use FileOutputStream instead. */ - val dstream = uri.getScheme match { - case "file" | null => + val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { // Second parameter is whether to append new FileOutputStream(uri.getPath, !overwrite) - - case _ => + } else { val path = new Path(logPath) hadoopDataStream = Some(fileSystem.create(path, overwrite)) hadoopDataStream.get - } + } val bstream = new BufferedOutputStream(dstream, outputBufferSize) val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
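
[Reviewer note, not part of the patch: a minimal standalone sketch of the
scheme-resolution rule createWriter() now applies, again with a hypothetical
hdfs://namenode:8020 default fs. A scheme-less path is treated as local only
when the configured default file system is itself local:

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem

    object DefaultFsRuleSketch {
      // True when the path should be written with java.io streams
      // (FileOutputStream is kept for local files because of the
      // LocalFileSystem sync issue, HADOOP-7844).
      def useLocalStream(logPath: String, hadoopConf: Configuration): Boolean = {
        val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
        val isDefaultLocal = defaultFs == null || defaultFs == "file"
        val scheme = new URI(logPath).getScheme
        (isDefaultLocal && scheme == null) || scheme == "file"
      }

      def main(args: Array[String]): Unit = {
        val local = new Configuration() // default fs is file:/// out of the box
        val hdfs = new Configuration()
        hdfs.set("fs.defaultFS", "hdfs://namenode:8020") // hypothetical cluster

        println(useLocalStream("/tmp/events/app-1", local))     // true
        println(useLocalStream("/tmp/events/app-1", hdfs))      // false
        println(useLocalStream("file:/tmp/events/app-1", hdfs)) // true
      }
    }

With this rule, createLogDir() (which goes through the FileSystem API) and
createWriter() agree on which file system a scheme-less log path refers to.]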