Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,7 @@ public HoodieMetrics getMetrics() {
* <li>Initializing metrics contexts</li>
* </ul>
*/
protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime, boolean initialMetadataTableIfNecessary, boolean skipLocking) {
HoodieTableMetaClient metaClient = createMetaClient(true);
// Setup write schemas for deletes
if (operationType == WriteOperationType.DELETE) {
Expand All @@ -1567,12 +1567,16 @@ protected final HoodieTable initTable(WriteOperationType operationType, Option<S
if (instantTime.isPresent()) {
ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get()));
}
this.txnManager.beginTransaction(ownerInstant, Option.empty());
if (!skipLocking) {
this.txnManager.beginTransaction(ownerInstant, Option.empty());
}
try {
tryUpgrade(metaClient, instantTime);
table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
} finally {
this.txnManager.endTransaction(ownerInstant);
if (!skipLocking) {
this.txnManager.endTransaction(ownerInstant);
}
}

// Validate table properties
Expand Down Expand Up @@ -1602,7 +1606,11 @@ protected final HoodieTable initTable(WriteOperationType operationType, Option<S
}

protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
return initTable(operationType, instantTime, config.isMetadataTableEnabled());
return initTable(operationType, instantTime, config.isMetadataTableEnabled(), false);
}

protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime, boolean skipLocking) {
return initTable(operationType, instantTime, config.isMetadataTableEnabled(), skipLocking);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
@Override
public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime), true);
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
Expand All @@ -162,7 +162,7 @@ public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTim
public List<WriteStatus> upsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
// only used for metadata table, the upsert happens in single thread
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime), true);
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient());
Map<String, List<HoodieRecord<T>>> preppedRecordsByFileId = preppedRecords.stream().parallel()
Expand All @@ -178,7 +178,7 @@ public List<WriteStatus> upsertPreppedRecords(List<HoodieRecord<T>> preppedRecor
@Override
public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime));
initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime), true);
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
// create the write handle if not exists
Expand All @@ -201,7 +201,7 @@ public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTim
public List<WriteStatus> insertOverwrite(
List<HoodieRecord<T>> records, final String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime));
initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime), true);
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient());
// create the write handle if not exists
Expand All @@ -220,7 +220,7 @@ public List<WriteStatus> insertOverwrite(
*/
public List<WriteStatus> insertOverwriteTable(
List<HoodieRecord<T>> records, final String instantTime) {
HoodieTable table = initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime));
HoodieTable table = initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime), true);
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient());
// create the write handle if not exists
Expand Down Expand Up @@ -253,15 +253,15 @@ public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedR
@Override
public List<WriteStatus> delete(List<HoodieKey> keys, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime));
initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime), true);
preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
HoodieWriteMetadata<List<WriteStatus>> result = table.delete(context, instantTime, keys);
return postWrite(result, instantTime, table);
}

public List<WriteStatus> deletePartitions(List<String> partitions, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime));
initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime), true);
preWrite(instantTime, WriteOperationType.DELETE_PARTITION, table.getMetaClient());
HoodieWriteMetadata<List<WriteStatus>> result = table.deletePartitions(context, instantTime, partitions);
return postWrite(result, instantTime, table);
Expand Down