-
Notifications
You must be signed in to change notification settings - Fork 13
/
KafkaConsumerUtil.java
106 lines (86 loc) · 3.9 KB
/
KafkaConsumerUtil.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package com.bkatwal.kafka.util;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import com.bkatwal.kafka.impl.CustomAckMessageListener;
import com.bkatwal.kafka.impl.CustomMessageListener;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
/**
 * Static helper that creates, starts and stops Spring Kafka listener containers, keeping one
 * {@link ConcurrentMessageListenerContainer} per topic name.
 *
 * <p>The registry is a plain {@link HashMap} and this class performs no synchronization of its
 * own. NOTE(review): callers that invoke these methods from several threads should confirm they
 * serialize access externally.
 *
 * @author "Bikas Katwal" 13/03/19
 */
@Slf4j
public final class KafkaConsumerUtil {

  /** Listener containers created so far, keyed by topic name. */
  private static final Map<String, ConcurrentMessageListenerContainer<String, String>>
      consumersMap = new HashMap<>();

  /**
   * Starts the consumers for a topic, creating them first if necessary.
   *
   * <ol>
   *   <li>If a container already exists for {@code topic} it is started when it is not running,
   *       and the method returns without touching the other arguments.
   *   <li>Otherwise a new container is built from {@code consumerProperties}, started, and
   *       registered under {@code topic}.
   * </ol>
   *
   * @param topic topic name for which consumers are needed
   * @param messageListener a {@link CustomMessageListener} when {@code enable.auto.commit} is
   *     enabled, a {@link CustomAckMessageListener} when it is disabled
   * @param concurrency number of consumers to run; {@code 0} is treated as {@code 1}
   * @param consumerProperties all the consumer properties needed to build the consumer factory;
   *     {@code enable.auto.commit} may be a {@link Boolean} or its string form, and is treated as
   *     enabled when absent
   * @throws IllegalArgumentException if {@code messageListener} is not of the type required by
   *     the effective {@code enable.auto.commit} setting
   */
  @SuppressWarnings({"unchecked", "rawtypes"})
  public static void startOrCreateConsumers(
      final String topic,
      final Object messageListener,
      final int concurrency,
      final Map<String, Object> consumerProperties) {

    final ConcurrentMessageListenerContainer<String, String> existing = consumersMap.get(topic);
    if (existing != null) {
      if (!existing.isRunning()) {
        log.info("Consumer already created for topic {}, starting consumer!!", topic);
        existing.start();
        log.info("Consumer for topic {} started!!!!", topic);
      }
      return;
    }

    log.info("creating kafka consumer for topic {}", topic);

    final boolean enableAutoCommit = isAutoCommitEnabled(consumerProperties);

    // Validate the listener before any container is built so a bad argument fails fast.
    if (enableAutoCommit && !(messageListener instanceof CustomMessageListener)) {
      throw new IllegalArgumentException(
          "Expected message listener of type com.bkatwal.kafka.impl.CustomMessageListener!");
    }
    if (!enableAutoCommit && !(messageListener instanceof CustomAckMessageListener)) {
      throw new IllegalArgumentException(
          "Expected message listener of type com.bkatwal.kafka.impl.CustomAckMessageListener!");
    }

    final ContainerProperties containerProps = new ContainerProperties(topic);
    containerProps.setPollTimeout(100);
    if (!enableAutoCommit) {
      // Without auto commit the listener acknowledges records itself.
      containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE);
    }

    final ConsumerFactory<String, String> factory =
        new DefaultKafkaConsumerFactory<>(consumerProperties);
    final ConcurrentMessageListenerContainer<String, String> container =
        new ConcurrentMessageListenerContainer<>(factory, containerProps);
    container.setupMessageListener(messageListener);
    container.setConcurrency(concurrency == 0 ? 1 : concurrency);

    container.start();
    consumersMap.put(topic, container);
    log.info("created and started kafka consumer for topic {}", topic);
  }

  /**
   * Stops all consumers of the given topic by stopping the container registered for it. The
   * container stays registered, so a later {@link #startOrCreateConsumers} call restarts it.
   *
   * <p>If no container was ever created for {@code topic} a warning is logged and nothing else
   * happens.
   *
   * @param topic topic name to stop corresponding consumers
   */
  public static void stopConsumer(final String topic) {
    log.info("stopping consumer for topic {}", topic);
    final ConcurrentMessageListenerContainer<String, String> container = consumersMap.get(topic);
    if (container == null) {
      // Previously this dereferenced null and threw a NullPointerException.
      log.warn("no consumer registered for topic {}, nothing to stop", topic);
      return;
    }
    container.stop();
    log.info("consumer stopped!!");
  }

  /**
   * Reads {@code enable.auto.commit} from the consumer properties without assuming its type.
   *
   * @param consumerProperties consumer configuration to inspect
   * @return the configured value; {@code true} when the property is absent, which is Kafka's
   *     documented default for this setting
   */
  private static boolean isAutoCommitEnabled(final Map<String, Object> consumerProperties) {
    final Object raw = consumerProperties.get(ENABLE_AUTO_COMMIT_CONFIG);
    if (raw == null) {
      return true;
    }
    if (raw instanceof Boolean) {
      return (Boolean) raw;
    }
    // Kafka also accepts the string forms "true"/"false" for boolean settings.
    return Boolean.parseBoolean(raw.toString().trim());
  }

  private KafkaConsumerUtil() {
    throw new UnsupportedOperationException("Can not instantiate KafkaConsumerUtil");
  }
}