
KAFKA-7609: Add Protocol Generator for Kafka #5893

Merged
merged 14 commits into apache:trunk from cmccabe:KAFKA-7609 on Jan 12, 2019

Conversation

@cmccabe
Contributor

cmccabe commented Nov 8, 2018

No description provided.

@cmccabe

Contributor

cmccabe commented Nov 8, 2018

Overview:

buildSrc/src
The message generator code is here. It is automatically re-run by Gradle when one of the schema files changes. The entire directory is processed at once to minimize the number of times we have to start a new JVM. We use Jackson to translate the JSON files into Java objects.
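
As a rough sketch of how that Jackson step might look (the class and method names below are illustrative, not the actual generator types in buildSrc), each schema file can be deserialized into a plain spec object before any code is emitted:

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.util.List;

// Illustrative spec classes only; the real spec types live under
// buildSrc/src/main/java/org/apache/kafka/message and may differ in shape.
class ExampleFieldSpec {
    public String name;
    public String type;
    public String versions;
    public String about;
}

class ExampleMessageSpec {
    public short apiKey;
    public String type;
    public String name;
    public String validVersions;
    public List<ExampleFieldSpec> fields;
}

public class ExampleSchemaLoader {
    public static ExampleMessageSpec load(File schemaFile) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // The schema files carry // license headers, so JSON comments must be allowed.
        mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
        return mapper.readValue(schemaFile, ExampleMessageSpec.class);
    }
}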

clients/src/main/java/org/apache/kafka/common/protocol/Message.java
This is the interface implemented by all automatically generated messages.
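A very rough guess at the shape of such an interface (the name, method names, and signatures below are illustrative, not the actual Message.java):

import java.nio.ByteBuffer;

// Hypothetical sketch only; the real interface reads and writes through the
// Readable/Writable abstraction described below rather than a raw ByteBuffer.
public interface ExampleMessage {
    // Serialized size of this message at the given schema version.
    int size(short version);

    // Serialize this message at the given schema version.
    void write(ByteBuffer buffer, short version);

    // Populate this message from serialized data at the given schema version.
    void read(ByteBuffer buffer, short version);
}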

clients/src/main/java/org/apache/kafka/common/protocol/MessageUtil.java
Some utility functions used by the generated message code.

clients/src/main/java/org/apache/kafka/common/protocol/Readable.java, Writable.java, ByteBufferAccessor.java
The generated message code uses these classes for writing to a buffer.
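A minimal sketch of the buffer-accessor pattern, assuming nothing beyond java.nio (the real ByteBufferAccessor has a richer surface, including strings, arrays, and so on):

import java.nio.ByteBuffer;

// Sketch only: one class backing both the reading and writing side with a ByteBuffer.
public class ExampleByteBufferAccessor {
    private final ByteBuffer buf;

    public ExampleByteBufferAccessor(ByteBuffer buf) {
        this.buf = buf;
    }

    public void writeByte(byte val)   { buf.put(val); }
    public void writeShort(short val) { buf.putShort(val); }
    public void writeInt(int val)     { buf.putInt(val); }

    public byte readByte()   { return buf.get(); }
    public short readShort() { return buf.getShort(); }
    public int readInt()     { return buf.getInt(); }
}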

clients/src/main/message/README.md
This README file explains how the JSON schemas work.

clients/src/main/message/*.json
The JSON files in this directory specify every supported version of every Kafka API. The unit tests automatically validate that the generated schemas match the hand-written schemas in our code. Some other things, like request and response headers, also have schemas here.
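For illustration only (the message name, apiKey value, and fields below are made up; the README above is the authoritative reference for the format), a schema file looks roughly like this:

{
  "apiKey": 42,
  "type": "request",
  "name": "ExampleFooRequest",
  "validVersions": "0-1",
  "fields": [
    { "name": "TopicName", "type": "string", "versions": "0+",
      "about": "The name of the topic to operate on." },
    { "name": "TimeoutMs", "type": "int32", "versions": "1+", "default": "30000", "ignorable": true,
      "about": "How long to wait for the operation, in milliseconds." }
  ]
}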

clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashSet.java
I added an optimization here for empty sets. It is useful because I want all messages to start with empty sets by default, prior to being loaded with data. This is similar to the "empty list" optimization in the java.util.ArrayList class.
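The ArrayList trick being referenced is sharing a single immutable empty backing structure until the first insertion. A rough sketch of the idea (not the actual ImplicitLinkedHashSet code, which uses its own linked-hash layout):

import java.util.HashSet;
import java.util.Set;

public class ExampleLazySet<E> {
    // One shared sentinel for every still-empty instance, analogous to
    // ArrayList's shared empty backing array.
    private static final Set<Object> EMPTY = new HashSet<>(0);

    @SuppressWarnings("unchecked")
    private Set<E> delegate = (Set<E>) EMPTY;

    public boolean add(E element) {
        if (delegate == EMPTY) {
            // Allocate real storage only when the first element arrives.
            delegate = new HashSet<>();
        }
        return delegate.add(element);
    }

    public int size() {
        return delegate.size();
    }
}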

core/src, connect/src, clients/src/main/java/org/apache/kafka/clients/admin, etc.
These changes are related to switching CreateTopicsRequest and CreateTopicsResponse to using the automatically generated classes rather than the hand-written ones.

@cmccabe cmccabe force-pushed the cmccabe:KAFKA-7609 branch from 9cf77ae to 886658c Nov 8, 2018

@cmccabe cmccabe force-pushed the cmccabe:KAFKA-7609 branch from 03676b3 to f6ff824 Nov 14, 2018

@bob-barrett
Contributor

bob-barrett left a comment

This looks good to me, Colin. I left a couple minor comments.

As a more general comment, I think we should include the docstrings in the schema files, so that we can eventually generate the protocol docs off of them. And if we're going to do that, we may actually want to be more precise about not changing field names from the current manual schemas, so that people don't get thrown off by changed names. I don't think either of these things necessarily has to happen in this patch, though.

Resolved (outdated): core/src/main/scala/kafka/server/KafkaApis.scala
Resolved (outdated): core/src/main/scala/kafka/server/KafkaApis.scala

@cmccabe cmccabe force-pushed the cmccabe:KAFKA-7609 branch 2 times, most recently from 497b56e to 14d5e20 Nov 21, 2018

@cmccabe cmccabe force-pushed the cmccabe:KAFKA-7609 branch 2 times, most recently from ad3de1a to 2aba254 Nov 29, 2018

@cmccabe

Contributor

cmccabe commented Nov 29, 2018

As a more general comment, I think we should include the docstrings in the schema files, so that we can eventually generate the protocol docs off of them.

That's a good point. I added documentation to all JSON schema files, based on the original Struct documentation. In cases where this documentation was missing, I created protocol documentation for the fields. I also ported over the comments explaining the purpose of each protocol version change.

And if we're going to do that, we may actually want to be more precise about not changing field names from the current manual schemas, so that people don't get thrown off by changed names. I don't think either of these things necessarily has to happen in this patch, though.

I think it's best to decide about field names when doing the conversion for each RPC.

Are we no longer concerned about duplicate topics? Is that handled during deserialization?

I fixed this so that we handle duplicates the same way we do now: by setting an INVALID_REQUEST error for them. This is actually specified in the KIP that added CreateTopics.

In order to make this possible, deserialization needs to be able to represent duplicate topics in the set. I handled this by using multisets instead of sets for data received over the wire. I think this is the correct approach in general, because it's flexible and will allow us to convert over the existing code more easily. Throwing a deserialization error when encountering duplicates would be very unfriendly, since deserialization errors just result in the broker disconnecting without an error message.
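
A rough illustration of that approach, assuming a hypothetical list of requested topic names (the real CreateTopics handling lives in the follow-up PR mentioned below):

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch only: deserialization keeps duplicate topic names (multiset semantics),
// and the request handler maps each duplicate to an INVALID_REQUEST-style error
// instead of failing the whole request during deserialization.
public class ExampleDuplicateTopicCheck {
    public static Map<String, String> findDuplicateErrors(List<String> requestedTopicNames) {
        Map<String, String> errors = new HashMap<>();
        Set<String> seen = new HashSet<>();
        for (String topic : requestedTopicNames) {
            if (!seen.add(topic)) {
                errors.put(topic, "INVALID_REQUEST: duplicate topic name in request");
            }
        }
        return errors;
    }
}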

To make this easier to review, I have split the CreateTopics changes off into a separate PR, #5972. I think that with that split out, this is a very low-impact change, since it's just adding a code generator and not modifying any existing broker code paths.

@bob-barrett
Contributor

bob-barrett left a comment

Thanks Colin! I agree about handling field names when we do the conversion. I added a couple more comments.

@cmccabe cmccabe force-pushed the cmccabe:KAFKA-7609 branch from 0e95526 to 9813bc5 Dec 5, 2018

@cmccabe

Contributor

cmccabe commented Dec 5, 2018

  • Rebase on trunk

  • Update message schemas for KIP-380

@cmccabe

Contributor

cmccabe commented Dec 7, 2018

retest this please

@hachikuji
Contributor

hachikuji left a comment

Thanks, this looks great! I left a few minor comments and some questions.

Resolved (outdated): buildSrc/src/main/java/org/apache/kafka/message/FieldSpec.java
Resolved (outdated): buildSrc/src/main/java/org/apache/kafka/message/HeaderGenerator.java
Resolved (outdated): buildSrc/src/main/java/org/apache/kafka/message/FieldType.java
Resolved (outdated): buildSrc/src/main/java/org/apache/kafka/message/CodeBuffer.java
Resolved (outdated): buildSrc/src/main/java/org/apache/kafka/message/CodeBuffer.java
Resolved: .../main/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiSet.java
Resolved (outdated): clients/src/main/resources/message/FetchResponse.json
"fields": [
{ "name": "ReplicaId", "type": "int32", "versions": "0+",
"about": "The broker ID of the requestor, or -1 if this request is being made by a normal consumer." },
{ "name": "IsolationLevel", "type": "int8", "versions": "2+",


@hachikuji

hachikuji Jan 2, 2019

Contributor

I wonder if there should be an enum type in this schema. Looking at the generated code, this field is just represented as a byte, but it would be nice to have something friendlier to work with. Otherwise we'll just create the enum manually and do some conversion.


@cmccabe

cmccabe Jan 3, 2019

Contributor

Yeah, enums would be a good addition. It's a bit trickier than it seems, though. For one thing, we already have a lot of public/stable enum types, and we can't break them. Enums can't inherit from other classes, either, so that escape hatch is closed.

Perhaps a simple way to make it work would be generating simple constants:

public static final int NONE = 0;
public static final int OFFSET_OUT_OF_RANGE = 1;
... etc. etc. ...

Then the existing public/stable enums could be retrofitted to grab the values from these declarations. We could also generate classes in cases where there was no existing public thing.
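
For example, an existing public enum could be retrofitted along these lines (all names below are illustrative, not a committed design):

// Hypothetical generated constants class.
final class GeneratedIsolationLevel {
    static final byte READ_UNCOMMITTED = 0;
    static final byte READ_COMMITTED = 1;

    private GeneratedIsolationLevel() { }
}

// An existing public enum kept source- and binary-compatible, but now
// sourcing its wire values from the generated constants.
enum ExampleIsolationLevel {
    READ_UNCOMMITTED(GeneratedIsolationLevel.READ_UNCOMMITTED),
    READ_COMMITTED(GeneratedIsolationLevel.READ_COMMITTED);

    private final byte id;

    ExampleIsolationLevel(byte id) {
        this.id = id;
    }

    public byte id() {
        return id;
    }
}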

Resolved: buildSrc/src/main/java/org/apache/kafka/message/FieldSpec.java
Resolved: .../main/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiSet.java

@cmccabe cmccabe force-pushed the cmccabe:KAFKA-7609 branch 2 times, most recently from 47cb19c to b585449 Jan 3, 2019

@cmccabe

Contributor

cmccabe commented Jan 5, 2019

The test failure was kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback, which is unrelated.

@cmccabe

Contributor

cmccabe commented Jan 5, 2019

retest this please

@cmccabe cmccabe force-pushed the cmccabe:KAFKA-7609 branch from 753b0cf to e1b2173 Jan 7, 2019

@hachikuji

Contributor

hachikuji commented Jan 9, 2019

Mentioned offline, but we need to move the few test cases under clients/src/test/java/org/apache/kafka/common/message/ to clients/src/test/java/org/apache/kafka/message/.

@hachikuji
Contributor

hachikuji left a comment

A few more small comments.

this.ignorable = ignorable;
this.about = about == null ? "" : about;
if (!this.struct.fields().isEmpty()) {
if (!this.type.isArray()) {


@hachikuji

hachikuji Jan 10, 2019

Contributor

Couldn't we make this check stricter? Would we ever expect the type to be something other than STRUCT?


@cmccabe

cmccabe Jan 11, 2019

Contributor

I refactored this code to clean it up a bit. Not all FieldSpecs describe arrays; only some of them do. We should have a toStruct() method to make it convenient, but not embed a StructSpec unconditionally into the FieldSpec.

import java.util.Optional;

public interface FieldType {
static final String STRUCT_PREFIX = "[]";


@hachikuji

hachikuji Jan 10, 2019

Contributor

nit: static final is redundant since this is an interface


@cmccabe

cmccabe Jan 10, 2019

Contributor

Removed, thanks

this.imports.add(newImport);
}

public void generate() throws Exception {


@hachikuji

hachikuji Jan 10, 2019

Contributor

nit: Exception is not raised. There are a bunch of these in MessageDataGenerator and MessageFactoryGenerator as well. Probably fallout from removing the throws on printf.


@cmccabe

cmccabe Jan 10, 2019

Contributor

Good point. I was able to remove quite a few of these.

return true;
}

private boolean generateNonNullCheck(Versions prevVersions, FieldSpec field) throws Exception {


@hachikuji

hachikuji Jan 10, 2019

Contributor

This is unused.


@cmccabe

cmccabe Jan 10, 2019

Contributor

Removed

/**
* Returns the API key of this message, or -1 if there is none.
*/
short apiKey();


@hachikuji

hachikuji Jan 10, 2019

Contributor

It's a little annoying that we have to expose this for all of the nested types. Have you considered having something like an ApiMessage which extends this?


@cmccabe

cmccabe Jan 10, 2019

Contributor

That is a fair point. I will split out an ApiMessage interface that extends Message.


public void registerMessageType(MessageSpec spec) {
if (spec.type() == MessageSpecType.REQUEST) {
if (requestApis.containsKey(spec.apiKey())) {


@hachikuji

hachikuji Jan 10, 2019

Contributor

Is it worth having any validation that apiKey is not -1? I was even considering whether MessageSpec.apiKey could return Optional<Short>.


@cmccabe

cmccabe Jan 10, 2019

Contributor

I can use Optional there.
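
A small sketch of how the Optional-based validation might look (names below are illustrative; the actual registry code may differ):

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ExampleMessageTypeRegistry {
    private final Map<Short, String> requestApis = new HashMap<>();

    public void registerRequest(String specName, Optional<Short> apiKey) {
        // Optional makes the "no API key" case explicit instead of relying on -1.
        short key = apiKey.orElseThrow(() ->
            new RuntimeException("Request spec " + specName + " must declare an apiKey."));
        if (requestApis.containsKey(key)) {
            throw new RuntimeException("Duplicate apiKey " + key + " in spec " + specName);
        }
        requestApis.put(key, specName);
    }
}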

}
this.fields = Collections.unmodifiableList(fields == null ?
Collections.emptyList() : new ArrayList<>(fields));
this.hasKeys = fields == null ? false : fields.stream().anyMatch(f -> f.mapKey());


@hachikuji

hachikuji Jan 10, 2019

Contributor

nit: fields != null && fields.stream().anyMatch(f -> f.mapKey())

Resolved: buildSrc/src/main/java/org/apache/kafka/message/Versions.java
@@ -14,6 +14,7 @@
// limitations under the License.

{
"apiKey": -1,


@hachikuji

hachikuji Jan 11, 2019

Contributor

Why was this needed?


@cmccabe

cmccabe Jan 11, 2019

Contributor

It shouldn't be needed; I'll remove it.

In the long term, we'll want to generate other things besides ApiMessage from JSON files (enums, common structures, etc.), but we can hold off on that for now.

/**
* A Message which is part of the top-level Kafka API.
*/
public interface ApiMessage extends Message {


@hachikuji

hachikuji Jan 11, 2019

Contributor

Would it make any sense to move the version fields into ApiMessage?


@cmccabe

cmccabe Jan 11, 2019

Contributor

Hmm... I think it makes sense to leave them in Message. All Messages have minimum and maximum supported versions, not just ApiMessages.
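
The resulting split can be summarized roughly as follows (a sketch of the relationship only; the method names are guesses based on the discussion, and the real interfaces carry more methods):

// Every generated type implements Message and so reports its supported
// version range; only top-level API messages additionally expose an apiKey.
interface Message {
    short lowestSupportedVersion();
    short highestSupportedVersion();
}

// A Message which is part of the top-level Kafka API.
interface ApiMessage extends Message {
    // Returns the API key of this message, or -1 if there is none.
    short apiKey();
}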

@hachikuji

Contributor

hachikuji commented Jan 11, 2019

retest this please

@hachikuji
Contributor

hachikuji left a comment

Thanks @cmccabe, LGTM. This is a major improvement. I'll merge after the builds complete.

@cmccabe

Contributor

cmccabe commented Jan 12, 2019

Failing test is kafka.api.UserQuotaTest.testThrottledProducerConsumer, which is definitely unrelated to this PR.

@hachikuji hachikuji merged commit 71e85f5 into apache:trunk Jan 12, 2019

1 of 2 checks passed

JDK 11 and Scala 2.12: FAILURE (10288 tests run, 5 skipped, 1 failed)
JDK 8 and Scala 2.11: SUCCESS (10288 tests run, 5 skipped, 0 failed)