From 5ef06d96ce43e14ec2e0c767c758cea6fb47d461 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 11 Oct 2016 09:25:13 -0700 Subject: [PATCH 1/5] save --- .../streaming/CompactibleFileStreamLog.scala | 25 ++++++++++++------- .../streaming/FileStreamSinkLog.scala | 6 +++-- .../execution/streaming/HDFSMetadataLog.scala | 20 ++++++++------- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 027b5bbfab8d6..cafc60888d037 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -17,11 +17,13 @@ package org.apache.spark.sql.execution.streaming -import java.io.IOException +import java.io.{InputStream, IOException, OutputStream} import java.nio.charset.StandardCharsets.UTF_8 +import scala.collection.JavaConverters._ import scala.reflect.ClassTag +import org.apache.commons.io.IOUtils import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.spark.sql.SparkSession @@ -60,7 +62,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( /** * Serialize the data into encoded string. */ - protected def serializeData(t: T): String + protected def serializeData(t: T, out: OutputStream): Unit /** * Deserialize the string into data object. @@ -93,20 +95,25 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( } } - override def serialize(logData: Array[T]): Array[Byte] = { - (metadataLogVersion +: logData.map(serializeData)).mkString("\n").getBytes(UTF_8) + override def serialize(logData: Array[T], out: OutputStream): Unit = { + // called inside a try-finally where the underlying stream is closed in the caller + out.write((metadataLogVersion + "\n").getBytes(UTF_8)) + logData.foreach { data => + serializeData(data, out) + out.write('\n') + } } - override def deserialize(bytes: Array[Byte]): Array[T] = { - val lines = new String(bytes, UTF_8).split("\n") - if (lines.length == 0) { + override def deserialize(in: InputStream): Array[T] = { + val lines = IOUtils.lineIterator(in, "UTF-8").asScala + if (!lines.hasNext) { throw new IllegalStateException("Incomplete log file") } - val version = lines(0) + val version = lines.next() if (version != metadataLogVersion) { throw new IllegalStateException(s"Unknown log version: ${version}") } - lines.slice(1, lines.length).map(deserializeData) + lines.map(deserializeData).toArray } override def add(batchId: Long, logs: Array[T]): Boolean = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index f9e24167a17ec..c161cbf78dca6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.streaming +import java.io.OutputStream + import org.apache.hadoop.fs.{FileStatus, Path} import org.json4s.NoTypeHints import org.json4s.jackson.Serialization @@ -93,8 +95,8 @@ class FileStreamSinkLog( s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " + "to a positive value.") - protected override def serializeData(data: SinkFileStatus): String = { - 
write(data) + protected override def serializeData(data: SinkFileStatus, out: OutputStream): Unit = { + write(data, out) } protected override def deserializeData(encodedString: String): SinkFileStatus = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 39a0f3341389c..6d32266ea00dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.streaming -import java.io.{FileNotFoundException, IOException} +import java.io.{FileNotFoundException, IOException, InputStream, OutputStream} import java.nio.ByteBuffer import java.util.{ConcurrentModificationException, EnumSet, UUID} @@ -29,7 +29,6 @@ import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission import org.apache.spark.internal.Logging -import org.apache.spark.network.util.JavaUtils import org.apache.spark.serializer.JavaSerializer import org.apache.spark.sql.SparkSession import org.apache.spark.util.UninterruptibleThread @@ -88,12 +87,15 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) } } - protected def serialize(metadata: T): Array[Byte] = { - JavaUtils.bufferToArray(serializer.serialize(metadata)) + protected def serialize(metadata: T, out: OutputStream): Unit = { + // called inside a try-finally where the underlying stream is closed in the caller + val outStream = serializer.serializeStream(out) + outStream.writeObject(metadata) } - protected def deserialize(bytes: Array[Byte]): T = { - serializer.deserialize[T](ByteBuffer.wrap(bytes)) + protected def deserialize(in: InputStream): T = { + val inStream = serializer.deserializeStream(in) + inStream.readObject[T]() } /** @@ -114,7 +116,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) // Only write metadata when the batch has not yet been written Thread.currentThread match { case ut: UninterruptibleThread => - ut.runUninterruptibly { writeBatch(batchId, serialize(metadata)) } + ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) } case _ => throw new IllegalStateException( "HDFSMetadataLog.add() must be executed on a o.a.spark.util.UninterruptibleThread") @@ -129,7 +131,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a * valid behavior, we still need to prevent it from destroying the files. 
*/ - private def writeBatch(batchId: Long, bytes: Array[Byte]): Unit = { + private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = { // Use nextId to create a temp file var nextId = 0 while (true) { @@ -137,7 +139,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) try { val output = fileManager.create(tempPath) try { - output.write(bytes) + writer(metadata, output) } finally { output.close() } From 988df08e416a0577ff343180a8858593ba2017eb Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 11 Oct 2016 11:21:29 -0700 Subject: [PATCH 2/5] ready for review --- .../streaming/CompactibleFileStreamLog.scala | 7 ++++--- .../execution/streaming/FileStreamSinkLog.scala | 4 ++-- .../execution/streaming/FileStreamSourceLog.scala | 1 + .../sql/execution/streaming/HDFSMetadataLog.scala | 13 ++++++------- .../streaming/FileStreamSinkLogSuite.scala | 14 +++++++++----- 5 files changed, 22 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index cafc60888d037..fd636c6327202 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -62,7 +62,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( /** * Serialize the data into encoded string. */ - protected def serializeData(t: T, out: OutputStream): Unit + protected def serializeData(t: T): String /** * Deserialize the string into data object. @@ -97,11 +97,12 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( override def serialize(logData: Array[T], out: OutputStream): Unit = { // called inside a try-finally where the underlying stream is closed in the caller - out.write((metadataLogVersion + "\n").getBytes(UTF_8)) + out.write(metadataLogVersion.getBytes(UTF_8)) logData.foreach { data => - serializeData(data, out) out.write('\n') + out.write(serializeData(data).getBytes(UTF_8)) } + out.flush() } override def deserialize(in: InputStream): Array[T] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index c161cbf78dca6..38aee031821a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -95,8 +95,8 @@ class FileStreamSinkLog( s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " + "to a positive value.") - protected override def serializeData(data: SinkFileStatus, out: OutputStream): Unit = { - write(data, out) + protected override def serializeData(data: SinkFileStatus): String = { + write(data) } protected override def deserializeData(encodedString: String): SinkFileStatus = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala index 4681f2ba08c84..d94c415ab2bf8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala @@ -17,6 +17,7 @@ 
package org.apache.spark.sql.execution.streaming +import java.io.OutputStream import java.util.{LinkedHashMap => JLinkedHashMap} import java.util.Map.Entry diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 6d32266ea00dc..1eb5bb57d6d02 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -17,16 +17,15 @@ package org.apache.spark.sql.execution.streaming -import java.io.{FileNotFoundException, IOException, InputStream, OutputStream} -import java.nio.ByteBuffer +import java.io.{FileNotFoundException, InputStream, IOException, OutputStream} import java.util.{ConcurrentModificationException, EnumSet, UUID} import scala.reflect.ClassTag -import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.io.IOUtils import org.apache.spark.internal.Logging import org.apache.spark.serializer.JavaSerializer @@ -94,6 +93,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) } protected def deserialize(in: InputStream): T = { + // called inside a try-finally where the underlying stream is closed in the caller val inStream = serializer.deserializeStream(in) inStream.readObject[T]() } @@ -141,7 +141,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) try { writer(metadata, output) } finally { - output.close() + IOUtils.closeStream(output) } try { // Try to commit the batch @@ -195,10 +195,9 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) if (fileManager.exists(batchMetadataFile)) { val input = fileManager.open(batchMetadataFile) try { - val bytes = IOUtils.toByteArray(input) - Some(deserialize(bytes)) + Some(deserialize(input)) } finally { - input.close() + IOUtils.closeStream(input) } } else { logDebug(s"Unable to find batch $batchMetadataFile") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala index 41a8cc2400dff..e1bc674a28071 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import java.nio.charset.StandardCharsets.UTF_8 import org.apache.spark.SparkFunSuite @@ -133,9 +134,12 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"} |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin // scalastyle:on - assert(expected === new String(sinkLog.serialize(logs), UTF_8)) - - assert(VERSION === new String(sinkLog.serialize(Array()), UTF_8)) + val baos = new ByteArrayOutputStream() + sinkLog.serialize(logs, baos) + assert(expected === baos.toString(UTF_8.name())) + baos.reset() + sinkLog.serialize(Array(), baos) + assert(VERSION === baos.toString(UTF_8.name())) 
} } @@ -174,9 +178,9 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { blockSize = 30000L, action = FileStreamSinkLog.ADD_ACTION)) - assert(expected === sinkLog.deserialize(logs.getBytes(UTF_8))) + assert(expected === sinkLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8)))) - assert(Nil === sinkLog.deserialize(VERSION.getBytes(UTF_8))) + assert(Nil === sinkLog.deserialize(new ByteArrayInputStream(VERSION.getBytes(UTF_8)))) } } From 4d50be565841bdb6d647c75e36168151e1e8a621 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 11 Oct 2016 11:29:04 -0700 Subject: [PATCH 3/5] charset --- .../sql/execution/streaming/CompactibleFileStreamLog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index fd636c6327202..8e9adce111d55 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -106,7 +106,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( } override def deserialize(in: InputStream): Array[T] = { - val lines = IOUtils.lineIterator(in, "UTF-8").asScala + val lines = IOUtils.lineIterator(in, UTF_8).asScala if (!lines.hasNext) { throw new IllegalStateException("Incomplete log file") } From cece6723de82607c12681e97f16ef5a86c2d90d2 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 12 Oct 2016 16:24:09 -0700 Subject: [PATCH 4/5] address nits --- .../sql/execution/streaming/CompactibleFileStreamLog.scala | 6 ++---- .../spark/sql/execution/streaming/FileStreamSinkLog.scala | 2 -- .../spark/sql/execution/streaming/FileStreamSourceLog.scala | 1 - .../spark/sql/execution/streaming/HDFSMetadataLog.scala | 6 +++--- 4 files changed, 5 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 8e9adce111d55..61f7a656809de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -20,10 +20,9 @@ package org.apache.spark.sql.execution.streaming import java.io.{InputStream, IOException, OutputStream} import java.nio.charset.StandardCharsets.UTF_8 -import scala.collection.JavaConverters._ +import scala.io.Source import scala.reflect.ClassTag -import org.apache.commons.io.IOUtils import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.spark.sql.SparkSession @@ -102,11 +101,10 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( out.write('\n') out.write(serializeData(data).getBytes(UTF_8)) } - out.flush() } override def deserialize(in: InputStream): Array[T] = { - val lines = IOUtils.lineIterator(in, UTF_8).asScala + val lines = Source.fromInputStream(in, UTF_8.name()).getLines() if (!lines.hasNext) { throw new IllegalStateException("Incomplete log file") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index 38aee031821a1..f9e24167a17ec 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.streaming -import java.io.OutputStream - import org.apache.hadoop.fs.{FileStatus, Path} import org.json4s.NoTypeHints import org.json4s.jackson.Serialization diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala index d94c415ab2bf8..4681f2ba08c84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.streaming -import java.io.OutputStream import java.util.{LinkedHashMap => JLinkedHashMap} import java.util.Map.Entry diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 1eb5bb57d6d02..c7235320fd6bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -22,10 +22,10 @@ import java.util.{ConcurrentModificationException, EnumSet, UUID} import scala.reflect.ClassTag +import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission -import org.apache.hadoop.io.IOUtils import org.apache.spark.internal.Logging import org.apache.spark.serializer.JavaSerializer @@ -141,7 +141,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) try { writer(metadata, output) } finally { - IOUtils.closeStream(output) + IOUtils.closeQuietly(output) } try { // Try to commit the batch @@ -197,7 +197,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) try { Some(deserialize(input)) } finally { - IOUtils.closeStream(input) + IOUtils.closeQuietly(input) } } else { logDebug(s"Unable to find batch $batchMetadataFile") From 9c4fe723ce7ae09ac0b559e62468f13769baed6b Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 12 Oct 2016 16:34:45 -0700 Subject: [PATCH 5/5] shade source --- .../sql/execution/streaming/CompactibleFileStreamLog.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 61f7a656809de..c14feea91ed7d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming import java.io.{InputStream, IOException, OutputStream} import java.nio.charset.StandardCharsets.UTF_8 -import scala.io.Source +import scala.io.{Source => IOSource} import scala.reflect.ClassTag import org.apache.hadoop.fs.{Path, PathFilter} @@ -104,7 +104,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( } override def deserialize(in: InputStream): Array[T] = { - val lines = Source.fromInputStream(in, 
UTF_8.name()).getLines() + val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines() if (!lines.hasNext) { throw new IllegalStateException("Incomplete log file") }
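
Not part of the patch series: a minimal, standalone sketch of the line-oriented, stream-based format the series converges on (a version header followed by one serialized entry per line, written straight to the caller-owned stream instead of materializing a byte array), and of the ByteArrayOutputStream/ByteArrayInputStream round trip the updated FileStreamSinkLogSuite exercises. The names StreamedLogFormat and RoundTripExample are hypothetical, and plain JSON-ish strings stand in for SinkFileStatus entries.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8

import scala.io.{Source => IOSource}

// Hypothetical stand-in for the stream-based log format in the patch above:
// a version line first, then one serialized entry per line.
object StreamedLogFormat {
  val Version = "v1"

  def serialize(entries: Array[String], out: OutputStream): Unit = {
    // The caller owns the stream and closes it in a try-finally,
    // mirroring the contract noted in the patch comments.
    out.write(Version.getBytes(UTF_8))
    entries.foreach { entry =>
      out.write('\n')
      out.write(entry.getBytes(UTF_8))
    }
  }

  def deserialize(in: InputStream): Array[String] = {
    // Read lazily, line by line, rather than buffering the whole file.
    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
    if (!lines.hasNext) {
      throw new IllegalStateException("Incomplete log file")
    }
    val version = lines.next()
    if (version != Version) {
      throw new IllegalStateException(s"Unknown log version: $version")
    }
    lines.toArray
  }
}

object RoundTripExample extends App {
  val entries = Array(
    """{"path":"/a/b/x","action":"add"}""",
    """{"path":"/a/b/y","action":"delete"}""")

  val out = new ByteArrayOutputStream()
  try {
    StreamedLogFormat.serialize(entries, out)
  } finally {
    out.close()
  }

  val in = new ByteArrayInputStream(out.toByteArray)
  try {
    assert(StreamedLogFormat.deserialize(in).sameElements(entries))
  } finally {
    in.close()
  }
}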