Commit 2690433

KAFKA-19193 Support rack-aware partitioning for Kafka producer
According to KIP-1123, this commit adds support for rack-aware partitioning to `BuiltInPartitioner`. It comes with two new producer configs, `partitioner.rack.aware` and `client.rack`, which allow enabling the new behavior. Apart from the added unit tests, the desired behavior was validated with `kafka-producer-perf-test.sh`, run with both an existing and a non-existing rack against a 4-node cluster with two racks and a 12-partition topic:

```shell
./kafka_2.13-4.1.0-SNAPSHOT/bin/kafka-producer-perf-test.sh \
  --topic test-topic --num-records 100000 --throughput -1 --record-size 1 \
  --producer-props bootstrap.servers=127.0.0.1:9092 \
  client.rack=rack0 partitioner.rack.aware=true
```
1 parent f42abe6 · commit 2690433
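For reference, a minimal sketch (not part of the commit) of how an application could enable the same behavior programmatically; the bootstrap address, topic, and rack name are placeholders, and the two config constants are the ones introduced by this commit:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class RackAwareProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // New in this commit: opt in to rack-aware partitioning and declare the client's rack.
        props.put(ProducerConfig.PARTITIONER_RACK_AWARE_CONFIG, "true");
        props.put(ProducerConfig.CLIENT_RACK_CONFIG, "rack0");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Keyless record: the built-in partitioner picks a sticky partition,
            // preferring partitions whose leader is in rack0 when any are available.
            producer.send(new ProducerRecord<>("test-topic", "hello"));
        }
    }
}
```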

7 files changed: +370 −51 lines


clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

Lines changed: 3 additions & 1 deletion

```diff
@@ -420,7 +420,9 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
             config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
         RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig(
             enableAdaptivePartitioning,
-            config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG)
+            config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG),
+            config.getBoolean(ProducerConfig.PARTITIONER_RACK_AWARE_CONFIG),
+            config.getString(ProducerConfig.CLIENT_RACK_CONFIG)
         );
         // As per Kafka producer configuration documentation batch.size may be set to 0 to explicitly disable
         // batching which in practice actually means using a batch size of 1.
```

clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

Lines changed: 15 additions & 1 deletion

```diff
@@ -119,6 +119,10 @@ public class ProducerConfig extends AbstractConfig {
         + "If 'false', producer would choose a partition based on a hash of the key when a key is present. "
         + "Note: this setting has no effect if a custom partitioner is used.";
 
+    /** <code>partitioner.rack.aware</code> */
+    public static final String PARTITIONER_RACK_AWARE_CONFIG = "partitioner.rack.aware";
+    private static final String PARTITIONER_RACK_AWARE_DOC = "Controls whether the default partitioner is rack-aware. This has no effect when a custom partitioner is used.";
+
     /** <code>acks</code> */
     public static final String ACKS_CONFIG = "acks";
     private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "
@@ -175,6 +179,12 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>client.id</code> */
     public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
 
+    /**
+     * <code>client.rack</code>
+     */
+    public static final String CLIENT_RACK_CONFIG = CommonClientConfigs.CLIENT_RACK_CONFIG;
+    public static final String DEFAULT_CLIENT_RACK = CommonClientConfigs.DEFAULT_CLIENT_RACK;
+
     /** <code>send.buffer.bytes</code> */
     public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
 
@@ -314,7 +324,9 @@ public class ProducerConfig extends AbstractConfig {
         "This strategy send records to a partition until at least " + BATCH_SIZE_CONFIG + " bytes is produced to the partition. It works with the strategy:" +
         "<ol>" +
         "<li>If no partition is specified but a key is present, choose a partition based on a hash of the key.</li>" +
-        "<li>If no partition or key is present, choose the sticky partition that changes when at least " + BATCH_SIZE_CONFIG + " bytes are produced to the partition.</li>" +
+        "<li>If no partition or key is present, choose the sticky partition that changes when at least <code>" + BATCH_SIZE_CONFIG + "</code> bytes are produced to the partition.</li>" +
+        "<li>If <code>" + CLIENT_RACK_CONFIG + "</code> is specified and <code>" + PARTITIONER_RACK_AWARE_CONFIG + "=true</code>, the sticky partition is chosen from partitions " +
+        "with the leader broker in the same rack, if at least one is available. If none are available, it falls back on selecting from all available partitions.</li>" +
         "</ol>" +
         "</li>" +
         "<li><code>org.apache.kafka.clients.producer.RoundRobinPartitioner</code>: A partitioning strategy where " +
@@ -394,9 +406,11 @@ public class ProducerConfig extends AbstractConfig {
         .define(PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC)
         .define(PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.LOW, PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC)
         .define(PARTITIONER_IGNORE_KEYS_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, PARTITIONER_IGNORE_KEYS_DOC)
+        .define(PARTITIONER_RACK_AWARE_CONFIG, Type.BOOLEAN, false, Importance.LOW, PARTITIONER_RACK_AWARE_DOC)
         .define(LINGER_MS_CONFIG, Type.LONG, 5, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
         .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
         .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
+        .define(CLIENT_RACK_CONFIG, Type.STRING, DEFAULT_CLIENT_RACK, Importance.LOW, CommonClientConfigs.CLIENT_RACK_DOC)
         .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
         .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(CommonClientConfigs.RECEIVE_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
         .define(MAX_REQUEST_SIZE_CONFIG,
```
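Both new settings are defined with `Importance.LOW` and backwards-compatible defaults (`false` and `DEFAULT_CLIENT_RACK`), so existing producers are unaffected. A minimal sketch (not part of the commit) of how the defaults resolve, assuming only the mandatory bootstrap and serializer settings are supplied:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class RackAwareDefaultsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        ProducerConfig config = new ProducerConfig(props);
        // Neither key was set explicitly, so both resolve to their defaults:
        System.out.println(config.getBoolean(ProducerConfig.PARTITIONER_RACK_AWARE_CONFIG)); // false
        System.out.println(config.getString(ProducerConfig.CLIENT_RACK_CONFIG));             // DEFAULT_CLIENT_RACK
    }
}
```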

clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java

Lines changed: 94 additions & 15 deletions

```diff
@@ -28,6 +28,7 @@
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 /**
  * Built-in default partitioner. Note, that this is just a utility class that is used directly from
```
```diff
@@ -40,8 +41,10 @@ public class BuiltInPartitioner {
     private final Logger log;
     private final String topic;
     private final int stickyBatchSize;
+    private final boolean rackAware;
+    private final String rack;
 
-    private volatile PartitionLoadStats partitionLoadStats = null;
+    private volatile PartitionLoadStatsHolder partitionLoadStats = null;
     private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = new AtomicReference<>();
 
 
@@ -51,13 +54,15 @@ public class BuiltInPartitioner {
      * @param topic The topic
      * @param stickyBatchSize How much to produce to partition before switch
      */
-    public BuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize) {
+    public BuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize, boolean rackAware, String rack) {
         this.log = logContext.logger(BuiltInPartitioner.class);
         this.topic = topic;
         if (stickyBatchSize < 1) {
             throw new IllegalArgumentException("stickyBatchSize must be >= 1 but got " + stickyBatchSize);
         }
         this.stickyBatchSize = stickyBatchSize;
+        this.rackAware = rackAware;
+        this.rack = rack;
     }
 
     /**
```
```diff
@@ -67,14 +72,25 @@ private int nextPartition(Cluster cluster) {
         int random = randomPartition();
 
         // Cache volatile variable in local variable.
-        PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
+        PartitionLoadStatsHolder partitionLoadStats = this.partitionLoadStats;
+
         int partition;
 
         if (partitionLoadStats == null) {
             // We don't have stats to do adaptive partitioning (or it's disabled), just switch to the next
             // partition based on uniform distribution.
             List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
             if (!availablePartitions.isEmpty()) {
+                // Select only partitions with leaders in this rack if configured so, falling back if none are available.
+                if (rackAware) {
+                    List<PartitionInfo> availablePartitionsInRack = availablePartitions.stream()
+                        .filter(p -> p.leader().hasRack() && p.leader().rack().equals(rack))
+                        .collect(Collectors.toList());
+                    if (!availablePartitionsInRack.isEmpty()) {
+                        availablePartitions = availablePartitionsInRack;
+                    }
+                }
+
                 partition = availablePartitions.get(random % availablePartitions.size()).partition();
             } else {
                 // We don't have available partitions, just pick one among all partitions.
```
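The rack filter in this hunk is a filter-with-fallback idiom: narrow the candidate list, but keep the original when the narrowed list is empty, so availability never regresses. A generic standalone sketch of the same pattern (the helper name and sample data are hypothetical, not part of the commit):

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public final class FilterWithFallbackSketch {
    // Keep only elements matching the predicate, falling back to the
    // original list when nothing matches (mirrors the rack filter above).
    static <T> List<T> preferMatching(List<T> all, Predicate<T> predicate) {
        List<T> matching = all.stream().filter(predicate).collect(Collectors.toList());
        return matching.isEmpty() ? all : matching;
    }

    public static void main(String[] args) {
        List<String> leaderRacks = List.of("rack0", "rack1", "rack0");
        System.out.println(preferMatching(leaderRacks, "rack0"::equals)); // [rack0, rack0]
        System.out.println(preferMatching(leaderRacks, "rack9"::equals)); // [rack0, rack1, rack0] (fallback)
    }
}
```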
```diff
@@ -84,14 +100,20 @@ private int nextPartition(Cluster cluster) {
         } else {
             // Calculate next partition based on load distribution.
             // Note that partitions without leader are excluded from the partitionLoadStats.
-            assert partitionLoadStats.length > 0;
 
-            int[] cumulativeFrequencyTable = partitionLoadStats.cumulativeFrequencyTable;
-            int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStats.length - 1];
+            PartitionLoadStats partitionLoadStatsToUse = partitionLoadStats.total;
+            if (rackAware && partitionLoadStats.inThisRack != null && partitionLoadStats.inThisRack.length > 0) {
+                partitionLoadStatsToUse = partitionLoadStats.inThisRack;
+            }
+
+            assert partitionLoadStatsToUse.length > 0;
+
+            int[] cumulativeFrequencyTable = partitionLoadStatsToUse.cumulativeFrequencyTable;
+            int weightedRandom = random % cumulativeFrequencyTable[partitionLoadStatsToUse.length - 1];
 
             // By construction, the cumulative frequency table is sorted, so we can use binary
             // search to find the desired index.
-            int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStats.length, weightedRandom);
+            int searchResult = Arrays.binarySearch(cumulativeFrequencyTable, 0, partitionLoadStatsToUse.length, weightedRandom);
 
             // binarySearch results the index of the found element, or -(insertion_point) - 1
             // (where insertion_point is the index of the first element greater than the key).
```
```diff
@@ -103,8 +125,8 @@ private int nextPartition(Cluster cluster) {
             // would return -0 - 1 = -1, by adding 1 we'd get 0. If we're looking for 4, we'd
             // get 0, and we need the next one, so adding 1 works here as well.
             int partitionIndex = Math.abs(searchResult + 1);
-            assert partitionIndex < partitionLoadStats.length;
-            partition = partitionLoadStats.partitionIds[partitionIndex];
+            assert partitionIndex < partitionLoadStatsToUse.length;
+            partition = partitionLoadStatsToUse.partitionIds[partitionIndex];
         }
 
         log.trace("Switching to partition {} in topic {}", partition, topic);
```
```diff
@@ -121,8 +143,14 @@ int randomPartition() {
      */
     public int loadStatsRangeEnd() {
         assert partitionLoadStats != null;
-        assert partitionLoadStats.length > 0;
-        return partitionLoadStats.cumulativeFrequencyTable[partitionLoadStats.length - 1];
+        assert partitionLoadStats.total.length > 0;
+        return partitionLoadStats.total.cumulativeFrequencyTable[partitionLoadStats.total.length - 1];
+    }
+
+    public int loadStatsInThisRackRangeEnd() {
+        assert partitionLoadStats.inThisRack != null;
+        assert partitionLoadStats.inThisRack.length > 0;
+        return partitionLoadStats.inThisRack.cumulativeFrequencyTable[partitionLoadStats.inThisRack.length - 1];
     }
 
     /**
```
```diff
@@ -233,18 +261,20 @@ void updatePartitionInfo(StickyPartitionInfo partitionInfo, int appendedBytes, C
      *
      * @param queueSizes The queue sizes, partitions without leaders are excluded
      * @param partitionIds The partition ids for the queues, partitions without leaders are excluded
+     * @param partitionLeaderRacks The racks of partition leaders for the queues, partitions without leaders are excluded
      * @param length The logical length of the arrays (could be less): we may eliminate some partitions
      *               based on latency, but to avoid reallocation of the arrays, we just decrement
      *               logical length
      *               Visible for testing
      */
-    public void updatePartitionLoadStats(int[] queueSizes, int[] partitionIds, int length) {
+    public void updatePartitionLoadStats(int[] queueSizes, int[] partitionIds, String[] partitionLeaderRacks, int length) {
         if (queueSizes == null) {
             log.trace("No load stats for topic {}, not using adaptive", topic);
             partitionLoadStats = null;
             return;
         }
         assert queueSizes.length == partitionIds.length;
+        assert queueSizes.length == partitionLeaderRacks.length;
         assert length <= queueSizes.length;
 
         // The queueSizes.length represents the number of all partitions in the topic and if we have
```
```diff
@@ -276,6 +306,7 @@ public void updatePartitionLoadStats(int[] queueSizes, int[] partitionIds, int l
         // the value is the index of the partition we're looking for. In this example
         // random numbers 0, 1, 2, 3 would map to partition[0], 4 would map to partition[1]
         // and 5, 6, 7 would map to partition[2].
+        // Do the same with this-rack-only partitions if rack awareness is enabled.
 
         // Calculate max queue size + 1 and check if all sizes are the same.
         int maxSizePlus1 = queueSizes[0];
```
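The mapping in that comment implies a cumulative frequency table of {4, 5, 8} (weights 4, 1, 3). It can be checked directly against `Arrays.binarySearch` with the same index correction `nextPartition` uses; a standalone sketch, not the commit's code:

```java
import java.util.Arrays;

public class CftLookupSketch {
    public static void main(String[] args) {
        // CFT separator values implied by the comment above: {4, 5, 8}.
        int[] cft = {4, 5, 8};
        for (int weightedRandom = 0; weightedRandom < cft[cft.length - 1]; weightedRandom++) {
            // binarySearch returns the index of a match, or -(insertion_point) - 1;
            // Math.abs(result + 1) maps both cases to the owning partition index.
            int searchResult = Arrays.binarySearch(cft, 0, cft.length, weightedRandom);
            int partitionIndex = Math.abs(searchResult + 1);
            System.out.println(weightedRandom + " -> partition index " + partitionIndex);
        }
        // Prints: 0..3 -> 0, 4 -> 1, 5..7 -> 2, i.e. partitions weighted 4:1:3.
    }
}
```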
```diff
@@ -297,14 +328,51 @@ public void updatePartitionLoadStats(int[] queueSizes, int[] partitionIds, int l
             return;
         }
 
+        // Before inverting and folding, build fully the load stats for this rack, because this depends on the raw queue sizes.
+        PartitionLoadStats partitionLoadStatsInThisRack = createPartitionLoadStatsForThisRackIfNeeded(queueSizes, partitionIds, partitionLeaderRacks, length);
+
         // Invert and fold the queue size, so that they become separator values in the CFT.
+        invertAndFoldQueueSizeArray(queueSizes, maxSizePlus1, length);
+
+        log.trace("Partition load stats for topic {}: CFT={}, IDs={}, length={}",
+            topic, queueSizes, partitionIds, length);
+        partitionLoadStats = new PartitionLoadStatsHolder(
+            new PartitionLoadStats(queueSizes, partitionIds, length),
+            partitionLoadStatsInThisRack
+        );
+    }
+
+    private PartitionLoadStats createPartitionLoadStatsForThisRackIfNeeded(int[] queueSizes, int[] partitionIds, String[] partitionLeaderRacks, int length) {
+        if (!rackAware) {
+            return null;
+        }
+        int[] queueSizesInThisRack = new int[length];
+        int[] partitionIdsInThisRack = new int[length];
+        int lengthInThisRack = 0;
+        int maxSizePlus1InThisRack = -1;
+
+        for (int i = 0; i < length; i++) {
+            if (rack.equals(partitionLeaderRacks[i])) {
+                queueSizesInThisRack[lengthInThisRack] = queueSizes[i];
+                partitionIdsInThisRack[lengthInThisRack] = partitionIds[i];
+
+                if (queueSizes[i] > maxSizePlus1InThisRack)
+                    maxSizePlus1InThisRack = queueSizes[i];
+
+                lengthInThisRack += 1;
+            }
+        }
+        ++maxSizePlus1InThisRack;
+
+        invertAndFoldQueueSizeArray(queueSizesInThisRack, maxSizePlus1InThisRack, lengthInThisRack);
+        return new PartitionLoadStats(queueSizesInThisRack, partitionIdsInThisRack, lengthInThisRack);
+    }
+
+    private void invertAndFoldQueueSizeArray(int[] queueSizes, int maxSizePlus1, int length) {
         queueSizes[0] = maxSizePlus1 - queueSizes[0];
         for (int i = 1; i < length; i++) {
             queueSizes[i] = maxSizePlus1 - queueSizes[i] + queueSizes[i - 1];
         }
-        log.trace("Partition load stats for topic {}: CFT={}, IDs={}, length={}",
-            topic, queueSizes, partitionIds, length);
-        partitionLoadStats = new PartitionLoadStats(queueSizes, partitionIds, length);
     }
 
     /**
```
```diff
@@ -346,4 +414,15 @@ public PartitionLoadStats(int[] cumulativeFrequencyTable, int[] partitionIds, in
             this.length = length;
         }
     }
+
+    private static final class PartitionLoadStatsHolder {
+        final PartitionLoadStats total;
+        final PartitionLoadStats inThisRack;
+
+        private PartitionLoadStatsHolder(PartitionLoadStats total,
+                                         PartitionLoadStats inThisRack) {
+            this.total = total;
+            this.inThisRack = inThisRack;
+        }
+    }
 }
```
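For illustration, the invert-and-fold step can be traced by hand. A standalone sketch (sample queue sizes assumed; this mirrors `invertAndFoldQueueSizeArray` but is not the commit's code):

```java
public class CftBuildSketch {
    public static void main(String[] args) {
        // Sample queue sizes for three partitions; a smaller queue should get a larger weight.
        int[] queueSizes = {0, 3, 1};
        int length = queueSizes.length;

        // maxSizePlus1 = max(queueSizes) + 1 = 4.
        int maxSizePlus1 = 0;
        for (int i = 0; i < length; i++)
            if (queueSizes[i] > maxSizePlus1) maxSizePlus1 = queueSizes[i];
        ++maxSizePlus1;

        // Invert and fold in place: weights become 4, 1, 3
        // and fold into the cumulative separators 4, 5, 8.
        queueSizes[0] = maxSizePlus1 - queueSizes[0];
        for (int i = 1; i < length; i++)
            queueSizes[i] = maxSizePlus1 - queueSizes[i] + queueSizes[i - 1];

        System.out.println(java.util.Arrays.toString(queueSizes)); // [4, 5, 8]
    }
}
```

This is the same {4, 5, 8} table used in the lookup sketch above: the idle partition (queue size 0) receives weight 4, while the busiest one (queue size 3) receives weight 1.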
