[SPARK-41726][SQL] Remove OptimizedCreateHiveTableAsSelectCommand #39263

Status: Closed · wants to merge 3 commits
@@ -28,9 +28,10 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils, InsertIntoDataSourceDirCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy, HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.execution.HiveScriptTransformationExec
import org.apache.spark.sql.internal.HiveSerDe
@@ -232,15 +233,36 @@ case class RelationConversions(
if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
metastoreCatalog.convert(relation, isWrite = false)

// CTAS
case CreateTable(tableDesc, mode, Some(query))
// CTAS path
// This `InsertIntoHiveTable` is derived from `CreateHiveTableAsSelectCommand`,
// so it only matches table insertions inside Hive CTAS.
// This pattern does not cause conflicts because this rule is always applied before
// `HiveAnalysis`, and both rules run only once.
case InsertIntoHiveTable(tableDesc, _, query, overwrite, ifPartitionNotExists, _)
Contributor:

Wait, how did we optimize Hive table insertion before, given there was no case for InsertIntoHiveTable previously?

Contributor (Author):

Previously, we optimized Hive table insertion inside OptimizedCreateHiveTableAsSelectCommand, whose getWritingCommand returned an InsertIntoHadoopFsRelationCommand. Now CreateHiveTableAsSelectCommand always uses InsertIntoHiveTable for data writing, and we match that pattern here to apply the optimization.
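
To make the before/after shape concrete, here is a condensed toy sketch (hypothetical types, not the real Spark classes) of the structural change: the choice of writer moves out of a dedicated command and into a pattern-matching rewrite rule.

```scala
// Hypothetical stand-ins, not Spark's classes.
sealed trait WriteCommand
case class HiveWrite(table: String) extends WriteCommand        // stands in for InsertIntoHiveTable
case class FileSourceWrite(table: String) extends WriteCommand  // stands in for InsertIntoHadoopFsRelationCommand

// Stands in for RelationConversions: rewrite the Hive write into a data source
// write when conversion is enabled, otherwise leave the node untouched.
def convertCtasWrite(cmd: WriteCommand, conversionEnabled: Boolean): WriteCommand =
  cmd match {
    case HiveWrite(t) if conversionEnabled => FileSourceWrite(t)
    case other                             => other
  }

// convertCtasWrite(HiveWrite("t"), conversionEnabled = true)   // FileSourceWrite("t")
// convertCtasWrite(HiveWrite("t"), conversionEnabled = false)  // HiveWrite("t")
```

Before this PR, the equivalent of `convertCtasWrite` lived inside `OptimizedCreateHiveTableAsSelectCommand.getWritingCommand`; after it, `CreateHiveTableAsSelectCommand` always emits the Hive write and `RelationConversions` performs the rewrite.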

Contributor:

I mean normal table insertion, not table insertion inside CTAS.

Contributor (Author):

Previously we matched InsertIntoStatement and converted the Hive relation to a HadoopFsRelation, so the data source rules could turn it into an InsertIntoHadoopFsRelationCommand; see line 223.

That said, we could make CreateHiveTableAsSelectCommand use InsertIntoStatement instead of InsertIntoHiveTable, but then we could not tell whether an InsertIntoStatement comes from CTAS. So I use InsertIntoHiveTable here.
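
For context, the converted path can be exercised end to end with the configuration keys this PR's test toggles (a usage sketch, assuming a Hive-enabled SparkSession named `spark` and a fresh table name):

```scala
// Usage sketch: drive a non-partitioned Hive CTAS through the converted
// (data source) write path. The keys are those behind
// HiveUtils.CONVERT_METASTORE_CTAS and CONVERT_METASTORE_PARQUET.
spark.conf.set("spark.sql.hive.convertMetastoreCtas", "true")
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
spark.sql("CREATE TABLE ctas_demo STORED AS PARQUET AS SELECT id FROM range(3)")
// With both flags enabled, the data write runs as InsertIntoHadoopFsRelationCommand;
// with either disabled, it falls back to InsertIntoHiveTable.
```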

Contributor:

I see, can we make the code comments clearer? This only matches table insertion inside Hive CTAS.

Contributor (Author):

Updated the comment.

if query.resolved && DDLUtils.isHiveTable(tableDesc) &&
tableDesc.partitionColumnNames.isEmpty && isConvertible(tableDesc) &&
conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
// validation is required to be done here before relation conversion.
DDLUtils.checkTableColumns(tableDesc.copy(schema = query.schema))
OptimizedCreateHiveTableAsSelectCommand(
tableDesc, query, query.output.map(_.name), mode)
val hiveTable = DDLUtils.readHiveTable(tableDesc)
val hadoopRelation = metastoreCatalog.convert(hiveTable, isWrite = true) match {
case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
case _ => throw QueryCompilationErrors.tableIdentifierNotConvertedToHadoopFsRelationError(
tableDesc.identifier)
}
InsertIntoHadoopFsRelationCommand(
hadoopRelation.location.rootPaths.head,
Map.empty, // Converting partitioned tables is not supported.
ifPartitionNotExists,
Seq.empty, // Converting partitioned tables is not supported.
hadoopRelation.bucketSpec,
hadoopRelation.fileFormat,
hadoopRelation.options,
query,
if (overwrite) SaveMode.Overwrite else SaveMode.Append,
Some(tableDesc),
Some(hadoopRelation.location),
query.output.map(_.name))

// INSERT HIVE DIR
case InsertIntoDir(_, storage, provider, query, overwrite)
@@ -24,17 +24,21 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.{DataWritingCommand, DDLUtils, LeafRunnableCommand}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.HiveSessionCatalog
import org.apache.spark.util.Utils

trait CreateHiveTableAsSelectBase extends LeafRunnableCommand {
val tableDesc: CatalogTable
val query: LogicalPlan
val outputColumnNames: Seq[String]
val mode: SaveMode
import org.apache.spark.sql.execution.command.{DataWritingCommand, LeafRunnableCommand}

/**
* Create table and insert the query result into it.
*
* @param tableDesc the table description, which may contain serde, storage handler etc.
* @param query the query whose result will be inserted into the new relation
* @param mode SaveMode
*/
case class CreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
outputColumnNames: Seq[String],
mode: SaveMode)
extends LeafRunnableCommand {
assert(query.resolved)
override def innerChildren: Seq[LogicalPlan] = query :: Nil

@@ -60,9 +64,9 @@ trait CreateHiveTableAsSelectBase extends LeafRunnableCommand {
val qe = sparkSession.sessionState.executePlan(command)
qe.assertCommandExecuted()
} else {
tableDesc.storage.locationUri.foreach { p =>
DataWritingCommand.assertEmptyRootPath(p, mode, sparkSession.sessionState.newHadoopConf)
}
tableDesc.storage.locationUri.foreach { p =>
DataWritingCommand.assertEmptyRootPath(p, mode, sparkSession.sessionState.newHadoopConf)
}
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
// processing.
@@ -90,38 +94,7 @@ trait CreateHiveTableAsSelectBase extends LeafRunnableCommand {
Seq.empty[Row]
}

// Returns `DataWritingCommand` which actually writes data into the table.
def getWritingCommand(
catalog: SessionCatalog,
tableDesc: CatalogTable,
tableExists: Boolean): DataWritingCommand

// A subclass should override this with the Class name of the concrete type expected to be
// returned from `getWritingCommand`.
def writingCommandClassName: String

override def argString(maxFields: Int): String = {
s"[Database: ${tableDesc.database}, " +
s"TableName: ${tableDesc.identifier.table}, " +
s"${writingCommandClassName}]"
}
}

/**
* Create table and insert the query result into it.
*
* @param tableDesc the table description, which may contain serde, storage handler etc.
* @param query the query whose result will be insert into the new relation
* @param mode SaveMode
*/
case class CreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
outputColumnNames: Seq[String],
mode: SaveMode)
extends CreateHiveTableAsSelectBase {

override def getWritingCommand(
private def getWritingCommand(
catalog: SessionCatalog,
tableDesc: CatalogTable,
tableExists: Boolean): DataWritingCommand = {
@@ -136,53 +109,8 @@ case class CreateHiveTableAsSelectCommand(
outputColumnNames = outputColumnNames)
}

override def writingCommandClassName: String =
Utils.getSimpleName(classOf[InsertIntoHiveTable])
}

/**
* Create table and insert the query result into it. This creates Hive table but inserts
* the query result into it by using data source.
*
* @param tableDesc the table description, which may contain serde, storage handler etc.
* @param query the query whose result will be insert into the new relation
* @param mode SaveMode
*/
case class OptimizedCreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
outputColumnNames: Seq[String],
mode: SaveMode)
extends CreateHiveTableAsSelectBase {

override def getWritingCommand(
catalog: SessionCatalog,
tableDesc: CatalogTable,
tableExists: Boolean): DataWritingCommand = {
val metastoreCatalog = catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
val hiveTable = DDLUtils.readHiveTable(tableDesc)

val hadoopRelation = metastoreCatalog.convert(hiveTable, isWrite = true) match {
case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
case _ => throw QueryCompilationErrors.tableIdentifierNotConvertedToHadoopFsRelationError(
tableIdentifier)
}

InsertIntoHadoopFsRelationCommand(
hadoopRelation.location.rootPaths.head,
Map.empty, // We don't support to convert partitioned table.
false,
Seq.empty, // We don't support to convert partitioned table.
hadoopRelation.bucketSpec,
hadoopRelation.fileFormat,
hadoopRelation.options,
query,
if (tableExists) mode else SaveMode.Overwrite,
Some(tableDesc),
Some(hadoopRelation.location),
query.output.map(_.name))
override def argString(maxFields: Int): String = {
s"[Database: ${tableDesc.database}, " +
s"TableName: ${tableDesc.identifier.table}]"
}

override def writingCommandClassName: String =
Utils.getSimpleName(classOf[InsertIntoHadoopFsRelationCommand])
}
@@ -23,13 +23,10 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.tags.SlowHiveTest
import org.apache.spark.util.Utils

/**
* A set of tests that validates support for Hive Explain command.
@@ -185,27 +182,6 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
}
}

test("SPARK-26661: Show actual class name of the writing command in CTAS explain") {
Seq(true, false).foreach { convertCTAS =>
withSQLConf(
HiveUtils.CONVERT_METASTORE_CTAS.key -> convertCTAS.toString,
HiveUtils.CONVERT_METASTORE_PARQUET.key -> convertCTAS.toString) {

val df = sql(s"EXPLAIN CREATE TABLE tab1 STORED AS PARQUET AS SELECT * FROM range(2)")
val keywords = if (convertCTAS) {
Seq(
s"Execute ${Utils.getSimpleName(classOf[OptimizedCreateHiveTableAsSelectCommand])}",
Utils.getSimpleName(classOf[InsertIntoHadoopFsRelationCommand]))
} else {
Seq(
s"Execute ${Utils.getSimpleName(classOf[CreateHiveTableAsSelectCommand])}",
Utils.getSimpleName(classOf[InsertIntoHiveTable]))
}
checkKeywordsExist(df, keywords: _*)
}
}
}

test("SPARK-28595: explain should not trigger partition listing") {
Seq(true, false).foreach { legacyBucketedScan =>
withSQLConf(
@@ -27,17 +27,19 @@ import com.google.common.io.Files
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkException, TestUtils}
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.execution.TestUncaughtExceptionHandler
import org.apache.spark.sql.execution.{SparkPlanInfo, TestUncaughtExceptionHandler}
import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.execution.command.{InsertIntoDataSourceDirCommand, LoadDataCommand}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.test.{HiveTestJars, TestHiveSingleton}
@@ -2296,47 +2298,6 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHiveSingleton
}
}

test("SPARK-25271: Hive ctas commands should use data source if it is convertible") {
withTempView("p") {
Seq(1, 2, 3).toDF("id").createOrReplaceTempView("p")

Seq("orc", "parquet").foreach { format =>
Seq(true, false).foreach { isConverted =>
withSQLConf(
HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted",
HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted") {
Seq(true, false).foreach { isConvertedCtas =>
withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> s"$isConvertedCtas") {

val targetTable = "targetTable"
withTable(targetTable) {
val df = sql(s"CREATE TABLE $targetTable STORED AS $format AS SELECT id FROM p")
checkAnswer(sql(s"SELECT id FROM $targetTable"),
Row(1) :: Row(2) :: Row(3) :: Nil)

val ctasDSCommand = df.queryExecution.analyzed.collect {
case _: OptimizedCreateHiveTableAsSelectCommand => true
}.headOption
val ctasCommand = df.queryExecution.analyzed.collect {
case _: CreateHiveTableAsSelectCommand => true
}.headOption

if (isConverted && isConvertedCtas) {
assert(ctasDSCommand.nonEmpty)
assert(ctasCommand.isEmpty)
} else {
assert(ctasDSCommand.isEmpty)
assert(ctasCommand.nonEmpty)
}
}
}
}
}
}
}
}
}

test("SPARK-26181 hasMinMaxStats method of ColumnStatsMap is not correct") {
withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
withTable("all_null") {
@@ -2682,10 +2643,63 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHiveSingleton

@SlowHiveTest
class SQLQuerySuite extends SQLQuerySuiteBase with DisableAdaptiveExecutionSuite {
import spark.implicits._

test("SPARK-36421: Validate all SQL configs to prevent from wrong use for ConfigEntry") {
val df = spark.sql("set -v").select("Meaning")
assert(df.collect().forall(!_.getString(0).contains("ConfigEntry")))
}

test("SPARK-25271: Hive ctas commands should use data source if it is convertible") {
withTempView("p") {
Seq(1, 2, 3).toDF("id").createOrReplaceTempView("p")

Seq("orc", "parquet").foreach { format =>
Seq(true, false).foreach { isConverted =>
withSQLConf(
HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted",
HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted") {
Seq(true, false).foreach { isConvertedCtas =>
withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> s"$isConvertedCtas") {

val targetTable = "targetTable"
withTable(targetTable) {
var commands: Seq[SparkPlanInfo] = Seq.empty
val listener = new SparkListener {
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case start: SparkListenerSQLExecutionStart =>
commands = commands ++ Seq(start.sparkPlanInfo)
case _ => // ignore other events
}
}
}
spark.sparkContext.addSparkListener(listener)
try {
sql(s"CREATE TABLE $targetTable STORED AS $format AS SELECT id FROM p")
checkAnswer(sql(s"SELECT id FROM $targetTable"),
Row(1) :: Row(2) :: Row(3) :: Nil)
spark.sparkContext.listenerBus.waitUntilEmpty()
assert(commands.size == 3)
assert(commands.head.nodeName == "Execute CreateHiveTableAsSelectCommand")

val v1WriteCommand = commands(1)
if (isConverted && isConvertedCtas) {
assert(v1WriteCommand.nodeName == "Execute InsertIntoHadoopFsRelationCommand")
} else {
assert(v1WriteCommand.nodeName == "Execute InsertIntoHiveTable")
}
} finally {
spark.sparkContext.removeSparkListener(listener)
}
}
}
}
}
}
}
}
}
}
@SlowHiveTest
class SQLQuerySuiteAE extends SQLQuerySuiteBase with EnableAdaptiveExecutionSuite