forked from apache/kafka
/
KafkaConsumer.java
2307 lines (2194 loc) · 130 KB
/
KafkaConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
/**
* A client that consumes records from a Kafka cluster.
* <p>
* This client transparently handles the failure of Kafka brokers, and transparently adapts as topic partitions
* it fetches migrate within the cluster. This client also interacts with the broker to allow groups of
* consumers to load balance consumption using <a href="#consumergroups">consumer groups</a>.
* <p>
* The consumer maintains TCP connections to the necessary brokers to fetch data.
* Failure to close the consumer after use will leak these connections.
* The consumer is not thread-safe. See <a href="#multithreaded">Multi-threaded Processing</a> for more details.
*
* <h3>Cross-Version Compatibility</h3>
* This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support
* certain features. For example, 0.10.0 brokers do not support offsetsForTimes, because this feature was added
* in version 0.10.1. You will receive an {@link org.apache.kafka.common.errors.UnsupportedVersionException}
* when invoking an API that is not available on the running broker version.
* <p>
*
* <h3>Offsets and Consumer Position</h3>
* Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of
* a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer
* which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There
* are actually two notions of position relevant to the user of the consumer:
* <p>
* The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given
* out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances
* every time the consumer receives messages in a call to {@link #poll(Duration)}.
* <p>
* The {@link #commitSync() committed position} is the last offset that has been stored securely. Should the
* process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit
* offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs
* (e.g. {@link #commitSync() commitSync} and {@link #commitAsync(OffsetCommitCallback) commitAsync}).
* <p>
* This distinction gives the consumer control over when a record is considered consumed. It is discussed in further
* detail below.
*
* <h3><a name="consumergroups">Consumer Groups and Topic Subscriptions</a></h3>
*
* Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide the work of consuming and
* processing records. These processes can either be running on the same machine or they can be
* distributed over many machines to provide scalability and fault tolerance for processing. All consumer instances
* sharing the same {@code group.id} will be part of the same consumer group.
* <p>
* Each consumer in a group can dynamically set the list of topics it wants to subscribe to through one of the
* {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe} APIs. Kafka will deliver each message in the
* subscribed topics to one process in each consumer group. This is achieved by balancing the partitions between all
* members in the consumer group so that each partition is assigned to exactly one consumer in the group. So if there
* is a topic with four partitions, and a consumer group with two processes, each process would consume from two partitions.
* <p>
* Membership in a consumer group is maintained dynamically: if a process fails, the partitions assigned to it will
* be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, partitions will be moved
* from existing consumers to the new one. This is known as <i>rebalancing</i> the group and is discussed in more
* detail <a href="#failuredetection">below</a>. Group rebalancing is also used when new partitions are added
* to one of the subscribed topics or when a new topic matching a {@link #subscribe(Pattern, ConsumerRebalanceListener) subscribed regex}
* is created. The group will automatically detect the new partitions through periodic metadata refreshes and
* assign them to members of the group.
* <p>
* Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of
* multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a
* given topic without duplicating data (additional consumers are actually quite cheap).
* <p>
* This is a slight generalization of the functionality that is common in messaging systems. To get semantics similar to
* a queue in a traditional messaging system all processes would be part of a single consumer group and hence record
* delivery would be balanced over the group like with a queue. Unlike a traditional messaging system, though, you can
* have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system each process would
* have its own consumer group, so each process would subscribe to all the records published to the topic.
* <p>
* In addition, when group reassignment happens automatically, consumers can be notified through a {@link ConsumerRebalanceListener},
* which allows them to finish necessary application-level logic such as state cleanup, manual offset
* commits, etc. See <a href="#rebalancecallback">Storing Offsets Outside Kafka</a> for more details.
* <p>
* It is also possible for the consumer to <a href="#manualassignment">manually assign</a> specific partitions
* (similar to the older "simple" consumer) using {@link #assign(Collection)}. In this case, dynamic partition
* assignment and consumer group coordination will be disabled.
*
* <h3><a name="failuredetection">Detecting Consumer Failures</a></h3>
*
* After subscribing to a set of topics, the consumer will automatically join the group when {@link #poll(Duration)} is
* invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer
* will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers,
* the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for
* a duration of {@code session.timeout.ms}, then the consumer will be considered dead and its partitions will
* be reassigned.
* <p>
* It is also possible that the consumer could encounter a "livelock" situation where it is continuing
* to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions
* indefinitely in this case, we provide a liveness detection mechanism using the {@code max.poll.interval.ms}
* setting. Basically if you don't call poll at least as frequently as the configured max interval,
* then the client will proactively leave the group so that another consumer can take over its partitions. When this happens,
* you may see an offset commit failure (as indicated by a {@link CommitFailedException} thrown from a call to {@link #commitSync()}).
* This is a safety mechanism which guarantees that only active members of the group are able to commit offsets.
* So to stay in the group, you must continue to call poll.
* <p>
* The consumer provides two configuration settings to control the behavior of the poll loop:
* <ol>
* <li><code>max.poll.interval.ms</code>: By increasing the interval between expected polls, you can give
* the consumer more time to handle a batch of records returned from {@link #poll(Duration)}. The drawback
* is that increasing this value may delay a group rebalance since the consumer will only join the rebalance
* inside the call to poll. You can use this setting to bound the time to finish a rebalance, but
* you risk slower progress if the consumer cannot actually call {@link #poll(Duration) poll} often enough.</li>
* <li><code>max.poll.records</code>: Use this setting to limit the total records returned from a single
* call to poll. This can make it easier to predict the maximum that must be handled within each poll
* interval. By tuning this value, you may be able to reduce the poll interval, which will reduce the
* impact of group rebalancing.</li>
* </ol>
* <p>
* For use cases where message processing time varies unpredictably, neither of these options may be sufficient.
* The recommended way to handle these cases is to move message processing to another thread, which allows
* the consumer to continue calling {@link #poll(Duration) poll} while the processor is still working.
* Some care must be taken to ensure that committed offsets do not get ahead of the actual position.
* Typically, you must disable automatic commits and manually commit processed offsets for records only after the
* thread has finished handling them (depending on the delivery semantics you need).
* Note also that you will need to {@link #pause(Collection) pause} the partition so that no new records are received
* from poll until after thread has finished handling those previously returned.
*
* <h3>Usage Examples</h3>
* The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
* demonstrate how to use them.
*
* <h4>Automatic Offset Committing</h4>
* This example demonstrates a simple usage of Kafka's consumer api that relies on automatic offset committing.
* <p>
* <pre>
* Properties props = new Properties();
* props.setProperty("bootstrap.servers", "localhost:9092");
* props.setProperty("group.id", "test");
* props.setProperty("enable.auto.commit", "true");
* props.setProperty("auto.commit.interval.ms", "1000");
* props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
* props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
* KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
* consumer.subscribe(Arrays.asList("foo", "bar"));
* while (true) {
* ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
* for (ConsumerRecord<String, String> record : records)
* System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
* }
* </pre>
*
* The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the
* configuration {@code >bootstrap.servers}. This list is just used to discover the rest of the brokers in the
* cluster and need not be an exhaustive list of servers in the cluster (though you may want to specify more than one in
* case there are servers down when the client is connecting).
* <p>
* Setting {@code enable.auto.commit} means that offsets are committed automatically with a frequency controlled by
* the config {@code auto.commit.interval.ms}.
* <p>
* In this example the consumer is subscribing to the topics <i>foo</i> and <i>bar</i> as part of a group of consumers
* called <i>test</i> as configured with {@code group.id}.
* <p>
* The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we
* are saying that our record's key and value will just be simple strings.
*
* <h4>Manual Offset Control</h4>
*
* Instead of relying on the consumer to periodically commit consumed offsets, users can also control when records
* should be considered as consumed and hence commit their offsets. This is useful when the consumption of the messages
* is coupled with some processing logic and hence a message should not be considered as consumed until it is completed processing.
* <p>
* <pre>
* Properties props = new Properties();
* props.setProperty("bootstrap.servers", "localhost:9092");
* props.setProperty("group.id", "test");
* props.setProperty("enable.auto.commit", "false");
* props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
* props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
* KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
* consumer.subscribe(Arrays.asList("foo", "bar"));
* final int minBatchSize = 200;
* List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
* while (true) {
* ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
* for (ConsumerRecord<String, String> record : records) {
* buffer.add(record);
* }
* if (buffer.size() >= minBatchSize) {
* insertIntoDb(buffer);
* consumer.commitSync();
* buffer.clear();
* }
* }
* </pre>
*
* In this example we will consume a batch of records and batch them up in memory. When we have enough records
* batched, we will insert them into a database. If we allowed offsets to auto commit as in the previous example, records
* would be considered consumed after they were returned to the user in {@link #poll(Duration) poll}. It would then be
* possible
* for our process to fail after batching the records, but before they had been inserted into the database.
* <p>
* To avoid this, we will manually commit the offsets only after the corresponding records have been inserted into the
* database. This gives us exact control of when a record is considered consumed. This raises the opposite possibility:
* the process could fail in the interval after the insert into the database but before the commit (even though this
* would likely just be a few milliseconds, it is a possibility). In this case the process that took over consumption
* would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way
* Kafka provides what is often called "at-least-once" delivery guarantees, as each record will likely be delivered one
* time but in failure cases could be duplicated.
* <p>
* <b>Note: Using automatic offset commits can also give you "at-least-once" delivery, but the requirement is that
* you must consume all data returned from each call to {@link #poll(Duration)} before any subsequent calls, or before
* {@link #close() closing} the consumer. If you fail to do either of these, it is possible for the committed offset
* to get ahead of the consumed position, which results in missing records. The advantage of using manual offset
* control is that you have direct control over when a record is considered "consumed."</b>
* <p>
* The above example uses {@link #commitSync() commitSync} to mark all received records as committed. In some cases
* you may wish to have even finer control over which records have been committed by specifying an offset explicitly.
* In the example below we commit offset after we finish handling the records in each partition.
* <p>
* <pre>
* try {
* while(running) {
* ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
* for (TopicPartition partition : records.partitions()) {
* List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
* for (ConsumerRecord<String, String> record : partitionRecords) {
* System.out.println(record.offset() + ": " + record.value());
* }
* long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
* consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
* }
* }
* } finally {
* consumer.close();
* }
* </pre>
*
* <b>Note: The committed offset should always be the offset of the next message that your application will read.</b>
* Thus, when calling {@link #commitSync(Map) commitSync(offsets)} you should add one to the offset of the last message processed.
*
* <h4><a name="manualassignment">Manual Partition Assignment</a></h4>
*
* In the previous examples, we subscribed to the topics we were interested in and let Kafka dynamically assign a
* fair share of the partitions for those topics based on the active consumers in the group. However, in
* some cases you may need finer control over the specific partitions that are assigned. For example:
* <p>
* <ul>
* <li>If the process is maintaining some kind of local state associated with that partition (like a
* local on-disk key-value store), then it should only get records for the partition it is maintaining on disk.
* <li>If the process itself is highly available and will be restarted if it fails (perhaps using a
* cluster management framework like YARN, Mesos, or AWS facilities, or as part of a stream processing framework). In
* this case there is no need for Kafka to detect the failure and reassign the partition since the consuming process
* will be restarted on another machine.
* </ul>
* <p>
* To use this mode, instead of subscribing to the topic using {@link #subscribe(Collection) subscribe}, you just call
* {@link #assign(Collection)} with the full list of partitions that you want to consume.
*
* <pre>
* String topic = "foo";
* TopicPartition partition0 = new TopicPartition(topic, 0);
* TopicPartition partition1 = new TopicPartition(topic, 1);
* consumer.assign(Arrays.asList(partition0, partition1));
* </pre>
*
* Once assigned, you can call {@link #poll(Duration) poll} in a loop, just as in the preceding examples to consume
* records. The group that the consumer specifies is still used for committing offsets, but now the set of partitions
* will only change with another call to {@link #assign(Collection) assign}. Manual partition assignment does
* not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer
* acts independently even if it shares a groupId with another consumer. To avoid offset commit conflicts, you should
* usually ensure that the groupId is unique for each consumer instance.
* <p>
* Note that it isn't possible to mix manual partition assignment (i.e. using {@link #assign(Collection) assign})
* with dynamic partition assignment through topic subscription (i.e. using {@link #subscribe(Collection) subscribe}).
*
* <h4><a name="rebalancecallback">Storing Offsets Outside Kafka</h4>
*
* The consumer application need not use Kafka's built-in offset storage, it can store offsets in a store of its own
* choosing. The primary use case for this is allowing the application to store both the offset and the results of the
* consumption in the same system in a way that both the results and offsets are stored atomically. This is not always
* possible, but when it is it will make the consumption fully atomic and give "exactly once" semantics that are
* stronger than the default "at-least once" semantics you get with Kafka's offset commit functionality.
* <p>
* Here are a couple of examples of this type of usage:
* <ul>
* <li>If the results of the consumption are being stored in a relational database, storing the offset in the database
* as well can allow committing both the results and offset in a single transaction. Thus either the transaction will
* succeed and the offset will be updated based on what was consumed or the result will not be stored and the offset
* won't be updated.
* <li>If the results are being stored in a local store it may be possible to store the offset there as well. For
* example a search index could be built by subscribing to a particular partition and storing both the offset and the
* indexed data together. If this is done in a way that is atomic, it is often possible to have it be the case that even
* if a crash occurs that causes unsync'd data to be lost, whatever is left has the corresponding offset stored as well.
* This means that in this case the indexing process that comes back having lost recent updates just resumes indexing
* from what it has ensuring that no updates are lost.
* </ul>
* <p>
* Each record comes with its own offset, so to manage your own offset you just need to do the following:
*
* <ul>
* <li>Configure <code>enable.auto.commit=false</code>
* <li>Use the offset provided with each {@link ConsumerRecord} to save your position.
* <li>On restart restore the position of the consumer using {@link #seek(TopicPartition, long)}.
* </ul>
*
* <p>
* This type of usage is simplest when the partition assignment is also done manually (this would be likely in the
* search index use case described above). If the partition assignment is done automatically special care is
* needed to handle the case where partition assignments change. This can be done by providing a
* {@link ConsumerRebalanceListener} instance in the call to {@link #subscribe(Collection, ConsumerRebalanceListener)}
* and {@link #subscribe(Pattern, ConsumerRebalanceListener)}.
* For example, when partitions are taken from a consumer the consumer will want to commit its offset for those partitions by
* implementing {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)}. When partitions are assigned to a
* consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer
* to that position by implementing {@link ConsumerRebalanceListener#onPartitionsAssigned(Collection)}.
* <p>
* Another common use for {@link ConsumerRebalanceListener} is to flush any caches the application maintains for
* partitions that are moved elsewhere.
*
* <h4>Controlling The Consumer's Position</h4>
*
* In most use cases the consumer will simply consume records from beginning to end, periodically committing its
* position (either automatically or manually). However Kafka allows the consumer to manually control its position,
* moving forward or backwards in a partition at will. This means a consumer can re-consume older records, or skip to
* the most recent records without actually consuming the intermediate records.
* <p>
* There are several instances where manually controlling the consumer's position can be useful.
* <p>
* One case is for time-sensitive record processing it may make sense for a consumer that falls far enough behind to not
* attempt to catch up processing all records, but rather just skip to the most recent records.
* <p>
* Another use case is for a system that maintains local state as described in the previous section. In such a system
* the consumer will want to initialize its position on start-up to whatever is contained in the local store. Likewise
* if the local state is destroyed (say because the disk is lost) the state may be recreated on a new machine by
* re-consuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
* <p>
* Kafka allows specifying the position using {@link #seek(TopicPartition, long)} to specify the new position. Special
* methods for seeking to the earliest and latest offset the server maintains are also available (
* {@link #seekToBeginning(Collection)} and {@link #seekToEnd(Collection)} respectively).
*
* <h4>Consumption Flow Control</h4>
*
* If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time,
* effectively giving these partitions the same priority for consumption. However in some cases consumers may want to
* first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions
* when these partitions have few or no data to consume.
*
* <p>
* One of such cases is stream processing, where processor fetches from two topics and performs the join on these two streams.
* When one of the topics is long lagging behind the other, the processor would like to pause fetching from the ahead topic
* in order to get the lagging stream to catch up. Another example is bootstraping upon consumer starting up where there are
* a lot of history data to catch up, the applications usually want to get the latest data on some of the topics before consider
* fetching other topics.
*
* <p>
* Kafka supports dynamic controlling of consumption flows by using {@link #pause(Collection)} and {@link #resume(Collection)}
* to pause the consumption on the specified assigned partitions and resume the consumption
* on the specified paused partitions respectively in the future {@link #poll(Duration)} calls.
*
* <h3>Reading Transactional Messages</h3>
*
* <p>
* Transactions were introduced in Kafka 0.11.0 wherein applications can write to multiple topics and partitions atomically.
* In order for this to work, consumers reading from these partitions should be configured to only read committed data.
* This can be achieved by setting the {@code isolation.level=read_committed} in the consumer's configuration.
*
* <p>
* In <code>read_committed</code> mode, the consumer will read only those transactional messages which have been
* successfully committed. It will continue to read non-transactional messages as before. There is no client-side
* buffering in <code>read_committed</code> mode. Instead, the end offset of a partition for a <code>read_committed</code>
* consumer would be the offset of the first message in the partition belonging to an open transaction. This offset
* is known as the 'Last Stable Offset'(LSO).</p>
*
* <p>
* A {@code read_committed} consumer will only read up to the LSO and filter out any transactional
* messages which have been aborted. The LSO also affects the behavior of {@link #seekToEnd(Collection)} and
* {@link #endOffsets(Collection)} for {@code read_committed} consumers, details of which are in each method's documentation.
* Finally, the fetch lag metrics are also adjusted to be relative to the LSO for {@code read_committed} consumers.
*
* <p>
* Partitions with transactional messages will include commit or abort markers which indicate the result of a transaction.
* There markers are not returned to applications, yet have an offset in the log. As a result, applications reading from
* topics with transactional messages will see gaps in the consumed offsets. These missing messages would be the transaction
* markers, and they are filtered out for consumers in both isolation levels. Additionally, applications using
* {@code read_committed} consumers may also see gaps due to aborted transactions, since those messages would not
* be returned by the consumer and yet would have valid offsets.
*
* <h3><a name="multithreaded">Multi-threaded Processing</a></h3>
*
* The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application
* making the call. It is the responsibility of the user to ensure that multi-threaded access
* is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}.
*
* <p>
* The only exception to this rule is {@link #wakeup()}, which can safely be used from an external thread to
* interrupt an active operation. In this case, a {@link org.apache.kafka.common.errors.WakeupException} will be
* thrown from the thread blocking on the operation. This can be used to shutdown the consumer from another thread.
* The following snippet shows the typical pattern:
*
* <pre>
* public class KafkaConsumerRunner implements Runnable {
* private final AtomicBoolean closed = new AtomicBoolean(false);
* private final KafkaConsumer consumer;
*
* public KafkaConsumerRunner(KafkaConsumer consumer) {
* this.consumer = consumer;
* }
*
* public void run() {
* try {
* consumer.subscribe(Arrays.asList("topic"));
* while (!closed.get()) {
* ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
* // Handle new records
* }
* } catch (WakeupException e) {
* // Ignore exception if closing
* if (!closed.get()) throw e;
* } finally {
* consumer.close();
* }
* }
*
* // Shutdown hook which can be called from a separate thread
* public void shutdown() {
* closed.set(true);
* consumer.wakeup();
* }
* }
* </pre>
*
* Then in a separate thread, the consumer can be shutdown by setting the closed flag and waking up the consumer.
*
* <p>
* <pre>
* closed.set(true);
* consumer.wakeup();
* </pre>
*
* <p>
* Note that while it is possible to use thread interrupts instead of {@link #wakeup()} to abort a blocking operation
* (in which case, {@link InterruptException} will be raised), we discourage their use since they may cause a clean
* shutdown of the consumer to be aborted. Interrupts are mainly supported for those cases where using {@link #wakeup()}
* is impossible, e.g. when a consumer thread is managed by code that is unaware of the Kafka client.
*
* <p>
* We have intentionally avoided implementing a particular threading model for processing. This leaves several
* options for implementing multi-threaded processing of records.
*
* <h4>1. One Consumer Per Thread</h4>
*
* A simple option is to give each thread its own consumer instance. Here are the pros and cons of this approach:
* <ul>
* <li><b>PRO</b>: It is the easiest to implement
* <li><b>PRO</b>: It is often the fastest as no inter-thread co-ordination is needed
* <li><b>PRO</b>: It makes in-order processing on a per-partition basis very easy to implement (each thread just
* processes messages in the order it receives them).
* <li><b>CON</b>: More consumers means more TCP connections to the cluster (one per thread). In general Kafka handles
* connections very efficiently so this is generally a small cost.
* <li><b>CON</b>: Multiple consumers means more requests being sent to the server and slightly less batching of data
* which can cause some drop in I/O throughput.
* <li><b>CON</b>: The number of total threads across all processes will be limited by the total number of partitions.
* </ul>
*
* <h4>2. Decouple Consumption and Processing</h4>
*
* Another alternative is to have one or more consumer threads that do all data consumption and hands off
* {@link ConsumerRecords} instances to a blocking queue consumed by a pool of processor threads that actually handle
* the record processing.
*
* This option likewise has pros and cons:
* <ul>
* <li><b>PRO</b>: This option allows independently scaling the number of consumers and processors. This makes it
* possible to have a single consumer that feeds many processor threads, avoiding any limitation on partitions.
* <li><b>CON</b>: Guaranteeing order across the processors requires particular care as the threads will execute
* independently an earlier chunk of data may actually be processed after a later chunk of data just due to the luck of
* thread execution timing. For processing that has no ordering requirements this is not a problem.
* <li><b>CON</b>: Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure
* that processing is complete for that partition.
* </ul>
*
* There are many possible variations on this approach. For example each processor thread can have its own queue, and
* the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify
* commit.
*/
public class KafkaConsumer<K, V> implements Consumer<K, V> {
private static final String CLIENT_ID_METRIC_TAG = "client-id";
private static final long NO_CURRENT_THREAD = -1L;
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private static final String JMX_PREFIX = "kafka.consumer";
static final long DEFAULT_CLOSE_TIMEOUT_MS = 30 * 1000;
// Visible for testing
final Metrics metrics;
private final Logger log;
private final String clientId;
private String groupId;
private final ConsumerCoordinator coordinator;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
private final Fetcher<K, V> fetcher;
private final ConsumerInterceptors<K, V> interceptors;
private final Time time;
private final ConsumerNetworkClient client;
private final SubscriptionState subscriptions;
private final ConsumerMetadata metadata;
private final long retryBackoffMs;
private final long requestTimeoutMs;
private final int defaultApiTimeoutMs;
private volatile boolean closed = false;
private List<ConsumerPartitionAssignor> assignors;
// currentThread holds the threadId of the current thread accessing KafkaConsumer
// and is used to prevent multi-threaded access
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
// refcount is used to allow reentrant access by the thread who has acquired currentThread
private final AtomicInteger refcount = new AtomicInteger(0);
// to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
private boolean cachedSubscriptionHashAllFetchPositions;
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
* are documented <a href="http://kafka.apache.org/documentation.html#consumerconfigs" >here</a>. Values can be
* either strings or objects of the appropriate type (for example a numeric configuration would accept either the
* string "42" or the integer 42).
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}.
* <p>
* Note: after creating a {@code KafkaConsumer} you must always {@link #close()} it to avoid resource leaks.
*
* @param configs The consumer configs
*/
public KafkaConsumer(Map<String, Object> configs) {
this(configs, null, null);
}
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration, and a key and a value {@link Deserializer}.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}.
* <p>
* Note: after creating a {@code KafkaConsumer} you must always {@link #close()} it to avoid resource leaks.
*
* @param configs The consumer configs
* @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
* @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
*/
public KafkaConsumer(Map<String, Object> configs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
keyDeserializer,
valueDeserializer);
}
/**
* A consumer is instantiated by providing a {@link java.util.Properties} object as configuration.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}.
* <p>
* Note: after creating a {@code KafkaConsumer} you must always {@link #close()} it to avoid resource leaks.
*
* @param properties The consumer configuration properties
*/
public KafkaConsumer(Properties properties) {
this(properties, null, null);
}
/**
* A consumer is instantiated by providing a {@link java.util.Properties} object as configuration, and a
* key and a value {@link Deserializer}.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}.
* <p>
* Note: after creating a {@code KafkaConsumer} you must always {@link #close()} it to avoid resource leaks.
*
* @param properties The consumer configuration properties
* @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
* @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
* won't be called in the consumer when the deserializer is passed in directly.
*/
public KafkaConsumer(Properties properties,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
keyDeserializer, valueDeserializer);
}
@SuppressWarnings("unchecked")
private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
try {
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
if (clientId.isEmpty())
clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;
this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
GroupRebalanceConfig.ProtocolType.CONSUMER);
LogContext logContext;
// If group.instance.id is set, we will append it to the log context.
if (groupRebalanceConfig.groupInstanceId.isPresent()) {
logContext = new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get() +
", clientId=" + clientId + ", groupId=" + groupId + "] ");
} else {
logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
}
this.log = logContext.logger(getClass());
boolean enableAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
if (groupId == null) { // overwrite in case of default group id where the config is not explicitly provided
if (!config.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
enableAutoCommit = false;
else if (enableAutoCommit)
throw new InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used.");
} else if (groupId.isEmpty())
log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
log.debug("Initializing the Kafka consumer");
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.time = Time.SYSTEM;
this.metrics = buildMetrics(config, time, clientId);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
// load interceptors and make sure they get clientId
Map<String, Object> userProvidedConfigs = config.originals();
userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs, false)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerInterceptor.class);
this.interceptors = new ConsumerInterceptors<>(interceptorList);
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
this.keyDeserializer.configure(config.originals(), true);
} else {
config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
this.keyDeserializer = keyDeserializer;
}
if (valueDeserializer == null) {
this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
this.valueDeserializer.configure(config.originals(), false);
} else {
config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
this.valueDeserializer = valueDeserializer;
}
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
this.subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer,
valueDeserializer, metrics.reporters(), interceptorList);
this.metadata = new ConsumerMetadata(retryBackoffMs,
config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
!config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
subscriptions, logContext, clusterResourceListeners);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
this.metadata.bootstrap(addresses, time.milliseconds());
String metricGrpPrefix = "consumer";
FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time);
IsolationLevel isolationLevel = IsolationLevel.valueOf(
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry);
int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
ApiVersions apiVersions = new ApiVersions();
NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
this.metadata,
clientId,
100, // a fixed large enough value will suffice for max in-flight requests
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
ClientDnsLookup.forConfig(config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
time,
true,
apiVersions,
throttleTimeSensor,
logContext);
this.client = new ConsumerNetworkClient(
logContext,
netClient,
metadata,
time,
retryBackoffMs,
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
heartbeatIntervalMs); //Will avoid blocking an extended period of time to prevent heartbeat thread starvation
this.assignors = config.getConfiguredInstances(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
ConsumerPartitionAssignor.class);
// no coordinator will be constructed for the default (null) group id
this.coordinator = groupId == null ? null :
new ConsumerCoordinator(groupRebalanceConfig,
logContext,
this.client,
assignors,
this.metadata,
this.subscriptions,
metrics,
metricGrpPrefix,
this.time,
enableAutoCommit,
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors);
this.fetcher = new Fetcher<>(
logContext,
this.client,
config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
config.getString(ConsumerConfig.CLIENT_RACK_CONFIG),
this.keyDeserializer,
this.valueDeserializer,
this.metadata,
this.subscriptions,
metrics,
metricsRegistry,
this.time,
this.retryBackoffMs,
this.requestTimeoutMs,
isolationLevel,
apiVersions);
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka consumer initialized");
} catch (Throwable t) {
// call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
close(0, true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka consumer", t);
}
}
// visible for testing
KafkaConsumer(LogContext logContext,
String clientId,
ConsumerCoordinator coordinator,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
Fetcher<K, V> fetcher,
ConsumerInterceptors<K, V> interceptors,
Time time,
ConsumerNetworkClient client,
Metrics metrics,
SubscriptionState subscriptions,
ConsumerMetadata metadata,
long retryBackoffMs,
long requestTimeoutMs,
int defaultApiTimeoutMs,
List<ConsumerPartitionAssignor> assignors,
String groupId) {
this.log = logContext.logger(getClass());
this.clientId = clientId;
this.coordinator = coordinator;
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
this.fetcher = fetcher;
this.interceptors = Objects.requireNonNull(interceptors);
this.time = time;
this.client = client;
this.metrics = metrics;
this.subscriptions = subscriptions;
this.metadata = metadata;
this.retryBackoffMs = retryBackoffMs;
this.requestTimeoutMs = requestTimeoutMs;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
this.assignors = assignors;
this.groupId = groupId;
}
private static Metrics buildMetrics(ConsumerConfig config, Time time, String clientId) {
Map<String, String> metricsTags = Collections.singletonMap(CLIENT_ID_METRIC_TAG, clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricsTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class, Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
reporters.add(new JmxReporter(JMX_PREFIX));
return new Metrics(metricConfig, reporters, time);
}
/**
* Get the set of partitions currently assigned to this consumer. If subscription happened by directly assigning
* partitions using {@link #assign(Collection)} then this will simply return the same partitions that
* were assigned. If topic subscription was used, then this will give the set of topic partitions currently assigned
* to the consumer (which may be none if the assignment hasn't happened yet, or the partitions are in the
* process of getting reassigned).
* @return The set of partitions currently assigned to this consumer
*/
public Set<TopicPartition> assignment() {
acquireAndEnsureOpen();
try {
return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
} finally {
release();
}
}
/**
* Get the current subscription. Will return the same topics used in the most recent call to
* {@link #subscribe(Collection, ConsumerRebalanceListener)}, or an empty set if no such call has been made.
* @return The set of topics currently subscribed to
*/
public Set<String> subscription() {
acquireAndEnsureOpen();
try {
return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.subscription()));
} finally {
release();
}
}
/**
* Subscribe to the given list of topics to get dynamically
* assigned partitions. <b>Topic subscriptions are not incremental. This list will replace the current
* assignment (if there is one).</b> Note that it is not possible to combine topic subscription with group management
* with manual partition assignment through {@link #assign(Collection)}.
*
* If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
*
* <p>
* As part of group management, the consumer will keep track of the list of consumers that belong to a particular
* group and will trigger a rebalance operation if any one of the following events are triggered:
* <ul>
* <li>Number of partitions change for any of the subscribed topics
* <li>A subscribed topic is created or deleted
* <li>An existing member of the consumer group is shutdown or fails
* <li>A new member is added to the consumer group
* </ul>
* <p>
* When any of these events are triggered, the provided listener will be invoked first to indicate that
* the consumer's assignment has been revoked, and then again when the new assignment has been received.
* Note that rebalances will only occur during an active call to {@link #poll(Duration)}, so callbacks will
* also only be invoked during that time.
*
* The provided listener will immediately override any listener set in a previous call to subscribe.
* It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics
* subscribed in this call. See {@link ConsumerRebalanceListener} for more details.
*
* @param topics The list of topics to subscribe to
* @param listener Non-null listener instance to get notifications on partition assignment/revocation for the
* subscribed topics
* @throws IllegalArgumentException If topics is null or contains null or empty elements, or if listener is null
* @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()}), or if not
* configured at-least one partition assignment strategy
*/
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
if (topics == null)
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
if (topics.isEmpty()) {
// treat subscribing to empty topic list as the same as unsubscribing
this.unsubscribe();
} else {
for (String topic : topics) {
if (topic == null || topic.trim().isEmpty())
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}
throwIfNoAssignorsConfigured();
fetcher.clearBufferedDataForUnassignedTopics(topics);
log.info("Subscribed to topic(s): {}", Utils.join(topics, ", "));
if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
metadata.requestUpdateForNewTopics();
}
} finally {
release();
}
}
/**
* Subscribe to the given list of topics to get dynamically assigned partitions.
* <b>Topic subscriptions are not incremental. This list will replace the current
* assignment (if there is one).</b> It is not possible to combine topic subscription with group management
* with manual partition assignment through {@link #assign(Collection)}.
*
* If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
*
* <p>
* This is a short-hand for {@link #subscribe(Collection, ConsumerRebalanceListener)}, which
* uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer
* {@link #subscribe(Collection, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets
* to be reset. You should also provide your own listener if you are doing your own offset
* management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
*
* @param topics The list of topics to subscribe to
* @throws IllegalArgumentException If topics is null or contains null or empty elements
* @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()}), or if not
* configured at-least one partition assignment strategy
*/
@Override
public void subscribe(Collection<String> topics) {
subscribe(topics, new NoOpConsumerRebalanceListener());
}
/**
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against all topics existing at the time of check.
* This can be controlled through the {@code metadata.max.age.ms} configuration: by lowering
* the max metadata age, the consumer will refresh metadata more often and check for matching topics.
* <p>
* See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the
* use of the {@link ConsumerRebalanceListener}. Generally rebalances are triggered when there
* is a change to the topics matching the provided pattern and when consumer group membership changes.
* Group rebalances only take place during an active call to {@link #poll(Duration)}.
*
* @param pattern Pattern to subscribe to
* @param listener Non-null listener instance to get notifications on partition assignment/revocation for the
* subscribed topics