Fix replica task failures with metadata inconsistency while running concurrent append replace #16614

Merged
merged 11 commits on Jun 24, 2024

Conversation

@kfaraz (Contributor) commented Jun 16, 2024

Description

A streaming ingestion with multiple replicas may sometimes run into the following error
when publishing segments to an interval. This can happen only when a concurrent
REPLACE task has recently committed higher-version segments to that same interval.

java.util.concurrent.ExecutionException: org.apache.druid.java.util.common.ISE:
Failed to publish segments because of 
[java.lang.RuntimeException: Inconsistency between stored metadata state[KafkaDataSourceMetadata{xxx}]
and target state[KafkaDataSourceMetadata{yyy}]. Try resetting the supervisor.]

This error does not cause any data loss, but it adds operational overhead since it leads to unnecessary task failures.

The situation typically plays out as follows:

  • Streaming supervisor launches 2 replicas to append to an interval I
  • Both replicas allocate and start appending to segment S(v0, p1), i.e. partition 1 of version v0.
  • Concurrent REPLACE task commits a new version v1 in interval I, say segments S(v1, p0), S(v1, p1)
  • Version v0 is now completely overshadowed by version v1
  • First replica publishes segment S(v0, p1), which is upgraded by the overlord to S(v1, p2)
  • Second replica tries to publish segment S(v0, p1) but fails because higher offsets have already been committed
  • Second replica then checks if its segments have already been published by someone else
  • In the absence of a concurrent replace, this check would succeed, allowing the second replica to move ahead with the ingestion.
  • But since the concurrent replace has already overshadowed version v0, the second replica cannot find its segments in the set of "used and visible" segments, and so it fails (sketched below).
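
For illustration only, the pre-fix verification step can be sketched roughly as below (the class and method names are simplified stand-ins, not the actual Druid APIs): because the lookup consults only used and visible segments, a segment overshadowed by the concurrent replace is never found.

import java.util.Set;

class PublishVerificationSketch
{
  // Pre-fix behaviour (simplified): after a failed publish, check whether another
  // replica has already committed the same segments. Only used-and-visible segments
  // are consulted, so S(v0, p1) is missed once v1 overshadows v0, and the task fails.
  static boolean alreadyPublishedByAnotherReplica(
      Set<String> segmentIdsToPublish,
      Set<String> usedAndVisibleSegmentIds
  )
  {
    // Every segment must be found for the replica to treat the publish as a success.
    return usedAndVisibleSegmentIds.containsAll(segmentIdsToPublish);
  }
}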

Fix

While looking for already published segments, the second replica should search not only used and visible segments, but also overshadowed and unused ones.

Changes

  • Add a new task action RetrieveSegmentsByIdAction
  • Use the new task action to retrieve segments by ID, irrespective of their used or visible status
  • During rolling upgrades, this task action would fail while the Overlord is still on an older version
  • If the new action fails, fall back to fetching only used segments as before (see the sketch after this list)
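
A minimal sketch of this lookup-with-fallback, assuming hypothetical helper names (retrieveSegmentsById, retrieveUsedSegmentsById) rather than the exact methods added in this PR:

import java.util.Set;

class PublishedSegmentLookupSketch
{
  Set<String> findPublishedSegmentIds(Set<String> segmentIds)
  {
    try {
      // Preferred path: the new RetrieveSegmentsByIdAction fetches segments purely
      // by ID, regardless of whether they are used, unused, visible or overshadowed.
      return retrieveSegmentsById(segmentIds);
    }
    catch (Exception e) {
      // During a rolling upgrade, an older Overlord does not recognise the new
      // action, so fall back to the old behaviour of fetching only used segments.
      return retrieveUsedSegmentsById(segmentIds);
    }
  }

  // Hypothetical stand-in: would post the new segmentListById task action.
  private Set<String> retrieveSegmentsById(Set<String> segmentIds)
  {
    return Set.of();
  }

  // Hypothetical stand-in: would post the pre-existing used-segments retrieval.
  private Set<String> retrieveUsedSegmentsById(Set<String> segmentIds)
  {
    return Set.of();
  }
}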

Testing

Setup

  • Local Druid cluster with 3 MiddleManagers (MMs), 3 task slots each
  • Kafka streaming ingestion with 3 task replicas
  • Concurrent compaction enabled with skipOffsetFromLatest = PT0S

Observation
All the replicas finish successfully even when they are not able to publish segments due to a concurrent replace task.

[Screenshot: all replica tasks completed successfully, 2024-06-21]

Replica 1

  • Published 4 segments
  • Upgraded 1 segment kafka_super_1976-04-16T00:00:00.000Z_1976-04-16T01:00:00.000Z_2024-06-21T10:22:46.040Z_1 which overshadowed the published segment kafka_super_1976-04-16T00:00:00.000Z_1976-04-16T01:00:00.000Z_1970-01-01T00:00:00.000Z_1
  • Logs
    2024-06-21T10:23:08,891 INFO [[index_kafka_kafka_super_a6ce1c843bb750d_kbipcfpk]-appenderator-merge] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Push complete...
    2024-06-21T10:23:08,915 INFO [[index_kafka_kafka_super_a6ce1c843bb750d_kbipcfpk]-publish] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Published [4] segments with commit metadata[{nextPartitions=SeekableStreamStartSequenceNumbers{stream='abc', partitionSequenceNumberMap={KafkaTopicPartition{partition=0, topic='null', multiTopicPartition=false}=177}, exclusivePartitions=[]}, publishPartitions=SeekableStreamEndSequenceNumbers{stream='abc', partitionSequenceNumberMap={KafkaTopicPartition{partition=0, topic='null', multiTopicPartition=false}=177}}}].
    2024-06-21T10:23:08,916 INFO [[index_kafka_kafka_super_a6ce1c843bb750d_kbipcfpk]-publish] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Published segments: [kafka_super_1976-04-16T00:00:00.000Z_1976-04-16T01:00:00.000Z_1970-01-01T00:00:00.000Z_1, kafka_super_1976-04-17T00:00:00.000Z_1976-04-17T01:00:00.000Z_1970-01-01T00:00:00.000Z, kafka_super_1976-04-14T00:00:00.000Z_1976-04-14T01:00:00.000Z_1970-01-01T00:00:00.000Z_3, kafka_super_1976-04-15T00:00:00.000Z_1976-04-15T01:00:00.000Z_2024-06-21T10:22:26.059Z_2]
    2024-06-21T10:23:08,916 INFO [[index_kafka_kafka_super_a6ce1c843bb750d_kbipcfpk]-publish] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Published [1] upgraded segments.
    2024-06-21T10:23:08,916 INFO [[index_kafka_kafka_super_a6ce1c843bb750d_kbipcfpk]-publish] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Upgraded segments: [kafka_super_1976-04-16T00:00:00.000Z_1976-04-16T01:00:00.000Z_2024-06-21T10:22:46.040Z_1]

Replica 2

  • Failed to publish the segments
  • But it still found all of its segments already present in the metadata store, including the overshadowed one kafka_super_1976-04-16T00:00:00.000Z_1976-04-16T01:00:00.000Z_1970-01-01T00:00:00.000Z_1.
  • Logs
    2024-06-21T10:23:08,893 INFO [[index_kafka_kafka_super_a6ce1c843bb750d_copcbpef]-appenderator-merge] org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Push complete...
    2024-06-21T10:23:08,927 INFO [[index_kafka_kafka_super_a6ce1c843bb750d_copcbpef]-publish] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Could not publish [4] segments, but they have already been published by another task.
    2024-06-21T10:23:08,928 INFO [[index_kafka_kafka_super_a6ce1c843bb750d_copcbpef]-publish] org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Could not publish segments: [kafka_super_1976-04-16T00:00:00.000Z_1976-04-16T01:00:00.000Z_1970-01-01T00:00:00.000Z_1, kafka_super_1976-04-17T00:00:00.000Z_1976-04-17T01:00:00.000Z_1970-01-01T00:00:00.000Z, kafka_super_1976-04-14T00:00:00.000Z_1976-04-14T01:00:00.000Z_1970-01-01T00:00:00.000Z_3, kafka_super_1976-04-15T00:00:00.000Z_1976-04-15T01:00:00.000Z_2024-06-21T10:22:26.059Z_2]

Benchmarking

Setup

  • Local Druid cluster with a single datasource containing ~1M used and ~1.2M unused segments
mysql> select used, count(*) from druid_segments group by 1;
+------+----------+
| used | count(*) |
+------+----------+
|    0 |  1215000 |
|    1 |  1000001 |
+------+----------+
2 rows in set (0.33 sec)
  • Using a bash script, posted the new task action segmentListById to fetch 500 segments (250 used, 250 unused)
  • Enabled the logging emitter and noted the task/action/run/time metric

Observation

2024-06-22T07:39:00,349 INFO [qtp2062755811-141] org.apache.druid.java.util.emitter.core.LoggingEmitter - [metrics] {"feed":"metrics","taskType":"noop","metric":"task/action/run/time","service":"druid/coordinator","groupId":"noop_2024-06-22T07:39:00.312Z_02192b28-d721-4654-9f33-2aab4470d8d4","host":"localhost:8081","taskActionType":"segmentListById","version":"31.0.0-SNAPSHOT","value":26,"dataSource":"none","taskId":"noop_2024-06-22T07:39:00.312Z_02192b28-d721-4654-9f33-2aab4470d8d4","timestamp":"2024-06-22T07:39:00.349Z"}
2024-06-22T07:39:15,245 INFO [qtp2062755811-146] org.apache.druid.java.util.emitter.core.LoggingEmitter - [metrics] {"feed":"metrics","taskType":"noop","metric":"task/action/run/time","service":"druid/coordinator","groupId":"noop_2024-06-22T07:39:15.229Z_5d31b801-d874-4888-ae9a-f82666997969","host":"localhost:8081","taskActionType":"segmentListById","version":"31.0.0-SNAPSHOT","value":16,"dataSource":"none","taskId":"noop_2024-06-22T07:39:15.229Z_5d31b801-d874-4888-ae9a-f82666997969","timestamp":"2024-06-22T07:39:15.245Z"}
2024-06-22T07:39:17,037 INFO [qtp2062755811-144] org.apache.druid.java.util.emitter.core.LoggingEmitter - [metrics] {"feed":"metrics","taskType":"noop","metric":"task/action/run/time","service":"druid/coordinator","groupId":"noop_2024-06-22T07:39:17.019Z_4c12a954-c96a-4cc6-886c-e42d9261a4a9","host":"localhost:8081","taskActionType":"segmentListById","version":"31.0.0-SNAPSHOT","value":17,"dataSource":"none","taskId":"noop_2024-06-22T07:39:17.019Z_4c12a954-c96a-4cc6-886c-e42d9261a4a9","timestamp":"2024-06-22T07:39:17.037Z"}

Run times for the three invocations were 26 ms, 16 ms, and 17 ms.

@AmatyaAvadhanula (Contributor) left a comment:

Have a few questions about the approach

@kfaraz (Contributor, Author) commented Jun 20, 2024

@AmatyaAvadhanula , I have added a new task action to fetch segments by ID. I have also reverted all the refactoring changes so that the PR is easier to review. The refactoring changes can be made later.

@AmatyaAvadhanula (Contributor) commented:

The overall approach looks good to me. However, could you please add benchmarks for task/action/run/time while fetching 500 segments (including both used and unused) from the metadata store for the new action with 1M+ used segments and 2-3M+ unused segments?

@AmatyaAvadhanula (Contributor) commented:

Could you please also add details about any cluster testing that has been done with this patch?

@kfaraz (Contributor, Author) commented Jun 21, 2024

Could you please also add details about any cluster testing that has been done with this patch?

@AmatyaAvadhanula , I have added cluster testing details in the PR description, hope this suffices.
I will update the details of the benchmarking soon.

Update: Added benchmarking details too.

@AmatyaAvadhanula (Contributor) left a comment:

LGTM!
Have minor suggestions about naming

@@ -38,9 +38,8 @@
@JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
@JsonSubTypes.Type(name = "segmentListById", value = RetrieveSegmentsByIdAction.class),
@AmatyaAvadhanula (Contributor) commented:

Nit: Please rename type name to "retrieveSegmentsById" since there is no need to maintain backward compatibility for this action

@kfaraz (Contributor, Author) replied:

I named it segmentListById to be consistent with segmentListUsed and segmentListUnused. If you feel strongly about this, I can include the rename in my follow-up PR.

@AmatyaAvadhanula (Contributor) commented Jun 24, 2024:

The names are supposed to be consistent with the task action's class name.
The comment that was moved indicates that certain actions have a type name different from the class name only for backward compatibility.

@kfaraz (Contributor, Author) replied:

The names are supposed to be consistent with the task action's class name.

Sure, this is preferable, but not a requirement.

In this case, it made more sense to me to adhere to the nomenclature that we are now going to support forever, i.e. segmentListXXX.

But I agree that it is better to stick to the convention used by all the other task actions rather than the 2 bad ones. Thanks for calling this out!

@@ -274,7 +275,7 @@ Stream<SegmentsOfInterval> getAllSegmentsOfInterval()
{
this.appenderator = Preconditions.checkNotNull(appenderator, "appenderator");
this.segmentAllocator = Preconditions.checkNotNull(segmentAllocator, "segmentAllocator");
-    this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker");
+    this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "segmentRetriever");
@AmatyaAvadhanula (Contributor) commented:

Please rename this variable to segmentRetriever

@kfaraz (Contributor, Author) replied:

Will do this in a follow-up PR that renames the other things too. Don't want to trigger CI for this.

kfaraz merged commit 0fe6a2a into apache:master on Jun 24, 2024
87 checks passed
kfaraz deleted the fix_metadata_inconsistency branch on June 24, 2024 04:26
@zargor commented Jun 28, 2024

We hit this issue. I guess we can expect this fix in the next release of Druid...

kfaraz added this to the 31.0.0 milestone on Oct 4, 2024