forked from apache/kafka
/
ConsumerPartitionAssignor.java
206 lines (172 loc) · 7.21 KB
/
ConsumerPartitionAssignor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
/**
* This interface is used to define custom partition assignment for use in
* {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the consumer group subscribe
* to the topics they are interested in and forward their subscriptions to a Kafka broker serving
* as the group coordinator. The coordinator selects one member to perform the group assignment and
* propagates the subscriptions of all members to it. Then {@link #assign(Cluster, GroupSubscription)} is called
* to perform the assignment and the results are forwarded back to each respective member.
* <p>
* In some cases, it is useful to forward additional metadata to the assignor in order to make
* assignment decisions. For this, you can override {@link #subscriptionUserData(Set)} and provide custom
* userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation
* can use this user data to forward the rackId belonging to each member.
*/
public interface ConsumerPartitionAssignor {
/**
* Return serialized data that will be included in the {@link Subscription} sent to the leader
* and can be leveraged in {@link #assign(Cluster, GroupSubscription)} (e.g. local host/rack information).
* The default implementation attaches no user data and returns {@code null}.
*
* @param topics topic names the subscription is being built for (presumably the topics this
*               member is subscribed to; confirm against the caller)
* @return nullable subscription user data; {@code null} means no user data is attached
*/
default ByteBuffer subscriptionUserData(Set<String> topics) {
return null;
}
/**
* Perform the group assignment given the member subscriptions and current cluster metadata.
* This is the only assignment method without a default implementation, so every assignor must provide it.
*
* @param metadata Current topic/broker metadata known by consumer
* @param subscriptions Subscriptions from all members including metadata provided through {@link #subscriptionUserData(Set)}
* @return A {@link GroupAssignment} wrapping a map from the members to their respective assignment.
* This should have one entry for each member in the input subscription map.
*/
GroupAssignment assign(Cluster metadata, GroupSubscription subscriptions);
/**
* Callback which is invoked when a group member receives its assignment from the leader.
* The default implementation does nothing; override it to react to a new assignment.
*
* @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, GroupSubscription)}
* @param metadata Additional metadata on the consumer (optional)
*/
default void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
// Intentionally empty: reacting to the assignment is optional for implementations.
}
/**
* Indicate which rebalance protocol this assignor works with.
* By default it should always work with {@link RebalanceProtocol#EAGER}.
*
* @return the supported rebalance protocols; the default is a single-element list
* containing only {@link RebalanceProtocol#EAGER}
*/
default List<RebalanceProtocol> supportedProtocols() {
return Collections.singletonList(RebalanceProtocol.EAGER);
}
/**
* Return the version of the assignor, which indicates how the user metadata encodings
* and the assignment algorithm have evolved.
*
* @return the assignor version; {@code 0} unless overridden
*/
default short version() {
return (short) 0;
}
/**
* Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky"). Note, this is not required
* to be the same as the class name specified in {@link ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG}.
*
* @return non-null unique name
*/
String name();
/**
 * The subscription of a single group member: the topics it subscribes to, optional opaque
 * user data, the partitions passed in as owned, and an optional group instance id.
 * <p>
 * The lists and the buffer are stored by reference (no defensive copies are made), so
 * callers should not mutate them after handing them over.
 */
final class Subscription {
    private final List<String> topics;
    private final ByteBuffer userData;
    private final List<TopicPartition> ownedPartitions;
    // The only mutable field: it starts out empty and is filled in later through the setter.
    private Optional<String> groupInstanceId;

    /**
     * @param topics          topic names of this subscription
     * @param userData        serialized user data; may be {@code null}
     * @param ownedPartitions partitions to record as owned by the member
     */
    public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
        this.topics = topics;
        this.userData = userData;
        this.ownedPartitions = ownedPartitions;
        this.groupInstanceId = Optional.empty();
    }

    /**
     * Create a subscription with no owned partitions.
     *
     * @param topics   topic names of this subscription
     * @param userData serialized user data; may be {@code null}
     */
    public Subscription(List<String> topics, ByteBuffer userData) {
        this(topics, userData, Collections.emptyList());
    }

    /**
     * Create a subscription with no user data ({@code null}) and no owned partitions.
     *
     * @param topics topic names of this subscription
     */
    public Subscription(List<String> topics) {
        this(topics, null, Collections.emptyList());
    }

    /** @return the topic names given at construction */
    public List<String> topics() {
        return topics;
    }

    /** @return the user data given at construction; may be {@code null} */
    public ByteBuffer userData() {
        return userData;
    }

    /** @return the owned partitions given at construction (an empty list for the shorter constructors) */
    public List<TopicPartition> ownedPartitions() {
        return ownedPartitions;
    }

    /**
     * Set the group instance id of the member.
     *
     * @param groupInstanceId the new value; a {@code null} reference is treated as
     *                        {@link Optional#empty()} so that {@link #groupInstanceId()}
     *                        never returns {@code null}
     */
    public void setGroupInstanceId(Optional<String> groupInstanceId) {
        this.groupInstanceId = groupInstanceId == null ? Optional.empty() : groupInstanceId;
    }

    /** @return the group instance id, never {@code null}; empty until one has been set */
    public Optional<String> groupInstanceId() {
        return groupInstanceId;
    }

    /**
     * Human-readable form for logs. Only the number of remaining bytes of the user data
     * is printed, because the payload itself is opaque binary data.
     */
    @Override
    public String toString() {
        return "Subscription(" +
            "topics=" + topics +
            (userData == null ? "" : ", userDataSize=" + userData.remaining()) +
            ", ownedPartitions=" + ownedPartitions +
            ", groupInstanceId=" + groupInstanceId.orElse("null") +
            ")";
    }
}
/**
 * The assignment handed to a single group member: a list of partitions plus optional
 * opaque user data.
 * <p>
 * Instances never reassign their fields after construction; the list and the buffer are
 * stored by reference (no defensive copies are made).
 */
