# Type Widening in ALTER TABLE CHANGE COLUMN
## Description
This change introduces the `typeWidening` Delta table feature, which allows widening the type of existing columns and fields in a Delta table using the `ALTER TABLE CHANGE COLUMN TYPE` or `ALTER TABLE REPLACE COLUMNS` commands.

The table feature is introduced as `typeWidening-dev` during implementation and is available in testing only.

For now, only the widening conversions `byte` -> `short` -> `int` are supported. Other type changes will require support in the Spark Parquet reader that will be introduced in Spark 4.0.

Type widening feature request: #2622
Type Widening protocol RFC: #2624

A new test suite `DeltaTypeWideningSuite` is created, containing:
- `DeltaTypeWideningAlterTableTests`: Covers applying supported and unsupported type changes on partitioned columns, non-partitioned columns and nested fields
- `DeltaTypeWideningTableFeatureTests`: Covers adding the `typeWidening` table feature

## This PR introduces the following *user-facing* changes

The table feature is available in testing only; there are no user-facing changes as of now.

The type widening table feature will introduce the following changes:
- Enabling the `typeWidening` table feature via a table property:
```
ALTER TABLE t SET TBLPROPERTIES ('delta.enableTypeWidening' = true)
```
- Applying a widening type change:
```
ALTER TABLE t CHANGE COLUMN byte_col TYPE int
```
or
```
ALTER TABLE t REPLACE COLUMNS (byte_col int)
```

Note: both ALTER TABLE commands reuse the existing syntax for setting a table property and applying a type change; no new SQL syntax is introduced by this feature.
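For illustration, here is a minimal end-to-end sketch of the flow above. It assumes a Spark session with the Delta Lake extensions configured and a build where the test-only feature is available; the table and column names are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch; assumes Delta Lake is on the classpath and the test-only
// typeWidening feature is available in this build.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

spark.sql("CREATE TABLE t (byte_col BYTE) USING delta")
spark.sql("INSERT INTO t VALUES (1)")

// Setting the table property adds the typeWidening table feature to the
// table's protocol (it is auto-enabled by metadata, see TableFeature below).
spark.sql("ALTER TABLE t SET TBLPROPERTIES ('delta.enableTypeWidening' = true)")

// Widen byte -> int, one of the conversions supported by this change.
spark.sql("ALTER TABLE t CHANGE COLUMN byte_col TYPE INT")

// Values outside the original byte range now fit.
spark.sql("INSERT INTO t VALUES (2147483647)")
```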

Closes #2645

GitOrigin-RevId: 2ca0e6b22ec24b304241460553547d0d4c6026a2
johanl-db authored and vkorukanti committed Feb 29, 2024
1 parent eb59d4a commit 9b3fa0a
Showing 6 changed files with 586 additions and 4 deletions.
11 changes: 11 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -725,6 +725,17 @@ trait DeltaConfigsBase extends DeltaLogging {
"needs to be a boolean."
)

  /**
   * Whether widening the type of an existing column or field is allowed, either manually using
   * ALTER TABLE CHANGE COLUMN or automatically if automatic schema evolution is enabled.
   */
  val ENABLE_TYPE_WIDENING = buildConfig[Boolean](
    key = "enableTypeWidening",
    defaultValue = false.toString,
    fromString = _.toBoolean,
    validationFunction = _ => true,
    helpMessage = "needs to be a boolean.")

val MANAGED_COMMIT_OWNER_NAME = buildConfig[Option[String]](
"managedCommits.commitOwner-dev",
null,
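As a point of reference, a `DeltaConfig` like the new `ENABLE_TYPE_WIDENING` above is read from the table's current metadata. A minimal sketch, mirroring how this change consumes the config in `TypeWidening.isEnabled` further down:

```scala
import org.apache.spark.sql.delta.DeltaConfigs
import org.apache.spark.sql.delta.actions.Metadata

// Sketch: reading the new config from a table's current Metadata action.
def typeWideningEnabled(metadata: Metadata): Boolean =
  DeltaConfigs.ENABLE_TYPE_WIDENING.fromMetaData(metadata)
```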
@@ -355,7 +355,8 @@ object TableFeature {
// managed-commits are under development and only available in testing.
ManagedCommitTableFeature,
// Row IDs are still under development and only available in testing.
RowTrackingFeature)
RowTrackingFeature,
TypeWideningTableFeature)
}
val featureMap = features.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap
require(features.size == featureMap.size, "Lowercase feature names must not duplicate.")
@@ -625,6 +626,18 @@ object ManagedCommitTableFeature
}
}

object TypeWideningTableFeature extends ReaderWriterFeature(name = "typeWidening-dev")
  with FeatureAutomaticallyEnabledByMetadata {
  override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

  private def isTypeWideningSupportNeededByMetadata(metadata: Metadata): Boolean =
    DeltaConfigs.ENABLE_TYPE_WIDENING.fromMetaData(metadata)

  override def metadataRequiresFeatureToBeEnabled(
      metadata: Metadata,
      spark: SparkSession): Boolean = isTypeWideningSupportNeededByMetadata(metadata)
}

/**
* Features below are for testing only, and are being registered to the system only in the testing
* environment. See [[TableFeature.allSupportedFeaturesMap]] for the registration.
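Because the feature mixes in `FeatureAutomaticallyEnabledByMetadata` with `automaticallyUpdateProtocolOfExistingTables = true`, setting the table property on an existing table should upgrade its protocol in place. A hedged sketch of verifying this, assuming `deltaLog` is a `DeltaLog` handle for the table:

```scala
import org.apache.spark.sql.delta.{DeltaLog, TypeWidening, TypeWideningTableFeature}

// Sketch: after running
//   ALTER TABLE t SET TBLPROPERTIES ('delta.enableTypeWidening' = true)
// the latest snapshot's protocol should list the feature as supported.
def verifyFeatureEnabled(deltaLog: DeltaLog): Unit = {
  val snapshot = deltaLog.update()
  assert(snapshot.protocol.isFeatureSupported(TypeWideningTableFeature))
  assert(TypeWidening.isEnabled(snapshot.protocol, snapshot.metadata))
}
```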
63 changes: 63 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
@@ -0,0 +1,63 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.{Metadata, Protocol, TableFeatureProtocolUtils}

import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.types._

object TypeWidening {

  /**
   * Returns whether the protocol version supports the Type Widening table feature.
   */
  def isSupported(protocol: Protocol): Boolean =
    protocol.isFeatureSupported(TypeWideningTableFeature)

  /**
   * Returns whether Type Widening is enabled on this table version. Type Widening must be
   * supported by the protocol before it can be enabled; throws an error if the table property
   * is set but the table feature isn't supported. When Type Widening is enabled, the type of
   * existing columns or fields can be widened using ALTER TABLE CHANGE COLUMN.
   */
  def isEnabled(protocol: Protocol, metadata: Metadata): Boolean = {
    val isEnabled = DeltaConfigs.ENABLE_TYPE_WIDENING.fromMetaData(metadata)
    if (isEnabled && !isSupported(protocol)) {
      throw new IllegalStateException(
        s"Table property '${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' is " +
        s"set on the table but this table version doesn't support table feature " +
        s"'${TableFeatureProtocolUtils.propertyKey(TypeWideningTableFeature)}'.")
    }
    isEnabled
  }

  /**
   * Returns whether the given type change is eligible for widening. This only checks atomic types.
   * It is the responsibility of the caller to recurse into structs, maps and arrays.
   */
  def isTypeChangeSupported(fromType: AtomicType, toType: AtomicType): Boolean =
    (fromType, toType) match {
      case (from, to) if from == to => true
      // All supported type changes below are supposed to be widening, but to be safe, reject any
      // non-widening change upfront.
      case (from, to) if !Cast.canUpCast(from, to) => false
      case (ByteType, ShortType) => true
      case (ByteType | ShortType, IntegerType) => true
      case _ => false
    }
}
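For concreteness, the match above implies the following behavior for a few representative type pairs (illustrative assertions, not part of this change):

```scala
import org.apache.spark.sql.delta.TypeWidening
import org.apache.spark.sql.types._

// Expected results, following the cases in isTypeChangeSupported above.
assert(TypeWidening.isTypeChangeSupported(ByteType, ByteType))      // same type: allowed
assert(TypeWidening.isTypeChangeSupported(ByteType, ShortType))     // byte -> short: widening
assert(TypeWidening.isTypeChangeSupported(ByteType, IntegerType))   // byte -> int: widening
assert(TypeWidening.isTypeChangeSupported(ShortType, IntegerType))  // short -> int: widening
assert(!TypeWidening.isTypeChangeSupported(IntegerType, LongType))  // not supported yet
assert(!TypeWidening.isTypeChangeSupported(ShortType, ByteType))    // narrowing: rejected
```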
@@ -723,7 +723,8 @@ case class AlterTableChangeColumnDeltaCommand(
newType,
resolver,
txn.metadata.columnMappingMode,
columnPath :+ originalField.name
columnPath :+ originalField.name,
allowTypeWidening = TypeWidening.isEnabled(txn.protocol, txn.metadata)
).nonEmpty) {
throw DeltaErrors.alterTableChangeColumnException(
fieldPath = UnresolvedAttribute(columnPath :+ originalField.name).name,
@@ -802,6 +803,7 @@ case class AlterTableReplaceColumnsDeltaCommand(
changingSchema,
resolver,
txn.metadata.columnMappingMode,
allowTypeWidening = TypeWidening.isEnabled(txn.protocol, txn.metadata),
failOnAmbiguousChanges = true
).foreach { operation =>
throw DeltaErrors.alterTableReplaceColumnsException(
@@ -21,7 +21,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMode, DeltaErrors, DeltaLog, GeneratedColumn, NoMapping}
import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMode, DeltaErrors, DeltaLog, GeneratedColumn, NoMapping, TypeWidening}
import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -911,6 +911,8 @@ def normalizeColumnNamesInDataType(
* @param failOnAmbiguousChanges Throw an error if a StructField both has columns dropped and new
* columns added. These are ambiguous changes, because we don't
* know if a column needs to be renamed, dropped, or added.
* @param allowTypeWidening Whether widening type changes as defined in [[TypeWidening]]
* can be applied.
* @return None if the data types can be changed, otherwise Some(err) containing the reason.
*/
def canChangeDataType(
@@ -919,7 +921,8 @@
resolver: Resolver,
columnMappingMode: DeltaColumnMappingMode,
columnPath: Seq[String] = Nil,
failOnAmbiguousChanges: Boolean = false): Option[String] = {
failOnAmbiguousChanges: Boolean = false,
allowTypeWidening: Boolean = false): Option[String] = {
def verify(cond: Boolean, err: => String): Unit = {
if (!cond) {
throw DeltaErrors.cannotChangeDataType(err)
@@ -970,6 +973,11 @@
(if (columnPath.nonEmpty) s" from $columnName" else ""))
}

case (fromDataType: AtomicType, toDataType: AtomicType) if allowTypeWidening =>
verify(TypeWidening.isTypeChangeSupported(fromDataType, toDataType),
s"changing data type of ${UnresolvedAttribute(columnPath).name} " +
s"from $fromDataType to $toDataType")

case (fromDataType, toDataType) =>
verify(fromDataType == toDataType,
s"changing data type of ${UnresolvedAttribute(columnPath).name} " +
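To show how the new flag is exercised, here is a hedged sketch of calling `canChangeDataType` directly. It assumes the helper lives in `SchemaUtils` (this diff view does not show the file name) and relies on the documented return contract (`None` if the change is allowed, `Some(err)` otherwise):

```scala
import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.delta.NoMapping
import org.apache.spark.sql.delta.schema.SchemaUtils  // assumed location
import org.apache.spark.sql.types._

// byte -> int with widening allowed: expect no error.
val ok = SchemaUtils.canChangeDataType(
  ByteType, IntegerType, caseInsensitiveResolution, NoMapping,
  allowTypeWidening = true)
assert(ok.isEmpty)

// Same change with the default allowTypeWidening = false: expect Some(err).
val rejected = SchemaUtils.canChangeDataType(
  ByteType, IntegerType, caseInsensitiveResolution, NoMapping)
assert(rejected.nonEmpty)
```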
