From 7c09b376eef6a4e6c118c78ad9459cb55e59e67f Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 11 Jan 2018 08:44:19 -0800 Subject: [PATCH 01/16] save for so far --- .../apache/spark/sql/execution/streaming/LongOffset.scala | 4 +++- .../org/apache/spark/sql/execution/streaming/memory.scala | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala index 5f0b195fcfcb8..491859e4bd85e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala @@ -17,10 +17,12 @@ package org.apache.spark.sql.execution.streaming +import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2} + /** * A simple offset for sources that produce a single linear stream of data. */ -case class LongOffset(offset: Long) extends Offset { +case class LongOffset(offset: Long) extends OffsetV2 { override val json = offset.toString diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 3041d4d703cb4..94bb88d4c9f3a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, Statistics} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils @@ -51,9 +52,10 @@ object MemoryStream { * available. 
*/ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) - extends Source with Logging { + extends MicroBatchReader with Logging { protected val encoder = encoderFor[A] - protected val logicalPlan = StreamingExecutionRelation(this, sqlContext.sparkSession) + private val attributes = encoder.schema.toAttributes + protected val logicalPlan = StreamingExecutionRelation(this, attributes)(sqlContext.sparkSession) protected val output = logicalPlan.output /** From 78c50f860aa13f569669f4ad77f4325d80085c8b Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 12 Jan 2018 10:27:49 -0800 Subject: [PATCH 02/16] Save so far --- .../streaming/MicroBatchExecution.scala | 2 +- .../sql/execution/streaming/memory.scala | 36 +++++++++++++------ 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 42240eeb58d4b..1f9423243cf1a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -275,7 +275,7 @@ class MicroBatchExecution( toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))), Optional.empty()) - (s, Some(s.getEndOffset)) + (s, Option(s.getEndOffset)) } }.toMap availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 94bb88d4c9f3a..f09371ae11bb7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.streaming +import java.util +import java.util.Optional import java.util.concurrent.atomic.AtomicInteger import javax.annotation.concurrent.GuardedBy @@ -32,7 +34,8 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, Statistics} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.execution.SQLExecution -import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader +import org.apache.spark.sql.sources.v2.reader.ReadTask +import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2, MicroBatchReader} import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils @@ -68,6 +71,9 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) @GuardedBy("this") protected var currentOffset: LongOffset = new LongOffset(-1) + private var startOffset = new LongOffset(-1) + private var endOffset = new LongOffset(-1) + /** * Last offset that was discarded, or -1 if no commits have occurred. Note that the value * -1 is used in calculations below and isn't just an arbitrary constant. 
@@ -75,8 +81,6 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) @GuardedBy("this") protected var lastOffsetCommitted : LongOffset = new LongOffset(-1) - def schema: StructType = encoder.schema - def toDS(): Dataset[A] = { Dataset(sqlContext.sparkSession, logicalPlan) } @@ -91,7 +95,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) def addData(data: TraversableOnce[A]): Offset = { val encoded = data.toVector.map(d => encoder.toRow(d).copy()) - val plan = new LocalRelation(schema.toAttributes, encoded, isStreaming = true) + val plan = new LocalRelation(attributes, encoded, isStreaming = true) val ds = Dataset[A](sqlContext.sparkSession, plan) logDebug(s"Adding ds: $ds") this.synchronized { @@ -103,12 +107,24 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]" - override def getOffset: Option[Offset] = synchronized { - if (currentOffset.offset == -1) { - None - } else { - Some(currentOffset) + override def setOffsetRange(start: Optional[OffsetV2], end: Optional[OffsetV2]): Unit = { + if (start.isPresent) { + startOffset = start.get().asInstanceOf[LongOffset] } + endOffset = end.orElse(currentOffset).asInstanceOf[LongOffset] + } + + override def readSchema(): StructType = encoder.schema + + override def deserializeOffset(json: String): OffsetV2 = LongOffset(json.toLong) + + override def getStartOffset: OffsetV2 = if (startOffset.offset == -1) null else startOffset + + override def getEndOffset: OffsetV2 = if (endOffset.offset == -1) null else endOffset + + override def createReadTasks(): util.List[ReadTask[Row]] = { + + ??? } override def getBatch(start: Option[Offset], end: Offset): DataFrame = { @@ -149,7 +165,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } } - override def commit(end: Offset): Unit = synchronized { + override def commit(end: OffsetV2): Unit = synchronized { def check(newOffset: LongOffset): Unit = { val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt From 2777b5b38596a1fb68bcf8ee928aec1a58dc372c Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 12 Jan 2018 17:43:03 -0800 Subject: [PATCH 03/16] save so far --- .../sql/execution/streaming/memory.scala | 49 ++++++++++++------- .../sources/RateStreamSourceV2.scala | 2 +- .../spark/sql/streaming/StreamTest.scala | 2 +- .../StreamingQueryListenerSuite.scala | 5 +- .../sql/streaming/StreamingQuerySuite.scala | 17 ++++--- 5 files changed, 45 insertions(+), 30 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index f09371ae11bb7..0e70b4dfd6f2f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.streaming -import java.util +import java.{util => ju} import java.util.Optional import java.util.concurrent.atomic.AtomicInteger import javax.annotation.concurrent.GuardedBy @@ -33,9 +33,8 @@ import org.apache.spark.sql.catalyst.encoders.encoderFor import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, Statistics} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ -import org.apache.spark.sql.execution.SQLExecution -import 
org.apache.spark.sql.sources.v2.reader.ReadTask -import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2, MicroBatchReader} +import org.apache.spark.sql.sources.v2.reader.{DataReader, ReadTask} +import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset => OffsetV2} import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils @@ -122,16 +121,10 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) override def getEndOffset: OffsetV2 = if (endOffset.offset == -1) null else endOffset - override def createReadTasks(): util.List[ReadTask[Row]] = { - - ??? - } - - override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + override def createReadTasks(): ju.List[ReadTask[Row]] = { // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal) - val startOrdinal = - start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1 - val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1 + val startOrdinal = startOffset.offset.toInt + 1 + val endOrdinal = endOffset.offset.toInt + 1 // Internal buffer only holds the batches after lastCommittedOffset. val newBlocks = synchronized { @@ -140,14 +133,14 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) batches.slice(sliceStart, sliceEnd) } + require(newBlocks.nonEmpty, "No data selected!") + logDebug(generateDebugString(newBlocks, startOrdinal, endOrdinal)) - newBlocks - .map(_.toDF()) - .reduceOption(_ union _) - .getOrElse { - sys.error("No data selected!") - } + newBlocks.map { ds => + val items = ds.toDF().collect() + new MemoryStreamReadTask(items).asInstanceOf[ReadTask[Row]] + }.asJava } private def generateDebugString( @@ -193,6 +186,24 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } } +class MemoryStreamReadTask(records: Array[Row]) extends ReadTask[Row] { + override def createDataReader(): DataReader[Row] = new MemoryStreamDataReader(records) +} + +class MemoryStreamDataReader(records: Array[Row]) extends DataReader[Row] { + private var currentIndex = -1 + + override def next(): Boolean = { + // Return true as long as the new index is in the array. + currentIndex += 1 + currentIndex < records.length + } + + override def get(): Row = records(currentIndex) + + override def close(): Unit = {} +} + /** * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit * tests and does not provide durability. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala index c0ed12cec25ef..509f69430c5ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamSourceV2.scala @@ -152,7 +152,7 @@ case class RateStreamBatchTask(vals: Seq[(Long, Long)]) extends ReadTask[Row] { } class RateStreamBatchReader(vals: Seq[(Long, Long)]) extends DataReader[Row] { - var currentIndex = -1 + private var currentIndex = -1 override def next(): Boolean = { // Return true as long as the new index is in the seq. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index d46461fa9bf6d..1423ab1a49966 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -116,7 +116,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData { override def toString: String = s"AddData to $source: ${data.mkString(",")}" - override def addData(query: Option[StreamExecution]): (Source, Offset) = { + override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = { (source, source.addData(data)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 9ff02dee288fb..a0604b5fdcdb1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -33,6 +33,7 @@ import org.apache.spark.scheduler._ import org.apache.spark.sql.{Encoder, SparkSession} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2} import org.apache.spark.sql.streaming.StreamingQueryListener._ import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.util.JsonProtocol @@ -273,9 +274,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { try { val input = new MemoryStream[Int](0, sqlContext) { @volatile var numTriggers = 0 - override def getOffset: Option[Offset] = { + override def getEndOffset: OffsetV2 = { numTriggers += 1 - super.getOffset + super.getEndOffset } } val clock = new StreamManualClock() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 2fa4595dab376..f43613d8ec22a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.streaming +import java.{util => ju} import java.util.concurrent.CountDownLatch import org.apache.commons.lang3.RandomStringUtils @@ -29,10 +30,12 @@ import org.scalatest.mockito.MockitoSugar import org.apache.spark.SparkException import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.v2.reader.ReadTask +import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2} import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock} import org.apache.spark.sql.types.StructType import org.apache.spark.util.ManualClock @@ -207,18 +210,18 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi /** Custom MemoryStream that waits for manual clock to reach a time */ val inputData = new MemoryStream[Int](0, sqlContext) 
{ // getOffset should take 50 ms the first time it is called - override def getOffset: Option[Offset] = { - val offset = super.getOffset - if (offset.nonEmpty) { + override def getEndOffset: OffsetV2 = { + val offset = super.getEndOffset + if (offset != null) { clock.waitTillTime(1050) } offset } // getBatch should take 100 ms the first time it is called - override def getBatch(start: Option[Offset], end: Offset): DataFrame = { - if (start.isEmpty) clock.waitTillTime(1150) - super.getBatch(start, end) + override def createReadTasks(): ju.List[ReadTask[Row]] = { + if (getStartOffset.asInstanceOf[LongOffset].offset == -1L) clock.waitTillTime(1150) + super.createReadTasks() } } From 50a541b5890f328a655a7ef1fca4f8480b9a35f0 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 16 Jan 2018 11:14:08 -0800 Subject: [PATCH 04/16] Compiles and I think also runs correctly --- .../sql/execution/streaming/MicroBatchExecution.scala | 8 ++++++-- .../org/apache/spark/sql/execution/streaming/memory.scala | 2 +- .../org/apache/spark/sql/streaming/StreamSuite.scala | 8 ++++---- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 1f9423243cf1a..9d862f6c4793e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -398,10 +398,14 @@ class MicroBatchExecution( case (reader: MicroBatchReader, available) if committedOffsets.get(reader).map(_ != available).getOrElse(true) => val current = committedOffsets.get(reader).map(off => reader.deserializeOffset(off.json)) + val availableV2: OffsetV2 = available match { + case v1: SerializedOffset => reader.deserializeOffset(v1.json) + case v2: OffsetV2 => v2 + } reader.setOffsetRange( toJava(current), - Optional.of(available.asInstanceOf[OffsetV2])) - logDebug(s"Retrieving data from $reader: $current -> $available") + Optional.of(availableV2)) + logDebug(s"Retrieving data from $reader: $current -> $availableV2") Some(reader -> new StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader)) case _ => None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 0e70b4dfd6f2f..56799ed24a719 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -94,7 +94,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) def addData(data: TraversableOnce[A]): Offset = { val encoded = data.toVector.map(d => encoder.toRow(d).copy()) - val plan = new LocalRelation(attributes, encoded, isStreaming = true) + val plan = new LocalRelation(attributes, encoded, isStreaming = false) val ds = Dataset[A](sqlContext.sparkSession, plan) logDebug(s"Adding ds: $ds") this.synchronized { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index c65e5d3dd75c2..d1a04833390f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -492,16 +492,16 @@ class StreamSuite extends 
StreamTest { val explainWithoutExtended = q.explainInternal(false) // `extended = false` only displays the physical plan. - assert("LocalRelation".r.findAllMatchIn(explainWithoutExtended).size === 0) - assert("LocalTableScan".r.findAllMatchIn(explainWithoutExtended).size === 1) + assert("StreamingDataSourceV2Relation".r.findAllMatchIn(explainWithoutExtended).size === 0) + assert("DataSourceV2Scan".r.findAllMatchIn(explainWithoutExtended).size === 1) // Use "StateStoreRestore" to verify that it does output a streaming physical plan assert(explainWithoutExtended.contains("StateStoreRestore")) val explainWithExtended = q.explainInternal(true) // `extended = true` displays 3 logical plans (Parsed/Optimized/Optimized) and 1 physical // plan. - assert("LocalRelation".r.findAllMatchIn(explainWithExtended).size === 3) - assert("LocalTableScan".r.findAllMatchIn(explainWithExtended).size === 1) + assert("StreamingDataSourceV2Relation".r.findAllMatchIn(explainWithExtended).size === 3) + assert("DataSourceV2Scan".r.findAllMatchIn(explainWithExtended).size === 1) // Use "StateStoreRestore" to verify that it does output a streaming physical plan assert(explainWithExtended.contains("StateStoreRestore")) } finally { From fd61724c6afcab5831fe8c602ad134d0c473184b Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 16 Jan 2018 11:25:39 -0800 Subject: [PATCH 05/16] save --- .../scala/org/apache/spark/sql/execution/streaming/memory.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 56799ed24a719..98d65e0761c4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicInteger import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ -import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.util.control.NonFatal From a81c2ecdafd54a2c5bfb07c6f1f53546eaa96c7c Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 16 Jan 2018 14:26:28 -0800 Subject: [PATCH 06/16] fix hive --- .../org/apache/spark/sql/hive/HiveSessionStateBuilder.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index dc92ad3b0c1ac..2d0c64cd68d71 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlanner import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState} @@ -101,6 +102,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session override def strategies: Seq[Strategy] = { experimentalMethods.extraStrategies ++ extraPlanningStrategies ++ Seq( + DataSourceV2Strategy, FileSourceStrategy, DataSourceStrategy(conf), SpecialLimits, From 
1a4f4108118d976857778916b18499b4e0bf140c Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 26 Jan 2018 17:11:01 -0800 Subject: [PATCH 07/16] Undo changes to HiveSessionStateBuilder.scala --- .../sql/hive/HiveSessionStateBuilder.scala | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 2d0c64cd68d71..12c74368dd184 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlanner import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState} @@ -97,23 +96,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session override val sparkSession: SparkSession = session override def extraPlanningStrategies: Seq[Strategy] = - super.extraPlanningStrategies ++ customPlanningStrategies - - override def strategies: Seq[Strategy] = { - experimentalMethods.extraStrategies ++ - extraPlanningStrategies ++ Seq( - DataSourceV2Strategy, - FileSourceStrategy, - DataSourceStrategy(conf), - SpecialLimits, - InMemoryScans, - HiveTableScans, - Scripts, - Aggregation, - JoinSelection, - BasicOperators - ) - } + super.extraPlanningStrategies ++ customPlanningStrategies ++ Seq(HiveTableScans, Scripts) } } From a817c8d40e4ecaf5e4e0c46f43313c5cceeec54e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 29 Jan 2018 14:27:22 -0800 Subject: [PATCH 08/16] Fixed the setOffsetRange bug --- .../sql/execution/streaming/memory.scala | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 98c84969230bb..7de26581fc4e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -69,7 +69,10 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) @GuardedBy("this") protected var currentOffset: LongOffset = new LongOffset(-1) + @GuardedBy("this") private var startOffset = new LongOffset(-1) + + @GuardedBy("this") private var endOffset = new LongOffset(-1) /** @@ -106,21 +109,25 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]" override def setOffsetRange(start: Optional[OffsetV2], end: Optional[OffsetV2]): Unit = { - if (start.isPresent) { - startOffset = start.get().asInstanceOf[LongOffset] + synchronized { + startOffset = start.orElse(LongOffset(-1)).asInstanceOf[LongOffset] + endOffset = end.orElse(currentOffset).asInstanceOf[LongOffset] } - endOffset = end.orElse(currentOffset).asInstanceOf[LongOffset] } override def readSchema(): StructType = encoder.schema override def deserializeOffset(json: String): OffsetV2 = LongOffset(json.toLong) - override def 
getStartOffset: OffsetV2 = if (startOffset.offset == -1) null else startOffset + override def getStartOffset: OffsetV2 = synchronized { + if (startOffset.offset == -1) null else startOffset + } - override def getEndOffset: OffsetV2 = if (endOffset.offset == -1) null else endOffset + override def getEndOffset: OffsetV2 = synchronized { + if (endOffset.offset == -1) null else endOffset + } - override def createReadTasks(): ju.List[ReadTask[Row]] = { + override def createReadTasks(): ju.List[ReadTask[Row]] = synchronized { // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal) val startOrdinal = startOffset.offset.toInt + 1 val endOrdinal = endOffset.offset.toInt + 1 @@ -179,6 +186,8 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) def reset(): Unit = synchronized { batches.clear() + startOffset = LongOffset(-1) + endOffset = LongOffset(-1) currentOffset = new LongOffset(-1) lastOffsetCommitted = new LongOffset(-1) } From 35b8854ae466e0313ff926cc1efb8c423d3eefea Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 30 Jan 2018 12:42:56 -0800 Subject: [PATCH 09/16] Fixed DataSourceV2ScanExec canonicalization bug --- .../datasources/v2/DataSourceReaderHolder.scala | 10 +++------- .../datasources/v2/DataSourceV2Relation.scala | 8 ++++---- .../datasources/v2/DataSourceV2ScanExec.scala | 4 +--- 3 files changed, 8 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala index 6093df26630cd..8324cb405f367 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala @@ -28,9 +28,9 @@ import org.apache.spark.sql.sources.v2.reader._ trait DataSourceReaderHolder { /** - * The full output of the data source reader, without column pruning. + * The output of the data source reader, without column pruning. */ - def fullOutput: Seq[AttributeReference] + def output: Seq[Attribute] /** * The held data source reader. 
@@ -46,7 +46,7 @@ trait DataSourceReaderHolder { case s: SupportsPushDownFilters => s.pushedFilters().toSet case _ => Nil } - Seq(fullOutput, reader.getClass, reader.readSchema(), filters) + Seq(output, reader.getClass, reader.readSchema(), filters) } def canEqual(other: Any): Boolean @@ -61,8 +61,4 @@ trait DataSourceReaderHolder { override def hashCode(): Int = { metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b) } - - lazy val output: Seq[Attribute] = reader.readSchema().map(_.name).map { name => - fullOutput.find(_.name == name).get - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index cba20dd902007..0e9990f5ba1cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -17,12 +17,12 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} import org.apache.spark.sql.sources.v2.reader._ case class DataSourceV2Relation( - fullOutput: Seq[AttributeReference], + output: Seq[Attribute], reader: DataSourceV2Reader) extends LeafNode with DataSourceReaderHolder { override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation] @@ -40,8 +40,8 @@ case class DataSourceV2Relation( * to the non-streaming relation. */ class StreamingDataSourceV2Relation( - fullOutput: Seq[AttributeReference], - reader: DataSourceV2Reader) extends DataSourceV2Relation(fullOutput, reader) { + output: Seq[Attribute], + reader: DataSourceV2Reader) extends DataSourceV2Relation(output, reader) { override def isStreaming: Boolean = true } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index 2c22239e81869..68bb1428c4c78 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -35,14 +35,12 @@ import org.apache.spark.sql.types.StructType * Physical plan node for scanning data from a data source. 
*/ case class DataSourceV2ScanExec( - fullOutput: Seq[AttributeReference], + override val output: Seq[Attribute], @transient reader: DataSourceV2Reader) extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan { override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec] - override def producedAttributes: AttributeSet = AttributeSet(fullOutput) - override def outputPartitioning: physical.Partitioning = reader match { case s: SupportsReportPartitioning => new DataSourcePartitioning( From e66d809fe501b19b923a88d1b4cb9df69b4ae329 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 30 Jan 2018 16:57:59 -0800 Subject: [PATCH 10/16] Fixed metrics reported by MicroBatchExecution --- .../streaming/MicroBatchExecution.scala | 19 +++--- .../sql/streaming/StreamingQuerySuite.scala | 64 +++++++++++-------- 2 files changed, 49 insertions(+), 34 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 675177e6108ed..875936d97c4ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -268,16 +268,17 @@ class MicroBatchExecution( } case s: MicroBatchReader => updateStatusMessage(s"Getting offsets from $s") - reportTimeTaken("getOffset") { - // Once v1 streaming source execution is gone, we can refactor this away. - // For now, we set the range here to get the source to infer the available end offset, - // get that offset, and then set the range again when we later execute. - s.setOffsetRange( - toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))), - Optional.empty()) - - (s, Option(s.getEndOffset)) + reportTimeTaken("setOffsetRange") { + // Once v1 streaming source execution is gone, we can refactor this away. + // For now, we set the range here to get the source to infer the available end offset, + // get that offset, and then set the range again when we later execute. 
+ s.setOffsetRange( + toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))), + Optional.empty()) } + + val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() } + (s, Option(currentOffset)) }.toMap availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 519ee64a50f89..af63c349d2e28 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -18,13 +18,12 @@ package org.apache.spark.sql.streaming import java.{util => ju} +import java.util.Optional import java.util.concurrent.CountDownLatch import org.apache.commons.lang3.RandomStringUtils -import org.mockito.Mockito._ import org.scalactic.TolerantNumerics import org.scalatest.BeforeAndAfter -import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.mockito.MockitoSugar @@ -38,7 +37,6 @@ import org.apache.spark.sql.sources.v2.reader.ReadTask import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2} import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock} import org.apache.spark.sql.types.StructType -import org.apache.spark.util.ManualClock class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging with MockitoSugar { @@ -209,18 +207,28 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi /** Custom MemoryStream that waits for manual clock to reach a time */ val inputData = new MemoryStream[Int](0, sqlContext) { - // getOffset should take 50 ms the first time it is called - override def getEndOffset: OffsetV2 = { - val offset = super.getEndOffset - if (offset != null) { - clock.waitTillTime(1050) + + private def dataAdded: Boolean = currentOffset.offset != -1 + + // setOffsetRange should take 50 ms the first time it is called after data is added + override def setOffsetRange(start: Optional[OffsetV2], end: Optional[OffsetV2]): Unit = { + synchronized { + if (dataAdded) clock.waitTillTime(1050) + super.setOffsetRange(start, end) + } + } + + // getEndOffset should take 100 ms the first time it is called after data is added + override def getEndOffset(): OffsetV2 = synchronized { + if (currentOffset.offset != -1) { // no data available + clock.waitTillTime(1150) } - offset + super.getEndOffset } // getBatch should take 100 ms the first time it is called - override def createReadTasks(): ju.List[ReadTask[Row]] = { - if (getStartOffset.asInstanceOf[LongOffset].offset == -1L) clock.waitTillTime(1150) + override def createReadTasks(): ju.List[ReadTask[Row]] = synchronized { + clock.waitTillTime(1350) super.createReadTasks() } } @@ -261,39 +269,44 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi AssertOnQuery(_.status.message === "Waiting for next trigger"), AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), - // Test status and progress while offset is being fetched + // Test status and progress when setOffsetRange is being called AddData(inputData, 1, 2), - AdvanceManualClock(1000), // time = 1000 to start new trigger, will block on getOffset + AdvanceManualClock(1000), // time = 1000 to start new trigger, will block on setOffsetRange 
AssertStreamExecThreadIsWaitingForTime(1050), AssertOnQuery(_.status.isDataAvailable === false), AssertOnQuery(_.status.isTriggerActive === true), AssertOnQuery(_.status.message.startsWith("Getting offsets from")), AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), - // Test status and progress while batch is being fetched - AdvanceManualClock(50), // time = 1050 to unblock getOffset + AdvanceManualClock(50), // time = 1050 to unblock setOffsetRange AssertClockTime(1050), - AssertStreamExecThreadIsWaitingForTime(1150), // will block on getBatch that needs 1150 + AssertStreamExecThreadIsWaitingForTime(1150), // will block on getEndOffset that needs 1150 + AssertOnQuery(_.status.isDataAvailable === false), + AssertOnQuery(_.status.isTriggerActive === true), + AssertOnQuery(_.status.message.startsWith("Getting offsets from")), + AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), + + AdvanceManualClock(100), // time = 1150 to unblock getEndOffset + AssertClockTime(1150), + AssertStreamExecThreadIsWaitingForTime(1350), // will block on createReadTasks that needs 1350 AssertOnQuery(_.status.isDataAvailable === true), AssertOnQuery(_.status.isTriggerActive === true), AssertOnQuery(_.status.message === "Processing new data"), AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), - // Test status and progress while batch is being processed - AdvanceManualClock(100), // time = 1150 to unblock getBatch - AssertClockTime(1150), - AssertStreamExecThreadIsWaitingForTime(1500), // will block in Spark job that needs 1500 + AdvanceManualClock(200), // time = 1350 to unblock createReadTasks + AssertClockTime(1350), + AssertStreamExecThreadIsWaitingForTime(1500), // will block on map task that needs 1500 AssertOnQuery(_.status.isDataAvailable === true), AssertOnQuery(_.status.isTriggerActive === true), AssertOnQuery(_.status.message === "Processing new data"), AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0), // Test status and progress while batch processing has completed - AssertOnQuery { _ => clock.getTimeMillis() === 1150 }, - AdvanceManualClock(350), // time = 1500 to unblock job + AdvanceManualClock(150), // time = 1500 to unblock map task AssertClockTime(1500), CheckAnswer(2), - AssertStreamExecThreadIsWaitingForTime(2000), + AssertStreamExecThreadIsWaitingForTime(2000), // will block until the next trigger AssertOnQuery(_.status.isDataAvailable === true), AssertOnQuery(_.status.isTriggerActive === false), AssertOnQuery(_.status.message === "Waiting for next trigger"), @@ -310,10 +323,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(progress.numInputRows === 2) assert(progress.processedRowsPerSecond === 4.0) - assert(progress.durationMs.get("getOffset") === 50) - assert(progress.durationMs.get("getBatch") === 100) + assert(progress.durationMs.get("setOffsetRange") === 50) + assert(progress.durationMs.get("getEndOffset") === 100) assert(progress.durationMs.get("queryPlanning") === 0) assert(progress.durationMs.get("walCommit") === 0) + assert(progress.durationMs.get("addBatch") === 350) assert(progress.durationMs.get("triggerExecution") === 500) assert(progress.sources.length === 1) From 478ad17e0941e4f8a6d776fb1ce5f0dcb53ea884 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 1 Feb 2018 15:01:57 -0800 Subject: [PATCH 11/16] Reverted changes to DataSourceV2* --- .../datasources/v2/DataSourceReaderHolder.scala | 10 +++++++--- .../datasources/v2/DataSourceV2Relation.scala | 16 +++++++++++----- 
.../datasources/v2/DataSourceV2ScanExec.scala | 6 ++++-- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala index a77d4ae9604ea..6460c97abe344 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceReaderHolder.scala @@ -28,9 +28,9 @@ import org.apache.spark.sql.sources.v2.reader._ trait DataSourceReaderHolder { /** - * The output of the data source reader, without column pruning. + * The full output of the data source reader, without column pruning. */ - def output: Seq[Attribute] + def fullOutput: Seq[AttributeReference] /** * The held data source reader. @@ -46,7 +46,7 @@ trait DataSourceReaderHolder { case s: SupportsPushDownFilters => s.pushedFilters().toSet case _ => Nil } - Seq(output, reader.getClass, reader.readSchema(), filters) + Seq(fullOutput, reader.getClass, reader.readSchema(), filters) } def canEqual(other: Any): Boolean @@ -61,4 +61,8 @@ trait DataSourceReaderHolder { override def hashCode(): Int = { metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b) } + + lazy val output: Seq[Attribute] = reader.readSchema().map(_.name).map { name => + fullOutput.find(_.name == name).get + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index b22ae737938e1..eebfa29f91b99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -17,13 +17,15 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation +import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} import org.apache.spark.sql.sources.v2.reader._ case class DataSourceV2Relation( - output: Seq[Attribute], - reader: DataSourceReader) extends LeafNode with DataSourceReaderHolder { + fullOutput: Seq[AttributeReference], + reader: DataSourceReader) + extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation] @@ -33,6 +35,10 @@ case class DataSourceV2Relation( case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes) } + + override def newInstance(): DataSourceV2Relation = { + copy(fullOutput = fullOutput.map(_.newInstance())) + } } /** @@ -40,8 +46,8 @@ case class DataSourceV2Relation( * to the non-streaming relation. 
*/ class StreamingDataSourceV2Relation( - output: Seq[Attribute], - reader: DataSourceReader) extends DataSourceV2Relation(output, reader) { + fullOutput: Seq[AttributeReference], + reader: DataSourceReader) extends DataSourceV2Relation(fullOutput, reader) { override def isStreaming: Boolean = true } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index 019ea791c9e25..df469af2c262a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -28,19 +28,21 @@ import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec} import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader +import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader import org.apache.spark.sql.types.StructType /** * Physical plan node for scanning data from a data source. */ case class DataSourceV2ScanExec( - override val output: Seq[Attribute], + fullOutput: Seq[AttributeReference], @transient reader: DataSourceReader) extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan { override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec] + override def producedAttributes: AttributeSet = AttributeSet(fullOutput) + override def outputPartitioning: physical.Partitioning = reader match { case s: SupportsReportPartitioning => new DataSourcePartitioning( From 3f50f33db4b49d94495e5075e0027fd5babc6779 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 1 Feb 2018 18:12:45 -0800 Subject: [PATCH 12/16] Updated package paths to fix compilation --- .../org/apache/spark/sql/execution/streaming/LongOffset.scala | 2 +- .../scala/org/apache/spark/sql/execution/streaming/memory.scala | 2 +- .../spark/sql/streaming/StreamingQueryListenerSuite.scala | 2 +- .../org/apache/spark/sql/streaming/StreamingQuerySuite.scala | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala index 491859e4bd85e..3ff5b86ac45d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.streaming -import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2} +import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2} /** * A simple offset for sources that produce a single linear stream of data. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 23fca107003ca..519544b20ab2e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, Statistics} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} -import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset => OffsetV2} +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2} import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index f3d15f7e6da2c..b96f2bcbdd644 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.scheduler._ import org.apache.spark.sql.{Encoder, SparkSession} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2} +import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2} import org.apache.spark.sql.streaming.StreamingQueryListener._ import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.util.JsonProtocol diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 1fce3f636d1d5..ec8d6674074de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.reader.DataReaderFactory -import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2} +import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2} import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock} import org.apache.spark.sql.types.StructType From c7130486fec011fd8a553fefd59ecd056c137606 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 2 Feb 2018 14:31:46 -0800 Subject: [PATCH 13/16] Store added data as rows not datasets --- .../sql/execution/streaming/memory.scala | 66 ++++++++++--------- .../sql/streaming/StreamingQuerySuite.scala | 15 +++-- 2 files changed, 42 insertions(+), 39 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 519544b20ab2e..19597ef82b054 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -29,10 +29,10 @@ import scala.util.control.NonFatal import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.encoderFor -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, Statistics} +import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ -import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory} +import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow} import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2} import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType @@ -53,7 +53,7 @@ object MemoryStream { * available. */ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) - extends MicroBatchReader with Logging { + extends MicroBatchReader with SupportsScanUnsafeRow with Logging { protected val encoder = encoderFor[A] private val attributes = encoder.schema.toAttributes protected val logicalPlan = StreamingExecutionRelation(this, attributes)(sqlContext.sparkSession) @@ -64,7 +64,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) * Stored in a ListBuffer to facilitate removing committed batches. */ @GuardedBy("this") - protected val batches = new ListBuffer[Dataset[A]] + protected val batches = new ListBuffer[Array[UnsafeRow]] @GuardedBy("this") protected var currentOffset: LongOffset = new LongOffset(-1) @@ -95,13 +95,12 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } def addData(data: TraversableOnce[A]): Offset = { - val encoded = data.toVector.map(d => encoder.toRow(d).copy()) - val plan = new LocalRelation(attributes, encoded, isStreaming = false) - val ds = Dataset[A](sqlContext.sparkSession, plan) - logDebug(s"Adding ds: $ds") + val objects = data.toSeq + val rows = objects.iterator.map(d => encoder.toRow(d).copy().asInstanceOf[UnsafeRow]).toArray + logDebug(s"Adding: $objects") this.synchronized { currentOffset = currentOffset + 1 - batches += ds + batches += rows currentOffset } } @@ -127,28 +126,30 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) if (endOffset.offset == -1) null else endOffset } - override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = synchronized { - // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal) - val startOrdinal = startOffset.offset.toInt + 1 - val endOrdinal = endOffset.offset.toInt + 1 - - // Internal buffer only holds the batches after lastCommittedOffset. - val newBlocks = synchronized { - val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1 - val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1 - assert(sliceStart <= sliceEnd, s"sliceStart: $sliceStart sliceEnd: $sliceEnd") - batches.slice(sliceStart, sliceEnd) - } + override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = { + synchronized { + // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal) + val startOrdinal = startOffset.offset.toInt + 1 + val endOrdinal = endOffset.offset.toInt + 1 + + // Internal buffer only holds the batches after lastCommittedOffset. 
+ val newBlocks = synchronized { + val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1 + val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1 + assert(sliceStart <= sliceEnd, s"sliceStart: $sliceStart sliceEnd: $sliceEnd") + batches.slice(sliceStart, sliceEnd) + } - logDebug(generateDebugString(newBlocks, startOrdinal, endOrdinal)) + logDebug(generateDebugString(newBlocks, startOrdinal, endOrdinal)) - newBlocks.map { ds => - new MemoryStreamDataReaderFactory(ds.toDF().collect()).asInstanceOf[DataReaderFactory[Row]] - }.asJava + newBlocks.map { block => + new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory[UnsafeRow]] + }.asJava + } } private def generateDebugString( - blocks: TraversableOnce[Dataset[A]], + blocks: Iterable[Array[UnsafeRow]], startOrdinal: Int, endOrdinal: Int): String = { val originalUnsupportedCheck = @@ -156,7 +157,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) try { sqlContext.setConf("spark.sql.streaming.unsupportedOperationCheck", "false") s"MemoryBatch [$startOrdinal, $endOrdinal]: " + - s"${blocks.flatMap(_.collect()).mkString(", ")}" + s"${blocks.flatten.map(row => encoder.fromRow(row)).mkString(", ")}" } finally { sqlContext.setConf("spark.sql.streaming.unsupportedOperationCheck", originalUnsupportedCheck) } @@ -193,9 +194,10 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } -class MemoryStreamDataReaderFactory(records: Array[Row]) extends DataReaderFactory[Row] { - override def createDataReader(): DataReader[Row] = { - new DataReader[Row] { +class MemoryStreamDataReaderFactory(records: Array[UnsafeRow]) + extends DataReaderFactory[UnsafeRow] { + override def createDataReader(): DataReader[UnsafeRow] = { + new DataReader[UnsafeRow] { private var currentIndex = -1 override def next(): Boolean = { @@ -204,7 +206,7 @@ class MemoryStreamDataReaderFactory(records: Array[Row]) extends DataReaderFacto currentIndex < records.length } - override def get(): Row = records(currentIndex) + override def get(): UnsafeRow = records(currentIndex) override def close(): Unit = {} } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index ec8d6674074de..3f9aa0d1fa5be 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -29,7 +29,8 @@ import org.scalatest.mockito.MockitoSugar import org.apache.spark.SparkException import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -220,16 +221,16 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi // getEndOffset should take 100 ms the first time it is called after data is added override def getEndOffset(): OffsetV2 = synchronized { - if (currentOffset.offset != -1) { // no data available - clock.waitTillTime(1150) - } + if (dataAdded) clock.waitTillTime(1150) super.getEndOffset() } // getBatch should take 100 ms the first time it is called - override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = synchronized { - clock.waitTillTime(1350) - 
super.createDataReaderFactories() + override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = { + synchronized { + clock.waitTillTime(1350) + super.createUnsafeRowReaderFactories() + } } } From 1204755d8bdb0e8f0627a72bc8f456fdc12fc7ea Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 2 Feb 2018 17:23:18 -0800 Subject: [PATCH 14/16] Fixed ForeachSinkSuite --- .../streaming/ForeachSinkSuite.scala | 55 +++++++------------ 1 file changed, 20 insertions(+), 35 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala index 41434e6d8b974..b249dd41a84a6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala @@ -46,49 +46,34 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf .foreach(new TestForeachWriter()) .start() - // -- batch 0 --------------------------------------- - input.addData(1, 2, 3, 4) - query.processAllAvailable() + def verifyOutput(expectedVersion: Int, expectedData: Seq[Int]): Unit = { + import ForeachSinkSuite._ - var expectedEventsForPartition0 = Seq( - ForeachSinkSuite.Open(partition = 0, version = 0), - ForeachSinkSuite.Process(value = 2), - ForeachSinkSuite.Process(value = 3), - ForeachSinkSuite.Close(None) - ) - var expectedEventsForPartition1 = Seq( - ForeachSinkSuite.Open(partition = 1, version = 0), - ForeachSinkSuite.Process(value = 1), - ForeachSinkSuite.Process(value = 4), - ForeachSinkSuite.Close(None) - ) + val events = ForeachSinkSuite.allEvents() + assert(events.size === 2) // one seq of events for each of the 2 partitions - var allEvents = ForeachSinkSuite.allEvents() - assert(allEvents.size === 2) - assert(allEvents.toSet === Set(expectedEventsForPartition0, expectedEventsForPartition1)) + // Verify both seq of events have an Open event as the first event + assert(events.map(_.head).toSet === Set(0, 1).map(p => Open(p, expectedVersion))) + + // Verify all the Process event correspond to the expected data + val allProcessEvents = events.flatMap(_.filter(_.isInstanceOf[Process[_]])) + assert(allProcessEvents.toSet === expectedData.map { data => Process(data) }.toSet) + + // Verify both seq of events have a Close event as the last event + assert(events.map(_.last).toSet === Set(Close(None), Close(None))) + } + // -- batch 0 --------------------------------------- ForeachSinkSuite.clear() + input.addData(1, 2, 3, 4) + query.processAllAvailable() + verifyOutput(expectedVersion = 0, expectedData = 1 to 4) // -- batch 1 --------------------------------------- + ForeachSinkSuite.clear() input.addData(5, 6, 7, 8) query.processAllAvailable() - - expectedEventsForPartition0 = Seq( - ForeachSinkSuite.Open(partition = 0, version = 1), - ForeachSinkSuite.Process(value = 5), - ForeachSinkSuite.Process(value = 7), - ForeachSinkSuite.Close(None) - ) - expectedEventsForPartition1 = Seq( - ForeachSinkSuite.Open(partition = 1, version = 1), - ForeachSinkSuite.Process(value = 6), - ForeachSinkSuite.Process(value = 8), - ForeachSinkSuite.Close(None) - ) - - allEvents = ForeachSinkSuite.allEvents() - assert(allEvents.size === 2) - assert(allEvents.toSet === Set(expectedEventsForPartition0, expectedEventsForPartition1)) + verifyOutput(expectedVersion = 1, expectedData = 5 to 8) query.stop() } From f0ce5df92ba79be9d868ffaa823607fc1fb5dd5c 
Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 6 Feb 2018 21:31:01 -0800 Subject: [PATCH 15/16] Fixed bug --- .../spark/sql/execution/streaming/memory.scala | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 19597ef82b054..04a4215eaec48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -140,7 +140,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) batches.slice(sliceStart, sliceEnd) } - logDebug(generateDebugString(newBlocks, startOrdinal, endOrdinal)) + logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal)) newBlocks.map { block => new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory[UnsafeRow]] @@ -149,18 +149,12 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } private def generateDebugString( - blocks: Iterable[Array[UnsafeRow]], + blocks: Seq[UnsafeRow], startOrdinal: Int, endOrdinal: Int): String = { - val originalUnsupportedCheck = - sqlContext.getConf("spark.sql.streaming.unsupportedOperationCheck") - try { - sqlContext.setConf("spark.sql.streaming.unsupportedOperationCheck", "false") - s"MemoryBatch [$startOrdinal, $endOrdinal]: " + - s"${blocks.flatten.map(row => encoder.fromRow(row)).mkString(", ")}" - } finally { - sqlContext.setConf("spark.sql.streaming.unsupportedOperationCheck", originalUnsupportedCheck) - } + val fromRow = encoder.resolveAndBind().fromRow _ + s"MemoryBatch [$startOrdinal, $endOrdinal]: " + + s"${blocks.map(row => fromRow(row)).mkString(", ")}" } override def commit(end: OffsetV2): Unit = synchronized { From c3508e9094aced3dbbbbc52029bc6d5bc6fc955e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 7 Feb 2018 11:58:07 -0800 Subject: [PATCH 16/16] Addressed comments --- .../org/apache/spark/sql/execution/streaming/memory.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 04a4215eaec48..352d4ce9fbcaa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -149,12 +149,12 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } private def generateDebugString( - blocks: Seq[UnsafeRow], + rows: Seq[UnsafeRow], startOrdinal: Int, endOrdinal: Int): String = { val fromRow = encoder.resolveAndBind().fromRow _ s"MemoryBatch [$startOrdinal, $endOrdinal]: " + - s"${blocks.map(row => fromRow(row)).mkString(", ")}" + s"${rows.map(row => fromRow(row)).mkString(", ")}" } override def commit(end: OffsetV2): Unit = synchronized {
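
For reference, a minimal, self-contained sketch of the read cycle these patches move MemoryStream onto: the driver resolves a batch with setOffsetRange/getEndOffset, plans it with createUnsafeRowReaderFactories, drains each DataReader, and finally calls commit so the source can drop its buffered rows, replacing the old getOffset/getBatch pair. The sketch follows the final patch state; the local SparkSession setup, the object name MemoryStreamReadCycleSketch, and the use of getInt(0) to decode the single Int column are illustrative assumptions, not code taken from the patches.

import java.util.Optional

import scala.collection.JavaConverters._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

object MemoryStreamReadCycleSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative local session; any SparkSession/SQLContext would do.
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    import spark.implicits._

    val stream = new MemoryStream[Int](0, spark.sqlContext)
    stream.addData(Seq(1, 2, 3))

    // 1. Resolve the offset range for this micro-batch; an empty end offset
    //    makes the source report everything it currently holds.
    stream.setOffsetRange(Optional.empty[OffsetV2](), Optional.empty[OffsetV2]())
    val end = stream.getEndOffset

    // 2. Plan the batch: one reader factory per uncommitted block of rows.
    val factories = stream.createUnsafeRowReaderFactories().asScala

    // 3. Drain each factory through its DataReader, as an executor task would.
    factories.foreach { factory =>
      val reader = factory.createDataReader()
      while (reader.next()) {
        // The encoder schema has a single Int column, so ordinal 0 holds the value.
        println(reader.get().getInt(0))
      }
      reader.close()
    }

    // 4. Mark the batch as committed so the source can discard the buffered rows.
    stream.commit(end)
    spark.stop()
  }
}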