From b54b18d8537f3305d7687d47abfaab1242a9284e Mon Sep 17 00:00:00 2001 From: Oleg Danilov Date: Wed, 1 Feb 2017 16:06:22 +0300 Subject: [PATCH] [SPARK-19531] Send UPDATE_LENGTH for Spark History service --- .../org/apache/spark/scheduler/EventLoggingListener.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 35690b2783ad3..00ab2a393e17f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler import java.io._ import java.net.URI import java.nio.charset.StandardCharsets +import java.util.EnumSet import java.util.Locale import scala.collection.mutable @@ -28,6 +29,8 @@ import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.hdfs.DFSOutputStream +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag import org.json4s.JsonAST.JValue import org.json4s.jackson.JsonMethods._ @@ -138,7 +141,10 @@ private[spark] class EventLoggingListener( // scalastyle:on println if (flushLogger) { writer.foreach(_.flush()) - hadoopDataStream.foreach(_.hflush()) + hadoopDataStream.foreach(ds => ds.getWrappedStream match { + case wrapped: DFSOutputStream => wrapped.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)) + case _ => ds.hflush() + }) } if (testing) { loggedEvents += eventJson