Skip to content
Permalink
Browse files
[HUDI-3721] Delete MDT if necessary when trigger rollback to savepoint (
#5173)

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
  • Loading branch information
zhangyue19921010 and yuezhang committed Mar 31, 2022
1 parent 2c4554f commit 2dbb273d26ea38ecf28af704876861554250449c
Showing 9 changed files with 106 additions and 24 deletions.
@@ -22,13 +22,18 @@
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.commands.TableCommand;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;

import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.shell.core.CommandResult;
@@ -118,6 +123,54 @@ public void testRollbackToSavepoint() throws IOException {
new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103")));
}

/**
* Test case of command 'savepoint rollback' with metadata table bootstrap.
*/
@Test
public void testRollbackToSavepointWithMetadataTableEnable() throws IOException {
// generate for savepoints
for (int i = 101; i < 105; i++) {
String instantTime = String.valueOf(i);
HoodieTestDataGenerator.createCommitFile(tablePath, instantTime, jsc.hadoopConfiguration());
}

// generate one savepoint at 102
String savepoint = "102";
HoodieTestDataGenerator.createSavepointFile(tablePath, savepoint, jsc.hadoopConfiguration());

// re-bootstrap metadata table
// delete first
String basePath = metaClient.getBasePath();
Path metadataTableBasePath = new Path(HoodieTableMetadata.getMetadataTableBasePath(basePath));
metaClient.getFs().delete(metadataTableBasePath, true);

// then bootstrap metadata table at instant 104
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc));

assertTrue(HoodieCLI.fs.exists(metadataTableBasePath));

// roll back to savepoint
CommandResult cr = getShell().executeCommand(
String.format("savepoint rollback --savepoint %s --sparkMaster %s", savepoint, "local"));

assertAll("Command run failed",
() -> assertTrue(cr.isSuccess()),
() -> assertEquals(
String.format("Savepoint \"%s\" rolled back", savepoint), cr.getResult().toString()));

// there is 1 restore instant
HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
assertEquals(1, timeline.getRestoreTimeline().countInstants());

// 103 and 104 instant had rollback
assertFalse(timeline.getCommitTimeline().containsInstant(
new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103")));
assertFalse(timeline.getCommitTimeline().containsInstant(
new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "104")));
}

/**
* Test case of command 'savepoint delete'.
*/
@@ -18,6 +18,7 @@

package org.apache.hudi.client;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -66,6 +67,7 @@
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.BulkInsertPartitioner;
@@ -643,9 +645,30 @@ public void deleteSavepoint(String savepointTime) {
* @return true if the savepoint was restored to successfully
*/
public void restoreToSavepoint(String savepointTime) {
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty());
boolean initialMetadataTableIfNecessary = config.isMetadataTableEnabled();
if (initialMetadataTableIfNecessary) {
try {
// Delete metadata table directly when users trigger savepoint rollback if mdt existed and beforeTimelineStarts
String metadataTableBasePathStr = HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath());
HoodieTableMetaClient mdtClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePathStr).build();
// Same as HoodieTableMetadataUtil#processRollbackMetadata
HoodieInstant syncedInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, savepointTime);
// The instant required to sync rollback to MDT has been archived and the mdt syncing will be failed
// So that we need to delete the whole MDT here.
if (mdtClient.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())) {
mdtClient.getFs().delete(new Path(metadataTableBasePathStr), true);
// rollbackToSavepoint action will try to bootstrap MDT at first but sync to MDT will fail at the current scenario.
// so that we need to disable metadata initialized here.
initialMetadataTableIfNecessary = false;
}
} catch (Exception e) {
// Metadata directory does not exist
}
}

HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary);
SavepointHelpers.validateSavepointPresence(table, savepointTime);
restoreToInstant(savepointTime);
restoreToInstant(savepointTime, initialMetadataTableIfNecessary);
SavepointHelpers.validateSavepointRestore(table, savepointTime);
}

@@ -659,7 +682,7 @@ public boolean rollback(final String commitInstantTime) throws HoodieRollbackExc
/**
* @Deprecated
* Rollback the inflight record changes with the given commit time. This
* will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String)}
* will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String, boolean)
*
* @param commitInstantTime Instant time of the commit
* @param pendingRollbackInfo pending rollback instant and plan if rollback failed from previous attempt.
@@ -717,12 +740,12 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
*
* @param instantTime Instant time to which restoration is requested
*/
public HoodieRestoreMetadata restoreToInstant(final String instantTime) throws HoodieRestoreException {
public HoodieRestoreMetadata restoreToInstant(final String instantTime, boolean initialMetadataTableIfNecessary) throws HoodieRestoreException {
LOG.info("Begin restore to instant " + instantTime);
final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
Timer.Context timerContext = metrics.getRollbackCtx();
try {
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty());
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initialMetadataTableIfNecessary);
Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime);
if (restorePlanOption.isPresent()) {
HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
@@ -1288,22 +1311,22 @@ public HoodieMetrics getMetrics() {
* @param instantTime current inflight instant time
* @return instantiated {@link HoodieTable}
*/
protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);
protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary);

