Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-4412] Multiple writers NPE when Insert_overwrite #6130

Merged
merged 4 commits into from Sep 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -124,14 +124,15 @@ private void init(HoodieInstant instant) {
HoodieRequestedReplaceMetadata requestedReplaceMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieRequestedReplaceMetadata();
org.apache.hudi.avro.model.HoodieCommitMetadata inflightCommitMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata();
if (instant.isRequested()) {
if (requestedReplaceMetadata != null) {
// for insert_overwrite/insert_overwrite_table clusteringPlan will be empty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to comment here, descript it on the jira ticket, it's enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to add comments here as clustering and insert_overwrite are involved here

if (requestedReplaceMetadata != null && requestedReplaceMetadata.getClusteringPlan() != null) {
this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
this.operationType = WriteOperationType.CLUSTER;
}
} else {
if (inflightCommitMetadata != null) {
this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(inflightCommitMetadata.getPartitionToWriteStats()).keySet();
this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType());
this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata().getOperationType());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking if this change has been tested.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1
The intent is right but it is a critical change. Please add a test to cover this scenario.

} else if (requestedReplaceMetadata != null) {
// inflight replacecommit metadata is empty due to clustering, read fileIds from requested replacecommit
this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
Expand Down
Expand Up @@ -123,6 +123,41 @@ public void testConcurrentWritesWithInterleavingSuccesssfulCommit() throws Excep
}
}

@Test
public void testConcurrentWritesWithReplaceInflightCommit() throws Exception {
createReplaceInflight(HoodieActiveTimeline.createNewInstantTime());
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
Option<HoodieInstant> lastSuccessfulInstant = Option.empty();

// writer 1 starts
String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
createInflightCommit(currentWriterInstant);
Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));

// writer 2 starts and finishes
String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
createReplaceInflight(newInstantTime);

SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy();
HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant);
timeline = timeline.reload();

List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect(
Collectors.toList());

// writer 1 conflicts with writer 2
Assertions.assertTrue(candidateInstants.size() == 1);
ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient);
ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata);
Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation));
try {
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
Assertions.fail("Cannot reach here, writer 1 and writer 2 should have thrown a conflict");
} catch (HoodieWriteConflictException e) {
// expected
}
}

@Test
public void testConcurrentWritesWithInterleavingScheduledCompaction() throws Exception {
createCommit(HoodieActiveTimeline.createNewInstantTime());
Expand Down Expand Up @@ -394,6 +429,20 @@ private void createReplaceRequested(String instantTime) throws Exception {
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
}

private void createReplaceInflight(String instantTime) throws Exception {
String fileId1 = "file-1";
String fileId2 = "file-2";

HoodieCommitMetadata inflightReplaceMetadata = new HoodieCommitMetadata();
inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId("file-1");
inflightReplaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
HoodieTestTable.of(metaClient)
.addInflightReplace(instantTime, Option.of(inflightReplaceMetadata))
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
}

private void createReplace(String instantTime, WriteOperationType writeOperationType) throws Exception {
String fileId1 = "file-1";
String fileId2 = "file-2";
Expand Down
Expand Up @@ -284,6 +284,12 @@ public HoodieTestTable addRequestedReplace(String instantTime, Option<HoodieRequ
return this;
}

public HoodieTestTable addInflightReplace(String instantTime, Option<HoodieCommitMetadata> inflightReplaceMetadata) throws Exception {
createInflightReplaceCommit(basePath, instantTime, inflightReplaceMetadata);
currentInstantTime = instantTime;
return this;
}

public HoodieTestTable addInflightClean(String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException {
createRequestedCleanFile(basePath, instantTime, cleanerPlan);
createInflightCleanFile(basePath, instantTime, cleanerPlan);
Expand Down