final class Assignment {
    // Both fields are assigned exactly once in the constructor, hence final.
    private final List<TopicPartition> partitions;
    private final ByteBuffer userData;

    /**
     * @param partitions partitions of this assignment
     * @param userData   serialized user data; may be {@code null}
     */
    public Assignment(List<TopicPartition> partitions, ByteBuffer userData) {
        this.partitions = partitions;
        this.userData = userData;
    }

    /**
     * Create an assignment without user data ({@code null}).
     *
     * @param partitions partitions of this assignment
     */
    public Assignment(List<TopicPartition> partitions) {
        this(partitions, null);
    }

    /** @return the partitions given at construction */
    public List<TopicPartition> partitions() {
        return partitions;
    }

    /** @return the user data given at construction; may be {@code null} */
    public ByteBuffer userData() {
        return userData;
    }

    /**
     * Human-readable form for logs. Only the number of remaining bytes of the user data
     * is printed, because the payload itself is opaque binary data.
     */
    @Override
    public String toString() {
        return "Assignment(" +
            "partitions=" + partitions +
            (userData == null ? "" : ", userDataSize=" + userData.remaining()) +
            ")";
    }
}
/**
 * Holder for the subscriptions of a whole group, as passed to
 * {@link ConsumerPartitionAssignor#assign(Cluster, GroupSubscription)}.
 * The map is kept by reference; it is neither copied nor wrapped.
 */
final class GroupSubscription {
    // Keyed by a String per member (presumably the member id; confirm against the caller).
    private final Map<String, Subscription> memberSubscriptions;

    /**
     * @param subscriptions the per-member subscriptions to wrap
     */
    public GroupSubscription(Map<String, Subscription> subscriptions) {
        memberSubscriptions = subscriptions;
    }

    /** @return the very map instance supplied to the constructor */
    public Map<String, Subscription> groupSubscription() {
        return memberSubscriptions;
    }
}
/**
 * Holder for the assignments of a whole group, as returned from
 * {@link ConsumerPartitionAssignor#assign(Cluster, GroupSubscription)}.
 * The map is kept by reference; it is neither copied nor wrapped.
 */
final class GroupAssignment {
    // Keyed by a String per member (presumably the member id; confirm against the caller).
    private final Map<String, Assignment> memberAssignments;

    /**
     * @param assignments the per-member assignments to wrap
     */
    public GroupAssignment(Map<String, Assignment> assignments) {
        memberAssignments = assignments;
    }

    /** @return the very map instance supplied to the constructor */
    public Map<String, Assignment> groupAssignment() {
        return memberAssignments;
    }
}
/**
 * The rebalance protocols an assignor can declare support for. Each constant carries a
 * one-byte id that can be mapped back to the constant with {@link #forId(byte)}.
 */
enum RebalanceProtocol {
    EAGER((byte) 0), COOPERATIVE((byte) 1);

    private final byte id;

    RebalanceProtocol(byte id) {
        this.id = id;
    }

    /** @return the one-byte id of this protocol */
    public byte id() {
        return id;
    }

    /**
     * Look up the protocol that carries the given id.
     *
     * @param id the protocol id
     * @return the constant whose {@link #id()} equals {@code id}
     * @throws IllegalArgumentException if no constant carries that id
     */
    public static RebalanceProtocol forId(byte id) {
        // Ids are unique per constant, so the first match is the only match.
        for (RebalanceProtocol candidate : values()) {
            if (candidate.id == id) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("Unknown rebalance protocol id: " + id);
    }
}
}