/**
* Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
* operations such as:
*
* NOTE: This method is engine-agnostic and SHOULD NOT be overloaded, please check on
* {@link #doInitTable(HoodieTableMetaClient, Option<String>)} instead
* {@link #doInitTable(HoodieTableMetaClient, Option, boolean)} instead
*
* <ul>
* <li>Checking whether upgrade/downgrade is required</li>
* <li>Bootstrapping Metadata Table (if required)</li>
* <li>Initializing metrics contexts</li>
* </ul>
*/
protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
HoodieTableMetaClient metaClient = createMetaClient(true);
// Setup write schemas for deletes
if (operationType == WriteOperationType.DELETE) {
@@ -1315,7 +1338,7 @@ protected final HoodieTable initTable(WriteOperationType operationType, Option<S
this.txnManager.beginTransaction();
try {
tryUpgrade(metaClient, instantTime);
table = doInitTable(metaClient, instantTime);
table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
} finally {
this.txnManager.endTransaction();
}
@@ -1348,6 +1371,10 @@ protected final HoodieTable initTable(WriteOperationType operationType, Option<S
return table;
}

protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
return initTable(operationType, instantTime, config.isMetadataTableEnabled());
}

/**
* Sets write schema from last instant since deletes may not have schema set in the config.
*/
@@ -398,7 +398,7 @@ public HoodieWriteMetadata<List<WriteStatus>> cluster(final String clusteringIns
}

@Override
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// Create a Hoodie table which encapsulated the commits and files visible
return getHoodieTable();
}
@@ -233,7 +233,7 @@ public HoodieWriteMetadata<List<WriteStatus>> cluster(final String clusteringIns
}

@Override
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);

// Create a Hoodie table which encapsulated the commits and files visible
@@ -425,11 +425,13 @@ private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitM
}

@Override
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
// Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
// if it didn't exist before
// See https://issues.apache.org/jira/browse/HUDI-3343 for more details
initializeMetadataTable(instantTime);
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
if (initialMetadataTableIfNecessary) {
// Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
// if it didn't exist before
// See https://issues.apache.org/jira/browse/HUDI-3343 for more details
initializeMetadataTable(instantTime);
}

// Create a Hoodie table which encapsulated the commits and files visible
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled());
@@ -291,7 +291,7 @@ public void testMORTable() throws Exception {
}

// Rollback to the original schema
client.restoreToInstant("004");
client.restoreToInstant("004", hoodieWriteConfig.isMetadataTableEnabled());
checkLatestDeltaCommit("004");

// Updates with original schema are now allowed
@@ -432,7 +432,7 @@ public void testCopyOnWriteTable() throws Exception {

// Revert to the older commit and ensure that the original schema can now
// be used for inserts and inserts.
client.restoreToInstant("003");
client.restoreToInstant("003", hoodieWriteConfig.isMetadataTableEnabled());
curTimeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("003"));
checkReadRecords("000", numRecords);
@@ -1208,7 +1208,7 @@ private void testTableOperationsImpl(HoodieSparkEngineContext engineContext, Hoo
validateMetadata(client);

// Restore
client.restoreToInstant("0000006");
client.restoreToInstant("0000006", writeConfig.isMetadataTableEnabled());
validateMetadata(client);
}
}
@@ -585,7 +585,7 @@ private void testUpsertsInternal(HoodieWriteConfig config,

client.savepoint("004", "user1","comment1");

client.restoreToInstant("004");
client.restoreToInstant("004", config.isMetadataTableEnabled());

assertFalse(metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().isPresent());

@@ -150,7 +150,7 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro
// NOTE: First writer will have Metadata table DISABLED
HoodieWriteConfig.Builder cfgBuilder =
getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE);

addConfigsForPopulateMetaFields(cfgBuilder, true);
HoodieWriteConfig cfg = cfgBuilder.build();

@@ -480,7 +480,7 @@ void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
copyOfRecords.clear();

// Rollback latest commit first
client.restoreToInstant("000");
client.restoreToInstant("000", cfg.isMetadataTableEnabled());

metaClient = HoodieTableMetaClient.reload(metaClient);
allFiles = listAllBaseFilesInPath(hoodieTable);
@@ -530,7 +530,7 @@ void testMORTableRestore(boolean restoreAfterCompaction) throws Exception {

if (!restoreAfterCompaction) {
// restore to 002 and validate records.
client.restoreToInstant("002");
client.restoreToInstant("002", cfg.isMetadataTableEnabled());
validateRecords(cfg, metaClient, updates1);
} else {
// trigger compaction and then trigger couple of upserts followed by restore.
@@ -546,7 +546,7 @@ void testMORTableRestore(boolean restoreAfterCompaction) throws Exception {
validateRecords(cfg, metaClient, updates5);

// restore to 003 and validate records.
client.restoreToInstant("003");
client.restoreToInstant("003", cfg.isMetadataTableEnabled());
validateRecords(cfg, metaClient, updates2);
}
}

0 comments on commit 2dbb273

Please sign in to comment.