diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 65bc57de907b2..3ca3032be485b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -22,7 +22,6 @@ import java.util.UUID import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import javax.annotation.concurrent.GuardedBy -import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal import org.apache.hadoop.fs.Path @@ -32,22 +31,21 @@ import org.apache.spark.internal.LogKeys.EXTENDED_EXPLAIN_GENERATOR import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, Row} import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker} -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubqueryAliases, LazyExpression, NameParameterizedQuery, UnresolvedRelation, UnsupportedOperationChecker} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, LazyExpression, NameParameterizedQuery, UnsupportedOperationChecker} import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CompoundBody, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer, TransactionalWrite => TransactionalWritePlan, Union, UnresolvedWith, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CompoundBody, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer, Union, UnresolvedWith, WithCTE} import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule} import org.apache.spark.sql.catalyst.transactions.TransactionUtils import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.classic.SparkSession -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, LookupCatalog, SupportsCatalogOptions, TableCatalog, TransactionalCatalogPlugin} +import org.apache.spark.sql.connector.catalog.LookupCatalog import org.apache.spark.sql.connector.catalog.transactions.Transaction import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ROOT_ID_KEY import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan} import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, DisableUnnecessaryBucketedScan} -import org.apache.spark.sql.execution.datasources.DataSource -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, TransactionalExec, V2TableRefreshUtil} +import org.apache.spark.sql.execution.datasources.v2.{TransactionalExec, V2TableRefreshUtil} import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters import org.apache.spark.sql.execution.exchange.EnsureRequirements import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery @@ -56,7 +54,6 @@ import org.apache.spark.sql.execution.streaming.runtime.{IncrementalExecution, W import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.scripting.SqlScriptingExecution import org.apache.spark.sql.streaming.OutputMode -import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.{LazyTry, Utils, UUIDv7Generator} import org.apache.spark.util.ArrayImplicits._ @@ -119,16 +116,9 @@ class QueryExecution( analyzerOpt.flatMap(_.catalogManager.transaction).orElse { // Only begin a new transaction for outer QEs that lead to execution. if (mode != CommandExecutionMode.SKIP) { - def resolve(w: TransactionalWritePlan): Option[TransactionalCatalogPlugin] = - pathBased(w) match { - case Some(c: TransactionalCatalogPlugin) => Some(c) - case Some(_) => None - // If the path is not data source based, fallback to catalog based resolution. - case None => TransactionalWrite.unapply(w) - } val catalog = logical match { - case UnresolvedWith(w: TransactionalWritePlan, _, _) => resolve(w) - case w: TransactionalWritePlan => resolve(w) + case UnresolvedWith(TransactionalWrite(c), _, _) => Some(c) + case TransactionalWrite(c) => Some(c) case _ => None } catalog.map(TransactionUtils.beginTransaction) @@ -139,34 +129,6 @@ class QueryExecution( } private def transactionOpt: Option[Transaction] = lazyTransactionOpt.get - // For path-based tables (e.g. `format.`/path/to/table``) the first identifier part is a - // connector name. SupportsCatalogOptions on the connector tells us which catalog actually - // owns the table. Returns Some(catalog) if parts.head is a recognized SupportsCatalogOptions - // data source (caller decides whether the catalog is transactional), or None to fall through - // to the catalog-based extractor. - private def pathBased(write: TransactionalWritePlan): Option[TableCatalog] = - EliminateSubqueryAliases(write.table) match { - case UnresolvedRelation(parts, _, _) if parts.length > 1 => - try { - DataSource.lookupDataSourceV2(parts.head, sparkSession.sessionState.conf) - .collect { case sco: SupportsCatalogOptions => sco } - .map { sco => - val sessionConfigs = DataSourceV2Utils.extractSessionConfigs( - sco, sparkSession.sessionState.conf) - // Pass the entire identifier as option. The connector can decide how to parse it - // if needed. - val options = sessionConfigs + ("identifier" -> parts.mkString(".")) - CatalogV2Util.getTableProviderCatalog( - sco, catalogManager, new CaseInsensitiveStringMap(options.asJava)) - } - } catch { - // The head of the multipart identifier is not a registered data source. - // Fallback to catalog-based detection. - case _: ClassNotFoundException => None - } - case _ => None - } - // Per-query analyzer: uses a transaction-aware CatalogManager when a transaction is active, // so that all catalog lookups and rule applications during analysis see the correct state // without relying on thread-local context. Any nested QueryExecution that is created during diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 0354e545aa903..c1fc7234d7c19 100644 --- a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -18,8 +18,6 @@ org.apache.spark.sql.sources.FakeSourceOne org.apache.spark.sql.sources.FakeSourceTwo org.apache.spark.sql.sources.FakeSourceThree -org.apache.spark.sql.connector.FakePathBasedSource -org.apache.spark.sql.connector.FakePathBasedSourceWithSessionConfig org.apache.spark.sql.sources.FakeSourceFour org.apache.fakesource.FakeExternalSourceOne org.apache.fakesource.FakeExternalSourceTwo diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala deleted file mode 100644 index c81f53673af3a..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/PathBasedTableTransactionSuite.scala +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connector - -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.Row -import org.apache.spark.sql.connector.catalog.{Aborted, Committed, Identifier, InMemoryRowLevelOperationTableCatalog, InMemoryTableCatalog, SessionConfigSupport, SharedTablesInMemoryRowLevelOperationTableCatalog, SupportsCatalogOptions} -import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream, StreamingQueryWrapper} -import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION -import org.apache.spark.sql.sources.DataSourceRegister -import org.apache.spark.sql.streaming.StreamingQuery -import org.apache.spark.sql.util.CaseInsensitiveStringMap - -/** - * Tests for transactional writes to path-based tables, where the table is identified by a - * bare path with no catalog prefix (e.g. `/path/to/t`), or a connector-prefixed path - * (e.g. `pathformat.`/path/to/t``). The transactional catalog is registered as the session - * catalog (`spark_catalog`). - */ -class PathBasedTableTransactionSuite extends RowLevelOperationSuiteBase { - - import testImplicits._ - - private val tablePath = "`/path/to/t`" - private val tablePathWithFormat = "pathformat.`/path/to/t`" - - override def beforeEach(): Unit = { - super.beforeEach() - spark.conf.set( - V2_SESSION_CATALOG_IMPLEMENTATION.key, - classOf[SharedTablesInMemoryRowLevelOperationTableCatalog].getName) - } - - override def afterEach(): Unit = { - SharedTablesInMemoryRowLevelOperationTableCatalog.reset() - spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key) - super.afterEach() - } - - override protected def catalog: InMemoryRowLevelOperationTableCatalog = { - spark.sessionState.catalogManager.v2SessionCatalog - .asInstanceOf[InMemoryRowLevelOperationTableCatalog] - } - - private def streamSessionCatalog(query: StreamingQuery): InMemoryRowLevelOperationTableCatalog = { - val session = query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sparkSessionForStream - session.sessionState.catalogManager.v2SessionCatalog - .asInstanceOf[InMemoryRowLevelOperationTableCatalog] - } - - private def createPathTable(name: String): Unit = { - sql(s"CREATE TABLE $name (id INT, data STRING)") - } - - test("SQL insert into bare path-based table participates in transaction") { - createPathTable(tablePath) - val (txn, _) = executeTransaction { - sql(s"INSERT INTO $tablePath VALUES (1, 'a'), (2, 'b')") - } - assert(txn.currentState === Committed) - assert(txn.isClosed) - checkAnswer(spark.table(tablePath), Row(1, "a") :: Row(2, "b") :: Nil) - } - - test("SQL insert with connector-prefixed path participates in transaction") { - createPathTable(tablePathWithFormat) - val (txn, _) = executeTransaction { - sql(s"INSERT INTO $tablePathWithFormat VALUES (1, 'a'), (2, 'b')") - } - assert(txn.currentState === Committed) - assert(txn.isClosed) - checkAnswer(spark.table(tablePathWithFormat), Row(1, "a") :: Row(2, "b") :: Nil) - } - - test("SQL insert with CTE into connector-prefixed path participates in transaction") { - createPathTable(tablePathWithFormat) - val (txn, _) = executeTransaction { - sql(s""" - |WITH cte AS (SELECT 1 AS id, 'a' AS data) - |INSERT INTO $tablePathWithFormat SELECT * FROM cte - |""".stripMargin) - } - assert(txn.currentState === Committed) - assert(txn.isClosed) - checkAnswer(spark.table(tablePathWithFormat), Row(1, "a") :: Nil) - } - - test("session-config catalog controls which catalog is enrolled in transaction") { - withSQLConf( - "spark.sql.catalog.txncat" -> classOf[InMemoryRowLevelOperationTableCatalog].getName, - "spark.sql.catalog.nontxncat" -> classOf[InMemoryTableCatalog].getName) { - val txnCat = spark.sessionState.catalogManager.catalog("txncat") - .asInstanceOf[InMemoryRowLevelOperationTableCatalog] - - // Non-transactional catalog configured. - withSQLConf("spark.datasource.pathformat2.catalog" -> "nontxncat") { - createPathTable("pathformat2.`/path/to/t1`") - sql("INSERT INTO pathformat2.`/path/to/t1` VALUES (1, 'a')") - // The transaction was not routed to any of the transactional catalogs. - assert(catalog.lastTransaction == null) - assert(txnCat.lastTransaction == null) - } - - // Transactional catalog configured: pathBased resolves txncat as a - // TransactionalCatalogPlugin and opens the transaction there instead. - withSQLConf("spark.datasource.pathformat2.catalog" -> "txncat") { - createPathTable("pathformat2.`/path/to/t2`") - sql("INSERT INTO pathformat2.`/path/to/t2` VALUES (1, 'a')") - assert(txnCat.lastTransaction.currentState === Committed) - assert(txnCat.lastTransaction.isClosed) - } - } - } - - test("streaming write to path-based table participates in transaction") { - sql(s"CREATE TABLE $tablePathWithFormat (value INT)") - - withTempDir { checkpointDir => - val inputData = MemoryStream[Int] - val query = inputData.toDF() - .writeStream - .option("checkpointLocation", checkpointDir.getAbsolutePath) - .toTable(tablePathWithFormat) - - inputData.addData(1, 2, 3) - query.processAllAvailable() - query.stop() - - val streamCat = streamSessionCatalog(query) - val txn = streamCat.lastTransaction - assert(txn != null, "expected a transaction to have been committed") - assert(txn.currentState === Committed) - assert(txn.isClosed) - // Streaming must not add transactions to the main session catalog. - assert(catalog.observedTransactions.isEmpty) - checkAnswer(spark.table(tablePathWithFormat), Row(1) :: Row(2) :: Row(3) :: Nil) - } - } - - test("streaming self-join on path-based table is tracked as a scan event") { - sql(s"CREATE TABLE $tablePathWithFormat (value INT)") - sql(s"INSERT INTO $tablePathWithFormat VALUES (1), (2), (3)") - - withTempDir { checkpointDir => - val inputData = MemoryStream[Int] - val staticData = spark.read.table(tablePathWithFormat) - - val query = inputData.toDF() - .join(staticData, "value") - .writeStream - .option("checkpointLocation", checkpointDir.getAbsolutePath) - .toTable(tablePathWithFormat) - - inputData.addData(1, 2, 3) - query.processAllAvailable() - query.stop() - - val streamCat = streamSessionCatalog(query) - val txn = streamCat.lastTransaction - assert(txn != null, "expected a transaction to have been committed") - assert(txn.currentState === Committed) - assert(txn.isClosed) - // The path-based table is both the write target and a batch source in the same transaction. - assert(txn.catalog.txnTables.size === 1) - val txnTable = txn.catalog.txnTables.values.head - assert(txnTable.scanEvents.size === 1) - // Streaming must not add transactions to the main session catalog beyond the pre-existing - // INSERT transaction. - assert(catalog.observedTransactions.size === 1) - } - } - - test("SQL insert with unregistered format produces analysis error and aborts transaction") { - createPathTable(tablePathWithFormat) - // "Unregistered" is not a known catalog and not registered data source. - // So Spark falls back to treating it as a namespace in spark_catalog. The table - // does not exist, causing an AnalysisException. The transaction is started (because - // spark_catalog IS a TransactionalCatalogPlugin) and then aborted on failure. - checkError( - exception = intercept[AnalysisException] { - sql("INSERT INTO unregistered.`/path/to/t` VALUES (1, 'a'), (2, 'b')") - }, - condition = "TABLE_OR_VIEW_NOT_FOUND", - parameters = Map("relationName" -> "`unregistered`.`/path/to/t`"), - context = ExpectedContext( - fragment = "unregistered.`/path/to/t`", - start = -1, - stop = -1)) - val txn = catalog.lastTransaction - assert(txn.currentState === Aborted) - assert(txn.isClosed) - } -} - -/** - * Simulates a path-based connector (e.g. Delta) that implements [[SupportsCatalogOptions]] - * to route `pathformat.\`/path/to/t\`` SQL identifiers to the session catalog. Returning - * null from [[extractCatalog]] signals that the session catalog (`spark_catalog`) owns the - * table, matching Delta's behavior where DeltaCatalog is registered as spark_catalog. - */ -class FakePathBasedSource - extends FakeV2ProviderWithCustomSchema - with SupportsCatalogOptions - with DataSourceRegister { - - override def shortName(): String = "pathformat" - - // Use the session catalog. - override def extractCatalog(options: CaseInsensitiveStringMap): String = null - - // Not used in the transactional path. - override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = null -} - -/** - * Like [[FakePathBasedSource]] but resolves the owning catalog from the session config - * `spark.datasource.pathformat2.catalog` instead of always returning null. This simulates - * a connector that lets users configure the target catalog. - */ -class FakePathBasedSourceWithSessionConfig - extends FakeV2ProviderWithCustomSchema - with SupportsCatalogOptions - with SessionConfigSupport - with DataSourceRegister { - - override def shortName(): String = "pathformat2" - - override def keyPrefix: String = "pathformat2" - - override def extractCatalog(options: CaseInsensitiveStringMap): String = options.get("catalog") - - // Not used in the transactional path. - override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier = null -}