/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.util;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
* <p>
* KafkaBasedLog provides a generic implementation of a shared, compacted log of records stored in Kafka that all
* clients need to consume and, at times, agree on their offset / that they have read to the end of the log.
* </p>
* <p>
* This functionality is useful for storing different types of data that all clients may need to agree on --
* offsets or config for example. This class runs a consumer in a background thread to continuously tail the target
* topic, accepts write requests which it writes to the topic using an internal producer, and provides some helpful
* utilities like checking the current log end offset and waiting until the current end of the log is reached.
* </p>
* <p>
* To support different use cases, this class works with either single- or multi-partition topics.
* </p>
* <p>
* Since this class is generic, it delegates the details of data storage via a callback that is invoked for each
* record that is consumed from the topic. The invocation of callbacks is guaranteed to be serialized -- if the
* calling class keeps track of state based on the log and only writes to it when consume callbacks are invoked
* and only reads it in {@link #readToEnd(Callback)} callbacks then no additional synchronization will be required.
* </p>
*/
public class KafkaBasedLog<K, V> {
    private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);

    // How long start() will wait for the topic's partition metadata to appear before failing.
    private static final long CREATE_TOPIC_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
    // Upper bound for the exponential backoff used while polling for partition metadata in start().
    private static final long MAX_SLEEP_MS = TimeUnit.SECONDS.toMillis(1);
    // 15min of admin retry duration to ensure successful metadata propagation. 10 seconds of backoff
    // in between retries
    private static final Duration ADMIN_CLIENT_RETRY_DURATION = Duration.ofMinutes(15);
    private static final long ADMIN_CLIENT_RETRY_BACKOFF_MS = TimeUnit.SECONDS.toMillis(10);

    private final Time time;
    // The single Kafka topic treated as the log.
    private final String topic;
    // Number of partitions discovered for the topic; set once in start().
    private int partitionCount;
    private final Map<String, Object> producerConfigs;
    private final Map<String, Object> consumerConfigs;
    // Invoked serially for every record consumed while tailing the log.
    private final Callback<ConsumerRecord<K, V>> consumedCallback;
    // May supply null; see readEndOffsets() for the consumer-based fallback.
    private final Supplier<TopicAdmin> topicAdminSupplier;
    // True when the consumer uses read_committed isolation, in which case an admin client
    // is mandatory for determining end offsets (see the constructor for the rationale).
    private final boolean requireAdminForOffsets;

    private Consumer<K, V> consumer;
    // Empty for read-only logs; initialized to Optional.empty() in the constructor to avoid NPEs.
    private Optional<Producer<K, V>> producer;
    private TopicAdmin admin;
    // Background WorkThread that tails the topic; created in start(), joined in stop().
    private Thread thread;
    // Guarded by 'this'; checked by the work thread on each loop iteration.
    private boolean stopRequested;
    // Callbacks registered by readToEnd(); guarded by 'this', drained by the work thread.
    private final Queue<Callback<Void>> readLogEndOffsetCallbacks;
    // Run once with the (possibly null) admin client at the top of start().
    private final java.util.function.Consumer<TopicAdmin> initializer;
    /**
     * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until
     * {@link #start()} is invoked.
     *
     * @param topic the topic to treat as a log
     * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must
     *                        contain compatible serializer settings for the generic types used on this class. Some
     *                        setting, such as the number of acks, will be overridden to ensure correct behavior of this
     *                        class.
     * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must
     *                        contain compatible serializer settings for the generic types used on this class. Some
     *                        setting, such as the auto offset reset policy, will be overridden to ensure correct
     *                        behavior of this class.
     * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
     * @param time Time interface
     * @param initializer the component that should be run when this log is {@link #start() started}; may be null
     * @deprecated Replaced by {@link #KafkaBasedLog(String, Map, Map, Supplier, Callback, Time, java.util.function.Consumer)}
     */
    @Deprecated
    public KafkaBasedLog(String topic,
                         Map<String, Object> producerConfigs,
                         Map<String, Object> consumerConfigs,
                         Callback<ConsumerRecord<K, V>> consumedCallback,
                         Time time,
                         Runnable initializer) {
        // Delegate to the primary constructor with a null-returning admin supplier and the
        // Runnable adapted to the Consumer<TopicAdmin> form (the admin argument is ignored).
        this(topic, producerConfigs, consumerConfigs, () -> null, consumedCallback, time, initializer != null ? admin -> initializer.run() : null);
    }
/**
* Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until
* {@link #start()} is invoked.
*
* @param topic the topic to treat as a log
* @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must
* contain compatible serializer settings for the generic types used on this class. Some
* setting, such as the number of acks, will be overridden to ensure correct behavior of this
* class.
* @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must
* contain compatible serializer settings for the generic types used on this class. Some
* setting, such as the auto offset reset policy, will be overridden to ensure correct
* behavior of this class.
* @param topicAdminSupplier supplier function for an admin client, the lifecycle of which is expected to be controlled
* by the calling component; may not be null
* @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
* @param time Time interface
* @param initializer the function that should be run when this log is {@link #start() started}; may be null
*/
public KafkaBasedLog(String topic,
Map<String, Object> producerConfigs,
Map<String, Object> consumerConfigs,
Supplier<TopicAdmin> topicAdminSupplier,
Callback<ConsumerRecord<K, V>> consumedCallback,
Time time,
java.util.function.Consumer<TopicAdmin> initializer) {
this.topic = topic;
this.producerConfigs = producerConfigs;
this.consumerConfigs = consumerConfigs;
this.topicAdminSupplier = Objects.requireNonNull(topicAdminSupplier);
this.consumedCallback = consumedCallback;
this.stopRequested = false;
this.readLogEndOffsetCallbacks = new ArrayDeque<>();
this.time = time;
this.initializer = initializer != null ? initializer : admin -> { };
// Initialize the producer Optional here to prevent NPEs later on
this.producer = Optional.empty();
// If the consumer is configured with isolation.level = read_committed, then its end offsets method cannot be relied on
// as it will not take records from currently-open transactions into account. We want to err on the side of caution in that
// case: when users request a read to the end of the log, we will read up to the point where the latest offsets visible to the
// consumer are at least as high as the (possibly-part-of-a-transaction) end offsets of the topic.
this.requireAdminForOffsets = IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)
.equals(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG));
}
    /**
     * Create a new KafkaBasedLog object using pre-existing Kafka clients. This does not start reading the log and writing
     * is not permitted until {@link #start()} is invoked. Note that the consumer and (if not null) producer given to this log
     * will be closed when this log is {@link #stop() stopped}.
     *
     * @param topic the topic to treat as a log
     * @param consumer the consumer to use for reading from the log; may not be null
     * @param producer the producer to use for writing to the log; may be null, which will create a read-only log
     * @param topicAdmin an admin client, the lifecycle of which is expected to be controlled by the calling component;
     *                   may not be null
     * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
     * @param time Time interface
     * @param initializer the function that should be run when this log is {@link #start() started}; may be null
     * @return a {@link KafkaBasedLog} using the given clients
     */
    public static <K, V> KafkaBasedLog<K, V> withExistingClients(String topic,
                                                                 Consumer<K, V> consumer,
                                                                 Producer<K, V> producer,
                                                                 TopicAdmin topicAdmin,
                                                                 Callback<ConsumerRecord<K, V>> consumedCallback,
                                                                 Time time,
                                                                 java.util.function.Consumer<TopicAdmin> initializer) {
        Objects.requireNonNull(topicAdmin);
        // Build an anonymous subclass whose factory methods return the caller-supplied clients
        // instead of constructing new ones; the empty config maps are therefore never used.
        return new KafkaBasedLog<K, V>(topic,
                Collections.emptyMap(),
                Collections.emptyMap(),
                () -> topicAdmin,
                consumedCallback,
                time,
                initializer) {
            @Override
            protected Producer<K, V> createProducer() {
                // May be null, which yields a read-only log (see start()).
                return producer;
            }

            @Override
            protected Consumer<K, V> createConsumer() {
                return consumer;
            }
        };
    }
