diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index cf0268773c399..c39c0ae20cf5d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1432,6 +1432,21 @@ object SQLConf {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(TimeUnit.MINUTES.toMillis(10)) // 10 minutes
 
+  val FILE_SINK_LOG_WRITE_METADATA_VERSION =
+    buildConf("spark.sql.streaming.fileSink.log.writeMetadataVersion")
+      .doc("The version of the file stream sink log metadata. By default the version is set " +
+        "to the highest version the current Spark release handles, as higher versions tend " +
+        "to be better in some aspects. You may want to set this to a lower value when the " +
+        "outputs should be readable by a lower version of Spark. " +
+        "Note that this doesn't 'rewrite' the old batch files: to ensure the metadata can " +
+        "be read by a lower version of Spark, the metadata log should be written from " +
+        "scratch, or at least one compact batch should be written with the configured " +
+        "version. Available metadata versions: 1 (all Spark versions), 2 (Spark 3.1.0+).")
+      .version("3.1.0")
+      .intConf
+      .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
+      .createOptional
+
   val FILE_SOURCE_LOG_DELETION = buildConf("spark.sql.streaming.fileSource.log.deletion")
     .internal()
     .doc("Whether to delete the expired log files in file stream source.")
@@ -2727,6 +2742,8 @@ class SQLConf extends Serializable with Logging {
 
   def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY)
 
+  def fileSinkWriteMetadataLogVersion: Option[Int] = getConf(FILE_SINK_LOG_WRITE_METADATA_VERSION)
+
   def fileSourceLogDeletion: Boolean = getConf(FILE_SOURCE_LOG_DELETION)
 
   def fileSourceLogCompactInterval: Int = getConf(FILE_SOURCE_LOG_COMPACT_INTERVAL)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 8ff0be4ff3206..43f7a7c597a75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -72,6 +72,19 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
 
   protected def defaultCompactInterval: Int
 
+  /**
+   * In some cases, log files written by application A need to be readable by application B,
+   * where the two applications may run different Spark versions. To support writing a log file
+   * that is readable by a lower version of Spark, this method provides an additional metadata
+   * log version which is used only for writing.
+   *
+   * Note that this class doesn't "rewrite" the old batch files: to ensure the metadata can be
+   * read by a lower version of Spark, the metadata log should be written with the proper
+   * version from scratch, or at least one compact batch should be written with the proper
+   * version, so that readers ignore previous batch logs possibly written with a higher version.
+   */
+  protected def writeMetadataLogVersion: Option[Int] = None
+
   protected final lazy val compactInterval: Int = {
     // SPARK-18187: "compactInterval" can be set by user via defaultCompactInterval.
     // If there are existing log entries, then we should ensure a compatible compactInterval
@@ -143,8 +156,9 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
 
   override def serialize(logData: Array[T], out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
-    out.write(("v" + metadataLogVersion).getBytes(UTF_8))
-    metadataLogVersion match {
+    val version = writeMetadataLogVersion.getOrElse(metadataLogVersion)
+    out.write(("v" + version).getBytes(UTF_8))
+    version match {
       case 1 => serializeToV1(out, logData)
       case 2 => serializeToV2(out, logData)
       case _ =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 32245470d8f5d..3332cdfec858b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -133,6 +133,7 @@ class FileStreamSink(
     FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sparkSession.sessionState.conf)
     metadataDir
   }
+
   private val fileLog =
     new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 4ff37c2984e1d..ceaab47589d23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -98,6 +98,10 @@ class FileStreamSinkLog(
     s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " +
       "to a positive value.")
 
+  // The validation of the version is done in SQLConf.
+  protected override val writeMetadataLogVersion: Option[Int] =
+    sparkSession.sessionState.conf.fileSinkWriteMetadataLogVersion
+
   override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
     val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
     if (deletedFiles.isEmpty) {
@@ -144,6 +148,7 @@ class FileStreamSinkLog(
 
 object FileStreamSinkLog {
   val VERSION = 2
+  val SUPPORTED_VERSIONS = Seq(1, 2)
   val DELETE_ACTION = "delete"
   val ADD_ACTION = "add"
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index 28ee212c75af6..79235e6893a35 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -21,10 +21,7 @@ import java.io._
 import java.nio.charset.StandardCharsets._
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.unsafe.types.UTF8String
 
 class CompactibleFileStreamLogSuite extends SharedSparkSession {
 
@@ -133,11 +130,36 @@ class CompactibleFileStreamLogSuite extends SharedSparkSession {
     })
   }
 
+  test("write older version of metadata for compatibility") {
+    withTempDir { dir =>
+      def newFakeCompactibleFileStreamLog(
+          readVersion: Int,
+          writeVersion: Option[Int]): FakeCompactibleFileStreamLog =
+        new FakeCompactibleFileStreamLog(
+          readVersion,
+          writeVersion,
+          _fileCleanupDelayMs = Long.MaxValue, // this param does not matter here in this test case
+          _defaultCompactInterval = 3, // this param does not matter here in this test case
+          _defaultMinBatchesToRetain = 1, // this param does not matter here in this test case
+          spark,
+          dir.getCanonicalPath)
+
+      // the writer understands version 2, but sets the write version to 1 for compatibility
+      val writer = newFakeCompactibleFileStreamLog(2, Some(1))
+      // suppose the reader only understands version 1
+      val reader = newFakeCompactibleFileStreamLog(1, None)
+      writer.add(0, Array("entry"))
+      // the reader can read the metadata log the writer just wrote
+      assert(Array("entry") === reader.get(0).get)
+    }
+  }
+
   test("deserialization log written by future version") {
     withTempDir { dir =>
       def newFakeCompactibleFileStreamLog(version: Int): FakeCompactibleFileStreamLog =
         new FakeCompactibleFileStreamLog(
           version,
+          None,
           _fileCleanupDelayMs = Long.MaxValue, // this param does not matter here in this test case
           _defaultCompactInterval = 3, // this param does not matter here in this test case
           _defaultMinBatchesToRetain = 1, // this param does not matter here in this test case
@@ -246,6 +268,7 @@ class CompactibleFileStreamLogSuite extends SharedSparkSession {
     withTempDir { file =>
       val compactibleLog = new FakeCompactibleFileStreamLog(
         FakeCompactibleFileStreamLog.VERSION,
+        None,
         fileCleanupDelayMs,
         defaultCompactInterval,
         defaultMinBatchesToRetain,
@@ -262,6 +285,7 @@ object FakeCompactibleFileStreamLog {
 
 class FakeCompactibleFileStreamLog(
     metadataLogVersion: Int,
+    _writeMetadataLogVersion: Option[Int],
     _fileCleanupDelayMs: Long,
     _defaultCompactInterval: Int,
     _defaultMinBatchesToRetain: Int,
@@ -279,6 +303,8 @@ class FakeCompactibleFileStreamLog(
 
   override protected def defaultCompactInterval: Int = _defaultCompactInterval
 
+  override protected def writeMetadataLogVersion: Option[Int] = _writeMetadataLogVersion
+
   override protected val minBatchesToRetain: Int = _defaultMinBatchesToRetain
 
   override def compactLogs(logs: Seq[String]): Seq[String] = logs
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 28dd0bad570ef..3a6ef124f46a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -31,14 +31,31 @@ import org.apache.hadoop.fs.{FSDataInputStream, Path, RawLocalFileSystem}
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.util.Utils
 
 class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
 
   import CompactibleFileStreamLog._
   import FileStreamSinkLog._
 
-  test("compactLogs") {
+  def executeFuncWithMetadataVersion(metadataVersion: Int, func: => Any): Unit = {
+    withSQLConf(
+      Seq(SQLConf.FILE_SINK_LOG_WRITE_METADATA_VERSION.key -> metadataVersion.toString): _*) {
+      func
+    }
+  }
+
+  // This runs the tests with every supported write version, while the read version is set
+  // to the highest supported version. This ensures Spark can read older versions of the
+  // file stream sink metadata log.
+  def testWithAllMetadataVersions(name: String)(func: => Any): Unit = {
+    for (version <- FileStreamSinkLog.SUPPORTED_VERSIONS) {
+      test(s"$name - metadata version $version") {
+        executeFuncWithMetadataVersion(version, func)
+      }
+    }
+  }
+
+  testWithAllMetadataVersions("compactLogs") {
     withFileStreamSinkLog { sinkLog =>
       val logs = Seq(
         newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION),
@@ -54,7 +71,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
     }
   }
 
-  test("serialize & deserialize") {
+  testWithAllMetadataVersions("serialize & deserialize") {
     withFileStreamSinkLog { sinkLog =>
       val logs = Array(
         SinkFileStatus(
@@ -95,7 +112,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
     }
   }
 
-  test("compact") {
+  testWithAllMetadataVersions("compact") {
     withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
       withFileStreamSinkLog { sinkLog =>
         for (batchId <- 0 to 10) {
@@ -115,7 +132,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
     }
   }
 
-  test("delete expired file") {
+  testWithAllMetadataVersions("delete expired file") {
     // Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
     // deterministically and one min batches to retain
     withSQLConf(
@@ -191,7 +208,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
     }
   }
 
-  test("read Spark 2.1.0 log format") {
+  testWithAllMetadataVersions("read Spark 2.1.0 log format") {
     assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
       // SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), -> deleted
       SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION),
@@ -206,7 +223,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
     ))
   }
 
-  test("getLatestBatchId") {
+  testWithAllMetadataVersions("getLatestBatchId") {
     withCountOpenLocalFileSystemAsLocalFileSystem {
       val scheme = CountOpenLocalFileSystem.scheme
       withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index fa320333143ec..479d3da442ecf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.streaming
 import java.io.File
 import java.net.URI
 
-import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.hadoop.fs._
@@ -39,7 +38,6 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.{StructType, _}
 import org.apache.spark.util.Utils
 
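
For context, a minimal usage sketch (not part of the patch) of the spark.sql.streaming.fileSink.log.writeMetadataVersion option added in SQLConf above: the writer pins the sink metadata log to version 1 so that an older Spark release can read the output directory. The application name and paths below are hypothetical placeholders.

// Illustrative sketch only; the config key comes from this patch, everything else is assumed.
import org.apache.spark.sql.SparkSession

object FileSinkV1MetadataExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("file-sink-v1-metadata-example") // hypothetical app name
      // Pin the file stream sink metadata log to version 1 so the output directory
      // stays readable by Spark releases that only understand version 1.
      .config("spark.sql.streaming.fileSink.log.writeMetadataVersion", "1")
      .getOrCreate()

    val query = spark.readStream
      .format("rate")
      .load()
      .writeStream
      .format("parquet")
      .option("checkpointLocation", "/tmp/example/checkpoint") // hypothetical path
      .start("/tmp/example/output")                            // hypothetical path

    // As the patch notes, old batch files are not rewritten: the log is only guaranteed to be
    // readable by an older Spark when written from scratch, or after at least one compact batch
    // has been written with the configured version.
    query.awaitTermination()
  }
}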