[SPARK-29057][SQL] remove InsertIntoTable
### What changes were proposed in this pull request?

Remove `InsertIntoTable` and replace its usages with `InsertIntoStatement`.

### Why are the changes needed?

`InsertIntoTable` and `InsertIntoStatement` are almost identical; they differ only in a few names. It doesn't make sense to keep two nearly identical plans. After removing `InsertIntoTable`, the analysis process becomes (a minimal sketch follows the list):
1. The parser creates `InsertIntoStatement`.
2. The v2 rule `ResolveInsertInto` converts `InsertIntoStatement` to v2 commands.
3. V1 rules such as `DataSourceAnalysis` and `HiveAnalysis` convert `InsertIntoStatement` to v1 commands.
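
For illustration, here is a minimal sketch (not part of this commit) of what a caller builds after the change, modeled on the `DataFrameWriter.insertInto` hunk below. The helper name `buildInsert` and the use of `df.queryExecution.logical` are assumptions made to keep the example self-contained; compared with the removed `InsertIntoTable`, only the node name and the `partition` -> `partitionSpec` parameter name change for callers.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.sql.InsertIntoStatement

// Hypothetical helper: build the unresolved insert plan that the analyzer rules
// (ResolveInsertInto for v2, DataSourceAnalysis/HiveAnalysis for v1) later replace
// with a concrete command.
def buildInsert(df: DataFrame, ident: TableIdentifier, overwrite: Boolean): LogicalPlan =
  InsertIntoStatement(
    table = UnresolvedRelation(ident),
    partitionSpec = Map.empty[String, Option[String]], // no static partition values
    query = df.queryExecution.logical,
    overwrite = overwrite,
    ifPartitionNotExists = false)
```

As with the old plan, the target `table` is intentionally not one of the plan's children, so resolution rules can replace it without transforming it.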

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #25763 from cloud-fan/remove.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
cloud-fan authored and HyukjinKwon committed Sep 12, 2019
1 parent 7ce0f2b commit eec728a
Showing 10 changed files with 46 additions and 78 deletions.
@@ -743,7 +743,7 @@ class Analyzer(
}

def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case i @ InsertIntoTable(u @ UnresolvedRelation(AsTableIdentifier(ident)), _, child, _, _)
case i @ InsertIntoStatement(u @ UnresolvedRelation(AsTableIdentifier(ident)), _, child, _, _)
if child.resolved =>
EliminateSubqueryAliases(lookupTableFromCatalog(ident, u)) match {
case v: View =>
@@ -794,7 +794,7 @@
case scala.Right(Some(v2Table: Table)) =>
resolveV2Insert(i, v2Table)
case _ =>
InsertIntoTable(i.table, i.partitionSpec, i.query, i.overwrite, i.ifPartitionNotExists)
i
}
}

@@ -17,7 +17,6 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnType}
import org.apache.spark.sql.catalyst.expressions._
@@ -26,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.sql.AlterTableStatement
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableStatement, InsertIntoStatement}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

@@ -94,6 +93,9 @@ trait CheckAnalysis extends PredicateHelper {
case u: UnresolvedRelation =>
u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")

case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) =>
failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")

case operator: LogicalPlan =>
// Check argument data types of higher-order functions downwards first.
// If the arguments of the higher-order functions are resolved but the type check fails,
@@ -491,9 +493,6 @@ trait CheckAnalysis extends PredicateHelper {
throw new IllegalStateException(
"Internal error: logical hint operator should have been removed during analysis")

case InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")

case f @ Filter(condition, _)
if PlanHelper.specialExpressionsInUnsupportedOperator(f).nonEmpty =>
val invalidExprSqls = PlanHelper.specialExpressionsInUnsupportedOperator(f).map(_.sql)
@@ -639,42 +639,6 @@ case class ShowTables(
AttributeReference("tableName", StringType, nullable = false)())
}

/**
* Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
* concrete implementations during analysis.
*
* @param table the logical plan representing the table. In the future this should be a
* [[org.apache.spark.sql.catalyst.catalog.CatalogTable]] once we converge Hive tables
* and data source tables.
* @param partition a map from the partition key to the partition value (optional). If the partition
* value is optional, dynamic partition insert will be performed.
* As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS ...` would have
* Map('a' -> Some('1'), 'b' -> Some('2')),
* and `INSERT INTO tbl PARTITION (a=1, b) AS ...`
* would have Map('a' -> Some('1'), 'b' -> None).
* @param query the logical plan representing data to write to.
* @param overwrite overwrite existing table or partitions.
* @param ifPartitionNotExists If true, only write if the partition does not exist.
* Only valid for static partitions.
*/
case class InsertIntoTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
query: LogicalPlan,
overwrite: Boolean,
ifPartitionNotExists: Boolean)
extends LogicalPlan {
// IF NOT EXISTS is only valid in INSERT OVERWRITE
assert(overwrite || !ifPartitionNotExists)
// IF NOT EXISTS is only valid in static partitions
assert(partition.values.forall(_.nonEmpty) || !ifPartitionNotExists)

// We don't want `table` in children as sometimes we don't want to transform it.
override def children: Seq[LogicalPlan] = query :: Nil
override def output: Seq[Attribute] = Seq.empty
override lazy val resolved: Boolean = false
}

/**
* Insert query result into a directory.
*
@@ -17,7 +17,7 @@

package org.apache.spark.sql

import java.util.{Locale, Properties, UUID}
import java.util.{Locale, Properties}

import scala.collection.JavaConverters._

@@ -28,7 +28,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect}
import org.apache.spark.sql.catalyst.plans.logical.sql.InsertIntoStatement
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
@@ -408,9 +409,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

private def insertInto(tableIdent: TableIdentifier): Unit = {
runCommand(df.sparkSession, "insertInto") {
InsertIntoTable(
InsertIntoStatement(
table = UnresolvedRelation(tableIdent),
partition = Map.empty[String, Option[String]],
partitionSpec = Map.empty[String, Option[String]],
query = df.logicalPlan,
overwrite = modeForDSV1 == SaveMode.Overwrite,
ifPartitionNotExists = false)
@@ -34,7 +34,8 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.sql.InsertIntoStatement
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command._
@@ -140,7 +141,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output.map(_.name))

case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _),
case InsertIntoStatement(l @ LogicalRelation(_: InsertableRelation, _, _, _),
parts, query, overwrite, false) if parts.isEmpty =>
InsertIntoDataSourceCommand(l, query, overwrite)

@@ -152,7 +153,7 @@

InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite)

case i @ InsertIntoTable(
case i @ InsertIntoStatement(
l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, query, overwrite, _) =>
// If the InsertIntoTable command is for a partitioned HadoopFsRelation and
// the user has specified static partitions, we add a Project operator on top of the query
@@ -241,11 +242,11 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
}

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
if DDLUtils.isDatasourceTable(tableMeta) =>
i.copy(table = readDataSourceTable(tableMeta))

case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
i.copy(table = DDLUtils.readHiveTable(tableMeta))

case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
@@ -20,20 +20,21 @@ package org.apache.spark.sql.execution.datasources
import scala.collection.JavaConverters._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.sql.InsertIntoStatement
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileDataSourceV2, FileTable}

/**
* Replace the File source V2 table in [[InsertIntoTable]] to V1 [[FileFormat]].
* Replace the File source V2 table in [[InsertIntoStatement]] to V1 [[FileFormat]].
* E.g, with temporary view `t` using [[FileDataSourceV2]], inserting into view `t` fails
* since there is no corresponding physical plan.
* This is a temporary hack for making current data source V2 work. It should be
* removed when Catalog support of file data source v2 is finished.
*/
class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoTable(d @ DataSourceV2Relation(table: FileTable, _, _), _, _, _, _) =>
case i @ InsertIntoStatement(d @ DataSourceV2Relation(table: FileTable, _, _), _, _, _, _) =>
val v1FileFormat = table.fallbackFileFormat.newInstance()
val relation = HadoopFsRelation(
table.fileIndex,
@@ -23,15 +23,16 @@ import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, RewritableTransform}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
import org.apache.spark.sql.catalyst.expressions.{Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.sql.InsertIntoStatement
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.types.{ArrayType, AtomicType, StructField, StructType}
import org.apache.spark.sql.types.{AtomicType, StructType}
import org.apache.spark.sql.util.SchemaUtils

/**
@@ -377,19 +378,19 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
}

/**
* Preprocess the [[InsertIntoTable]] plan. Throws exception if the number of columns mismatch, or
* specified partition columns are different from the existing partition columns in the target
* Preprocess the [[InsertIntoStatement]] plan. Throws exception if the number of columns mismatch,
* or specified partition columns are different from the existing partition columns in the target
* table. It also does data type casting and field renaming, to make sure that the columns to be
* inserted have the correct data type and fields have the correct names.
*/
case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
private def preprocess(
insert: InsertIntoTable,
insert: InsertIntoStatement,
tblName: String,
partColNames: Seq[String]): InsertIntoTable = {
partColNames: Seq[String]): InsertIntoStatement = {

val normalizedPartSpec = PartitioningUtils.normalizePartitionSpec(
insert.partition, partColNames, tblName, conf.resolver)
insert.partitionSpec, partColNames, tblName, conf.resolver)

val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet
val expectedColumns = insert.table.output.filterNot(a => staticPartCols.contains(a.name))
@@ -417,16 +418,16 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
""".stripMargin)
}

insert.copy(query = newQuery, partition = normalizedPartSpec)
insert.copy(query = newQuery, partitionSpec = normalizedPartSpec)
} else {
// All partition columns are dynamic because the InsertIntoTable command does
// not explicitly specify partitioning columns.
insert.copy(query = newQuery, partition = partColNames.map(_ -> None).toMap)
insert.copy(query = newQuery, partitionSpec = partColNames.map(_ -> None).toMap)
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved =>
case i @ InsertIntoStatement(table, _, query, _, _) if table.resolved && query.resolved =>
table match {
case relation: HiveTableRelation =>
val metadata = relation.tableMeta
@@ -503,7 +504,7 @@ object PreWriteCheck extends (LogicalPlan => Unit) {

def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case InsertIntoTable(l @ LogicalRelation(relation, _, _, _), partition, query, _, _) =>
case InsertIntoStatement(l @ LogicalRelation(relation, _, _, _), partition, query, _, _) =>
// Get all input data source relations of the query.
val srcRelations = query.collect {
case LogicalRelation(src, _, _, _) => src
@@ -525,7 +526,7 @@
case _ => failAnalysis(s"$relation does not allow insertion.")
}

case InsertIntoTable(t, _, _, _, _)
case InsertIntoStatement(t, _, _, _, _)
if !t.isInstanceOf[LeafNode] ||
t.isInstanceOf[Range] ||
t.isInstanceOf[OneRowRelation] ||
@@ -22,7 +22,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
import org.apache.spark.sql.{functions, AnalysisException, QueryTest}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, InsertIntoTable, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.sql.InsertIntoStatement
import org.apache.spark.sql.execution.{QueryExecution, WholeStageCodegenExec}
import org.apache.spark.sql.execution.datasources.{CreateTable, InsertIntoHadoopFsRelationCommand}
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -200,8 +201,8 @@ class DataFrameCallbackSuite extends QueryTest with SharedSparkSession {
sparkContext.listenerBus.waitUntilEmpty(1000)
assert(commands.length == 3)
assert(commands(2)._1 == "insertInto")
assert(commands(2)._2.isInstanceOf[InsertIntoTable])
assert(commands(2)._2.asInstanceOf[InsertIntoTable].table
assert(commands(2)._2.isInstanceOf[InsertIntoStatement])
assert(commands(2)._2.asInstanceOf[InsertIntoStatement].table
.asInstanceOf[UnresolvedRelation].multipartIdentifier == Seq("tab"))
}
// exiting withTable adds commands(3) via onSuccess (drops tab)
@@ -26,8 +26,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan,
ScriptTransformation, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, LogicalPlan, ScriptTransformation, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.sql.InsertIntoStatement
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
@@ -143,9 +143,9 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
hiveTableWithStats(relation)

// handles InsertIntoTable specially as the table in InsertIntoTable is not added in its
// handles InsertIntoStatement specially as the table in InsertIntoStatement is not added in its
// children, hence not matched directly by previous HiveTableRelation case.
case i @ InsertIntoTable(relation: HiveTableRelation, _, _, _, _)
case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _)
if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
i.copy(table = hiveTableWithStats(relation))
}
@@ -159,7 +159,7 @@
*/
object HiveAnalysis extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists)
case InsertIntoStatement(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists)
if DDLUtils.isHiveTable(r.tableMeta) =>
InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite,
ifPartitionNotExists, query.output.map(_.name))
@@ -207,11 +207,12 @@ case class RelationConversions(
override def apply(plan: LogicalPlan): LogicalPlan = {
plan resolveOperators {
// Write path
case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists)
case InsertIntoStatement(
r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists)
if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
(!r.isPartitioned || SQLConf.get.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE))
&& isConvertible(r) =>
InsertIntoTable(metastoreCatalog.convert(r), partition,
InsertIntoStatement(metastoreCatalog.convert(r), partition,
query, overwrite, ifPartitionNotExists)

// Read path
@@ -25,7 +25,6 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.SparkException
import org.apache.spark.sql.{QueryTest, _}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf