Skip to content

Commit 3f5efa1

Browse files
Initial Commit
1 parent 4ebfb0f commit 3f5efa1

File tree

3 files changed

+54
-0
lines changed

3 files changed

+54
-0
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import java.util.*;
2+
import java.io.*;
3+
import org.apache.kafka.clients.consumer.KafkaConsumer;
4+
import org.apache.kafka.clients.consumer.ConsumerRecords;
5+
import org.apache.kafka.clients.consumer.ConsumerRecord;
6+
7+
public class NewSupplierConsumer{
8+
9+
public static void main(String[] args) throws Exception{
10+
11+
String topicName = "SupplierTopic";
12+
String groupName = "SupplierTopicGroup";
13+
Properties props = new Properties();
14+
//props.put("bootstrap.servers", "localhost:9092,localhost:9093");
15+
//props.put("group.id", groupName);
16+
//props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
17+
//props.put("value.deserializer", "SupplierDeserializer");
18+
19+
InputStream input = null;
20+
KafkaConsumer<String, Supplier> consumer = null;
21+
22+
try {
23+
input = new FileInputStream("SupplierConsumer.properties");
24+
props.load(input);
25+
consumer = new KafkaConsumer<>(props);
26+
consumer.subscribe(Arrays.asList(topicName));
27+
28+
while (true){
29+
ConsumerRecords<String, Supplier> records = consumer.poll(100);
30+
for (ConsumerRecord<String, Supplier> record : records){
31+
System.out.println("Supplier id= " + String.valueOf(record.value().getID()) + " Supplier Name = " + record.value().getName() + " Supplier Start Date = " + record.value().getStartDate().toString());
32+
}
33+
}
34+
}catch(Exception ex){
35+
ex.printStackTrace();
36+
}finally{
37+
input.close();
38+
consumer.close();
39+
}
40+
}
41+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
bootstrap.servers=localhost:9092,localhost:9093
2+
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
3+
value.deserializer=SupplierDeserializer
4+
group.id=SupplierTopicGroup

ConsumerExample/build.sbt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
name := "KafkaTest"
2+
3+
libraryDependencies ++= Seq(
4+
"org.apache.kafka" % "kafka-clients" % "0.10.1.0"
5+
exclude("javax.jms", "jms")
6+
exclude("com.sun.jdmk", "jmxtools")
7+
exclude("com.sun.jmx", "jmxri")
8+
exclude("org.slf4j", "slf4j-simple")
9+
)

0 commit comments

Comments
 (0)