From d58b1c3ba12916a9557ff43b39f6f76ea71afe02 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Fri, 5 Jan 2018 13:27:23 -0800 Subject: [PATCH 1/5] remove v1 sink --- .../spark/sql/execution/SparkStrategies.scala | 3 - .../sql/execution/streaming/memory.scala | 96 ------------------- .../sql/streaming/DataStreamWriter.scala | 13 +-- .../execution/streaming/MemorySinkSuite.scala | 19 ++-- .../streaming/RateSourceV2Suite.scala | 2 +- .../streaming/EventTimeWatermarkSuite.scala | 3 +- .../sql/streaming/FileStreamSourceSuite.scala | 3 +- .../spark/sql/streaming/StreamSuite.scala | 9 +- .../spark/sql/streaming/StreamTest.scala | 17 +--- .../streaming/StreamingAggregationSuite.scala | 5 +- .../sql/streaming/StreamingQuerySuite.scala | 3 +- .../continuous/ContinuousSuite.scala | 20 ++-- 12 files changed, 44 insertions(+), 149 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 82b4eb9fba242..29e7e21c8b799 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -480,9 +480,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil case r: RunnableCommand => ExecutedCommandExec(r) :: Nil - case MemoryPlan(sink, output) => - val encoder = RowEncoder(sink.schema) - LocalTableScanExec(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil case MemoryPlanV2(sink, output) => val encoder = RowEncoder(StructType.fromAttributes(output)) LocalTableScanExec(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 352d4ce9fbcaa..f60a982c59483 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -187,99 +187,3 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } } - -class MemoryStreamDataReaderFactory(records: Array[UnsafeRow]) - extends DataReaderFactory[UnsafeRow] { - override def createDataReader(): DataReader[UnsafeRow] = { - new DataReader[UnsafeRow] { - private var currentIndex = -1 - - override def next(): Boolean = { - // Return true as long as the new index is in the array. - currentIndex += 1 - currentIndex < records.length - } - - override def get(): UnsafeRow = records(currentIndex) - - override def close(): Unit = {} - } - } -} - -/** - * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit - * tests and does not provide durability. - */ -class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink with Logging { - - private case class AddedData(batchId: Long, data: Array[Row]) - - /** An order list of batches that have been written to this [[Sink]]. */ - @GuardedBy("this") - private val batches = new ArrayBuffer[AddedData]() - - /** Returns all rows that are stored in this [[Sink]]. 
*/ - def allData: Seq[Row] = synchronized { - batches.map(_.data).flatten - } - - def latestBatchId: Option[Long] = synchronized { - batches.lastOption.map(_.batchId) - } - - def latestBatchData: Seq[Row] = synchronized { batches.lastOption.toSeq.flatten(_.data) } - - def toDebugString: String = synchronized { - batches.map { case AddedData(batchId, data) => - val dataStr = try data.mkString(" ") catch { - case NonFatal(e) => "[Error converting to string]" - } - s"$batchId: $dataStr" - }.mkString("\n") - } - - override def addBatch(batchId: Long, data: DataFrame): Unit = { - val notCommitted = synchronized { - latestBatchId.isEmpty || batchId > latestBatchId.get - } - if (notCommitted) { - logDebug(s"Committing batch $batchId to $this") - outputMode match { - case Append | Update => - val rows = AddedData(batchId, data.collect()) - synchronized { batches += rows } - - case Complete => - val rows = AddedData(batchId, data.collect()) - synchronized { - batches.clear() - batches += rows - } - - case _ => - throw new IllegalArgumentException( - s"Output mode $outputMode is not supported by MemorySink") - } - } else { - logDebug(s"Skipping already committed batch: $batchId") - } - } - - def clear(): Unit = synchronized { - batches.clear() - } - - override def toString(): String = "MemorySink" -} - -/** - * Used to query the data that has been written into a [[MemorySink]]. - */ -case class MemoryPlan(sink: MemorySink, output: Seq[Attribute]) extends LeafNode { - def this(sink: MemorySink) = this(sink, sink.schema.toAttributes) - - private val sizePerRow = sink.schema.toAttributes.map(_.dataType.defaultSize).sum - - override def computeStats(): Statistics = Statistics(sizePerRow * sink.allData.size) -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 2fc903168cfa0..666d97fe1fd29 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -243,16 +243,9 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { if (extraOptions.get("queryName").isEmpty) { throw new AnalysisException("queryName must be specified for memory sink") } - val (sink, resultDf) = trigger match { - case _: ContinuousTrigger => - val s = new MemorySinkV2() - val r = Dataset.ofRows(df.sparkSession, new MemoryPlanV2(s, df.schema.toAttributes)) - (s, r) - case _ => - val s = new MemorySink(df.schema, outputMode) - val r = Dataset.ofRows(df.sparkSession, new MemoryPlan(s)) - (s, r) - } + val sink = new MemorySinkV2() + val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlanV2(sink, df.schema.toAttributes)) + val chkpointLoc = extraOptions.get("checkpointLocation") val recoverFromChkpoint = outputMode == OutputMode.Complete() val query = df.sparkSession.sessionState.streamingQueryManager.startQuery( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala index e8420eee7fe9d..4a4856cb57167 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala @@ -22,6 +22,7 @@ import scala.language.implicitConversions import org.scalatest.BeforeAndAfter import org.apache.spark.sql._ +import 
org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 import org.apache.spark.sql.streaming.{OutputMode, StreamTest} import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.Utils @@ -36,7 +37,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { test("directly add data in Append output mode") { implicit val schema = new StructType().add(new StructField("value", IntegerType)) - val sink = new MemorySink(schema, OutputMode.Append) + val sink = new MemorySinkV2 // Before adding data, check output assert(sink.latestBatchId === None) @@ -44,30 +45,30 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { checkAnswer(sink.allData, Seq.empty) // Add batch 0 and check outputs - sink.addBatch(0, 1 to 3) + sink.write(0, OutputMode.Append(), Range(1, 4).map(Row(_)).toArray) assert(sink.latestBatchId === Some(0)) checkAnswer(sink.latestBatchData, 1 to 3) checkAnswer(sink.allData, 1 to 3) // Add batch 1 and check outputs - sink.addBatch(1, 4 to 6) + sink.write(1, OutputMode.Append(), Range(4, 7).map(Row(_)).toArray) assert(sink.latestBatchId === Some(1)) checkAnswer(sink.latestBatchData, 4 to 6) checkAnswer(sink.allData, 1 to 6) // new data should get appended to old data // Re-add batch 1 with different data, should not be added and outputs should not be changed - sink.addBatch(1, 7 to 9) + sink.write(1, OutputMode.Append(), Range(7, 10).map(Row(_)).toArray) assert(sink.latestBatchId === Some(1)) checkAnswer(sink.latestBatchData, 4 to 6) checkAnswer(sink.allData, 1 to 6) // Add batch 2 and check outputs - sink.addBatch(2, 7 to 9) + sink.write(2, OutputMode.Append(), Range(7, 10).map(Row(_)).toArray) assert(sink.latestBatchId === Some(2)) checkAnswer(sink.latestBatchData, 7 to 9) checkAnswer(sink.allData, 1 to 9) } - +/* test("directly add data in Update output mode") { implicit val schema = new StructType().add(new StructField("value", IntegerType)) val sink = new MemorySink(schema, OutputMode.Update) @@ -135,7 +136,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { checkAnswer(sink.latestBatchData, 7 to 9) checkAnswer(sink.allData, 7 to 9) } - +*/ test("registering as a table in Append output mode") { val input = MemoryStream[Int] @@ -209,7 +210,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { query.stop() } - test("MemoryPlan statistics") { + /* test("MemoryPlan statistics") { implicit val schema = new StructType().add(new StructField("value", IntegerType)) val sink = new MemorySink(schema, OutputMode.Append) val plan = new MemoryPlan(sink) @@ -225,7 +226,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { sink.addBatch(1, 4 to 6) plan.invalidateStatsCache() assert(plan.stats.sizeInBytes === 24) - } + } */ ignore("stress test") { // Ignore the stress test as it takes several minutes to run diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala index 983ba1668f58f..4e8b564ea2f94 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceV2Suite.scala @@ -63,7 +63,7 @@ class RateSourceV2Suite extends StreamTest { .option("rowsPerSecond", "10") .option("useManualClock", "true") .load() - testStream(input, useV2Sink = true)( + testStream(input)( AdvanceRateManualClock(seconds = 1), CheckLastBatch((0 until 10).map(v => 
new java.sql.Timestamp(v * 100L) -> v): _*), StopStream, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index d6bef9ce07379..93dd6e70630bb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 import org.apache.spark.sql.functions.{count, window} import org.apache.spark.sql.streaming.OutputMode._ @@ -262,7 +263,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche AssertOnQuery { q => // purge commit and clear the sink val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L) + 1L q.commitLog.purge(commit) - q.sink.asInstanceOf[MemorySink].clear() + q.sink.asInstanceOf[MemorySinkV2].clear() true }, StartStream(), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 5bb0f4d643bbe..b484a5d2dff0a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap} +import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._ import org.apache.spark.sql.streaming.util.StreamManualClock @@ -1010,7 +1011,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { .asInstanceOf[StreamingQueryWrapper] .streamingQuery q.processAllAvailable() - val memorySink = q.sink.asInstanceOf[MemorySink] + val memorySink = q.sink.asInstanceOf[MemorySinkV2] val fileSource = getSourcesFromStreamingQuery(q).head /** Check the data read in the last batch */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index d1a04833390f5..3c8e5a405d662 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -304,7 +305,7 @@ class StreamSuite extends StreamTest { // For each batch, we would log the sink change after the execution // This checks whether the key of the sink change log is the expected 
batch id def CheckSinkLatestBatchId(expectedId: Int): AssertOnQuery = - AssertOnQuery(_.sink.asInstanceOf[MemorySink].latestBatchId.get == expectedId, + AssertOnQuery(_.sink.asInstanceOf[MemorySinkV2].latestBatchId.get == expectedId, s"sink's lastBatchId should be $expectedId") val inputData = MemoryStream[Int] @@ -757,8 +758,10 @@ class StreamSuite extends StreamTest { query.awaitTermination() } - assert(e.getMessage.contains(providerClassName)) - assert(e.getMessage.contains("instantiated")) + assert(e.getCause != null) + assert(e.getCause.getCause != null) + assert(e.getCause.getCause.getMessage.contains(providerClassName)) + assert(e.getCause.getCause.getMessage.contains("instantiated")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 37fe595529baf..9c709b1075049 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -279,8 +279,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be */ def testStream( _stream: Dataset[_], - outputMode: OutputMode = OutputMode.Append, - useV2Sink: Boolean = defaultUseV2Sink)(actions: StreamAction*): Unit = synchronized { + outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = synchronized { import org.apache.spark.sql.streaming.util.StreamManualClock // `synchronized` is added to prevent the user from calling multiple `testStream`s concurrently @@ -293,7 +292,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be var currentStream: StreamExecution = null var lastStream: StreamExecution = null val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset to wait for - val sink = if (useV2Sink) new MemorySinkV2 else new MemorySink(stream.schema, outputMode) + val sink = new MemorySinkV2 val resetConfValues = mutable.Map[String, Option[String]]() @volatile @@ -343,10 +342,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be } def testState = { - val sinkDebugString = sink match { - case s: MemorySink => s.toDebugString - case s: MemorySinkV2 => s.toDebugString - } + val sinkDebugString = sink.toDebugString + s""" |== Progress == |$testActions @@ -415,11 +412,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be } } - val (latestBatchData, allData) = sink match { - case s: MemorySink => (s.latestBatchData, s.allData) - case s: MemorySinkV2 => (s.latestBatchData, s.allData) - } - try if (lastOnly) latestBatchData else allData catch { + try if (lastOnly) sink.latestBatchData else sink.allData catch { case e: Exception => failTest("Exception while getting data from sink", e) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 97e065193fd05..f9209e0d9e85c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.exchange.Exchange import org.apache.spark.sql.execution.streaming._ +import 
org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 import org.apache.spark.sql.execution.streaming.state.StateStore import org.apache.spark.sql.expressions.scalalang.typed import org.apache.spark.sql.functions._ @@ -298,7 +299,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest // i.e., we don't take it from the clock, which is at 90 seconds. StopStream, AssertOnQuery { q => // clear the sink - q.sink.asInstanceOf[MemorySink].clear() + q.sink.asInstanceOf[MemorySinkV2].clear() q.commitLog.purge(3) // advance by a minute i.e., 90 seconds total clock.advance(60 * 1000L) @@ -350,7 +351,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest // i.e., we don't take it from the clock, which is at 90 days. StopStream, AssertOnQuery { q => // clear the sink - q.sink.asInstanceOf[MemorySink].clear() + q.sink.asInstanceOf[MemorySinkV2].clear() q.commitLog.purge(3) // advance by 60 days i.e., 90 days total clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 3f9aa0d1fa5be..b3d40ab3b3391 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -32,6 +32,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.reader.DataReaderFactory @@ -179,7 +180,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi AssertOnQuery { q => // blow away commit log and sink result q.commitLog.purge(1) - q.sink.asInstanceOf[MemorySink].clear() + q.sink.asInstanceOf[MemorySinkV2].clear() true }, StartStream(trigger = Once), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala index 4b4ed82dc6520..6230e76dad514 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala @@ -69,7 +69,7 @@ class ContinuousSuite extends ContinuousSuiteBase { .load() .select('value) - testStream(df, useV2Sink = true)( + testStream(df)( StartStream(longContinuousTrigger), AwaitEpoch(0), Execute(waitForRateSourceTriggers(_, 2)), @@ -93,7 +93,7 @@ class ContinuousSuite extends ContinuousSuiteBase { .select('value) .map(r => r.getLong(0) * 2) - testStream(df, useV2Sink = true)( + testStream(df)( StartStream(longContinuousTrigger), AwaitEpoch(0), Execute(waitForRateSourceTriggers(_, 2)), @@ -112,7 +112,7 @@ class ContinuousSuite extends ContinuousSuiteBase { .select('value) .flatMap(r => Seq(0, r.getLong(0), r.getLong(0) * 2)) - testStream(df, useV2Sink = true)( + testStream(df)( StartStream(longContinuousTrigger), AwaitEpoch(0), Execute(waitForRateSourceTriggers(_, 2)), @@ -131,7 +131,7 @@ class ContinuousSuite extends ContinuousSuiteBase { .select('value) .where('value > 5) - testStream(df, useV2Sink = true)( + testStream(df)( StartStream(longContinuousTrigger), AwaitEpoch(0), 
Execute(waitForRateSourceTriggers(_, 2)), @@ -151,7 +151,7 @@ class ContinuousSuite extends ContinuousSuiteBase { .dropDuplicates() val except = intercept[AnalysisException] { - testStream(df, useV2Sink = true)(StartStream(longContinuousTrigger)) + testStream(df)(StartStream(longContinuousTrigger)) } assert(except.message.contains( @@ -167,7 +167,7 @@ class ContinuousSuite extends ContinuousSuiteBase { .select(current_timestamp()) val except = intercept[AnalysisException] { - testStream(df, useV2Sink = true)(StartStream(longContinuousTrigger)) + testStream(df)(StartStream(longContinuousTrigger)) } assert(except.message.contains( @@ -182,7 +182,7 @@ class ContinuousSuite extends ContinuousSuiteBase { .load() .select('value) - testStream(df, useV2Sink = true)( + testStream(df)( StartStream(longContinuousTrigger), AwaitEpoch(0), Execute(waitForRateSourceTriggers(_, 2)), @@ -270,7 +270,7 @@ class ContinuousStressSuite extends ContinuousSuiteBase { .load() .select('value) - testStream(df, useV2Sink = true)( + testStream(df)( StartStream(longContinuousTrigger), AwaitEpoch(0), Execute(waitForRateSourceTriggers(_, 201)), @@ -288,7 +288,7 @@ class ContinuousStressSuite extends ContinuousSuiteBase { .load() .select('value) - testStream(df, useV2Sink = true)( + testStream(df)( StartStream(Trigger.Continuous(2012)), AwaitEpoch(0), Execute(waitForRateSourceTriggers(_, 201)), @@ -305,7 +305,7 @@ class ContinuousStressSuite extends ContinuousSuiteBase { .load() .select('value) - testStream(df, useV2Sink = true)( + testStream(df)( StartStream(Trigger.Continuous(2012)), AwaitEpoch(10), StopStream, From 6d7bd3465f8894283b7805e85542a56502569946 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Mon, 22 Jan 2018 10:56:11 -0800 Subject: [PATCH 2/5] remove v2 sink from new tests --- .../org/apache/spark/sql/kafka010/KafkaContinuousTest.scala | 1 - .../test/scala/org/apache/spark/sql/streaming/StreamTest.scala | 1 - .../spark/sql/streaming/StreamingQueryListenerSuite.scala | 2 +- .../apache/spark/sql/streaming/continuous/ContinuousSuite.scala | 2 +- 4 files changed, 2 insertions(+), 4 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala index 5a1a14f7a307a..c68df21d97043 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.test.TestSparkSession // Trait to configure StreamTest for kafka continuous execution tests. trait KafkaContinuousTest extends KafkaSourceTest { override val defaultTrigger = Trigger.Continuous(1000) - override val defaultUseV2Sink = true // We need more than the default local[2] to be able to schedule all partitions simultaneously. 
override protected def createSparkSession = new TestSparkSession( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 9c709b1075049..8d583a88af69c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -82,7 +82,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be } protected val defaultTrigger = Trigger.ProcessingTime(0) - protected val defaultUseV2Sink = false /** How long to wait for an active stream to catch up when checking a result. */ val streamingTimeout = 10.seconds diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index b96f2bcbdd644..d5038e06e7601 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -180,7 +180,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { val listeners = (1 to 5).map(_ => new EventCollector) try { listeners.foreach(listener => spark.streams.addListener(listener)) - testStream(df, OutputMode.Append, useV2Sink = true)( + testStream(df, OutputMode.Append)( StartStream(Trigger.Continuous(1000)), StopStream, AssertOnQuery { query => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala index 6230e76dad514..25cc755aca23f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala @@ -218,7 +218,7 @@ class ContinuousSuite extends ContinuousSuiteBase { } spark.sparkContext.addSparkListener(listener) try { - testStream(df, useV2Sink = true)( + testStream(df)( StartStream(Trigger.Continuous(100)), Execute(waitForRateSourceTriggers(_, 2)), Execute { _ => From efcf03d47bda6c9e0c797d7415d20ae2534db393 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Fri, 9 Feb 2018 11:37:22 -0800 Subject: [PATCH 3/5] fix rebase --- .../spark/sql/execution/streaming/memory.scala | 18 ++++++++++++++++++ .../sql/streaming/StreamingQuerySuite.scala | 5 +++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index f60a982c59483..171c959a9ffd2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -187,3 +187,21 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } } +class MemoryStreamDataReaderFactory(records: Array[UnsafeRow]) + extends DataReaderFactory[UnsafeRow] { + override def createDataReader(): DataReader[UnsafeRow] = { + new DataReader[UnsafeRow] { + private var currentIndex = -1 + + override def next(): Boolean = { + // Return true as long as the new index is in the array. 
+ currentIndex += 1 + currentIndex < records.length + } + + override def get(): UnsafeRow = records(currentIndex) + + override def close(): Unit = {} + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index b3d40ab3b3391..0f3c6ea9a02a0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -576,8 +576,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi q3.processAllAvailable() } assert(e.getCause.isInstanceOf[SparkException]) - assert(e.getCause.getCause.isInstanceOf[IllegalStateException]) - assert(e.getMessage.contains("StreamingQuery cannot be used in executors")) + assert(e.getCause.getCause.getCause.isInstanceOf[IllegalStateException]) + assert(e.getCause.getCause.getCause.getMessage.contains( + "StreamingQuery cannot be used in executors")) } finally { q1.stop() q2.stop() From 77887b76ea18bdd4a5a08919afa426bf11d06b4e Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 13 Feb 2018 10:11:15 -0800 Subject: [PATCH 4/5] restore tests --- .../execution/streaming/MemorySinkSuite.scala | 57 +++++++++++-------- 1 file changed, 34 insertions(+), 23 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala index 4a4856cb57167..db8634cb3f9c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala @@ -22,7 +22,7 @@ import scala.language.implicitConversions import org.scalatest.BeforeAndAfter import org.apache.spark.sql._ -import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 +import org.apache.spark.sql.execution.streaming.sources.{MemoryPlanV2, MemorySinkV2} import org.apache.spark.sql.streaming.{OutputMode, StreamTest} import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.Utils @@ -35,9 +35,18 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { sqlContext.streams.active.foreach(_.stop()) } + private def addBatchFunc( + sink: MemorySinkV2, + outputMode: OutputMode)( + batchId: Long, + vals: Seq[Int]): Unit = { + sink.write(batchId, outputMode, vals.map(Row(_)).toArray) + } + test("directly add data in Append output mode") { implicit val schema = new StructType().add(new StructField("value", IntegerType)) val sink = new MemorySinkV2 + val addBatch = addBatchFunc(sink, OutputMode.Append()) _ // Before adding data, check output assert(sink.latestBatchId === None) @@ -45,33 +54,34 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { checkAnswer(sink.allData, Seq.empty) // Add batch 0 and check outputs - sink.write(0, OutputMode.Append(), Range(1, 4).map(Row(_)).toArray) + addBatch(0, 1 to 3) assert(sink.latestBatchId === Some(0)) checkAnswer(sink.latestBatchData, 1 to 3) checkAnswer(sink.allData, 1 to 3) // Add batch 1 and check outputs - sink.write(1, OutputMode.Append(), Range(4, 7).map(Row(_)).toArray) + addBatch(1, 4 to 6) assert(sink.latestBatchId === Some(1)) checkAnswer(sink.latestBatchData, 4 to 6) checkAnswer(sink.allData, 1 to 6) // new data should get appended to old data // Re-add batch 1 with different data, should not be 
added and outputs should not be changed - sink.write(1, OutputMode.Append(), Range(7, 10).map(Row(_)).toArray) + addBatch(1, 7 to 9) assert(sink.latestBatchId === Some(1)) checkAnswer(sink.latestBatchData, 4 to 6) checkAnswer(sink.allData, 1 to 6) // Add batch 2 and check outputs - sink.write(2, OutputMode.Append(), Range(7, 10).map(Row(_)).toArray) + addBatch(2, 7 to 9) assert(sink.latestBatchId === Some(2)) checkAnswer(sink.latestBatchData, 7 to 9) checkAnswer(sink.allData, 1 to 9) } -/* + test("directly add data in Update output mode") { implicit val schema = new StructType().add(new StructField("value", IntegerType)) - val sink = new MemorySink(schema, OutputMode.Update) + val sink = new MemorySinkV2 + val addBatch = addBatchFunc(sink, OutputMode.Update()) _ // Before adding data, check output assert(sink.latestBatchId === None) @@ -79,25 +89,25 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { checkAnswer(sink.allData, Seq.empty) // Add batch 0 and check outputs - sink.addBatch(0, 1 to 3) + addBatch(0, 1 to 3) assert(sink.latestBatchId === Some(0)) checkAnswer(sink.latestBatchData, 1 to 3) checkAnswer(sink.allData, 1 to 3) // Add batch 1 and check outputs - sink.addBatch(1, 4 to 6) + addBatch(1, 4 to 6) assert(sink.latestBatchId === Some(1)) checkAnswer(sink.latestBatchData, 4 to 6) checkAnswer(sink.allData, 1 to 6) // new data should get appended to old data // Re-add batch 1 with different data, should not be added and outputs should not be changed - sink.addBatch(1, 7 to 9) + addBatch(1, 7 to 9) assert(sink.latestBatchId === Some(1)) checkAnswer(sink.latestBatchData, 4 to 6) checkAnswer(sink.allData, 1 to 6) // Add batch 2 and check outputs - sink.addBatch(2, 7 to 9) + addBatch(2, 7 to 9) assert(sink.latestBatchId === Some(2)) checkAnswer(sink.latestBatchData, 7 to 9) checkAnswer(sink.allData, 1 to 9) @@ -105,7 +115,8 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { test("directly add data in Complete output mode") { implicit val schema = new StructType().add(new StructField("value", IntegerType)) - val sink = new MemorySink(schema, OutputMode.Complete) + val sink = new MemorySinkV2 + val addBatch = addBatchFunc(sink, OutputMode.Complete()) _ // Before adding data, check output assert(sink.latestBatchId === None) @@ -113,30 +124,29 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { checkAnswer(sink.allData, Seq.empty) // Add batch 0 and check outputs - sink.addBatch(0, 1 to 3) + addBatch(0, 1 to 3) assert(sink.latestBatchId === Some(0)) checkAnswer(sink.latestBatchData, 1 to 3) checkAnswer(sink.allData, 1 to 3) // Add batch 1 and check outputs - sink.addBatch(1, 4 to 6) + addBatch(1, 4 to 6) assert(sink.latestBatchId === Some(1)) checkAnswer(sink.latestBatchData, 4 to 6) checkAnswer(sink.allData, 4 to 6) // new data should replace old data // Re-add batch 1 with different data, should not be added and outputs should not be changed - sink.addBatch(1, 7 to 9) + addBatch(1, 7 to 9) assert(sink.latestBatchId === Some(1)) checkAnswer(sink.latestBatchData, 4 to 6) checkAnswer(sink.allData, 4 to 6) // Add batch 2 and check outputs - sink.addBatch(2, 7 to 9) + addBatch(2, 7 to 9) assert(sink.latestBatchId === Some(2)) checkAnswer(sink.latestBatchData, 7 to 9) checkAnswer(sink.allData, 7 to 9) } -*/ test("registering as a table in Append output mode") { val input = MemoryStream[Int] @@ -210,23 +220,24 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { query.stop() } - /* test("MemoryPlan statistics") { + test("MemoryPlan 
statistics") { implicit val schema = new StructType().add(new StructField("value", IntegerType)) - val sink = new MemorySink(schema, OutputMode.Append) - val plan = new MemoryPlan(sink) + val sink = new MemorySinkV2 + val plan = new MemoryPlanV2(sink, schema.toAttributes) + val addBatch = addBatchFunc(sink, OutputMode.Append()) _ // Before adding data, check output checkAnswer(sink.allData, Seq.empty) assert(plan.stats.sizeInBytes === 0) - sink.addBatch(0, 1 to 3) + addBatch(0, 1 to 3) plan.invalidateStatsCache() assert(plan.stats.sizeInBytes === 12) - sink.addBatch(1, 4 to 6) + addBatch(1, 4 to 6) plan.invalidateStatsCache() assert(plan.stats.sizeInBytes === 24) - } */ + } ignore("stress test") { // Ignore the stress test as it takes several minutes to run From 6059a7f22f59f30030d508c3469fade58491da72 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 13 Feb 2018 10:15:59 -0800 Subject: [PATCH 5/5] remove v2 from names --- .../spark/sql/execution/SparkStrategies.scala | 4 +- .../sources/{memoryV2.scala => memory.scala} | 8 +- .../sql/streaming/DataStreamWriter.scala | 6 +- .../execution/streaming/MemorySinkSuite.scala | 71 ++++++++++++++-- .../streaming/MemorySinkV2Suite.scala | 83 ------------------- .../streaming/EventTimeWatermarkSuite.scala | 4 +- .../sql/streaming/FileStreamSourceSuite.scala | 4 +- .../spark/sql/streaming/StreamSuite.scala | 4 +- .../spark/sql/streaming/StreamTest.scala | 4 +- .../streaming/StreamingAggregationSuite.scala | 6 +- .../sql/streaming/StreamingQuerySuite.scala | 4 +- 11 files changed, 86 insertions(+), 112 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/{memoryV2.scala => memory.scala} (94%) delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 29e7e21c8b799..8da90ddd854f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide} import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.sources.MemoryPlanV2 +import org.apache.spark.sql.execution.streaming.sources.MemoryPlan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.types.StructType @@ -480,7 +480,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil case r: RunnableCommand => ExecutedCommandExec(r) :: Nil - case MemoryPlanV2(sink, output) => + case MemoryPlan(sink, output) => val encoder = RowEncoder(StructType.fromAttributes(output)) LocalTableScanExec(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala similarity index 94% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala rename to 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala index f960208155e3b..c0814f56f53b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.types.StructType * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit * tests and does not provide durability. */ -class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with Logging { +class MemorySink extends DataSourceV2 with StreamWriteSupport with Logging { override def createStreamWriter( queryId: String, schema: StructType, @@ -112,7 +112,7 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with Logging { case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row]) extends WriterCommitMessage {} -class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode) +class MemoryWriter(sink: MemorySink, batchId: Long, outputMode: OutputMode) extends DataSourceWriter with Logging { override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode) @@ -129,7 +129,7 @@ class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode) } } -class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode) +class MemoryStreamWriter(val sink: MemorySink, outputMode: OutputMode) extends StreamWriter { override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode) @@ -174,7 +174,7 @@ class MemoryDataWriter(partition: Int, outputMode: OutputMode) /** * Used to query the data that has been written into a [[MemorySink]]. */ -case class MemoryPlanV2(sink: MemorySinkV2, override val output: Seq[Attribute]) extends LeafNode { +case class MemoryPlan(sink: MemorySink, override val output: Seq[Attribute]) extends LeafNode { private val sizePerRow = output.map(_.dataType.defaultSize).sum override def computeStats(): Statistics = Statistics(sizePerRow * sink.allData.size) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 666d97fe1fd29..7d5daa4d56bc4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger -import org.apache.spark.sql.execution.streaming.sources.{MemoryPlanV2, MemorySinkV2} +import org.apache.spark.sql.execution.streaming.sources.{MemoryPlan, MemorySink} import org.apache.spark.sql.sources.v2.StreamWriteSupport /** @@ -243,8 +243,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { if (extraOptions.get("queryName").isEmpty) { throw new AnalysisException("queryName must be specified for memory sink") } - val sink = new MemorySinkV2() - val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlanV2(sink, df.schema.toAttributes)) + val sink = new MemorySink() + val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink, df.schema.toAttributes)) val chkpointLoc = extraOptions.get("checkpointLocation") val recoverFromChkpoint = outputMode == OutputMode.Complete() diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala index db8634cb3f9c2..7a8baa6432526 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala @@ -22,7 +22,7 @@ import scala.language.implicitConversions import org.scalatest.BeforeAndAfter import org.apache.spark.sql._ -import org.apache.spark.sql.execution.streaming.sources.{MemoryPlanV2, MemorySinkV2} +import org.apache.spark.sql.execution.streaming.sources._ import org.apache.spark.sql.streaming.{OutputMode, StreamTest} import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.Utils @@ -36,7 +36,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { } private def addBatchFunc( - sink: MemorySinkV2, + sink: MemorySink, outputMode: OutputMode)( batchId: Long, vals: Seq[Int]): Unit = { @@ -45,7 +45,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { test("directly add data in Append output mode") { implicit val schema = new StructType().add(new StructField("value", IntegerType)) - val sink = new MemorySinkV2 + val sink = new MemorySink val addBatch = addBatchFunc(sink, OutputMode.Append()) _ // Before adding data, check output @@ -80,7 +80,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { test("directly add data in Update output mode") { implicit val schema = new StructType().add(new StructField("value", IntegerType)) - val sink = new MemorySinkV2 + val sink = new MemorySink val addBatch = addBatchFunc(sink, OutputMode.Update()) _ // Before adding data, check output @@ -115,7 +115,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { test("directly add data in Complete output mode") { implicit val schema = new StructType().add(new StructField("value", IntegerType)) - val sink = new MemorySinkV2 + val sink = new MemorySink val addBatch = addBatchFunc(sink, OutputMode.Complete()) _ // Before adding data, check output @@ -222,8 +222,8 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { test("MemoryPlan statistics") { implicit val schema = new StructType().add(new StructField("value", IntegerType)) - val sink = new MemorySinkV2 - val plan = new MemoryPlanV2(sink, schema.toAttributes) + val sink = new MemorySink + val plan = new MemoryPlan(sink, schema.toAttributes) val addBatch = addBatchFunc(sink, OutputMode.Append()) _ // Before adding data, check output @@ -297,6 +297,63 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { } } + test("data writer") { + val partition = 1234 + val writer = new MemoryDataWriter(partition, OutputMode.Append()) + writer.write(Row(1)) + writer.write(Row(2)) + writer.write(Row(44)) + val msg = writer.commit() + assert(msg.data.map(_.getInt(0)) == Seq(1, 2, 44)) + assert(msg.partition == partition) + + // Buffer should be cleared, so repeated commits should give empty. 
+ assert(writer.commit().data.isEmpty) + } + + test("continuous writer") { + val sink = new MemorySink + val writer = new MemoryStreamWriter(sink, OutputMode.Append()) + writer.commit(0, + Array( + MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))), + MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))), + MemoryWriterCommitMessage(2, Seq(Row(6), Row(7))) + )) + assert(sink.latestBatchId.contains(0)) + assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7)) + writer.commit(19, + Array( + MemoryWriterCommitMessage(3, Seq(Row(11), Row(22))), + MemoryWriterCommitMessage(0, Seq(Row(33))) + )) + assert(sink.latestBatchId.contains(19)) + assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(11, 22, 33)) + + assert(sink.allData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7, 11, 22, 33)) + } + + test("microbatch writer") { + val sink = new MemorySink + new MemoryWriter(sink, 0, OutputMode.Append()).commit( + Array( + MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))), + MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))), + MemoryWriterCommitMessage(2, Seq(Row(6), Row(7))) + )) + assert(sink.latestBatchId.contains(0)) + assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7)) + new MemoryWriter(sink, 19, OutputMode.Append()).commit( + Array( + MemoryWriterCommitMessage(3, Seq(Row(11), Row(22))), + MemoryWriterCommitMessage(0, Seq(Row(33))) + )) + assert(sink.latestBatchId.contains(19)) + assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(11, 22, 33)) + + assert(sink.allData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7, 11, 22, 33)) + } + private def checkAnswer(rows: Seq[Row], expected: Seq[Int])(implicit schema: StructType): Unit = { checkAnswer( sqlContext.createDataFrame(sparkContext.makeRDD(rows), schema), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala deleted file mode 100644 index 9be22d94b5654..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.streaming - -import org.scalatest.BeforeAndAfter - -import org.apache.spark.sql.Row -import org.apache.spark.sql.execution.streaming.sources._ -import org.apache.spark.sql.streaming.{OutputMode, StreamTest} - -class MemorySinkV2Suite extends StreamTest with BeforeAndAfter { - test("data writer") { - val partition = 1234 - val writer = new MemoryDataWriter(partition, OutputMode.Append()) - writer.write(Row(1)) - writer.write(Row(2)) - writer.write(Row(44)) - val msg = writer.commit() - assert(msg.data.map(_.getInt(0)) == Seq(1, 2, 44)) - assert(msg.partition == partition) - - // Buffer should be cleared, so repeated commits should give empty. - assert(writer.commit().data.isEmpty) - } - - test("continuous writer") { - val sink = new MemorySinkV2 - val writer = new MemoryStreamWriter(sink, OutputMode.Append()) - writer.commit(0, - Array( - MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))), - MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))), - MemoryWriterCommitMessage(2, Seq(Row(6), Row(7))) - )) - assert(sink.latestBatchId.contains(0)) - assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7)) - writer.commit(19, - Array( - MemoryWriterCommitMessage(3, Seq(Row(11), Row(22))), - MemoryWriterCommitMessage(0, Seq(Row(33))) - )) - assert(sink.latestBatchId.contains(19)) - assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(11, 22, 33)) - - assert(sink.allData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7, 11, 22, 33)) - } - - test("microbatch writer") { - val sink = new MemorySinkV2 - new MemoryWriter(sink, 0, OutputMode.Append()).commit( - Array( - MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))), - MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))), - MemoryWriterCommitMessage(2, Seq(Row(6), Row(7))) - )) - assert(sink.latestBatchId.contains(0)) - assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7)) - new MemoryWriter(sink, 19, OutputMode.Append()).commit( - Array( - MemoryWriterCommitMessage(3, Seq(Row(11), Row(22))), - MemoryWriterCommitMessage(0, Seq(Row(33))) - )) - assert(sink.latestBatchId.contains(19)) - assert(sink.latestBatchData.map(_.getInt(0)).sorted == Seq(11, 22, 33)) - - assert(sink.allData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 6, 7, 11, 22, 33)) - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 93dd6e70630bb..d9f04150b26d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 +import org.apache.spark.sql.execution.streaming.sources.MemorySink import org.apache.spark.sql.functions.{count, window} import org.apache.spark.sql.streaming.OutputMode._ @@ -263,7 +263,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche AssertOnQuery { q => // purge commit and clear the sink val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L) + 1L q.commitLog.purge(commit) - q.sink.asInstanceOf[MemorySinkV2].clear() + q.sink.asInstanceOf[MemorySink].clear() true }, StartStream(), diff 
--git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index b484a5d2dff0a..980de180b3ded 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap} -import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 +import org.apache.spark.sql.execution.streaming.sources.MemorySink import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._ import org.apache.spark.sql.streaming.util.StreamManualClock @@ -1011,7 +1011,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { .asInstanceOf[StreamingQueryWrapper] .streamingQuery q.processAllAvailable() - val memorySink = q.sink.asInstanceOf[MemorySinkV2] + val memorySink = q.sink.asInstanceOf[MemorySink] val fileSource = getSourcesFromStreamingQuery(q).head /** Check the data read in the last batch */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 5d224fcdf3093..37a4870f160f0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 +import org.apache.spark.sql.execution.streaming.sources.MemorySink import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -305,7 +305,7 @@ class StreamSuite extends StreamTest { // For each batch, we would log the sink change after the execution // This checks whether the key of the sink change log is the expected batch id def CheckSinkLatestBatchId(expectedId: Int): AssertOnQuery = - AssertOnQuery(_.sink.asInstanceOf[MemorySinkV2].latestBatchId.get == expectedId, + AssertOnQuery(_.sink.asInstanceOf[MemorySink].latestBatchId.get == expectedId, s"sink's lastBatchId should be $expectedId") val inputData = MemoryStream[Int] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index b147528be750b..98c9fc5049a9f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, ContinuousTrigger, EpochCoordinatorRef, IncrementAndGetEpoch} -import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 +import 
org.apache.spark.sql.execution.streaming.sources.MemorySink import org.apache.spark.sql.execution.streaming.state.StateStore import org.apache.spark.sql.streaming.StreamingQueryListener._ import org.apache.spark.sql.test.SharedSQLContext @@ -291,7 +291,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be var currentStream: StreamExecution = null var lastStream: StreamExecution = null val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset to wait for - val sink = new MemorySinkV2 + val sink = new MemorySink val resetConfValues = mutable.Map[String, Option[String]]() @volatile diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index f9209e0d9e85c..d80a6dfe2b52d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.exchange.Exchange import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 +import org.apache.spark.sql.execution.streaming.sources.MemorySink import org.apache.spark.sql.execution.streaming.state.StateStore import org.apache.spark.sql.expressions.scalalang.typed import org.apache.spark.sql.functions._ @@ -299,7 +299,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest // i.e., we don't take it from the clock, which is at 90 seconds. StopStream, AssertOnQuery { q => // clear the sink - q.sink.asInstanceOf[MemorySinkV2].clear() + q.sink.asInstanceOf[MemorySink].clear() q.commitLog.purge(3) // advance by a minute i.e., 90 seconds total clock.advance(60 * 1000L) @@ -351,7 +351,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest // i.e., we don't take it from the clock, which is at 90 days. 
StopStream, AssertOnQuery { q => // clear the sink - q.sink.asInstanceOf[MemorySinkV2].clear() + q.sink.asInstanceOf[MemorySink].clear() q.commitLog.purge(3) // advance by 60 days i.e., 90 days total clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 0f3c6ea9a02a0..679b8fd402b97 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 +import org.apache.spark.sql.execution.streaming.sources.MemorySink import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.reader.DataReaderFactory @@ -180,7 +180,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi AssertOnQuery { q => // blow away commit log and sink result q.commitLog.purge(1) - q.sink.asInstanceOf[MemorySinkV2].clear() + q.sink.asInstanceOf[MemorySink].clear() true }, StartStream(trigger = Once),
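
Taken together, the five patches leave a single MemorySink (the class formerly named MemorySinkV2, now under org.apache.spark.sql.execution.streaming.sources) backing both microbatch and continuous execution, and testStream no longer needs a useV2Sink flag. The sketch below is illustrative only and is not part of the patch series: it assumes the post-rename API visible in the diff (write(batchId, outputMode, rows), latestBatchId, latestBatchData, allData, clear) and uses only calls exercised by the restored MemorySinkSuite; the MemorySinkSketch wrapper object and the specific batch values are invented for the example.

// Illustrative only -- not part of the patch series. A minimal sketch of the
// consolidated MemorySink API these commits leave behind, mirroring the
// expectations of the restored MemorySinkSuite.
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.streaming.sources.MemorySink
import org.apache.spark.sql.streaming.OutputMode

object MemorySinkSketch {
  def main(args: Array[String]): Unit = {
    // Append mode: each committed batch is appended to the stored data.
    val appendSink = new MemorySink
    appendSink.write(0, OutputMode.Append(), Array(Row(1), Row(2), Row(3)))
    appendSink.write(1, OutputMode.Append(), Array(Row(4), Row(5)))
    // Re-writing an already committed batch id is expected to be a no-op,
    // matching the "Re-add batch 1" checks in MemorySinkSuite.
    appendSink.write(1, OutputMode.Append(), Array(Row(99)))
    assert(appendSink.latestBatchId.contains(1))
    assert(appendSink.latestBatchData.map(_.getInt(0)).sorted == Seq(4, 5))
    assert(appendSink.allData.map(_.getInt(0)).sorted == Seq(1, 2, 3, 4, 5))

    // Complete mode: each committed batch replaces the previously stored data.
    val completeSink = new MemorySink
    completeSink.write(0, OutputMode.Complete(), Array(Row(1), Row(2)))
    completeSink.write(1, OutputMode.Complete(), Array(Row(7), Row(8)))
    assert(completeSink.allData.map(_.getInt(0)).sorted == Seq(7, 8))

    // clear() drops all stored batches, as the watermark and aggregation
    // suites rely on when they purge the commit log and restart a query.
    completeSink.clear()
    assert(completeSink.allData.isEmpty)
  }
}

Because the output mode is now a per-write argument rather than a constructor parameter, the restored MemorySinkSuite can cover Append, Update, and Complete against the one sink class via its addBatchFunc helper, instead of constructing a schema-bound v1 MemorySink for each mode.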