/
KafkaTransport.java
396 lines (348 loc) · 16.6 KB
/
KafkaTransport.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.inputs.transports;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
import org.graylog2.plugin.configuration.fields.DropdownField;
import org.graylog2.plugin.configuration.fields.NumberField;
import org.graylog2.plugin.configuration.fields.TextField;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.MisfireException;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.inputs.transports.ThrottleableTransport;
import org.graylog2.plugin.inputs.transports.Transport;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.plugin.lifecycles.Lifecycle;
import org.graylog2.plugin.system.NodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Named;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static com.codahale.metrics.MetricRegistry.name;
public class KafkaTransport extends ThrottleableTransport {
public static final String CK_FETCH_MIN_BYTES = "fetch_min_bytes";
public static final String CK_FETCH_WAIT_MAX = "fetch_wait_max";
public static final String CK_ZOOKEEPER = "zookeeper";
public static final String CK_TOPIC_FILTER = "topic_filter";
public static final String CK_THREADS = "threads";
public static final String CK_OFFSET_RESET = "offset_reset";
public static final String CK_GROUP_ID = "group_id";
// See https://kafka.apache.org/090/documentation.html for available values for "auto.offset.reset".
private static final ImmutableMap<String, String> OFFSET_RESET_VALUES = ImmutableMap.of(
"largest", "Automatically reset the offset to the largest offset",
"smallest", "Automatically reset the offset to the smallest offset"
);
private static final String DEFAULT_OFFSET_RESET = "largest";
private static final String DEFAULT_GROUP_ID = "graylog2";
private static final Logger LOG = LoggerFactory.getLogger(KafkaTransport.class);
private final Configuration configuration;
private final MetricRegistry localRegistry;
private final NodeId nodeId;
private final EventBus serverEventBus;
private final ServerStatus serverStatus;
private final ScheduledExecutorService scheduler;
private final MetricRegistry metricRegistry;
private final AtomicLong totalBytesRead = new AtomicLong(0);
private final AtomicLong lastSecBytesRead = new AtomicLong(0);
private final AtomicLong lastSecBytesReadTmp = new AtomicLong(0);
private volatile boolean stopped = false;
private volatile boolean paused = true;
private volatile CountDownLatch pausedLatch = new CountDownLatch(1);
private CountDownLatch stopLatch;
private ConsumerConnector cc;
@AssistedInject
public KafkaTransport(@Assisted Configuration configuration,
LocalMetricRegistry localRegistry,
NodeId nodeId,
EventBus serverEventBus,
ServerStatus serverStatus,
@Named("daemonScheduler") ScheduledExecutorService scheduler) {
super(serverEventBus, configuration);
this.configuration = configuration;
this.localRegistry = localRegistry;
this.nodeId = nodeId;
this.serverEventBus = serverEventBus;
this.serverStatus = serverStatus;
this.scheduler = scheduler;
this.metricRegistry = localRegistry;
localRegistry.register("read_bytes_1sec", new Gauge<Long>() {
@Override
public Long getValue() {
return lastSecBytesRead.get();
}
});
localRegistry.register("written_bytes_1sec", new Gauge<Long>() {
@Override
public Long getValue() {
return 0L;
}
});
localRegistry.register("read_bytes_total", new Gauge<Long>() {
@Override
public Long getValue() {
return totalBytesRead.get();
}
});
localRegistry.register("written_bytes_total", new Gauge<Long>() {
@Override
public Long getValue() {
return 0L;
}
});
}
@Subscribe
public void lifecycleStateChange(Lifecycle lifecycle) {
LOG.debug("Lifecycle changed to {}", lifecycle);
switch (lifecycle) {
case PAUSED:
case FAILED:
case HALTING:
pausedLatch = new CountDownLatch(1);
paused = true;
break;
default:
paused = false;
pausedLatch.countDown();
break;
}
}
@Override
public void setMessageAggregator(CodecAggregator ignored) {
}
@Override
public void doLaunch(final MessageInput input) throws MisfireException {
serverStatus.awaitRunning(new Runnable() {
@Override
public void run() {
lifecycleStateChange(Lifecycle.RUNNING);
}
});
// listen for lifecycle changes
serverEventBus.register(this);
final Properties props = new Properties();
props.put("group.id", configuration.getString(CK_GROUP_ID, DEFAULT_GROUP_ID));
props.put("client.id", "gl2-" + nodeId + "-" + input.getId());
props.put("fetch.min.bytes", String.valueOf(configuration.getInt(CK_FETCH_MIN_BYTES)));
props.put("fetch.wait.max.ms", String.valueOf(configuration.getInt(CK_FETCH_WAIT_MAX)));
props.put("zookeeper.connect", configuration.getString(CK_ZOOKEEPER));
props.put("auto.offset.reset", configuration.getString(CK_OFFSET_RESET, DEFAULT_OFFSET_RESET));
// Default auto commit interval is 60 seconds. Reduce to 1 second to minimize message duplication
// if something breaks.
props.put("auto.commit.interval.ms", "1000");
// Set a consumer timeout to avoid blocking on the consumer iterator.
props.put("consumer.timeout.ms", "1000");
final int numThreads = configuration.getInt(CK_THREADS);
final ConsumerConfig consumerConfig = new ConsumerConfig(props);
cc = Consumer.createJavaConsumerConnector(consumerConfig);
final TopicFilter filter = new Whitelist(configuration.getString(CK_TOPIC_FILTER));
final List<KafkaStream<byte[], byte[]>> streams = cc.createMessageStreamsByFilter(filter, numThreads);
final ExecutorService executor = executorService(numThreads);
// this is being used during shutdown to first stop all submitted jobs before committing the offsets back to zookeeper
// and then shutting down the connection.
// this is to avoid yanking away the connection from the consumer runnables
stopLatch = new CountDownLatch(streams.size());
for (final KafkaStream<byte[], byte[]> stream : streams) {
executor.submit(new Runnable() {
@Override
public void run() {
final ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator();
boolean retry;
do {
retry = false;
try {
// we have to use hasNext() here instead foreach, because next() marks the message as processed immediately
// noinspection WhileLoopReplaceableByForEach
while (consumerIterator.hasNext()) {
if (paused) {
// we try not to spin here, so we wait until the lifecycle goes back to running.
LOG.debug(
"Message processing is paused, blocking until message processing is turned back on.");
Uninterruptibles.awaitUninterruptibly(pausedLatch);
}
// check for being stopped before actually getting the message, otherwise we could end up losing that message
if (stopped) {
break;
}
if (isThrottled()) {
blockUntilUnthrottled();
}
// process the message, this will immediately mark the message as having been processed. this gets tricky
// if we get an exception about processing it down below.
final MessageAndMetadata<byte[], byte[]> message = consumerIterator.next();
final byte[] bytes = message.message();
// it is possible that the message is null
if (bytes == null) {
continue;
}
totalBytesRead.addAndGet(bytes.length);
lastSecBytesReadTmp.addAndGet(bytes.length);
final RawMessage rawMessage = new RawMessage(bytes);
// TODO implement throttling
input.processRawMessage(rawMessage);
}
} catch (ConsumerTimeoutException e) {
// Happens when there is nothing to consume, retry to check again.
retry = true;
} catch (Exception e) {
LOG.error("Kafka consumer error, stopping consumer thread.", e);
}
} while (retry && !stopped);
// explicitly commit our offsets when stopping.
// this might trigger a couple of times, but it won't hurt
cc.commitOffsets();
stopLatch.countDown();
}
});
}
scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
lastSecBytesRead.set(lastSecBytesReadTmp.getAndSet(0));
}
}, 1, 1, TimeUnit.SECONDS);
}
private ExecutorService executorService(int numThreads) {
final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("kafka-transport-%d").build();
return new InstrumentedExecutorService(
Executors.newFixedThreadPool(numThreads, threadFactory),
metricRegistry,
name(this.getClass(), "executor-service"));
}
@Override
public void doStop() {
stopped = true;
serverEventBus.unregister(this);
if (stopLatch != null) {
try {
// unpause the processors if they are blocked. this will cause them to see that we are stopping, even if they were paused.
if (pausedLatch != null && pausedLatch.getCount() > 0) {
pausedLatch.countDown();
}
final boolean allStoppedOrderly = stopLatch.await(5, TimeUnit.SECONDS);
stopLatch = null;
if (!allStoppedOrderly) {
// timed out
LOG.info(
"Stopping Kafka input timed out (waited 5 seconds for consumer threads to stop). Forcefully closing connection now. " +
"This is usually harmless when stopping the input.");
}
} catch (InterruptedException e) {
LOG.debug("Interrupted while waiting to stop input.");
}
}
if (cc != null) {
cc.shutdown();
cc = null;
}
}
@Override
public MetricSet getMetricSet() {
return localRegistry;
}
@FactoryClass
public interface Factory extends Transport.Factory<KafkaTransport> {
@Override
KafkaTransport create(Configuration configuration);
@Override
Config getConfig();
}
@ConfigClass
public static class Config extends ThrottleableTransport.Config {
@Override
public ConfigurationRequest getRequestedConfiguration() {
final ConfigurationRequest cr = super.getRequestedConfiguration();
cr.addField(new TextField(
CK_ZOOKEEPER,
"ZooKeeper address",
"127.0.0.1:2181",
"Host and port of the ZooKeeper that is managing your Kafka cluster.",
ConfigurationField.Optional.NOT_OPTIONAL));
cr.addField(new TextField(
CK_TOPIC_FILTER,
"Topic filter regex",
"^your-topic$",
"Every topic that matches this regular expression will be consumed.",
ConfigurationField.Optional.NOT_OPTIONAL));
cr.addField(new NumberField(
CK_FETCH_MIN_BYTES,
"Fetch minimum bytes",
5,
"Wait for a message batch to reach at least this size or the configured maximum wait time before fetching.",
ConfigurationField.Optional.NOT_OPTIONAL));
cr.addField(new NumberField(
CK_FETCH_WAIT_MAX,
"Fetch maximum wait time (ms)",
100,
"Wait for this time or the configured minimum size of a message batch before fetching.",
ConfigurationField.Optional.NOT_OPTIONAL));
cr.addField(new NumberField(
CK_THREADS,
"Processor threads",
2,
"Number of processor threads to spawn. Use one thread per Kafka topic partition.",
ConfigurationField.Optional.NOT_OPTIONAL));
cr.addField(new DropdownField(
CK_OFFSET_RESET,
"Auto offset reset",
DEFAULT_OFFSET_RESET,
OFFSET_RESET_VALUES,
"What to do when there is no initial offset in ZooKeeper or if an offset is out of range",
ConfigurationField.Optional.OPTIONAL));
cr.addField(new TextField(
CK_GROUP_ID,
"Consumer group id",
DEFAULT_GROUP_ID,
"Name of the consumer group the Kafka input belongs to",
ConfigurationField.Optional.OPTIONAL));
return cr;
}
}
}