[SPARK-31255][SQL] Add SupportsMetadataColumns to DSv2
### What changes were proposed in this pull request?

This adds support for metadata columns to DataSourceV2. If a source implements `SupportsMetadataColumns`, it must also implement `SupportsPushDownRequiredColumns` to support projecting those columns.

The analyzer is updated to resolve metadata columns from `LogicalPlan.metadataOutput`, and a new rule adds metadata columns to the output of `DataSourceV2Relation` when any of them are referenced.

### Why are the changes needed?

This is the solution discussed for exposing additional data in the Kafka source. It is also needed for a generic `MERGE INTO` plan.

### Does this PR introduce any user-facing change?

Yes. Users can project additional columns from sources that implement the new API. This also updates `DescribeTableExec` to show metadata columns.
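
For illustration only (the catalog and table names below are hypothetical; the `index` and `_partition` columns come from the in-memory test source changed in this commit), projecting a metadata column might look like:

```scala
// Hypothetical session: "testcat" is assumed to be configured as a catalog backed
// by the in-memory test source; the table name is a placeholder.
spark.sql("SELECT id, data, index, _partition FROM testcat.ns.t").show()

// Metadata columns are only added to the relation's output when they are
// referenced explicitly; `SELECT *` does not expand to include them.
spark.sql("SELECT * FROM testcat.ns.t").show()
```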

### How was this patch tested?

Will include new unit tests.

Closes #28027 from rdblue/add-dsv2-metadata-columns.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Burak Yavuz <brkyvz@gmail.com>
rdblue authored and brkyvz committed Nov 18, 2020
1 parent 27cd945 commit 1df69f7
Showing 11 changed files with 296 additions and 22 deletions.
@@ -0,0 +1,58 @@
package org.apache.spark.sql.connector.catalog;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.DataType;

/**
* Interface for a metadata column.
* <p>
* A metadata column can expose additional metadata about a row. For example, rows from Kafka can
* use metadata columns to expose a message's topic, partition number, and offset.
* <p>
* A metadata column could also be the result of a transform applied to a value in the row. For
* example, a partition value produced by bucket(id, 16) could be exposed by a metadata column. In
* this case, {@link #transform()} should return a non-null {@link Transform} that produced the
* metadata column's values.
*/
@Evolving
public interface MetadataColumn {
/**
* The name of this metadata column.
*
* @return a String name
*/
String name();

/**
* The data type of values in this metadata column.
*
* @return a {@link DataType}
*/
DataType dataType();

/**
* @return whether values produced by this metadata column may be null
*/
default boolean isNullable() {
return true;
}

/**
* Documentation for this metadata column, or null.
*
* @return a documentation String
*/
default String comment() {
return null;
}

/**
* The {@link Transform} used to produce this metadata column from data rows, or null.
*
* @return a {@link Transform} used to produce the column's values, or null if there isn't one
*/
default Transform transform() {
return null;
}
}
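
As a sketch of the connector side (not part of this diff; the column name and semantics are illustrative only), a source could implement `MetadataColumn` in Scala like this:

```scala
// Illustrative sketch only: a metadata column exposing the source file path for
// each row, in the spirit of the file-source example in the javadoc above.
import org.apache.spark.sql.connector.catalog.MetadataColumn
import org.apache.spark.sql.types.{DataType, StringType}

object FilePathColumn extends MetadataColumn {
  override def name: String = "_file"
  override def dataType: DataType = StringType
  override def isNullable: Boolean = false
  override def comment: String = "Path of the file this row was read from"
  // transform() keeps its default of null: this column is not derived from a
  // transform over the data columns.
}
```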
@@ -0,0 +1,37 @@
package org.apache.spark.sql.connector.catalog;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
* An interface for exposing data columns for a table that are not in the table schema. For example,
* a file source could expose a "file" column that contains the path of the file that contained each
* row.
* <p>
* The columns returned by {@link #metadataColumns()} may be passed as {@link StructField} in
* requested projections. Sources that implement this interface and column projection using
* {@link SupportsPushDownRequiredColumns} must accept metadata fields passed to
* {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}.
* <p>
* If a table column and a metadata column have the same name, the metadata column will never be
* requested. It is recommended that Table implementations reject data column names that conflict
* with metadata column names.
*/
@Evolving
public interface SupportsMetadataColumns extends Table {
/**
* Metadata columns that are supported by this {@link Table}.
* <p>
* The columns returned by this method may be passed as {@link StructField} in requested
* projections using {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}.
* <p>
* If a table column and a metadata column have the same name, the metadata column will never be
* requested and is ignored. It is recommended that Table implementations reject data column names
* that conflict with metadata column names.
*
* @return an array of {@link MetadataColumn}
*/
MetadataColumn[] metadataColumns();
}
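
A minimal sketch (not part of this diff; the helper name is hypothetical) of the validation a connector might run to follow the recommendation above and reject data column names that collide with metadata column names:

```scala
// Illustrative only: reject data columns that shadow metadata columns, since a
// shadowed metadata column could never be requested. A real implementation might
// also apply the session's resolver to honor case sensitivity settings.
import org.apache.spark.sql.connector.catalog.MetadataColumn
import org.apache.spark.sql.types.StructType

object MetadataColumnValidation {
  def validateNoMetadataConflicts(
      dataSchema: StructType,
      metadataColumns: Array[MetadataColumn]): Unit = {
    val conflicts = dataSchema.fieldNames.toSet.intersect(metadataColumns.map(_.name).toSet)
    require(conflicts.isEmpty,
      s"Data columns conflict with metadata columns: ${conflicts.mkString(", ")}")
  }
}
```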
@@ -221,6 +221,7 @@ class Analyzer(override val catalogManager: CatalogManager)
ResolveRelations ::
ResolveTables ::
ResolvePartitionSpec ::
AddMetadataColumns ::
ResolveReferences ::
ResolveCreateNamedStruct ::
ResolveDeserializer ::
@@ -916,6 +917,29 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}

/**
* Adds metadata columns to output for child relations when nodes are missing resolved attributes.
*
* References to metadata columns are resolved using columns from [[LogicalPlan.metadataOutput]],
* but the relation's output does not include the metadata columns until the relation is replaced
* using [[DataSourceV2Relation.withMetadataColumns()]]. Unless this rule adds metadata to the
* relation's output, the analyzer will detect that nothing produces the columns.
*
* This rule only adds metadata columns when a node is resolved but is missing input from its
* children. This ensures that metadata columns are not added to the plan unless they are used. By
* checking only resolved nodes, this ensures that * expansion is already done so that metadata
* columns are not accidentally selected by *.
*/
object AddMetadataColumns extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
case node if node.resolved && node.children.nonEmpty && node.missingInput.nonEmpty =>
node resolveOperatorsUp {
case rel: DataSourceV2Relation =>
rel.withMetadataColumns()
}
}
}

/**
* Resolve table relations with concrete relations from v2 catalog.
*
@@ -33,6 +33,9 @@ abstract class LogicalPlan
with QueryPlanConstraints
with Logging {

/** Metadata fields that can be projected from this node */
def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)

/** Returns true if this subtree has data from a streaming data source. */
def isStreaming: Boolean = children.exists(_.isStreaming)

@@ -86,7 +89,8 @@ abstract class LogicalPlan
}
}

private[this] lazy val childAttributes = AttributeSeq(children.flatMap(_.output))
private[this] lazy val childAttributes =
AttributeSeq(children.flatMap(c => c.output ++ c.metadataOutput))

private[this] lazy val outputAttributes = AttributeSeq(output)

@@ -886,6 +886,12 @@ case class SubqueryAlias(
val qualifierList = identifier.qualifier :+ alias
child.output.map(_.withQualifier(qualifierList))
}

override def metadataOutput: Seq[Attribute] = {
val qualifierList = identifier.qualifier :+ alias
child.metadataOutput.map(_.withQualifier(qualifierList))
}

override def doCanonicalize(): LogicalPlan = child.canonicalized
}

@@ -21,7 +21,9 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec}
import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object DataSourceV2Implicits {
@@ -78,6 +80,18 @@ object DataSourceV2Implicits {
def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports)
}

implicit class MetadataColumnsHelper(metadata: Array[MetadataColumn]) {
def asStruct: StructType = {
val fields = metadata.map { metaCol =>
val field = StructField(metaCol.name, metaCol.dataType, metaCol.isNullable)
Option(metaCol.comment).map(field.withComment).getOrElse(field)
}
StructType(fields)
}

def toAttributes: Seq[AttributeReference] = asStruct.toAttributes
}

implicit class OptionsHelper(options: Map[String, String]) {
def asOptions: CaseInsensitiveStringMap = {
new CaseInsensitiveStringMap(options.asJava)
@@ -21,10 +21,11 @@ import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, MetadataColumn, SupportsMetadataColumns, Table, TableCapability}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, Statistics => V2Statistics, SupportsReportStatistics}
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
import org.apache.spark.sql.connector.write.WriteBuilder
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

@@ -48,6 +49,21 @@ case class DataSourceV2Relation(

import DataSourceV2Implicits._

override lazy val metadataOutput: Seq[AttributeReference] = table match {
case hasMeta: SupportsMetadataColumns =>
val resolve = SQLConf.get.resolver
val outputNames = outputSet.map(_.name)
def isOutputColumn(col: MetadataColumn): Boolean = {
outputNames.exists(name => resolve(col.name, name))
}
// filter out metadata columns that have names conflicting with output columns. if the table
// has a column "line" and the table can produce a metadata column called "line", then the
// data column should be returned, not the metadata column.
hasMeta.metadataColumns.filterNot(isOutputColumn).toAttributes
case _ =>
Nil
}

override def name: String = table.name()

override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA)
@@ -78,6 +94,14 @@ case class DataSourceV2Relation(
override def newInstance(): DataSourceV2Relation = {
copy(output = output.map(_.newInstance()))
}

def withMetadataColumns(): DataSourceV2Relation = {
if (metadataOutput.nonEmpty) {
DataSourceV2Relation(table, output ++ metadataOutput, catalog, identifier, options)
} else {
this
}
}
}

/**
@@ -27,15 +27,17 @@ import scala.collection.mutable
import org.scalatest.Assertions._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, HoursTransform, IdentityTransform, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull}
import org.apache.spark.sql.types.{DataType, DateType, StructType, TimestampType}
import org.apache.spark.sql.types.{DataType, DateType, IntegerType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String

/**
* A simple in-memory table. Rows are stored as a buffered group produced by each output task.
@@ -45,7 +47,24 @@ class InMemoryTable(
val schema: StructType,
override val partitioning: Array[Transform],
override val properties: util.Map[String, String])
extends Table with SupportsRead with SupportsWrite with SupportsDelete {
extends Table with SupportsRead with SupportsWrite with SupportsDelete
with SupportsMetadataColumns {

private object PartitionKeyColumn extends MetadataColumn {
override def name: String = "_partition"
override def dataType: DataType = StringType
override def comment: String = "Partition key used to store the row"
}

private object IndexColumn extends MetadataColumn {
override def name: String = "index"
override def dataType: DataType = IntegerType
override def comment: String = "Metadata column used to conflict with a data column"
}

// purposely exposes a metadata column that conflicts with a data column in some tests
override val metadataColumns: Array[MetadataColumn] = Array(IndexColumn, PartitionKeyColumn)
private val metadataColumnNames = metadataColumns.map(_.name).toSet -- schema.map(_.name)

private val allowUnsupportedTransforms =
properties.getOrDefault("allow-unsupported-transforms", "false").toBoolean
@@ -146,7 +165,7 @@ class InMemoryTable(
val key = getKey(row)
dataMap += dataMap.get(key)
.map(key -> _.withRow(row))
.getOrElse(key -> new BufferedRows().withRow(row))
.getOrElse(key -> new BufferedRows(key.toArray.mkString("/")).withRow(row))
})
this
}
@@ -160,17 +179,38 @@
TableCapability.TRUNCATE).asJava

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
() => new InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]))
new InMemoryScanBuilder(schema)
}

class InMemoryScanBuilder(tableSchema: StructType) extends ScanBuilder
with SupportsPushDownRequiredColumns {
private var schema: StructType = tableSchema

override def build: Scan =
new InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema)

override def pruneColumns(requiredSchema: StructType): Unit = {
// if metadata columns are projected, return the table schema and metadata columns
val hasMetadataColumns = requiredSchema.map(_.name).exists(metadataColumnNames.contains)
if (hasMetadataColumns) {
schema = StructType(tableSchema ++ metadataColumnNames
.flatMap(name => metadataColumns.find(_.name == name))
.map(col => StructField(col.name, col.dataType, col.isNullable)))
}
}
}

class InMemoryBatchScan(data: Array[InputPartition]) extends Scan with Batch {
class InMemoryBatchScan(data: Array[InputPartition], schema: StructType) extends Scan with Batch {
override def readSchema(): StructType = schema

override def toBatch: Batch = this

override def planInputPartitions(): Array[InputPartition] = data

override def createReaderFactory(): PartitionReaderFactory = BufferedRowsReaderFactory
override def createReaderFactory(): PartitionReaderFactory = {
val metadataColumns = schema.map(_.name).filter(metadataColumnNames.contains)
new BufferedRowsReaderFactory(metadataColumns)
}
}

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
@@ -340,7 +380,8 @@ object InMemoryTable {
}
}

class BufferedRows extends WriterCommitMessage with InputPartition with Serializable {
class BufferedRows(
val key: String = "") extends WriterCommitMessage with InputPartition with Serializable {
val rows = new mutable.ArrayBuffer[InternalRow]()

def withRow(row: InternalRow): BufferedRows = {
@@ -349,21 +390,32 @@ class BufferedRows extends WriterCommitMessage with InputPartition with Serializ
}
}

private object BufferedRowsReaderFactory extends PartitionReaderFactory {
private class BufferedRowsReaderFactory(
metadataColumns: Seq[String]) extends PartitionReaderFactory {
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
new BufferedRowsReader(partition.asInstanceOf[BufferedRows])
new BufferedRowsReader(partition.asInstanceOf[BufferedRows], metadataColumns)
}
}

private class BufferedRowsReader(partition: BufferedRows) extends PartitionReader[InternalRow] {
private class BufferedRowsReader(
partition: BufferedRows,
metadataColumns: Seq[String]) extends PartitionReader[InternalRow] {
private def addMetadata(row: InternalRow): InternalRow = {
val metadataRow = new GenericInternalRow(metadataColumns.map {
case "index" => index
case "_partition" => UTF8String.fromString(partition.key)
}.toArray)
new JoinedRow(row, metadataRow)
}

private var index: Int = -1

override def next(): Boolean = {
index += 1
index < partition.rows.length
}

override def get(): InternalRow = partition.rows(index)
override def get(): InternalRow = addMetadata(partition.rows(index))

override def close(): Unit = {}
}