improve data source v2 explain
cloud-fan committed Feb 21, 2018
1 parent aadf953 commit a5171e6
Showing 14 changed files with 164 additions and 118 deletions.
@@ -60,7 +60,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.logical.collectFirst {
case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r
case StreamingDataSourceV2Relation(_, _, r: KafkaContinuousReader) => r
}.exists { r =>
// Ensure the new topic is present and the old topic is gone.
r.knownPartitions.exists(_.topic == topic2)
@@ -47,7 +47,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.logical.collectFirst {
case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r
case StreamingDataSourceV2Relation(_, _, r: KafkaContinuousReader) => r
}.exists(_.knownPartitions.size == newCount),
s"query never reconfigured to $newCount partitions")
}
@@ -119,7 +119,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
} ++ (query.get.lastExecution match {
case null => Seq()
case e => e.logical.collect {
case StreamingDataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
case StreamingDataSourceV2Relation(_, _, reader: KafkaContinuousReader) => reader
}
})
}.distinct

This file was deleted.

@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import java.util.Objects

import org.apache.commons.lang3.StringUtils

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.util.Utils

/**
* A base trait for data source v2 related query plans (both logical and physical). It defines the
* equals/hashCode methods and provides a string representation of the query plan, based on
* some common information.
*/
trait DataSourceV2QueryPlan {

/**
* The output of the data source reader, w.r.t. column pruning.
*/
def output: Seq[Attribute]

/**
* The instance of this data source implementation. Note that we only consider its class in
* equals/hashCode, not the instance itself.
*/
def source: DataSourceV2

/**
* The created data source reader. Here we use it to get the filters that have been pushed down
* so far; the reader itself doesn't take part in equals/hashCode.
*/
def reader: DataSourceReader

private lazy val filters = reader match {
case s: SupportsPushDownCatalystFilters => s.pushedCatalystFilters().toSet
case s: SupportsPushDownFilters => s.pushedFilters().toSet
case _ => Set.empty
}

/**
* The metadata of this data source query plan that can be used for equality checks.
*/
private def metadata: Seq[Any] = Seq(output, source.getClass, filters)

def canEqual(other: Any): Boolean

override def equals(other: Any): Boolean = other match {
case other: DataSourceV2QueryPlan => canEqual(other) && metadata == other.metadata
case _ => false
}

override def hashCode(): Int = {
metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b)
}

def metadataString: String = {
val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
if (filters.nonEmpty) entries += "Pushed Filters" -> filters.mkString("[", ", ", "]")

val outputStr = Utils.truncatedString(output, "[", ", ", "]")

val entriesStr = if (entries.nonEmpty) {
Utils.truncatedString(entries.map {
case (key, value) => key + ": " + StringUtils.abbreviate(redact(value), 100)
}, " (", ", ", ")")
} else {
""
}

s"${source.getClass.getSimpleName.stripSuffix("$")}$outputStr$entriesStr"
}

private def redact(text: String): String = {
Utils.redact(SQLConf.get.stringRedationPattern, text)
}
}
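
To make the new equality contract concrete, here is a small standalone sketch in plain Scala (not Spark code; every name in it is made up) that mirrors the pattern DataSourceV2QueryPlan establishes: plan nodes compare by their output, the class of the source, and the set of pushed-down filters, while canEqual keeps different node types (e.g. a logical relation and a physical scan) from ever comparing equal.

// Standalone mirror of the DataSourceV2QueryPlan equality pattern; QueryPlanLike,
// RelationLike and ScanLike are illustrative stand-ins, not Spark classes.
trait QueryPlanLike {
  def output: Seq[String]          // stands in for Seq[Attribute]
  def sourceClass: Class[_]        // only the source's class takes part in equality
  def pushedFilters: Set[String]   // stands in for the pushed-down filters

  private def metadata: Seq[Any] = Seq(output, sourceClass, pushedFilters)

  def canEqual(other: Any): Boolean

  override def equals(other: Any): Boolean = other match {
    case o: QueryPlanLike => canEqual(o) && metadata == o.metadata
    case _ => false
  }

  // Same fold as the trait above: a 31-based hash over the metadata entries.
  override def hashCode(): Int =
    metadata.map(x => java.util.Objects.hashCode(x)).foldLeft(0)((a, b) => 31 * a + b)
}

// Because the trait supplies concrete equals/hashCode, the case classes below do not get
// the compiler-generated ones, so the metadata-based equality wins.
case class RelationLike(output: Seq[String], sourceClass: Class[_], pushedFilters: Set[String])
    extends QueryPlanLike {
  override def canEqual(other: Any): Boolean = other.isInstanceOf[RelationLike]
}

case class ScanLike(output: Seq[String], sourceClass: Class[_], pushedFilters: Set[String])
    extends QueryPlanLike {
  override def canEqual(other: Any): Boolean = other.isInstanceOf[ScanLike]
}

object EqualityDemo extends App {
  val a = RelationLike(Seq("id", "name"), classOf[String], Set("IsNotNull(id)"))
  val b = RelationLike(Seq("id", "name"), classOf[String], Set("IsNotNull(id)"))
  val c = ScanLike(Seq("id", "name"), classOf[String], Set("IsNotNull(id)"))
  println(a == b)  // true: same output, same source class, same pushed filters
  println(a == c)  // false: canEqual rejects a node of a different plan type
}
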
@@ -35,15 +35,14 @@ case class DataSourceV2Relation(
options: Map[String, String],
projection: Seq[AttributeReference],
filters: Option[Seq[Expression]] = None,
userSpecifiedSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation {
userSpecifiedSchema: Option[StructType] = None)
extends LeafNode with MultiInstanceRelation with DataSourceV2QueryPlan {

import DataSourceV2Relation._

override def simpleString: String = {
s"DataSourceV2Relation(source=${source.name}, " +
s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " +
s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
}
override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]

override def simpleString: String = "Relation " + metadataString

override lazy val schema: StructType = reader.readSchema()

@@ -107,17 +106,24 @@ case class DataSourceV2Relation(
}

/**
* A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
* to the non-streaming relation.
* A specialization of [[DataSourceV2Relation]] with the streaming bit set to true.
*
* Note that this plan has a mutable reader, so Spark won't apply operator push-down to this plan,
* to avoid making the plan mutable. We should consolidate this plan with [[DataSourceV2Relation]]
* once we figure out how to apply operator push-down for streaming data sources.
*/
case class StreamingDataSourceV2Relation(
output: Seq[AttributeReference],
source: DataSourceV2,
reader: DataSourceReader)
extends LeafNode with DataSourceReaderHolder with MultiInstanceRelation {
extends LeafNode with MultiInstanceRelation with DataSourceV2QueryPlan {

override def isStreaming: Boolean = true

override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation]

override def simpleString: String = "Streaming Relation " + metadataString

override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))

override def computeStats(): Statistics = reader match {
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
import org.apache.spark.sql.types.StructType
@@ -36,11 +37,14 @@ import org.apache.spark.sql.types.StructType
*/
case class DataSourceV2ScanExec(
output: Seq[AttributeReference],
@transient source: DataSourceV2,
@transient reader: DataSourceReader)
extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
extends LeafExecNode with DataSourceV2QueryPlan with ColumnarBatchScan {

override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]

override def simpleString: String = "Scan " + metadataString

override def outputPartitioning: physical.Partitioning = reader match {
case s: SupportsReportPartitioning =>
new DataSourcePartitioning(
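
For context on what the renamed nodes are meant to look like in explain output, here is a hedged usage sketch. It assumes a hypothetical DataSourceV2 implementation registered as com.example.MyV2Source (made up for illustration); the exact attribute ids, and whether a "Pushed Filters" entry shows up, depend on the source.

import org.apache.spark.sql.SparkSession

object ExplainDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("v2-explain-demo").getOrCreate()

  // "com.example.MyV2Source" is a made-up v2 source; substitute any real DataSourceV2 class.
  val df = spark.read.format("com.example.MyV2Source").load().filter("id IS NOT NULL")

  df.explain(true)
  // With this change the v2 leaf nodes identify their role and source class, roughly:
  //   logical plan:   Relation MyV2Source[id#0, name#1]
  //   physical plan:  Scan MyV2Source[id#0, name#1] (Pushed Filters: [IsNotNull(id)])
  // Streaming relations render as "Streaming Relation MyV2Source[...]".

  spark.stop()
}
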
@@ -23,11 +23,11 @@ import org.apache.spark.sql.execution.SparkPlan

object DataSourceV2Strategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case relation: DataSourceV2Relation =>
DataSourceV2ScanExec(relation.output, relation.reader) :: Nil
case r: DataSourceV2Relation =>
DataSourceV2ScanExec(r.output, r.source, r.reader) :: Nil

case relation: StreamingDataSourceV2Relation =>
DataSourceV2ScanExec(relation.output, relation.reader) :: Nil
case r: StreamingDataSourceV2Relation =>
DataSourceV2ScanExec(r.output, r.source, r.reader) :: Nil

case WriteToDataSourceV2(writer, query) =>
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
@@ -26,7 +26,7 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
override def apply(
plan: LogicalPlan): LogicalPlan = plan transformUp {
// PhysicalOperation guarantees that filters are deterministic; no need to check
case PhysicalOperation(project, newFilters, relation : DataSourceV2Relation) =>
case PhysicalOperation(project, newFilters, relation: DataSourceV2Relation) =>
// merge the filters
val filters = relation.filters match {
case Some(existing) =>
@@ -20,16 +20,16 @@ package org.apache.spark.sql.execution.streaming
import java.util.Optional

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
import scala.collection.mutable.{Map => MutableMap}

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
@@ -52,6 +52,8 @@ class MicroBatchExecution(

@volatile protected var sources: Seq[BaseStreamingSource] = Seq.empty

private val readerToDataSourceMap = MutableMap.empty[MicroBatchReader, DataSourceV2]

private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
case OneTimeTrigger => OneTimeExecutor()
@@ -77,31 +79,32 @@ class MicroBatchExecution(
sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",")

val _logicalPlan = analyzedPlan.transform {
case streamingRelation@StreamingRelation(dataSourceV1, sourceName, output) =>
toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
case s @ StreamingRelation(dsV1, sourceName, output) =>
toExecutionRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
val source = dataSourceV1.createSource(metadataPath)
val source = dsV1.createSource(metadataPath)
nextSourceId += 1
logInfo(s"Using Source [$source] from DataSourceV1 named '$sourceName' [$dataSourceV1]")
logInfo(s"Using Source [$source] from DataSourceV1 named '$sourceName' [$dsV1]")
StreamingExecutionRelation(source, output)(sparkSession)
})
case s @ StreamingRelationV2(
dataSourceV2: MicroBatchReadSupport, sourceName, options, output, _) if
!disabledSources.contains(dataSourceV2.getClass.getCanonicalName) =>

case s @ StreamingRelationV2(dsV2: MicroBatchReadSupport, sourceName, options, output, _)
if !disabledSources.contains(dsV2.getClass.getCanonicalName) =>
v2ToExecutionRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
val reader = dataSourceV2.createMicroBatchReader(
val reader = dsV2.createMicroBatchReader(
Optional.empty(), // user specified schema
metadataPath,
new DataSourceOptions(options.asJava))
nextSourceId += 1
logInfo(s"Using MicroBatchReader [$reader] from " +
s"DataSourceV2 named '$sourceName' [$dataSourceV2]")
readerToDataSourceMap(reader) = dsV2
logInfo(s"Using MicroBatchReader [$reader] from DataSourceV2 named '$sourceName' [$dsV2]")
StreamingExecutionRelation(reader, output)(sparkSession)
})
case s @ StreamingRelationV2(dataSourceV2, sourceName, _, output, v1Relation) =>

case s @ StreamingRelationV2(dsV2, sourceName, _, output, v1Relation) =>
v2ToExecutionRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
@@ -111,7 +114,7 @@ class MicroBatchExecution(
}
val source = v1Relation.get.dataSource.createSource(metadataPath)
nextSourceId += 1
logInfo(s"Using Source [$source] from DataSourceV2 named '$sourceName' [$dataSourceV2]")
logInfo(s"Using Source [$source] from DataSourceV2 named '$sourceName' [$dsV2]")
StreamingExecutionRelation(source, output)(sparkSession)
})
}
@@ -415,12 +418,14 @@ class MicroBatchExecution(
case v1: SerializedOffset => reader.deserializeOffset(v1.json)
case v2: OffsetV2 => v2
}
reader.setOffsetRange(
toJava(current),
Optional.of(availableV2))
reader.setOffsetRange(toJava(current), Optional.of(availableV2))
logDebug(s"Retrieving data from $reader: $current -> $availableV2")
Some(reader ->
new StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader))
Some(reader -> StreamingDataSourceV2Relation(
output = reader.readSchema().toAttributes,
// Provide a fake value here just in case something goes wrong, e.g. the reader has a wrong
// `equals` implementation.
source = readerToDataSourceMap.getOrElse(reader, FakeDataSourceV2),
reader = reader))
case _ => None
}
}
@@ -508,3 +513,5 @@ class MicroBatchExecution(
Optional.ofNullable(scalaOption.orNull)
}
}

object FakeDataSourceV2 extends DataSourceV2
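
To illustrate the failure mode the FakeDataSourceV2 fallback guards against, here is a small standalone sketch in plain Scala (all names made up): readerToDataSourceMap is keyed by the reader instance, so a reader with a broken equals/hashCode can make the lookup miss even though it was registered.

import scala.collection.mutable

// Pathological map key that deliberately violates the equals/hashCode contract.
class BrokenEqualsReader {
  override def equals(other: Any): Boolean = false  // never equal, not even to itself
  override def hashCode(): Int = 42
}

object FallbackDemo extends App {
  val readerToSource = mutable.Map.empty[BrokenEqualsReader, String]
  val reader = new BrokenEqualsReader
  readerToSource(reader) = "RealSource"

  // The lookup misses because equals always returns false, so the placeholder is returned,
  // mirroring readerToDataSourceMap.getOrElse(reader, FakeDataSourceV2) above.
  println(readerToSource.getOrElse(reader, "FakeSource"))  // prints "FakeSource"
}
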
