1+ import java .util .*;
2+ import org .apache .kafka .clients .consumer .*;
3+ import org .apache .kafka .common .*;
4+ import java .sql .*;
5+
6+ public class SensorConsumer {
7+
8+
9+ public static void main (String [] args ) throws Exception {
10+
11+ String topicName = "SensorTopic" ;
12+ KafkaConsumer <String , String > consumer = null ;
13+ int rCount ;
14+
15+ Properties props = new Properties ();
16+ props .put ("bootstrap.servers" , "localhost:9092,localhost:9093" );
17+ props .put ("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" );
18+ props .put ("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" );
19+ props .put ("enable.auto.commit" , "false" );
20+
21+ consumer = new KafkaConsumer <>(props );
22+ TopicPartition p0 = new TopicPartition (topicName , 0 );
23+ TopicPartition p1 = new TopicPartition (topicName , 1 );
24+ TopicPartition p2 = new TopicPartition (topicName , 2 );
25+
26+ consumer .assign (Arrays .asList (p0 ,p1 ,p2 ));
27+ System .out .println ("Current position p0=" + consumer .position (p0 )
28+ + " p1=" + consumer .position (p1 )
29+ + " p2=" + consumer .position (p2 ));
30+
31+ consumer .seek (p0 , getOffsetFromDB (p0 ));
32+ consumer .seek (p1 , getOffsetFromDB (p1 ));
33+ consumer .seek (p2 , getOffsetFromDB (p2 ));
34+ System .out .println ("New positions po=" + consumer .position (p0 )
35+ + " p1=" + consumer .position (p1 )
36+ + " p2=" + consumer .position (p2 ));
37+
38+ System .out .println ("Start Fetching Now" );
39+ try {
40+ do {
41+ ConsumerRecords <String , String > records = consumer .poll (1000 );
42+ System .out .println ("Record polled " + records .count ());
43+ rCount = records .count ();
44+ for (ConsumerRecord <String , String > record : records ){
45+ saveAndCommit (consumer ,record );
46+ }
47+ }while (rCount >0 );
48+ }catch (Exception ex ){
49+ System .out .println ("Exception in main." );
50+ }
51+ finally {
52+ consumer .close ();
53+ }
54+ }
55+
56+ private static long getOffsetFromDB (TopicPartition p ){
57+ long offset = 0 ;
58+ try {
59+ Class .forName ("com.mysql.jdbc.Driver" );
60+ Connection con =DriverManager .getConnection ("jdbc:mysql://localhost:3306/test" ,"root" ,"pandey" );
61+
62+ String sql = "select offset from tss_offsets where topic_name='" + p .topic () + "' and partition=" + p .partition ();
63+ Statement stmt =con .createStatement ();
64+ ResultSet rs = stmt .executeQuery (sql );
65+ if (rs .next ())
66+ offset = rs .getInt ("offset" );
67+ stmt .close ();
68+ con .close ();
69+ }catch (Exception e ){
70+ System .out .println ("Exception in getOffsetFromDB" );
71+ }
72+ return offset ;
73+ }
74+
75+ private static void saveAndCommit (KafkaConsumer <String , String > c , ConsumerRecord <String , String > r ){
76+ System .out .println ("Topic=" + r .topic () + " Partition=" + r .partition () + " Offset=" + r .offset () + " Key=" + r .key () + " Value=" + r .value ());
77+ try {
78+ Class .forName ("com.mysql.jdbc.Driver" );
79+ Connection con =DriverManager .getConnection ("jdbc:mysql://localhost:3306/test" ,"root" ,"pandey" );
80+ con .setAutoCommit (false );
81+
82+ String insertSQL = "insert into tss_data values(?,?)" ;
83+ PreparedStatement psInsert = con .prepareStatement (insertSQL );
84+ psInsert .setString (1 ,r .key ());
85+ psInsert .setString (2 ,r .value ());
86+
87+ String updateSQL = "update tss_offsets set offset=? where topic_name=? and partition=?" ;
88+ PreparedStatement psUpdate = con .prepareStatement (updateSQL );
89+ psUpdate .setLong (1 ,r .offset ()+1 );
90+ psUpdate .setString (2 ,r .topic ());
91+ psUpdate .setInt (3 ,r .partition ());
92+
93+ psInsert .executeUpdate ();
94+ psUpdate .executeUpdate ();
95+ con .commit ();
96+ con .close ();
97+ }catch (Exception e ){
98+ System .out .println ("Exception in saveAndCommit" );
99+ }
100+
101+ }
102+ }
0 commit comments