Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Clean up partition assignment logic #7249

Merged
merged 2 commits into from Sep 9, 2019
Merged

MINOR: Clean up partition assignment logic #7249

merged 2 commits into from Sep 9, 2019

Conversation

vvcephei
Copy link
Contributor

These are just some "tidying up" changes I made when I was preparing to start working on KIP-441.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Contributor Author

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang , what do you think about this cleanup? It mostly arose out of me trying to grok the code while getting ramped up on the KIP-441 changes.

protected final Set<Integer> supportedVersions = new HashSet<>();

private Logger log;
private String logPrefix;
public enum Error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to another class to shorten this file a little.

}
}

private static final class InternalStreamsConfig extends StreamsConfig {
private InternalStreamsConfig(final Map<?, ?> props) {
private static final class QuietStreamsConfig extends StreamsConfig {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just making this a little more readable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a org.apache.kafka.streams.internals.QuietStreamsConfig to use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, that's in test-utils.

I could move it to a separate internal class in this module, and then use it in test-utils, though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, let's do that: we should try our best to avoid code duplicates.

@@ -412,24 +415,28 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
}

final boolean versionProbing;
if (futureMetadataVersion != UNKNOWN) {
if (futureMetadataVersion == UNKNOWN) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoiding unnecessary negation.

} else {
numPartitionsCandidate = metadata.partitionCountForTopic(sourceTopicName);
final Integer count = metadata.partitionCountForTopic(sourceTopicName);
if (count == null) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoiding an unboxing warning.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The topic may not be in the metadata due to some edge conditions, we cannot throw the exception in this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This exception would always have happened anyway (it would have thrown an NPE on unboxing). What alternative handling would you recommend?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, in that case we can do this as-is then: I need to think through what's possible under the context of KIP-429 anyways so we can have a thorough discussion in a later PR.

@vvcephei
Copy link
Contributor Author

vvcephei commented Sep 3, 2019

@bbejeck , do you have time to take a look at this? It's in preparation for KIP-441 work.

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a pass over the PR. Overall I think it is a good idea to refactor StreamsPartitionAssignor to make it a smaller class while moving static functions and other classes into another class.

}
}

private static final class InternalStreamsConfig extends StreamsConfig {
private InternalStreamsConfig(final Map<?, ?> props) {
private static final class QuietStreamsConfig extends StreamsConfig {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a org.apache.kafka.streams.internals.QuietStreamsConfig to use.


private String getUserEndpoint(final StreamsConfig streamsConfig) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a wild thought: such functions can just be made static, and maybe we can make all of them part of a StreamsPartitionAssignorUtil class along with the AssignerError as well, if our primary goal is to make this class smaller and simpler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this sounds good. I was thinking about making a separate helper class for parsing out the configs.

} else {
numPartitionsCandidate = metadata.partitionCountForTopic(sourceTopicName);
final Integer count = metadata.partitionCountForTopic(sourceTopicName);
if (count == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The topic may not be in the metadata due to some edge conditions, we cannot throw the exception in this case.

Copy link
Contributor Author

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang , I've updated the PR to pull more of the logic out into helper classes.

final String name;
final Map<String, String> topicConfigs;

private int numberOfPartitions = StreamsPartitionAssignor.UNKNOWN;
private Optional<Integer> numberOfPartitions = Optional.empty();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This must have been a refactoring mistake at some point. "Unknown Protocol Version" is clearly the wrong constant to use to represent "Unknown Number of Partitions".

Since this isn't hot path, I've swapped it out for Optional to hopefully prevent any ambiguity.

return numberOfPartitions;
}

void setNumberOfPartitions(final int numberOfPartitions) {
public void setNumberOfPartitions(final int numberOfPartitions) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was necessary as part of breaking the StreamsPartitionAssignor into multiple classes.

if (existedTopicPartition.containsKey(topicName)) {
if (!existedTopicPartition.get(topicName).equals(numberOfPartitions)) {
final Optional<Integer> numberOfPartitions = entry.getValue().numberOfPartitions();
if (existedTopicPartition.containsKey(topicName) && numberOfPartitions.isPresent()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This extra condition should be effectively equivalent, since if we have an existing partition count, then it shouldn't have ever been -1 and therefore we should be fine to only run the equality check if number of partitions is also specified.

private final static int VERSION_TWO = 2;
private final static int VERSION_THREE = 3;
private final static int VERSION_FOUR = 4;
private final static int EARLIEST_PROBEABLE_VERSION = VERSION_THREE;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to separate "constants" class StreamsAssignmentProtocolVersions.

private final static int VERSION_THREE = 3;
private final static int VERSION_FOUR = 4;
private final static int EARLIEST_PROBEABLE_VERSION = VERSION_THREE;
protected final Set<Integer> supportedVersions = new HashSet<>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was only present to (unnecessarily) support test logic. I refactored the test and removed the field.

public class AssignmentInfo {

private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);

public static final int LATEST_SUPPORTED_VERSION = 4;
static final int UNKNOWN = -1;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These (and the SubscriptionInfo) are logically the same as the "assignment protocol version", so I've migrated them to StreamsAssignmentProtocolVersions.

public class SubscriptionInfo {

private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);

public static final int LATEST_SUPPORTED_VERSION = 4;
static final int UNKNOWN = -1;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned above, these are migrated to StreamsAssignmentProtocolVersions.

@@ -119,7 +120,7 @@
private final String userEndPoint = "localhost:8080";
private final String applicationId = "stream-partition-assignor-test";

private final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
private TaskManager taskManager;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The story here is that (I think wrongly), the tests were using the task manager via the config map before, but not always initializing the mock. I guess a lot of the tests never accessed its methods before. I also guess that the mock just returns null for field accesses (specifically adminClient), but it doesn't validate the field access. Now that it's a method, it need the expectation of the call always, so we always need to initialize the mock now.

The changes I made in this class are related to this. Basically, we always initialize the taskManager mock before using it to configure the StreamsPartitionAssignor.

EasyMock.expect(taskManager.builder()).andReturn(builder).anyTimes();
EasyMock.expect(taskManager.prevActiveTaskIds()).andReturn(prevTasks).anyTimes();
EasyMock.expect(taskManager.cachedTasksIds()).andReturn(cachedTasks).anyTimes();
EasyMock.expect(taskManager.processId()).andReturn(processId).anyTimes();
EasyMock.replay(taskManager);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We sometimes add more expectations to the mock before using it, so I've moved the replay inline, so each method can customize their task manager.


final List<TaskId> activeTaskList = asList(task0, task3);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This stuff is just reorganized so that each field is defined just before it's used (so you can better tell how each field is used in the test).

@vvcephei
Copy link
Contributor Author

vvcephei commented Sep 5, 2019

Just for reference, the tests failed, as the copartitioning logic had too high NPath complexity.

I golfed it a bit, and I think it's significantly more readable now.

While refactoring, I noticed for the first time that it doesn't just validate the partition counts, it also sets the partition counts of the repartition topics involved in the cogroup. So, I went ahead and renamed the logic to "enforce" rather than "validate".

@vvcephei
Copy link
Contributor Author

vvcephei commented Sep 5, 2019

Rebased to resolve merge conflicts.

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Just one nit comment otherwise LGTM.

Could we trigger a streams system test just to make sure there's no regression before merging?


import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe we can just merge StreamsAssignmentProtocolVersions to this class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback. I separated them because there are lots of classes that need to reference the assignment protocol versions, but there's only one that needs to parse configurations. It would be a shame to couple knowledge of PartitionAssignor configurations into SubscriptionInfo, and AssignmentInfo, for example.

@vvcephei
Copy link
Contributor Author

vvcephei commented Sep 6, 2019

Some system test failures look related. I'm digging into it.

@vvcephei
Copy link
Contributor Author

vvcephei commented Sep 6, 2019

Ok, finally figured out the system test failure was related to some funny business in the "FutureStreamsPartitionAssignor". I think I fixed it. Kicked off: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2985/

@vvcephei
Copy link
Contributor Author

vvcephei commented Sep 7, 2019

Results: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2019-09-06--001.1567846807--vvcephei--pre-441-cleanup--19ee100/report.html

A couple of tests failed 2 out of 3 times. I’ll look into it when I get a chance.

@vvcephei
Copy link
Contributor Author

vvcephei commented Sep 8, 2019

Those tests didn't look related to me, so I ran them again, 10 times:

my branch:

trunk:

This looks pretty good to me, so I'd be in favor of merging, once we get one more approval.

@guozhangwang
Copy link
Contributor

Thanks @vvcephei for the follow-up! Since this is a cleaning up I think I can just merge it myself.

@guozhangwang guozhangwang merged commit 0f177ea into apache:trunk Sep 9, 2019
@vvcephei
Copy link
Contributor Author

vvcephei commented Sep 9, 2019

Just for kicks, I ran the Streams test suite again x3:
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2019-09-08--001.1568011310--vvcephei--pre-441-cleanup--6cc2c66/report.html

This time, the EOS test failed again, twice (and the broker resilience test failed once).
It's interesting that EOS didn't fail in the x10 run, and it also hasn't been failing on trunk (which is in the context of a x1 run).

Regardless, it doesn't seem to be failing due to anything in this PR...

@vvcephei vvcephei deleted the pre-441-cleanup branch September 9, 2019 16:03
guozhangwang pushed a commit that referenced this pull request Jan 7, 2020
…ics counts (#7904)

This PR fixes the regression introduced in 2.4 from 2 refactoring PRs:
#7249
#7419

The bug was introduced by having a logical path leading numPartitionsCandidate to be 0, which is assigned to numPartitions and later being checked by setNumPartitions. In the subsequent check we will throw illegal argument if the numPartitions is 0.

This bug is both impacting new 2.4 application and upgrades to 2.4 in certain types of topology. The example in original JIRA was imported as a new integration test to guard against such regression. We also verify that without the bug fix application will still fail by running this integration test.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
guozhangwang pushed a commit that referenced this pull request Jan 7, 2020
…ics counts (#7904)

This PR fixes the regression introduced in 2.4 from 2 refactoring PRs:
#7249
#7419

The bug was introduced by having a logical path leading numPartitionsCandidate to be 0, which is assigned to numPartitions and later being checked by setNumPartitions. In the subsequent check we will throw illegal argument if the numPartitions is 0.

This bug is both impacting new 2.4 application and upgrades to 2.4 in certain types of topology. The example in original JIRA was imported as a new integration test to guard against such regression. We also verify that without the bug fix application will still fail by running this integration test.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
guozhangwang pushed a commit to confluentinc/kafka that referenced this pull request Jan 7, 2020
…ics counts (apache#7904)

This PR fixes the regression introduced in 2.4 from 2 refactoring PRs:
apache#7249
apache#7419

The bug was introduced by having a logical path leading numPartitionsCandidate to be 0, which is assigned to numPartitions and later being checked by setNumPartitions. In the subsequent check we will throw illegal argument if the numPartitions is 0.

This bug is both impacting new 2.4 application and upgrades to 2.4 in certain types of topology. The example in original JIRA was imported as a new integration test to guard against such regression. We also verify that without the bug fix application will still fail by running this integration test.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
qq619618919 pushed a commit to qq619618919/kafka that referenced this pull request May 12, 2020
…ics counts (apache#7904)

This PR fixes the regression introduced in 2.4 from 2 refactoring PRs:
apache#7249
apache#7419

The bug was introduced by having a logical path leading numPartitionsCandidate to be 0, which is assigned to numPartitions and later being checked by setNumPartitions. In the subsequent check we will throw illegal argument if the numPartitions is 0.

This bug is both impacting new 2.4 application and upgrades to 2.4 in certain types of topology. The example in original JIRA was imported as a new integration test to guard against such regression. We also verify that without the bug fix application will still fail by running this integration test.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants