Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.v2.avro

import java.util

import org.apache.spark.sql.avro.AvroFileFormat
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources.FileFormat
Expand All @@ -29,15 +31,15 @@ class AvroDataSourceV2 extends FileDataSourceV2 {

override def shortName(): String = "avro"

override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
override def getTable(properties: util.Map[String, String]): Table = {
val paths = getPaths(properties)
val tableName = getTableName(paths)
AvroTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
AvroTable(tableName, sparkSession, properties, paths, None, fallbackFileFormat)
}

override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val paths = getPaths(options)
override def getTable(schema: StructType, properties: util.Map[String, String]): Table = {
val paths = getPaths(properties)
val tableName = getTableName(paths)
AvroTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
AvroTable(tableName, sparkSession, properties, paths, Some(schema), fallbackFileFormat)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
case class AvroTable(
name: String,
sparkSession: SparkSession,
options: CaseInsensitiveStringMap,
options: java.util.Map[String, String],
paths: Seq[String],
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.kafka010.KafkaConfigUpdater
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder}
Expand Down Expand Up @@ -108,8 +109,21 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
failOnDataLoss(caseInsensitiveParameters))
}

override def getTable(options: CaseInsensitiveStringMap): KafkaTable = {
val includeHeaders = options.getBoolean(INCLUDE_HEADERS, false)
override def getTable(schema: StructType, properties: ju.Map[String, String]): Table = {
throw new UnsupportedOperationException(
"Kafka source does not support user-specified schema/partitioning.")
}

override def getTable(
schema: StructType,
partitioning: Array[Transform],
properties: ju.Map[String, String]): Table = {
throw new UnsupportedOperationException(
"Kafka source does not support user-specified schema/partitioning.")
}

override def getTable(properties: ju.Map[String, String]): KafkaTable = {
val includeHeaders = Option(properties.get(INCLUDE_HEADERS)).map(_.toBoolean).getOrElse(false)
new KafkaTable(includeHeaders)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.spark.sql.connector.catalog;

import java.util.Map;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

Expand All @@ -36,26 +39,44 @@
public interface TableProvider {

/**
* Return a {@link Table} instance to do read/write with user-specified options.
* Return a {@link Table} instance with the user-specified table properties to do read/write.
* Implementations should infer the table schema and partitioning.
*
* @param properties The user-specified table properties that can identify a table, e.g. file
* path, Kafka topic name, etc. The properties map may be
* {@link CaseInsensitiveStringMap}.
*/
Table getTable(Map<String, String> properties);

/**
* Return a {@link Table} instance with the user-specified table schema and properties to do
* read/write. Implementations should infer the table partitioning.
*
* @param schema The user-specified table schema.
* @param properties The user-specified table properties that can identify a table, e.g. file
* path, Kafka topic name, etc. The properties map may be
* {@link CaseInsensitiveStringMap}.
*
* @param options the user-specified options that can identify a table, e.g. file path, Kafka
* topic name, etc. It's an immutable case-insensitive string-to-string map.
* @throws IllegalArgumentException if the user-specified schema does not match the actual table
* schema.
*/
Table getTable(CaseInsensitiveStringMap options);
Table getTable(StructType schema, Map<String, String> properties);

/**
* Return a {@link Table} instance to do read/write with user-specified schema and options.
* <p>
* By default this method throws {@link UnsupportedOperationException}, implementations should
* override this method to handle user-specified schema.
* </p>
* @param options the user-specified options that can identify a table, e.g. file path, Kafka
* topic name, etc. It's an immutable case-insensitive string-to-string map.
* @param schema the user-specified schema.
* @throws UnsupportedOperationException
* Return a {@link Table} instance with the user-specified table schema, partitioning and
* properties to do read/write. The returned table must report the same schema and partitioning
* as the user-specified ones, or Spark will fail the operation.
*
* @param schema The user-specified table schema.
* @param partitioning The user-specified table partitioning.
* @param properties The properties of the table to load. It should be sufficient to define and
* access a table. The properties map may be {@link CaseInsensitiveStringMap}.
*
* @throws IllegalArgumentException if the user-specified schema/partitioning does not match the
* actual table schema/partitioning.
*/
default Table getTable(CaseInsensitiveStringMap options, StructType schema) {
throw new UnsupportedOperationException(
this.getClass().getSimpleName() + " source does not support user-specified schema");
}
Table getTable(
StructType schema,
Transform[] partitioning,
Map<String, String> properties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@ import org.apache.spark.sql.types.StructType
/**
* An implementation of catalog v2 `Table` to expose v1 table metadata.
*/
private[sql] case class V1Table(v1Table: CatalogTable) extends Table {
private[sql] case class V1Table(catalogTable: CatalogTable) extends Table {
assert(catalogTable.provider.isDefined)

implicit class IdentifierHelper(identifier: TableIdentifier) {
def quoted: String = {
identifier.database match {
case Some(db) =>
Seq(db, identifier.table).map(quote).mkString(".")
case _ =>
quote(identifier.table)

}
}

Expand All @@ -51,38 +52,36 @@ private[sql] case class V1Table(v1Table: CatalogTable) extends Table {
}
}

def catalogTable: CatalogTable = v1Table

lazy val options: Map[String, String] = {
v1Table.storage.locationUri match {
catalogTable.storage.locationUri match {
case Some(uri) =>
v1Table.storage.properties + ("path" -> uri.toString)
catalogTable.storage.properties + ("path" -> uri.toString)
case _ =>
v1Table.storage.properties
catalogTable.storage.properties
}
}

override lazy val properties: util.Map[String, String] = v1Table.properties.asJava
override lazy val properties: util.Map[String, String] = catalogTable.properties.asJava
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I leave the options/properties unchanged here, but we need to figure it out later. Currently there are 2 directions:

  1. We have table options and table properties. Table options are special table properties with "option." prefix in its name. Table options will be extracted and used as scan/write options.
  2. We only have table properties. The OPTIONS clause in CREATE TABLE should be the same as the TBLPROPERTIES clause.

We can have more discussion about it later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

V2 has only table properties, so I think the question is whether we want to mix options into those table properties directly, or whether we want to prefix them so they can be recovered. I'm in favor of being able to recover them so we can construct the catalog table as it would be in the v1 path.


override lazy val schema: StructType = v1Table.schema
override lazy val schema: StructType = catalogTable.schema

override lazy val partitioning: Array[Transform] = {
val partitions = new mutable.ArrayBuffer[Transform]()

v1Table.partitionColumnNames.foreach { col =>
catalogTable.partitionColumnNames.foreach { col =>
partitions += LogicalExpressions.identity(col)
}

v1Table.bucketSpec.foreach { spec =>
catalogTable.bucketSpec.foreach { spec =>
partitions += LogicalExpressions.bucket(spec.numBuckets, spec.bucketColumnNames: _*)
}

partitions.toArray
}

override def name: String = v1Table.identifier.quoted
override def name: String = catalogTable.identifier.quoted

override def capabilities: util.Set[TableCapability] = new util.HashSet[TableCapability]()

override def toString: String = s"UnresolvedTable($name)"
override def toString: String = s"V1Table($name)"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Good catch, this would have been confusing.

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class InMemoryTable(
this
}

def clear(): Unit = dataMap.synchronized(dataMap.clear())

override def capabilities: util.Set[TableCapability] = Set(
TableCapability.BATCH_READ,
TableCapability.BATCH_WRITE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
val finalOptions = sessionOptions ++ extraOptions.toMap ++ pathsOption
val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
val table = userSpecifiedSchema match {
case Some(schema) => provider.getTable(dsOptions, schema)
case Some(schema) => provider.getTable(schema, dsOptions)
case _ => provider.getTable(dsOptions)
}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.sources.DataSourceRegister
Expand All @@ -35,7 +36,20 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
*/
class NoopDataSource extends TableProvider with DataSourceRegister {
override def shortName(): String = "noop"
override def getTable(options: CaseInsensitiveStringMap): Table = NoopTable
override def getTable(properties: util.Map[String, String]): Table = NoopTable

override def getTable(schema: StructType, properties: util.Map[String, String]): Table = {
throw new UnsupportedOperationException(
"Cannot read noop source with user-specified schema/partitioning.")
}

override def getTable(
schema: StructType,
partitioning: Array[Transform],
properties: util.Map[String, String]): Table = {
throw new UnsupportedOperationException(
"Cannot read noop source with user-specified schema/partitioning.")
}
}

private[noop] object NoopTable extends Table with SupportsWrite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,24 @@
*/
package org.apache.spark.sql.execution.datasources.v2

import java.util

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.TableProvider
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

/**
* A base interface for data source v2 implementations of the built-in file-based data sources.
*/
trait FileDataSourceV2 extends TableProvider with DataSourceRegister {

/**
* Returns a V1 [[FileFormat]] class of the same file data source.
* This is a solution for the following cases:
Expand All @@ -41,7 +45,7 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {

lazy val sparkSession = SparkSession.active

protected def getPaths(map: CaseInsensitiveStringMap): Seq[String] = {
protected def getPaths(map: java.util.Map[String, String]): Seq[String] = {
val objectMapper = new ObjectMapper()
val paths = Option(map.get("paths")).map { pathStr =>
objectMapper.readValue(pathStr, classOf[Array[String]]).toSeq
Expand All @@ -59,4 +63,12 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
}

override def getTable(
schema: StructType,
partitioning: Array[Transform],
properties: util.Map[String, String]): Table = {
throw new UnsupportedOperationException(
"file source v2 does not support user-specified partitioning yet.")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,20 @@ import org.apache.spark.sql.util.SchemaUtils

abstract class FileTable(
sparkSession: SparkSession,
options: CaseInsensitiveStringMap,
options: java.util.Map[String, String],
paths: Seq[String],
userSpecifiedSchema: Option[StructType])
extends Table with SupportsRead with SupportsWrite {

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

private def caseSensitiveOptions = options match {
case m: CaseInsensitiveStringMap => m.asCaseSensitiveMap()
case other => other
}

lazy val fileIndex: PartitioningAwareFileIndex = {
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
val caseSensitiveMap = caseSensitiveOptions.asScala.toMap
// Hadoop Configurations are case sensitive.
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) {
Expand Down Expand Up @@ -104,7 +109,7 @@ abstract class FileTable(

override def partitioning: Array[Transform] = fileIndex.partitionSchema.asTransforms

override def properties: util.Map[String, String] = options.asCaseSensitiveMap
override def properties: util.Map[String, String] = caseSensitiveOptions

override def capabilities: java.util.Set[TableCapability] = FileTable.CAPABILITIES

Expand Down
Loading