Skip to content

Commit

Permalink
Route messages to different index sets (#2976)
Browse files Browse the repository at this point in the history
To be able to write a message to different indices/index-sets we have to
map streams to index sets and handle that when writing to Elasticsearch.

- Track matching index sets in the `Message` object so we don't have to
  iterate over all streams when doing the mapping. This assumes that
  usually there will be fewer index-sets than streams for a message.

- Add `Stream#getIndexSets()` and add the legacy default index-set to
  each stream. This needs to be changed to load the index-sets for a
  stream from the database.

- Adjust the `BlockingBatchedESOutput` to write one message into all
  index-sets for that message. Depending on the number of index-sets for
  a message, we actually write one copy of the message per index-set.
  This can slow down indexing because more work has to be done.
  The batching behaviour doesn't change because we are still flushing
  after the configured batch size.

- The `Messages#bulkIndex()` method now takes a small `IndexAndMessage`
  object to decide into which index a message should be indexed.

The reasoning behind putting the index-sets into the message instead of
looking them up later is that we just have to iterate over index sets
instead of iterating over streams and looking up index-sets from them.

Since we are rebuilding the stream router in a background thread when
stream (and soon index-set) configuration changes, the expensive work is
done outside of the hot code path.

There are probably several improvements that can be implemented to make
this more efficient. This is a first naive version that makes it work
with minimal changes.
  • Loading branch information
bernd authored and joschi committed Nov 9, 2016
1 parent 1720c48 commit 292ec02
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 47 deletions.
Expand Up @@ -26,7 +26,6 @@
import org.graylog2.dashboards.widgets.InvalidWidgetConfigurationException; import org.graylog2.dashboards.widgets.InvalidWidgetConfigurationException;
import org.graylog2.database.NotFoundException; import org.graylog2.database.NotFoundException;
import org.graylog2.grok.GrokPatternService; import org.graylog2.grok.GrokPatternService;
import org.graylog2.indexer.searches.Searches;
import org.graylog2.inputs.InputService; import org.graylog2.inputs.InputService;
import org.graylog2.inputs.converters.ConverterFactory; import org.graylog2.inputs.converters.ConverterFactory;
import org.graylog2.inputs.extractors.ExtractorFactory; import org.graylog2.inputs.extractors.ExtractorFactory;
Expand Down Expand Up @@ -466,6 +465,7 @@ private org.graylog2.plugin.streams.Stream createStream(final String bundleId, f
new ObjectId(org.graylog2.plugin.streams.Stream.DEFAULT_STREAM_ID), new ObjectId(org.graylog2.plugin.streams.Stream.DEFAULT_STREAM_ID),
streamData.build(), streamData.build(),
Collections.emptyList(), Collections.emptyList(),
Collections.emptySet(),
Collections.emptySet()); Collections.emptySet());
} else { } else {
stream = streamService.create(streamData.build()); stream = streamService.create(streamData.build());
Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.graylog2.indexer.IndexFailure; import org.graylog2.indexer.IndexFailure;
import org.graylog2.indexer.IndexFailureImpl; import org.graylog2.indexer.IndexFailureImpl;
import org.graylog2.indexer.IndexMapping; import org.graylog2.indexer.IndexMapping;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.results.ResultMessage; import org.graylog2.indexer.results.ResultMessage;
import org.graylog2.plugin.Message; import org.graylog2.plugin.Message;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand Down Expand Up @@ -114,26 +115,22 @@ public List<String> analyze(String string, String index) {
return terms; return terms;
} }


