fix streaming

cloud-fan committed Feb 8, 2018
1 parent a3acb97 commit 0cc0600b8f6f3a46189ae38850835f34b57bd945
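
In short: the StreamingDataSourceV2Relation subclass is removed and DataSourceV2Relation itself carries an explicit isStreaming constructor flag; simpleString prefixes streaming relations with "Streaming "; every match on the old three-field extractor becomes a type test that goes through r.reader; MicroBatchExecution and ContinuousExecution build the relation with isStreaming = true; and the StreamSuite explain assertions are updated to the new plan strings. Simplified sketches of the rewritten pattern and the new relation shape follow the Kafka test hunks and the DataSourceV2Relation hunks below.
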
@@ -60,7 +60,8 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
     eventually(timeout(streamingTimeout)) {
       assert(
         query.lastExecution.logical.collectFirst {
-          case DataSourceV2Relation(_, _, r: KafkaContinuousReader) => r
+          case r: DataSourceV2Relation if r.reader.isInstanceOf[KafkaContinuousReader] =>
+            r.reader.asInstanceOf[KafkaContinuousReader]
         }.exists { r =>
           // Ensure the new topic is present and the old topic is gone.
           r.knownPartitions.exists(_.topic == topic2)
@@ -47,7 +47,8 @@ trait KafkaContinuousTest extends KafkaSourceTest {
     eventually(timeout(streamingTimeout)) {
       assert(
         query.lastExecution.logical.collectFirst {
-          case DataSourceV2Relation(_, _, r: KafkaContinuousReader) => r
+          case r: DataSourceV2Relation if r.reader.isInstanceOf[KafkaContinuousReader] =>
+            r.reader.asInstanceOf[KafkaContinuousReader]
         }.exists(_.knownPartitions.size == newCount),
         s"query never reconfigured to $newCount partitions")
     }
@@ -117,7 +117,8 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
       } ++ (query.get.lastExecution match {
         case null => Seq()
         case e => e.logical.collect {
-          case DataSourceV2Relation(_, _, reader: KafkaContinuousReader) => reader
+          case r: DataSourceV2Relation if r.reader.isInstanceOf[KafkaContinuousReader] =>
+            r.reader.asInstanceOf[KafkaContinuousReader]
         }
       })
       if (sources.isEmpty) {
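
The three Kafka test hunks above make the same mechanical change: once DataSourceV2Relation grows a fourth constructor field (see the next hunks), the old three-field extractor pattern `case DataSourceV2Relation(_, _, r: KafkaContinuousReader)` no longer compiles as written, so each match becomes a type test on the relation plus a cast of its reader. A minimal self-contained sketch of that pattern, using simplified stand-in names (Relation, KafkaReader) rather than the real Spark classes:

// Sketch only: Relation and KafkaReader are stand-ins, not the Spark classes.
object ExtractorAritySketch {
  trait Reader
  class KafkaReader extends Reader { val knownPartitions = Set("topic-0") }

  // After this commit the relation has four fields, so a three-field
  // extractor pattern against it would no longer compile.
  case class Relation(output: Seq[String], source: String, reader: Reader, isStreaming: Boolean)

  def main(args: Array[String]): Unit = {
    val plan = Seq(Relation(Seq("value"), "kafka", new KafkaReader, isStreaming = true))
    // Arity-independent form used in the rewritten tests: match on the
    // relation type, then cast its reader.
    val kafkaReader = plan.collectFirst {
      case r: Relation if r.reader.isInstanceOf[KafkaReader] =>
        r.reader.asInstanceOf[KafkaReader]
    }
    println(kafkaReader.map(_.knownPartitions)) // Some(Set(topic-0))
  }
}
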
@@ -26,12 +26,16 @@ import org.apache.spark.sql.sources.v2.reader._
 case class DataSourceV2Relation(
     output: Seq[AttributeReference],
     source: DataSourceV2,
-    reader: DataSourceReader)
+    reader: DataSourceReader,
+    override val isStreaming: Boolean)
   extends LeafNode with MultiInstanceRelation with DataSourceV2QueryPlan {

   override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]

-  override def simpleString: String = s"Relation $metadataString"
+  override def simpleString: String = {
+    val streamingHeader = if (isStreaming) "Streaming " else ""
+    s"${streamingHeader}Relation $metadataString"
+  }

   override def computeStats(): Statistics = reader match {
     case r: SupportsReportStatistics =>
@@ -45,19 +49,8 @@ case class DataSourceV2Relation(
   }
 }

-/**
- * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
- * to the non-streaming relation.
- */
-class StreamingDataSourceV2Relation(
-    output: Seq[AttributeReference],
-    source: DataSourceV2,
-    reader: DataSourceReader) extends DataSourceV2Relation(output, source, reader) {
-  override def isStreaming: Boolean = true
-}
-
 object DataSourceV2Relation {
   def apply(source: DataSourceV2, reader: DataSourceReader): DataSourceV2Relation = {
-    new DataSourceV2Relation(reader.readSchema().toAttributes, source, reader)
+    new DataSourceV2Relation(reader.readSchema().toAttributes, source, reader, isStreaming = false)
   }
 }
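
The two hunks above collapse the streaming subclass into the relation itself: the companion apply still builds batch relations with isStreaming = false, the streaming call sites further down pass isStreaming = true directly, and simpleString now prefixes streaming relations with "Streaming ". A runnable sketch of that shape, with hypothetical simplified field types and metadataString reduced to the source name:

// Sketch only: a simplified stand-in for DataSourceV2Relation, not the Spark class.
object IsStreamingFlagSketch {
  case class Relation(output: Seq[String], source: String, isStreaming: Boolean) {
    // Mirrors the new simpleString: streaming relations get a "Streaming " prefix.
    def simpleString: String = {
      val streamingHeader = if (isStreaming) "Streaming " else ""
      s"${streamingHeader}Relation $source"
    }
  }

  object Relation {
    // Mirrors the companion apply above: batch construction defaults to isStreaming = false.
    def apply(output: Seq[String], source: String): Relation =
      Relation(output, source, isStreaming = false)
  }

  def main(args: Array[String]): Unit = {
    println(Relation(Seq("value"), "kafka").simpleString)                     // Relation kafka
    println(Relation(Seq("value"), "kafka", isStreaming = true).simpleString) // Streaming Relation kafka
  }
}
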
@@ -23,8 +23,8 @@ import org.apache.spark.sql.execution.SparkPlan
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case DataSourceV2Relation(output, source, reader) =>
-      DataSourceV2ScanExec(output, source, reader) :: Nil
+    case r: DataSourceV2Relation =>
+      DataSourceV2ScanExec(r.output, r.source, r.reader) :: Nil

     case WriteToDataSourceV2(writer, query) =>
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil

@@ -39,11 +39,11 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHel
     // TODO: Ideally column pruning should be implemented via a plan property that is propagated
     // top-down, then we can simplify the logic here and only collect target operators.
     val filterPushed = plan transformUp {
-      case FilterAndProject(fields, condition, r @ DataSourceV2Relation(_, _, reader)) =>
+      case FilterAndProject(fields, condition, r: DataSourceV2Relation) =>
         val (candidates, nonDeterministic) =
           splitConjunctivePredicates(condition).partition(_.deterministic)

-        val stayUpFilters: Seq[Expression] = reader match {
+        val stayUpFilters: Seq[Expression] = r.reader match {
           case r: SupportsPushDownCatalystFilters =>
             r.pushCatalystFilters(candidates.toArray)
@@ -27,9 +27,9 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
-import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
 import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
@@ -410,12 +410,13 @@ class MicroBatchExecution(
           }
           reader.setOffsetRange(toJava(current), Optional.of(availableV2))
           logDebug(s"Retrieving data from $reader: $current -> $availableV2")
-          Some(reader -> new StreamingDataSourceV2Relation(
+          Some(reader -> new DataSourceV2Relation(
             reader.readSchema().toAttributes,
             // Provide a fake value here just in case something went wrong, e.g. the reader gives
             // a wrong `equals` implementation.
             readerToDataSourceMap.getOrElse(reader, FakeDataSourceV2),
-            reader))
+            reader,
+            isStreaming = true))
         case _ => None
       }
     }
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, PartitionOffset}
@@ -180,7 +180,7 @@ class ContinuousExecution(
         val loggedOffset = offsets.offsets(0)
         val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
         reader.setStartOffset(java.util.Optional.ofNullable(realOffset.orNull))
-        new StreamingDataSourceV2Relation(newOutput, ds, reader)
+        new DataSourceV2Relation(newOutput, ds, reader, isStreaming = true)
     }

     // Rewire the plan to use the new attributes that were returned by the source.
@@ -201,7 +201,8 @@ class ContinuousExecution(
     val withSink = WriteToDataSourceV2(writer, triggerLogicalPlan)

     val reader = withSink.collect {
-      case DataSourceV2Relation(_, _, r: ContinuousReader) => r
+      case r: DataSourceV2Relation if r.reader.isInstanceOf[ContinuousReader] =>
+        r.reader.asInstanceOf[ContinuousReader]
     }.head

     reportTimeTaken("queryPlanning") {
@@ -492,16 +492,16 @@ class StreamSuite extends StreamTest {
       val explainWithoutExtended = q.explainInternal(false)

       // `extended = false` only displays the physical plan.
-      assert("StreamingDataSourceV2Relation".r.findAllMatchIn(explainWithoutExtended).size === 0)
-      assert("DataSourceV2Scan".r.findAllMatchIn(explainWithoutExtended).size === 1)
+      assert("Streaming Relation".r.findAllMatchIn(explainWithoutExtended).size === 0)
+      assert("Scan FakeDataSourceV2".r.findAllMatchIn(explainWithoutExtended).size === 1)
       // Use "StateStoreRestore" to verify that it does output a streaming physical plan
       assert(explainWithoutExtended.contains("StateStoreRestore"))

       val explainWithExtended = q.explainInternal(true)
       // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical
       // plan.
-      assert("StreamingDataSourceV2Relation".r.findAllMatchIn(explainWithExtended).size === 3)
-      assert("DataSourceV2Scan".r.findAllMatchIn(explainWithExtended).size === 1)
+      assert("Streaming Relation".r.findAllMatchIn(explainWithExtended).size === 3)
+      assert("Scan FakeDataSourceV2".r.findAllMatchIn(explainWithExtended).size === 1)
       // Use "StateStoreRestore" to verify that it does output a streaming physical plan
       assert(explainWithExtended.contains("StateStoreRestore"))
     } finally {
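
Follow-up to the relation sketch above: with the StreamingDataSourceV2Relation class deleted, its name can no longer appear in explain output, so the logical-plan assertions in this last hunk now grep for the "Streaming Relation" header emitted by the new simpleString, and the physical-plan assertions grep for "Scan FakeDataSourceV2" (presumably the scan node rendered under the same metadataString naming) instead of the old "DataSourceV2Scan" marker.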
