[HUDI-5414] No need to guard the table initialization by lock for HoodieFlinkWriteClient (#7509) (#7522)

Unlike the other write clients, HoodieFlinkWriteClient invokes the dataset writing methods (#upsert or #insert)
for each batch of new data in the long-running task. In the current implementation, an engine-specific hoodie table is created before performing
these actions, and before the table creation, some table bootstrapping operations are performed (such as the table upgrade/downgrade and the metadata table
bootstrap). These bootstrapping operations are guarded by a transaction lock.

In Flink, these bootstrapping operations can be avoided because they are all performed only once on the coordinator.

The changes (a simplified sketch follows this list):

- Make BaseHoodieWriteClient#doInitTable non-abstract; it now performs only the bootstrapping operations
- Add a default impl BaseHoodieWriteClient#initMetadataTable specifically for the metadata table bootstrap
- Add a new abstract method for creating the engine-specific hoodie table
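
For illustration, a minimal, self-contained sketch of the resulting template-method flow. Names mirror the diff below, but the generics, signatures, and helper bodies are simplified stand-ins, not the actual Hudi API:

```java
// Simplified sketch of BaseHoodieWriteClient after this change; HoodieTable
// and the meta client are represented by Object to keep it self-contained.
abstract class WriteClientSketch {

  // New abstract hook: engines only create their table flavor.
  protected abstract Object createTable(Object metaClient);

  // Now non-abstract: runs only the lock-guarded bootstrapping operations.
  protected void doInitTable(Object metaClient) {
    beginTransaction();
    try {
      tryUpgrade(metaClient);   // table upgrade/downgrade
      initMetadataTable();      // metadata table bootstrap
    } finally {
      endTransaction();
    }
  }

  // Default no-op; Spark overrides this, Flink bootstraps on the coordinator.
  protected void initMetadataTable() {}

  // Template method: bootstrap first, then build the engine-specific table.
  protected final Object initTable(Object metaClient) {
    doInitTable(metaClient);
    return createTable(metaClient);
  }

  // Stand-in helpers so the sketch compiles on its own.
  protected void tryUpgrade(Object metaClient) {}
  private void beginTransaction() {}
  private void endTransaction() {}
}
```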

(cherry picked from commit fd62a14)

danny0405 committed Dec 21, 2022
1 parent c288a50 commit 211af1a
Showing 6 changed files with 64 additions and 50 deletions.
BaseHoodieWriteClient.java
@@ -317,6 +317,8 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom

protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf);

protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient);

void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
if (writeTimer != null) {
long durationInMs = metrics.getDurationInMs(writeTimer.stop());
@@ -1425,17 +1427,38 @@ public HoodieMetrics getMetrics() {
}

/**
* Instantiates engine-specific instance of {@link HoodieTable} as well as performs necessary
* bootstrapping operations (for ex, validating whether Metadata Table has to be bootstrapped)
* Performs necessary bootstrapping operations (for ex, validating whether Metadata Table has to be bootstrapped).
*
* NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS
* NOT REQUIRING EXTERNAL SYNCHRONIZATION
* <p>NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS
* NOT REQUIRING EXTERNAL SYNCHRONIZATION
*
* @param metaClient instance of {@link HoodieTableMetaClient}
* @param instantTime current inflight instant time
* @return instantiated {@link HoodieTable}
*/
protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary);
protected void doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
Option<HoodieInstant> ownerInstant = Option.empty();
if (instantTime.isPresent()) {
ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get()));
}
this.txnManager.beginTransaction(ownerInstant, Option.empty());
try {
tryUpgrade(metaClient, instantTime);
if (initialMetadataTableIfNecessary) {
initMetadataTable(instantTime);
}
} finally {
this.txnManager.endTransaction(ownerInstant);
}
}

/**
* Bootstrap the metadata table.
*
* @param instantTime current inflight instant time
*/
protected void initMetadataTable(Option<String> instantTime) {
// by default do nothing.
}

/**
* Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
@@ -1457,18 +1480,8 @@ protected final HoodieTable initTable(WriteOperationType operationType, Option<S
setWriteSchemaForDeletes(metaClient);
}

HoodieTable table;
Option<HoodieInstant> ownerInstant = Option.empty();
if (instantTime.isPresent()) {
ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get()));
}
this.txnManager.beginTransaction(ownerInstant, Option.empty());
try {
tryUpgrade(metaClient, instantTime);
table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
} finally {
this.txnManager.endTransaction(ownerInstant);
}
doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
HoodieTable table = createTable(config, hadoopConf, metaClient);

// Validate table properties
metaClient.validateTableProperties(config.getProps());
HoodieFlinkWriteClient.java
@@ -126,6 +126,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
}

@Override
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context, metaClient);
}

@Override
public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
@@ -291,10 +296,14 @@ public void initMetadataTable() {
HoodieFlinkTable<?> table = getHoodieTable();
if (config.isMetadataTableEnabled()) {
// initialize the metadata table path
try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) {
// do nothing
// guard the metadata writer with concurrent lock
try {
this.txnManager.getLockManager().lock();
initMetadataWriter().close();
} catch (Exception e) {
throw new HoodieException("Failed to initialize metadata table", e);
} finally {
this.txnManager.getLockManager().unlock();
}
// clean the obsolete index stats
table.deleteMetadataIndexIfNecessary();
@@ -478,16 +487,13 @@ private void completeClustering(
}

@Override
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// Create a Hoodie table which encapsulated the commits and files visible
return getHoodieTable();
}

@Override
protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
protected void doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// do nothing.

// flink executes the upgrade/downgrade once when initializing the first instant on start up,
// no need to execute the upgrade/downgrade on each write in streaming.

// flink performs metadata table bootstrap on the coordinator when it starts up.
}

public void completeTableService(
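To make the Flink no-op concrete, a hedged sketch of the behavior the comments above describe, reusing the WriteClientSketch from earlier. The lifecycle hook names onCoordinatorStart and onStreamingWrite are hypothetical stand-ins, not the actual Flink coordinator API:

```java
// Illustrative only: shows why Flink can no-op doInitTable while other
// engines still bootstrap per write.
class FlinkWriteClientSketch extends WriteClientSketch {

  @Override
  protected Object createTable(Object metaClient) {
    return new Object(); // stands in for HoodieFlinkTable.create(...)
  }

  @Override
  protected void doInitTable(Object metaClient) {
    // no-op: upgrade/downgrade and the metadata table bootstrap already ran
    // once on the coordinator, so per-batch writes skip the locked section
  }

  void onCoordinatorStart(Object metaClient) {
    // one-time bootstrap, performed on the coordinator at start up
    super.doInitTable(metaClient);
  }

  void onStreamingWrite(Object metaClient) {
    // per-batch path: initTable now only creates the table
    initTable(metaClient);
  }
}
```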
HoodieJavaWriteClient.java
@@ -92,6 +92,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop
return HoodieJavaTable.create(config, context);
}

@Override
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
return HoodieJavaTable.create(config, context, metaClient);
}

@Override
public List<WriteStatus> upsert(List<HoodieRecord<T>> records,
String instantTime) {
@@ -228,13 +233,4 @@ protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstan
public HoodieWriteMetadata<List<WriteStatus>> cluster(final String clusteringInstant, final boolean shouldComplete) {
throw new HoodieNotSupportedException("Cluster is not supported in HoodieJavaClient");
}

@Override
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);

// Create a Hoodie table which encapsulated the commits and files visible
return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient);
}

}
HoodieJavaTable.java
@@ -51,7 +51,7 @@ public static <T extends HoodieRecordPayload> HoodieJavaTable<T> create(HoodieWr
}

public static <T extends HoodieRecordPayload> HoodieJavaTable<T> create(HoodieWriteConfig config,
HoodieJavaEngineContext context,
HoodieEngineContext context,
HoodieTableMetaClient metaClient) {
switch (metaClient.getTableType()) {
case COPY_ON_WRITE:
SparkRDDWriteClient.java
@@ -131,6 +131,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop
return HoodieSparkTable.create(config, context);
}

@Override
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
return HoodieSparkTable.create(config, context, metaClient);
}

@Override
public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
@@ -434,16 +439,11 @@ private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitM
}

@Override
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
if (initialMetadataTableIfNecessary) {
// Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
// if it didn't exist before
// See https://issues.apache.org/jira/browse/HUDI-3343 for more details
initializeMetadataTable(instantTime);
}

// Create a Hoodie table which encapsulated the commits and files visible
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
protected void initMetadataTable(Option<String> instantTime) {
// Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
// if it didn't exist before
// See https://issues.apache.org/jira/browse/HUDI-3343 for more details
initializeMetadataTable(instantTime);
}

/**
HoodieSparkTable.java
@@ -19,7 +19,6 @@
package org.apache.hudi.table;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
@@ -60,11 +59,11 @@ public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieW
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
.setFileSystemRetryConfig(config.getFileSystemRetryConfig())
.setProperties(config.getProps()).build();
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
return HoodieSparkTable.create(config, context, metaClient);
}

public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
HoodieSparkEngineContext context,
HoodieEngineContext context,
HoodieTableMetaClient metaClient) {
HoodieSparkTable<T> hoodieSparkTable;
switch (metaClient.getTableType()) {