public void start() {
log.info("Starting KafkaBasedLog with topic " + topic);
// Create the topic admin client and initialize the topic ...
admin = topicAdminSupplier.get(); // may be null
if (admin == null && requireAdminForOffsets) {
throw new ConnectException(
"Must provide a TopicAdmin to KafkaBasedLog when consumer is configured with "
+ ConsumerConfig.ISOLATION_LEVEL_CONFIG + " set to "
+ IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)
);
}
initializer.accept(admin);
// Then create the producer and consumer
producer = Optional.ofNullable(createProducer());
if (!producer.isPresent())
log.trace("Creating read-only KafkaBasedLog for topic " + topic);
consumer = createConsumer();
List<TopicPartition> partitions = new ArrayList<>();
// We expect that the topics will have been created either manually by the user or automatically by the herder
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
long started = time.nanoseconds();
long sleepMs = 100;
while (partitionInfos.isEmpty() && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
time.sleep(sleepMs);
sleepMs = Math.min(2 * sleepMs, MAX_SLEEP_MS);
partitionInfos = consumer.partitionsFor(topic);
}
if (partitionInfos.isEmpty())
throw new ConnectException("Could not look up partition metadata for offset backing store topic in" +
" allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
" this is your first use of the topic it may have taken too long to create.");
for (PartitionInfo partition : partitionInfos)
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
partitionCount = partitions.size();
consumer.assign(partitions);
// Always consume from the beginning of all partitions. Necessary to ensure that we don't use committed offsets
// when a 'group.id' is specified (if offsets happen to have been committed unexpectedly).
consumer.seekToBeginning(partitions);
readToLogEnd(true);
thread = new WorkThread();
thread.start();
log.info("Finished reading KafkaBasedLog for topic " + topic);
log.info("Started KafkaBasedLog for topic " + topic);
}
public void stop() {
log.info("Stopping KafkaBasedLog for topic " + topic);
synchronized (this) {
stopRequested = true;
}
if (consumer != null) {
consumer.wakeup();
}
if (thread != null) {
try {
thread.join();
} catch (InterruptedException e) {
throw new ConnectException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting " +
"down it's producer and consumer.", e);
}
}
producer.ifPresent(p -> Utils.closeQuietly(p, "KafkaBasedLog producer for topic " + topic));
Utils.closeQuietly(consumer, "KafkaBasedLog consumer for topic " + topic);
// do not close the admin client, since we don't own it
admin = null;
log.info("Stopped KafkaBasedLog for topic " + topic);
}
    /**
     * Flushes any outstanding writes and then reads to the current end of the log and invokes the specified callback.
     * Note that this checks the current offsets, reads to them, and invokes the callback regardless of whether
     * additional records have been written to the log. If the caller needs to ensure they have truly reached the end
     * of the log, they must ensure there are no other writers during this period.
     *
     * This waits until the end of all partitions has been reached.
     *
     * This method is asynchronous. If you need a synchronous version, pass an instance of
     * {@link org.apache.kafka.connect.util.FutureCallback} as the {@param callback} parameter and wait on it to block.
     *
     * @param callback the callback to invoke once the end of the log has been reached.
     */
    public void readToEnd(Callback<Void> callback) {
        log.trace("Starting read to end log for topic {}", topic);
        // Flush pending sends first so the end offsets the work thread reads include this client's own writes
        flush();
        // The callback queue is drained by the work thread; guard it with the same monitor it uses
        synchronized (this) {
            readLogEndOffsetCallbacks.add(callback);
        }
        // Interrupt the work thread's (possibly indefinite) poll so it notices the newly registered callback
        consumer.wakeup();
    }
