Skip to content

Commit

Permalink
Introduce InCommitTimestamp feature and write monotonically increasin…
Browse files Browse the repository at this point in the history
…g timestamps in CommitInfo

Follow-up for #2532.

Adds a new writer feature called `inCommitTimestamp`. When this feature is enabled, the writer will make sure that it writes `commitTimestamp` in CommitInfo which contains a monotonically increasing timestamp.

This PR is an initial implementation, it does not handle timestamp retrieval efficiently. It does not try to populate the inCommitTimestamp in Snapshot even in places where it is already available, instead Snapshot has to perform an IO to read the timestamp.

Closes #2596

GitOrigin-RevId: 44904e734eee74378ee55f708beb29a484cd93e6
  • Loading branch information
dhruvarya-db authored and allisonport-db committed Mar 5, 2024
1 parent f50bd83 commit b15a2c9
Show file tree
Hide file tree
Showing 14 changed files with 753 additions and 53 deletions.
12 changes: 12 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -1388,6 +1388,18 @@
],
"sqlState" : "42703"
},
"DELTA_MISSING_COMMIT_INFO" : {
"message" : [
"This table has the feature <featureName> enabled which requires the presence of the CommitInfo action in every commit. However, the CommitInfo action is missing from commit version <version>."
],
"sqlState" : "KD004"
},
"DELTA_MISSING_COMMIT_TIMESTAMP" : {
"message" : [
"This table has the feature <featureName> enabled which requires the presence of commitTimestamp in the CommitInfo action. However, this field has not been set in commit version <version>."
],
"sqlState" : "KD004"
},
"DELTA_MISSING_DELTA_TABLE" : {
"message" : [
"<tableName> is not a Delta table."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ private[delta] class ConflictChecker(
checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn()
checkForDeletedFilesAgainstCurrentTxnReadFiles()
checkForDeletedFilesAgainstCurrentTxnDeletedFiles()
resolveTimestampOrderingConflicts()

logMetrics()
currentTransactionInfo
Expand Down Expand Up @@ -554,6 +555,53 @@ private[delta] class ConflictChecker(
currentTransactionInfo = currentTransactionInfo.copy(actions = newActions)
}

/**
* Adjust the current transaction's commit timestamp to account for the winning
* transaction's commit timestamp. If this transaction newly enabled ICT, also update
* the table properties to reflect the adjusted enablement version and timestamp.
*/
private def resolveTimestampOrderingConflicts(): Unit = {
if (!DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(currentTransactionInfo.metadata)) {
return
}

val winningCommitTimestamp =
if (InCommitTimestampUtils.didCurrentTransactionEnableICT(
currentTransactionInfo.metadata, currentTransactionInfo.readSnapshot)) {
// Since the current transaction enabled inCommitTimestamps, we should use the file
// timestamp from the winning transaction as its commit timestamp.
winningCommitFileStatus.getModificationTime
} else {
// Get the inCommitTimestamp from the winning transaction.
CommitInfo.getRequiredInCommitTimestamp(
winningCommitSummary.commitInfo, winningCommitVersion.toString)
}
val currentTransactionTimestamp = CommitInfo.getRequiredInCommitTimestamp(
currentTransactionInfo.commitInfo, "NEW_COMMIT")
// getRequiredInCommitTimestamp will throw an exception if commitInfo is None.
val currentTransactionCommitInfo = currentTransactionInfo.commitInfo.get
val updatedCommitTimestamp = Math.max(currentTransactionTimestamp, winningCommitTimestamp + 1)
val updatedCommitInfo =
currentTransactionCommitInfo.copy(inCommitTimestamp = Some(updatedCommitTimestamp))
currentTransactionInfo = currentTransactionInfo.copy(commitInfo = Some(updatedCommitInfo))
val nextAvailableVersion = winningCommitVersion + 1L
val updatedMetadata =
InCommitTimestampUtils.getUpdatedMetadataWithICTEnablementInfo(
updatedCommitTimestamp,
currentTransactionInfo.readSnapshot,
currentTransactionInfo.metadata,
nextAvailableVersion)
updatedMetadata.foreach { updatedMetadata =>
currentTransactionInfo = currentTransactionInfo.copy(
metadata = updatedMetadata,
actions = currentTransactionInfo.actions.map {
case _: Metadata => updatedMetadata
case other => other
}
)
}
}

/** A helper function for pretty printing a specific partition directory. */
protected def getPrettyPartitionMessage(partitionValues: Map[String, String]): String = {
val partitionColumns = currentTransactionInfo.partitionSchemaAtReadTime
Expand Down
31 changes: 31 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,37 @@ trait DeltaConfigsBase extends DeltaLogging {
_ => true,
"A string-to-string map of configuration properties for the managed commit owner.")

val IN_COMMIT_TIMESTAMPS_ENABLED = buildConfig[Boolean](
"enableInCommitTimestamps-dev",
false.toString,
_.toBoolean,
validationFunction = _ => true,
"needs to be a boolean."
)

/**
* This table property is used to track the version of the table at which
* inCommitTimestamps were enabled.
*/
val IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION = buildConfig[Option[Long]](
"inCommitTimestampEnablementVersion-dev",
null,
v => Option(v).map(_.toLong),
validationFunction = _ => true,
"needs to be a long."
)

/**
* This table property is used to track the timestamp at which inCommitTimestamps
* were enabled. More specifically, it is the inCommitTimestamp of the commit with
* the version specified in [[IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION]].
*/
val IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP = buildConfig[Option[Long]](
"inCommitTimestampEnablementTimestamp-dev",
null,
v => Option(v).map(_.toLong),
validationFunction = _ => true,
"needs to be a long.")
}

object DeltaConfigs extends DeltaConfigsBase
12 changes: 12 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,18 @@ trait DeltaErrorsBase
cause = cause)
}

def missingCommitInfo(featureName: String, commitVersion: String): DeltaIllegalStateException = {
new DeltaIllegalStateException(
errorClass = "DELTA_MISSING_COMMIT_INFO",
messageParameters = Array(featureName, commitVersion))
}

def missingCommitTimestamp(commitVersion: String): DeltaIllegalStateException = {
new DeltaIllegalStateException(
errorClass = "DELTA_MISSING_COMMIT_TIMESTAMP",
messageParameters = Array(InCommitTimestampTableFeature.name, commitVersion))
}

def failOnCheckpointRename(src: Path, dest: Path): DeltaIllegalStateException = {
new DeltaIllegalStateException(
errorClass = "DELTA_CANNOT_RENAME_PATH",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,25 +255,37 @@ class DeltaHistoryManager(

/** Contains many utility methods that can also be executed on Spark executors. */
object DeltaHistoryManager extends DeltaLogging {
/** Get the persisted commit info for the given delta file. */
private def getCommitInfo(
/** Get the persisted commit info (if available) for the given delta file. */
def getCommitInfoOpt(
logStore: LogStore,
basePath: Path,
version: Long,
hadoopConf: Configuration): CommitInfo = {
hadoopConf: Configuration): Option[CommitInfo] = {
val logs = logStore.readAsIterator(FileNames.deltaFile(basePath, version), hadoopConf)
try {
val info = logs.map(Action.fromJson).collectFirst { case c: CommitInfo => c }
if (info.isEmpty) {
CommitInfo.empty(Some(version))
} else {
info.head.copy(version = Some(version))
}
logs
.map(Action.fromJson)
.collectFirst { case c: CommitInfo => c.copy(version = Some(version)) }
} finally {
logs.close()
}
}

/**
* Get the persisted commit info for the given delta file. If commit info
* is not found in the commit, a mostly empty [[CommitInfo]] object with only
* the version populated will be returned.
*/
private def getCommitInfo(
logStore: LogStore,
basePath: Path,
version: Long,
hadoopConf: Configuration): CommitInfo = {
getCommitInfoOpt(logStore, basePath, version, hadoopConf).getOrElse {
CommitInfo.empty(Some(version))
}
}

/**
* Get the earliest commit available for this table. Note that this version isn't guaranteed to
* exist when performing an action as a concurrent operation can delete the file during cleanup.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats._
import org.apache.spark.sql.util.ScalaExtensions._
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.SparkException
Expand Down Expand Up @@ -296,7 +297,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
* Tracks the start time since we started trying to write a particular commit.
* Used for logging duration of retried transactions.
*/
protected var commitAttemptStartTime: Long = _
protected var commitAttemptStartTimeMillis: Long = _

/**
* Tracks actions within the transaction, will commit along with the passed-in actions in the
Expand Down Expand Up @@ -371,6 +372,18 @@ trait OptimisticTransactionImpl extends TransactionalWrite
// will be detected as a conflict and the transaction will anyway fail.
private[delta] val preCommitCommitStoreOpt: Option[CommitStore] = snapshot.commitStoreOpt

/**
* Generates a timestamp which is greater than the commit timestamp
* of the last snapshot. Note that this is only needed when the
* feature `inCommitTimestamps` is enabled.
*/
protected[delta] def generateInCommitTimestampForFirstCommitAttempt(
currentTimestamp: Long): Option[Long] =
Option.when(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(metadata)) {
val lastCommitTimestamp = snapshot.timestamp
math.max(currentTimestamp, lastCommitTimestamp + 1)
}

/** The end to end execution time of this transaction. */
def txnExecutionTimeMs: Option[Long] = if (commitEndNano == -1) {
None
Expand Down Expand Up @@ -1086,31 +1099,50 @@ trait OptimisticTransactionImpl extends TransactionalWrite
val readRowIdHighWatermark =
RowId.extractHighWatermark(snapshot).getOrElse(RowId.MISSING_HIGH_WATER_MARK)

commitAttemptStartTimeMillis = clock.getTimeMillis()
commitInfo = CommitInfo(
clock.getTimeMillis(),
op.name,
op.jsonEncodedValues,
Map.empty,
Some(readVersion).filter(_ >= 0),
Option(isolationLevelToUse.toString),
Some(isBlindAppend),
getOperationMetrics(op),
getUserMetadata(op),
time = commitAttemptStartTimeMillis,
operation = op.name,
inCommitTimestamp =
generateInCommitTimestampForFirstCommitAttempt(commitAttemptStartTimeMillis),
operationParameters = op.jsonEncodedValues,
commandContext = Map.empty,
readVersion = Some(readVersion).filter(_ >= 0),
isolationLevel = Option(isolationLevelToUse.toString),
isBlindAppend = Some(isBlindAppend),
operationMetrics = getOperationMetrics(op),
userMetadata = getUserMetadata(op),
tags = if (tags.nonEmpty) Some(tags) else None,
txnId = Some(txnId))

// Validate that the [[DeltaConfigs.MANAGED_COMMIT_PROVIDER_CONF]] is json parse-able.
DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.fromMetaData(metadata)

val firstAttemptVersion = getFirstAttemptVersion
val updatedMetadataOpt = commitInfo.inCommitTimestamp.flatMap { inCommitTimestamp =>
InCommitTimestampUtils.getUpdatedMetadataWithICTEnablementInfo(
inCommitTimestamp,
snapshot,
metadata,
firstAttemptVersion)
}
val updatedActions = updatedMetadataOpt.map { updatedMetadata =>
preparedActions.map {
case _: Metadata => updatedMetadata
case other => other
}
}
.getOrElse(preparedActions)
val updatedMetadata = updatedMetadataOpt.getOrElse(metadata)
val currentTransactionInfo = CurrentTransactionInfo(
txnId = txnId,
readPredicates = readPredicates.toSeq,
readFiles = readFiles.toSet,
readWholeTable = readTheWholeTable,
readAppIds = readTxn.toSet,
metadata = metadata,
metadata = updatedMetadata,
protocol = protocol,
actions = preparedActions,
actions = updatedActions,
readSnapshot = snapshot,
commitInfo = Option(commitInfo),
readRowIdHighWatermark = readRowIdHighWatermark,
Expand All @@ -1125,15 +1157,14 @@ trait OptimisticTransactionImpl extends TransactionalWrite
registerPostCommitHook(GenerateSymlinkManifest)
}

commitAttemptStartTime = clock.getTimeMillis()
if (preparedActions.isEmpty && canSkipEmptyCommits &&
skipRecordingEmptyCommitAllowed(isolationLevelToUse)) {
return None
}

val (commitVersion, postCommitSnapshot, updatedCurrentTransactionInfo) =
doCommitRetryIteratively(
getFirstAttemptVersion, currentTransactionInfo, isolationLevelToUse)
firstAttemptVersion, currentTransactionInfo, isolationLevelToUse)
logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}")
(commitVersion, postCommitSnapshot, updatedCurrentTransactionInfo.actions)
} catch {
Expand Down Expand Up @@ -1182,15 +1213,17 @@ trait OptimisticTransactionImpl extends TransactionalWrite
newProtocolOpt: Option[Protocol],
op: DeltaOperations.Operation,
context: Map[String, String],
metrics: Map[String, String]): (Long, Snapshot) = {
metrics: Map[String, String]
): (Long, Snapshot) = recordDeltaOperation(deltaLog, "delta.commit.large") {
assert(!committed, "Transaction already committed.")
commitStartNano = System.nanoTime()
val attemptVersion = getFirstAttemptVersion
try {
val tags = Map.empty[String, String]
val commitInfo = CommitInfo(
time = clock.getTimeMillis(),
NANOSECONDS.toMillis(commitStartNano),
operation = op.name,
generateInCommitTimestampForFirstCommitAttempt(NANOSECONDS.toMillis(commitStartNano)),
operationParameters = op.jsonEncodedValues,
context,
readVersion = Some(readVersion),
Expand All @@ -1215,8 +1248,15 @@ trait OptimisticTransactionImpl extends TransactionalWrite
// Initialize everything needed to maintain auto-compaction stats.
partitionsAddedToOpt = Some(new mutable.HashSet[Map[String, String]])
val acStatsCollector = createAutoCompactStatsCollector()
val updatedMetadataOpt = commitInfo.inCommitTimestamp.flatMap { inCommitTimestamp =>
InCommitTimestampUtils.getUpdatedMetadataWithICTEnablementInfo(
inCommitTimestamp,
snapshot,
metadata,
attemptVersion)
}
var allActions =
Seq(commitInfo, metadata).toIterator ++
Seq(commitInfo, updatedMetadataOpt.getOrElse(metadata)).toIterator ++
nonProtocolMetadataActions ++
newProtocolOpt.toIterator
allActions = allActions.map { action =>
Expand Down Expand Up @@ -1719,7 +1759,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
}
}
// retries all failed
val totalCommitAttemptTime = clock.getTimeMillis() - commitAttemptStartTime
val totalCommitAttemptTime = clock.getTimeMillis() - commitAttemptStartTimeMillis
throw DeltaErrors.maxCommitRetriesExceededException(
maxRetryAttempts + 1,
commitVersion,
Expand Down Expand Up @@ -1939,12 +1979,12 @@ trait OptimisticTransactionImpl extends TransactionalWrite
otherCommitFileStatus,
commitIsolationLevel)
logInfo(s"$logPrefixStr No conflicts in version $otherCommitVersion, " +
s"${clock.getTimeMillis() - commitAttemptStartTime} ms since start")
s"${clock.getTimeMillis() - commitAttemptStartTimeMillis} ms since start")
}

logInfo(s"$logPrefixStr No conflicts with versions [$checkVersion, $nextAttemptVersion) " +
s"with current txn having $txnDetailsLogStr, " +
s"${clock.getTimeMillis() - commitAttemptStartTime} ms since start")
s"${clock.getTimeMillis() - commitAttemptStartTimeMillis} ms since start")
(nextAttemptVersion, updatedCurrentTransactionInfo)
}
}
Expand Down

0 comments on commit b15a2c9

Please sign in to comment.