[Spark] Support ALTER TABLE CLUSTER BY

This PR adds support for the ALTER TABLE ... CLUSTER BY syntax on clustered tables (usage sketched below):
* `ALTER TABLE ... CLUSTER BY (col1, col2, ...)` to change the clustering columns
* `ALTER TABLE ... CLUSTER BY NONE` to remove the clustering columns
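
A minimal usage sketch (hypothetical table and column names; per the error class added below, the command is only allowed on Delta tables that already use clustering):

// Hypothetical example: create a clustered Delta table, then alter its clustering columns.
spark.sql("CREATE TABLE events (id INT, ts TIMESTAMP, region STRING) USING DELTA CLUSTER BY (id)")
spark.sql("ALTER TABLE events CLUSTER BY (region, ts)") // change the clustering columns
spark.sql("ALTER TABLE events CLUSTER BY NONE")         // remove the clustering columns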

Closes #2556

GitOrigin-RevId: 7cc2ff2abe6fdd1cba6150648c71f27fc7432be1
zedtang authored and allisonport-db committed Feb 5, 2024
1 parent dc574eb commit 6f4e051
Showing 11 changed files with 324 additions and 7 deletions.
5 changes: 4 additions & 1 deletion spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
@@ -88,6 +88,8 @@ statement
DROP CONSTRAINT (IF EXISTS)? name=identifier #dropTableConstraint
| ALTER TABLE table=qualifiedName
DROP FEATURE featureName=featureNameValue (TRUNCATE HISTORY)? #alterTableDropFeature
| ALTER TABLE table=qualifiedName
(clusterBySpec | CLUSTER BY NONE) #alterTableClusterBy
| OPTIMIZE (path=STRING | table=qualifiedName)
(WHERE partitionPredicate=predicateToken)?
(zorderSpec)? #optimizeTable
@@ -231,7 +233,7 @@ nonReserved
| NO | STATISTICS
| CLONE | SHALLOW
| FEATURE | TRUNCATE
| CLUSTER
| CLUSTER | NONE
;

// Define how the keywords above should appear in a user's SQL statement.
@@ -269,6 +271,7 @@ LIMIT: 'LIMIT';
LOCATION: 'LOCATION';
MINUS: '-';
NO: 'NO';
NONE: 'NONE';
NOT: 'NOT' | '!';
NULL: 'NULL';
OF: 'OF';
6 changes: 6 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -71,6 +71,12 @@
],
"sqlState" : "42837"
},
"DELTA_ALTER_TABLE_CLUSTER_BY_NOT_ALLOWED" : {
"message" : [
"ALTER TABLE CLUSTER BY is supported only for Delta table with clustering."
],
"sqlState" : "42000"
},
"DELTA_ALTER_TABLE_SET_CLUSTERING_TABLE_FEATURE_NOT_ALLOWED" : {
"message" : [
"Cannot enable <tableFeature> table feature using ALTER TABLE SET TBLPROPERTIES. Please use CREATE OR REPLACE TABLE CLUSTER BY to create a Delta table with clustering."
21 changes: 20 additions & 1 deletion spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -43,7 +43,7 @@ import java.util.Locale
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.TimeTravel
import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterByParserUtils, ClusterByPlan, ClusterBySpec}
import org.apache.spark.sql.delta.skipping.clustering.temp.{AlterTableClusterBy, ClusterByParserUtils, ClusterByPlan, ClusterBySpec}

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.commands._
@@ -583,6 +583,25 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
truncateHistory)
}

/**
* Parse an ALTER TABLE CLUSTER BY command.
*/
override def visitAlterTableClusterBy(ctx: AlterTableClusterByContext): LogicalPlan = {
val table =
createUnresolvedTable(ctx.table.identifier.asScala.map(_.getText).toSeq,
"ALTER TABLE ... CLUSTER BY")
if (ctx.NONE() != null) {
AlterTableClusterBy(table, None)
} else {
assert(ctx.clusterBySpec() != null)
val columnNames =
ctx.clusterBySpec().interleave.asScala
.map(_.identifier.asScala.map(_.getText).toSeq)
.map(_.asInstanceOf[Seq[String]]).toSeq
AlterTableClusterBy(table, Some(ClusterBySpec(columnNames)))
}
}

protected def typedVisit[T](ctx: ParseTree): T = {
ctx.accept(this).asInstanceOf[T]
}
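A rough sketch of the plans the visitor above produces (plan shapes follow this diff; the remaining UnresolvedTable arguments are elided):

// ALTER TABLE t CLUSTER BY (a, b.c) parses to
//   AlterTableClusterBy(UnresolvedTable(Seq("t"), ...), Some(ClusterBySpec(Seq(Seq("a"), Seq("b", "c")))))
// ALTER TABLE t CLUSTER BY NONE parses to
//   AlterTableClusterBy(UnresolvedTable(Seq("t"), ...), None)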
12 changes: 12 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -3195,6 +3195,18 @@ trait DeltaErrorsBase
messageParameters = Array(s"${zOrderBy.map(_.name).mkString(", ")}"))
}

def alterClusterByNotOnDeltaTableException(): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_ONLY_OPERATION",
messageParameters = Array("ALTER TABLE CLUSTER BY"))
}

def alterClusterByNotAllowedException(): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_ALTER_TABLE_CLUSTER_BY_NOT_ALLOWED",
messageParameters = Array.empty)
}

def clusteringTablePreviewDisabledException(): Throwable = {
val msg = s"""
|A clustered table is currently in preview and is disabled by default. Please set
@@ -552,6 +552,14 @@ object DeltaOperations {
override val operationMetrics: Set[String] = DeltaOperationMetrics.OPTIMIZE
}

/** Recorded when clustering columns are changed on clustered tables. */
case class ClusterBy(
oldClusteringColumns: String,
newClusteringColumns: String) extends Operation("CLUSTER BY") {
override val parameters: Map[String, Any] = Map(
"oldClusteringColumns" -> oldClusteringColumns,
"newClusteringColumns" -> newClusteringColumns)
}

private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = {
Map(
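For illustration, the history entry recorded when the clustering columns change from a to b and c (the comma-joined string format comes from AlterTableClusterByDeltaCommand below):

// Sketch only: the operation written to the Delta commit history.
val op = DeltaOperations.ClusterBy(oldClusteringColumns = "a", newClusteringColumns = "b,c")
// op.name == "CLUSTER BY"
// op.parameters == Map("oldClusteringColumns" -> "a", "newClusteringColumns" -> "b,c")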
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec
import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterBy, ClusterBySpec}
import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterByTransform => TempClusterByTransform}
import org.apache.spark.sql.delta.{DeltaConfigs, DeltaErrors, DeltaTableUtils}
import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions}
@@ -589,6 +589,8 @@ class DeltaCatalog extends DelegatingCatalogExtension
}
val table = loadTable(ident) match {
case deltaTable: DeltaTableV2 => deltaTable
case _ if changes.exists(_.isInstanceOf[ClusterBy]) =>
throw DeltaErrors.alterClusterByNotOnDeltaTableException()
case _ => return super.alterTable(ident, changes: _*)
}

@@ -755,6 +757,18 @@
AlterTableDropFeatureDeltaCommand(
table, featureName, truncateHistory = truncateHistory).run(spark)

case (t, clusterBy) if t == classOf[ClusterBy] =>
clusterBy.asInstanceOf[Seq[ClusterBy]].foreach { c =>
if (c.clusteringColumns.nonEmpty) {
val clusterBySpec = ClusterBySpec(c.clusteringColumns.toSeq)
validateClusterBySpec(Some(clusterBySpec), table.schema())
}
if (!ClusteredTableUtils.isSupported(table.initialSnapshot.protocol)) {
throw DeltaErrors.alterClusterByNotAllowedException()
}
AlterTableClusterByDeltaCommand(
table, c.clusteringColumns.map(_.fieldNames().toSeq).toSeq).run(spark)
}
}

columnUpdates.foreach { case (fieldNames, (newField, newPositionOpt)) =>
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit

import scala.util.control.NonFatal

import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
import org.apache.spark.sql.delta.skipping.clustering.ClusteringColumnInfo
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.Protocol
@@ -42,6 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, QualifiedC
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.connector.catalog.TableChange.{After, ColumnPosition, First}
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.types._

@@ -930,3 +932,58 @@ case class AlterTableDropConstraintDeltaCommand(
}
}

/**
* Command for altering clustering columns for clustered tables.
* - ALTER TABLE .. CLUSTER BY (col1, col2, ...)
* - ALTER TABLE .. CLUSTER BY NONE
*
* Note that the given `clusteringColumns` are empty when CLUSTER BY NONE is specified.
* Also, `clusteringColumns` are validated (e.g., duplication / existence check) in
* DeltaCatalog.alterTable().
*/
case class AlterTableClusterByDeltaCommand(
table: DeltaTableV2,
clusteringColumns: Seq[Seq[String]])
extends LeafRunnableCommand with AlterDeltaTableCommand with IgnoreCachedData {
override def run(sparkSession: SparkSession): Seq[Row] = {
val deltaLog = table.deltaLog
ClusteredTableUtils.validateNumClusteringColumns(clusteringColumns, Some(deltaLog))
recordDeltaOperation(deltaLog, "delta.ddl.alter.clusterBy") {
val txn = startTransaction()

val clusteringColsLogicalNames = ClusteringColumnInfo.extractLogicalNames(txn.snapshot)
val oldLogicalClusteringColumnsString = clusteringColsLogicalNames.mkString(",")
val oldColumnsCount = clusteringColsLogicalNames.size

val newLogicalClusteringColumns = clusteringColumns.map(FieldReference(_).toString)
ClusteredTableUtils.validateClusteringColumnsInStatsSchema(
txn.snapshot, newLogicalClusteringColumns)

val newDomainMetadata =
ClusteredTableUtils
.getClusteringDomainMetadataForAlterTableClusterBy(newLogicalClusteringColumns, txn)

recordDeltaEvent(
deltaLog,
"delta.ddl.alter.clusterBy",
data = Map(
"isNewClusteredTable" -> !ClusteredTableUtils.isSupported(txn.protocol),
"oldColumnsCount" -> oldColumnsCount, "newColumnsCount" -> clusteringColumns.size))
// Add clustered table properties if the current table is not clustered.
// [[DeltaCatalog.alterTable]] already ensures that the table is not partitioned.
if (!ClusteredTableUtils.isSupported(txn.protocol)) {
txn.updateMetadata(
txn.metadata.copy(
configuration = txn.metadata.configuration ++
ClusteredTableUtils.getTableFeatureProperties(txn.metadata.configuration)
))
}
txn.commit(
newDomainMetadata,
DeltaOperations.ClusterBy(
oldLogicalClusteringColumnsString,
newLogicalClusteringColumns.mkString(",")))
}
Seq.empty[Row]
}
}
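
To make the doc comment concrete, the argument shapes the command expects (illustrative):

// ALTER TABLE t CLUSTER BY (a, b.c) => clusteringColumns = Seq(Seq("a"), Seq("b", "c"))
// ALTER TABLE t CLUSTER BY NONE     => clusteringColumns = Seq.empty
// FieldReference(Seq("b", "c")).toString then renders the logical name "b.c"
// used in the recorded commit info.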
@@ -205,6 +205,20 @@ trait ClusteredTableUtilsBase extends DeltaLogging {
ClusteringMetadataDomain.fromSnapshot(snapshot).map(_.toDomainMetadata).toSeq
}

/**
* Create new clustering [[DomainMetadata]] actions given updated column names for
* 'ALTER TABLE ... CLUSTER BY'.
*/
def getClusteringDomainMetadataForAlterTableClusterBy(
newLogicalClusteringColumns: Seq[String],
txn: OptimisticTransaction): Seq[DomainMetadata] = {
val newClusteringColumns =
newLogicalClusteringColumns.map(ClusteringColumn(txn.metadata.schema, _))
val clusteringMetadataDomainOpt =
Some(ClusteringMetadataDomain.fromClusteringColumns(newClusteringColumns).toDomainMetadata)
clusteringMetadataDomainOpt.toSeq
}

/**
* Validate stats will be collected for all clustering columns.
*/
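A usage sketch (assuming an open OptimisticTransaction named txn on the table, and the ClusteredTableUtils companion object mixing in this trait):

// Returns a single clustering DomainMetadata carrying the new column list.
val domains = ClusteredTableUtils.getClusteringDomainMetadataForAlterTableClusterBy(
  Seq("region", "ts"), txn)
// For CLUSTER BY NONE the caller passes Seq.empty, and the domain is written with an
// empty column list, which is how the removal of clustering columns is recorded.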
@@ -0,0 +1,39 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.skipping.clustering.temp

import org.apache.spark.sql.catalyst.plans.logical.{AlterTableCommand, LogicalPlan}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.expressions.NamedReference

/**
* The logical plan of the following commands:
* - ALTER TABLE ... CLUSTER BY (col1, col2, ...)
* - ALTER TABLE ... CLUSTER BY NONE
*/
case class AlterTableClusterBy(
table: LogicalPlan, clusterBySpec: Option[ClusterBySpec]) extends AlterTableCommand {
override def changes: Seq[TableChange] =
Seq(ClusterBy(clusterBySpec
.map(_.columnNames) // CLUSTER BY (col1, col2, ...)
.getOrElse(Seq.empty))) // CLUSTER BY NONE

protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(table = newChild)
}

/** A TableChange to alter clustering columns for a table. */
case class ClusterBy(clusteringColumns: Seq[NamedReference]) extends TableChange {}
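
Illustratively, the TableChange each form emits (assuming FieldReference as the NamedReference implementation, matching the parser path):

// ALTER TABLE ... CLUSTER BY (a, b) => changes == Seq(ClusterBy(Seq(FieldReference("a"), FieldReference("b"))))
// ALTER TABLE ... CLUSTER BY NONE   => changes == Seq(ClusterBy(Seq.empty))
// DeltaCatalog.alterTable (above) matches on this ClusterBy change and dispatches to
// AlterTableClusterByDeltaCommand.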
@@ -48,4 +48,25 @@ class ClusteringTableFeatureSuite extends SparkFunSuite with DeltaSQLCommandTest
parameters = Map("tableFeature" -> "clustering"))
}
}

test("alter table cluster by non-clustered tables is not allowed.") {
withTable("tbl") {
sql("CREATE TABLE tbl(a INT, b STRING) USING DELTA")
val e1 = intercept[DeltaAnalysisException] {
sql("ALTER TABLE tbl CLUSTER BY (a)")
}
checkError(
e1,
"DELTA_ALTER_TABLE_CLUSTER_BY_NOT_ALLOWED",
parameters = Map.empty)

val e2 = intercept[DeltaAnalysisException] {
sql("ALTER TABLE tbl CLUSTER BY NONE")
}
checkError(
e2,
"DELTA_ALTER_TABLE_CLUSTER_BY_NOT_ALLOWED",
parameters = Map.empty)
}
}
}
