Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import java.util.UUID
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import javax.annotation.concurrent.GuardedBy

import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path
Expand All @@ -32,22 +31,21 @@ import org.apache.spark.internal.LogKeys.EXTENDED_EXPLAIN_GENERATOR
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, Row}
import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubqueryAliases, LazyExpression, NameParameterizedQuery, UnresolvedRelation, UnsupportedOperationChecker}
import org.apache.spark.sql.catalyst.analysis.{Analyzer, LazyExpression, NameParameterizedQuery, UnsupportedOperationChecker}
import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CompoundBody, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer, TransactionalWrite => TransactionalWritePlan, Union, UnresolvedWith, WithCTE}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CompoundBody, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer, Union, UnresolvedWith, WithCTE}
import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
import org.apache.spark.sql.catalyst.transactions.TransactionUtils
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.classic.SparkSession
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, LookupCatalog, SupportsCatalogOptions, TableCatalog, TransactionalCatalogPlugin}
import org.apache.spark.sql.connector.catalog.LookupCatalog
import org.apache.spark.sql.connector.catalog.transactions.Transaction
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ROOT_ID_KEY
import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, DisableUnnecessaryBucketedScan}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, TransactionalExec, V2TableRefreshUtil}
import org.apache.spark.sql.execution.datasources.v2.{TransactionalExec, V2TableRefreshUtil}
import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
import org.apache.spark.sql.execution.exchange.EnsureRequirements
import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
Expand All @@ -56,7 +54,6 @@ import org.apache.spark.sql.execution.streaming.runtime.{IncrementalExecution, W
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.scripting.SqlScriptingExecution
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{LazyTry, Utils, UUIDv7Generator}
import org.apache.spark.util.ArrayImplicits._

Expand Down Expand Up @@ -119,16 +116,9 @@ class QueryExecution(
analyzerOpt.flatMap(_.catalogManager.transaction).orElse {
// Only begin a new transaction for outer QEs that lead to execution.
if (mode != CommandExecutionMode.SKIP) {
def resolve(w: TransactionalWritePlan): Option[TransactionalCatalogPlugin] =
pathBased(w) match {
case Some(c: TransactionalCatalogPlugin) => Some(c)
case Some(_) => None
// If the path is not data source based, fallback to catalog based resolution.
case None => TransactionalWrite.unapply(w)
}
val catalog = logical match {
case UnresolvedWith(w: TransactionalWritePlan, _, _) => resolve(w)
case w: TransactionalWritePlan => resolve(w)
case UnresolvedWith(TransactionalWrite(c), _, _) => Some(c)
case TransactionalWrite(c) => Some(c)
case _ => None
}
catalog.map(TransactionUtils.beginTransaction)
Expand All @@ -139,34 +129,6 @@ class QueryExecution(
}
private def transactionOpt: Option[Transaction] = lazyTransactionOpt.get

// For path-based tables (e.g. `format.`/path/to/table``) the first identifier part is a
// connector name. SupportsCatalogOptions on the connector tells us which catalog actually
// owns the table. Returns Some(catalog) if parts.head is a recognized SupportsCatalogOptions
// data source (caller decides whether the catalog is transactional), or None to fall through
// to the catalog-based extractor.
private def pathBased(write: TransactionalWritePlan): Option[TableCatalog] =
EliminateSubqueryAliases(write.table) match {
case UnresolvedRelation(parts, _, _) if parts.length > 1 =>
try {
DataSource.lookupDataSourceV2(parts.head, sparkSession.sessionState.conf)
.collect { case sco: SupportsCatalogOptions => sco }
.map { sco =>
val sessionConfigs = DataSourceV2Utils.extractSessionConfigs(
sco, sparkSession.sessionState.conf)
// Pass the entire identifier as option. The connector can decide how to parse it
// if needed.
val options = sessionConfigs + ("identifier" -> parts.mkString("."))
CatalogV2Util.getTableProviderCatalog(
sco, catalogManager, new CaseInsensitiveStringMap(options.asJava))
}
} catch {
// The head of the multipart identifier is not a registered data source.
// Fallback to catalog-based detection.
case _: ClassNotFoundException => None
}
case _ => None
}

// Per-query analyzer: uses a transaction-aware CatalogManager when a transaction is active,
// so that all catalog lookups and rule applications during analysis see the correct state
// without relying on thread-local context. Any nested QueryExecution that is created during
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
org.apache.spark.sql.sources.FakeSourceOne
org.apache.spark.sql.sources.FakeSourceTwo
org.apache.spark.sql.sources.FakeSourceThree
org.apache.spark.sql.connector.FakePathBasedSource
org.apache.spark.sql.connector.FakePathBasedSourceWithSessionConfig
org.apache.spark.sql.sources.FakeSourceFour
org.apache.fakesource.FakeExternalSourceOne
org.apache.fakesource.FakeExternalSourceTwo
Expand Down

This file was deleted.