Commit

add docs
brkyvz committed Aug 12, 2019
1 parent 428e82a commit d9f478d
Showing 2 changed files with 22 additions and 19 deletions.
6 changes: 6 additions & 0 deletions UnresolvedTable.java
@@ -19,4 +19,10 @@

import org.apache.spark.sql.sources.v2.Table;

+/**
+ * Internal interface used for table definitions for which we do not yet have complete
+ * information to resolve. This is primarily used by the `CatalogTableAsV2` wrapper returned by
+ * the V2SessionCatalog; when a `CatalogTableAsV2` is returned, we defer planning to the V1 data
+ * source code paths.
+ */
public interface UnresolvedTable extends Table {}
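The interface body is intentionally empty: it is a marker that analysis and planning code can pattern-match on. As a rough illustration only (not the actual Spark implementation), a wrapper in the spirit of `CatalogTableAsV2` could look like the sketch below; the class name, constructor field, and the exact set of `Table` methods overridden are assumptions for this example, and the class is assumed to sit in the same package as `UnresolvedTable`.

import java.util
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.sources.v2.TableCapability
import org.apache.spark.sql.types.StructType

// Illustrative wrapper: exposes a V1 CatalogTable through the V2 Table API while
// signalling, by extending UnresolvedTable, that planning should defer to the V1 paths.
case class ExampleCatalogTableAsV2(v1Table: CatalogTable) extends UnresolvedTable {
  override def name(): String = v1Table.identifier.unquotedString
  override def schema(): StructType = v1Table.schema
  override def partitioning(): Array[Transform] = Array.empty[Transform]
  override def properties(): util.Map[String, String] = v1Table.properties.asJava
  override def capabilities(): util.Set[TableCapability] = Collections.emptySet[TableCapability]()
}

A resolution rule can then match on `UnresolvedTable` and route such relations through the existing V1 planning instead of the DSv2 write plans.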
35 changes: 16 additions & 19 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,10 +25,10 @@ import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableCatalog}
import org.apache.spark.sql.catalog.v2.expressions._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
@@ -252,19 +252,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
assertNotBucketed("save")

val session = df.sparkSession
val useV1Sources =
session.sessionState.conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",")
val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
val shouldUseV1Source = cls.newInstance() match {
case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => true
case _ => useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT))
}
val canUseV2 = canUseV2Source(session, cls) && partitioningColumns.isEmpty

// In Data Source V2 project, partitioning is still under development.
// Here we fallback to V1 if partitioning columns are specified.
// TODO(SPARK-26778): use V2 implementations when partitioning feature is supported.
if (!shouldUseV1Source && classOf[TableProvider].isAssignableFrom(cls) &&
partitioningColumns.isEmpty) {
if (canUseV2) {
val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
provider, session.sessionState.conf)
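In user-facing terms, the fallback described in the TODO above means a partitioned save() always takes the V1 path, even for a TableProvider source. A hedged usage sketch follows; `df` is assumed to be an existing DataFrame, the format, paths, and partition column are placeholders, and whether a given built-in format actually goes through V2 also depends on the V1 fallback list.

// Unpartitioned write: eligible for the V2 path when the source implements
// TableProvider and is not on the write-side V1 fallback list.
df.write.format("parquet").mode("append").save("/tmp/example_unpartitioned")

// partitionBy() currently forces the V1 path (SPARK-26778).
df.write.format("parquet").partitionBy("dt").mode("append").save("/tmp/example_partitioned")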
@@ -495,15 +489,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._

    val session = df.sparkSession
-    val useV1Sources =
-      session.sessionState.conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",")
-    val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
-    val shouldUseV1Source = cls.newInstance() match {
-      case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => true
-      case _ => useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT))
-    }
-
-    val canUseV2 = !shouldUseV1Source && classOf[TableProvider].isAssignableFrom(cls)
+    val provider = DataSource.lookupDataSource(source, session.sessionState.conf)
+    val canUseV2 = canUseV2Source(session, provider)
    val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog

    session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
@@ -848,6 +835,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

  private def modeForDSV2 = mode.getOrElse(SaveMode.Append)

+  private def canUseV2Source(session: SparkSession, providerClass: Class[_]): Boolean = {
+    val useV1Sources =
+      session.sessionState.conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",")
+    val shouldUseV1Source = providerClass.newInstance() match {
+      case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => true
+      case _ => useV1Sources.contains(providerClass.getCanonicalName.toLowerCase(Locale.ROOT))
+    }
+    !shouldUseV1Source && classOf[TableProvider].isAssignableFrom(providerClass)
+  }
+
  ///////////////////////////////////////////////////////////////////////////////////////
  // Builder pattern config options
  ///////////////////////////////////////////////////////////////////////////////////////
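Read as a predicate, the new helper says: a provider is eligible for V2 only if it implements TableProvider and is not named on the writer's V1 fallback list, matched either by its DataSourceRegister short name or by its lower-cased fully qualified class name. A minimal self-contained restatement is sketched below; the method name and the Seq parameter are assumptions made so the logic can be exercised without a SparkSession.

import java.util.Locale

import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.TableProvider

// Stand-alone restatement of the canUseV2Source gate. `useV1Sources` stands in for
// the useV1SourceWriterList conf value, already lower-cased and split on commas.
def eligibleForV2(providerClass: Class[_], useV1Sources: Seq[String]): Boolean = {
  val shouldUseV1Source = providerClass.newInstance() match {
    // A registered source falls back to V1 when its short name is on the list...
    case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => true
    // ...otherwise the lower-cased fully qualified class name is checked.
    case _ => useV1Sources.contains(providerClass.getCanonicalName.toLowerCase(Locale.ROOT))
  }
  !shouldUseV1Source && classOf[TableProvider].isAssignableFrom(providerClass)
}

Note that only the conf value is lower-cased, so a DataSourceRegister short name is effectively expected to be registered in lower case for the fallback match to hit.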
