Skip to content

Commit

Permalink
Update error messages when supervisor's checkpoint state is invalid (#…
Browse files Browse the repository at this point in the history
…16208)

* Update error message when topic messages.

Suggest resetting the supervisor when the topic changes instead of changing
the supervisor name which is actually making a new supervisor.

* Update server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>

* Cleanup

* Remove log and include oldCommitMetadataFromDb

* Fix test

---------

Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
  • Loading branch information
abhishekrb19 and kfaraz committed Apr 3, 2024
1 parent 1df41db commit 75fb57e
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
Expand Down Expand Up @@ -151,7 +152,12 @@ public void testFailTransactionalUpdateDataSourceMetadata() throws Exception
);

Assert.assertEquals(
SegmentPublishResult.fail("java.lang.RuntimeException: Failed to update the metadata Store. The new start metadata is ahead of last commited end state."),
SegmentPublishResult.fail(
InvalidInput.exception(
"The new start metadata state[ObjectMetadata{theObject=[1]}] is ahead of the last commited end"
+ " state[null]. Try resetting the supervisor."
).toString()
),
result
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.common.io.BaseEncoding;
import com.google.inject.Inject;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentCreateRequest;
Expand Down Expand Up @@ -445,41 +446,33 @@ public SegmentPublishResult commitSegmentsAndMetadata(

try {
return connector.retryTransaction(
new TransactionCallback<SegmentPublishResult>()
{
@Override
public SegmentPublishResult inTransaction(
final Handle handle,
final TransactionStatus transactionStatus
) throws Exception
{
// Set definitelyNotUpdated back to false upon retrying.
definitelyNotUpdated.set(false);
(handle, transactionStatus) -> {
// Set definitelyNotUpdated back to false upon retrying.
definitelyNotUpdated.set(false);

if (startMetadata != null) {
final DataStoreMetadataUpdateResult result = updateDataSourceMetadataWithHandle(
handle,
dataSource,
startMetadata,
endMetadata
);

if (result.isFailed()) {
// Metadata was definitely not updated.
transactionStatus.setRollbackOnly();
definitelyNotUpdated.set(true);

if (result.canRetry()) {
throw new RetryTransactionException(result.getErrorMsg());
} else {
throw new RuntimeException(result.getErrorMsg());
}
if (startMetadata != null) {
final DataStoreMetadataUpdateResult result = updateDataSourceMetadataWithHandle(
handle,
dataSource,
startMetadata,
endMetadata
);

if (result.isFailed()) {
// Metadata was definitely not updated.
transactionStatus.setRollbackOnly();
definitelyNotUpdated.set(true);

if (result.canRetry()) {
throw new RetryTransactionException(result.getErrorMsg());
} else {
throw InvalidInput.exception(result.getErrorMsg());
}
}

final Set<DataSegment> inserted = announceHistoricalSegmentBatch(handle, segments, usedSegments);
return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
}

final Set<DataSegment> inserted = announceHistoricalSegmentBatch(handle, segments, usedSegments);
return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
},
3,
getSqlMetadataMaxRetry()
Expand Down Expand Up @@ -2395,17 +2388,19 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(
}

final boolean startMetadataMatchesExisting;
int startMetadataGreaterThanExisting = 0;
boolean startMetadataGreaterThanExisting = false;

if (oldCommitMetadataFromDb == null) {
startMetadataMatchesExisting = startMetadata.isValidStart();
startMetadataGreaterThanExisting = 1;
startMetadataGreaterThanExisting = true;
} else {
// Checking against the last committed metadata.
// If the new start sequence number is greater than the end sequence number of last commit compareTo() function will return 1,
// 0 in all other cases. It might be because multiple tasks are publishing the sequence at around same time.
// If the new start sequence number is greater than the end sequence number of the last commit,
// compareTo() will return 1 and 0 in all other cases. This can happen if multiple tasks are publishing the
// sequence around the same time.
if (startMetadata instanceof Comparable) {
startMetadataGreaterThanExisting = ((Comparable) startMetadata.asStartMetadata()).compareTo(oldCommitMetadataFromDb.asStartMetadata());
startMetadataGreaterThanExisting = ((Comparable) startMetadata.asStartMetadata())
.compareTo(oldCommitMetadataFromDb.asStartMetadata()) > 0;
}

// Converting the last one into start metadata for checking since only the same type of metadata can be matched.
Expand All @@ -2415,25 +2410,20 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle(
startMetadataMatchesExisting = startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata());
}

if (startMetadataGreaterThanExisting == 1 && !startMetadataMatchesExisting) {
// Offset stored in StartMetadata is Greater than the last commited metadata,
// Then retry multiple task might be trying to publish the segment for same partitions.
log.info("Failed to update the metadata Store. The new start metadata: [%s] is ahead of last commited end state: [%s].",
startMetadata,
oldCommitMetadataFromDb);
if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) {
// Offsets stored in startMetadata is greater than the last commited metadata.
return new DataStoreMetadataUpdateResult(true, false,
"Failed to update the metadata Store. The new start metadata is ahead of last commited end state."
"The new start metadata state[%s] is ahead of the last commited"
+ " end state[%s]. Try resetting the supervisor.", startMetadata, oldCommitMetadataFromDb
);
}

if (!startMetadataMatchesExisting) {
// Not in the desired start state.
return new DataStoreMetadataUpdateResult(true, false, StringUtils.format(
"Inconsistent metadata state. This can happen if you update input topic in a spec without changing " +
"the supervisor name. Stored state: [%s], Target state: [%s].",
oldCommitMetadataFromDb,
startMetadata
));
return new DataStoreMetadataUpdateResult(true, false,
"Inconsistency between stored metadata state[%s] and target state[%s]. Try resetting the supervisor.",
oldCommitMetadataFromDb, startMetadata
);
}

// Only endOffsets should be stored in metadata store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.overlord.SegmentCreateRequest;
Expand Down Expand Up @@ -935,7 +936,14 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException
new ObjectMetadata(ImmutableMap.of("foo", "bar")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Failed to update the metadata Store. The new start metadata is ahead of last commited end state."), result1);
Assert.assertEquals(
SegmentPublishResult.fail(
InvalidInput.exception(
"The new start metadata state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last commited"
+ " end state[null]. Try resetting the supervisor."
).toString()),
result1
);

// Should only be tried once.
Assert.assertEquals(1, metadataUpdateCounter.get());
Expand All @@ -956,10 +964,15 @@ public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Inconsistent metadata state. This can " +
"happen if you update input topic in a spec without changing the supervisor name. " +
"Stored state: [ObjectMetadata{theObject={foo=baz}}], " +
"Target state: [ObjectMetadata{theObject=null}]."), result2);
Assert.assertEquals(
SegmentPublishResult.fail(
InvalidInput.exception(
"Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}]"
+ " and target state[ObjectMetadata{theObject=null}]. Try resetting the supervisor."
).toString()
),
result2
);

// Should only be tried once per call.
Assert.assertEquals(2, metadataUpdateCounter.get());
Expand Down Expand Up @@ -1026,10 +1039,14 @@ public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOExcep
new ObjectMetadata(ImmutableMap.of("foo", "qux")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Inconsistent metadata state. This can " +
"happen if you update input topic in a spec without changing the supervisor name. " +
"Stored state: [ObjectMetadata{theObject={foo=baz}}], " +
"Target state: [ObjectMetadata{theObject={foo=qux}}]."), result2);
Assert.assertEquals(
SegmentPublishResult.fail(
InvalidInput.exception(
"Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}] and "
+ "target state[ObjectMetadata{theObject={foo=qux}}]. Try resetting the supervisor."
).toString()),
result2
);

// Should only be tried once per call.
Assert.assertEquals(2, metadataUpdateCounter.get());
Expand Down

0 comments on commit 75fb57e

Please sign in to comment.