[SPARK-41726][SQL] Remove OptimizedCreateHiveTableAsSelectCommand
### What changes were proposed in this pull request?

This PR removes `OptimizedCreateHiveTableAsSelectCommand` and moves the code that converts `InsertIntoHiveTable` to `InsertIntoHadoopFsRelationCommand` into `RelationConversions`.

### Why are the changes needed?

CTAS uses a nested execution to perform the data write, so a dedicated `OptimizedCreateHiveTableAsSelectCommand` is unnecessary: the inner `InsertIntoHiveTable` is converted to `InsertIntoHadoopFsRelationCommand` whenever possible.
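
As an illustration (not part of the patch), the nested execution can be observed from a regular Hive-enabled `SparkSession` with a `SparkListener`, in the same spirit as the reworked test in this PR. This is a minimal sketch under a few assumptions: it requires the `spark-hive` module, assumes the config keys below are the ones backing `HiveUtils.CONVERT_METASTORE_CTAS` and `HiveUtils.CONVERT_METASTORE_PARQUET`, and the table name `demo_ctas` is made up for the example.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

object NestedCtasWriteDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .enableHiveSupport() // assumes spark-hive is on the classpath
      .config("spark.sql.hive.convertMetastoreCtas", "true")
      .config("spark.sql.hive.convertMetastoreParquet", "true")
      .getOrCreate()

    // Collect the root plan node name of every SQL execution that starts.
    var planNodes = Vector.empty[String]
    val listener = new SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case e: SparkListenerSQLExecutionStart => planNodes :+= e.sparkPlanInfo.nodeName
        case _ => // ignore other events
      }
    }

    spark.sparkContext.addSparkListener(listener)
    try {
      spark.sql("CREATE TABLE demo_ctas STORED AS PARQUET AS SELECT id FROM range(3)")
      Thread.sleep(2000) // the listener bus is asynchronous; a test would wait on it instead
      // Expected roughly: "Execute CreateHiveTableAsSelectCommand" for the outer command,
      // then "Execute InsertIntoHadoopFsRelationCommand" for the nested write
      // (or "Execute InsertIntoHiveTable" when the conversion does not apply).
      planNodes.foreach(println)
    } finally {
      spark.sparkContext.removeSparkListener(listener)
      spark.sql("DROP TABLE IF EXISTS demo_ctas")
      spark.stop()
    }
  }
}
```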

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Fixed existing tests.

Closes #39263 from ulysses-you/SPARK-41726.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
ulysses-you authored and dongjoon-hyun committed Dec 30, 2022
1 parent bb18703 commit cfdbfb7
Showing 4 changed files with 104 additions and 164 deletions.
@@ -28,9 +28,10 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils, InsertIntoDataSourceDirCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy, HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.execution.HiveScriptTransformationExec
import org.apache.spark.sql.internal.HiveSerDe
@@ -232,15 +233,36 @@ case class RelationConversions(
if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
metastoreCatalog.convert(relation, isWrite = false)

// CTAS
case CreateTable(tableDesc, mode, Some(query))
// CTAS path
// This `InsertIntoHiveTable` is derived from `CreateHiveTableAsSelectCommand`,
// so it only matches the table insertion inside a Hive CTAS.
// This pattern does not cause conflicts because this rule is always applied before
// `HiveAnalysis` and both rules run only once.
case InsertIntoHiveTable(tableDesc, _, query, overwrite, ifPartitionNotExists, _)
if query.resolved && DDLUtils.isHiveTable(tableDesc) &&
tableDesc.partitionColumnNames.isEmpty && isConvertible(tableDesc) &&
conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
// validation is required to be done here before relation conversion.
DDLUtils.checkTableColumns(tableDesc.copy(schema = query.schema))
OptimizedCreateHiveTableAsSelectCommand(
tableDesc, query, query.output.map(_.name), mode)
val hiveTable = DDLUtils.readHiveTable(tableDesc)
val hadoopRelation = metastoreCatalog.convert(hiveTable, isWrite = true) match {
case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
case _ => throw QueryCompilationErrors.tableIdentifierNotConvertedToHadoopFsRelationError(
tableDesc.identifier)
}
InsertIntoHadoopFsRelationCommand(
hadoopRelation.location.rootPaths.head,
Map.empty, // Converting partitioned tables is not supported.
ifPartitionNotExists,
Seq.empty, // Converting partitioned tables is not supported.
hadoopRelation.bucketSpec,
hadoopRelation.fileFormat,
hadoopRelation.options,
query,
if (overwrite) SaveMode.Overwrite else SaveMode.Append,
Some(tableDesc),
Some(hadoopRelation.location),
query.output.map(_.name))

// INSERT HIVE DIR
case InsertIntoDir(_, storage, provider, query, overwrite)
@@ -24,17 +24,21 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.{DataWritingCommand, DDLUtils, LeafRunnableCommand}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.HiveSessionCatalog
import org.apache.spark.util.Utils

trait CreateHiveTableAsSelectBase extends LeafRunnableCommand {
val tableDesc: CatalogTable
val query: LogicalPlan
val outputColumnNames: Seq[String]
val mode: SaveMode
import org.apache.spark.sql.execution.command.{DataWritingCommand, LeafRunnableCommand}

/**
* Create table and insert the query result into it.
*
* @param tableDesc the table description, which may contain serde, storage handler etc.
* @param query the query whose result will be inserted into the new relation
* @param mode SaveMode
*/
case class CreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
outputColumnNames: Seq[String],
mode: SaveMode)
extends LeafRunnableCommand {
assert(query.resolved)
override def innerChildren: Seq[LogicalPlan] = query :: Nil

@@ -60,9 +64,9 @@ trait CreateHiveTableAsSelectBase extends LeafRunnableCommand {
val qe = sparkSession.sessionState.executePlan(command)
qe.assertCommandExecuted()
} else {
tableDesc.storage.locationUri.foreach { p =>
DataWritingCommand.assertEmptyRootPath(p, mode, sparkSession.sessionState.newHadoopConf)
}
tableDesc.storage.locationUri.foreach { p =>
DataWritingCommand.assertEmptyRootPath(p, mode, sparkSession.sessionState.newHadoopConf)
}
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
// processing.
@@ -90,38 +94,7 @@ trait CreateHiveTableAsSelectBase extends LeafRunnableCommand {
Seq.empty[Row]
}

// Returns `DataWritingCommand` which actually writes data into the table.
def getWritingCommand(
catalog: SessionCatalog,
tableDesc: CatalogTable,
tableExists: Boolean): DataWritingCommand

// A subclass should override this with the Class name of the concrete type expected to be
// returned from `getWritingCommand`.
def writingCommandClassName: String

override def argString(maxFields: Int): String = {
s"[Database: ${tableDesc.database}, " +
s"TableName: ${tableDesc.identifier.table}, " +
s"${writingCommandClassName}]"
}
}

/**
* Create table and insert the query result into it.
*
* @param tableDesc the table description, which may contain serde, storage handler etc.
* @param query the query whose result will be insert into the new relation
* @param mode SaveMode
*/
case class CreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
outputColumnNames: Seq[String],
mode: SaveMode)
extends CreateHiveTableAsSelectBase {

override def getWritingCommand(
private def getWritingCommand(
catalog: SessionCatalog,
tableDesc: CatalogTable,
tableExists: Boolean): DataWritingCommand = {
@@ -136,53 +109,8 @@ case class CreateHiveTableAsSelectCommand(
outputColumnNames = outputColumnNames)
}

override def writingCommandClassName: String =
Utils.getSimpleName(classOf[InsertIntoHiveTable])
}

/**
* Create table and insert the query result into it. This creates Hive table but inserts
* the query result into it by using data source.
*
* @param tableDesc the table description, which may contain serde, storage handler etc.
* @param query the query whose result will be insert into the new relation
* @param mode SaveMode
*/
case class OptimizedCreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
outputColumnNames: Seq[String],
mode: SaveMode)
extends CreateHiveTableAsSelectBase {

override def getWritingCommand(
catalog: SessionCatalog,
tableDesc: CatalogTable,
tableExists: Boolean): DataWritingCommand = {
val metastoreCatalog = catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
val hiveTable = DDLUtils.readHiveTable(tableDesc)

val hadoopRelation = metastoreCatalog.convert(hiveTable, isWrite = true) match {
case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
case _ => throw QueryCompilationErrors.tableIdentifierNotConvertedToHadoopFsRelationError(
tableIdentifier)
}

InsertIntoHadoopFsRelationCommand(
hadoopRelation.location.rootPaths.head,
Map.empty, // We don't support to convert partitioned table.
false,
Seq.empty, // We don't support to convert partitioned table.
hadoopRelation.bucketSpec,
hadoopRelation.fileFormat,
hadoopRelation.options,
query,
if (tableExists) mode else SaveMode.Overwrite,
Some(tableDesc),
Some(hadoopRelation.location),
query.output.map(_.name))
override def argString(maxFields: Int): String = {
s"[Database: ${tableDesc.database}, " +
s"TableName: ${tableDesc.identifier.table}]"
}

override def writingCommandClassName: String =
Utils.getSimpleName(classOf[InsertIntoHadoopFsRelationCommand])
}
@@ -23,13 +23,10 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.tags.SlowHiveTest
import org.apache.spark.util.Utils

/**
* A set of tests that validates support for Hive Explain command.
@@ -185,27 +182,6 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
}
}

test("SPARK-26661: Show actual class name of the writing command in CTAS explain") {
Seq(true, false).foreach { convertCTAS =>
withSQLConf(
HiveUtils.CONVERT_METASTORE_CTAS.key -> convertCTAS.toString,
HiveUtils.CONVERT_METASTORE_PARQUET.key -> convertCTAS.toString) {

val df = sql(s"EXPLAIN CREATE TABLE tab1 STORED AS PARQUET AS SELECT * FROM range(2)")
val keywords = if (convertCTAS) {
Seq(
s"Execute ${Utils.getSimpleName(classOf[OptimizedCreateHiveTableAsSelectCommand])}",
Utils.getSimpleName(classOf[InsertIntoHadoopFsRelationCommand]))
} else {
Seq(
s"Execute ${Utils.getSimpleName(classOf[CreateHiveTableAsSelectCommand])}",
Utils.getSimpleName(classOf[InsertIntoHiveTable]))
}
checkKeywordsExist(df, keywords: _*)
}
}
}

test("SPARK-28595: explain should not trigger partition listing") {
Seq(true, false).foreach { legacyBucketedScan =>
withSQLConf(
@@ -27,17 +27,19 @@ import com.google.common.io.Files
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkException, TestUtils}
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.execution.TestUncaughtExceptionHandler
import org.apache.spark.sql.execution.{SparkPlanInfo, TestUncaughtExceptionHandler}
import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.execution.command.{InsertIntoDataSourceDirCommand, LoadDataCommand}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.test.{HiveTestJars, TestHiveSingleton}
@@ -2296,47 +2298,6 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHiveSingleton
}
}

test("SPARK-25271: Hive ctas commands should use data source if it is convertible") {
withTempView("p") {
Seq(1, 2, 3).toDF("id").createOrReplaceTempView("p")

Seq("orc", "parquet").foreach { format =>
Seq(true, false).foreach { isConverted =>
withSQLConf(
HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted",
HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted") {
Seq(true, false).foreach { isConvertedCtas =>
withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> s"$isConvertedCtas") {

val targetTable = "targetTable"
withTable(targetTable) {
val df = sql(s"CREATE TABLE $targetTable STORED AS $format AS SELECT id FROM p")
checkAnswer(sql(s"SELECT id FROM $targetTable"),
Row(1) :: Row(2) :: Row(3) :: Nil)

val ctasDSCommand = df.queryExecution.analyzed.collect {
case _: OptimizedCreateHiveTableAsSelectCommand => true
}.headOption
val ctasCommand = df.queryExecution.analyzed.collect {
case _: CreateHiveTableAsSelectCommand => true
}.headOption

if (isConverted && isConvertedCtas) {
assert(ctasDSCommand.nonEmpty)
assert(ctasCommand.isEmpty)
} else {
assert(ctasDSCommand.isEmpty)
assert(ctasCommand.nonEmpty)
}
}
}
}
}
}
}
}
}

test("SPARK-26181 hasMinMaxStats method of ColumnStatsMap is not correct") {
withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
withTable("all_null") {
@@ -2682,10 +2643,63 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHiveSingleton

@SlowHiveTest
class SQLQuerySuite extends SQLQuerySuiteBase with DisableAdaptiveExecutionSuite {
import spark.implicits._

test("SPARK-36421: Validate all SQL configs to prevent from wrong use for ConfigEntry") {
val df = spark.sql("set -v").select("Meaning")
assert(df.collect().forall(!_.getString(0).contains("ConfigEntry")))
}

test("SPARK-25271: Hive ctas commands should use data source if it is convertible") {
withTempView("p") {
Seq(1, 2, 3).toDF("id").createOrReplaceTempView("p")

Seq("orc", "parquet").foreach { format =>
Seq(true, false).foreach { isConverted =>
withSQLConf(
HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted",
HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted") {
Seq(true, false).foreach { isConvertedCtas =>
withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> s"$isConvertedCtas") {

val targetTable = "targetTable"
withTable(targetTable) {
var commands: Seq[SparkPlanInfo] = Seq.empty
val listener = new SparkListener {
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case start: SparkListenerSQLExecutionStart =>
commands = commands ++ Seq(start.sparkPlanInfo)
case _ => // ignore other events
}
}
}
spark.sparkContext.addSparkListener(listener)
try {
sql(s"CREATE TABLE $targetTable STORED AS $format AS SELECT id FROM p")
checkAnswer(sql(s"SELECT id FROM $targetTable"),
Row(1) :: Row(2) :: Row(3) :: Nil)
spark.sparkContext.listenerBus.waitUntilEmpty()
assert(commands.size == 3)
assert(commands.head.nodeName == "Execute CreateHiveTableAsSelectCommand")

val v1WriteCommand = commands(1)
if (isConverted && isConvertedCtas) {
assert(v1WriteCommand.nodeName == "Execute InsertIntoHadoopFsRelationCommand")
} else {
assert(v1WriteCommand.nodeName == "Execute InsertIntoHiveTable")
}
} finally {
spark.sparkContext.removeSparkListener(listener)
}
}
}
}
}
}
}
}
}
}
@SlowHiveTest
class SQLQuerySuiteAE extends SQLQuerySuiteBase with EnableAdaptiveExecutionSuite
Expand Down
