From 0fa3bb26335726a1e359b6eb66d5f352efffd103 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sun, 13 Nov 2016 11:24:15 +0800 Subject: [PATCH 1/3] Move tests --- .../CompactibleFileStreamLogSuite.scala | 107 ++++++++++++++++++ .../streaming/FileStreamSinkLogSuite.scala | 68 ----------- 2 files changed, 107 insertions(+), 68 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala new file mode 100644 index 0000000000000..fdf985931fca2 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io._ +import java.nio.charset.StandardCharsets._ + +import scala.language.implicitConversions + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.sql.execution.streaming.FakeFileSystem._ +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.test.SharedSQLContext + +class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext { + + /** To avoid caching of FS objects */ + override protected val sparkConf = + new SparkConf().set(s"spark.hadoop.fs.$scheme.impl.disable.cache", "true") + + import CompactibleFileStreamLog._ + + test("getBatchIdFromFileName") { + assert(1234L === getBatchIdFromFileName("1234")) + assert(1234L === getBatchIdFromFileName("1234.compact")) + intercept[NumberFormatException] { + getBatchIdFromFileName("1234a") + } + } + + test("isCompactionBatch") { + assert(false === isCompactionBatch(0, compactInterval = 3)) + assert(false === isCompactionBatch(1, compactInterval = 3)) + assert(true === isCompactionBatch(2, compactInterval = 3)) + assert(false === isCompactionBatch(3, compactInterval = 3)) + assert(false === isCompactionBatch(4, compactInterval = 3)) + assert(true === isCompactionBatch(5, compactInterval = 3)) + } + + test("nextCompactionBatchId") { + assert(2 === nextCompactionBatchId(0, compactInterval = 3)) + assert(2 === nextCompactionBatchId(1, compactInterval = 3)) + assert(5 === nextCompactionBatchId(2, compactInterval = 3)) + assert(5 === nextCompactionBatchId(3, compactInterval = 3)) + assert(5 === nextCompactionBatchId(4, compactInterval = 3)) + assert(8 === nextCompactionBatchId(5, compactInterval = 3)) + } + + test("getValidBatchesBeforeCompactionBatch") { + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch(0, compactInterval = 3) + } + intercept[AssertionError] 
{ + getValidBatchesBeforeCompactionBatch(1, compactInterval = 3) + } + assert(Seq(0, 1) === getValidBatchesBeforeCompactionBatch(2, compactInterval = 3)) + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch(3, compactInterval = 3) + } + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch(4, compactInterval = 3) + } + assert(Seq(2, 3, 4) === getValidBatchesBeforeCompactionBatch(5, compactInterval = 3)) + } + + test("getAllValidBatches") { + assert(Seq(0) === getAllValidBatches(0, compactInterval = 3)) + assert(Seq(0, 1) === getAllValidBatches(1, compactInterval = 3)) + assert(Seq(2) === getAllValidBatches(2, compactInterval = 3)) + assert(Seq(2, 3) === getAllValidBatches(3, compactInterval = 3)) + assert(Seq(2, 3, 4) === getAllValidBatches(4, compactInterval = 3)) + assert(Seq(5) === getAllValidBatches(5, compactInterval = 3)) + assert(Seq(5, 6) === getAllValidBatches(6, compactInterval = 3)) + assert(Seq(5, 6, 7) === getAllValidBatches(7, compactInterval = 3)) + assert(Seq(8) === getAllValidBatches(8, compactInterval = 3)) + } + + test("batchIdToPath") { + withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") { + withFileStreamSinkLog { sinkLog => + assert("0" === sinkLog.batchIdToPath(0).getName) + assert("1" === sinkLog.batchIdToPath(1).getName) + assert("2.compact" === sinkLog.batchIdToPath(2).getName) + assert("3" === sinkLog.batchIdToPath(3).getName) + assert("4" === sinkLog.batchIdToPath(4).getName) + assert("5.compact" === sinkLog.batchIdToPath(5).getName) + } + } + } + + +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala index e1bc674a28071..e046fee0c04d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala @@ -29,61 +29,6 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { import CompactibleFileStreamLog._ import FileStreamSinkLog._ - test("getBatchIdFromFileName") { - assert(1234L === getBatchIdFromFileName("1234")) - assert(1234L === getBatchIdFromFileName("1234.compact")) - intercept[NumberFormatException] { - getBatchIdFromFileName("1234a") - } - } - - test("isCompactionBatch") { - assert(false === isCompactionBatch(0, compactInterval = 3)) - assert(false === isCompactionBatch(1, compactInterval = 3)) - assert(true === isCompactionBatch(2, compactInterval = 3)) - assert(false === isCompactionBatch(3, compactInterval = 3)) - assert(false === isCompactionBatch(4, compactInterval = 3)) - assert(true === isCompactionBatch(5, compactInterval = 3)) - } - - test("nextCompactionBatchId") { - assert(2 === nextCompactionBatchId(0, compactInterval = 3)) - assert(2 === nextCompactionBatchId(1, compactInterval = 3)) - assert(5 === nextCompactionBatchId(2, compactInterval = 3)) - assert(5 === nextCompactionBatchId(3, compactInterval = 3)) - assert(5 === nextCompactionBatchId(4, compactInterval = 3)) - assert(8 === nextCompactionBatchId(5, compactInterval = 3)) - } - - test("getValidBatchesBeforeCompactionBatch") { - intercept[AssertionError] { - getValidBatchesBeforeCompactionBatch(0, compactInterval = 3) - } - intercept[AssertionError] { - getValidBatchesBeforeCompactionBatch(1, compactInterval = 3) - } - assert(Seq(0, 1) === getValidBatchesBeforeCompactionBatch(2, compactInterval = 3)) - intercept[AssertionError] { - 
getValidBatchesBeforeCompactionBatch(3, compactInterval = 3) - } - intercept[AssertionError] { - getValidBatchesBeforeCompactionBatch(4, compactInterval = 3) - } - assert(Seq(2, 3, 4) === getValidBatchesBeforeCompactionBatch(5, compactInterval = 3)) - } - - test("getAllValidBatches") { - assert(Seq(0) === getAllValidBatches(0, compactInterval = 3)) - assert(Seq(0, 1) === getAllValidBatches(1, compactInterval = 3)) - assert(Seq(2) === getAllValidBatches(2, compactInterval = 3)) - assert(Seq(2, 3) === getAllValidBatches(3, compactInterval = 3)) - assert(Seq(2, 3, 4) === getAllValidBatches(4, compactInterval = 3)) - assert(Seq(5) === getAllValidBatches(5, compactInterval = 3)) - assert(Seq(5, 6) === getAllValidBatches(6, compactInterval = 3)) - assert(Seq(5, 6, 7) === getAllValidBatches(7, compactInterval = 3)) - assert(Seq(8) === getAllValidBatches(8, compactInterval = 3)) - } - test("compactLogs") { withFileStreamSinkLog { sinkLog => val logs = Seq( @@ -184,19 +129,6 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { } } - test("batchIdToPath") { - withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") { - withFileStreamSinkLog { sinkLog => - assert("0" === sinkLog.batchIdToPath(0).getName) - assert("1" === sinkLog.batchIdToPath(1).getName) - assert("2.compact" === sinkLog.batchIdToPath(2).getName) - assert("3" === sinkLog.batchIdToPath(3).getName) - assert("4" === sinkLog.batchIdToPath(4).getName) - assert("5.compact" === sinkLog.batchIdToPath(5).getName) - } - } - } - testWithUninterruptibleThread("compact") { withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") { withFileStreamSinkLog { sinkLog => From 3014a037ecd6e28815eef94f3a856dd082ac2476 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sun, 13 Nov 2016 11:28:35 +0800 Subject: [PATCH 2/3] Refactor --- .../CompactibleFileStreamLogSuite.scala | 150 ++++++++++++++++-- 1 file changed, 141 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala index fdf985931fca2..265ae36d91ea0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala @@ -35,6 +35,8 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext import CompactibleFileStreamLog._ + /** -- testing of `object CompactibleFileStreamLog` begins -- */ + test("getBatchIdFromFileName") { assert(1234L === getBatchIdFromFileName("1234")) assert(1234L === getBatchIdFromFileName("1234.compact")) @@ -90,18 +92,148 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext assert(Seq(8) === getAllValidBatches(8, compactInterval = 3)) } + /** -- testing of `object CompactibleFileStreamLog` ends -- */ + test("batchIdToPath") { - withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") { - withFileStreamSinkLog { sinkLog => - assert("0" === sinkLog.batchIdToPath(0).getName) - assert("1" === sinkLog.batchIdToPath(1).getName) - assert("2.compact" === sinkLog.batchIdToPath(2).getName) - assert("3" === sinkLog.batchIdToPath(3).getName) - assert("4" === sinkLog.batchIdToPath(4).getName) - assert("5.compact" === sinkLog.batchIdToPath(5).getName) - } + withFakeCompactibleFileStreamLog( + fileCleanupDelayMs = Long.MaxValue, + compactInterval = 3, + 
compactibleLog => { + assert("0" === compactibleLog.batchIdToPath(0).getName) + assert("1" === compactibleLog.batchIdToPath(1).getName) + assert("2.compact" === compactibleLog.batchIdToPath(2).getName) + assert("3" === compactibleLog.batchIdToPath(3).getName) + assert("4" === compactibleLog.batchIdToPath(4).getName) + assert("5.compact" === compactibleLog.batchIdToPath(5).getName) + }) + } + + test("serialize") { + withFakeCompactibleFileStreamLog( + fileCleanupDelayMs = Long.MaxValue, + compactInterval = 3, + compactibleLog => { + val logs = Array("entry_1", "entry_2", "entry_3") + val expected = s"""${FakeCompactibleFileStreamLog.VERSION} + |"entry_1" + |"entry_2" + |"entry_3"""".stripMargin + val baos = new ByteArrayOutputStream() + compactibleLog.serialize(logs, baos) + assert(expected === baos.toString(UTF_8.name())) + + baos.reset() + compactibleLog.serialize(Array(), baos) + assert(FakeCompactibleFileStreamLog.VERSION === baos.toString(UTF_8.name())) + }) + } + + test("deserialize") { + withFakeCompactibleFileStreamLog( + fileCleanupDelayMs = Long.MaxValue, + compactInterval = 3, + compactibleLog => { + val logs = s"""${FakeCompactibleFileStreamLog.VERSION} + |"entry_1" + |"entry_2" + |"entry_3"""".stripMargin + val expected = Array("entry_1", "entry_2", "entry_3") + assert(expected === + compactibleLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8)))) + + assert(Nil === + compactibleLog.deserialize( + new ByteArrayInputStream(FakeCompactibleFileStreamLog.VERSION.getBytes(UTF_8)))) + }) + } + + testWithUninterruptibleThread("compact") { + withFakeCompactibleFileStreamLog( + fileCleanupDelayMs = Long.MaxValue, + compactInterval = 3, + compactibleLog => { + for (batchId <- 0 to 10) { + compactibleLog.add(batchId, Array("some_path_" + batchId)) + val expectedFiles = (0 to batchId).map { id => "some_path_" + id } + assert(compactibleLog.allFiles() === expectedFiles) + if (isCompactionBatch(batchId, 3)) { + // Since batchId is a compaction batch, the batch log file should contain all logs + assert(compactibleLog.get(batchId).getOrElse(Nil) === expectedFiles) + } + } + }) + } + + testWithUninterruptibleThread("delete expired file") { + // Set `fileCleanupDelayMs` to 0 so that we can detect the deleting behaviour deterministically + withFakeCompactibleFileStreamLog( + fileCleanupDelayMs = 0, + compactInterval = 3, + compactibleLog => { + val fs = compactibleLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf()) + + def listBatchFiles(): Set[String] = { + fs.listStatus(compactibleLog.metadataPath).map(_.getPath.getName).filter { fileName => + try { + getBatchIdFromFileName(fileName) + true + } catch { + case _: NumberFormatException => false + } + }.toSet + } + + compactibleLog.add(0, Array("some_path_0")) + assert(Set("0") === listBatchFiles()) + compactibleLog.add(1, Array("some_path_1")) + assert(Set("0", "1") === listBatchFiles()) + compactibleLog.add(2, Array("some_path_2")) + assert(Set("2.compact") === listBatchFiles()) + compactibleLog.add(3, Array("some_path_3")) + assert(Set("2.compact", "3") === listBatchFiles()) + compactibleLog.add(4, Array("some_path_4")) + assert(Set("2.compact", "3", "4") === listBatchFiles()) + compactibleLog.add(5, Array("some_path_5")) + assert(Set("5.compact") === listBatchFiles()) + }) + } + + private def withFakeCompactibleFileStreamLog( + fileCleanupDelayMs: Long, + compactInterval: Int, + f: FakeCompactibleFileStreamLog => Unit + ): Unit = { + withTempDir { file => + val compactibleLog = new FakeCompactibleFileStreamLog( + 
fileCleanupDelayMs, + compactInterval, + spark, + file.getCanonicalPath) + f(compactibleLog) } } +} + +object FakeCompactibleFileStreamLog { + val VERSION = "test_version" +} + +class FakeCompactibleFileStreamLog( + _fileCleanupDelayMs: Long, + _compactInterval: Int, + sparkSession: SparkSession, + path: String) + extends CompactibleFileStreamLog[String]( + FakeCompactibleFileStreamLog.VERSION, + sparkSession, + path + ) { + + override protected def fileCleanupDelayMs: Long = _fileCleanupDelayMs + + override protected def isDeletingExpiredLog: Boolean = true + override protected def compactInterval: Int = _compactInterval + override def compactLogs(logs: Seq[String]): Seq[String] = logs } From ca5e9841c3d2efb8f427dcd9b9005aa7419f63db Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sun, 13 Nov 2016 12:40:27 +0800 Subject: [PATCH 3/3] CompactibleFileStreamLog should not use "compactInterval" direcly with user setting --- .../streaming/CompactibleFileStreamLog.scala | 132 +++++- .../streaming/FileStreamSourceLog.scala | 8 +- .../execution/streaming/HDFSMetadataLog.scala | 26 +- .../CompactibleFileStreamLogSuite.scala | 382 ++++++++++++++++-- .../streaming/FileStreamSinkLogSuite.scala | 8 +- .../sql/streaming/FileStreamSourceSuite.scala | 6 +- 6 files changed, 499 insertions(+), 63 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 8af3db1968882..7ffd3cd1e970f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming import java.io.{InputStream, IOException, OutputStream} import java.nio.charset.StandardCharsets.UTF_8 +import java.util import scala.io.{Source => IOSource} import scala.reflect.ClassTag @@ -70,8 +71,53 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( */ def compactLogs(logs: Seq[T]): Seq[T] + /** + * Upon restart, we should pick up any previous batches including compaction batches. This is + * not simple since `compactInterval` could vary from each run; to support the following + * situations: + * + * (1) a fresh run + * (2) the previous run with `compactInterval` = 2 + * 0 + * (3) the previous run with `compactInterval` = 2 + * 0 1.compact + * (4) previous run with `compactInterval` = 2 and `compactInterval` = 5 + * 0 1.compact 2 3.compact 4.compact + * (5)last run with `compactInterval` = 2 and `compactInterval` = 5 + * 0 1.compact 2 3.compact 4.compact 5 6 7 8 + * + * We introduce `knownCompactionBatches` which holds the existing compaction batches before + * this run, and `zeroBatch` which holds the first batch this run should write to. 
Thus we can + * support the above situations with: + * + * (1) `knownCompactionBatches` = (), `zeroBatch` = 0 + * (2) `knownCompactionBatches` = (), `zeroBatch` = 1 + * (3) `knownCompactionBatches` = (1), `zeroBatch` = 2 + * (4) `knownCompactionBatches` = (1, 3, 4), `zeroBatch` = 5 + * (5) `knownCompactionBatches` = (1, 3, 4), `zeroBatch` = 9 + */ + private[sql] val (knownCompactionBatches: Array[Long], zeroBatch: Long) = { + val fileNames: Array[String] = + listExistingFiles() + .filter(isBatchFile) + .map(path => (getBatchIdFromFileName(path.getName), path)) + .sortBy(_._1) + .reverse + .dropWhile(idAndPath => super.get(idAndPath._2).isEmpty) + .reverse + .map(idAndPath => idAndPath._2.getName) + + val knownCompactionBatches = + fileNames + .filter(isCompactionBatchFromFileName) + .map(getBatchIdFromFileName) + val zeroBatch = fileNames.map(getBatchIdFromFileName).lastOption + + (knownCompactionBatches, zeroBatch.map(_ + 1).getOrElse(0L)) + } + override def batchIdToPath(batchId: Long): Path = { - if (isCompactionBatch(batchId, compactInterval)) { + if (isCompactionBatch(knownCompactionBatches, zeroBatch, batchId, compactInterval)) { new Path(metadataPath, s"$batchId$COMPACT_FILE_SUFFIX") } else { new Path(metadataPath, batchId.toString) @@ -113,7 +159,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( } override def add(batchId: Long, logs: Array[T]): Boolean = { - if (isCompactionBatch(batchId, compactInterval)) { + if (isCompactionBatch(knownCompactionBatches, zeroBatch, batchId, compactInterval)) { compact(batchId, logs) } else { super.add(batchId, logs) @@ -125,7 +171,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( * corresponding `batchId` file. It will delete expired files as well if enabled. */ private def compact(batchId: Long, logs: Array[T]): Boolean = { - val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval) + val validBatches = getValidBatchesBeforeCompactionBatch( + knownCompactionBatches, zeroBatch, batchId, compactInterval) val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten ++ logs if (super.add(batchId, compactLogs(allLogs).toArray)) { if (isDeletingExpiredLog) { @@ -150,7 +197,9 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( if (latestId >= 0) { try { val logs = - getAllValidBatches(latestId, compactInterval).flatMap(id => super.get(id)).flatten + getAllValidBatches(knownCompactionBatches, zeroBatch, latestId, compactInterval) + .flatMap(id => super.get(id)) + .flatten return compactLogs(logs).toArray } catch { case e: IOException => @@ -158,7 +207,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( // `StreamFileIndex` are reading. However, it only happens when a compaction is // deleting old files. If so, let's try the next compaction batch and we should find it. // Otherwise, this is a real IO issue and we should throw it. - latestId = nextCompactionBatchId(latestId, compactInterval) + latestId = nextCompactionBatchId(zeroBatch, latestId, compactInterval) super.get(latestId).getOrElse { throw e } @@ -203,29 +252,57 @@ object CompactibleFileStreamLog { fileName.stripSuffix(COMPACT_FILE_SUFFIX).toLong } + def isCompactionBatchFromFileName(fileName: String): Boolean = { + fileName.endsWith(COMPACT_FILE_SUFFIX) + } + /** * Returns if this is a compaction batch. FileStreamSinkLog will compact old logs every * `compactInterval` commits. * - * E.g., if `compactInterval` is 3, then 2, 5, 8, ... are all compaction batches. 
+ * E.g., given `zeroBatch` equals 10 and `compactInterval` equals 3, then 12, 15, 18, ... are all + * compaction batches. */ - def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean = { - (batchId + 1) % compactInterval == 0 + def isCompactionBatch( + knownCompactionBatches: Array[Long], + zeroBatch: Long, + batchId: Long, + compactInterval: Int): Boolean = { + if (batchId < zeroBatch) { + knownCompactionBatches.nonEmpty && + util.Arrays.binarySearch(knownCompactionBatches, batchId) >= 0 + } + else { + (batchId - zeroBatch + 1) % compactInterval == 0 + } } /** * Returns all valid batches before the specified `compactionBatchId`. They contain all logs we * need to do a new compaction. * - * E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this method should returns - * `Seq(2, 3, 4)` (Note: it includes the previous compaction batch 2). + * E.g., given `zeroBatch` equals 10 and `compactInterval` equals 3, this method should return + * `Seq(12, 13, 14)` for `compactionBatchId` 15 (Note: it includes the previous compaction batch + * 12). */ def getValidBatchesBeforeCompactionBatch( + knownCompactionBatches: Array[Long], + zeroBatch: Long, compactionBatchId: Long, compactInterval: Int): Seq[Long] = { - assert(isCompactionBatch(compactionBatchId, compactInterval), + assert( + isCompactionBatch(knownCompactionBatches, zeroBatch, compactionBatchId, compactInterval), s"$compactionBatchId is not a compaction batch") - (math.max(0, compactionBatchId - compactInterval)) until compactionBatchId + assert(compactionBatchId >= zeroBatch, s"start at least with zeroBatch = $zeroBatch!") + + if (compactionBatchId - compactInterval >= zeroBatch) { + // we have at least one compaction batch since zeroBatch + (compactionBatchId - compactInterval) until compactionBatchId + } else { + // we have no compaction batch yet since zeroBatch + // so pick the latest compaction batch (if exist) from previous runs, or just pick 0 + knownCompactionBatches.lastOption.getOrElse(0L) until compactionBatchId + } } /** @@ -233,16 +310,37 @@ object CompactibleFileStreamLog { * return itself. Otherwise, it will find the previous compaction batch and return all batches * between it and `batchId`. */ - def getAllValidBatches(batchId: Long, compactInterval: Long): Seq[Long] = { + def getAllValidBatches( + knownCompactionBatches: Array[Long], + zeroBatch: Long, + batchId: Long, + compactInterval: Long): Seq[Long] = { assert(batchId >= 0) - val start = math.max(0, (batchId + 1) / compactInterval * compactInterval - 1) - start to batchId + if (batchId >= zeroBatch) { + val _nextCompactionBatchId = nextCompactionBatchId(zeroBatch, batchId, compactInterval) + if (_nextCompactionBatchId - compactInterval >= zeroBatch) { + // we have at least one compaction batch since zeroBatch + // so we pick the latest compaction batch id in this run + return (_nextCompactionBatchId - compactInterval) to batchId + } + } + // we have no compaction batch yet since zeroBatch + // so pick the latest compaction batch less than or equal to `batchId` (if exist) from + // previous runs, or just pick 0 + return knownCompactionBatches.reverse.find(_ <= batchId).getOrElse(0L) to batchId } /** * Returns the next compaction batch id after `batchId`. + * + * E.g., given `zeroBatch` equals 10, `compactInterval` equals 3, this method should return 12 for + * `batchId` 10, 11, should return 15 for `batchId` 12, 13, 14. 
*/ - def nextCompactionBatchId(batchId: Long, compactInterval: Long): Long = { - (batchId + compactInterval + 1) / compactInterval * compactInterval - 1 + def nextCompactionBatchId( + zeroBatch: Long, + batchId: Long, + compactInterval: Long): Long = { + assert(batchId >= zeroBatch, s"start at least with zeroBatch = $zeroBatch!") + (batchId - zeroBatch + compactInterval + 1) / compactInterval * compactInterval + zeroBatch - 1 } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala index fe81b15607068..98af77b401add 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala @@ -66,7 +66,7 @@ class FileStreamSourceLog( override def add(batchId: Long, logs: Array[FileEntry]): Boolean = { if (super.add(batchId, logs)) { - if (isCompactionBatch(batchId, compactInterval)) { + if (isCompactionBatch(knownCompactionBatches, zeroBatch, batchId, compactInterval)) { fileEntryCache.put(batchId, logs) } true @@ -80,7 +80,8 @@ class FileStreamSourceLog( val endBatchId = getLatest().map(_._1).getOrElse(0L) val (existedBatches, removedBatches) = (startBatchId to endBatchId).map { id => - if (isCompactionBatch(id, compactInterval) && fileEntryCache.containsKey(id)) { + if (isCompactionBatch(knownCompactionBatches, zeroBatch, id, compactInterval) + && fileEntryCache.containsKey(id)) { (id, Some(fileEntryCache.get(id))) } else { val logs = super.get(id).map(_.filter(_.batchId == id)) @@ -99,7 +100,8 @@ class FileStreamSourceLog( if (latestBatchId < 0) { Map.empty[Long, Option[Array[FileEntry]]] } else { - val latestCompactedBatchId = getAllValidBatches(latestBatchId, compactInterval)(0) + val latestCompactedBatchId = + getAllValidBatches(knownCompactionBatches, zeroBatch, latestBatchId, compactInterval)(0) val allLogs = new mutable.HashMap[Long, mutable.ArrayBuffer[FileEntry]] super.get(latestCompactedBatchId).foreach { entries => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index db7057d7da70c..5607f386b42e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -199,8 +199,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: (e.getMessage != null && e.getMessage.startsWith("File already exists: ")) } - override def get(batchId: Long): Option[T] = { - val batchMetadataFile = batchIdToPath(batchId) + protected def get(batchMetadataFile: Path): Option[T] = { if (fileManager.exists(batchMetadataFile)) { val input = fileManager.open(batchMetadataFile) try { @@ -214,13 +213,19 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: } } + override def get(batchId: Long): Option[T] = { + get(batchIdToPath(batchId)) + } + + protected def listExistingFiles(): Array[Path] = { + fileManager.list(metadataPath, batchFilesFilter).map(_.getPath) + } + override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = { - val files = fileManager.list(metadataPath, batchFilesFilter) - val batchIds = files - .map(f => pathToBatchId(f.getPath)) + val batchIds = listExistingFiles().map(p => 
pathToBatchId(p)) .filter { batchId => (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get) - } + } batchIds.sorted.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map { case (batchId, metadataOption) => (batchId, metadataOption.get) @@ -228,8 +233,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: } override def getLatest(): Option[(Long, T)] = { - val batchIds = fileManager.list(metadataPath, batchFilesFilter) - .map(f => pathToBatchId(f.getPath)) + val batchIds = listExistingFiles() + .map(p => pathToBatchId(p)) .sorted .reverse for (batchId <- batchIds) { @@ -245,9 +250,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: * Removes all the log entry earlier than thresholdBatchId (exclusive). */ override def purge(thresholdBatchId: Long): Unit = { - val batchIds = fileManager.list(metadataPath, batchFilesFilter) - .map(f => pathToBatchId(f.getPath)) - + val batchIds = listExistingFiles() + .map(p => pathToBatchId(p)) for (batchId <- batchIds if batchId < thresholdBatchId) { val path = batchIdToPath(batchId) fileManager.delete(path) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala index 265ae36d91ea0..7c0fafd2fb85d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala @@ -35,6 +35,64 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext import CompactibleFileStreamLog._ + testWithUninterruptibleThread( + "correct results from multiples runs with different compact intervals") { + withTempDir { dir => + def newFakeCompactibleFileStreamLog(compactInterval: Int): FakeCompactibleFileStreamLog = + new FakeCompactibleFileStreamLog( + _fileCleanupDelayMs = Long.MaxValue, + _compactInterval = compactInterval, + spark, + dir.getCanonicalPath) + + var compactibleLog = newFakeCompactibleFileStreamLog(2) + assert(compactibleLog.knownCompactionBatches === Array()) + assert(compactibleLog.zeroBatch === 0) + assert(compactibleLog.allFiles() === Array()) + + compactibleLog.add(0, Array("entry_0")) + compactibleLog.add(1, Array("entry_1")) // should compact + + compactibleLog = newFakeCompactibleFileStreamLog(2) + assert(compactibleLog.knownCompactionBatches === Array(1)) + assert(compactibleLog.zeroBatch === 2) + assert(compactibleLog.allFiles() === (0 to 1).map(idx => s"entry_$idx")) + + compactibleLog.add(2, Array("entry_2")) + compactibleLog.add(3, Array("entry_3")) // should compact + + compactibleLog = newFakeCompactibleFileStreamLog(3) + assert(compactibleLog.knownCompactionBatches === Array(1, 3)) + assert(compactibleLog.zeroBatch === 4) + assert(compactibleLog.allFiles() === (0 to 3).map(idx => s"entry_$idx")) + + compactibleLog.add(4, Array("entry_4")) + compactibleLog.add(5, Array("entry_5")) + compactibleLog.add(6, Array("entry_6")) // should compact + + compactibleLog = newFakeCompactibleFileStreamLog(5) + assert(compactibleLog.knownCompactionBatches === Array(1, 3, 6)) + assert(compactibleLog.zeroBatch === 7) + assert(compactibleLog.allFiles() === (0 to 6).map(idx => s"entry_$idx")) + + compactibleLog.add(7, Array("entry_7")) + compactibleLog.add(8, Array("entry_8")) + compactibleLog.add(9, Array("entry_9")) + 
compactibleLog.add(10, Array("entry_10")) + + compactibleLog = newFakeCompactibleFileStreamLog(2) + assert(compactibleLog.knownCompactionBatches === Array(1, 3, 6)) + assert(compactibleLog.zeroBatch === 11) + assert(compactibleLog.allFiles() === (0 to 10).map(idx => s"entry_$idx")) + } + } + + private val emptyKnownCompactionBatches = Array[Long]() + private val knownCompactionBatches = Array[Long]( + 1, 3, // produced with interval = 2 + 6 // produced with interval = 3 + ) + /** -- testing of `object CompactibleFileStreamLog` begins -- */ test("getBatchIdFromFileName") { @@ -45,51 +103,278 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext } } + test("isCompactionBatchFromFileName") { + assert(false === isCompactionBatchFromFileName("1234")) + assert(true === isCompactionBatchFromFileName("1234.compact")) + } + test("isCompactionBatch") { - assert(false === isCompactionBatch(0, compactInterval = 3)) - assert(false === isCompactionBatch(1, compactInterval = 3)) - assert(true === isCompactionBatch(2, compactInterval = 3)) - assert(false === isCompactionBatch(3, compactInterval = 3)) - assert(false === isCompactionBatch(4, compactInterval = 3)) - assert(true === isCompactionBatch(5, compactInterval = 3)) + // test empty knownCompactionBatches cases + assert(false === isCompactionBatch( + emptyKnownCompactionBatches, zeroBatch = 0, batchId = 0, compactInterval = 3)) + assert(false === isCompactionBatch( + emptyKnownCompactionBatches, zeroBatch = 0, batchId = 1, compactInterval = 3)) + assert(true === isCompactionBatch( + emptyKnownCompactionBatches, zeroBatch = 0, batchId = 2, compactInterval = 3)) + assert(false === isCompactionBatch( + emptyKnownCompactionBatches, zeroBatch = 0, batchId = 3, compactInterval = 3)) + assert(false === isCompactionBatch( + emptyKnownCompactionBatches, zeroBatch = 0, batchId = 4, compactInterval = 3)) + assert(true === isCompactionBatch( + emptyKnownCompactionBatches, zeroBatch = 0, batchId = 5, compactInterval = 3)) + + // test non-empty knownCompactionBatches cases + assert(false === isCompactionBatch( + knownCompactionBatches, zeroBatch = 7, batchId = 0, compactInterval = 3)) + assert(true === isCompactionBatch( + knownCompactionBatches, zeroBatch = 7, batchId = 1, compactInterval = 3)) + assert(false === isCompactionBatch( + knownCompactionBatches, zeroBatch = 7, batchId = 2, compactInterval = 3)) + assert(true === isCompactionBatch( + knownCompactionBatches, zeroBatch = 7, batchId = 3, compactInterval = 3)) + assert(false === isCompactionBatch( + knownCompactionBatches, zeroBatch = 7, batchId = 4, compactInterval = 3)) + assert(false === isCompactionBatch( + knownCompactionBatches, zeroBatch = 7, batchId = 5, compactInterval = 3)) + assert(true === isCompactionBatch( + knownCompactionBatches, zeroBatch = 7, batchId = 6, compactInterval = 3)) + assert(false === isCompactionBatch( + knownCompactionBatches, zeroBatch = 7, batchId = 7, compactInterval = 3)) + assert(false === isCompactionBatch( + knownCompactionBatches, zeroBatch = 7, batchId = 8, compactInterval = 3)) + assert(true === isCompactionBatch( + knownCompactionBatches, zeroBatch = 7, batchId = 9, compactInterval = 3)) + assert(false === isCompactionBatch( + knownCompactionBatches, zeroBatch = 7, batchId = 10, compactInterval = 3)) + + assert(false === isCompactionBatch( + knownCompactionBatches, zeroBatch = 20, batchId = 0, compactInterval = 3)) + assert(true === isCompactionBatch( + knownCompactionBatches, zeroBatch = 20, batchId = 1, compactInterval = 3)) + 
assert(false === isCompactionBatch( + knownCompactionBatches, zeroBatch = 20, batchId = 2, compactInterval = 3)) + assert(true === isCompactionBatch( + knownCompactionBatches, zeroBatch = 20, batchId = 3, compactInterval = 3)) + assert(false === isCompactionBatch( + knownCompactionBatches, zeroBatch = 20, batchId = 4, compactInterval = 3)) + assert(false === isCompactionBatch( + knownCompactionBatches, zeroBatch = 20, batchId = 5, compactInterval = 3)) + assert(true === isCompactionBatch( + knownCompactionBatches, zeroBatch = 20, batchId = 6, compactInterval = 3)) + assert(false === isCompactionBatch( + knownCompactionBatches, zeroBatch = 20, batchId = 7, compactInterval = 3)) + assert(false === isCompactionBatch( + knownCompactionBatches, zeroBatch = 20, batchId = 8, compactInterval = 3)) + // notice the following one, it should be false !!! + assert(false === isCompactionBatch( + knownCompactionBatches, zeroBatch = 20, batchId = 9, compactInterval = 3)) + for (batchId <- 10 until 20) { + assert(false === isCompactionBatch( + knownCompactionBatches, zeroBatch = 20, batchId = batchId, compactInterval = 3)) + } + assert(false === isCompactionBatch( + knownCompactionBatches, zeroBatch = 20, batchId = 20, compactInterval = 3)) + assert(false === isCompactionBatch( + knownCompactionBatches, zeroBatch = 20, batchId = 21, compactInterval = 3)) + assert(true === isCompactionBatch( + knownCompactionBatches, zeroBatch = 20, batchId = 22, compactInterval = 3)) } test("nextCompactionBatchId") { - assert(2 === nextCompactionBatchId(0, compactInterval = 3)) - assert(2 === nextCompactionBatchId(1, compactInterval = 3)) - assert(5 === nextCompactionBatchId(2, compactInterval = 3)) - assert(5 === nextCompactionBatchId(3, compactInterval = 3)) - assert(5 === nextCompactionBatchId(4, compactInterval = 3)) - assert(8 === nextCompactionBatchId(5, compactInterval = 3)) + assert(2 === nextCompactionBatchId(zeroBatch = 0, batchId = 0, compactInterval = 3)) + assert(2 === nextCompactionBatchId(zeroBatch = 0, batchId = 1, compactInterval = 3)) + assert(5 === nextCompactionBatchId(zeroBatch = 0, batchId = 2, compactInterval = 3)) + assert(5 === nextCompactionBatchId(zeroBatch = 0, batchId = 3, compactInterval = 3)) + assert(5 === nextCompactionBatchId(zeroBatch = 0, batchId = 4, compactInterval = 3)) + assert(8 === nextCompactionBatchId(zeroBatch = 0, batchId = 5, compactInterval = 3)) + assert(8 === nextCompactionBatchId(zeroBatch = 0, batchId = 6, compactInterval = 3)) + assert(9 === nextCompactionBatchId(zeroBatch = 7, batchId = 7, compactInterval = 3)) + assert(9 === nextCompactionBatchId(zeroBatch = 7, batchId = 8, compactInterval = 3)) + assert(12 === nextCompactionBatchId(zeroBatch = 7, batchId = 9, compactInterval = 3)) + assert(12 === nextCompactionBatchId(zeroBatch = 7, batchId = 10, compactInterval = 3)) + assert(12 === nextCompactionBatchId(zeroBatch = 7, batchId = 11, compactInterval = 3)) } test("getValidBatchesBeforeCompactionBatch") { + // test empty knownCompactionBatches cases + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch( + emptyKnownCompactionBatches, zeroBatch = 0, compactionBatchId = 0, compactInterval = 3) + } + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch( + emptyKnownCompactionBatches, zeroBatch = 0, compactionBatchId = 1, compactInterval = 3) + } + assert(Seq(0, 1) === + getValidBatchesBeforeCompactionBatch( + emptyKnownCompactionBatches, zeroBatch = 0, compactionBatchId = 2, compactInterval = 3)) intercept[AssertionError] { - 
getValidBatchesBeforeCompactionBatch(0, compactInterval = 3) + getValidBatchesBeforeCompactionBatch( + emptyKnownCompactionBatches, zeroBatch = 0, compactionBatchId = 3, compactInterval = 3) } intercept[AssertionError] { - getValidBatchesBeforeCompactionBatch(1, compactInterval = 3) + getValidBatchesBeforeCompactionBatch( + emptyKnownCompactionBatches, zeroBatch = 0, compactionBatchId = 4, compactInterval = 3) + } + assert(Seq(2, 3, 4) === + getValidBatchesBeforeCompactionBatch( + emptyKnownCompactionBatches, zeroBatch = 0, compactionBatchId = 5, compactInterval = 3)) + + // test non-empty knownCompactionBatches cases + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch( + knownCompactionBatches, zeroBatch = 7, compactionBatchId = 7, compactInterval = 3) } - assert(Seq(0, 1) === getValidBatchesBeforeCompactionBatch(2, compactInterval = 3)) intercept[AssertionError] { - getValidBatchesBeforeCompactionBatch(3, compactInterval = 3) + getValidBatchesBeforeCompactionBatch( + knownCompactionBatches, zeroBatch = 7, compactionBatchId = 8, compactInterval = 3) } + assert(Seq(6, 7, 8) === + getValidBatchesBeforeCompactionBatch( + knownCompactionBatches, zeroBatch = 7, compactionBatchId = 9, compactInterval = 3)) intercept[AssertionError] { - getValidBatchesBeforeCompactionBatch(4, compactInterval = 3) + getValidBatchesBeforeCompactionBatch( + knownCompactionBatches, zeroBatch = 7, compactionBatchId = 10, compactInterval = 3) } - assert(Seq(2, 3, 4) === getValidBatchesBeforeCompactionBatch(5, compactInterval = 3)) + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch( + knownCompactionBatches, zeroBatch = 7, compactionBatchId = 11, compactInterval = 3) + } + assert(Seq(9, 10, 11) === + getValidBatchesBeforeCompactionBatch( + knownCompactionBatches, zeroBatch = 7, compactionBatchId = 12, compactInterval = 3)) + + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch( + knownCompactionBatches, zeroBatch = 20, compactionBatchId = 20, compactInterval = 3) + } + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch( + knownCompactionBatches, zeroBatch = 20, compactionBatchId = 21, compactInterval = 3) + } + assert((6 to 21) === + getValidBatchesBeforeCompactionBatch( + knownCompactionBatches, zeroBatch = 20, compactionBatchId = 22, compactInterval = 3)) + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch( + knownCompactionBatches, zeroBatch = 20, compactionBatchId = 23, compactInterval = 3) + } + intercept[AssertionError] { + getValidBatchesBeforeCompactionBatch( + knownCompactionBatches, zeroBatch = 20, compactionBatchId = 24, compactInterval = 3) + } + assert(Seq(22, 23, 24) === + getValidBatchesBeforeCompactionBatch( + knownCompactionBatches, zeroBatch = 20, compactionBatchId = 25, compactInterval = 3)) } test("getAllValidBatches") { - assert(Seq(0) === getAllValidBatches(0, compactInterval = 3)) - assert(Seq(0, 1) === getAllValidBatches(1, compactInterval = 3)) - assert(Seq(2) === getAllValidBatches(2, compactInterval = 3)) - assert(Seq(2, 3) === getAllValidBatches(3, compactInterval = 3)) - assert(Seq(2, 3, 4) === getAllValidBatches(4, compactInterval = 3)) - assert(Seq(5) === getAllValidBatches(5, compactInterval = 3)) - assert(Seq(5, 6) === getAllValidBatches(6, compactInterval = 3)) - assert(Seq(5, 6, 7) === getAllValidBatches(7, compactInterval = 3)) - assert(Seq(8) === getAllValidBatches(8, compactInterval = 3)) + // test empty knownCompactionBatches cases + assert( + Seq(0) === getAllValidBatches( + 
emptyKnownCompactionBatches, zeroBatch = 0, batchId = 0, compactInterval = 3)) + assert( + Seq(0, 1) === getAllValidBatches( + emptyKnownCompactionBatches, zeroBatch = 0, batchId = 1, compactInterval = 3)) + assert( + Seq(2) === getAllValidBatches( + emptyKnownCompactionBatches, zeroBatch = 0, batchId = 2, compactInterval = 3)) + assert( + Seq(2, 3) === getAllValidBatches( + emptyKnownCompactionBatches, zeroBatch = 0, batchId = 3, compactInterval = 3)) + assert( + Seq(2, 3, 4) === getAllValidBatches( + emptyKnownCompactionBatches, zeroBatch = 0, batchId = 4, compactInterval = 3)) + assert( + Seq(5) === getAllValidBatches( + emptyKnownCompactionBatches, zeroBatch = 0, batchId = 5, compactInterval = 3)) + assert( + Seq(5, 6) === getAllValidBatches( + emptyKnownCompactionBatches, zeroBatch = 0, batchId = 6, compactInterval = 3)) + assert( + Seq(5, 6, 7) === getAllValidBatches( + emptyKnownCompactionBatches, zeroBatch = 0, batchId = 7, compactInterval = 3)) + assert( + Seq(8) === getAllValidBatches( + emptyKnownCompactionBatches, zeroBatch = 0, batchId = 8, compactInterval = 3)) + + // test non-empty knownCompactionBatches cases + assert( + Seq(0) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 7, batchId = 0, compactInterval = 3)) + assert( + Seq(1) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 7, batchId = 1, compactInterval = 3)) + assert( + Seq(1, 2) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 7, batchId = 2, compactInterval = 3)) + assert( + Seq(3) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 7, batchId = 3, compactInterval = 3)) + assert( + Seq(3, 4) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 7, batchId = 4, compactInterval = 3)) + assert( + Seq(3, 4, 5) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 7, batchId = 5, compactInterval = 3)) + assert( + Seq(6) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 7, batchId = 6, compactInterval = 3)) + assert( + Seq(6, 7) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 7, batchId = 7, compactInterval = 3)) + assert( + Seq(6, 7, 8) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 7, batchId = 8, compactInterval = 3)) + assert( + Seq(9) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 7, batchId = 9, compactInterval = 3)) + assert( + Seq(9, 10) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 7, batchId = 10, compactInterval = 3)) + assert( + Seq(9, 10, 11) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 7, batchId = 11, compactInterval = 3)) + assert( + Seq(12) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 7, batchId = 12, compactInterval = 3)) + assert( + Seq(12, 13) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 7, batchId = 13, compactInterval = 3)) + assert( + Seq(12, 13, 14) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 7, batchId = 14, compactInterval = 3)) + assert( + Seq(15) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 7, batchId = 15, compactInterval = 3)) + + assert( + (6 to 20) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 20, batchId = 20, compactInterval = 3)) + assert( + (6 to 21) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 20, batchId = 21, compactInterval = 3)) + assert( + Seq(22) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 20, batchId = 22, compactInterval = 3)) + assert( + Seq(22, 23) === getAllValidBatches( + knownCompactionBatches, 
zeroBatch = 20, batchId = 23, compactInterval = 3)) + assert( + Seq(22, 23, 24) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 20, batchId = 24, compactInterval = 3)) + assert( + Seq(25) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 20, batchId = 25, compactInterval = 3)) + assert( + Seq(25, 26) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 20, batchId = 26, compactInterval = 3)) + assert( + Seq(25, 26, 27) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 20, batchId = 27, compactInterval = 3)) + assert( + Seq(28) === getAllValidBatches( + knownCompactionBatches, zeroBatch = 20, batchId = 28, compactInterval = 3)) } /** -- testing of `object CompactibleFileStreamLog` ends -- */ @@ -98,6 +383,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext withFakeCompactibleFileStreamLog( fileCleanupDelayMs = Long.MaxValue, compactInterval = 3, + existingFiles = Seq(), compactibleLog => { assert("0" === compactibleLog.batchIdToPath(0).getName) assert("1" === compactibleLog.batchIdToPath(1).getName) @@ -106,12 +392,36 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext assert("4" === compactibleLog.batchIdToPath(4).getName) assert("5.compact" === compactibleLog.batchIdToPath(5).getName) }) + + withFakeCompactibleFileStreamLog( + fileCleanupDelayMs = Long.MaxValue, + compactInterval = 3, + existingFiles = Seq( + "0", "1.compact", + "2", "3.compact", + "4", "5" + ), + compactibleLog => { + assert("0" === compactibleLog.batchIdToPath(0).getName) + assert("1.compact" === compactibleLog.batchIdToPath(1).getName) + assert("2" === compactibleLog.batchIdToPath(2).getName) + assert("3.compact" === compactibleLog.batchIdToPath(3).getName) + assert("4" === compactibleLog.batchIdToPath(4).getName) + assert("5" === compactibleLog.batchIdToPath(5).getName) + assert("6" === compactibleLog.batchIdToPath(6).getName) + assert("7" === compactibleLog.batchIdToPath(7).getName) + assert("8.compact" === compactibleLog.batchIdToPath(8).getName) + assert("9" === compactibleLog.batchIdToPath(9).getName) + assert("10" === compactibleLog.batchIdToPath(10).getName) + assert("11.compact" === compactibleLog.batchIdToPath(11).getName) + }) } test("serialize") { withFakeCompactibleFileStreamLog( fileCleanupDelayMs = Long.MaxValue, compactInterval = 3, + existingFiles = Seq(), compactibleLog => { val logs = Array("entry_1", "entry_2", "entry_3") val expected = s"""${FakeCompactibleFileStreamLog.VERSION} @@ -132,6 +442,7 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext withFakeCompactibleFileStreamLog( fileCleanupDelayMs = Long.MaxValue, compactInterval = 3, + existingFiles = Seq(), compactibleLog => { val logs = s"""${FakeCompactibleFileStreamLog.VERSION} |"entry_1" @@ -151,12 +462,13 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext withFakeCompactibleFileStreamLog( fileCleanupDelayMs = Long.MaxValue, compactInterval = 3, + existingFiles = Seq(), compactibleLog => { for (batchId <- 0 to 10) { compactibleLog.add(batchId, Array("some_path_" + batchId)) val expectedFiles = (0 to batchId).map { id => "some_path_" + id } assert(compactibleLog.allFiles() === expectedFiles) - if (isCompactionBatch(batchId, 3)) { + if (isCompactionBatch(emptyKnownCompactionBatches, batchId, 0, 3)) { // Since batchId is a compaction batch, the batch log file should contain all logs assert(compactibleLog.get(batchId).getOrElse(Nil) === expectedFiles) } @@ -169,6 +481,7 @@ 
class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext withFakeCompactibleFileStreamLog( fileCleanupDelayMs = 0, compactInterval = 3, + existingFiles = Seq(), compactibleLog => { val fs = compactibleLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf()) @@ -201,14 +514,23 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext private def withFakeCompactibleFileStreamLog( fileCleanupDelayMs: Long, compactInterval: Int, + existingFiles: Seq[String], f: FakeCompactibleFileStreamLog => Unit ): Unit = { - withTempDir { file => + withTempDir { dir => + val tmpLog = new FakeCompactibleFileStreamLog( + fileCleanupDelayMs, + compactInterval, + spark, + dir.getCanonicalPath) + for (existingFile <- existingFiles) { + tmpLog.serialize(Array(), new FileOutputStream(new File(dir, existingFile))) + } val compactibleLog = new FakeCompactibleFileStreamLog( fileCleanupDelayMs, compactInterval, spark, - file.getCanonicalPath) + dir.getCanonicalPath) f(compactibleLog) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala index e046fee0c04d3..42f111bd2e11d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala @@ -29,6 +29,12 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { import CompactibleFileStreamLog._ import FileStreamSinkLog._ + private val emptyKnownCompactionBatches = Array[Long]() + private val knownCompactionBatches = Array[Long]( + 1, 3, // produced with interval = 2 + 6 // produced with interval = 3 + ) + test("compactLogs") { withFileStreamSinkLog { sinkLog => val logs = Seq( @@ -140,7 +146,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext { id => newFakeSinkFileStatus("/a/b/" + id, FileStreamSinkLog.ADD_ACTION) } assert(sinkLog.allFiles() === expectedFiles) - if (isCompactionBatch(batchId, 3)) { + if (isCompactionBatch(emptyKnownCompactionBatches, batchId, 0, 3)) { // Since batchId is a compaction batch, the batch log file should contain all logs assert(sinkLog.get(batchId).getOrElse(Nil) === expectedFiles) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index fab7642994ffc..09e20f0632b2f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -907,7 +907,11 @@ class FileStreamSourceSuite extends FileStreamSourceTest { val fileSource = (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource] val metadataLog = fileSource invokePrivate _metadataLog() - if (isCompactionBatch(batchId, 2)) { + if (isCompactionBatch( + metadataLog.asInstanceOf[CompactibleFileStreamLog[AnyRef]].knownCompactionBatches, + metadataLog.asInstanceOf[CompactibleFileStreamLog[AnyRef]].zeroBatch, + batchId, + 2)) { val path = metadataLog.batchIdToPath(batchId) // Assert path name should be ended with compact suffix.
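
The `zeroBatch` / `knownCompactionBatches` bookkeeping introduced in patch 3 can be hard to follow from the diff alone. The following standalone Scala sketch is not part of the patch: the object and method names are illustrative only, and `contains` stands in for the patch's `binarySearch` lookup. It mirrors the two key formulas from `CompactibleFileStreamLog` and checks them against scenario (4) of the new scaladoc (earlier runs left compaction batches 1, 3, 4 on disk, and the current run starts writing at batch 5 with `compactInterval` = 3).

object CompactionBatchMathSketch {

  // A batch written by the current run (batchId >= zeroBatch) is a compaction batch every
  // `compactInterval` batches counted from `zeroBatch`; a batch from an earlier run is a
  // compaction batch only if it is one of the compaction batches discovered on startup.
  def isCompactionBatch(
      knownCompactionBatches: Array[Long],
      zeroBatch: Long,
      batchId: Long,
      compactInterval: Int): Boolean = {
    if (batchId < zeroBatch) {
      knownCompactionBatches.contains(batchId)
    } else {
      (batchId - zeroBatch + 1) % compactInterval == 0
    }
  }

  // The first compaction batch id at or after `batchId`, counted from `zeroBatch`.
  def nextCompactionBatchId(zeroBatch: Long, batchId: Long, compactInterval: Long): Long = {
    require(batchId >= zeroBatch)
    (batchId - zeroBatch + compactInterval + 1) / compactInterval * compactInterval + zeroBatch - 1
  }

  def main(args: Array[String]): Unit = {
    // Scenario (4): previous runs used compactInterval = 2, leaving 0 1.compact 2 3.compact
    // 4.compact, so knownCompactionBatches = (1, 3, 4) and zeroBatch = 5; this run uses 3.
    val known = Array(1L, 3L, 4L)
    val zeroBatch = 5L

    assert(isCompactionBatch(known, zeroBatch, batchId = 3, compactInterval = 3))  // old compaction batch
    assert(!isCompactionBatch(known, zeroBatch, batchId = 5, compactInterval = 3)) // 5 and 6 are plain batches
    assert(isCompactionBatch(known, zeroBatch, batchId = 7, compactInterval = 3))  // 7, 10, 13, ... compact
    assert(nextCompactionBatchId(zeroBatch, batchId = 5, compactInterval = 3) == 7)
    assert(nextCompactionBatchId(zeroBatch, batchId = 8, compactInterval = 3) == 10)
  }
}

Under these assumed inputs the sketch reproduces the behaviour the new tests assert: batches carried over from earlier runs keep their original compaction status regardless of the current interval, while compaction points for the current run are computed relative to `zeroBatch` instead of batch 0.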