Skip to content

Comments

Remove skip ignorable shards#13221

Merged
AmatyaAvadhanula merged 7 commits intoapache:masterfrom
AmatyaAvadhanula:feature-remove_skipIgnorableShards
Oct 28, 2022
Merged

Remove skip ignorable shards#13221
AmatyaAvadhanula merged 7 commits intoapache:masterfrom
AmatyaAvadhanula:feature-remove_skipIgnorableShards

Conversation

@AmatyaAvadhanula
Copy link
Contributor

@AmatyaAvadhanula AmatyaAvadhanula commented Oct 13, 2022

Description

It's possible in kinesis ingestion for closed and empty shards to be considered during assignment of shards to tasks. This could potentially lead to a skew in the data processed by workers. since some tasks may have more closed and empty shards in their assignment. #12235 was raised to solve this by adding a flag skipIgnorableShards . When this flag is set, supervisor would try to exclude such useless (aka ignorable) shards from assignment. However, after more careful analysis, it turns out that the flag is not really needed due to the following reasons:

  • #12787 points out that the flag never worked at all due to a bug in its implementation
  • When there are too many shards (100's or 1000's due to resharding), setting flag can be very expensive and the supervisor can take a long time to start because of the overhead in determining which shards can be ignored.
  • repartitionTransitionDuration is the period after which new assignment to tasks happens if the set of active shards has changed. A supervisor can begin ingesting early and once the tasks discover that closed and empty shards are present or even if closed shards have been completely read, a new allocation of the currently active shards to the stream occurs after this duration (2 minutes by default). As a result the ignorable shards are not considered for the next assignment.

For tips about how to write a good release note, see Release notes.



This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@kfaraz kfaraz added Area - Streaming Ingestion AWS Kinesis For changes in Kinesis ingestion labels Oct 14, 2022
@kfaraz kfaraz self-requested a review October 18, 2022 08:53
Comment on lines 808 to 821
final Set<String> retVal = new TreeSet<>();
ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
while (true) {
ListShardsResult result = kinesis.listShards(request);
retVal.addAll(result.getShards()
.stream()
.map(Shard::getShardId)
.collect(Collectors.toList())
);
String nextToken = result.getNextToken();
if (nextToken == null) {
return retVal;
}
request = new ListShardsRequest().withNextToken(nextToken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the difference between this and getShardsUsingListShards?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing it out. This is an error and would always use listShards to get partition Ids

@AmatyaAvadhanula AmatyaAvadhanula merged commit 9cbda66 into apache:master Oct 28, 2022
@kfaraz kfaraz added this to the 25.0 milestone Nov 22, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Area - Streaming Ingestion AWS Kinesis For changes in Kinesis ingestion

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants