KAFKA-7149 Reduce assignment data size #5663
Conversation
Thanks for the PR, @brary! I'll take a look over it.
Test failure seems to be related (happened in both builds):
I have run the test manually with `./gradlew -Dtest.single=StreamsMetadataStateTest streams:test -x streams:checkstyleMain -x streams:checkstyleTest`, and the build is successful. Any idea how a unit test can fail here but run fine on my machine?
partitionsByHostState.put(hostInfo, topicPartitions);
        }
    }
} else if (minReceivedMetadataVersion >= 4) {
Could we reorder the flow as below for better readability and progression?
if (version >= 4) { ... }
else if (version >= 2) { ... }
...
Will do it.
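A minimal sketch of the reordered dispatch being suggested. The version boundaries (4 and 2) come from this discussion; the method name and return values are illustrative, not the PR's actual code:

```java
// Illustrative only: the reviewer's suggested ordering checks the newest
// protocol version first, so the branches read top-down from latest to oldest.
final class VersionDispatch {
    static String selectDecodePath(final int version) {
        if (version >= 4) {
            return "taskIds";            // v4+: TaskId-based host metadata
        } else if (version >= 2) {
            return "topicPartitions";    // v2-v3: TopicPartition-based host metadata
        } else {
            return "none";               // v1: no host metadata in the assignment
        }
    }
}
```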
for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
    final HostInfo hostInfo = entry.getValue().hostInfo;

    if (hostInfo != null) {
Could we also check that `entry.getValue().state` is not null?
This is not required, as the entry can't be null and still be in `assign`; `entry.getValue().state` is already being used in Step 2.
}
log.info("Size of assignment is: " + assignmentSize + " bytes. Total hosts in this assignment are " + clientsMetadata.size());
Should it be total instances/processes instead of hosts?
Yes, I will make the change.
Thanks for the PR. Some initial comments.
key,
new DefaultStreamPartitioner<>(keySerializer, clusterMetadata),
sourceTopicsInfo);
if (assignmentVersion >= 4)
Nit: please add `{` and `}` for all blocks (it's the agreed code style) throughout the whole PR.
Sure, will make this change.
@@ -208,9 +219,23 @@ public String toString(final String indent) {
 * @param currentState the current mapping of {@link HostInfo} -> {@link TopicPartition}s
 * @param clusterMetadata the current clusterMetadata {@link Cluster}
 */
synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> currentState, final Cluster clusterMetadata) {
synchronized void onChangeOldVersion(final Map<HostInfo, Set<TopicPartition>> currentState, final Cluster clusterMetadata, final int version) {
`OldVersion` is not very intuitive -- also, how would we rename it if we add a third version? Maybe better: `updateMetadataWithPartitionInfo`?
rebuildMetdataOldVersion(currentState, version);
}

synchronized void onChangeNewVersion(final Map<HostInfo, Set<TaskId>> currentState, final Cluster clusterMetadata, final int version) {
-> `updateMetadataWithTaskInfo`?
@@ -222,7 +247,8 @@ private boolean hasPartitionsForAnyTopics(final List<String> topicNames, final S
return false;
}

private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> currentState) {
private void rebuildMetdataOldVersion(final Map<HostInfo, Set<TopicPartition>> currentState, final int version) {
as above: naming
}
}

private void rebuildMetadataNewVersion(final Map<HostInfo, Set<TaskId>> currentState, final int version) {
as above: naming
if (receivedAssignmentMetadataVersion >= 4)
    taskManager.setTasksByHostState(tasksByHosts, receivedAssignmentMetadataVersion);
else
    taskManager.setPartitionsByHostState(partitionsByHostState, receivedAssignmentMetadataVersion);
I am wondering if we could unify `setTasksByHostState` and `setPartitionsByHostState` into one method? If we can translate `tasksByHosts` -> `partitionsByHostState` (or the reverse), we could simplify the code (i.e., translate once after a rebalance, and have unified code everywhere else).
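A rough sketch of the translation this comment suggests, written generically since the PR's exact types aren't shown here; `tasksToPartitions` and the task-to-partitions lookup are assumed names, with type parameters standing in for the PR's `HostInfo`, `TaskId`, and `TopicPartition` classes:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: expand a compact host -> task map (what v4 sends on the
// wire) into the host -> partition map the rest of the code already uses,
// given a lookup of which partitions each task owns.
final class AssignmentTranslation {
    static <H, T, P> Map<H, Set<P>> tasksToPartitions(final Map<H, Set<T>> tasksByHost,
                                                      final Map<T, Set<P>> partitionsForTask) {
        final Map<H, Set<P>> partitionsByHost = new HashMap<>();
        for (final Map.Entry<H, Set<T>> entry : tasksByHost.entrySet()) {
            final Set<P> partitions = new HashSet<>();
            for (final T task : entry.getValue()) {
                // Union the partitions of every task assigned to this host.
                partitions.addAll(partitionsForTask.getOrDefault(task, Collections.emptySet()));
            }
            partitionsByHost.put(entry.getKey(), partitions);
        }
        return partitionsByHost;
    }
}
```

Translating once, right after the rebalance, would let downstream code keep working against a single map type regardless of the assignment version received.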
I can remove `partitionsByHostState` altogether from `AssignmentInfo` and convert it into `tasksByHosts` for older versions as well in the `assign` function. Then every `AssignmentInfo`, whether from an older or newer version, will have `tasksByHosts`. But with this change, should we bump the version of `AssignmentInfo` to 4, or keep whatever it was originally?
public AssignmentInfo(final Map<HostInfo, Set<TaskId>> tasksByHost,
                      final List<TaskId> activeTasks,
                      final Map<TaskId, Set<TopicPartition>> standbyTasks) {
Why different parameter order compared to other constructor?
@@ -130,8 +157,7 @@ public int latestSupportedVersion() {
*/
public ByteBuffer encode() {
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();

try (final DataOutputStream out = new DataOutputStream(baos)) {
try (final DataOutputStream out = new DataOutputStream(new GZIPOutputStream(baos))) {
This will break backward compatibility, as the encoding of earlier versions (<= 3) must not change. We can leave the version bytes uncompressed and compress only the bytes following them for v4. That way we keep the flexibility to choose a completely different compression/encoding for later versions (v5, v6, ...).
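A sketch of the layout this comment proposes, as a standalone round-trip rather than the PR's actual `encode`/`decode` methods (the class and method names here are illustrative): the version int is written plain so any decoder can read it first, and only the bytes after it are gzipped for v4+.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical codec sketch: uncompressed version prefix, compressed payload.
final class VersionedCodec {
    static ByteBuffer encode(final int version, final byte[] payload) throws IOException {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (final DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(version);                  // version bytes stay uncompressed
            try (final GZIPOutputStream gz = new GZIPOutputStream(out)) {
                gz.write(payload);                  // everything after the version is gzipped
            }
        }
        return ByteBuffer.wrap(baos.toByteArray());
    }

    static byte[] decode(final ByteBuffer data) throws IOException {
        final int version = data.getInt();          // read the plain version first
        final byte[] rest = new byte[data.remaining()];
        data.get(rest);
        if (version >= 4) {                         // only v4+ payloads are compressed
            try (final GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(rest));
                 final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                final byte[] buf = new byte[4096];
                int n;
                while ((n = gz.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            }
        }
        return rest;                                // v1-v3: bytes were never compressed
    }
}
```

Because the version prefix is never compressed, an old decoder can still read the version of a new payload and reject it gracefully, and v5+ could swap GZIP for another scheme without touching the framing.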
@@ -230,7 +275,7 @@ public static AssignmentInfo decode(final ByteBuffer data) {
// ensure we are at the beginning of the ByteBuffer
data.rewind();

try (final DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) {
try (final DataInputStream in = new DataInputStream(new GZIPInputStream(new ByteBufferInputStream(data)))) {
The compression wrapping here would need to be modified to match the new encoding changes.
@@ -297,10 +342,10 @@ private static void decodeVersionTwoData(final AssignmentInfo assignmentInfo, 
final DataInputStream in) throws IOException {
decodeActiveTasks(assignmentInfo, in);
decodeStandbyTasks(assignmentInfo, in);
decodeGlobalAssignmentData(assignmentInfo, in);
decodeGlobalPartitionsAssignmentData(assignmentInfo, in);
Should we rename it without 'Global' if there is no significance to it?
…/decompression on encode/decode
…nts will have taskIds
There seems to be something wrong with this PR. It's multiple hundreds of commits and more than 60K lines of code changed. Cannot review this PR before this issue is resolved.
I had submitted the commits 3 months back, and I had to rebase my branch since some tests were failing. So the commits from trunk in the last 3 months came along with this PR. Do you want me to close this and open a new PR with just my commits?
If you rebase on
@brary Maybe you can try creating a new PR (git diff the files, create a new branch) if you cannot rebase the commits on this one, if that helps move the review and merge process along faster.
Closing this PR, as I have created another PR after rebase which includes just my commits. Here's the PR.
The PR contains the changes for the AssignmentInfo re-design, where TopicPartitions are replaced with TaskIds and GZIP compression is applied to the AssignmentInfo to reduce the assignment size in version 4.
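For intuition on why gzipping helps here, a synthetic demo (this is not the real `AssignmentInfo` wire format): the repetitive topic and partition strings in a large assignment compress very well.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Illustrative only: measures the GZIP-compressed size of a repetitive,
// assignment-like payload to show the kind of size reduction available.
final class CompressionDemo {
    static int gzippedSize(final byte[] raw) throws IOException {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (final GZIPOutputStream gz = new GZIPOutputStream(baos)) {
            gz.write(raw);
        }
        return baos.size();
    }

    public static void main(final String[] args) throws IOException {
        // Synthetic payload: many partitions across a handful of topic names.
        final StringBuilder sb = new StringBuilder();
        for (int p = 0; p < 1000; p++) {
            sb.append("my-repartition-topic-").append(p % 8).append(',').append(p).append(';');
        }
        final byte[] raw = sb.toString().getBytes(StandardCharsets.UTF_8);
        System.out.println("raw=" + raw.length + " bytes, gzip=" + gzippedSize(raw) + " bytes");
    }
}
```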