Add Feature Phaseout support for V2 Checkpoints
This PR adds table feature phaseout support for V2 checkpoints. Users can now downgrade their tables from v2 checkpoints to classic checkpoints, allowing older clients to interact with these tables.

Closes #2284

GitOrigin-RevId: 6321298103baeda6fbd9a0d9932090f915af4ee0
dhruvarya-db authored and allisonport-db committed Nov 16, 2023
1 parent 355263f commit bcd0ee2
Showing 3 changed files with 192 additions and 1 deletion.
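For reference, the user-facing downgrade flow this PR enables is the two-step DROP FEATURE sequence exercised by the new tests below. A minimal sketch (the table path is a placeholder):

// Step 1: the first DROP FEATURE switches the table back to classic checkpoints via the
// pre-downgrade command, but fails with DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD
// while historical v2 checkpoint traces are still within the log retention period.
spark.sql("ALTER TABLE delta.`/path/to/table` DROP FEATURE `v2Checkpoint`")

// Step 2: once the retention period has passed and the history is clean, rerunning the
// same command removes v2Checkpoint from the table protocol.
spark.sql("ALTER TABLE delta.`/path/to/table` DROP FEATURE `v2Checkpoint`")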
@@ -15,9 +15,10 @@
*/

package org.apache.spark.sql.delta
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.AlterTableUnsetPropertiesDeltaCommand
import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand}
import org.apache.spark.sql.delta.metering.DeltaLogging

/**
@@ -87,3 +88,32 @@ case class TestLegacyReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
true
}
}

case class V2CheckpointPreDowngradeCommand(table: DeltaTableV2)
extends PreDowngradeTableFeatureCommand
with DeltaLogging {
/**
* We set the checkpoint policy to classic to prevent any transactions from creating
* v2 checkpoints.
*
* @return True if it changed the checkpoint policy metadata property to classic.
*         False otherwise.
*/
override def removeFeatureTracesIfNeeded(): Boolean = {

if (V2CheckpointTableFeature.validateRemoval(table.initialSnapshot)) return false

val startTimeNs = System.nanoTime()
val properties = Map(DeltaConfigs.CHECKPOINT_POLICY.key -> CheckpointPolicy.Classic.name)
AlterTableSetPropertiesDeltaCommand(table, properties).run(table.spark)

recordDeltaEvent(
table.deltaLog,
opType = "delta.v2CheckpointFeatureRemovalMetrics",
data =
Map(("downgradeTimeMs", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)))
)

true
}
}
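In effect, the pre-downgrade command above just flips the table's checkpoint policy property back to classic before the protocol downgrade. A rough SQL equivalent, assuming DeltaConfigs.CHECKPOINT_POLICY corresponds to the delta.checkpointPolicy table property (the table path is a placeholder):

// Hedged sketch of what V2CheckpointPreDowngradeCommand does, expressed as SQL.
// Assumes the property key is 'delta.checkpointPolicy'.
spark.sql(
  "ALTER TABLE delta.`/path/to/table` " +
  "SET TBLPROPERTIES ('delta.checkpointPolicy' = 'classic')")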
26 changes: 26 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
@@ -535,6 +535,7 @@ object ClusteringTableFeature extends WriterFeature("clustering") {
*/
object V2CheckpointTableFeature
extends ReaderWriterFeature(name = "v2Checkpoint")
with RemovableFeature
with FeatureAutomaticallyEnabledByMetadata {

override def automaticallyUpdateProtocolOfExistingTables: Boolean = true
@@ -545,6 +546,31 @@ object V2CheckpointTableFeature
override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
spark: SparkSession): Boolean = isV2CheckpointSupportNeededByMetadata(metadata)

override def validateRemoval(snapshot: Snapshot): Boolean = {
// Fail validation if v2 checkpoints are still enabled in the current snapshot
if (isV2CheckpointSupportNeededByMetadata(snapshot.metadata)) return false

// Validation also fails if the current snapshot might depend on a v2 checkpoint.
// NOTE: Empty and preloaded checkpoint providers never reference v2 checkpoints.
snapshot.checkpointProvider match {
case p if p.isEmpty => true
case _: PreloadedCheckpointProvider => true
case lazyProvider: LazyCompleteCheckpointProvider =>
lazyProvider.underlyingCheckpointProvider.isInstanceOf[PreloadedCheckpointProvider]
case _ => false
}
}

override def actionUsesFeature(action: Action): Boolean = action match {
case m: Metadata => isV2CheckpointSupportNeededByMetadata(m)
case _: CheckpointMetadata => true
case _: SidecarFile => true
case _ => false
}

override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
V2CheckpointPreDowngradeCommand(table)
}

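Because actionUsesFeature treats every CheckpointMetadata and SidecarFile action as a use of the feature, commits written while v2 checkpoints were enabled keep the feature "in use" until they age out of the log retention window; the suite below simulates this with a manual clock. A small sketch of probing removability from the latest snapshot, assuming a DeltaLog handle named log:

// Hedged sketch: check whether the latest snapshot still carries v2 checkpoint traces,
// using the validateRemoval hook added above. 'log' is a placeholder DeltaLog handle.
val snapshot = log.update()
val removable = V2CheckpointTableFeature.validateRemoval(snapshot)
println(s"v2Checkpoint feature removable from current snapshot: $removable")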
@@ -31,11 +31,13 @@ import org.apache.spark.sql.delta.commands.{AlterTableDropFeatureDeltaCommand, A
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.delta.util.FileNames.{deltaFile, DeltaFile}
import org.apache.spark.sql.delta.util.JsonUtils

import org.apache.spark.{SparkConf, SparkThrowable}
import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.util.DateTimeConstants
import org.apache.spark.sql.execution.streaming.MemoryStream
@@ -3139,6 +3141,139 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
expectedDowngradedProtocol = protocolWithReaderFeature(TestRemovableReaderWriterFeature))
}

private def dropV2CheckpointsTableFeature(spark: SparkSession, log: DeltaLog): Unit = {
spark.sql(s"ALTER TABLE delta.`${log.dataPath}` DROP FEATURE " +
s"`${V2CheckpointTableFeature.name}`")
}

private def testV2CheckpointTableFeatureDrop(
v2CheckpointFormat: V2Checkpoint.Format,
withInitialV2Checkpoint: Boolean,
forceMultiPartCheckpoint: Boolean = false): Unit = {
var confs = Seq(
DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.V2.name,
DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key -> v2CheckpointFormat.name
)
val expectedClassicCheckpointType = if (forceMultiPartCheckpoint) {
confs :+= DeltaSQLConf.DELTA_CHECKPOINT_PART_SIZE.key -> "1"
CheckpointInstance.Format.WITH_PARTS
} else {
CheckpointInstance.Format.SINGLE
}
withSQLConf(confs: _*) {
withTempPath { dir =>
val clock = new ManualClock(System.currentTimeMillis())
val targetLog = DeltaLog.forTable(spark, dir, clock)
val defaultRetentionPeriod =
DeltaConfigs.LOG_RETENTION.fromMetaData(targetLog.update().metadata).toString

val targetDF = spark.range(start = 0, end = 100, step = 1, numPartitions = 2)
targetDF.write.format("delta").save(dir.toString)

val initialCheckpointCount = if (withInitialV2Checkpoint) 1 else 0

if (withInitialV2Checkpoint) {
// Create a v2 checkpoint.
targetLog.checkpoint()
}

// Assert that the current checkpointing policy requires v2 checkpoint support.
val preDowngradeSnapshot = targetLog.update()
assert(
DeltaConfigs.CHECKPOINT_POLICY
.fromMetaData(preDowngradeSnapshot.metadata)
.needsV2CheckpointSupport)
val checkpointFiles = targetLog.listFrom(0).filter(FileNames.isCheckpointFile)
assert(checkpointFiles.length == initialCheckpointCount)
checkpointFiles.foreach { f =>
assert(CheckpointInstance(f.getPath).format == CheckpointInstance.Format.V2)
}

// Dropping the feature should fail because
// 1. The checkpointing policy in metadata requires v2 checkpoint support.
// 2. Also, when withInitialV2Checkpoint is true, there is a v2 checkpoint.
val e1 = intercept[DeltaTableFeatureException] {
dropV2CheckpointsTableFeature(spark, targetLog)
}
checkError(
exception = e1,
errorClass = "DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD",
parameters = Map(
"feature" -> V2CheckpointTableFeature.name,
"logRetentionPeriodKey" -> "delta.logRetentionDuration",
"logRetentionPeriod" -> defaultRetentionPeriod,
"truncateHistoryLogRetentionPeriod" -> truncateHistoryDefaultLogRetention.toString))

val postCleanupCheckpointFiles =
targetLog.listFrom(0).filter(FileNames.isCheckpointFile).toList

// Assert that a new classic checkpoint has been created.
val uniqueCheckpointCount = postCleanupCheckpointFiles
.drop(initialCheckpointCount)
.map { checkpointFile =>
val checkpointInstance = CheckpointInstance(checkpointFile.getPath)

assert(checkpointInstance.format == expectedClassicCheckpointType)

checkpointInstance.version
}
// Count a multi-part checkpoint as a single checkpoint.
.toSet.size
// The drop feature command generates one classic checkpoint after v2 checkpoint cleanup.
val expectedClassicCheckpointCount = 1
assert(uniqueCheckpointCount == expectedClassicCheckpointCount)

spark.range(100, 120).write.format("delta").mode("append").save(dir.getCanonicalPath)

// V2 checkpoint-related traces have not been cleaned up yet. The attempt should fail.
val e2 = intercept[DeltaTableFeatureException] {
dropV2CheckpointsTableFeature(spark, targetLog)
}
checkError(
exception = e2,
errorClass = "DELTA_FEATURE_DROP_HISTORICAL_VERSIONS_EXIST",
parameters = Map(
"feature" -> V2CheckpointTableFeature.name,
"logRetentionPeriodKey" -> "delta.logRetentionDuration",
"logRetentionPeriod" -> defaultRetentionPeriod,
"truncateHistoryLogRetentionPeriod" -> truncateHistoryDefaultLogRetention.toString))

// Pretend retention period has passed.
clock.advance(
targetLog.deltaRetentionMillis(targetLog.update().metadata) +
TimeUnit.HOURS.toMillis(1))

// History is now clean. We should be able to remove the feature.
dropV2CheckpointsTableFeature(spark, targetLog)

val postDowngradeSnapshot = targetLog.update()
val protocol = postDowngradeSnapshot.protocol
assert(!protocol.readerFeatureNames.contains(V2CheckpointTableFeature.name))
assert(
!DeltaConfigs.CHECKPOINT_POLICY
.fromMetaData(postDowngradeSnapshot.metadata)
.needsV2CheckpointSupport)
assert(targetLog.listFrom(0).filter(FileNames.isCheckpointFile).forall { f =>
CheckpointInstance(f.getPath).format == expectedClassicCheckpointType
})
}
}
}

for (
v2CheckpointFormat <- V2Checkpoint.Format.ALL;
withInitialV2Checkpoint <- BOOLEAN_DOMAIN)
test(s"Remove v2 Checkpoints Feature [v2CheckpointFormat: ${v2CheckpointFormat.name}; " +
s"withInitialV2Checkpoint: $withInitialV2Checkpoint; forceMultiPartCheckpoint: false]") {
testV2CheckpointTableFeatureDrop(v2CheckpointFormat, withInitialV2Checkpoint)
}

test(
s"Remove v2 Checkpoints Feature [v2CheckpointFormat: ${V2Checkpoint.Format.PARQUET.name}; " +
s"withInitialV2Checkpoint: true; forceMultiPartCheckpoint: true]") {
testV2CheckpointTableFeatureDrop(V2Checkpoint.Format.PARQUET, true, true)
}

private def assertPropertiesAndShowTblProperties(
deltaLog: DeltaLog,
tableHasFeatures: Boolean = false): Unit = {
