Skip to content

Commit

Permalink
modify the append logic
Browse files Browse the repository at this point in the history
  • Loading branch information
windpiger committed Jan 19, 2017
1 parent 21c5e3f commit 2bf67c7
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 26 deletions.
14 changes: 2 additions & 12 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
import org.apache.spark.sql.sources.BaseRelation
Expand Down Expand Up @@ -372,11 +372,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val tableIdentWithDB = tableIdent.copy(database = Some(db))
val tableName = tableIdentWithDB.unquotedString

var tableRelation: Option[LogicalPlan] = None
if (tableExists) {
tableRelation = Some(catalog.lookupRelation(tableIdentWithDB))
}

(tableExists, mode) match {
case (true, SaveMode.Ignore) =>
// Do nothing
Expand All @@ -391,7 +386,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) =>
relation.catalogTable.identifier
}
EliminateSubqueryAliases(tableRelation.get) match {
EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
// check if the table is a data source table (the relation is a BaseRelation).
case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
throw new AnalysisException(
Expand All @@ -410,11 +405,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
// Refresh the cache of the table in the catalog.
catalog.refreshTable(tableIdentWithDB)

case (true, SaveMode.Append)
if tableRelation.isDefined && tableRelation.get.isInstanceOf[CatalogRelation]
&& DDLUtils.isHiveTable(EliminateSubqueryAliases(tableRelation.get)
.asInstanceOf[CatalogRelation].catalogTable) =>
insertInto(tableIdentWithDB)
case _ => createTable(tableIdent)
}
}
Expand Down
Expand Up @@ -112,15 +112,19 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
throw new AnalysisException("Saving data into a view is not allowed.")
}

val (existingProvider, specifiedProvider) = DDLUtils.isHiveTable(existingTable) match {
case false =>
(DataSource.lookupDataSource(existingTable.provider.get),
DataSource.lookupDataSource(tableDesc.provider.get))
case true =>
(existingTable.provider.get, tableDesc.provider.get)
}
// Check if the specified data source match the data source of the existing table.
val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get)
// TODO: Check that options from the resolved relation match the relation that we are
// inserting into (i.e. using the same compression).
if (existingProvider != specifiedProvider) {
throw new AnalysisException(s"The format of the existing table $tableName is " +
s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " +
s"`${specifiedProvider.getSimpleName}`.")
s"`$existingProvider`. It doesn't match the specified format `$specifiedProvider`.")
}

if (analyzedQuery.schema.length != existingTable.schema.length) {
Expand Down
Expand Up @@ -93,18 +93,11 @@ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)

case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
// Currently `DataFrameWriter.saveAsTable` doesn't support the Append mode of hive serde
// tables yet.
if (mode == SaveMode.Append) {
throw new AnalysisException(
"CTAS for hive serde tables does not support append semantics.")
}

val dbName = tableDesc.identifier.database.getOrElse(session.catalog.currentDatabase)
CreateHiveTableAsSelectCommand(
tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
query,
mode == SaveMode.Ignore)
mode == SaveMode.Ignore | mode == SaveMode.Append)
}

/**
Expand Down
Expand Up @@ -69,7 +69,7 @@ case class CreateHiveTableAsSelectCommand(
withFormat
}

sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = false)
sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = true)

// Get the Metastore Relation
sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
Expand All @@ -81,7 +81,8 @@ case class CreateHiveTableAsSelectCommand(
// processing.
if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
if (ignoreIfExists) {
// table already exists, will do nothing, to keep consistent with Hive
sparkSession.sessionState.executePlan(InsertIntoTable(
metastoreRelation, Map(), query, overwrite = false, ifNotExists = false)).toRdd
} else {
throw new AnalysisException(s"$tableIdentifier already exists.")
}
Expand Down

0 comments on commit 2bf67c7

Please sign in to comment.