Address compatibility issue on reader side with lower version of Spark

HeartSaVioR committed Jun 5, 2020
1 parent 737cc60 commit 6cd9bfd
Showing 7 changed files with 92 additions and 14 deletions.
@@ -1432,6 +1432,21 @@ object SQLConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes

val FILE_SINK_LOG_WRITE_METADATA_VERSION =
buildConf("spark.sql.streaming.fileSink.log.writeMetadataVersion")
.doc("The version of file stream sink log metadata. By default the version is set to " +
"the highest version current Spark handles, as higher version tends to be better in " +
"some aspects. You may want to set this to lower value when the outputs should be " +
"readable from lower version of Spark. " +
"Note that it doesn't 'rewrite' the old batch files: to ensure the metadata to be " +
"read by lower version of Spark, the metadata log should be written from the scratch, " +
"or at least one compact batch should be written with configured version. " +
"Available metadata versions: 1 (all versions) 2 (3.1.0+)")
.version("3.1.0")
.intConf
.checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
.createOptional
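
For illustration, here is a minimal sketch of how a job could opt in to the older metadata version so that its output stays readable by a lower version of Spark. Only the conf key comes from this change; the source, sink format, and paths below are assumptions.

// Hypothetical usage sketch: ask the file sink to write its metadata log as version 1.
spark.conf.set("spark.sql.streaming.fileSink.log.writeMetadataVersion", "1")

val query = spark.readStream
  .format("rate")                                    // illustrative source
  .load()
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/checkpoints")  // illustrative path
  .option("path", "/tmp/output")                     // illustrative path
  .start()
// A lower version of Spark can then read "/tmp/output" as a file sink output, provided the
// log was started from scratch with this setting or a compact batch has been written as v1.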

val FILE_SOURCE_LOG_DELETION = buildConf("spark.sql.streaming.fileSource.log.deletion")
.internal()
.doc("Whether to delete the expired log files in file stream source.")
@@ -2727,6 +2742,8 @@ class SQLConf extends Serializable with Logging {

def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY)

def fileSinkWriteMetadataLogVersion: Option[Int] = getConf(FILE_SINK_LOG_WRITE_METADATA_VERSION)

def fileSourceLogDeletion: Boolean = getConf(FILE_SOURCE_LOG_DELETION)

def fileSourceLogCompactInterval: Int = getConf(FILE_SOURCE_LOG_COMPACT_INTERVAL)
@@ -72,6 +72,19 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](

protected def defaultCompactInterval: Int

/**
* In some cases, log files written by application A should be readable by application B,
* even though the two applications may run different Spark versions. To support writing a
* log file that is readable by a lower version of Spark, this method provides an additional
* metadata log version which is used only for writing.
*
* Note that this class doesn't "rewrite" old batch files: to ensure the metadata can be read
* by a lower version of Spark, the metadata log should be written with the proper version
* from scratch, or at least one compact batch should be written with the proper version
* (so that the reader will ignore previous batch logs which may have been written with a
* higher version).
*/
protected def writeMetadataLogVersion: Option[Int] = None
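
To make the compaction note concrete, here is a hedged sketch of the upgrade-in-place path; the conf key (wired up for the file stream sink later in this diff) comes from this change, while the compact interval, path, and the sparkOld session on an older cluster are illustrative assumptions.

// Suppose the sink's metadata log already contains v2 batch files. Nothing is rewritten;
// from now on, new batches (including the next compact batch) carry the "v1" header.
spark.conf.set("spark.sql.streaming.fileSink.log.writeMetadataVersion", "1")
// With a compact interval of 3, batch ids 2, 5, 8, ... are compact batches. Once a compact
// batch has been written as v1, a reader rebuilds the file list from that compact batch
// plus the later v1 deltas and never parses the earlier v2 files.

// On the lower-version Spark side (hypothetical sparkOld session), the output directory is
// then readable as a normal file sink table:
val df = sparkOld.read.parquet("/tmp/output")  // illustrative path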

protected final lazy val compactInterval: Int = {
// SPARK-18187: "compactInterval" can be set by user via defaultCompactInterval.
// If there are existing log entries, then we should ensure a compatible compactInterval
@@ -143,8 +156,9 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](

override def serialize(logData: Array[T], out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
out.write(("v" + metadataLogVersion).getBytes(UTF_8))
metadataLogVersion match {
val version = writeMetadataLogVersion.getOrElse(metadataLogVersion)
out.write(("v" + version).getBytes(UTF_8))
version match {
case 1 => serializeToV1(out, logData)
case 2 => serializeToV2(out, logData)
case _ =>
@@ -133,6 +133,7 @@ class FileStreamSink(
FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sparkSession.sessionState.conf)
metadataDir
}

private val fileLog =
new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)

@@ -98,6 +98,10 @@ class FileStreamSinkLog(
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " +
"to a positive value.")

// The validation of version is done in SQLConf.
protected override val writeMetadataLogVersion: Option[Int] =
sparkSession.sessionState.conf.fileSinkWriteMetadataLogVersion

override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
if (deletedFiles.isEmpty) {
@@ -144,6 +148,7 @@ class FileStreamSinkLog(

object FileStreamSinkLog {
val VERSION = 2
val SUPPORTED_VERSIONS = Seq(1, 2)
val DELETE_ACTION = "delete"
val ADD_ACTION = "add"
}
@@ -21,10 +21,7 @@ import java.io._
import java.nio.charset.StandardCharsets._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

class CompactibleFileStreamLogSuite extends SharedSparkSession {

@@ -133,11 +130,36 @@ class CompactibleFileStreamLogSuite extends SharedSparkSession {
})
}

test("write older version of metadata for compatibility") {
withTempDir { dir =>
def newFakeCompactibleFileStreamLog(
readVersion: Int,
writeVersion: Option[Int]): FakeCompactibleFileStreamLog =
new FakeCompactibleFileStreamLog(
readVersion,
writeVersion,
_fileCleanupDelayMs = Long.MaxValue, // this param does not matter here in this test case
_defaultCompactInterval = 3, // this param does not matter here in this test case
_defaultMinBatchesToRetain = 1, // this param does not matter here in this test case
spark,
dir.getCanonicalPath)

// the writer understands version 2, but to ensure compatibility it sets the write version to 1
val writer = newFakeCompactibleFileStreamLog(2, Some(1))
// suppose a reader only understands version 1
val reader = newFakeCompactibleFileStreamLog(1, None)
writer.add(0, Array("entry"))
// the reader can read the metadata log the writer just wrote
assert(Array("entry") === reader.get(0).get)
}
}

test("deserialization log written by future version") {
withTempDir { dir =>
def newFakeCompactibleFileStreamLog(version: Int): FakeCompactibleFileStreamLog =
new FakeCompactibleFileStreamLog(
version,
None,
_fileCleanupDelayMs = Long.MaxValue, // this param does not matter here in this test case
_defaultCompactInterval = 3, // this param does not matter here in this test case
_defaultMinBatchesToRetain = 1, // this param does not matter here in this test case
@@ -246,6 +268,7 @@ class CompactibleFileStreamLogSuite extends SharedSparkSession {
withTempDir { file =>
val compactibleLog = new FakeCompactibleFileStreamLog(
FakeCompactibleFileStreamLog.VERSION,
None,
fileCleanupDelayMs,
defaultCompactInterval,
defaultMinBatchesToRetain,
@@ -262,6 +285,7 @@ object FakeCompactibleFileStreamLog {

class FakeCompactibleFileStreamLog(
metadataLogVersion: Int,
_writeMetadataLogVersion: Option[Int],
_fileCleanupDelayMs: Long,
_defaultCompactInterval: Int,
_defaultMinBatchesToRetain: Int,
@@ -279,6 +303,8 @@ class FakeCompactibleFileStreamLog(

override protected def defaultCompactInterval: Int = _defaultCompactInterval

override protected def writeMetadataLogVersion: Option[Int] = _writeMetadataLogVersion

override protected val minBatchesToRetain: Int = _defaultMinBatchesToRetain

override def compactLogs(logs: Seq[String]): Seq[String] = logs
@@ -31,14 +31,31 @@ import org.apache.hadoop.fs.{FSDataInputStream, Path, RawLocalFileSystem}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils

class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {

import CompactibleFileStreamLog._
import FileStreamSinkLog._

test("compactLogs") {
def executeFuncWithMetadataVersion(metadataVersion: Int, func: => Any): Unit = {
withSQLConf(
Seq(SQLConf.FILE_SINK_LOG_WRITE_METADATA_VERSION.key -> metadataVersion.toString): _*) {
func
}
}

// This runs the test against every supported write version, with the read version set to
// the highest supported version. This ensures Spark can read file stream sink metadata
// logs written with older versions.
def testWithAllMetadataVersions(name: String)(func: => Any): Unit = {
for (version <- FileStreamSinkLog.SUPPORTED_VERSIONS) {
test(s"$name - metadata version $version") {
executeFuncWithMetadataVersion(version, func)
}
}
}

testWithAllMetadataVersions("compactLogs") {
withFileStreamSinkLog { sinkLog =>
val logs = Seq(
newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION),
@@ -54,7 +71,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
}
}

test("serialize & deserialize") {
testWithAllMetadataVersions("serialize & deserialize") {
withFileStreamSinkLog { sinkLog =>
val logs = Array(
SinkFileStatus(
@@ -95,7 +112,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
}
}

test("compact") {
testWithAllMetadataVersions("compact") {
withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
withFileStreamSinkLog { sinkLog =>
for (batchId <- 0 to 10) {
@@ -115,7 +132,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
}
}

test("delete expired file") {
testWithAllMetadataVersions("delete expired file") {
// Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deletion behaviour
// deterministically, and set the minimum number of batches to retain to one
withSQLConf(
@@ -191,7 +208,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
}
}

test("read Spark 2.1.0 log format") {
testWithAllMetadataVersions("read Spark 2.1.0 log format") {
assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
// SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), -> deleted
SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION),
@@ -206,7 +223,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
))
}

test("getLatestBatchId") {
testWithAllMetadataVersions("getLatestBatchId") {
withCountOpenLocalFileSystemAsLocalFileSystem {
val scheme = CountOpenLocalFileSystem.scheme
withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
@@ -20,7 +20,6 @@ package org.apache.spark.sql.streaming
import java.io.File
import java.net.URI

import scala.collection.mutable
import scala.util.Random

import org.apache.hadoop.fs._
@@ -39,7 +38,6 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.{StructType, _}
import org.apache.spark.util.Utils

