Skip to content

Commit 7e7e01c

Browse files
Initial Commit
1 parent c04e2d1 commit 7e7e01c

File tree

3 files changed

+123
-0
lines changed

3 files changed

+123
-0
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import java.util.*;
2+
import org.apache.kafka.clients.consumer.*;
3+
import org.apache.kafka.common.*;
4+
5+
public class RandomConsumer{
6+
7+
8+
public static void main(String[] args) throws Exception{
9+
10+
String topicName = "RandomProducerTopic";
11+
KafkaConsumer<String, String> consumer = null;
12+
13+
String groupName = "RG";
14+
Properties props = new Properties();
15+
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
16+
props.put("group.id", groupName);
17+
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
18+
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
19+
props.put("enable.auto.commit", "false");
20+
21+
consumer = new KafkaConsumer<>(props);
22+
RebalanceListner rebalanceListner = new RebalanceListner(consumer);
23+
24+
consumer.subscribe(Arrays.asList(topicName),rebalanceListner);
25+
try{
26+
while (true){
27+
ConsumerRecords<String, String> records = consumer.poll(100);
28+
for (ConsumerRecord<String, String> record : records){
29+
//System.out.println("Topic:"+ record.topic() +" Partition:" + record.partition() + " Offset:" + record.offset() + " Value:"+ record.value());
30+
// Do some processing and save it to Database
31+
rebalanceListner.addOffset(record.topic(), record.partition(),record.offset());
32+
}
33+
//consumer.commitSync(rebalanceListner.getCurrentOffsets());
34+
}
35+
}catch(Exception ex){
36+
System.out.println("Exception.");
37+
ex.printStackTrace();
38+
}
39+
finally{
40+
consumer.close();
41+
}
42+
}
43+
44+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import java.util.*;
2+
import org.apache.kafka.clients.producer.*;
3+
public class RandomProducer {
4+
5+
public static void main(String[] args) throws InterruptedException{
6+
7+
String topicName = "RandomProducerTopic";
8+
String msg;
9+
10+
Properties props = new Properties();
11+
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
12+
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
13+
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
14+
15+
Producer<String, String> producer = new KafkaProducer <>(props);
16+
Random rg = new Random();
17+
Calendar dt = Calendar.getInstance();
18+
dt.set(2016,1,1);
19+
try{
20+
while(true){
21+
for (int i=0;i<100;i++){
22+
msg = dt.get(Calendar.YEAR)+"-"+dt.get(Calendar.MONTH)+"-"+dt.get(Calendar.DATE) + "," + rg.nextInt(1000);
23+
producer.send(new ProducerRecord<String, String>(topicName,0,"Key",msg)).get();
24+
msg = dt.get(Calendar.YEAR)+"-"+dt.get(Calendar.MONTH)+"-"+dt.get(Calendar.DATE) + "," + rg.nextInt(1000);
25+
producer.send(new ProducerRecord<String, String>(topicName,1,"Key",msg)).get();
26+
}
27+
dt.add(Calendar.DATE,1);
28+
System.out.println("Data Sent for " + dt.get(Calendar.YEAR) + "-" + dt.get(Calendar.MONTH) + "-" + dt.get(Calendar.DATE) );
29+
}
30+
}
31+
catch(Exception ex){
32+
System.out.println("Intrupted");
33+
}
34+
finally{
35+
producer.close();
36+
}
37+
38+
}
39+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import java.util.*;
2+
import org.apache.kafka.clients.consumer.*;
3+
import org.apache.kafka.common.*;
4+
5+
public class RebalanceListner implements ConsumerRebalanceListener {
6+
private KafkaConsumer consumer;
7+
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap();
8+
9+
public RebalanceListner(KafkaConsumer con){
10+
this.consumer=con;
11+
}
12+
13+
public void addOffset(String topic, int partition, long offset){
14+
currentOffsets.put(new TopicPartition(topic, partition),new OffsetAndMetadata(offset,"Commit"));
15+
}
16+
17+
public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets(){
18+
return currentOffsets;
19+
}
20+
21+
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
22+
System.out.println("Following Partitions Assigned ....");
23+
for(TopicPartition partition: partitions)
24+
System.out.println(partition.partition()+",");
25+
}
26+
27+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
28+
System.out.println("Following Partitions Revoked ....");
29+
for(TopicPartition partition: partitions)
30+
System.out.println(partition.partition()+",");
31+
32+
33+
System.out.println("Following Partitions commited ...." );
34+
for(TopicPartition tp: currentOffsets.keySet())
35+
System.out.println(tp.partition());
36+
37+
consumer.commitSync(currentOffsets);
38+
currentOffsets.clear();
39+
}
40+
}

0 commit comments

Comments
 (0)