/**
* Flush the underlying producer to ensure that all pending writes have been sent.
*/
public void flush() {
producer.ifPresent(Producer::flush);
}
/**
* Same as {@link #readToEnd(Callback)} but provides a {@link Future} instead of using a callback.
* @return the future associated with the operation
*/
public Future<Void> readToEnd() {
FutureCallback<Void> future = new FutureCallback<>(null);
readToEnd(future);
return future;
}
    /**
     * Asynchronously write a record to the end of the log with no completion callback.
     * See {@link #send(Object, Object, org.apache.kafka.clients.producer.Callback)}.
     *
     * @param key the record key
     * @param value the record value
     */
    public void send(K key, V value) {
        send(key, value, null);
    }
public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
producer.orElseThrow(() ->
new IllegalStateException("This KafkaBasedLog was created in read-only mode and does not support write operations")
).send(new ProducerRecord<>(topic, key, value), callback);
}
    /**
     * @return the number of partitions of the log's topic, as discovered during {@link #start()};
     *         0 before the log has been started
     */
    public int partitionCount() {
        return partitionCount;
    }
protected Producer<K, V> createProducer() {
// Always require producer acks to all to ensure durable writes
producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
// Don't allow more than one in-flight request to prevent reordering on retry (if enabled)
producerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
return new KafkaProducer<>(producerConfigs);
}
protected Consumer<K, V> createConsumer() {
// Always force reset to the beginning of the log since this class wants to consume all available log data
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Turn off autocommit since we always want to consume the full log
consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new KafkaConsumer<>(consumerConfigs);
}
private void poll(long timeoutMs) {
try {
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(timeoutMs));
for (ConsumerRecord<K, V> record : records)
consumedCallback.onCompletion(null, record);
} catch (WakeupException e) {
// Expected on get() or stop(). The calling code should handle this
throw e;
} catch (KafkaException e) {
log.error("Error polling: " + e);
}
}
    /**
     * This method finds the end offsets of the Kafka log's topic partitions, optionally retrying
     * if the {@code listOffsets()} method of the admin client throws a {@link RetriableException},
     * and then polls the consumer until its position on every assigned partition has reached the
     * corresponding end offset. Blocks until all partitions have been read to the end.
     *
     * @param shouldRetry Boolean flag to enable retry for the admin client {@code listOffsets()} call.
     * @see TopicAdmin#retryEndOffsets
     */
    private void readToLogEnd(boolean shouldRetry) {
        Set<TopicPartition> assignment = consumer.assignment();
        // Snapshot of the end offsets; entries are removed as each partition catches up
        Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment, shouldRetry);
        log.trace("Reading to end of log offsets {}", endOffsets);

        while (!endOffsets.isEmpty()) {
            Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<TopicPartition, Long> entry = it.next();
                TopicPartition topicPartition = entry.getKey();
                long endOffset = entry.getValue();
                long lastConsumedOffset = consumer.position(topicPartition);
                if (lastConsumedOffset >= endOffset) {
                    // This partition is caught up; remove via the iterator to avoid
                    // ConcurrentModificationException while iterating the map
                    log.trace("Read to end offset {} for {}", endOffset, topicPartition);
                    it.remove();
                } else {
                    log.trace("Behind end offset {} for {}; last-read offset is {}",
                            endOffset, topicPartition, lastConsumedOffset);
                    // Consume more records (poll may deliver records for any partition),
                    // then restart iteration with a fresh iterator
                    poll(Integer.MAX_VALUE);
                    break;
                }
            }
        }
    }
    // Visible for testing
    /**
     * Read the end offsets of the given list of topic partitions, preferring the admin client
     * (when available) over the consumer.
     *
     * @param assignment the topic partitions to read the end offsets of
     * @param shouldRetry boolean flag to enable retry for the admin client {@code listOffsets()} call.
     * @throws UnsupportedVersionException if the log's consumer is using the "read_committed" isolation level (and
     * therefore a separate admin client is required to read end offsets for the topic), but the broker does not support
     * reading end offsets using an admin client
     */
    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment, boolean shouldRetry) throws UnsupportedVersionException {
        log.trace("Reading to end of offset log");

        // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
        // That is because it's possible that the consumer is already blocked waiting for new records to appear, when
        // the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
        // one more record becomes available, meaning we can't even check whether we're at the end offset.
        // Since all we're trying to do here is get the end offset, we should use the supplied admin client
        // (if available) to obtain the end offsets for the given topic partitions.
        // Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
        if (admin != null) {
            // Use the admin client to immediately find the end offsets for the assigned topic partitions.
            // Unlike the consumer-based approach above, this never blocks waiting for new records.
            try {
                if (shouldRetry) {
                    // Retry transient listOffsets() failures for up to ADMIN_CLIENT_RETRY_DURATION
                    return admin.retryEndOffsets(assignment,
                            ADMIN_CLIENT_RETRY_DURATION,
                            ADMIN_CLIENT_RETRY_BACKOFF_MS);
                }

                return admin.endOffsets(assignment);
            } catch (UnsupportedVersionException e) {
                // This may happen with really old brokers that don't support the auto topic creation
                // field in metadata requests
                if (requireAdminForOffsets) {
                    // Should be handled by the caller during log startup
                    throw e;
                }
                log.debug("Reading to end of log offsets with consumer since admin client is unsupported: {}", e.getMessage());
                // Forget the reference to the admin so that we won't even try to use the admin the next time this method is called
                admin = null;
                // continue and let the consumer handle the read
            }
            // Other errors, like timeouts and retriable exceptions are intentionally propagated
        }
        // The admin may be null if older deprecated constructor is used or if the admin client is using a broker that doesn't
        // support getting the end offsets (e.g., 0.10.x). In such cases, we should use the consumer, which is not ideal (see above).
        return consumer.endOffsets(assignment);
    }
    /**
     * Background thread that tails the topic forever: it polls the consumer, dispatches records to
     * the consumed callback, and services readToEnd() requests by reading to the current end offsets
     * before invoking the registered callbacks. It exits when stop() sets stopRequested and wakes
     * the consumer. Non-static inner class: it uses KafkaBasedLog.this as the monitor shared with
     * readToEnd() and stop().
     */
    private class WorkThread extends Thread {
        public WorkThread() {
            super("KafkaBasedLog Work Thread - " + topic);
        }

        @Override
        public void run() {
            try {
                log.trace("{} started execution", this);
                while (true) {
                    int numCallbacks;
                    synchronized (KafkaBasedLog.this) {
                        // Check the stop flag and snapshot the number of pending readToEnd()
                        // callbacks under the same lock the registering threads use
                        if (stopRequested)
                            break;
                        numCallbacks = readLogEndOffsetCallbacks.size();
                    }

                    if (numCallbacks > 0) {
                        try {
                            // No retry inside readToLogEnd here; this loop is itself the retry
                            readToLogEnd(false);
                            log.trace("Finished read to end log for topic {}", topic);
                        } catch (TimeoutException e) {
                            log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
                                     "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
                            continue;
                        } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
                            log.warn("Retriable error while reading log to end for topic '{}'. Retrying automatically. " +
                                    "Reason: {}", topic, e.getMessage());
                            continue;
                        } catch (WakeupException e) {
                            // Either received another get() call and need to retry reading to end of log or stop() was
                            // called. Both are handled by restarting this loop.
                            continue;
                        }
                    }

                    synchronized (KafkaBasedLog.this) {
                        // Only invoke exactly the number of callbacks we found before triggering the read to log end
                        // since it is possible for another write + readToEnd to sneak in the meantime
                        for (int i = 0; i < numCallbacks; i++) {
                            Callback<Void> cb = readLogEndOffsetCallbacks.poll();
                            cb.onCompletion(null, null);
                        }
                    }

                    try {
                        // Block indefinitely for new records; readToEnd()/stop() break this via consumer.wakeup()
                        poll(Integer.MAX_VALUE);
                    } catch (WakeupException e) {
                        // See previous comment, both possible causes of this wakeup are handled by starting this loop again
                        continue;
                    }
                }
            } catch (Throwable t) {
                // Last-resort guard: never let the work thread die silently
                log.error("Unexpected exception in {}", this, t);
            }
        }
    }
}