File tree Expand file tree Collapse file tree 1 file changed +41
-0
lines changed
Expand file tree Collapse file tree 1 file changed +41
-0
lines changed Original file line number Diff line number Diff line change 1+ import java .util .*;
2+ import java .io .*;
3+ import org .apache .kafka .clients .consumer .KafkaConsumer ;
4+ import org .apache .kafka .clients .consumer .ConsumerRecords ;
5+ import org .apache .kafka .clients .consumer .ConsumerRecord ;
6+
7+ public class ManualConsumer {
8+
9+ public static void main (String [] args ) throws Exception {
10+
11+ String topicName = "SupplierTopic" ;
12+ String groupName = "SupplierTopicGroup" ;
13+
14+ Properties props = new Properties ();
15+ props .put ("bootstrap.servers" , "localhost:9092,localhost:9093" );
16+ props .put ("group.id" , groupName );
17+ props .put ("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" );
18+ props .put ("value.deserializer" , "SupplierDeserializer" );
19+ props .put ("enable.auto.commit" , "false" );
20+
21+ KafkaConsumer <String , Supplier > consumer = null ;
22+
23+ try {
24+ consumer = new KafkaConsumer <>(props );
25+ consumer .subscribe (Arrays .asList (topicName ));
26+
27+ while (true ){
28+ ConsumerRecords <String , Supplier > records = consumer .poll (100 );
29+ for (ConsumerRecord <String , Supplier > record : records ){
30+ System .out .println ("Supplier id= " + String .valueOf (record .value ().getID ()) + " Supplier Name = " + record .value ().getName () + " Supplier Start Date = " + record .value ().getStartDate ().toString ());
31+ }
32+ consumer .commitAsync ();
33+ }
34+ }catch (Exception ex ){
35+ ex .printStackTrace ();
36+ }finally {
37+ consumer .commitSync ();
38+ consumer .close ();
39+ }
40+ }
41+ }
You can’t perform that action at this time.
0 commit comments