From 95a0b7587b03c4efd4424fd8de83298d0b79e31b Mon Sep 17 00:00:00 2001 From: Andras Beni Date: Tue, 6 Mar 2018 22:47:15 -0800 Subject: [PATCH 1/3] Restart after protocol description was moved out of Protocol.java Change-Id: I0420551ccd9896f08499cd801bd34c829f8de495 --- .../consumer/internals/ConsumerProtocol.java | 14 +++++++------- .../common/requests/AlterConfigsResponse.java | 3 ++- .../requests/AlterReplicaLogDirsResponse.java | 3 ++- .../kafka/common/requests/JoinGroupRequest.java | 4 ++-- .../kafka/common/requests/JoinGroupResponse.java | 10 +++++++--- .../kafka/common/requests/ListGroupsResponse.java | 6 +++--- .../kafka/common/requests/MetadataResponse.java | 2 +- .../kafka/common/requests/SyncGroupRequest.java | 6 ++++-- .../common/requests/WriteTxnMarkersResponse.java | 3 ++- 9 files changed, 30 insertions(+), 21 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java index 920c2957c4dc..c429c1a7a4a8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java @@ -64,19 +64,19 @@ public class ConsumerProtocol { public static final short CONSUMER_PROTOCOL_V0 = 0; public static final Schema CONSUMER_PROTOCOL_HEADER_SCHEMA = new Schema( - new Field(VERSION_KEY_NAME, Type.INT16)); + new Field(VERSION_KEY_NAME, Type.INT16, "Version number of the consumer protocol.")); private static final Struct CONSUMER_PROTOCOL_HEADER_V0 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA) .set(VERSION_KEY_NAME, CONSUMER_PROTOCOL_V0); public static final Schema SUBSCRIPTION_V0 = new Schema( - new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING)), - new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES)); + new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING), "The topics the consumer subscribes to."), + new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES, "Custom user data sent to the partition assignor.")); public static final Schema TOPIC_ASSIGNMENT_V0 = new Schema( - new Field(TOPIC_KEY_NAME, Type.STRING), - new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32))); + new Field(TOPIC_KEY_NAME, Type.STRING, "The topic the consumer subscribed to."), + new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32), "The partitions of the topic which are assigned to the consumer.")); public static final Schema ASSIGNMENT_V0 = new Schema( - new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)), - new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES)); + new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0), "Topic partitions assigned to the consumer."), + new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES, "Custom user data sent by the partition assignor.")); public static ByteBuffer serializeSubscription(PartitionAssignor.Subscription subscription) { Struct struct = new Struct(SUBSCRIPTION_V0); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java index f292ef6731b0..0eeca4172df5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java @@ -50,7 +50,8 @@ public class AlterConfigsResponse extends AbstractResponse { private static final Schema ALTER_CONFIGS_RESPONSE_V0 = new Schema( THROTTLE_TIME_MS, - new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_RESPONSE_ENTITY_V0))); + new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_RESPONSE_ENTITY_V0), + "The result of the change for each resource.")); public static Schema[] schemaVersions() { return new Schema[]{ALTER_CONFIGS_RESPONSE_V0}; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java index f8d15466d62e..69e573902015 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java @@ -52,7 +52,8 @@ public class AlterReplicaLogDirsResponse extends AbstractResponse { TOPIC_NAME, new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema( PARTITION_ID, - ERROR_CODE))))))); + ERROR_CODE)), "Error codes for each partition."))), + "The result of the operation for each topic.")); public static Schema[] schemaVersions() { return new Schema[]{ALTER_REPLICA_LOG_DIRS_RESPONSE_V0}; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index a7b62a98660f..03d58fae487a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -45,8 +45,8 @@ public class JoinGroupRequest extends AbstractRequest { /* Join group api */ private static final Schema JOIN_GROUP_REQUEST_PROTOCOL_V0 = new Schema( - new Field(PROTOCOL_NAME_KEY_NAME, STRING), - new Field(PROTOCOL_METADATA_KEY_NAME, BYTES)); + new Field(PROTOCOL_NAME_KEY_NAME, STRING, "Protocol type name."), + new Field(PROTOCOL_METADATA_KEY_NAME, BYTES, "Protocol type specific member metadata.")); private static final Schema JOIN_GROUP_REQUEST_V0 = new Schema( GROUP_ID, diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java index 4bcd6e6d65fd..2f37af575f67 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java @@ -47,7 +47,7 @@ public class JoinGroupResponse extends AbstractResponse { private static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema( MEMBER_ID, - new Field(MEMBER_METADATA_KEY_NAME, BYTES)); + new Field(MEMBER_METADATA_KEY_NAME, BYTES, "The metadata supplied in this member's join group request.")); private static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema( ERROR_CODE, @@ -55,7 +55,9 @@ public class JoinGroupResponse extends AbstractResponse { new Field(GROUP_PROTOCOL_KEY_NAME, STRING, "The group protocol selected by the coordinator"), new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"), MEMBER_ID, - new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0))); + new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0), + "The leader will receive the full list of members along with the associated metadata for the protocol chosen. " + + "Other members, followers, will receive an empty array of members.")); private static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0; @@ -66,7 +68,9 @@ public class JoinGroupResponse extends AbstractResponse { new Field(GROUP_PROTOCOL_KEY_NAME, STRING, "The group protocol selected by the coordinator"), new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"), MEMBER_ID, - new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0))); + new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0), + "The leader will receive the full list of members along with the associated metadata for the protocol chosen. " + + "Other members, followers, will receive an empty array of members.")); public static Schema[] schemaVersions() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java index 9c82ae06bd63..a597dadfb713 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java @@ -40,14 +40,14 @@ public class ListGroupsResponse extends AbstractResponse { private static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema( GROUP_ID, - new Field(PROTOCOL_TYPE_KEY_NAME, STRING)); + new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "The current group protocol.")); private static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema( ERROR_CODE, - new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0))); + new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0), "Information about each group managed by this broker.")); private static final Schema LIST_GROUPS_RESPONSE_V1 = new Schema( THROTTLE_TIME_MS, ERROR_CODE, - new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0))); + new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0), "Information about each group managed by this broker.")); public static Schema[] schemaVersions() { return new Schema[] {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1}; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index de7c8f6cbc2f..2be804388636 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -104,7 +104,7 @@ public class MetadataResponse extends AbstractResponse { private static final Schema METADATA_RESPONSE_V0 = new Schema( new Field(BROKERS_KEY_NAME, new ArrayOf(METADATA_BROKER_V0), "Host and port information for all brokers."), - new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V0))); + new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V0), "Metadata for each topic.")); private static final Schema METADATA_BROKER_V1 = new Schema( new Field(NODE_ID_KEY_NAME, INT32, "The broker id."), diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java index 14ed2625d5a6..f60b5271753e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java @@ -41,12 +41,14 @@ public class SyncGroupRequest extends AbstractRequest { private static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema( MEMBER_ID, - new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES)); + new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES, "Protocol specific state (e.g. partition assignments)")); private static final Schema SYNC_GROUP_REQUEST_V0 = new Schema( GROUP_ID, GENERATION_ID, MEMBER_ID, - new Field(GROUP_ASSIGNMENT_KEY_NAME, new ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0))); + new Field(GROUP_ASSIGNMENT_KEY_NAME, new ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0), + "All members send SyncGroup immediately after joining the group, " + + "but only the leader provides the group's assignment.")); /* v1 request is the same as v0. Throttle time has been added to response */ private static final Schema SYNC_GROUP_REQUEST_V1 = SYNC_GROUP_REQUEST_V0; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java index f4bf157adaf5..ed0297c3e41d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java @@ -49,7 +49,8 @@ public class WriteTxnMarkersResponse extends AbstractResponse { new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."), new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema( TOPIC_NAME, - new Field(PARTITIONS_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_PARTITION_ERROR_RESPONSE_V0)))), + new Field(PARTITIONS_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_PARTITION_ERROR_RESPONSE_V0), + "Error codes for each partition."))), "Errors per partition from writing markers.")); private static final Schema WRITE_TXN_MARKERS_RESPONSE_V0 = new Schema( From a604d82364fc3cc6289c5c892ae7e8562d71c6bf Mon Sep 17 00:00:00 2001 From: Andras Beni Date: Sun, 6 May 2018 18:29:06 +0200 Subject: [PATCH 2/3] Some more field docs Change-Id: I4ef687e897e16e50134608e1199a042681022eba --- .../common/requests/AlterConfigsRequest.java | 8 +-- .../common/requests/AlterConfigsResponse.java | 4 +- .../requests/DescribeConfigsRequest.java | 6 +-- .../requests/DescribeConfigsResponse.java | 14 ++--- .../kafka/common/requests/FetchRequest.java | 52 +++++++++++++++---- .../kafka/common/requests/FetchResponse.java | 8 +-- .../common/requests/JoinGroupRequest.java | 6 ++- .../common/requests/ListOffsetRequest.java | 11 ++-- .../common/requests/MetadataResponse.java | 8 +-- .../common/requests/OffsetCommitRequest.java | 8 ++- .../common/requests/OffsetFetchResponse.java | 15 +++--- .../kafka/common/requests/ProduceRequest.java | 6 ++- .../common/requests/ProduceResponse.java | 6 ++- .../common/requests/SyncGroupResponse.java | 4 +- 14 files changed, 103 insertions(+), 53 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java index 14b39ae9dace..8fd92632b113 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java @@ -51,14 +51,14 @@ public class AlterConfigsRequest extends AbstractRequest { new Field(CONFIG_VALUE, NULLABLE_STRING, "Configuration value")); private static final Schema ALTER_CONFIGS_REQUEST_RESOURCE_V0 = new Schema( - new Field(RESOURCE_TYPE_KEY_NAME, INT8), - new Field(RESOURCE_NAME_KEY_NAME, STRING), - new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(CONFIG_ENTRY))); + new Field(RESOURCE_TYPE_KEY_NAME, INT8, "Id of the resource type to alter configuration of. Value 2 means topic, 4 means broker."), + new Field(RESOURCE_NAME_KEY_NAME, STRING, "Name of the resource to alter configuration of."), + new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(CONFIG_ENTRY), "Configuration entries to alter.")); private static final Schema ALTER_CONFIGS_REQUEST_V0 = new Schema( new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_REQUEST_RESOURCE_V0), "An array of resources to update with the provided configs."), - new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN)); + new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN, "If true, only validation takes place and the changes are not applied.")); public static Schema[] schemaVersions() { return new Schema[] {ALTER_CONFIGS_REQUEST_V0}; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java index 0eeca4172df5..d9b6183148fc 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java @@ -45,8 +45,8 @@ public class AlterConfigsResponse extends AbstractResponse { private static final Schema ALTER_CONFIGS_RESPONSE_ENTITY_V0 = new Schema( ERROR_CODE, ERROR_MESSAGE, - new Field(RESOURCE_TYPE_KEY_NAME, INT8), - new Field(RESOURCE_NAME_KEY_NAME, STRING)); + new Field(RESOURCE_TYPE_KEY_NAME, INT8, "Type of the resource this response entity is for."), + new Field(RESOURCE_NAME_KEY_NAME, STRING, "Name of the resource this response entity is for.")); private static final Schema ALTER_CONFIGS_RESPONSE_V0 = new Schema( THROTTLE_TIME_MS, diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java index 41563381151d..a9274d09ef47 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java @@ -43,9 +43,9 @@ public class DescribeConfigsRequest extends AbstractRequest { private static final String CONFIG_NAMES_KEY_NAME = "config_names"; private static final Schema DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0 = new Schema( - new Field(RESOURCE_TYPE_KEY_NAME, INT8), - new Field(RESOURCE_NAME_KEY_NAME, STRING), - new Field(CONFIG_NAMES_KEY_NAME, ArrayOf.nullable(STRING))); + new Field(RESOURCE_TYPE_KEY_NAME, INT8, "Id of resource type to fetch configuration of. Value 2 means topic, 4 means broker."), + new Field(RESOURCE_NAME_KEY_NAME, STRING, "Name of the resource to fetch configuration of."), + new Field(CONFIG_NAMES_KEY_NAME, ArrayOf.nullable(STRING), "Configuration names requested. Null for all configurations.")); private static final Schema DESCRIBE_CONFIGS_REQUEST_V0 = new Schema( new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0), "An array of config resources to be returned.")); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java index e463618e71bc..4c80a6ed51d3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java @@ -59,11 +59,11 @@ public class DescribeConfigsResponse extends AbstractResponse { private static final String CONFIG_SOURCE_KEY_NAME = "config_source"; private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTRY_V0 = new Schema( - new Field(CONFIG_NAME_KEY_NAME, STRING), - new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING), - new Field(READ_ONLY_KEY_NAME, BOOLEAN), - new Field(IS_DEFAULT_KEY_NAME, BOOLEAN), - new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN)); + new Field(CONFIG_NAME_KEY_NAME, STRING, "Name of config requested."), + new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING, "Value of config requested."), + new Field(READ_ONLY_KEY_NAME, BOOLEAN, "True if configuration is read-only, false otherwise."), + new Field(IS_DEFAULT_KEY_NAME, BOOLEAN, "True if configuration is not overridden, false otherwise."), + new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN, "True if configuration is a password, false otherwise.")); private static final Schema DESCRIBE_CONFIGS_RESPONSE_SYNONYM_V1 = new Schema( new Field(CONFIG_NAME_KEY_NAME, STRING), @@ -81,8 +81,8 @@ public class DescribeConfigsResponse extends AbstractResponse { private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0 = new Schema( ERROR_CODE, ERROR_MESSAGE, - new Field(RESOURCE_TYPE_KEY_NAME, INT8), - new Field(RESOURCE_NAME_KEY_NAME, STRING), + new Field(RESOURCE_TYPE_KEY_NAME, INT8, "Type of the resource this response entity is for."), + new Field(RESOURCE_NAME_KEY_NAME, STRING, "Name of the resource this response entity is for."), new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTRY_V0))); private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V1 = new Schema( diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 65cf7fea6b1d..d07aafbabc8d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -66,7 +66,7 @@ public class FetchRequest extends AbstractRequest { private static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema( PARTITION_ID, - new Field(FETCH_OFFSET_KEY_NAME, INT64, "Message offset."), + new Field(FETCH_OFFSET_KEY_NAME, INT64, "Message offset to begin this fetch from."), new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to fetch.")); // FETCH_REQUEST_PARTITION_V5 added log_start_offset field - the earliest available offset of partition data that can be consumed. @@ -87,8 +87,14 @@ public class FetchRequest extends AbstractRequest { private static final Schema FETCH_REQUEST_V0 = new Schema( new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."), - new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."), - new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."), + new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response " + + "if insufficient data is available at the time the request is issued."), + new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response. " + + "If client sets this to 0 the server will always respond immediately. " + + "However, if there is no new data since the last request broker will respond with empty record sets." + + " If set to 1, the server will respond as soon at least one partition has at least" + + " one byte of data or the specified timeout occurs. By setting higher values in combination" + + " with the timeout consumer can tune for throughput trade a little additional latency for reading only large chunks of data. "), new Field(TOPICS_KEY_NAME, new ArrayOf(FETCH_REQUEST_TOPIC_V0), "Topics to fetch.")); // The V1 Fetch Request body is the same as V0. @@ -102,8 +108,14 @@ public class FetchRequest extends AbstractRequest { // The partition ordering is now relevant - partitions will be processed in order they appear in request. private static final Schema FETCH_REQUEST_V3 = new Schema( new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."), - new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."), - new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."), + new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response " + + "if insufficient data is available at the time the request is issued."), + new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response." + + " If client sets this to 0 the server will always respond immediately." + + " However, if there is no new data since the last request broker will respond with empty record sets." + + " If set to 1, the server will respond as soon at least one partition has at least" + + " one byte of data or the specified timeout occurs. By setting higher values in combination" + + " with the timeout consumer can tune for throughput trade a little additional latency for reading only large chunks of data."), new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " + "if the first message in the first non-empty partition of the fetch is larger than this " + "value, the message will still be returned to ensure that progress can be made."), @@ -112,8 +124,14 @@ public class FetchRequest extends AbstractRequest { // The V4 Fetch Request adds the fetch isolation level and exposes magic v2 (via the response). private static final Schema FETCH_REQUEST_V4 = new Schema( new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."), - new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."), - new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."), + new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response " + + "if insufficient data is available at the time the request is issued."), + new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response." + + " If client sets this to 0 the server will always respond immediately." + + " However, if there is no new data since the last request broker will respond with empty record sets." + + " If set to 1, the server will respond as soon at least one partition has at least" + + " one byte of data or the specified timeout occurs. By setting higher values in combination" + + " with the timeout consumer can tune for throughput trade a little additional latency for reading only large chunks of data."), new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " + "if the first message in the first non-empty partition of the fetch is larger than this " + "value, the message will still be returned to ensure that progress can be made."), @@ -128,8 +146,14 @@ public class FetchRequest extends AbstractRequest { // FETCH_REQUEST_V5 added a per-partition log_start_offset field - the earliest available offset of partition data that can be consumed. private static final Schema FETCH_REQUEST_V5 = new Schema( new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."), - new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."), - new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."), + new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response " + + "if insufficient data is available at the time the request is issued."), + new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response." + + " If client sets this to 0 the server will always respond immediately." + + " However, if there is no new data since the last request broker will respond with empty record sets." + + " If set to 1, the server will respond as soon at least one partition has at least" + + " one byte of data or the specified timeout occurs. By setting higher values in combination" + + " with the timeout consumer can tune for throughput trade a little additional latency for reading only large chunks of data."), new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " + "if the first message in the first non-empty partition of the fetch is larger than this " + "value, the message will still be returned to ensure that progress can be made."), @@ -159,8 +183,14 @@ public class FetchRequest extends AbstractRequest { private static final Schema FETCH_REQUEST_V7 = new Schema( new Field(REPLICA_ID_KEY_NAME, INT32, "Broker id of the follower. For normal consumers, use -1."), - new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response."), - new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response."), + new Field(MAX_WAIT_KEY_NAME, INT32, "Maximum time in ms to wait for the response " + + "if insufficient data is available at the time the request is issued."), + new Field(MIN_BYTES_KEY_NAME, INT32, "Minimum bytes to accumulate in the response." + + " If client sets this to 0 the server will always respond immediately." + + " However, if there is no new data since the last request broker will respond with empty record sets." + + " If set to 1, the server will respond as soon at least one partition has at least" + + " one byte of data or the specified timeout occurs. By setting higher values in combination" + + " with the timeout consumer can tune for throughput trade a little additional latency for reading only large chunks of data."), new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " + "if the first message in the first non-empty partition of the fetch is larger than this " + "value, the message will still be returned to ensure that progress can be made."), diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index ed0f5a347dd5..2cdfdffadb71 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -75,7 +75,9 @@ public class FetchResponse extends AbstractResponse { new Field(HIGH_WATERMARK_KEY_NAME, INT64, "Last committed offset.")); private static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema( new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V0), - new Field(RECORD_SET_KEY_NAME, RECORDS)); + new Field(RECORD_SET_KEY_NAME, RECORDS, "Data fetch from the partition. " + + "In version 1 record set only includes messages of v0 (magic byte 0). " + + "In version 2 and 3, record set can include messages of v0 and v1 (magic byte 0 and 1)")); private static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema( TOPIC_NAME, @@ -123,11 +125,11 @@ public class FetchResponse extends AbstractResponse { private static final Schema FETCH_RESPONSE_PARTITION_V4 = new Schema( new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V4), - new Field(RECORD_SET_KEY_NAME, RECORDS)); + new Field(RECORD_SET_KEY_NAME, RECORDS, "Data fetched from this partition.")); private static final Schema FETCH_RESPONSE_PARTITION_V5 = new Schema( new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V5), - new Field(RECORD_SET_KEY_NAME, RECORDS)); + new Field(RECORD_SET_KEY_NAME, RECORDS, "Data fetched from this partition.")); private static final Schema FETCH_RESPONSE_TOPIC_V4 = new Schema( TOPIC_NAME, diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index 03d58fae487a..992f80df561a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -55,7 +55,8 @@ public class JoinGroupRequest extends AbstractRequest { MEMBER_ID, new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Unique name for class of protocols implemented by group"), new Field(GROUP_PROTOCOLS_KEY_NAME, new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols " + - "that the member supports")); + "that the member supports. Coordinator chooses a single protocol supported by all members. " + + "This enables for e.g. rolling upgrades without downtime.")); private static final Schema JOIN_GROUP_REQUEST_V1 = new Schema( GROUP_ID, @@ -66,7 +67,8 @@ public class JoinGroupRequest extends AbstractRequest { MEMBER_ID, new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Unique name for class of protocols implemented by group"), new Field(GROUP_PROTOCOLS_KEY_NAME, new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols " + - "that the member supports")); + "that the member supports. Coordinator chooses a single protocol supported by all members. " + + "This enables for e.g. rolling upgrades without downtime.")); /* v2 request is the same as v1. Throttle time has been added to response */ private static final Schema JOIN_GROUP_REQUEST_V2 = JOIN_GROUP_REQUEST_V1; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java index 98f53bdfc44f..258321c1fea1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -60,11 +60,16 @@ public class ListOffsetRequest extends AbstractRequest { private static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema( PARTITION_ID, - new Field(TIMESTAMP_KEY_NAME, INT64, "Timestamp."), - new Field(MAX_NUM_OFFSETS_KEY_NAME, INT32, "Maximum offsets to return.")); + new Field(TIMESTAMP_KEY_NAME, INT64, "Target timestamp for partition. Used to ask for all messages before a certain time." + + " There are two special values: specify -1 to receive the latest offset (i.e offset of the next message)" + + " and -2 to receive the earliest available offset."), + new Field(MAX_NUM_OFFSETS_KEY_NAME, INT32, "Maximum number of offsets to return. " + + "Note that because offsets are puled in descending order, asking for the earliest offset will always return a single element.")); private static final Schema LIST_OFFSET_REQUEST_PARTITION_V1 = new Schema( PARTITION_ID, - new Field(TIMESTAMP_KEY_NAME, INT64, "The target timestamp for the partition.")); + new Field(TIMESTAMP_KEY_NAME, INT64, "The target timestamp for the partition. Used to ask for all messages before a certain time." + + " There are two special values: specify -1 to receive the latest offset (i.e offset of the next message)" + + " and -2 to receive the earliest available offset.")); private static final Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema( TOPIC_NAME, diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 2be804388636..8fe3aa6d0055 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -139,20 +139,20 @@ public class MetadataResponse extends AbstractResponse { private static final Schema METADATA_RESPONSE_V1 = new Schema( new Field(BROKERS_KEY_NAME, new ArrayOf(METADATA_BROKER_V1), "Host and port information for all brokers."), new Field(CONTROLLER_ID_KEY_NAME, INT32, "The broker id of the controller broker."), - new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V1))); + new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V1), "Metadata for each topic requested.")); private static final Schema METADATA_RESPONSE_V2 = new Schema( new Field(BROKERS_KEY_NAME, new ArrayOf(METADATA_BROKER_V1), "Host and port information for all brokers."), new Field(CLUSTER_ID_KEY_NAME, NULLABLE_STRING, "The cluster id that this broker belongs to."), new Field(CONTROLLER_ID_KEY_NAME, INT32, "The broker id of the controller broker."), - new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V1))); + new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V1), "Metadata for each topic requested.")); private static final Schema METADATA_RESPONSE_V3 = new Schema( THROTTLE_TIME_MS, new Field(BROKERS_KEY_NAME, new ArrayOf(METADATA_BROKER_V1), "Host and port information for all brokers."), new Field(CLUSTER_ID_KEY_NAME, NULLABLE_STRING, "The cluster id that this broker belongs to."), new Field(CONTROLLER_ID_KEY_NAME, INT32, "The broker id of the controller broker."), - new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V1))); + new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V1), "Metadata for each topic requested.")); private static final Schema METADATA_RESPONSE_V4 = METADATA_RESPONSE_V3; @@ -162,7 +162,7 @@ public class MetadataResponse extends AbstractResponse { new Field(BROKERS_KEY_NAME, new ArrayOf(METADATA_BROKER_V1), "Host and port information for all brokers."), new Field(CLUSTER_ID_KEY_NAME, NULLABLE_STRING, "The cluster id that this broker belongs to."), new Field(CONTROLLER_ID_KEY_NAME, INT32, "The broker id of the controller broker."), - new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V2))); + new Field(TOPIC_METADATA_KEY_NAME, new ArrayOf(TOPIC_METADATA_V2), "Metadata for each topic requested.")); public static Schema[] schemaVersions() { return new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3, diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 4686c3bc2580..d5d032aef226 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -66,7 +66,9 @@ public class OffsetCommitRequest extends AbstractRequest { private static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema( PARTITION_ID, new Field(COMMIT_OFFSET_KEY_NAME, INT64, "Message offset to be committed."), - new Field(TIMESTAMP_KEY_NAME, INT64, "Timestamp of the commit"), + new Field(TIMESTAMP_KEY_NAME, INT64, "Timestamp of the commit. If the time stamp field is not set (-1)," + + " brokers will set commit time to receive time before committing offset. Users can explicitly set commit timestamp " + + "if they want to retain committed offset longer on the broker than configured offset retention time."), new Field(METADATA_KEY_NAME, NULLABLE_STRING, "Any associated metadata the client wants to keep.")); private static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V2 = new Schema( @@ -100,7 +102,9 @@ public class OffsetCommitRequest extends AbstractRequest { GROUP_ID, GENERATION_ID, MEMBER_ID, - new Field(RETENTION_TIME_KEY_NAME, INT64, "Time period in ms to retain the offset."), + new Field(RETENTION_TIME_KEY_NAME, INT64, "Time period in ms to retain the offset." + + " Brokers will always retain offsets until its commit timestamp + user specified retetntion time in the commit request. " + + "I retention time is not set (-1), broker offset retention time will be used as default."), new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2), "Topics to commit offsets.")); /* v3 request is same as v2. Throttle time has been added to response */ diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index e398442e847d..914d6beae10f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -53,27 +53,30 @@ public class OffsetFetchResponse extends AbstractResponse { private static final Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema( PARTITION_ID, - new Field(COMMIT_OFFSET_KEY_NAME, INT64, "Last committed message offset."), - new Field(METADATA_KEY_NAME, NULLABLE_STRING, "Any associated metadata the client wants to keep."), + new Field(COMMIT_OFFSET_KEY_NAME, INT64, "Last committed message offset. " + + "Note that if there is no offset associated with a topicpartition for given consumer group. " + + "The broker does not set an error code (since it is not really an error), " + + "but sets the offset field to -1 and returns empty metadata. "), + new Field(METADATA_KEY_NAME, NULLABLE_STRING, "Associated metadata the client set when committing offset."), ERROR_CODE); private static final Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema( TOPIC_NAME, - new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0))); + new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0), "List of per-partition offset metadata.")); private static final Schema OFFSET_FETCH_RESPONSE_V0 = new Schema( - new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0))); + new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0), "List of per-topic offset metadata.")); private static final Schema OFFSET_FETCH_RESPONSE_V1 = OFFSET_FETCH_RESPONSE_V0; private static final Schema OFFSET_FETCH_RESPONSE_V2 = new Schema( - new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)), + new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0), "List of per-topic offset metadata."), ERROR_CODE); /* v3 request is the same as v2. Throttle time has been added to v3 response */ private static final Schema OFFSET_FETCH_RESPONSE_V3 = new Schema( THROTTLE_TIME_MS, - new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)), + new Field(RESPONSES_KEY_NAME, new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0), "List of per-topic offset metadata."), ERROR_CODE); public static Schema[] schemaVersions() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 91e3aebcf3f8..9b3ac7a62c43 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -63,14 +63,16 @@ public class ProduceRequest extends AbstractRequest { TOPIC_NAME, new Field(PARTITION_DATA_KEY_NAME, new ArrayOf(new Schema( PARTITION_ID, - new Field(RECORD_SET_KEY_NAME, RECORDS))))); + new Field(RECORD_SET_KEY_NAME, RECORDS, + "A set of records in the format described in The messages section"))), + "Data being published to a particular partition of the topic.")); private static final Schema PRODUCE_REQUEST_V0 = new Schema( new Field(ACKS_KEY_NAME, INT16, "The number of acknowledgments the producer requires the leader to have " + "received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 for " + "only the leader and -1 for the full ISR."), new Field(TIMEOUT_KEY_NAME, INT32, "The time to await a response in ms."), - new Field(TOPIC_DATA_KEY_NAME, new ArrayOf(TOPIC_PRODUCE_DATA_V0))); + new Field(TOPIC_DATA_KEY_NAME, new ArrayOf(TOPIC_PRODUCE_DATA_V0), "Data being produced to a particular topic.")); /** * The body of PRODUCE_REQUEST_V1 is the same as PRODUCE_REQUEST_V0. diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index afedc9d6e88d..f4e9cba2c88d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -104,7 +104,8 @@ public class ProduceResponse extends AbstractResponse { new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema( PARTITION_ID, ERROR_CODE, - new Field(BASE_OFFSET_KEY_NAME, INT64), + new Field(BASE_OFFSET_KEY_NAME, INT64, + "Offset assigned to the first record in the record set appended to the partition."), new Field(LOG_APPEND_TIME_KEY_NAME, INT64, "The timestamp returned by broker after appending " + "the messages. If CreateTime is used for the topic, the timestamp will be -1. " + "If LogAppendTime is used for the topic, the timestamp will be " + @@ -131,7 +132,8 @@ public class ProduceResponse extends AbstractResponse { new Field(PARTITION_RESPONSES_KEY_NAME, new ArrayOf(new Schema( PARTITION_ID, ERROR_CODE, - new Field(BASE_OFFSET_KEY_NAME, INT64), + new Field(BASE_OFFSET_KEY_NAME, INT64, + "Offset assigned to the first record in the record set appended to the partition."), new Field(LOG_APPEND_TIME_KEY_NAME, INT64, "The timestamp returned by broker after appending " + "the messages. If CreateTime is used for the topic, the timestamp will be -1. " + "If LogAppendTime is used for the topic, the timestamp will be the broker local " + diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java index 77f951251fc6..532c322ab421 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java @@ -34,11 +34,11 @@ public class SyncGroupResponse extends AbstractResponse { private static final Schema SYNC_GROUP_RESPONSE_V0 = new Schema( ERROR_CODE, - new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES)); + new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES, "State assigned by group leader to this member.")); private static final Schema SYNC_GROUP_RESPONSE_V1 = new Schema( THROTTLE_TIME_MS, ERROR_CODE, - new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES)); + new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES, "State assigned by group leader to this member.")); public static Schema[] schemaVersions() { return new Schema[] {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1}; From d85f7b2f8b4dd216fc21e6af17ac16e8e7f02a80 Mon Sep 17 00:00:00 2001 From: Andras Beni Date: Sun, 13 May 2018 07:45:35 +0200 Subject: [PATCH 3/3] Address some of those excessive 'the's. Change-Id: Ia6df205071aaded3744855906d6ef228e959a19b --- .../consumer/internals/ConsumerProtocol.java | 14 +++++++------- .../common/requests/AlterConfigsResponse.java | 6 +++--- .../requests/AlterReplicaLogDirsResponse.java | 2 +- .../kafka/common/requests/JoinGroupResponse.java | 6 +++--- .../kafka/common/requests/ListGroupsResponse.java | 2 +- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java index c429c1a7a4a8..f0fbf268a300 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java @@ -64,19 +64,19 @@ public class ConsumerProtocol { public static final short CONSUMER_PROTOCOL_V0 = 0; public static final Schema CONSUMER_PROTOCOL_HEADER_SCHEMA = new Schema( - new Field(VERSION_KEY_NAME, Type.INT16, "Version number of the consumer protocol.")); + new Field(VERSION_KEY_NAME, Type.INT16, "Version number of consumer protocol.")); private static final Struct CONSUMER_PROTOCOL_HEADER_V0 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA) .set(VERSION_KEY_NAME, CONSUMER_PROTOCOL_V0); public static final Schema SUBSCRIPTION_V0 = new Schema( - new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING), "The topics the consumer subscribes to."), - new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES, "Custom user data sent to the partition assignor.")); + new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING), "Topics consumer subscribes to."), + new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES, "Custom user data sent to partition assignor.")); public static final Schema TOPIC_ASSIGNMENT_V0 = new Schema( - new Field(TOPIC_KEY_NAME, Type.STRING, "The topic the consumer subscribed to."), - new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32), "The partitions of the topic which are assigned to the consumer.")); + new Field(TOPIC_KEY_NAME, Type.STRING, "Topic consumer subscribed to."), + new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32), "Partitions of the topic which are assigned to consumer.")); public static final Schema ASSIGNMENT_V0 = new Schema( - new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0), "Topic partitions assigned to the consumer."), - new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES, "Custom user data sent by the partition assignor.")); + new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0), "Topic partitions assigned to consumer."), + new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES, "Custom user data sent by partition assignor.")); public static ByteBuffer serializeSubscription(PartitionAssignor.Subscription subscription) { Struct struct = new Struct(SUBSCRIPTION_V0); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java index d9b6183148fc..337c391f0864 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java @@ -45,13 +45,13 @@ public class AlterConfigsResponse extends AbstractResponse { private static final Schema ALTER_CONFIGS_RESPONSE_ENTITY_V0 = new Schema( ERROR_CODE, ERROR_MESSAGE, - new Field(RESOURCE_TYPE_KEY_NAME, INT8, "Type of the resource this response entity is for."), - new Field(RESOURCE_NAME_KEY_NAME, STRING, "Name of the resource this response entity is for.")); + new Field(RESOURCE_TYPE_KEY_NAME, INT8, "Type of resource this response entity is for."), + new Field(RESOURCE_NAME_KEY_NAME, STRING, "Name of resource this response entity is for.")); private static final Schema ALTER_CONFIGS_RESPONSE_V0 = new Schema( THROTTLE_TIME_MS, new Field(RESOURCES_KEY_NAME, new ArrayOf(ALTER_CONFIGS_RESPONSE_ENTITY_V0), - "The result of the change for each resource.")); + "Result of change for each resource.")); public static Schema[] schemaVersions() { return new Schema[]{ALTER_CONFIGS_RESPONSE_V0}; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java index 69e573902015..fa1174a34236 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java @@ -53,7 +53,7 @@ public class AlterReplicaLogDirsResponse extends AbstractResponse { new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema( PARTITION_ID, ERROR_CODE)), "Error codes for each partition."))), - "The result of the operation for each topic.")); + "Result of the operation for each topic.")); public static Schema[] schemaVersions() { return new Schema[]{ALTER_REPLICA_LOG_DIRS_RESPONSE_V0}; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java index 2f37af575f67..0ecaa57bc3b9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java @@ -47,7 +47,7 @@ public class JoinGroupResponse extends AbstractResponse { private static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema( MEMBER_ID, - new Field(MEMBER_METADATA_KEY_NAME, BYTES, "The metadata supplied in this member's join group request.")); + new Field(MEMBER_METADATA_KEY_NAME, BYTES, "Metadata supplied in this member's join group request.")); private static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema( ERROR_CODE, @@ -56,7 +56,7 @@ public class JoinGroupResponse extends AbstractResponse { new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"), MEMBER_ID, new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0), - "The leader will receive the full list of members along with the associated metadata for the protocol chosen. " + + "Leader will receive the full list of members along with associated metadata for the protocol chosen. " + "Other members, followers, will receive an empty array of members.")); private static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0; @@ -69,7 +69,7 @@ public class JoinGroupResponse extends AbstractResponse { new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"), MEMBER_ID, new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0), - "The leader will receive the full list of members along with the associated metadata for the protocol chosen. " + + "Leader will receive the full list of members along with associated metadata for the protocol chosen. " + "Other members, followers, will receive an empty array of members.")); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java index a597dadfb713..9c7374afb133 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java @@ -40,7 +40,7 @@ public class ListGroupsResponse extends AbstractResponse { private static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema( GROUP_ID, - new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "The current group protocol.")); + new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Current group protocol's name.")); private static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema( ERROR_CODE, new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0), "Information about each group managed by this broker."));