public boolean bulkIndex(final List<Message> messages) { public boolean bulkIndex(final List<Map.Entry<IndexSet, Message>> messageList) {
return bulkIndex(deflectorName, messages); if (messageList.isEmpty()) {
}

public boolean bulkIndex(final String indexName, final List<Message> messages) {
if (messages.isEmpty()) {
return true; return true;
} }


final BulkRequestBuilder requestBuilder = c.prepareBulk().setConsistencyLevel(WriteConsistencyLevel.ONE); final BulkRequestBuilder requestBuilder = c.prepareBulk().setConsistencyLevel(WriteConsistencyLevel.ONE);
for (Message msg : messages) { for (Map.Entry<IndexSet, Message> entry : messageList) {
requestBuilder.add(buildIndexRequest(indexName, msg.toElasticSearchObject(invalidTimestampMeter), msg.getId())); requestBuilder.add(buildIndexRequest(entry.getKey().getWriteIndexAlias(), entry.getValue().toElasticSearchObject(invalidTimestampMeter), entry.getValue().getId()));
} }


final BulkResponse response = runBulkRequest(requestBuilder.request()); final BulkResponse response = runBulkRequest(requestBuilder.request());


LOG.debug("Index {}: Bulk indexed {} messages, took {} ms, failures: {}", indexName, LOG.debug("Index: Bulk indexed {} messages, took {} ms, failures: {}",
response.getItems().length, response.getTookInMillis(), response.hasFailures()); response.getItems().length, response.getTookInMillis(), response.hasFailures());
if (response.hasFailures()) { if (response.hasFailures()) {
propagateFailure(response.getItems(), messages, response.buildFailureMessage()); propagateFailure(response.getItems(), messageList, response.buildFailureMessage());
} }


return !response.hasFailures(); return !response.hasFailures();
Expand All @@ -153,21 +150,21 @@ private BulkResponse runBulkRequest(final BulkRequest request) {
} }
} }


