[HUDI-3451] Delete metadata table when the write client disables MDT (#5186)

* Add checks for metadata table init to avoid a possible out-of-sync state

* Revise the logic to reuse existing table config

* Revise docs and naming

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
3 people committed Apr 2, 2022
1 parent b1e7e1f commit 020786a5f9d25bf140decf24d65e07dd738e4f9d
Showing 6 changed files with 136 additions and 16 deletions.
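In short, when a writer runs with hoodie.metadata.enable=false against a table whose metadata table was previously built, HoodieTable.getMetadataWriter() now deletes the metadata table directory under the base path and clears hoodie.table.metadata.partitions in hoodie.properties. A minimal sketch of how this path is exercised, using the same APIs as the test added below (existingWriteConfig, context, metaClient, instantTime, and commitMetadata are placeholder names, not part of this change):

// Sketch only: build a write config with the metadata table disabled.
HoodieWriteConfig disabledMdtConfig = HoodieWriteConfig.newBuilder()
    .withProperties(existingWriteConfig.getProps())
    .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
    .build();

// Asking the table for a metadata writer triggers maybeDeleteMetadataTable().
HoodieTable table = HoodieSparkTable.create(disabledMdtConfig, context, metaClient);
Option<HoodieTableMetadataWriter> metadataWriter =
    table.getMetadataWriter(instantTime, Option.of(commitMetadata));
// metadataWriter comes back empty, the metadata table folder is removed, and
// hoodie.table.metadata.partitions is cleared in hoodie.properties.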
@@ -18,11 +18,6 @@

package org.apache.hudi.table;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -50,6 +45,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -64,6 +60,7 @@
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -80,6 +77,12 @@
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.storage.HoodieLayoutFactory;
import org.apache.hudi.table.storage.HoodieStorageLayout;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -802,6 +805,48 @@ public <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetad
    return Option.empty();
  }

  /**
   * Deletes the metadata table if the writer disables the metadata table with hoodie.metadata.enable=false.
   */
  public void maybeDeleteMetadataTable() {
    if (shouldExecuteMetadataTableDeletion()) {
      try {
        Path mdtBasePath = new Path(HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath()));
        FileSystem fileSystem = metaClient.getFs();
        if (fileSystem.exists(mdtBasePath)) {
          LOG.info("Deleting metadata table because it is disabled in writer.");
          fileSystem.delete(mdtBasePath, true);
        }
        clearMetadataTablePartitionsConfig();
      } catch (IOException ioe) {
        throw new HoodieIOException("Failed to delete metadata table.", ioe);
      }
    }
  }

  private boolean shouldExecuteMetadataTableDeletion() {
    // Only execute metadata table deletion when all of the following conditions are met:
    // (1) This is the data table, not the metadata table itself
    // (2) The metadata table is disabled in HoodieWriteConfig for this writer
    // (3) `HoodieTableConfig.TABLE_METADATA_PARTITIONS` is either not set in the table config,
    //     or set to a non-empty value, indicating that metadata table partitions are ready to use
    return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
        && !config.isMetadataTableEnabled()
        && (!metaClient.getTableConfig().contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS)
        || !StringUtils.isNullOrEmpty(metaClient.getTableConfig().getMetadataPartitions()));
  }

  /**
   * Clears hoodie.table.metadata.partitions in hoodie.properties.
   */
  private void clearMetadataTablePartitionsConfig() {
    LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
    metaClient.getTableConfig().setValue(
        HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), StringUtils.EMPTY_STRING);
    HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
  }

  public HoodieTableMetadata getMetadataTable() {
    return this.metadata;
  }
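For reference, a small verification sketch (the same calls the new test below uses; fs, metaClient, and writeConfig are assumed to be set up as in that test): after maybeDeleteMetadataTable() has run for a writer with the metadata table disabled, the cleared property can be read back from hoodie.properties and the metadata table folder should be gone.

// Reload the table config directly from hoodie.properties.
HoodieTableConfig tableConfig =
    new HoodieTableConfig(fs, metaClient.getMetaPath(), writeConfig.getPayloadClass());
assertEquals(StringUtils.EMPTY_STRING, tableConfig.getMetadataPartitions());
assertFalse(metaClient.getFs().exists(
    new Path(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))));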
@@ -18,7 +18,6 @@

package org.apache.hudi.table;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.data.HoodieData;
@@ -37,6 +36,8 @@
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import org.apache.avro.specific.SpecificRecordBase;

import java.util.List;

import static org.apache.hudi.common.data.HoodieList.getList;
@@ -107,6 +108,7 @@ public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetad
      return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
          context, actionMetadata, Option.of(triggeringInstantTimestamp)));
    } else {
      maybeDeleteMetadataTable();
      return Option.empty();
    }
  }
@@ -122,6 +122,8 @@ public <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetad
      } catch (IOException e) {
        throw new HoodieMetadataException("Checking existence of metadata table failed", e);
      }
    } else {
      maybeDeleteMetadataTable();
    }

    return Option.empty();
@@ -18,15 +18,6 @@

package org.apache.hudi.client.functional;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Time;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
@@ -76,6 +67,7 @@
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -100,6 +92,16 @@
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Time;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -114,6 +116,7 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -198,6 +201,69 @@ public void testMetadataTableBootstrap(HoodieTableType tableType, boolean addRol
    validateMetadata(testTable, true);
  }

  @Test
  public void testTurnOffMetadataTableAfterEnable() throws Exception {
    init(COPY_ON_WRITE, true);
    String instant1 = "0000001";
    HoodieCommitMetadata hoodieCommitMetadata = doWriteOperationWithMeta(testTable, instant1, INSERT);

    // Simulate the complete data directory including ".hoodie_partition_metadata" file
    File metaForP1 = new File(metaClient.getBasePath() + "/p1", ".hoodie_partition_metadata");
    File metaForP2 = new File(metaClient.getBasePath() + "/p2", ".hoodie_partition_metadata");
    metaForP1.createNewFile();
    metaForP2.createNewFile();

    // Sync to metadata table
    metaClient.reloadActiveTimeline();
    HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
    Option metadataWriter = table.getMetadataWriter(instant1, Option.of(hoodieCommitMetadata));
    validateMetadata(testTable, true);

    assertTrue(metadataWriter.isPresent());
    HoodieTableConfig hoodieTableConfig =
        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig.getPayloadClass());
    assertFalse(hoodieTableConfig.getMetadataPartitions().isEmpty());

    // Turn off metadata table
    HoodieWriteConfig writeConfig2 = HoodieWriteConfig.newBuilder()
        .withProperties(this.writeConfig.getProps())
        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
        .build();
    testTable = HoodieTestTable.of(metaClient);
    String instant2 = "0000002";
    HoodieCommitMetadata hoodieCommitMetadata2 = doWriteOperationWithMeta(testTable, instant2, INSERT);
    metaClient.reloadActiveTimeline();
    HoodieTable table2 = HoodieSparkTable.create(writeConfig2, context, metaClient);
    Option metadataWriter2 = table2.getMetadataWriter(instant2, Option.of(hoodieCommitMetadata2));
    assertFalse(metadataWriter2.isPresent());

    HoodieTableConfig hoodieTableConfig2 =
        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig2.getPayloadClass());
    assertEquals(StringUtils.EMPTY_STRING, hoodieTableConfig2.getMetadataPartitions());
    // Assert metadata table folder is deleted
    assertFalse(metaClient.getFs().exists(
        new Path(HoodieTableMetadata.getMetadataTableBasePath(writeConfig2.getBasePath()))));

    // Enable metadata table again and initialize metadata table through
    // HoodieTable.getMetadataWriter() function
    HoodieWriteConfig writeConfig3 = HoodieWriteConfig.newBuilder()
        .withProperties(this.writeConfig.getProps())
        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
        .build();
    testTable = HoodieTestTable.of(metaClient);
    metaClient.reloadActiveTimeline();
    String instant3 = "0000003";
    HoodieCommitMetadata hoodieCommitMetadata3 = doWriteOperationWithMeta(testTable, instant3, INSERT);
    metaClient.reloadActiveTimeline();
    HoodieTable table3 = HoodieSparkTable.create(writeConfig3, context, metaClient);
    Option metadataWriter3 = table3.getMetadataWriter(instant3, Option.of(hoodieCommitMetadata3));
    validateMetadata(testTable, true);
    assertTrue(metadataWriter3.isPresent());
    HoodieTableConfig hoodieTableConfig3 =
        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig.getPayloadClass());
    assertFalse(hoodieTableConfig3.getMetadataPartitions().isEmpty());
  }

/**
* Only valid partition directories are added to the metadata.
*/
@@ -23,6 +23,7 @@
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
@@ -176,6 +177,10 @@ protected void doWriteOperation(HoodieTestTable testTable, String commitTime, Wr
    testTable.doWriteOperation(commitTime, operationType, emptyList(), asList("p1", "p2"), 3);
  }

  protected HoodieCommitMetadata doWriteOperationWithMeta(HoodieTestTable testTable, String commitTime, WriteOperationType operationType) throws Exception {
    return testTable.doWriteOperation(commitTime, operationType, emptyList(), asList("p1", "p2"), 3);
  }

  protected void doClean(HoodieTestTable testTable, String commitTime, List<String> commitsToClean) throws IOException {
    doCleanInternal(testTable, commitTime, commitsToClean, false);
  }
@@ -607,7 +607,7 @@ public String getMetadataPartitionsInflight() {
  public String getMetadataPartitions() {
    return getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING);
  }

  public Map<String, String> propsMap() {
    return props.entrySet().stream()
        .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
