diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index d907d027b0edb..b19bde3d422fc 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -1445,6 +1445,12 @@ ], "sqlState" : "22023" }, + "CREATE_BRANCH_WITH_IF_NOT_EXISTS_AND_REPLACE" : { + "message" : [ + "ALTER TABLE <tableName> CREATE BRANCH <branchName> with both IF NOT EXISTS and OR REPLACE is not allowed." + ], + "sqlState" : "42601" + }, "CREATE_PERMANENT_VIEW_WITHOUT_ALIAS" : { "message" : [ "Not allowed to create the permanent view <name> without explicitly assigning an alias for the expression <attr>." diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index 4f21b7b4b3c79..f477cc762381f 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ -437,6 +437,8 @@ Below is a list of all the keywords in Spark SQL. |BINDING|non-reserved|non-reserved|non-reserved| |BOOLEAN|non-reserved|non-reserved|reserved| |BOTH|reserved|non-reserved|reserved| +|BRANCH|non-reserved|non-reserved|non-reserved| +|BRANCHES|non-reserved|non-reserved|non-reserved| |BUCKET|non-reserved|non-reserved|non-reserved| |BUCKETS|non-reserved|non-reserved|non-reserved| |BY|non-reserved|non-reserved|reserved| @@ -546,6 +548,7 @@ Below is a list of all the keywords in Spark SQL. |EXTERNAL|non-reserved|non-reserved|reserved| |EXTRACT|non-reserved|non-reserved|reserved| |FALSE|reserved|non-reserved|reserved| +|FAST|non-reserved|non-reserved|non-reserved| |FETCH|reserved|non-reserved|reserved| |FIELDS|non-reserved|non-reserved|non-reserved| |FILTER|reserved|non-reserved|reserved| @@ -558,6 +561,7 @@ Below is a list of all the keywords in Spark SQL. 
|FOREIGN|reserved|non-reserved|reserved| |FORMAT|non-reserved|non-reserved|non-reserved| |FORMATTED|non-reserved|non-reserved|non-reserved| +|FORWARD|non-reserved|non-reserved|non-reserved| |FOUND|non-reserved|non-reserved|non-reserved| |FROM|reserved|non-reserved|reserved| |FULL|reserved|strict-non-reserved|reserved| diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index af71f441012c1..23a900fc517ee 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -156,6 +156,8 @@ BINARY: 'BINARY'; BINDING: 'BINDING'; BOOLEAN: 'BOOLEAN'; BOTH: 'BOTH'; +BRANCH: 'BRANCH'; +BRANCHES: 'BRANCHES'; BUCKET: 'BUCKET'; BUCKETS: 'BUCKETS'; BY: 'BY'; @@ -264,6 +266,7 @@ EXTENDED: 'EXTENDED'; EXTERNAL: 'EXTERNAL'; EXTRACT: 'EXTRACT'; FALSE: 'FALSE'; +FAST: 'FAST'; FETCH: 'FETCH'; FIELDS: 'FIELDS'; FILTER: 'FILTER'; @@ -276,6 +279,7 @@ FOR: 'FOR'; FOREIGN: 'FOREIGN'; FORMAT: 'FORMAT'; FORMATTED: 'FORMATTED'; +FORWARD: 'FORWARD'; FOUND: 'FOUND'; FROM: 'FROM'; FULL: 'FULL'; diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 744c472b20179..f41cc4dd4431b 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -316,6 +316,14 @@ statement | ALTER TABLE identifierReference DROP CONSTRAINT (IF EXISTS)? name=identifier (RESTRICT | CASCADE)? #dropTableConstraint + | ALTER TABLE identifierReference + CREATE (OR REPLACE)? BRANCH (IF errorCapturingNot EXISTS)? + branch=identifier + (AS OF VERSION snapshotId=INTEGER_VALUE)? #createBranch + | ALTER TABLE identifierReference + DROP BRANCH (IF EXISTS)? 
branch=identifier #dropBranch + | ALTER TABLE identifierReference + FAST FORWARD branch=identifier TO target=identifier #fastForwardBranch | DROP TABLE (IF EXISTS)? identifierReference PURGE? #dropTable | DROP VIEW (IF EXISTS)? identifierReference #dropView | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)? @@ -374,6 +382,7 @@ statement | SHOW VIEWS ((FROM | IN) identifierReference)? (LIKE? pattern=stringLit)? #showViews | SHOW PARTITIONS identifierReference partitionSpec? #showPartitions + | SHOW BRANCHES (FROM | IN) identifierReference #showBranches | SHOW functionScope=simpleIdentifier? FUNCTIONS ((FROM | IN) ns=identifierReference)? (LIKE? (legacy=multipartIdentifier | pattern=stringLit))? #showFunctions | SHOW PROCEDURES ((FROM | IN) identifierReference)? #showProcedures @@ -587,11 +596,11 @@ query ; insertInto - : INSERT (WITH SCHEMA EVOLUTION)? OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable - | INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable - | INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference tableAlias optionsClause? (BY NAME)? + : INSERT (WITH SCHEMA EVOLUTION)? OVERWRITE TABLE? identifierReference temporalClause? optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable + | INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference temporalClause? optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable + | INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference temporalClause? tableAlias optionsClause? (BY NAME)? REPLACE (WHERE | ON) replaceCondition=booleanExpression #insertIntoReplaceBooleanCond - | INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference tableAlias optionsClause? (BY NAME)? 
+ | INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference temporalClause? tableAlias optionsClause? (BY NAME)? REPLACE USING identifierList #insertIntoReplaceUsing | INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat? #insertOverwriteHiveDir | INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir @@ -921,8 +930,10 @@ fromClause ; temporalClause - : FOR? (SYSTEM_VERSION | VERSION) AS OF version + : FOR? (SYSTEM_VERSION | VERSION) AS OF BRANCH branch=stringLit + | FOR? (SYSTEM_VERSION | VERSION) AS OF version | FOR? (SYSTEM_TIME | TIMESTAMP) AS OF timestamp=valueExpression + | FOR BRANCH branch=stringLit ; changesClause @@ -1957,6 +1968,8 @@ ansiNonReserved | BINARY_HEX | BINDING | BOOLEAN + | BRANCH + | BRANCHES | BUCKET | BUCKETS | BY @@ -2042,6 +2055,7 @@ ansiNonReserved | EXTENDED | EXTERNAL | EXTRACT + | FAST | FIELDS | FILEFORMAT | FIRST @@ -2050,6 +2064,7 @@ ansiNonReserved | FOLLOWING | FORMAT | FORMATTED + | FORWARD | FOUND | FUNCTION | FUNCTIONS @@ -2340,6 +2355,8 @@ nonReserved | BINDING | BOOLEAN | BOTH + | BRANCH + | BRANCHES | BUCKET | BUCKETS | BY @@ -2447,6 +2464,7 @@ nonReserved | EXTERNAL | EXTRACT | FALSE + | FAST | FETCH | FILTER | FIELDS @@ -2459,6 +2477,7 @@ nonReserved | FOREIGN | FORMAT | FORMATTED + | FORWARD | FROM | FOUND | FUNCTION diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala index a3cbf8753f70a..cc543d0bb0cc7 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala @@ -712,6 +712,16 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase { ctx) } + def createBranchWithBothIfNotExistsAndReplaceError( + tableName: String, + branchName: String, + ctx: ParserRuleContext): Throwable = { + new 
ParseException( + errorClass = "CREATE_BRANCH_WITH_IF_NOT_EXISTS_AND_REPLACE", + messageParameters = Map("tableName" -> tableName, "branchName" -> branchName), + ctx) + } + def temporaryViewWithSchemaBindingMode(ctx: StatementContext): Throwable = { new ParseException( errorClass = "UNSUPPORTED_FEATURE.TEMPORARY_VIEW_WITH_SCHEMA_BINDING_MODE", diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsBranching.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsBranching.java new file mode 100644 index 0000000000000..6829db39d680e --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsBranching.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.OptionalLong; + +import org.apache.spark.annotation.Evolving; + +/** + * A mix-in interface for {@link Table} branching support. Data sources can implement this + * interface to expose multi-branch capabilities (such as Iceberg branches) through standard + * Spark SQL DDL: + * + *
+ *   ALTER TABLE t CREATE [OR REPLACE] BRANCH [IF NOT EXISTS] name [AS OF VERSION snapshotId]
+ *   ALTER TABLE t DROP BRANCH [IF EXISTS] name
+ *   ALTER TABLE t FAST FORWARD branch TO target
+ *   SHOW BRANCHES IN t
+ * 
+ * + *

The meaning of {@code snapshotId} is left to the data source. When a snapshot id is not + * supplied the implementation should branch from the current snapshot of the table. + * + * @since 4.3.0 + */ +@Evolving +public interface SupportsBranching extends Table { + + /** + * Create a new branch on this table. + * + * @param name the branch name; must not be {@code null} + * @param sourceSnapshotId an optional snapshot id to branch from. If empty, the branch is + * created from the table's current snapshot. + * @return a {@link TableBranch} describing the new branch + * @throws BranchAlreadyExistsException if a branch with the given name already exists + */ + TableBranch createBranch(String name, OptionalLong sourceSnapshotId); + + /** + * Replace an existing branch (or create it if missing). + * + *

The default implementation drops the branch (if present) and then creates it again. + * Implementations may override this to perform the replacement atomically. + */ + default TableBranch replaceBranch(String name, OptionalLong sourceSnapshotId) { + dropBranch(name); + return createBranch(name, sourceSnapshotId); + } + + /** + * Drop the named branch. + * + * @return {@code true} if the branch was removed; {@code false} if it did not exist. + */ + boolean dropBranch(String name); + + /** + * Fast-forward {@code branch} to the head of {@code targetBranch}. + * + *

The operation succeeds only when the branch's current snapshot is an ancestor of the + * target branch's current snapshot. Implementations should throw + * {@link InvalidFastForwardException} otherwise. + * + * @return the updated {@link TableBranch} after fast-forwarding + * @throws BranchNotFoundException if either branch is missing + * @throws InvalidFastForwardException if the operation would not be a fast-forward + */ + TableBranch fastForward(String branch, String targetBranch); + + /** + * List all branches of this table. The default implementation returns an empty array. + */ + default TableBranch[] listBranches() { + return new TableBranch[0]; + } + + /** + * Return a {@link Table} view that targets the named branch. Reads issued against the returned + * table see the rows currently on that branch; writes commit into that branch. + * + *

The returned table shares schema with the parent table. Implementations are free to return + * the same instance and remember the branch via internal state, or to return a wrapper. + * + * @throws BranchNotFoundException if the branch does not exist + */ + Table loadBranch(String branchName); + + /** + * Thrown when a branch with the requested name already exists. + */ + class BranchAlreadyExistsException extends RuntimeException { + public BranchAlreadyExistsException(String branchName) { + super("Branch already exists: " + branchName); + } + } + + /** + * Thrown when an operation references a branch that does not exist on the table. + */ + class BranchNotFoundException extends RuntimeException { + public BranchNotFoundException(String branchName) { + super("Branch not found: " + branchName); + } + } + + /** + * Thrown by {@link #fastForward(String, String)} when the requested update would not be a + * fast-forward, i.e. the branch's current snapshot is not an ancestor of the target + * branch's current snapshot. + */ + class InvalidFastForwardException extends RuntimeException { + public InvalidFastForwardException(String message) { + super(message); + } + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableBranch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableBranch.java new file mode 100644 index 0000000000000..c9317b6d90f75 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableBranch.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Objects; +import java.util.OptionalLong; + +import org.apache.spark.annotation.Evolving; + +/** + * Describes a named branch of a {@link Table} that supports branching. + *

+ * A branch is a named reference to a particular snapshot of a table. The interpretation of + * {@code snapshotId} is left to the data source. + * + * @since 4.3.0 + */ +@Evolving +public final class TableBranch { + private final String name; + private final OptionalLong snapshotId; + private final long creationTimeMs; + + public TableBranch(String name, OptionalLong snapshotId, long creationTimeMs) { + this.name = Objects.requireNonNull(name, "name"); + this.snapshotId = Objects.requireNonNull(snapshotId, "snapshotId"); + this.creationTimeMs = creationTimeMs; + } + + public TableBranch(String name, long snapshotId, long creationTimeMs) { + this(name, OptionalLong.of(snapshotId), creationTimeMs); + } + + /** The branch name. */ + public String name() { + return name; + } + + /** + * The snapshot the branch points at, if known. Some implementations may not expose snapshot + * identifiers (for example, an empty table that has no snapshots yet). + */ + public OptionalLong snapshotId() { + return snapshotId; + } + + /** Milliseconds since the epoch at which the branch was created. 
*/ + public long creationTimeMs() { + return creationTimeMs; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TableBranch that = (TableBranch) o; + return creationTimeMs == that.creationTimeMs + && name.equals(that.name) + && snapshotId.equals(that.snapshotId); + } + + @Override + public int hashCode() { + return Objects.hash(name, snapshotId, creationTimeMs); + } + + @Override + public String toString() { + return "TableBranch{name=" + name + + ", snapshotId=" + snapshotId + + ", creationTimeMs=" + creationTimeMs + "}"; + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 24a16a6270b40..0f3d8a36480ca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1130,9 +1130,10 @@ class Analyzer( case r: V2TableReference => relationResolution.resolveReference(r) - case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, version) + case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, version, branch) if timestamp.forall(ts => ts.resolved && !SubqueryExpression.hasSubquery(ts)) => - val timeTravelSpec = TimeTravelSpec.create(timestamp, version, conf.sessionLocalTimeZone) + val timeTravelSpec = + TimeTravelSpec.create(timestamp, version, branch, conf.sessionLocalTimeZone) resolveRelation(u, timeTravelSpec).getOrElse(r) case r @ RelationChanges(u: UnresolvedRelation, ctx) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 1623fa98dad6d..dfbb772f91302 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala 
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -414,7 +414,7 @@ object CTESubstitution extends Rule[LogicalPlan] { plan.resolveOperatorsUpWithPruning( _.containsAnyPattern(RELATION_TIME_TRAVEL, RELATION_CHANGES, UNRESOLVED_RELATION, PLAN_EXPRESSION, UNRESOLVED_IDENTIFIER, PLAN_WITH_UNRESOLVED_IDENTIFIER)) { - case RelationTimeTravel(UnresolvedRelation(Seq(table), _, _), _, _) + case RelationTimeTravel(UnresolvedRelation(Seq(table), _, _), _, _, _) if cteRelations.exists(r => plan.conf.resolver(r._1, table)) => throw QueryCompilationErrors.timeTravelUnsupportedError(toSQLId(table)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 9c4fbd719a966..f104172442b87 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -616,7 +616,7 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString if (stagedError.isDefined) stagedError.get.apply() operator match { - case RelationTimeTravel(u: UnresolvedRelation, _, _) => + case RelationTimeTravel(u: UnresolvedRelation, _, _, _) => u.tableNotFound( u.multipartIdentifier, searchPathForUnresolvedRelation(u.multipartIdentifier)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala index 55a7ad10790ea..10ce15fa6290b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala @@ -167,6 +167,7 @@ class RelationResolution( u.options, conf.getConf(SQLConf.TIME_TRAVEL_TIMESTAMP_KEY), 
conf.getConf(SQLConf.TIME_TRAVEL_VERSION_KEY), + UnresolvedRelation.BRANCH_AS_OF, conf.sessionLocalTimeZone ) if (timeTravelSpec.nonEmpty && timeTravelSpecFromOptions.nonEmpty) { @@ -232,7 +233,17 @@ class RelationResolution( private def tryResolvePersistent( u: UnresolvedRelation, identifier: Seq[String], - finalTimeTravelSpec: Option[TimeTravelSpec]): Option[LogicalPlan] = { + explicitTimeTravelSpec: Option[TimeTravelSpec]): Option[LogicalPlan] = { + // Apply the session default branch only on the persistent-relation path; temp views + // and shared relations cannot be branched. + val defaultBranch = conf.getConf(SQLConf.DEFAULT_BRANCH) + val finalTimeTravelSpec = if (explicitTimeTravelSpec.isDefined) { + explicitTimeTravelSpec + } else if (defaultBranch != null && defaultBranch.nonEmpty) { + Some(AsOfBranch(defaultBranch, isExplicit = false)) + } else { + None + } expandIdentifier(identifier) match { case CatalogAndIdentifier(catalog, ident) => val key = toCacheKey(catalog, ident, finalTimeTravelSpec) @@ -242,7 +253,7 @@ class RelationResolution( .map(adaptCachedRelation(_, planId)) .orElse { val writePrivileges = u.options.get(UnresolvedRelation.REQUIRED_WRITE_PRIVILEGES) - val finalOptions = u.clearWritePrivileges.options + val finalOptions = u.clearWritePrivileges.clearBranchAsOf.options // For a `TableViewCatalog` with no time-travel / write privileges, the single-RPC // `loadTableOrView` answers both "is there a table?" and "is there a view?" in one // call. 
Time-travel and write privileges apply to tables only, so for those the diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationTimeTravel.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationTimeTravel.scala index 6e0d0998883c9..0e5f8a554d467 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationTimeTravel.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationTimeTravel.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{RELATION_TIME_TRAVEL, Tr case class RelationTimeTravel( relation: LogicalPlan, timestamp: Option[Expression], - version: Option[String]) extends UnresolvedLeafNode { + version: Option[String], + branch: Option[String] = None) extends UnresolvedLeafNode { override val nodePatterns: Seq[TreePattern] = Seq(RELATION_TIME_TRAVEL) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala index d2fe2ad602c12..40026a124bce7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeTravelSpec.scala @@ -35,6 +35,10 @@ case class AsOfVersion(version: String) extends TimeTravelSpec { override def toString: String = s"VERSION AS OF '$version'" } +case class AsOfBranch(branch: String, isExplicit: Boolean = true) extends TimeTravelSpec { + override def toString: String = s"BRANCH '$branch'" +} + object TimeTravelSpec { /** @@ -75,8 +79,10 @@ object TimeTravelSpec { def create( timestamp: Option[Expression], version: Option[String], + branch: Option[String], sessionLocalTimeZone: String) : Option[TimeTravelSpec] = { - if (timestamp.nonEmpty && version.nonEmpty) { + val nonEmptyCount = Seq(timestamp.nonEmpty, version.nonEmpty, branch.nonEmpty).count(identity) 
+ if (nonEmptyCount > 1) { throw QueryCompilationErrors.invalidTimeTravelSpecError() } else if (timestamp.nonEmpty) { val ts = timestamp.get @@ -84,39 +90,66 @@ object TimeTravelSpec { Some(AsOfTimestamp(resolveTimestampExpression(ts, sessionLocalTimeZone))) } else if (version.nonEmpty) { Some(AsOfVersion(version.get)) + } else if (branch.nonEmpty) { + Some(AsOfBranch(branch.get)) } else { None } } + // Kept for binary compatibility with callers that only know about timestamp/version. + def create( + timestamp: Option[Expression], + version: Option[String], + sessionLocalTimeZone: String) : Option[TimeTravelSpec] = { + create(timestamp, version, None, sessionLocalTimeZone) + } + def fromOptions( options: CaseInsensitiveStringMap, timestampKey: String, versionKey: String, sessionLocalTimeZone: String): Option[TimeTravelSpec] = { - (Option(options.get(timestampKey)), Option(options.get(versionKey))) match { - case (Some(_), Some(_)) => - throw QueryCompilationErrors.invalidTimeTravelSpecError() + fromOptions(options, timestampKey, versionKey, branchKey = null, sessionLocalTimeZone) + } - case (Some(timestampStr), None) => - val timestampValue = Cast( - Literal(timestampStr), - TimestampType, - Some(sessionLocalTimeZone), - ansiEnabled = false - ).eval() - if (timestampValue == null) { - throw new AnalysisException( - "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.OPTION", - Map("expr" -> s"'$timestampStr'") - ) - } - Some(AsOfTimestamp(timestampValue.asInstanceOf[Long])) + def fromOptions( + options: CaseInsensitiveStringMap, + timestampKey: String, + versionKey: String, + branchKey: String, + sessionLocalTimeZone: String): Option[TimeTravelSpec] = { + val timestampOpt = Option(options.get(timestampKey)) + val versionOpt = Option(options.get(versionKey)) + val branchOpt = Option(branchKey).flatMap(k => Option(options.get(k))) - case (None, Some(versionStr)) => - Some(AsOfVersion(versionStr)) + val provided = Seq(timestampOpt.nonEmpty, versionOpt.nonEmpty, branchOpt.nonEmpty) + 
.count(identity) + if (provided > 1) { + throw QueryCompilationErrors.invalidTimeTravelSpecError() + } - case _ => None + if (timestampOpt.isDefined) { + val timestampStr = timestampOpt.get + val timestampValue = Cast( + Literal(timestampStr), + TimestampType, + Some(sessionLocalTimeZone), + ansiEnabled = false + ).eval() + if (timestampValue == null) { + throw new AnalysisException( + "INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.OPTION", + Map("expr" -> s"'$timestampStr'") + ) + } + Some(AsOfTimestamp(timestampValue.asInstanceOf[Long])) + } else if (versionOpt.isDefined) { + Some(AsOfVersion(versionOpt.get)) + } else if (branchOpt.isDefined) { + Some(AsOfBranch(branchOpt.get)) + } else { + None } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index a5b467d0f0816..c49480a9c82d9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -163,6 +163,17 @@ case class UnresolvedRelation( } } + def clearBranchAsOf: UnresolvedRelation = { + if (options.containsKey(UnresolvedRelation.BRANCH_AS_OF)) { + val newOptions = new java.util.HashMap[String, String] + newOptions.putAll(options) + newOptions.remove(UnresolvedRelation.BRANCH_AS_OF) + copy(options = new CaseInsensitiveStringMap(newOptions)) + } else { + this + } + } + final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_RELATION) } @@ -171,6 +182,10 @@ object UnresolvedRelation { // writing data to this relation. val REQUIRED_WRITE_PRIVILEGES = "__required_write_privileges__" + // An internal option of `UnresolvedRelation` to specify that reads/writes should target a + // particular named branch on a `SupportsBranching` table. 
+ val BRANCH_AS_OF = "__branch_as_of__" + def apply( tableIdentifier: TableIdentifier, extraOptions: CaseInsensitiveStringMap, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index ff69be1956c6b..cc20534e389f0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -905,7 +905,8 @@ class AstBuilder extends DataTypeAstBuilder partitionSpec: Map[String, Option[String]], ifPartitionNotExists: Boolean, byName: Boolean, - replaceCriteriaOpt: Option[InsertReplaceCriteria] = None) + replaceCriteriaOpt: Option[InsertReplaceCriteria] = None, + temporal: Option[TemporalClauseContext] = None) /** * Parameters used for writing query to a directory: (isLocal, CatalogStorageFormat, provider). @@ -944,7 +945,8 @@ class AstBuilder extends DataTypeAstBuilder createInsertIntoStatement( insertParams = insertParams, tableSlot = buildWriteTableSlot( - insertParams.relationCtx, insertParams.options, privileges), + insertParams.relationCtx, insertParams.options, privileges, + insertParams.temporal.orNull), query = query, overwrite = false, withSchemaEvolution = table.EVOLUTION() != null) @@ -954,7 +956,8 @@ class AstBuilder extends DataTypeAstBuilder createInsertIntoStatement( insertParams = insertParams, tableSlot = buildWriteTableSlot( - insertParams.relationCtx, insertParams.options, privileges), + insertParams.relationCtx, insertParams.options, privileges, + insertParams.temporal.orNull), query = query, overwrite = true, withSchemaEvolution = table.EVOLUTION() != null) @@ -981,7 +984,8 @@ class AstBuilder extends DataTypeAstBuilder // `PlanWithUnresolvedIdentifier` is a `NamedRelation`, so it can occupy // `OverwriteByExpression.table` directly; the materialization happens in // `ResolveIdentifierClause` via its 
`OverwriteByExpression` special-case. - val table = buildWriteTableSlot(ctx.identifierReference, options, privileges) + val table = buildWriteTableSlot( + ctx.identifierReference, options, privileges, ctx.temporalClause()) val deleteExpr = expression(ctx.replaceCondition) val isByName = ctx.NAME() != null if (isByName) { @@ -1012,7 +1016,8 @@ class AstBuilder extends DataTypeAstBuilder createInsertIntoStatement( insertParams = insertParams, tableSlot = buildWriteTableSlot( - insertParams.relationCtx, insertParams.options, privileges), + insertParams.relationCtx, insertParams.options, privileges, + insertParams.temporal.orNull), query = finalQuery, overwrite = true, withSchemaEvolution = ctx.EVOLUTION() != null) @@ -1023,7 +1028,8 @@ class AstBuilder extends DataTypeAstBuilder createInsertIntoStatement( insertParams = insertParams, tableSlot = buildWriteTableSlot( - insertParams.relationCtx, insertParams.options, privileges), + insertParams.relationCtx, insertParams.options, privileges, + insertParams.temporal.orNull), query = query, overwrite = true, withSchemaEvolution = ctx.EVOLUTION() != null) @@ -1059,7 +1065,8 @@ class AstBuilder extends DataTypeAstBuilder userSpecifiedCols = userSpecifiedCols, partitionSpec = partitionSpec, ifPartitionNotExists = false, - byName = ctx.NAME() != null) + byName = ctx.NAME() != null, + temporal = Option(ctx.temporalClause())) } /** @@ -1086,7 +1093,8 @@ class AstBuilder extends DataTypeAstBuilder userSpecifiedCols = userSpecifiedCols, partitionSpec = partitionSpec, ifPartitionNotExists = ctx.EXISTS() != null, - byName = ctx.NAME() != null) + byName = ctx.NAME() != null, + temporal = Option(ctx.temporalClause())) } /** @@ -1107,7 +1115,8 @@ class AstBuilder extends DataTypeAstBuilder relationCtx = ctx.identifierReference(), optionsCtx = ctx.optionsClause(), byName = byName, - replaceCriteriaOpt = Some(InsertReplaceUsing(replaceUsingCols)) + replaceCriteriaOpt = Some(InsertReplaceUsing(replaceUsingCols)), + temporalCtx = 
ctx.temporalClause()
     )
   }
 
@@ -1131,7 +1140,8 @@ class AstBuilder extends DataTypeAstBuilder
       relationCtx = ctx.identifierReference(),
       optionsCtx = ctx.optionsClause(),
       byName = byName,
-      replaceCriteriaOpt = Some(InsertReplaceOn(replaceOnCond, tableAliasOpt))
+      replaceCriteriaOpt = Some(InsertReplaceOn(replaceOnCond, tableAliasOpt)),
+      temporalCtx = ctx.temporalClause()
     )
   }
 
@@ -1139,7 +1149,8 @@ class AstBuilder extends DataTypeAstBuilder
       relationCtx: IdentifierReferenceContext,
       optionsCtx: OptionsClauseContext,
       byName: Boolean,
-      replaceCriteriaOpt: Option[InsertReplaceCriteria]): InsertTableParams = {
+      replaceCriteriaOpt: Option[InsertReplaceCriteria],
+      temporalCtx: TemporalClauseContext = null): InsertTableParams = {
     InsertTableParams(
       relationCtx = relationCtx,
       options = Option(optionsCtx),
@@ -1147,7 +1158,8 @@ class AstBuilder extends DataTypeAstBuilder
       partitionSpec = Map[String, Option[String]](),
       ifPartitionNotExists = false,
       byName = byName,
-      replaceCriteriaOpt = replaceCriteriaOpt)
+      replaceCriteriaOpt = replaceCriteriaOpt,
+      temporal = Option(temporalCtx))
   }
 
   /**
@@ -1186,10 +1198,41 @@ class AstBuilder extends DataTypeAstBuilder
   private def buildWriteTableSlot(
       ctx: IdentifierReferenceContext,
       optionsClause: Option[OptionsClauseContext],
-      writePrivileges: Set[TableWritePrivilege]): NamedRelation = {
-    withIdentClause(ctx, parts =>
+      writePrivileges: Set[TableWritePrivilege],
+      temporalCtx: TemporalClauseContext = null): NamedRelation = {
+    val base = withIdentClause(ctx, parts =>
       createUnresolvedRelation(ctx, parts, optionsClause, writePrivileges, isStreaming = false))
       .asInstanceOf[NamedRelation]
+    if (temporalCtx == null) {
+      base
+    } else {
+      val version = visitVersion(temporalCtx.version)
+      val timestamp = Option(temporalCtx.timestamp).map(expression)
+      val branch = Option(temporalCtx.branch).map(b => string(visitStringLit(b)))
+      if (branch.isEmpty) {
+        throw QueryParsingErrors.invalidTimeTravelSpec(
+          "writes can only target a named branch via FOR BRANCH or VERSION AS OF BRANCH",
+          temporalCtx)
+      }
+      if (version.nonEmpty || timestamp.nonEmpty) {
+        throw QueryParsingErrors.invalidTimeTravelSpec(
+          "BRANCH cannot be combined with VERSION or TIMESTAMP",
+          temporalCtx)
+      }
+      base match {
+        case u: UnresolvedRelation =>
+          // Record the target branch as a relation option, on top of the existing options.
+          val newOptions = new java.util.HashMap[String, String]
+          newOptions.putAll(u.options)
+          newOptions.put(UnresolvedRelation.BRANCH_AS_OF, branch.get)
+          u.copy(options = new CaseInsensitiveStringMap(newOptions))
+        case _ =>
+          // A non-constant table identifier is a user error, so report it as a parse error
+          // with the clause's origin rather than as an internal IllegalStateException.
+          throw QueryParsingErrors.invalidTimeTravelSpec(
+            "FOR BRANCH on write requires a constant table identifier", temporalCtx)
+      }
+    }
   }
 
   /**
@@ -2632,14 +2675,14 @@ class AstBuilder extends DataTypeAstBuilder
 
   private def withTimeTravel(
       ctx: TemporalClauseContext, plan: LogicalPlan): LogicalPlan = withOrigin(ctx) {
-    val v = ctx.version
     val version = visitVersion(ctx.version)
     val timestamp = Option(ctx.timestamp).map(expression)
+    val branch = Option(ctx.branch).map(b => string(visitStringLit(b)))
     if (timestamp.exists(_.references.nonEmpty)) {
       throw QueryParsingErrors.invalidTimeTravelSpec(
         "timestamp expression cannot refer to any columns", ctx.timestamp)
     }
-    RelationTimeTravel(plan, timestamp, version)
+    RelationTimeTravel(plan, timestamp, version, branch)
   }
 
   /**
@@ -6738,6 +6781,86 @@ class AstBuilder extends DataTypeAstBuilder
       "ALTER TABLE ... RECOVER PARTITIONS"))
   }
 
+  /**
+   * Create a [[CreateBranch]].
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE t CREATE BRANCH name [AS OF VERSION 123]
+   *   ALTER TABLE t CREATE OR REPLACE BRANCH name [AS OF VERSION 123]
+   *   ALTER TABLE t CREATE BRANCH IF NOT EXISTS name
+   * }}}
+   */
+  override def visitCreateBranch(ctx: CreateBranchContext): LogicalPlan = withOrigin(ctx) {
+    val ifNotExists = ctx.EXISTS() != null
+    val replace = ctx.REPLACE() != null
+    if (ifNotExists && replace) {
+      throw QueryParsingErrors.createBranchWithBothIfNotExistsAndReplaceError(
+        ctx.identifierReference().getText, ctx.branch.getText, ctx)
+    }
+    val snapshotId = Option(ctx.snapshotId).map { token =>
+      // The literal may not fit in a Long; surface that as a parse error instead of
+      // letting a raw NumberFormatException escape the parser.
+      try {
+        token.getText.toLong
+      } catch {
+        case _: NumberFormatException =>
+          throw QueryParsingErrors.invalidTimeTravelSpec(
+            s"snapshot id ${token.getText} does not fit in a 64-bit integer", ctx)
+      }
+    }
+    CreateBranch(
+      createUnresolvedTable(ctx.identifierReference, "ALTER TABLE ... CREATE BRANCH"),
+      ctx.branch.getText,
+      snapshotId,
+      ifNotExists,
+      replace)
+  }
+
+  /**
+   * Create a [[DropBranch]].
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE t DROP BRANCH [IF EXISTS] name
+   * }}}
+   */
+  override def visitDropBranch(ctx: DropBranchContext): LogicalPlan = withOrigin(ctx) {
+    DropBranch(
+      createUnresolvedTable(ctx.identifierReference, "ALTER TABLE ... DROP BRANCH"),
+      ctx.branch.getText,
+      ctx.EXISTS() != null)
+  }
+
+  /**
+   * Create a [[FastForwardBranch]].
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE t FAST FORWARD branch TO target
+   * }}}
+   */
+  override def visitFastForwardBranch(
+      ctx: FastForwardBranchContext): LogicalPlan = withOrigin(ctx) {
+    FastForwardBranch(
+      createUnresolvedTable(ctx.identifierReference, "ALTER TABLE ... FAST FORWARD"),
+      ctx.branch.getText,
+      ctx.target.getText)
+  }
+
+  /**
+   * Create a [[ShowBranches]].
+   *
+   * For example:
+   * {{{
+   *   SHOW BRANCHES (FROM | IN) table
+   * }}}
+   */
+  override def visitShowBranches(ctx: ShowBranchesContext): LogicalPlan = withOrigin(ctx) {
+    ShowBranches(
+      createUnresolvedTable(ctx.identifierReference, "SHOW BRANCHES"))
+  }
+
   /**
    * Create an [[AddPartitions]].
*
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 40cf5009b97dc..b59fb8970ec98 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -1697,6 +1697,76 @@ case class RecoverPartitions(child: LogicalPlan) extends UnaryCommand {
     copy(child = newChild)
 }
 
+/**
+ * The logical plan of the ALTER TABLE ... CREATE BRANCH command.
+ *
+ * @param child the table the branch is created on
+ * @param branchName name of the branch to create
+ * @param sourceSnapshotId snapshot id given via AS OF VERSION, or None when it is omitted
+ * @param ifNotExists whether IF NOT EXISTS was specified
+ * @param replace whether OR REPLACE was specified
+ */
+case class CreateBranch(
+    child: LogicalPlan,
+    branchName: String,
+    sourceSnapshotId: Option[Long],
+    ifNotExists: Boolean,
+    replace: Boolean) extends UnaryCommand {
+  override protected def withNewChildInternal(newChild: LogicalPlan): CreateBranch =
+    copy(child = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... DROP BRANCH command.
+ *
+ * @param child the table that owns the branch
+ * @param branchName name of the branch to drop
+ * @param ifExists whether IF EXISTS was specified
+ */
+case class DropBranch(
+    child: LogicalPlan,
+    branchName: String,
+    ifExists: Boolean) extends UnaryCommand {
+  override protected def withNewChildInternal(newChild: LogicalPlan): DropBranch =
+    copy(child = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... FAST FORWARD command.
+ *
+ * @param child the table that owns both branches
+ * @param branchName name of the branch to fast-forward
+ * @param targetBranchName name of the branch given after TO
+ */
+case class FastForwardBranch(
+    child: LogicalPlan,
+    branchName: String,
+    targetBranchName: String) extends UnaryCommand {
+  override protected def withNewChildInternal(newChild: LogicalPlan): FastForwardBranch =
+    copy(child = newChild)
+}
+
+/**
+ * The logical plan of the SHOW BRANCHES command.
+ *
+ * The output attributes default to [[ShowBranches.getOutputAttrs]].
+ */
+case class ShowBranches(
+    child: LogicalPlan,
+    override val output: Seq[Attribute] = ShowBranches.getOutputAttrs) extends UnaryCommand {
+  override protected def withNewChildInternal(newChild: LogicalPlan): ShowBranches =
+    copy(child = newChild)
+}
+
+object ShowBranches {
+  // Output schema: name (string), snapshot_id (nullable long), creation_time_ms (long).
+  def getOutputAttrs: Seq[Attribute] = Seq(
+    AttributeReference("name", StringType, nullable = false)(),
+    AttributeReference("snapshot_id", org.apache.spark.sql.types.LongType, nullable = true)(),
+    AttributeReference("creation_time_ms",
+      org.apache.spark.sql.types.LongType, nullable = false)())
+}
+
 /**
  * The logical plan of the LOAD DATA INTO TABLE command.
  */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index a7c5cf5e54311..aa07de69fcc9f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.{SparkException, SparkIllegalArgumentException}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.CurrentUserContext
-import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException, RelationCache, TimeTravelSpec}
+import org.apache.spark.sql.catalyst.analysis.{AsOfBranch, AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException, RelationCache, TimeTravelSpec}
 import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, V2ExpressionUtils}
 import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
@@ -35,6 +35,7 @@ import 
org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.connector.catalog.constraints.Constraint import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.expressions.{ClusterByTransform, LiteralValue, Transform} +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, MetadataBuilder, StructField, StructType} @@ -466,22 +467,39 @@ private[sql] object CatalogV2Util { ident: Identifier, timeTravelSpec: Option[TimeTravelSpec] = None, writePrivilegesString: Option[String] = None): Table = { - if (timeTravelSpec.nonEmpty) { + val branchSpec = timeTravelSpec.collect { case b: AsOfBranch => b } + val nonBranchSpec = timeTravelSpec.filter { + case _: AsOfBranch => false + case _ => true + } + if (nonBranchSpec.nonEmpty) { assert(writePrivilegesString.isEmpty, "Should not write to a table with time travel") - timeTravelSpec.get match { - case v: AsOfVersion => - catalog.asTableCatalog.loadTable(ident, v.version) - case ts: AsOfTimestamp => - catalog.asTableCatalog.loadTable(ident, ts.timestamp) - } - } else { - if (writePrivilegesString.isDefined) { - val writePrivileges = writePrivilegesString.get.split(",").map(_.trim) - .map(TableWritePrivilege.valueOf).toSet.asJava - catalog.asTableCatalog.loadTable(ident, writePrivileges) - } else { - catalog.asTableCatalog.loadTable(ident) - } + } + val baseTable: Table = nonBranchSpec match { + case Some(v: AsOfVersion) => + catalog.asTableCatalog.loadTable(ident, v.version) + case Some(ts: AsOfTimestamp) => + catalog.asTableCatalog.loadTable(ident, ts.timestamp) + case _ => + if (writePrivilegesString.isDefined) { + val writePrivileges = writePrivilegesString.get.split(",").map(_.trim) + .map(TableWritePrivilege.valueOf).toSet.asJava + 
catalog.asTableCatalog.loadTable(ident, writePrivileges) + } else { + catalog.asTableCatalog.loadTable(ident) + } + } + branchSpec match { + case Some(spec) => + baseTable match { + case b: SupportsBranching => b.loadBranch(spec.branch) + case other if spec.isExplicit => + throw QueryCompilationErrors.tableDoesNotSupportBranchingError(other) + case other => + // Non-explicit (session default) branch: silently ignore for non-branching tables. + other + } + case None => baseTable } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index d5a9cc723bc34..05dc4ac865f27 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -1709,6 +1709,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat tableDoesNotSupportError("truncates", table) } + def tableDoesNotSupportBranchingError(table: Table): Throwable = { + tableDoesNotSupportError("branching", table) + } + def tableDoesNotSupportPartitionManagementError(table: Table): Throwable = { tableDoesNotSupportError("partition management", table) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala index b24885270f52d..d85dfda080a62 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala @@ -22,7 +22,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec} import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, MetadataAttribute} import org.apache.spark.sql.catalyst.types.DataTypeUtils -import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDeleteV2, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability, TruncatableTable} +import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsBranching, SupportsDeleteV2, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability, TruncatableTable} import org.apache.spark.sql.connector.write.RowLevelOperationTable import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.{StructField, StructType} @@ -65,6 +65,14 @@ object DataSourceV2Implicits { } } + def asBranchable: SupportsBranching = { + table match { + case t: SupportsBranching => t + case _ => + throw QueryCompilationErrors.tableDoesNotSupportBranchingError(table) + } + } + def supportsPartitions: Boolean = table match { case _: SupportsPartitionManagement => true case _ => false diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 270b8aa31a565..43a9c11aefbda 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -6912,6 +6912,17 @@ object SQLConf { .stringConf .createWithDefault("versionAsOf") + val DEFAULT_BRANCH = + buildConf("spark.sql.defaultBranch") + .doc("When non-empty, sets a default branch for reads and writes against data sources " + + "that implement SupportsBranching. An explicit FOR BRANCH or VERSION AS OF BRANCH " + + "clause in the query takes precedence. 
Tables that do not implement SupportsBranching " + + "ignore this setting.") + .version("4.3.0") + .withBindingPolicy(ConfigBindingPolicy.SESSION) + .stringConf + .createWithDefault("") + val OPERATOR_PIPE_SYNTAX_ENABLED = buildConf("spark.sql.operatorPipeSyntaxEnabled") .doc("If true, enable operator pipe syntax for Apache Spark SQL. This uses the operator " + diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 1db22037d31f8..6a33b74081dba 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -3662,4 +3662,77 @@ class DDLParserSuite extends AnalysisTest { ) } } + + test("alter table: create branch") { + comparePlans( + parsePlan("ALTER TABLE t CREATE BRANCH b"), + CreateBranch( + UnresolvedTable(Seq("t"), "ALTER TABLE ... CREATE BRANCH"), + "b", + None, + ifNotExists = false, + replace = false)) + + comparePlans( + parsePlan("ALTER TABLE t CREATE BRANCH IF NOT EXISTS b"), + CreateBranch( + UnresolvedTable(Seq("t"), "ALTER TABLE ... CREATE BRANCH"), + "b", + None, + ifNotExists = true, + replace = false)) + + comparePlans( + parsePlan("ALTER TABLE t CREATE OR REPLACE BRANCH b AS OF VERSION 42"), + CreateBranch( + UnresolvedTable(Seq("t"), "ALTER TABLE ... 
CREATE BRANCH"), + "b", + Some(42L), + ifNotExists = false, + replace = true)) + } + + test("alter table: create branch with both REPLACE and IF NOT EXISTS is rejected") { + val sqlText = "ALTER TABLE t CREATE OR REPLACE BRANCH IF NOT EXISTS b" + checkError( + exception = parseException(sqlText), + condition = "CREATE_BRANCH_WITH_IF_NOT_EXISTS_AND_REPLACE", + parameters = Map("tableName" -> "t", "branchName" -> "b"), + context = ExpectedContext(fragment = sqlText, start = 0, stop = sqlText.length - 1)) + } + + test("alter table: drop branch") { + comparePlans( + parsePlan("ALTER TABLE t DROP BRANCH b"), + DropBranch( + UnresolvedTable(Seq("t"), "ALTER TABLE ... DROP BRANCH"), + "b", + ifExists = false)) + + comparePlans( + parsePlan("ALTER TABLE t DROP BRANCH IF EXISTS b"), + DropBranch( + UnresolvedTable(Seq("t"), "ALTER TABLE ... DROP BRANCH"), + "b", + ifExists = true)) + } + + test("alter table: fast forward") { + comparePlans( + parsePlan("ALTER TABLE t FAST FORWARD b TO target"), + FastForwardBranch( + UnresolvedTable(Seq("t"), "ALTER TABLE ... 
FAST FORWARD"), + "b", + "target")) + } + + test("show branches") { + comparePlans( + parsePlan("SHOW BRANCHES IN t"), + ShowBranches(UnresolvedTable(Seq("t"), "SHOW BRANCHES"))) + + comparePlans( + parsePlan("SHOW BRANCHES FROM t"), + ShowBranches(UnresolvedTable(Seq("t"), "SHOW BRANCHES"))) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 5370d1ee8d313..660b25f4a9632 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -2174,6 +2174,22 @@ class PlanParserSuite extends AnalysisTest { Seq("current_date"), Nil, isDistinct = false)) :: Nil, OneRowRelation()))), None))) + // FOR BRANCH and VERSION AS OF BRANCH both produce a RelationTimeTravel with a branch. + def testBranch(sqlText: String): Unit = { + comparePlans( + parsePlan(sqlText), + Project(Seq(UnresolvedStar(None)), + RelationTimeTravel( + UnresolvedRelation(Seq("a", "b", "c")), + None, + None, + Some("dev")))) + } + testBranch("SELECT * FROM a.b.c FOR BRANCH 'dev'") + testBranch("SELECT * FROM a.b.c VERSION AS OF BRANCH 'dev'") + testBranch("SELECT * FROM a.b.c SYSTEM_VERSION AS OF BRANCH 'dev'") + testBranch("SELECT * FROM a.b.c FOR VERSION AS OF BRANCH 'dev'") + val sql = "SELECT * FROM a.b.c TIMESTAMP AS OF col" val fragment = "TIMESTAMP AS OF col" checkError( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala index 66db9c18fa981..75124b1d037a7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala @@ -18,7 +18,10 @@ package 
org.apache.spark.sql.connector.catalog
 
 import java.util
-import java.util.{Objects, UUID}
+import java.util.{Objects, OptionalLong, UUID}
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.catalog.constraints.Constraint
@@ -50,7 +53,85 @@ class InMemoryTable(
     override val id: String = UUID.randomUUID().toString)
   extends InMemoryBaseTable(name, columns, partitioning, properties, constraints, distribution,
     ordering, numPartitions, advisoryPartitionSize, isDistributionStrictlyRequired,
-    numRowsPerSplit) with SupportsDelete {
+    numRowsPerSplit) with SupportsDelete with SupportsBranching {
+
+  // Branch metadata keyed by branch name.
+  private val branches = new ConcurrentHashMap[String, TableBranch]()
+  // Data lives in an independent InMemoryTable per branch so that reads/writes targeting a
+  // branch see only that branch's rows. Branch tables share schema, partitioning, and
+  // properties with the parent.
+  private val branchTables = new ConcurrentHashMap[String, InMemoryTable]()
+
+  override def createBranch(branchName: String,
+      sourceSnapshotId: OptionalLong): TableBranch = {
+    val snapshot = if (sourceSnapshotId.isPresent) sourceSnapshotId.getAsLong
+      else version().toLong
+    val branch = new TableBranch(branchName, snapshot, System.currentTimeMillis())
+    // putIfAbsent makes the existence check and the insert a single atomic step.
+    val existing = branches.putIfAbsent(branchName, branch)
+    if (existing != null) {
+      throw new SupportsBranching.BranchAlreadyExistsException(branchName)
+    }
+    branchTables.put(branchName, newBranchTable(branchName))
+    branch
+  }
+
+  override def dropBranch(branchName: String): Boolean = {
+    branchTables.remove(branchName)
+    branches.remove(branchName) != null
+  }
+
+  override def loadBranch(branchName: String): Table = {
+    // branches and branchTables are updated one after the other, so a concurrent caller can
+    // see the metadata entry without its table; treat either gap as "branch not found"
+    // instead of returning null.
+    val branchTable = branchTables.get(branchName)
+    if (!branches.containsKey(branchName) || branchTable == null) {
+      throw new SupportsBranching.BranchNotFoundException(branchName)
+    }
+    branchTable
+  }
+
+  private def newBranchTable(branchName: String): InMemoryTable = {
+    new InMemoryTable(
+      name = s"$name@$branchName",
+      columns = columns(),
+      partitioning = partitioning,
+      properties = properties,
+      constraints = constraints,
+      distribution = distribution,
+      ordering = ordering,
+      numPartitions = numPartitions,
+      advisoryPartitionSize = advisoryPartitionSize,
+      isDistributionStrictlyRequired = isDistributionStrictlyRequired,
+      numRowsPerSplit = numRowsPerSplit)
+  }
+
+  override def fastForward(branchName: String, targetBranchName: String): TableBranch = {
+    // NOTE(review): only branch metadata is advanced here; branchTables is not touched.
+    val current = branches.get(branchName)
+    if (current == null) {
+      throw new SupportsBranching.BranchNotFoundException(branchName)
+    }
+    val target = branches.get(targetBranchName)
+    if (target == null) {
+      throw new SupportsBranching.BranchNotFoundException(targetBranchName)
+    }
+    val currentSnapshot = if (current.snapshotId().isPresent) current.snapshotId().getAsLong else 0L
+    val targetSnapshot = if (target.snapshotId().isPresent) target.snapshotId().getAsLong else 0L
+    if (targetSnapshot < currentSnapshot) {
+      throw new SupportsBranching.InvalidFastForwardException(
+        s"Cannot fast-forward $branchName: target $targetBranchName is behind " +
+          s"(target=$targetSnapshot, current=$currentSnapshot)")
+    }
+    val updated = new TableBranch(branchName, targetSnapshot, current.creationTimeMs())
+    branches.put(branchName, updated)
+    updated
+  }
+
+  override def listBranches(): Array[TableBranch] = {
+    branches.values().asScala.toArray.sortBy(_.name())
+  }
 
   def this(
       name: String,
diff --git a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
index 1f525a541daae..ca18f202fcf35 100644
--- a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
+++ b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala
@@ -210,7 +210,7 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSpark
       val metadata = conn.getMetaData
       // scalastyle:off line.size.limit
       // CURRENT_PATH and SYSTEM are excluded: getSQLKeywords drops SQL:2003 reserved words (see companion).
- assert(metadata.getSQLKeywords === "ADD,AFTER,AGGREGATE,ALWAYS,ANALYZE,ANTI,ANY_VALUE,APPROX,ARCHIVE,ASC,BERNOULLI,BINDING,BUCKET,BUCKETS,BYTE,CACHE,CASCADE,CATALOG,CATALOGS,CHANGE,CHANGES,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATION,COLLATIONS,COLLECTION,COLUMNS,COMMENT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONTAINS,CONTINUE,COST,CURRENT_DATABASE,CURRENT_SCHEMA,DATA,DATABASE,DATABASES,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAYOFYEAR,DAYS,DBPROPERTIES,DEFAULT_PATH,DEFINED,DEFINER,DELAY,DELIMITED,DESC,DFS,DIRECTORIES,DIRECTORY,DISTANCE,DISTRIBUTE,DIV,DO,ELSEIF,ENFORCED,ESCAPED,EVOLUTION,EXACT,EXCHANGE,EXCLUDE,EXCLUSIVE,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,FIELDS,FILEFORMAT,FIRST,FLOW,FOLLOWING,FORMAT,FORMATTED,FOUND,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,HANDLER,HOURS,IDENTIFIED,IDENTIFIER,IF,IGNORE,ILIKE,IMMEDIATE,INCLUDE,INCLUSIVE,INCREMENT,INDEX,INDEXES,INPATH,INPUT,INPUTFORMAT,INVOKER,ITEMS,ITERATE,JSON,KEY,KEYS,LAST,LAZY,LEAVE,LEVEL,LIMIT,LINES,LIST,LOAD,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MEASURE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTES,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NEAREST,NORELY,NULLS,OFFSET,OPTION,OPTIONS,OUTPUTFORMAT,OVERWRITE,PARTITIONED,PARTITIONS,PATH,PERCENT,PIVOT,PLACING,PRECEDING,PRINCIPALS,PROCEDURES,PROPERTIES,PURGE,QUALIFY,QUARTER,QUERY,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,REDUCE,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,ROLE,ROLES,SCHEMA,SCHEMAS,SECONDS,SECURITY,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SETS,SHORT,SHOW,SIMILARITY,SINGLE,SKEWED,SORT,SORTED,SOURCE,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SYNC,SYSTEM_PATH,SYSTEM_TIME,SYSTEM_VERSION,TABLES,TARGET,TBLPROPERTIES,TERMINATED,TIMEDIFF,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TOUCH,TRANSACTION,TRANSACTIONS,TRANSFORM,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNLOCK,UNPIVOT,UNSET,UNTIL,USE,VAR,VARIAB
LE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHILE,X,YEARS,ZONE") + assert(metadata.getSQLKeywords === "ADD,AFTER,AGGREGATE,ALWAYS,ANALYZE,ANTI,ANY_VALUE,APPROX,ARCHIVE,ASC,BERNOULLI,BINDING,BRANCH,BRANCHES,BUCKET,BUCKETS,BYTE,CACHE,CASCADE,CATALOG,CATALOGS,CHANGE,CHANGES,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATION,COLLATIONS,COLLECTION,COLUMNS,COMMENT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONTAINS,CONTINUE,COST,CURRENT_DATABASE,CURRENT_SCHEMA,DATA,DATABASE,DATABASES,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAYOFYEAR,DAYS,DBPROPERTIES,DEFAULT_PATH,DEFINED,DEFINER,DELAY,DELIMITED,DESC,DFS,DIRECTORIES,DIRECTORY,DISTANCE,DISTRIBUTE,DIV,DO,ELSEIF,ENFORCED,ESCAPED,EVOLUTION,EXACT,EXCHANGE,EXCLUDE,EXCLUSIVE,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,FAST,FIELDS,FILEFORMAT,FIRST,FLOW,FOLLOWING,FORMAT,FORMATTED,FORWARD,FOUND,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,HANDLER,HOURS,IDENTIFIED,IDENTIFIER,IF,IGNORE,ILIKE,IMMEDIATE,INCLUDE,INCLUSIVE,INCREMENT,INDEX,INDEXES,INPATH,INPUT,INPUTFORMAT,INVOKER,ITEMS,ITERATE,JSON,KEY,KEYS,LAST,LAZY,LEAVE,LEVEL,LIMIT,LINES,LIST,LOAD,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MEASURE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTES,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NEAREST,NORELY,NULLS,OFFSET,OPTION,OPTIONS,OUTPUTFORMAT,OVERWRITE,PARTITIONED,PARTITIONS,PATH,PERCENT,PIVOT,PLACING,PRECEDING,PRINCIPALS,PROCEDURES,PROPERTIES,PURGE,QUALIFY,QUARTER,QUERY,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,REDUCE,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,ROLE,ROLES,SCHEMA,SCHEMAS,SECONDS,SECURITY,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SETS,SHORT,SHOW,SIMILARITY,SINGLE,SKEWED,SORT,SORTED,SOURCE,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SYNC,SYSTEM_PATH,SYSTEM_TIME,SYSTEM_VERSION,TABLES,TARGET,TBLPROPERTIES,TERMINATED,TIMEDIFF,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TOUCH,TRANSACTION,TRANSACTI
ONS,TRANSFORM,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNLOCK,UNPIVOT,UNSET,UNTIL,USE,VAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHILE,X,YEARS,ZONE") // scalastyle:on line.size.limit } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/EvalSubqueriesForTimeTravel.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/EvalSubqueriesForTimeTravel.scala index ebe91529fae6a..2572f42c7eb4c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/EvalSubqueriesForTimeTravel.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/EvalSubqueriesForTimeTravel.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.classic.SparkSession class EvalSubqueriesForTimeTravel extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning( _.containsPattern(RELATION_TIME_TRAVEL)) { - case r @ RelationTimeTravel(_, Some(ts), _) + case r @ RelationTimeTravel(_, Some(ts), _, _) if ts.resolved && SubqueryExpression.hasSubquery(ts) => val subqueryEvaluated = ts.transform { case s: ScalarSubquery => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 7122dd52ef1a5..0e78aee938679 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -116,7 +116,7 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] { } def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, _) + case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, _, _) if maybeSQLFile(u) && timestamp.forall(_.resolved) => // If we successfully look up the data source, then this is a path-based table, so we should // fail 
to time travel. Otherwise, this is some other catalog table that isn't resolved yet,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BranchingExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BranchingExec.scala
new file mode 100644
index 0000000000000..cec5b9d9f6b39
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BranchingExec.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util.OptionalLong
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.SupportsBranching
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Physical plan node for ALTER TABLE ... CREATE BRANCH.
+ *
+ * With `replace`, delegates to `replaceBranch`; otherwise calls `createBranch` and
+ * swallows `BranchAlreadyExistsException` only when `ifNotExists` is set.
+ */
+case class CreateBranchExec(
+    table: SupportsBranching,
+    branchName: String,
+    sourceSnapshotId: Option[Long],
+    ifNotExists: Boolean,
+    replace: Boolean) extends LeafV2CommandExec {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def run(): Seq[InternalRow] = {
+    // Convert the optional snapshot id into the OptionalLong passed to the table.
+    val source = sourceSnapshotId match {
+      case Some(id) => OptionalLong.of(id)
+      case None => OptionalLong.empty()
+    }
+    if (replace) {
+      table.replaceBranch(branchName, source)
+    } else {
+      try {
+        table.createBranch(branchName, source)
+      } catch {
+        case _: SupportsBranching.BranchAlreadyExistsException if ifNotExists =>
+          // ignored: branch already exists and IF NOT EXISTS was requested
+      }
+    }
+    Seq.empty
+  }
+}
+
+/**
+ * Physical plan node for ALTER TABLE ... DROP BRANCH.
+ *
+ * Throws `BranchNotFoundException` when `dropBranch` reports that nothing was removed
+ * and `ifExists` is not set.
+ */
+case class DropBranchExec(
+    table: SupportsBranching,
+    branchName: String,
+    ifExists: Boolean) extends LeafV2CommandExec {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def run(): Seq[InternalRow] = {
+    val removed = table.dropBranch(branchName)
+    if (!removed && !ifExists) {
+      throw new SupportsBranching.BranchNotFoundException(branchName)
+    }
+    Seq.empty
+  }
+}
+
+/**
+ * Physical plan node for ALTER TABLE ... FAST FORWARD.
+ *
+ * Delegates to `SupportsBranching.fastForward` and returns no rows.
+ */
+case class FastForwardBranchExec(
+    table: SupportsBranching,
+    branchName: String,
+    targetBranchName: String) extends LeafV2CommandExec {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def run(): Seq[InternalRow] = {
+    table.fastForward(branchName, targetBranchName)
+    Seq.empty
+  }
+}
+
+/**
+ * Physical plan node for SHOW BRANCHES.
+ *
+ * Emits one row per branch: name, snapshot id (null when absent) and `creationTimeMs`.
+ */
+case class ShowBranchesExec(
+    output: Seq[Attribute],
+    table: SupportsBranching) extends LeafV2CommandExec {
+
+  override protected def run(): Seq[InternalRow] = {
+    table.listBranches().toSeq.map { branch =>
+      val snapshot: Any = if (branch.snapshotId().isPresent) {
+        branch.snapshotId().getAsLong
+      } else {
+        null
+      }
+      InternalRow(
+        UTF8String.fromString(branch.name()),
+        snapshot,
+        branch.creationTimeMs())
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 4fd7d993cc3d0..e16ff982ecc6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -751,6 +751,19 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         Seq(part).asResolvedPartitionSpecs.head,
         recacheTable(r, includeTimeTravel = false)) :: Nil
 
+    case CreateBranch(r: ResolvedTable, branchName, sourceSnapshotId, ifNotExists, replace) =>
+      CreateBranchExec(
+        r.table.asBranchable, branchName, sourceSnapshotId, ifNotExists, replace) :: Nil
+
+    case DropBranch(r: ResolvedTable, branchName, ifExists) =>
+      DropBranchExec(r.table.asBranchable, branchName, ifExists) :: Nil
+
+    case FastForwardBranch(r: ResolvedTable, branchName, targetBranchName) =>
+      FastForwardBranchExec(r.table.asBranchable, branchName, targetBranchName) :: Nil
+
+    case sb @ ShowBranches(r: ResolvedTable, _) =>
+      ShowBranchesExec(sb.output, r.table.asBranchable) :: Nil
+
     case ShowColumns(resolvedTable: ResolvedTable, ns, output) =>
       ns match {
         case Some(namespace) =>
diff --git a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out
index 6bcbdd2840f90..4bb3cd1500bfd 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/keywords-enforced.sql.out @@ -32,6 +32,8 @@ BINARY false BINDING false BOOLEAN false BOTH true +BRANCH false +BRANCHES false BUCKET false BUCKETS false BY false @@ -141,6 +143,7 @@ EXTENDED false EXTERNAL false EXTRACT false FALSE true +FAST false FETCH true FIELDS false FILEFORMAT false @@ -153,6 +156,7 @@ FOR true FOREIGN true FORMAT false FORMATTED false +FORWARD false FOUND false FROM true FULL true diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out index a010343264469..3d748b73be375 100644 --- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out @@ -32,6 +32,8 @@ BINARY false BINDING false BOOLEAN false BOTH false +BRANCH false +BRANCHES false BUCKET false BUCKETS false BY false @@ -141,6 +143,7 @@ EXTENDED false EXTERNAL false EXTRACT false FALSE false +FAST false FETCH false FIELDS false FILEFORMAT false @@ -153,6 +156,7 @@ FOR false FOREIGN false FORMAT false FORMATTED false +FORWARD false FOUND false FROM false FULL false diff --git a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out index a010343264469..3d748b73be375 100644 --- a/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/nonansi/keywords.sql.out @@ -32,6 +32,8 @@ BINARY false BINDING false BOOLEAN false BOTH false +BRANCH false +BRANCHES false BUCKET false BUCKETS false BY false @@ -141,6 +143,7 @@ EXTENDED false EXTERNAL false EXTRACT false FALSE false +FAST false FETCH false FIELDS false FILEFORMAT false @@ -153,6 +156,7 @@ FOR false FOREIGN false FORMAT false FORMATTED false +FORWARD false FOUND false FROM false FULL false diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsBranchingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsBranchingSuite.scala new file mode 100644 index 0000000000000..68f6302bcd4c3 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsBranchingSuite.scala @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connector + +import java.util +import java.util.Locale + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryCatalog, SupportsBranching, Table, TableCapability} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +class SupportsBranchingSuite extends QueryTest with DatasourceV2SQLBase { + + private def branchingTable(tableName: String): SupportsBranching = { + val cat = catalog("testcat").asInstanceOf[InMemoryCatalog] + cat.loadTable(Identifier.of(Array("ns"), tableName)).asInstanceOf[SupportsBranching] + } + + private def withBranchingTable(tableName: String)(f: String => Unit): Unit = { + val fqName = s"testcat.ns.$tableName" + sql(s"CREATE TABLE $fqName (id bigint, data string) USING foo") + try { + f(fqName) + } finally { + sql(s"DROP TABLE IF EXISTS $fqName") + } + } + + test("CREATE BRANCH on an InMemoryTable creates a branch") { + withBranchingTable("t") { fq => + sql(s"ALTER TABLE $fq CREATE BRANCH dev") + val t = branchingTable("t") + val branches = t.listBranches() + assert(branches.length == 1) + assert(branches(0).name == "dev") + } + } + + test("CREATE BRANCH with explicit snapshot id") { + withBranchingTable("t") { fq => + sql(s"ALTER TABLE $fq CREATE BRANCH staging AS OF VERSION 7") + val branch = branchingTable("t").listBranches().head + assert(branch.name == "staging") + assert(branch.snapshotId.isPresent && branch.snapshotId.getAsLong == 7L) + } + } + + test("CREATE BRANCH IF NOT EXISTS is idempotent") { + withBranchingTable("t") { fq => + sql(s"ALTER TABLE $fq CREATE BRANCH dev") + sql(s"ALTER TABLE $fq CREATE BRANCH IF NOT EXISTS dev") + assert(branchingTable("t").listBranches().length == 1) + } + } + + test("CREATE BRANCH without IF NOT EXISTS fails on duplicate") { + withBranchingTable("t") { fq => + 
sql(s"ALTER TABLE $fq CREATE BRANCH dev") + val ex = intercept[SupportsBranching.BranchAlreadyExistsException] { + sql(s"ALTER TABLE $fq CREATE BRANCH dev") + } + assert(ex.getMessage.contains("dev")) + } + } + + test("CREATE OR REPLACE BRANCH overwrites an existing branch") { + withBranchingTable("t") { fq => + sql(s"ALTER TABLE $fq CREATE BRANCH dev AS OF VERSION 1") + sql(s"ALTER TABLE $fq CREATE OR REPLACE BRANCH dev AS OF VERSION 9") + val branch = branchingTable("t").listBranches().head + assert(branch.snapshotId.getAsLong == 9L) + } + } + + test("DROP BRANCH removes a branch") { + withBranchingTable("t") { fq => + sql(s"ALTER TABLE $fq CREATE BRANCH dev") + sql(s"ALTER TABLE $fq DROP BRANCH dev") + assert(branchingTable("t").listBranches().isEmpty) + } + } + + test("DROP BRANCH IF EXISTS does not fail on missing branch") { + withBranchingTable("t") { fq => + sql(s"ALTER TABLE $fq DROP BRANCH IF EXISTS missing") + } + } + + test("DROP BRANCH fails on missing branch when IF EXISTS not specified") { + withBranchingTable("t") { fq => + val ex = intercept[SupportsBranching.BranchNotFoundException] { + sql(s"ALTER TABLE $fq DROP BRANCH missing") + } + assert(ex.getMessage.contains("missing")) + } + } + + test("FAST FORWARD updates branch to target's snapshot") { + withBranchingTable("t") { fq => + sql(s"ALTER TABLE $fq CREATE BRANCH dev AS OF VERSION 1") + sql(s"ALTER TABLE $fq CREATE BRANCH main AS OF VERSION 5") + sql(s"ALTER TABLE $fq FAST FORWARD dev TO main") + val dev = branchingTable("t").listBranches().find(_.name == "dev").get + assert(dev.snapshotId.getAsLong == 5L) + } + } + + test("FAST FORWARD fails when target is behind") { + withBranchingTable("t") { fq => + sql(s"ALTER TABLE $fq CREATE BRANCH dev AS OF VERSION 5") + sql(s"ALTER TABLE $fq CREATE BRANCH main AS OF VERSION 1") + val ex = intercept[SupportsBranching.InvalidFastForwardException] { + sql(s"ALTER TABLE $fq FAST FORWARD dev TO main") + } + assert(ex.getMessage.contains("behind")) + } + } + 
+  test("SHOW BRANCHES lists branches with snapshot ids") {
+    withBranchingTable("t") { fq =>
+      sql(s"ALTER TABLE $fq CREATE BRANCH dev AS OF VERSION 1")
+      sql(s"ALTER TABLE $fq CREATE BRANCH main AS OF VERSION 2")
+      val rows = sql(s"SHOW BRANCHES IN $fq").collect()
+      // Column 0 is read as the branch name and column 1 as the snapshot id.
+      val byName = rows.map(r => r.getString(0) -> r.getLong(1)).toMap
+      assert(byName == Map("dev" -> 1L, "main" -> 2L))
+    }
+  }
+
+  test("asBranchable rejects tables that do not implement SupportsBranching") {
+    // Minimal Table implementation that does not mix in SupportsBranching.
+    val plainTable = new Table {
+      override def name(): String = "plain"
+      override def columns(): Array[Column] = Array.empty
+      override def schema(): StructType = new StructType()
+      override def capabilities(): util.Set[TableCapability] = util.Set.of()
+    }
+    val ex = intercept[AnalysisException] {
+      plainTable.asBranchable
+    }
+    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("branching"))
+  }
+
+  // ------------------------------------------------------------------
+  // SELECT and INSERT targeting a branch
+  // ------------------------------------------------------------------
+
+  test("SELECT FOR BRANCH reads from the branch's data") {
+    withBranchingTable("t") { fq =>
+      sql(s"INSERT INTO $fq VALUES (1L, 'main')")
+      sql(s"ALTER TABLE $fq CREATE BRANCH dev")
+      sql(s"INSERT INTO $fq FOR BRANCH 'dev' VALUES (2L, 'dev')")
+      // Reads without FOR BRANCH go to the main table.
+      checkAnswer(sql(s"SELECT data FROM $fq"), Seq(Row("main")))
+      // Reads with FOR BRANCH go to the branch.
+      checkAnswer(sql(s"SELECT data FROM $fq FOR BRANCH 'dev'"), Seq(Row("dev")))
+    }
+  }
+
+  test("SELECT VERSION AS OF BRANCH is equivalent to FOR BRANCH") {
+    withBranchingTable("t") { fq =>
+      sql(s"ALTER TABLE $fq CREATE BRANCH dev")
+      sql(s"INSERT INTO $fq FOR BRANCH 'dev' VALUES (1L, 'a'), (2L, 'b')")
+      checkAnswer(
+        sql(s"SELECT data FROM $fq VERSION AS OF BRANCH 'dev' ORDER BY id"),
+        Seq(Row("a"), Row("b")))
+    }
+  }
+
+  test("INSERT FOR BRANCH writes to the branch only") {
+    withBranchingTable("t") { fq =>
+      sql(s"ALTER TABLE $fq CREATE BRANCH dev")
+      sql(s"INSERT INTO $fq VALUES (1L, 'main')")
+      sql(s"INSERT INTO $fq FOR BRANCH 'dev' VALUES (1L, 'dev')")
+      // Main table only sees main row.
+      checkAnswer(sql(s"SELECT data FROM $fq"), Seq(Row("main")))
+      checkAnswer(
+        sql(s"SELECT data FROM $fq FOR BRANCH 'dev'"),
+        Seq(Row("dev")))
+    }
+  }
+
+  test("INSERT OVERWRITE FOR BRANCH overwrites branch data") {
+    withBranchingTable("t") { fq =>
+      sql(s"ALTER TABLE $fq CREATE BRANCH dev")
+      sql(s"INSERT INTO $fq FOR BRANCH 'dev' VALUES (1L, 'a')")
+      sql(s"INSERT OVERWRITE $fq FOR BRANCH 'dev' VALUES (2L, 'b')")
+      checkAnswer(sql(s"SELECT data FROM $fq FOR BRANCH 'dev'"), Seq(Row("b")))
+    }
+  }
+
+  test("spark.sql.defaultBranch routes reads and writes to the named branch") {
+    withBranchingTable("t") { fq =>
+      sql(s"ALTER TABLE $fq CREATE BRANCH dev")
+      withSQLConf(SQLConf.DEFAULT_BRANCH.key -> "dev") {
+        sql(s"INSERT INTO $fq VALUES (1L, 'x')")
+        checkAnswer(sql(s"SELECT data FROM $fq"), Seq(Row("x")))
+      }
+      // Without the conf, the main table has no rows.
+      checkAnswer(sql(s"SELECT data FROM $fq"), Nil)
+      // The branch should hold the row that was written under the conf.
+      checkAnswer(sql(s"SELECT data FROM $fq FOR BRANCH 'dev'"), Seq(Row("x")))
+    }
+  }
+
+  test("explicit FOR BRANCH overrides spark.sql.defaultBranch") {
+    withBranchingTable("t") { fq =>
+      sql(s"ALTER TABLE $fq CREATE BRANCH dev")
+      sql(s"ALTER TABLE $fq CREATE BRANCH staging")
+      withSQLConf(SQLConf.DEFAULT_BRANCH.key -> "dev") {
+        sql(s"INSERT INTO $fq FOR BRANCH 'staging' VALUES (1L, 'staging')")
+      }
+      checkAnswer(sql(s"SELECT data FROM $fq FOR BRANCH 'staging'"), Seq(Row("staging")))
+      checkAnswer(sql(s"SELECT data FROM $fq FOR BRANCH 'dev'"), Nil)
+    }
+  }
+
+  test("spark.sql.defaultBranch is silently ignored for non-branching tables") {
+    // The session catalog's default fallback (InMemoryTableSessionCatalog) uses InMemoryTable
+    // which is branching-capable, so create a temp view instead -- views aren't branching.
+    // withTempView drops `v` when the body finishes, so this test does not leave the view
+    // behind in the shared session.
+    withTempView("v") {
+      withSQLConf(SQLConf.DEFAULT_BRANCH.key -> "dev") {
+        sql("CREATE OR REPLACE TEMPORARY VIEW v AS SELECT 1 AS x")
+        checkAnswer(sql("SELECT x FROM v"), Seq(Row(1)))
+      }
+    }
+  }
+
+  test("explicit FOR BRANCH on a non-branching table fails") {
+    withSQLConf(SQLConf.DEFAULT_BRANCH.key -> "") {
+      sql("CREATE OR REPLACE TEMPORARY VIEW v AS SELECT 1 AS x")
+      val ex = intercept[AnalysisException] {
+        sql("SELECT x FROM v FOR BRANCH 'dev'")
+      }
+      // Temp views don't support time travel at all -- accept either error.
+ val msg = ex.getMessage.toLowerCase(Locale.ROOT) + assert(msg.contains("time travel") || msg.contains("branching")) + } + } +} diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index 1ecf5b3dae4a0..ceaa9bca331cc 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -214,7 +214,7 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer { val sessionHandle = client.openSession(user, "") val infoValue = client.getInfo(sessionHandle, GetInfoType.CLI_ODBC_KEYWORDS) // scalastyle:off line.size.limit - assert(infoValue.getStringValue == "ADD,AFTER,AGGREGATE,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,APPROX,ARCHIVE,ARRAY,AS,ASC,ASENSITIVE,AT,ATOMIC,AUTHORIZATION,BEGIN,BERNOULLI,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHANGES,CHAR,CHARACTER,CHECK,CLEAR,CLOSE,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLATIONS,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONDITION,CONSTRAINT,CONTAINS,CONTINUE,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATABASE,CURRENT_DATE,CURRENT_PATH,CURRENT_SCHEMA,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,CURSOR,DATA,DATABASE,DATABASES,DATE,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAY,DAYOFYEAR,DAYS,DBPROPERTIES,DEC,DECIMAL,DECLARE,DEFAULT,DEFAULT_PATH,DEFINED,DEFINER,DELAY,DELETE,DELIMITED,DESC,DESCRIBE,DETERMINISTIC,DFS,DIRECTORIES,DIRECTORY,DISTANCE,DISTINCT,DISTRIBUTE,DIV,DO,DOUBLE,DROP,ELSE,ELSEIF,END,ENFORCED,ESCAPE,ESCAPED,EVOLUTION,EXACT,EXCEPT,EXCHANGE,EXCLUDE,EXCLUSIVE,EXECUTE,EXISTS,EXIT,EXPLAIN,EXPORT,EXTEN
D,EXTENDED,EXTERNAL,EXTRACT,FALSE,FETCH,FIELDS,FILEFORMAT,FILTER,FIRST,FLOAT,FLOW,FOLLOWING,FOR,FOREIGN,FORMAT,FORMATTED,FOUND,FROM,FULL,FUNCTION,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,GLOBAL,GRANT,GROUP,GROUPING,HANDLER,HAVING,HOUR,HOURS,IDENTIFIED,IDENTIFIER,IDENTITY,IF,IGNORE,ILIKE,IMMEDIATE,IMPORT,IN,INCLUDE,INCLUSIVE,INCREMENT,INDEX,INDEXES,INNER,INPATH,INPUT,INPUTFORMAT,INSENSITIVE,INSERT,INT,INTEGER,INTERSECT,INTERVAL,INTO,INVOKER,IS,ITEMS,ITERATE,JOIN,JSON,KEY,KEYS,LANGUAGE,LAST,LATERAL,LAZY,LEADING,LEAVE,LEFT,LEVEL,LIKE,LIMIT,LINES,LIST,LOAD,LOCAL,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MAX,MEASURE,MERGE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILLISECONDS,MINUS,MINUTE,MINUTES,MODIFIES,MONTH,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NATURAL,NEAREST,NEXT,NO,NONE,NORELY,NOT,NULL,NULLS,NUMERIC,OF,OFFSET,ON,ONLY,OPEN,OPTION,OPTIONS,OR,ORDER,OUT,OUTER,OUTPUTFORMAT,OVER,OVERLAPS,OVERLAY,OVERWRITE,PARTITION,PARTITIONED,PARTITIONS,PATH,PERCENT,PIVOT,PLACING,POSITION,PRECEDING,PRIMARY,PRINCIPALS,PROCEDURE,PROCEDURES,PROPERTIES,PURGE,QUALIFY,QUARTER,QUERY,RANGE,READ,READS,REAL,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,RECURSIVE,REDUCE,REFERENCES,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,RETURN,RETURNS,REVOKE,RIGHT,ROLE,ROLES,ROLLBACK,ROLLUP,ROW,ROWS,SCHEMA,SCHEMAS,SECOND,SECONDS,SECURITY,SELECT,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SESSION_USER,SET,SETS,SHORT,SHOW,SIMILARITY,SINGLE,SKEWED,SMALLINT,SOME,SORT,SORTED,SOURCE,SPECIFIC,SQL,SQLEXCEPTION,SQLSTATE,START,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SUBSTRING,SYNC,SYSTEM,SYSTEM_PATH,SYSTEM_TIME,SYSTEM_VERSION,TABLE,TABLES,TABLESAMPLE,TARGET,TBLPROPERTIES,TERMINATED,THEN,TIME,TIMEDIFF,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TO,TOUCH,TRAILING,TRANSACTION,TRANSACTIONS,TRANSFORM,TRIM,TRUE,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNION,UNIQUE,UNKNOWN,UNLOCK,UNPIVOT,UN
SET,UNTIL,UPDATE,USE,USER,USING,VALUE,VALUES,VAR,VARCHAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHEN,WHERE,WHILE,WINDOW,WITH,WITHIN,WITHOUT,X,YEAR,YEARS,ZONE") + assert(infoValue.getStringValue == "ADD,AFTER,AGGREGATE,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,APPROX,ARCHIVE,ARRAY,AS,ASC,ASENSITIVE,AT,ATOMIC,AUTHORIZATION,BEGIN,BERNOULLI,BETWEEN,BIGINT,BINARY,BINDING,BOOLEAN,BOTH,BRANCH,BRANCHES,BUCKET,BUCKETS,BY,BYTE,CACHE,CALL,CALLED,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHANGES,CHAR,CHARACTER,CHECK,CLEAR,CLOSE,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLATION,COLLATIONS,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPENSATION,COMPUTE,CONCATENATE,CONDITION,CONSTRAINT,CONTAINS,CONTINUE,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATABASE,CURRENT_DATE,CURRENT_PATH,CURRENT_SCHEMA,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,CURSOR,DATA,DATABASE,DATABASES,DATE,DATEADD,DATEDIFF,DATE_ADD,DATE_DIFF,DAY,DAYOFYEAR,DAYS,DBPROPERTIES,DEC,DECIMAL,DECLARE,DEFAULT,DEFAULT_PATH,DEFINED,DEFINER,DELAY,DELETE,DELIMITED,DESC,DESCRIBE,DETERMINISTIC,DFS,DIRECTORIES,DIRECTORY,DISTANCE,DISTINCT,DISTRIBUTE,DIV,DO,DOUBLE,DROP,ELSE,ELSEIF,END,ENFORCED,ESCAPE,ESCAPED,EVOLUTION,EXACT,EXCEPT,EXCHANGE,EXCLUDE,EXCLUSIVE,EXECUTE,EXISTS,EXIT,EXPLAIN,EXPORT,EXTEND,EXTENDED,EXTERNAL,EXTRACT,FALSE,FAST,FETCH,FIELDS,FILEFORMAT,FILTER,FIRST,FLOAT,FLOW,FOLLOWING,FOR,FOREIGN,FORMAT,FORMATTED,FORWARD,FOUND,FROM,FULL,FUNCTION,FUNCTIONS,GENERATED,GEOGRAPHY,GEOMETRY,GLOBAL,GRANT,GROUP,GROUPING,HANDLER,HAVING,HOUR,HOURS,IDENTIFIED,IDENTIFIER,IDENTITY,IF,IGNORE,ILIKE,IMMEDIATE,IMPORT,IN,INCLUDE,INCLUSIVE,INCREMENT,INDEX,INDEXES,INNER,INPATH,INPUT,INPUTFORMAT,INSENSITIVE,INSERT,INT,INTEGER,INTERSECT,INTERVAL,INTO,INVOKER,IS,ITEMS,ITERATE,JOIN,JSON,KEY,KEYS,LANGUAGE,LAST,LATERAL,LAZY,LEADING,LEAVE,LEFT,LEVEL,LIKE,LIMIT,LINES,LIST,LOAD,LOCAL,LOCATION,LOCK,LOCKS,LOGICAL,LONG,LOOP,MACRO,MAP,MATCHED,MATERIALIZED,MAX,MEASURE,MERGE,METRICS,MICROSECOND,MICROSECONDS,MILLISECOND,MILL
ISECONDS,MINUS,MINUTE,MINUTES,MODIFIES,MONTH,MONTHS,MSCK,NAME,NAMESPACE,NAMESPACES,NANOSECOND,NANOSECONDS,NATURAL,NEAREST,NEXT,NO,NONE,NORELY,NOT,NULL,NULLS,NUMERIC,OF,OFFSET,ON,ONLY,OPEN,OPTION,OPTIONS,OR,ORDER,OUT,OUTER,OUTPUTFORMAT,OVER,OVERLAPS,OVERLAY,OVERWRITE,PARTITION,PARTITIONED,PARTITIONS,PATH,PERCENT,PIVOT,PLACING,POSITION,PRECEDING,PRIMARY,PRINCIPALS,PROCEDURE,PROCEDURES,PROPERTIES,PURGE,QUALIFY,QUARTER,QUERY,RANGE,READ,READS,REAL,RECORDREADER,RECORDWRITER,RECOVER,RECURSION,RECURSIVE,REDUCE,REFERENCES,REFRESH,RELY,RENAME,REPAIR,REPEAT,REPEATABLE,REPLACE,RESET,RESPECT,RESTRICT,RETURN,RETURNS,REVOKE,RIGHT,ROLE,ROLES,ROLLBACK,ROLLUP,ROW,ROWS,SCHEMA,SCHEMAS,SECOND,SECONDS,SECURITY,SELECT,SEMI,SEPARATED,SERDE,SERDEPROPERTIES,SESSION_USER,SET,SETS,SHORT,SHOW,SIMILARITY,SINGLE,SKEWED,SMALLINT,SOME,SORT,SORTED,SOURCE,SPECIFIC,SQL,SQLEXCEPTION,SQLSTATE,START,STATISTICS,STORED,STRATIFY,STREAM,STREAMING,STRING,STRUCT,SUBSTR,SUBSTRING,SYNC,SYSTEM,SYSTEM_PATH,SYSTEM_TIME,SYSTEM_VERSION,TABLE,TABLES,TABLESAMPLE,TARGET,TBLPROPERTIES,TERMINATED,THEN,TIME,TIMEDIFF,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TIMESTAMP_LTZ,TIMESTAMP_NTZ,TINYINT,TO,TOUCH,TRAILING,TRANSACTION,TRANSACTIONS,TRANSFORM,TRIM,TRUE,TRUNCATE,TRY_CAST,TYPE,UNARCHIVE,UNBOUNDED,UNCACHE,UNION,UNIQUE,UNKNOWN,UNLOCK,UNPIVOT,UNSET,UNTIL,UPDATE,USE,USER,USING,VALUE,VALUES,VAR,VARCHAR,VARIABLE,VARIANT,VERSION,VIEW,VIEWS,VOID,WATERMARK,WEEK,WEEKS,WHEN,WHERE,WHILE,WINDOW,WITH,WITHIN,WITHOUT,X,YEAR,YEARS,ZONE") // scalastyle:on line.size.limit } }