private void propagateFailure(BulkItemResponse[] items, List<Message> messages, String errorMessage) { private void propagateFailure(BulkItemResponse[] items, List<Map.Entry<IndexSet, Message>> messageList, String errorMessage) {
final List<IndexFailure> indexFailures = new LinkedList<>(); final List<IndexFailure> indexFailures = new LinkedList<>();
for (BulkItemResponse item : items) { for (BulkItemResponse item : items) {
if (item.isFailed()) { if (item.isFailed()) {
LOG.trace("Failed to index message: {}", item.getFailureMessage()); LOG.trace("Failed to index message: {}", item.getFailureMessage());


// Write failure to index_failures. // Write failure to index_failures.
final BulkItemResponse.Failure f = item.getFailure(); final BulkItemResponse.Failure f = item.getFailure();
final Message message = messages.get(item.getItemId()); final Map.Entry<IndexSet, Message> messageEntry = messageList.get(item.getItemId());
final Map<String, Object> doc = ImmutableMap.<String, Object>builder() final Map<String, Object> doc = ImmutableMap.<String, Object>builder()
.put("letter_id", item.getId()) .put("letter_id", item.getId())
.put("index", f.getIndex()) .put("index", f.getIndex())
.put("type", f.getType()) .put("type", f.getType())
.put("message", f.getMessage()) .put("message", f.getMessage())
.put("timestamp", message.getTimestamp()) .put("timestamp", messageEntry.getValue().getTimestamp())
.build(); .build();


indexFailures.add(new IndexFailureImpl(doc)); indexFailures.add(new IndexFailureImpl(doc));
Expand Down
Expand Up @@ -21,8 +21,10 @@
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject; import com.google.inject.assistedinject.AssistedInject;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.cluster.Cluster; import org.graylog2.indexer.cluster.Cluster;
import org.graylog2.indexer.messages.Messages; import org.graylog2.indexer.messages.Messages;
import org.graylog2.plugin.Message; import org.graylog2.plugin.Message;
Expand All @@ -34,6 +36,7 @@


import javax.inject.Inject; import javax.inject.Inject;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -51,7 +54,7 @@ public class BlockingBatchedESOutput extends ElasticSearchOutput {
private final Meter bufferFlushes; private final Meter bufferFlushes;
private final Meter bufferFlushesRequested; private final Meter bufferFlushesRequested;


private volatile List<Message> buffer; private volatile List<Map.Entry<IndexSet, Message>> buffer;


private static final AtomicInteger activeFlushThreads = new AtomicInteger(0); private static final AtomicInteger activeFlushThreads = new AtomicInteger(0);
private final AtomicLong lastFlushTime = new AtomicLong(); private final AtomicLong lastFlushTime = new AtomicLong();
Expand Down Expand Up @@ -89,9 +92,15 @@ public BlockingBatchedESOutput(MetricRegistry metricRegistry,


@Override @Override
public void write(Message message) throws Exception { public void write(Message message) throws Exception {
List<Message> flushBatch = null; for (IndexSet indexSet : message.getIndexSets()) {
writeMessageEntry(Maps.immutableEntry(indexSet, message));
}
}

public void writeMessageEntry(Map.Entry<IndexSet, Message> entry) throws Exception {
List<Map.Entry<IndexSet, Message>> flushBatch = null;
synchronized (this) { synchronized (this) {
buffer.add(message); buffer.add(entry);


if (buffer.size() >= maxBufferSize) { if (buffer.size() >= maxBufferSize) {
flushBatch = buffer; flushBatch = buffer;
Expand All @@ -106,7 +115,7 @@ public void write(Message message) throws Exception {
} }
} }


private void flush(List<Message> messages) { private void flush(List<Map.Entry<IndexSet, Message>> messages) {
if (!cluster.isConnected() || !cluster.isDeflectorHealthy()) { if (!cluster.isConnected() || !cluster.isDeflectorHealthy()) {
try { try {
cluster.waitForConnectedAndDeflectorHealthy(); cluster.waitForConnectedAndDeflectorHealthy();
Expand All @@ -125,7 +134,7 @@ private void flush(List<Message> messages) {


try (Timer.Context ignored = processTime.time()) { try (Timer.Context ignored = processTime.time()) {
lastFlushTime.set(System.nanoTime()); lastFlushTime.set(System.nanoTime());
write(messages); writeMessageEntries(messages);
batchSize.update(messages.size()); batchSize.update(messages.size());
bufferFlushes.mark(); bufferFlushes.mark();
} catch (Exception e) { } catch (Exception e) {
Expand All @@ -148,7 +157,7 @@ public void forceFlushIfTimedout() {
return; return;
} }
// flip buffer quickly and initiate flush // flip buffer quickly and initiate flush
final List<Message> flushBatch; final List<Map.Entry<IndexSet, Message>> flushBatch;
synchronized (this) { synchronized (this) {
flushBatch = buffer; flushBatch = buffer;
buffer = Lists.newArrayListWithCapacity(maxBufferSize); buffer = Lists.newArrayListWithCapacity(maxBufferSize);
Expand Down
Expand Up @@ -20,10 +20,10 @@
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject; import com.google.inject.assistedinject.AssistedInject;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.messages.Messages; import org.graylog2.indexer.messages.Messages;
import org.graylog2.plugin.Message; import org.graylog2.plugin.Message;
import org.graylog2.plugin.configuration.Configuration; import org.graylog2.plugin.configuration.Configuration;
Expand All @@ -37,7 +37,9 @@
import javax.inject.Inject; import javax.inject.Inject;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;


import static com.codahale.metrics.MetricRegistry.name; import static com.codahale.metrics.MetricRegistry.name;


Expand Down Expand Up @@ -87,18 +89,23 @@ public void write(Message message) throws Exception {


@Override @Override
public void write(List<Message> messageList) throws Exception { public void write(List<Message> messageList) throws Exception {
throw new UnsupportedOperationException("Method not supported!");
}

public void writeMessageEntries(List<Map.Entry<IndexSet, Message>> messageList) throws Exception {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
final List<String> sortedIds = Ordering.natural().sortedCopy(Lists.transform(messageList, final List<String> sortedIds = Ordering.natural().sortedCopy(messageList.stream()
Message.ID_FUNCTION)); .map(entry -> entry.getValue().getId())
.collect(Collectors.toList()));
LOG.trace("Writing message ids to [{}]: <{}>", NAME, Joiner.on(", ").join(sortedIds)); LOG.trace("Writing message ids to [{}]: <{}>", NAME, Joiner.on(", ").join(sortedIds));
} }


writes.mark(messageList.size()); writes.mark(messageList.size());
try (final Timer.Context ignored = processTime.time()) { try (final Timer.Context ignored = processTime.time()) {
messages.bulkIndex(messageList); messages.bulkIndex(messageList);
} }
for (final Message message : messageList) { for (final Map.Entry<IndexSet, Message> entry : messageList) {
journal.markJournalOffsetCommitted(message.getJournalOffset()); journal.markJournalOffsetCommitted(entry.getValue().getJournalOffset());
} }
} }


Expand Down
Expand Up @@ -74,7 +74,7 @@ public void doRun() {
.put(StreamImpl.FIELD_MATCHING_TYPE, StreamImpl.MatchingType.DEFAULT.name()) .put(StreamImpl.FIELD_MATCHING_TYPE, StreamImpl.MatchingType.DEFAULT.name())
.put(StreamImpl.FIELD_DEFAULT_STREAM, true) .put(StreamImpl.FIELD_DEFAULT_STREAM, true)
.build(); .build();
final Stream stream = new StreamImpl(id, fields, Collections.emptyList(), Collections.emptySet()); final Stream stream = new StreamImpl(id, fields, Collections.emptyList(), Collections.emptySet(), Collections.emptySet());
final StreamRule streamRule = new StreamRuleImpl( final StreamRule streamRule = new StreamRuleImpl(
ImmutableMap.<String, Object>builder() ImmutableMap.<String, Object>builder()
.put(StreamRuleImpl.FIELD_TYPE, StreamRuleType.ALWAYS_MATCH.getValue()) .put(StreamRuleImpl.FIELD_TYPE, StreamRuleType.ALWAYS_MATCH.getValue())
Expand Down
27 changes: 25 additions & 2 deletions graylog2-server/src/main/java/org/graylog2/plugin/Message.java
Expand Up @@ -29,6 +29,7 @@
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.net.InetAddresses; import com.google.common.net.InetAddresses;
import org.graylog2.indexer.IndexSet;
import org.graylog2.plugin.streams.Stream; import org.graylog2.plugin.streams.Stream;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand Down Expand Up @@ -116,6 +117,7 @@ public class Message implements Messages {


private final Map<String, Object> fields = Maps.newHashMap(); private final Map<String, Object> fields = Maps.newHashMap();
private Set<Stream> streams = Sets.newHashSet(); private Set<Stream> streams = Sets.newHashSet();
private Set<IndexSet> indexSets = Sets.newHashSet();
private String sourceInputId; private String sourceInputId;


// Used for drools to filter out messages. // Used for drools to filter out messages.
Expand Down Expand Up @@ -423,6 +425,7 @@ public Set<Stream> getStreams() {
* @param stream the stream to route this message into * @param stream the stream to route this message into
*/ */
public void addStream(Stream stream) { public void addStream(Stream stream) {
indexSets.addAll(stream.getIndexSets());
streams.add(stream); streams.add(stream);
} }


Expand All @@ -431,7 +434,9 @@ public void addStream(Stream stream) {
* @param newStreams an iterable of Stream objects * @param newStreams an iterable of Stream objects
*/ */
public void addStreams(Iterable<Stream> newStreams) { public void addStreams(Iterable<Stream> newStreams) {
Iterables.addAll(streams, newStreams); for (final Stream stream : newStreams) {
addStream(stream);
}
} }


/** /**
Expand All @@ -440,7 +445,25 @@ public void addStreams(Iterable<Stream> newStreams) {
* @return <tt>true</tt> if this message was assigned to the stream * @return <tt>true</tt> if this message was assigned to the stream
*/ */
public boolean removeStream(Stream stream) { public boolean removeStream(Stream stream) {
return streams.remove(stream); final boolean removed = streams.remove(stream);

if (removed) {
indexSets.clear();
for (Stream s : streams) {
indexSets.addAll(s.getIndexSets());
}
}

return removed;
}

/**
* Return the index sets for this message based on the assigned streams.
*
* @return index sets
*/
public Set<IndexSet> getIndexSets() {
return ImmutableSet.copyOf(this.indexSets);
} }


public List<String> getStreamIds() { public List<String> getStreamIds() {
Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.graylog2.plugin.streams; package org.graylog2.plugin.streams;


import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import org.graylog2.indexer.IndexSet;
import org.graylog2.plugin.database.Persisted; import org.graylog2.plugin.database.Persisted;


import java.util.List; import java.util.List;
Expand Down Expand Up @@ -73,4 +74,6 @@ public static MatchingType valueOfOrDefault(String name) {
boolean isDefaultStream(); boolean isDefaultStream();


void setDefaultStream(boolean defaultStream); void setDefaultStream(boolean defaultStream);

Set<IndexSet> getIndexSets();
} }
Expand Up @@ -28,6 +28,7 @@
import org.graylog2.database.validators.FilledStringValidator; import org.graylog2.database.validators.FilledStringValidator;
import org.graylog2.database.validators.MapValidator; import org.graylog2.database.validators.MapValidator;
import org.graylog2.database.validators.OptionalStringValidator; import org.graylog2.database.validators.OptionalStringValidator;
import org.graylog2.indexer.IndexSet;
import org.graylog2.plugin.Tools; import org.graylog2.plugin.Tools;
import org.graylog2.plugin.database.validators.Validator; import org.graylog2.plugin.database.validators.Validator;
import org.graylog2.plugin.streams.Output; import org.graylog2.plugin.streams.Output;
Expand Down Expand Up @@ -63,24 +64,28 @@ public class StreamImpl extends PersistedImpl implements Stream {


private final List<StreamRule> streamRules; private final List<StreamRule> streamRules;
private final Set<Output> outputs; private final Set<Output> outputs;
private final Set<IndexSet> indexSets;


public StreamImpl(Map<String, Object> fields) { public StreamImpl(Map<String, Object> fields) {
super(fields); super(fields);
this.streamRules = null; this.streamRules = null;
this.outputs = null; this.outputs = null;
this.indexSets = null;
} }


protected StreamImpl(ObjectId id, Map<String, Object> fields) { protected StreamImpl(ObjectId id, Map<String, Object> fields) {
super(id, fields); super(id, fields);
this.streamRules = null; this.streamRules = null;
this.outputs = null; this.outputs = null;
this.indexSets = null;
} }


public StreamImpl(ObjectId id, Map<String, Object> fields, List<StreamRule> streamRules, Set<Output> outputs) { public StreamImpl(ObjectId id, Map<String, Object> fields, List<StreamRule> streamRules, Set<Output> outputs, Set<IndexSet> indexSets) {
super(id, fields); super(id, fields);


this.streamRules = streamRules; this.streamRules = streamRules;
this.outputs = outputs; this.outputs = outputs;
this.indexSets = indexSets;
} }


@Override @Override
Expand Down Expand Up @@ -230,4 +235,9 @@ public boolean isDefaultStream() {
public void setDefaultStream(boolean defaultStream) { public void setDefaultStream(boolean defaultStream) {
fields.put(FIELD_DEFAULT_STREAM, defaultStream); fields.put(FIELD_DEFAULT_STREAM, defaultStream);
} }

@Override
public Set<IndexSet> getIndexSets() {
return indexSets;
}
} }
Expand Up @@ -26,6 +26,8 @@
import org.graylog2.database.MongoConnection; import org.graylog2.database.MongoConnection;
import org.graylog2.database.NotFoundException; import org.graylog2.database.NotFoundException;
import org.graylog2.database.PersistedServiceImpl; import org.graylog2.database.PersistedServiceImpl;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.LegacyDeflectorIndexSet;
import org.graylog2.notifications.Notification; import org.graylog2.notifications.Notification;
import org.graylog2.notifications.NotificationService; import org.graylog2.notifications.NotificationService;
import org.graylog2.plugin.Tools; import org.graylog2.plugin.Tools;
Expand Down Expand Up @@ -53,18 +55,21 @@ public class StreamServiceImpl extends PersistedServiceImpl implements StreamSer
private final StreamRuleService streamRuleService; private final StreamRuleService streamRuleService;
private final AlertService alertService; private final AlertService alertService;
private final OutputService outputService; private final OutputService outputService;
private final Set<IndexSet> indexSets;
private final NotificationService notificationService; private final NotificationService notificationService;


@Inject @Inject
public StreamServiceImpl(MongoConnection mongoConnection, public StreamServiceImpl(MongoConnection mongoConnection,
StreamRuleService streamRuleService, StreamRuleService streamRuleService,
AlertService alertService, AlertService alertService,
OutputService outputService, OutputService outputService,
LegacyDeflectorIndexSet indexSet,
NotificationService notificationService) { NotificationService notificationService) {
super(mongoConnection); super(mongoConnection);
this.streamRuleService = streamRuleService; this.streamRuleService = streamRuleService;
this.alertService = alertService; this.alertService = alertService;
this.outputService = outputService; this.outputService = outputService;
this.indexSets = Collections.singleton(indexSet);
this.notificationService = notificationService; this.notificationService = notificationService;
} }


Expand All @@ -81,7 +86,8 @@ public Stream load(ObjectId id) throws NotFoundException {


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final Map<String, Object> fields = o.toMap(); final Map<String, Object> fields = o.toMap();
return new StreamImpl((ObjectId) o.get("_id"), fields, streamRules, outputs); // TODO 2.2: Needs to load the index sets from the database!
return new StreamImpl((ObjectId) o.get("_id"), fields, streamRules, outputs, indexSets);
} }


@Override @Override
Expand Down Expand Up @@ -147,7 +153,8 @@ public List<Stream> loadAll(Map<String, Object> additionalQueryOpts) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final Map<String, Object> fields = o.toMap(); final Map<String, Object> fields = o.toMap();


streams.add(new StreamImpl(objectId, fields, streamRules, outputs)); // TODO 2.2: Needs to load the index sets from the database!
streams.add(new StreamImpl(objectId, fields, streamRules, outputs, indexSets));
} }


return streams.build(); return streams.build();
Expand Down

0 comments on commit 292ec02

Please sign in to comment.