Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values() #9007

Conversation

tombentley
Copy link
Contributor

As per KIP-621. Also added some tests in KafkaAdminClientTest

@tombentley
Copy link
Contributor Author

@cmccabe, @omkreddy, @mimaison, @dajac, @dongjinleekr since you all voted on the KIP feel free to review.

Copy link
Contributor

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tombentley Thanks for the PR. It looks good overall. I have left few comments.

Comment on lines 57 to 60
return "LogDirDescription{" +
"replicaInfos=" + replicaInfos +
", error=" + error +
'}';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We usually use parenthesis instead of curly braces.

Comment on lines 64 to 68
return "ReplicaInfo{" +
"size=" + size +
", offsetLag=" + offsetLag +
", isFuture=" + isFuture +
'}';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: curly braces instead of parenthesis.

@@ -64,7 +63,7 @@ object LogDirsCommand {
"logDirs" -> logDirInfos.map { case (logDir, logDirInfo) =>
Map(
"logDir" -> logDir,
"error" -> logDirInfo.error.exceptionName(),
"error" -> Option(logDirInfo.error).flatMap(ex => Some(ex.getClass.getName)).orNull,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can't we use map instead of flatMap and remove the Some?

HashMap<String, LogDirDescription> result = new HashMap<>(response.data().results().size());
for (DescribeLogDirsResponseData.DescribeLogDirsResult logDirResult : response.data().results()) {
Map<TopicPartition, ReplicaInfo> replicaInfoMap = new HashMap<>();
if (logDirResult.topics() != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: topics is not nullable in the protocol so it should never be null, does it?

TopicPartition tp = replicaInfoEntry.getKey();
DescribeLogDirsResponse.ReplicaInfo replicaInfo = replicaInfoEntry.getValue();
ReplicaInfo replicaInfo = replicaInfoEntry.getValue();
ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp);
if (replicaLogDirInfo == null) {
handleFailure(new IllegalStateException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to your PR but this look weird. It seems that we fail all the futures if an unexpected replica is provided by the broker in the response. I think that we should log a warning when this happen like we do in the other methods (e.g. createTopics). What do you think?

Comment on lines 1083 to 1091
Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
assertEquals(Collections.singleton(0), descriptions.keySet());
assertNotNull(descriptions.get(0));
assertEquals(Collections.singleton("/var/data/kafka"), descriptions.get(0).get().keySet());
assertNull(descriptions.get(0).get().get("/var/data/kafka").error());
assertEquals(Collections.singleton(tp), descriptions.get(0).get().get("/var/data/kafka").replicaInfos().keySet());
assertEquals(1234567890, descriptions.get(0).get().get("/var/data/kafka").replicaInfos().get(tp).size());
assertEquals(0, descriptions.get(0).get().get("/var/data/kafka").replicaInfos().get(tp).offsetLag());
assertFalse(descriptions.get(0).get().get("/var/data/kafka").replicaInfos().get(tp).isFuture());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These blocks of assertions are quite hard to read. Can we try to make them more digestable? We could perhaps extract temporary variable to reduce the number of .get(). We could also define an verifyDescription helper that verify a LogDirDescription for instance. It may be worth having dedicated unit tests for the new and the old APIs as well.

Comment on lines 51 to 52
case class ReplicaInfo(size: Long, offsetLag: Long, isFuture: Boolean)
case class LogDirInfo(error: Errors, replicaInfos: Map[TopicPartition, ReplicaInfo])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a pity that we have to redefine there classes here. Couldn't we update the test to work with the plain response instead?

@tombentley
Copy link
Contributor Author

Thats for the review @dajac. I've addressed your comments.

@@ -64,7 +63,7 @@ object LogDirsCommand {
"logDirs" -> logDirInfos.map { case (logDir, logDirInfo) =>
Map(
"logDir" -> logDir,
"error" -> logDirInfo.error.exceptionName(),
"error" -> Option(logDirInfo.error).map(ex => ex.getClass.getName).orNull,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for the Option way. 😄

Copy link
Contributor

@dongjinleekr dongjinleekr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. One minor change! 😃

* possibly some other exception if there were problems describing the log directory
* or null if the directory is online.
*/
public ApiException error() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this? (in consistency with current DescribeLogDirsResponse.LogDirInfo):

Returns `ApiException` if the log directory is offline or an error occurred. If not, returns null.
<p><ul>
<li> KafkaStorageException - The log directory is offline.
<li> UnknownServerException - The server experienced an unexpected error when processing the request.
</ul><p>

(Description of UnknownServerException was from Errors.UNKNOWN_SERVER_ERROR)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with the other Javadoc it seems like we should use a single <p> only when between the paragraphs. Please remove the <p> tags. (Sorry, I was also confused.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove the <p> before the <ul> also. (see below.)
kafka KafkaStreams java at trunk · apache kafka

@tombentley tombentley force-pushed the KAFKA-10120-DescribeLogDirsResult-exposes-internal-classes branch from 33a9413 to ba2160c Compare July 14, 2020 15:21
@tombentley
Copy link
Contributor Author

@dongjinleekr thanks for the review, amended. Also rebased for conflict.

private final Map<TopicPartition, ReplicaInfo> replicaInfos;
private final ApiException error;

public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better to have package-private visibility?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially thought the same, but TopicDescription, for example (as well as other classes accessible from *Results classes) have a public constructors.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's great point. At the moment, I think that we are not consistent about this. Some are package private and some are not. The advantage of keeping it public is that it allows to use the class in unit tests which resides in other packages.

@@ -95,6 +76,7 @@ public static DescribeLogDirsResponse parse(ByteBuffer buffer, short version) {
* KAFKA_STORAGE_ERROR (56)
* UNKNOWN (-1)
*/
@Deprecated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it need comment to describe the replacement? for example

@deprecated Deprecated Since Kafka 2.7. Use {@link LogDirDescription}.

Copy link
Contributor

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tombentley Thanks for the updated PR. I have left some more comments.

private final Map<TopicPartition, ReplicaInfo> replicaInfos;
private final ApiException error;

public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's great point. At the moment, I think that we are not consistent about this. Some are package private and some are not. The advantage of keeping it public is that it allows to use the class in unit tests which resides in other packages.

Comment on lines 1086 to 1092
assertEquals(Collections.singleton("/var/data/kafka"), descriptionsMap.keySet());
assertNull(descriptionsMap.get("/var/data/kafka").error());
Map<TopicPartition, ReplicaInfo> descriptionsReplicaInfos = descriptionsMap.get("/var/data/kafka").replicaInfos();
assertEquals(Collections.singleton(tp), descriptionsReplicaInfos.keySet());
assertEquals(1234567890, descriptionsReplicaInfos.get(tp).size());
assertEquals(0, descriptionsReplicaInfos.get(tp).offsetLag());
assertFalse(descriptionsReplicaInfos.get(tp).isFuture());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block of assertions is used multiple times. Would it make sense to extract it in a helper method, say assertDescriptions, that verifies a descriptions map contains the information about a single log dir/topic partition?

Something like assertDescriptionContains(descriptionsMap, logDir, tp, size, offsetLag, isFuture).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might have less mileage than you expected because the different types mean we need two methods each with two call sites, rather than 4 call sites for a single method, but I've done it anyway.

}

@Test
public void testDescribeReplicaLogDirs() throws ExecutionException, InterruptedException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not due to your PR but shall we add a unit test which uses multiple brokers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it to the existing test. Due to the new helper methods I felt this didn't really complicate the test very much and is also allows us to cover the case where the RPC returns STORAGE_ERROR.

* possibly some other exception if there were problems describing the log directory
* or null if the directory is online.
*/
public ApiException error() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with the other Javadoc it seems like we should use a single <p> only when between the paragraphs. Please remove the <p> tags. (Sorry, I was also confused.)

@tombentley
Copy link
Contributor Author

@dongjinleekr ah, thank you, I hadn't noticed that that was the norm.

Copy link
Contributor

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tombentley Thanks for the update. LGTM pending jenkins.

@omkreddy
Copy link
Contributor

ok to test

Copy link
Member

@mimaison mimaison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @tombentley. I left a few comments

@@ -2306,6 +2310,22 @@ void handleFailure(Throwable throwable) {
return new DescribeLogDirsResult(new HashMap<>(futures));
}

private Map<String, LogDirDescription> logDirDescriptions(DescribeLogDirsResponse response) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be static. Also should we keep it in DescribeLogDirsResponse?

@@ -2306,6 +2310,22 @@ void handleFailure(Throwable throwable) {
return new DescribeLogDirsResult(new HashMap<>(futures));
}

private Map<String, LogDirDescription> logDirDescriptions(DescribeLogDirsResponse response) {
HashMap<String, LogDirDescription> result = new HashMap<>(response.data().results().size());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The left side can be Map

@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws Exception {
}
}

private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be static

prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false));
}

private List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be static

.setOffsetLag(offsetLag))));
}

private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be static

}
}

private void assertDescriptionContains(Map<String, LogDirDescription> descriptionsMap, String logDir,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be static

}

@SuppressWarnings("deprecation")
private void assertDescriptionContains(Map<String, DescribeLogDirsResponse.LogDirInfo> descriptionsMap,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be static

@tombentley
Copy link
Contributor Author

@mimaison done, thanks.

@mimaison
Copy link
Member

retest this please

@mimaison
Copy link
Member

ok to test

@mimaison
Copy link
Member

Failures look unrelated:

  • org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest > shouldUpgradeFromEosAlphaToEosBeta[true] FAILED
  • org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > testOneWayReplicationWithAutorOffsetSync1

@mimaison mimaison merged commit 819cd45 into apache:trunk Jul 30, 2020
@dongjinleekr
Copy link
Contributor

@tombentley Congratulations! ㊗️ @omkreddy @mimaison Thanks again for the detailed review, as usual! 😃

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants