Reverted changes to DataSourceV2*
tdas committed Feb 1, 2018
1 parent 5adf1fe commit 478ad17
Showing 3 changed files with 22 additions and 10 deletions.
DataSourceReaderHolder.scala

@@ -28,9 +28,9 @@ import org.apache.spark.sql.sources.v2.reader._
 trait DataSourceReaderHolder {

   /**
-   * The output of the data source reader, without column pruning.
+   * The full output of the data source reader, without column pruning.
    */
-  def output: Seq[Attribute]
+  def fullOutput: Seq[AttributeReference]

   /**
    * The held data source reader.
@@ -46,7 +46,7 @@ trait DataSourceReaderHolder {
       case s: SupportsPushDownFilters => s.pushedFilters().toSet
       case _ => Nil
     }
-    Seq(output, reader.getClass, reader.readSchema(), filters)
+    Seq(fullOutput, reader.getClass, reader.readSchema(), filters)
   }

   def canEqual(other: Any): Boolean
@@ -61,4 +61,8 @@ trait DataSourceReaderHolder {
   override def hashCode(): Int = {
     metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b)
   }
+
+  lazy val output: Seq[Attribute] = reader.readSchema().map(_.name).map { name =>
+    fullOutput.find(_.name == name).get
+  }
 }
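The restored `fullOutput` keeps the relation's complete attribute list, while `output` is re-derived lazily by name from the reader's (possibly column-pruned) `readSchema()`, so pruned columns keep the attribute identities from `fullOutput`. A minimal, self-contained sketch of that lookup, using simplified stand-in types rather than Spark's catalyst classes:

object PrunedOutputSketch {
  // Stand-ins for catalyst's AttributeReference and sql.types.StructType.
  case class Attribute(name: String, exprId: Int)
  case class StructField(name: String)
  case class StructType(fields: Seq[StructField])

  // fullOutput: every column the relation can produce, with stable exprIds.
  val fullOutput = Seq(Attribute("id", 1), Attribute("name", 2), Attribute("age", 3))

  // After column pruning, the reader reports only the columns it will emit.
  val readSchema = StructType(Seq(StructField("id"), StructField("age")))

  // Same derivation as the patch: restrict and reorder fullOutput to the
  // reader's pruned schema, preserving exprIds.
  lazy val output: Seq[Attribute] = readSchema.fields.map(_.name).map { name =>
    fullOutput.find(_.name == name).get
  }

  def main(args: Array[String]): Unit =
    println(output) // List(Attribute(id,1), Attribute(age,3))
}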
DataSourceV2Relation.scala

@@ -17,13 +17,15 @@

 package org.apache.spark.sql.execution.datasources.v2

-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.sources.v2.reader._

 case class DataSourceV2Relation(
-    output: Seq[Attribute],
-    reader: DataSourceReader) extends LeafNode with DataSourceReaderHolder {
+    fullOutput: Seq[AttributeReference],
+    reader: DataSourceReader)
+  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {

   override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]
@@ -33,15 +35,19 @@ case class DataSourceV2Relation(
     case _ =>
       Statistics(sizeInBytes = conf.defaultSizeInBytes)
   }
+
+  override def newInstance(): DataSourceV2Relation = {
+    copy(fullOutput = fullOutput.map(_.newInstance()))
+  }
 }

 /**
  * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
  * to the non-streaming relation.
  */
 class StreamingDataSourceV2Relation(
-    output: Seq[Attribute],
-    reader: DataSourceReader) extends DataSourceV2Relation(output, reader) {
+    fullOutput: Seq[AttributeReference],
+    reader: DataSourceReader) extends DataSourceV2Relation(fullOutput, reader) {
   override def isStreaming: Boolean = true
 }
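Mixing in `MultiInstanceRelation` lets the analyzer call `newInstance()` when the same relation appears more than once in a plan (e.g., a self-join), so each occurrence carries attributes with fresh expression ids. A sketch of that behavior with simplified stand-in types, not Spark's actual machinery:

object NewInstanceSketch {
  private var nextId = 0
  private def freshId(): Int = { nextId += 1; nextId }

  // Stand-in for catalyst's AttributeReference; newInstance() keeps the
  // name but mints a fresh exprId.
  case class AttributeReference(name: String, exprId: Int) {
    def newInstance(): AttributeReference = copy(exprId = freshId())
  }

  case class Relation(fullOutput: Seq[AttributeReference]) {
    // Mirrors the patch: same schema, fresh attribute ids per instance.
    def newInstance(): Relation = copy(fullOutput = fullOutput.map(_.newInstance()))
  }

  def main(args: Array[String]): Unit = {
    val r = Relation(Seq(AttributeReference("id", freshId())))
    val left = r
    val right = r.newInstance() // self-join side gets distinct exprIds
    println(left.fullOutput == right.fullOutput) // false: ids differ
  }
}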

DataSourceV2ScanExec.scala

@@ -28,19 +28,21 @@ import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
 import org.apache.spark.sql.types.StructType

 /**
  * Physical plan node for scanning data from a data source.
  */
 case class DataSourceV2ScanExec(
-    override val output: Seq[Attribute],
+    fullOutput: Seq[AttributeReference],
     @transient reader: DataSourceReader)
   extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {

   override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]

+  override def producedAttributes: AttributeSet = AttributeSet(fullOutput)
+
   override def outputPartitioning: physical.Partitioning = reader match {
     case s: SupportsReportPartitioning =>
       new DataSourcePartitioning(
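Overriding `producedAttributes` declares which attributes originate at this node rather than arriving from children; since a scan is a leaf, reporting the full `fullOutput` set keeps attributes that were pruned out of `output` from being treated as missing inputs. A simplified stand-in sketch of that bookkeeping (not Spark's real AttributeSet):

object ProducedAttributesSketch {
  case class Attribute(name: String, exprId: Int)
  type AttributeSet = Set[Attribute]
  def AttributeSet(attrs: Seq[Attribute]): AttributeSet = attrs.toSet

  val fullOutput = Seq(Attribute("id", 1), Attribute("name", 2))
  val producedAttributes: AttributeSet = AttributeSet(fullOutput)

  // A leaf node's missing inputs are the attributes it references but
  // neither receives from children (none, for a leaf) nor produces itself.
  def missingInput(references: AttributeSet): AttributeSet =
    references -- producedAttributes

  def main(args: Array[String]): Unit =
    println(missingInput(Set(Attribute("name", 2)))) // Set(): nothing missing
}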
