Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assign materialized Row ID and Row commit version column names #1896

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
],
"sqlState" : "0B000"
},
"DELTA_ADDING_COLUMN_WITH_INTERNAL_NAME_FAILED" : {
"message" : [
"Failed to add column <colName> because the name is reserved."
],
"sqlState" : "42000"
},
"DELTA_ADDING_DELETION_VECTORS_DISALLOWED" : {
"message" : [
"The current operation attempted to add a deletion vector to a table that does not permit the creation of new deletion vectors. Please file a bug report."
Expand Down Expand Up @@ -1079,6 +1085,12 @@
],
"sqlState" : "42K03"
},
"DELTA_MATERIALIZED_ROW_TRACKING_COLUMN_NAME_MISSING" : {
"message" : [
"Materialized <rowTrackingColumn> column name missing for <tableName>."
],
"sqlState" : "22000"
},
"DELTA_MAX_ARRAY_SIZE_EXCEEDED" : {
"message" : [
"Please use a limit less than Int.MaxValue - 8."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,14 @@ class DeltaAnalysis(session: SparkSession)
if (src.provider.exists(DeltaSourceUtils.isDeltaDataSourceName)) {
val deltaLogSrc = DeltaTableV2(session, new Path(src.location))

// maxColumnId field cannot be set externally. If column-mapping is
// used on the source delta table, then maxColumnId would be set for the sourceTable
// and needs to be removed from the targetTable's configuration
// maxColumnId will be set in the targetTable's configuration internally after
// Column mapping and row tracking fields cannot be set externally. If the features are
// used on the source delta table, then the corresponding fields would be set for the
// sourceTable and need to be removed from the targetTable's configuration. The fields
// will then be set in the targetTable's configuration internally afterwards.
val config =
deltaLogSrc.snapshot.metadata.configuration.-("delta.columnMapping.maxColumnId")
.-(MaterializedRowId.MATERIALIZED_COLUMN_NAME_PROP)
johanl-db marked this conversation as resolved.
Show resolved Hide resolved
.-(MaterializedRowCommitVersion.MATERIALIZED_COLUMN_NAME_PROP)

new CatalogTable(
identifier = targetTableIdentifier,
Expand Down
21 changes: 21 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2805,6 +2805,27 @@ trait DeltaErrorsBase
new DeltaIllegalStateException(errorClass = "DELTA_ROW_ID_ASSIGNMENT_WITHOUT_STATS")
}

/**
 * Builds the error raised when a column cannot be added because its name is reserved.
 *
 * @param colName name of the offending column; passed as the only message parameter of the
 *                DELTA_ADDING_COLUMN_WITH_INTERNAL_NAME_FAILED error class.
 * @return a [[DeltaRuntimeException]] for the caller to throw.
 */
def addingColumnWithInternalNameFailed(colName: String): Throwable = {
  val parameters = Array(colName)
  new DeltaRuntimeException(
    errorClass = "DELTA_ADDING_COLUMN_WITH_INTERNAL_NAME_FAILED",
    messageParameters = parameters)
}

/**
 * Builds the error raised when the materialized Row ID column name is missing for a table.
 *
 * @param tableName identifier of the table, used as the second message parameter.
 * @return a [[DeltaIllegalStateException]] for the caller to throw.
 */
def materializedRowIdMetadataMissing(tableName: String): Throwable = {
  // The first parameter names which row tracking column is concerned.
  val parameters = Array("Row ID", tableName)
  new DeltaIllegalStateException(
    errorClass = "DELTA_MATERIALIZED_ROW_TRACKING_COLUMN_NAME_MISSING",
    messageParameters = parameters)
}

/**
 * Builds the error raised when the materialized Row Commit Version column name is missing for a
 * table.
 *
 * @param tableName identifier of the table, used as the second message parameter.
 * @return a [[DeltaIllegalStateException]] for the caller to throw.
 */
def materializedRowCommitVersionMetadataMissing(tableName: String): Throwable = {
  // The first parameter names which row tracking column is concerned.
  val parameters = Array("Row Commit Version", tableName)
  new DeltaIllegalStateException(
    errorClass = "DELTA_MATERIALIZED_ROW_TRACKING_COLUMN_NAME_MISSING",
    messageParameters = parameters)
}

def domainMetadataDuplicate(domainName: String): Throwable = {
new DeltaIllegalArgumentException(
errorClass = "DELTA_DUPLICATE_DOMAIN_METADATA_INTERNAL_ERROR",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import java.util.UUID

import org.apache.spark.sql.delta.actions.{Metadata, Protocol}

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.Attribute

/**
 * Base class describing one materialized row tracking column. The two concrete implementations
 * are [[MaterializedRowId]] and [[MaterializedRowCommitVersion]].
 */
abstract class MaterializedRowTrackingColumn {
  /**
   * Key in the table metadata configuration under which the name of this materialized row
   * tracking column is stored.
   */
  val MATERIALIZED_COLUMN_NAME_PROP: String

  /** Prefix prepended to every generated name for this materialized row tracking column. */
  val MATERIALIZED_COLUMN_NAME_PREFIX: String

  /**
   * Factory for the exception thrown when the materialized column name is absent from the table
   * metadata. The function receives the table name.
   */
  def missingMetadataException: String => Throwable

  /**
   * Produces a fresh name for this materialized column: the prefix followed by a random UUID.
   * The UUID is assumed to make a clash with an existing column name practically impossible.
   */
  private def generateMaterializedColumnName: String =
    s"$MATERIALIZED_COLUMN_NAME_PREFIX${UUID.randomUUID()}"

  /**
   * Returns `newMetadata` with this column's name property brought up to date.
   *  - When row tracking is not supported by `protocol`, the property is removed.
   *  - Otherwise the name found in `oldMetadata` is carried over, or a new name is generated
   *    when `oldMetadata` has none.
   */
  private[delta] def updateMaterializedColumnName(
      protocol: Protocol,
      oldMetadata: Metadata,
      newMetadata: Metadata): Metadata = {
    if (RowTracking.isSupported(protocol)) {
      // The old metadata is authoritative because it belongs to the current table. Whatever the
      // new metadata carries is overwritten, since it may stem from a different table, e.g. the
      // source table of a CLONE.
      val columnName = oldMetadata.configuration.get(MATERIALIZED_COLUMN_NAME_PROP) match {
        case Some(existingName) => existingName
        case None => generateMaterializedColumnName
      }
      val configWithName =
        newMetadata.configuration + (MATERIALIZED_COLUMN_NAME_PROP -> columnName)
      newMetadata.copy(configuration = configWithName)
    } else {
      // A CLONE may hand us the source's column name even though row tracking is not enabled
      // here. Drop it so that two different tables never share the same column name.
      val configWithoutName = newMetadata.configuration - MATERIALIZED_COLUMN_NAME_PROP
      newMetadata.copy(configuration = configWithoutName)
    }
  }

  /**
   * Throws if the materialized column name stored in `metadata` is equal to the logical or the
   * physical name of a top-level field of the table schema. Does nothing when no materialized
   * column name is stored.
   */
  private[delta] def throwIfMaterializedColumnNameConflictsWithSchema(metadata: Metadata): Unit = {
    val schemaFields = metadata.schema.fields
    val logicalNames = schemaFields.map(_.name)
    val physicalNames = schemaFields.map(f => DeltaColumnMapping.getPhysicalName(f))

    metadata.configuration.get(MATERIALIZED_COLUMN_NAME_PROP) match {
      case Some(reserved) if logicalNames.contains(reserved) || physicalNames.contains(reserved) =>
        throw DeltaErrors.addingColumnWithInternalNameFailed(reserved)
      case _ => ()
    }
  }

  /**
   * Looks up the materialized column name in `metadata`. Returns None when row tracking is not
   * enabled for the given protocol and metadata, or when the property is not set.
   */
  def getMaterializedColumnName(protocol: Protocol, metadata: Metadata): Option[String] = {
    val rowTrackingEnabled = RowTracking.isEnabled(protocol, metadata)
    if (!rowTrackingEnabled) {
      None
    } else {
      metadata.configuration.get(MATERIALIZED_COLUMN_NAME_PROP)
    }
  }

  /**
   * Same as [[getMaterializedColumnName]], but throws the exception built by
   * [[missingMetadataException]] for `tableId` when no name can be extracted.
   */
  def getMaterializedColumnNameOrThrow(
      protocol: Protocol, metadata: Metadata, tableId: String): String = {
    getMaterializedColumnName(protocol, metadata) match {
      case Some(columnName) => columnName
      case None => throw missingMetadataException(tableId)
    }
  }

  /**
   * When row tracking is enabled on `snapshot`, returns the output [[Attribute]] of the analyzed
   * plan of `dataFrame` whose name equals this materialized column name, if there is one.
   * Returns None when row tracking is disabled or no such attribute exists.
   */
  private[delta] def getAttribute(
      snapshot: Snapshot, dataFrame: DataFrame): Option[Attribute] = {
    if (RowTracking.isEnabled(snapshot.protocol, snapshot.metadata)) {
      val expectedName = getMaterializedColumnNameOrThrow(
        snapshot.protocol, snapshot.metadata, snapshot.deltaLog.tableId)
      val outputAttributes = dataFrame.queryExecution.analyzed.outputSet
      outputAttributes.view.find(candidate => candidate.name == expectedName)
    } else {
      None
    }
  }
}

/** The [[MaterializedRowTrackingColumn]] in which Row IDs are materialized. */
object MaterializedRowId extends MaterializedRowTrackingColumn {
  /**
   * Key in the table metadata configuration that holds the name of the column in which the
   * Row IDs are materialized.
   */
  override val MATERIALIZED_COLUMN_NAME_PROP: String =
    "delta.rowTracking.materializedRowIdColumnName"

  /** Prefix of every generated name for the materialized Row ID column. */
  override val MATERIALIZED_COLUMN_NAME_PREFIX: String = "_row-id-col-"

  /** Builds the "missing metadata" error for the Row ID column from a table name. */
  override def missingMetadataException: String => Throwable =
    tableName => DeltaErrors.materializedRowIdMetadataMissing(tableName)
}

/** The [[MaterializedRowTrackingColumn]] in which Row commit versions are materialized. */
object MaterializedRowCommitVersion extends MaterializedRowTrackingColumn {
  /**
   * Key in the table metadata configuration that holds the name of the column in which the
   * Row commit versions are materialized.
   */
  override val MATERIALIZED_COLUMN_NAME_PROP: String =
    "delta.rowTracking.materializedRowCommitVersionColumnName"

  /** Prefix of every generated name for the materialized Row commit version column. */
  override val MATERIALIZED_COLUMN_NAME_PREFIX: String = "_row-commit-version-col-"

  /** Builds the "missing metadata" error for the Row commit version column from a table name. */
  override def missingMetadataException: String => Throwable =
    tableName => DeltaErrors.materializedRowCommitVersionMetadataMissing(tableName)
}
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,10 @@ trait OptimisticTransactionImpl extends TransactionalWrite
setNewProtocolWithFeaturesEnabledByMetadata(newMetadataTmp)
}

newMetadataTmp = MaterializedRowId.updateMaterializedColumnName(
protocol, oldMetadata = snapshot.metadata, newMetadataTmp)
newMetadataTmp = MaterializedRowCommitVersion.updateMaterializedColumnName(
protocol, oldMetadata = snapshot.metadata, newMetadataTmp)

RowId.verifyMetadata(
snapshot.protocol, protocol, snapshot.metadata, newMetadataTmp, isCreatingNewTable)
Expand Down Expand Up @@ -652,6 +656,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
if (spark.conf.get(DeltaSQLConf.DELTA_TABLE_PROPERTY_CONSTRAINTS_CHECK_ENABLED)) {
Protocol.assertTablePropertyConstraintsSatisfied(spark, metadata, snapshot)
}
MaterializedRowId.throwIfMaterializedColumnNameConflictsWithSchema(metadata)
MaterializedRowCommitVersion.throwIfMaterializedColumnNameConflictsWithSchema(metadata)
}

private def setNewProtocolWithFeaturesEnabledByMetadata(metadata: Metadata): Unit = {
Expand Down
16 changes: 0 additions & 16 deletions spark/src/main/scala/org/apache/spark/sql/delta/RowId.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,4 @@ object RowId {
*/
private[delta] def extractHighWatermark(snapshot: Snapshot): Option[Long] = {
  // The high watermark is read from the row tracking metadata domain of the snapshot, if any.
  val rowTrackingDomain = RowTrackingMetadataDomain.fromSnapshot(snapshot)
  rowTrackingDomain.map(domain => domain.rowIdHighWaterMark)
}

/**
 * Verifies that CONVERT TO DELTA collects statistics whenever row tracking is supported by the
 * protocol. Fresh row IDs cannot be assigned without statistics, so an error is thrown asking to
 * either rerun the command without the row tracking table feature or to enable the flags needed
 * to collect statistics. Does nothing when row tracking is not supported.
 */
private[delta] def checkStatsCollectedIfRowTrackingSupported(
    protocol: Protocol,
    convertToDeltaShouldCollectStats: Boolean,
    statsCollectionEnabled: Boolean): Unit = {
  // Both flags must be on for statistics to be collected.
  val statsWillBeCollected = convertToDeltaShouldCollectStats && statsCollectionEnabled
  if (isSupported(protocol) && !statsWillBeCollected) {
    throw DeltaErrors.convertToDeltaRowTrackingEnabledWithoutStatsCollection
  }
}
}
44 changes: 44 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/RowTracking.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,48 @@ object RowTracking {
}
isEnabled
}

/**
 * Verifies that CONVERT TO DELTA collects statistics whenever row tracking is supported by the
 * protocol. Fresh row IDs cannot be assigned without statistics, so an error is thrown asking to
 * either rerun the command without the row tracking table feature or to enable the flags needed
 * to collect statistics. Does nothing when row tracking is not supported.
 */
private[delta] def checkStatsCollectedIfRowTrackingSupported(
    protocol: Protocol,
    convertToDeltaShouldCollectStats: Boolean,
    statsCollectionEnabled: Boolean): Unit = {
  // Both flags must be on for statistics to be collected.
  val statsWillBeCollected = convertToDeltaShouldCollectStats && statsCollectionEnabled
  if (isSupported(protocol) && !statsWillBeCollected) {
    throw DeltaErrors.convertToDeltaRowTrackingEnabledWithoutStatsCollection
  }
}

/**
 * Returns a copy of `sourceMetadata` whose row tracking enablement property mirrors the one in
 * `targetMetadata`: the target's value when it is set there, and no property at all otherwise.
 */
private[delta] def takeRowTrackingPropertyFromTarget(
    targetMetadata: Metadata,
    sourceMetadata: Metadata): Metadata = {
  val rowTrackingKey = DeltaConfigs.ROW_TRACKING_ENABLED.key
  // Whatever the source says about row tracking is discarded first.
  val sourceConfigWithoutProp = sourceMetadata.configuration - rowTrackingKey
  val mergedConfig = targetMetadata.configuration.get(rowTrackingKey) match {
    case Some(targetValue) => sourceConfigWithoutProp + (rowTrackingKey -> targetValue)
    case None => sourceConfigWithoutProp
  }
  sourceMetadata.copy(configuration = mergedConfig)
}

/**
 * Returns a copy of `metadata` whose configuration no longer contains the row tracking
 * enablement property.
 */
private[delta] def removeRowTrackingProperty(metadata: Metadata): Metadata = {
  val configWithoutRowTracking = metadata.configuration - DeltaConfigs.ROW_TRACKING_ENABLED.key
  metadata.copy(configuration = configWithoutRowTracking)
}

/**
 * Returns a copy of `protocol` whose writer features, when defined, no longer include the row
 * tracking table feature.
 */
private[delta] def removeRowTrackingTableFeature(protocol: Protocol): Protocol = {
  val prunedWriterFeatures = protocol.writerFeatures.map { features =>
    features - RowTrackingFeature.name
  }
  protocol.copy(writerFeatures = prunedWriterFeatures)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,25 @@ abstract class CloneTableBase(
private def prepareSourceMetadata(
    targetSnapshot: SnapshotDescriptor,
    opName: String): Metadata = {
  // Start from the source metadata, with a fresh table id and the target's name and description.
  val baseMetadata = sourceTable.metadata.copy(
    id = UUID.randomUUID().toString,
    name = targetSnapshot.metadata.name,
    description = targetSnapshot.metadata.description)

  if (opName != CloneTableCommand.OP_NAME) {
    // Only CLONE adjusts the row tracking property; other operations keep the metadata as is.
    baseMetadata
  } else if (targetSnapshot.version == -1) {
    // New table: remove the row tracking table property to create a 1:1 CLONE of the source,
    // just without row tracking.
    RowTracking.removeRowTrackingProperty(baseMetadata)
  } else {
    // Existing table: keep whatever setting is currently on the target, as the setting should
    // be independent between target and source.
    RowTracking.takeRowTrackingPropertyFromTarget(
      targetMetadata = targetSnapshot.metadata,
      sourceMetadata = baseMetadata)
  }
}

/**
Expand Down Expand Up @@ -344,12 +359,18 @@ abstract class CloneTableBase(
conf.getConf(DeltaSQLConf.RESTORE_TABLE_PROTOCOL_DOWNGRADE_ALLOWED) ||
// It's not a real downgrade if the table doesn't exist before the CLONE.
txn.snapshot.version == -1
val sourceProtocolWithoutRowTracking = RowTracking.removeRowTrackingTableFeature(sourceProtocol)

if (protocolDowngradeAllowed) {
minReaderVersion = minReaderVersion.max(sourceProtocol.minReaderVersion)
minWriterVersion = minWriterVersion.max(sourceProtocol.minWriterVersion)
val minProtocol = Protocol(minReaderVersion, minWriterVersion).withFeatures(enabledFeatures)
sourceProtocol.merge(minProtocol)
// Row tracking settings should be independent between target and source.
if (opName == CloneTableCommand.OP_NAME) {
sourceProtocolWithoutRowTracking.merge(minProtocol)
} else {
sourceProtocol.merge(minProtocol)
}
} else {
// Take the maximum of all protocol versions being merged to ensure that table features
// from table property overrides are correctly added to the table feature list or are only
Expand All @@ -359,7 +380,12 @@ abstract class CloneTableBase(
minWriterVersion = Seq(
targetProtocol.minWriterVersion, sourceProtocol.minWriterVersion, minWriterVersion).max
val minProtocol = Protocol(minReaderVersion, minWriterVersion).withFeatures(enabledFeatures)
targetProtocol.merge(sourceProtocol, minProtocol)
// Row tracking settings should be independent between target and source.
if (opName == CloneTableCommand.OP_NAME) {
targetProtocol.merge(sourceProtocolWithoutRowTracking, minProtocol)
} else {
targetProtocol.merge(sourceProtocol, minProtocol)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ abstract class ConvertToDeltaCommandBase(
// TODO: we have not decided on how to implement CONVERT TO DELTA under column mapping modes
// for some convert targets so we block this feature for them here
checkColumnMapping(txn.metadata, targetTable)
RowId.checkStatsCollectedIfRowTrackingSupported(
RowTracking.checkStatsCollectedIfRowTrackingSupported(
txn.protocol,
collectStats,
statsEnabled)
Expand Down
Loading