diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 48382e0870a32..9258dd7dc12de 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -1555,7 +1555,7 @@ public HoodieMetrics getMetrics() { *
  • Initializing metrics contexts
  • * */ - protected final HoodieTable initTable(WriteOperationType operationType, Option instantTime, boolean initialMetadataTableIfNecessary) { + protected final HoodieTable initTable(WriteOperationType operationType, Option instantTime, boolean initialMetadataTableIfNecessary, boolean skipLocking) { HoodieTableMetaClient metaClient = createMetaClient(true); // Setup write schemas for deletes if (operationType == WriteOperationType.DELETE) { @@ -1567,12 +1567,16 @@ protected final HoodieTable initTable(WriteOperationType operationType, Option instantTime) { - return initTable(operationType, instantTime, config.isMetadataTableEnabled()); + return initTable(operationType, instantTime, config.isMetadataTableEnabled(), false); + } + + protected final HoodieTable initTable(WriteOperationType operationType, Option instantTime, boolean skipLocking) { + return initTable(operationType, instantTime, config.isMetadataTableEnabled(), skipLocking); } /** diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 3e551f7a17fe7..78b69d0ac5671 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -146,7 +146,7 @@ public void bootstrap(Option> extraMetadata) { @Override public List upsert(List> records, String instantTime) { HoodieTable>, List, List> table = - initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime)); + initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime), true); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient()); final HoodieWriteHandle writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(), @@ -162,7 +162,7 @@ public List upsert(List> records, String instantTim public List upsertPreppedRecords(List> preppedRecords, String instantTime) { // only used for metadata table, the upsert happens in single thread HoodieTable>, List, List> table = - initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime)); + initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime), true); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient()); Map>> preppedRecordsByFileId = preppedRecords.stream().parallel() @@ -178,7 +178,7 @@ public List upsertPreppedRecords(List> preppedRecor @Override public List insert(List> records, String instantTime) { HoodieTable>, List, List> table = - initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime)); + initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime), true); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient()); // create the write handle if not exists @@ -201,7 +201,7 @@ public List insert(List> records, String instantTim public List insertOverwrite( List> records, final String instantTime) { HoodieTable>, List, List> table = - initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime)); + initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime), true); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient()); // create the write handle if not exists @@ -220,7 +220,7 @@ public List insertOverwrite( */ public List insertOverwriteTable( List> records, final String instantTime) { - HoodieTable table = initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime)); + HoodieTable table = initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime), true); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient()); // create the write handle if not exists @@ -253,7 +253,7 @@ public List bulkInsertPreppedRecords(List> preppedR @Override public List delete(List keys, String instantTime) { HoodieTable>, List, List> table = - initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime)); + initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime), true); preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient()); HoodieWriteMetadata> result = table.delete(context, instantTime, keys); return postWrite(result, instantTime, table); @@ -261,7 +261,7 @@ public List delete(List keys, String instantTime) { public List deletePartitions(List partitions, String instantTime) { HoodieTable>, List, List> table = - initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime)); + initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime), true); preWrite(instantTime, WriteOperationType.DELETE_PARTITION, table.getMetaClient()); HoodieWriteMetadata> result = table.deletePartitions(context, instantTime, partitions); return postWrite(result, instantTime, table);