From d05f4d1f548e8af805bfe5febdd31d0ffafe2256 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 3 Apr 2015 14:37:24 +0900 Subject: [PATCH 1/2] Fixed a race condition related to o.a.h.f.FileSystem --- .../spark/scheduler/EventLoggingListener.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index c0d889360ae99..0bc6571dbed45 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -61,7 +61,18 @@ private[spark] class EventLoggingListener( private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false) private val testing = sparkConf.getBoolean("spark.eventLog.testing", false) private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024 - private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir), hadoopConf) + private val fileSystem = { + val logUri = new URI(logBaseDir) + val schema = logUri.getScheme + if (schema == "hdfs") { + val conf = SparkHadoopUtil.get.newConfiguration(sparkConf) + conf.setBoolean("fs.hdfs.impl.disable.cache", true) + Utils.getHadoopFileSystem(logUri, conf) + } else { + Utils.getHadoopFileSystem(logUri, hadoopConf) + } + } + private val compressionCodec = if (shouldCompress) { Some(CompressionCodec.createCodec(sparkConf)) From 2b479c33afcad706f4f23a330ba765801a8be711 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 10 Apr 2015 14:24:20 +0900 Subject: [PATCH 2/2] Fixed conflict --- .../org/apache/spark/scheduler/EventLoggingListener.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 0030373fd43d9..9476cab91b13f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -63,14 +63,13 @@ private[spark] class EventLoggingListener( private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024 private val fileSystem = { - val logUri = new URI(logBaseDir) - val schema = logUri.getScheme + val schema = logBaseDir.getScheme if (schema == "hdfs") { val conf = SparkHadoopUtil.get.newConfiguration(sparkConf) conf.setBoolean("fs.hdfs.impl.disable.cache", true) - Utils.getHadoopFileSystem(logUri, conf) + Utils.getHadoopFileSystem(logBaseDir, conf) } else { - Utils.getHadoopFileSystem(logUri, hadoopConf) + Utils.getHadoopFileSystem(logBaseDir, hadoopConf) } }