From 292ec02dc2859a90fe6a98ee5be4d6e22ec1c0ca Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Wed, 9 Nov 2016 16:34:51 +0100 Subject: [PATCH] Route messages to different index sets (#2976) To be able to write a message to different indices/index-sets we have to map streams to index sets and handle that when writing to Elasticsearch. - Track matching index sets in the `Message` object to we don't have to iterate over all streams when doing the mapping. This assumes that usually there will be fewer index-sets than streams for a message. - Add `Stream#getIndexSets()` and add the legacy default index-set to each stream. This needs to be changed to load the index-sets for a stream from the database. - Adjust the `BlockingBatchedESOutput` to write one message into all index-sets for that message. Depending on the number of index-sets for a message, we are actually writing multiple messages for each message. This can slow down indexing because more work has to be done. The batching behaviour doesn't change because we are still flushing after the configured batch size. - The `Messages#bulkIndex()` method now takes a small `IndexAndMessage` object to decide into which index a message should be indexed. The reasoning behind putting the index-sets into the message instead of looking them up later is that we just have to iterate over index sets instead of iterating over streams and looking up index-sets from them. Since we are rebuilding the stream router in a background thread when stream (and soon index-set) configuration changes, the expensive work is done outside of the hot code path. There are probably several improvements that can be implemented to make this more efficient. This is a first naive version that makes it work with minimal changes. --- .../org/graylog2/bundles/BundleImporter.java | 2 +- .../graylog2/indexer/messages/Messages.java | 23 ++++++------- .../outputs/BlockingBatchedESOutput.java | 21 ++++++++---- .../graylog2/outputs/ElasticSearchOutput.java | 17 +++++++--- .../DefaultStreamMigrationPeriodical.java | 2 +- .../java/org/graylog2/plugin/Message.java | 27 +++++++++++++-- .../org/graylog2/plugin/streams/Stream.java | 3 ++ .../java/org/graylog2/streams/StreamImpl.java | 12 ++++++- .../graylog2/streams/StreamServiceImpl.java | 11 ++++-- .../outputs/BlockingBatchedESOutputTest.java | 34 +++++++++++-------- .../java/org/graylog2/plugin/MessageTest.java | 33 ++++++++++++++++++ .../streams/StreamListFingerprintTest.java | 2 +- .../java/org/graylog2/streams/StreamMock.java | 6 ++++ 13 files changed, 146 insertions(+), 47 deletions(-) diff --git a/graylog2-server/src/main/java/org/graylog2/bundles/BundleImporter.java b/graylog2-server/src/main/java/org/graylog2/bundles/BundleImporter.java index 59e5eafba55f..44e6809f65a9 100644 --- a/graylog2-server/src/main/java/org/graylog2/bundles/BundleImporter.java +++ b/graylog2-server/src/main/java/org/graylog2/bundles/BundleImporter.java @@ -26,7 +26,6 @@ import org.graylog2.dashboards.widgets.InvalidWidgetConfigurationException; import org.graylog2.database.NotFoundException; import org.graylog2.grok.GrokPatternService; -import org.graylog2.indexer.searches.Searches; import org.graylog2.inputs.InputService; import org.graylog2.inputs.converters.ConverterFactory; import org.graylog2.inputs.extractors.ExtractorFactory; @@ -466,6 +465,7 @@ private org.graylog2.plugin.streams.Stream createStream(final String bundleId, f new ObjectId(org.graylog2.plugin.streams.Stream.DEFAULT_STREAM_ID), streamData.build(), Collections.emptyList(), + Collections.emptySet(), Collections.emptySet()); } else { stream = streamService.create(streamData.build()); diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java b/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java index 916a45cfe6cc..b7e78e2a1bb3 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java @@ -42,6 +42,7 @@ import org.graylog2.indexer.IndexFailure; import org.graylog2.indexer.IndexFailureImpl; import org.graylog2.indexer.IndexMapping; +import org.graylog2.indexer.IndexSet; import org.graylog2.indexer.results.ResultMessage; import org.graylog2.plugin.Message; import org.slf4j.Logger; @@ -114,26 +115,22 @@ public List analyze(String string, String index) { return terms; } - public boolean bulkIndex(final List messages) { - return bulkIndex(deflectorName, messages); - } - - public boolean bulkIndex(final String indexName, final List messages) { - if (messages.isEmpty()) { + public boolean bulkIndex(final List> messageList) { + if (messageList.isEmpty()) { return true; } final BulkRequestBuilder requestBuilder = c.prepareBulk().setConsistencyLevel(WriteConsistencyLevel.ONE); - for (Message msg : messages) { - requestBuilder.add(buildIndexRequest(indexName, msg.toElasticSearchObject(invalidTimestampMeter), msg.getId())); + for (Map.Entry entry : messageList) { + requestBuilder.add(buildIndexRequest(entry.getKey().getWriteIndexAlias(), entry.getValue().toElasticSearchObject(invalidTimestampMeter), entry.getValue().getId())); } final BulkResponse response = runBulkRequest(requestBuilder.request()); - LOG.debug("Index {}: Bulk indexed {} messages, took {} ms, failures: {}", indexName, + LOG.debug("Index: Bulk indexed {} messages, took {} ms, failures: {}", response.getItems().length, response.getTookInMillis(), response.hasFailures()); if (response.hasFailures()) { - propagateFailure(response.getItems(), messages, response.buildFailureMessage()); + propagateFailure(response.getItems(), messageList, response.buildFailureMessage()); } return !response.hasFailures(); @@ -153,7 +150,7 @@ private BulkResponse runBulkRequest(final BulkRequest request) { } } - private void propagateFailure(BulkItemResponse[] items, List messages, String errorMessage) { + private void propagateFailure(BulkItemResponse[] items, List> messageList, String errorMessage) { final List indexFailures = new LinkedList<>(); for (BulkItemResponse item : items) { if (item.isFailed()) { @@ -161,13 +158,13 @@ private void propagateFailure(BulkItemResponse[] items, List messages, // Write failure to index_failures. final BulkItemResponse.Failure f = item.getFailure(); - final Message message = messages.get(item.getItemId()); + final Map.Entry messageEntry = messageList.get(item.getItemId()); final Map doc = ImmutableMap.builder() .put("letter_id", item.getId()) .put("index", f.getIndex()) .put("type", f.getType()) .put("message", f.getMessage()) - .put("timestamp", message.getTimestamp()) + .put("timestamp", messageEntry.getValue().getTimestamp()) .build(); indexFailures.add(new IndexFailureImpl(doc)); diff --git a/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java b/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java index fabfca1377c4..b7ad14669526 100644 --- a/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java +++ b/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java @@ -21,8 +21,10 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.AssistedInject; +import org.graylog2.indexer.IndexSet; import org.graylog2.indexer.cluster.Cluster; import org.graylog2.indexer.messages.Messages; import org.graylog2.plugin.Message; @@ -34,6 +36,7 @@ import javax.inject.Inject; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -51,7 +54,7 @@ public class BlockingBatchedESOutput extends ElasticSearchOutput { private final Meter bufferFlushes; private final Meter bufferFlushesRequested; - private volatile List buffer; + private volatile List> buffer; private static final AtomicInteger activeFlushThreads = new AtomicInteger(0); private final AtomicLong lastFlushTime = new AtomicLong(); @@ -89,9 +92,15 @@ public BlockingBatchedESOutput(MetricRegistry metricRegistry, @Override public void write(Message message) throws Exception { - List flushBatch = null; + for (IndexSet indexSet : message.getIndexSets()) { + writeMessageEntry(Maps.immutableEntry(indexSet, message)); + } + } + + public void writeMessageEntry(Map.Entry entry) throws Exception { + List> flushBatch = null; synchronized (this) { - buffer.add(message); + buffer.add(entry); if (buffer.size() >= maxBufferSize) { flushBatch = buffer; @@ -106,7 +115,7 @@ public void write(Message message) throws Exception { } } - private void flush(List messages) { + private void flush(List> messages) { if (!cluster.isConnected() || !cluster.isDeflectorHealthy()) { try { cluster.waitForConnectedAndDeflectorHealthy(); @@ -125,7 +134,7 @@ private void flush(List messages) { try (Timer.Context ignored = processTime.time()) { lastFlushTime.set(System.nanoTime()); - write(messages); + writeMessageEntries(messages); batchSize.update(messages.size()); bufferFlushes.mark(); } catch (Exception e) { @@ -148,7 +157,7 @@ public void forceFlushIfTimedout() { return; } // flip buffer quickly and initiate flush - final List flushBatch; + final List> flushBatch; synchronized (this) { flushBatch = buffer; buffer = Lists.newArrayListWithCapacity(maxBufferSize); diff --git a/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java b/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java index a2036e053da1..07cdb850a314 100644 --- a/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java +++ b/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java @@ -20,10 +20,10 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; import com.google.common.base.Joiner; -import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.AssistedInject; +import org.graylog2.indexer.IndexSet; import org.graylog2.indexer.messages.Messages; import org.graylog2.plugin.Message; import org.graylog2.plugin.configuration.Configuration; @@ -37,7 +37,9 @@ import javax.inject.Inject; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import static com.codahale.metrics.MetricRegistry.name; @@ -87,9 +89,14 @@ public void write(Message message) throws Exception { @Override public void write(List messageList) throws Exception { + throw new UnsupportedOperationException("Method not supported!"); + } + + public void writeMessageEntries(List> messageList) throws Exception { if (LOG.isTraceEnabled()) { - final List sortedIds = Ordering.natural().sortedCopy(Lists.transform(messageList, - Message.ID_FUNCTION)); + final List sortedIds = Ordering.natural().sortedCopy(messageList.stream() + .map(entry -> entry.getValue().getId()) + .collect(Collectors.toList())); LOG.trace("Writing message ids to [{}]: <{}>", NAME, Joiner.on(", ").join(sortedIds)); } @@ -97,8 +104,8 @@ public void write(List messageList) throws Exception { try (final Timer.Context ignored = processTime.time()) { messages.bulkIndex(messageList); } - for (final Message message : messageList) { - journal.markJournalOffsetCommitted(message.getJournalOffset()); + for (final Map.Entry entry : messageList) { + journal.markJournalOffsetCommitted(entry.getValue().getJournalOffset()); } } diff --git a/graylog2-server/src/main/java/org/graylog2/periodical/DefaultStreamMigrationPeriodical.java b/graylog2-server/src/main/java/org/graylog2/periodical/DefaultStreamMigrationPeriodical.java index 0c44d7e8ccf0..94a0d5587de6 100644 --- a/graylog2-server/src/main/java/org/graylog2/periodical/DefaultStreamMigrationPeriodical.java +++ b/graylog2-server/src/main/java/org/graylog2/periodical/DefaultStreamMigrationPeriodical.java @@ -74,7 +74,7 @@ public void doRun() { .put(StreamImpl.FIELD_MATCHING_TYPE, StreamImpl.MatchingType.DEFAULT.name()) .put(StreamImpl.FIELD_DEFAULT_STREAM, true) .build(); - final Stream stream = new StreamImpl(id, fields, Collections.emptyList(), Collections.emptySet()); + final Stream stream = new StreamImpl(id, fields, Collections.emptyList(), Collections.emptySet(), Collections.emptySet()); final StreamRule streamRule = new StreamRuleImpl( ImmutableMap.builder() .put(StreamRuleImpl.FIELD_TYPE, StreamRuleType.ALWAYS_MATCH.getValue()) diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/Message.java b/graylog2-server/src/main/java/org/graylog2/plugin/Message.java index 37dd5ce68059..1cff6802adc7 100644 --- a/graylog2-server/src/main/java/org/graylog2/plugin/Message.java +++ b/graylog2-server/src/main/java/org/graylog2/plugin/Message.java @@ -29,6 +29,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.net.InetAddresses; +import org.graylog2.indexer.IndexSet; import org.graylog2.plugin.streams.Stream; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -116,6 +117,7 @@ public class Message implements Messages { private final Map fields = Maps.newHashMap(); private Set streams = Sets.newHashSet(); + private Set indexSets = Sets.newHashSet(); private String sourceInputId; // Used for drools to filter out messages. @@ -423,6 +425,7 @@ public Set getStreams() { * @param stream the stream to route this message into */ public void addStream(Stream stream) { + indexSets.addAll(stream.getIndexSets()); streams.add(stream); } @@ -431,7 +434,9 @@ public void addStream(Stream stream) { * @param newStreams an iterable of Stream objects */ public void addStreams(Iterable newStreams) { - Iterables.addAll(streams, newStreams); + for (final Stream stream : newStreams) { + addStream(stream); + } } /** @@ -440,7 +445,25 @@ public void addStreams(Iterable newStreams) { * @return true if this message was assigned to the stream */ public boolean removeStream(Stream stream) { - return streams.remove(stream); + final boolean removed = streams.remove(stream); + + if (removed) { + indexSets.clear(); + for (Stream s : streams) { + indexSets.addAll(s.getIndexSets()); + } + } + + return removed; + } + + /** + * Return the index sets for this message based on the assigned streams. + * + * @return index sets + */ + public Set getIndexSets() { + return ImmutableSet.copyOf(this.indexSets); } public List getStreamIds() { diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/streams/Stream.java b/graylog2-server/src/main/java/org/graylog2/plugin/streams/Stream.java index d693fadc8795..fe4eb72e3f62 100644 --- a/graylog2-server/src/main/java/org/graylog2/plugin/streams/Stream.java +++ b/graylog2-server/src/main/java/org/graylog2/plugin/streams/Stream.java @@ -17,6 +17,7 @@ package org.graylog2.plugin.streams; import com.fasterxml.jackson.annotation.JsonCreator; +import org.graylog2.indexer.IndexSet; import org.graylog2.plugin.database.Persisted; import java.util.List; @@ -73,4 +74,6 @@ public static MatchingType valueOfOrDefault(String name) { boolean isDefaultStream(); void setDefaultStream(boolean defaultStream); + + Set getIndexSets(); } diff --git a/graylog2-server/src/main/java/org/graylog2/streams/StreamImpl.java b/graylog2-server/src/main/java/org/graylog2/streams/StreamImpl.java index b58e378cccdf..c1b5bd29deb0 100644 --- a/graylog2-server/src/main/java/org/graylog2/streams/StreamImpl.java +++ b/graylog2-server/src/main/java/org/graylog2/streams/StreamImpl.java @@ -28,6 +28,7 @@ import org.graylog2.database.validators.FilledStringValidator; import org.graylog2.database.validators.MapValidator; import org.graylog2.database.validators.OptionalStringValidator; +import org.graylog2.indexer.IndexSet; import org.graylog2.plugin.Tools; import org.graylog2.plugin.database.validators.Validator; import org.graylog2.plugin.streams.Output; @@ -63,24 +64,28 @@ public class StreamImpl extends PersistedImpl implements Stream { private final List streamRules; private final Set outputs; + private final Set indexSets; public StreamImpl(Map fields) { super(fields); this.streamRules = null; this.outputs = null; + this.indexSets = null; } protected StreamImpl(ObjectId id, Map fields) { super(id, fields); this.streamRules = null; this.outputs = null; + this.indexSets = null; } - public StreamImpl(ObjectId id, Map fields, List streamRules, Set outputs) { + public StreamImpl(ObjectId id, Map fields, List streamRules, Set outputs, Set indexSets) { super(id, fields); this.streamRules = streamRules; this.outputs = outputs; + this.indexSets = indexSets; } @Override @@ -230,4 +235,9 @@ public boolean isDefaultStream() { public void setDefaultStream(boolean defaultStream) { fields.put(FIELD_DEFAULT_STREAM, defaultStream); } + + @Override + public Set getIndexSets() { + return indexSets; + } } diff --git a/graylog2-server/src/main/java/org/graylog2/streams/StreamServiceImpl.java b/graylog2-server/src/main/java/org/graylog2/streams/StreamServiceImpl.java index 9552d96912f4..88d04fc6b8a4 100644 --- a/graylog2-server/src/main/java/org/graylog2/streams/StreamServiceImpl.java +++ b/graylog2-server/src/main/java/org/graylog2/streams/StreamServiceImpl.java @@ -26,6 +26,8 @@ import org.graylog2.database.MongoConnection; import org.graylog2.database.NotFoundException; import org.graylog2.database.PersistedServiceImpl; +import org.graylog2.indexer.IndexSet; +import org.graylog2.indexer.LegacyDeflectorIndexSet; import org.graylog2.notifications.Notification; import org.graylog2.notifications.NotificationService; import org.graylog2.plugin.Tools; @@ -53,6 +55,7 @@ public class StreamServiceImpl extends PersistedServiceImpl implements StreamSer private final StreamRuleService streamRuleService; private final AlertService alertService; private final OutputService outputService; + private final Set indexSets; private final NotificationService notificationService; @Inject @@ -60,11 +63,13 @@ public StreamServiceImpl(MongoConnection mongoConnection, StreamRuleService streamRuleService, AlertService alertService, OutputService outputService, + LegacyDeflectorIndexSet indexSet, NotificationService notificationService) { super(mongoConnection); this.streamRuleService = streamRuleService; this.alertService = alertService; this.outputService = outputService; + this.indexSets = Collections.singleton(indexSet); this.notificationService = notificationService; } @@ -81,7 +86,8 @@ public Stream load(ObjectId id) throws NotFoundException { @SuppressWarnings("unchecked") final Map fields = o.toMap(); - return new StreamImpl((ObjectId) o.get("_id"), fields, streamRules, outputs); + // TODO 2.2: Needs to load the index sets from the database! + return new StreamImpl((ObjectId) o.get("_id"), fields, streamRules, outputs, indexSets); } @Override @@ -147,7 +153,8 @@ public List loadAll(Map additionalQueryOpts) { @SuppressWarnings("unchecked") final Map fields = o.toMap(); - streams.add(new StreamImpl(objectId, fields, streamRules, outputs)); + // TODO 2.2: Needs to load the index sets from the database! + streams.add(new StreamImpl(objectId, fields, streamRules, outputs, indexSets)); } return streams.build(); diff --git a/graylog2-server/src/test/java/org/graylog2/outputs/BlockingBatchedESOutputTest.java b/graylog2-server/src/test/java/org/graylog2/outputs/BlockingBatchedESOutputTest.java index 2f4260865577..2abdcf6ba952 100644 --- a/graylog2-server/src/test/java/org/graylog2/outputs/BlockingBatchedESOutputTest.java +++ b/graylog2-server/src/test/java/org/graylog2/outputs/BlockingBatchedESOutputTest.java @@ -18,7 +18,9 @@ import com.codahale.metrics.MetricRegistry; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; import org.graylog2.Configuration; +import org.graylog2.indexer.IndexSet; import org.graylog2.indexer.cluster.Cluster; import org.graylog2.indexer.messages.Messages; import org.graylog2.plugin.Message; @@ -31,9 +33,11 @@ import org.mockito.runners.MockitoJUnitRunner; import java.util.List; +import java.util.Map; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -70,10 +74,10 @@ public void write() throws Exception { final BlockingBatchedESOutput output = new BlockingBatchedESOutput(metricRegistry, messages, cluster, config, journal); - final List messageList = buildMessages(config.getOutputBatchSize()); + final List> messageList = buildMessages(config.getOutputBatchSize()); - for (Message message : messageList) { - output.write(message); + for (Map.Entry entry : messageList) { + output.writeMessageEntry(entry); } verify(messages, times(1)).bulkIndex(eq(messageList)); @@ -87,11 +91,11 @@ public void writeDoesNotFlushIfClusterIsNotConnected() throws Exception { final BlockingBatchedESOutput output = new BlockingBatchedESOutput(metricRegistry, messages, cluster, config, journal); - final List messageList = buildMessages(config.getOutputBatchSize()); + final List> messageList = buildMessages(config.getOutputBatchSize()); - for (Message message : messageList) { + for (Map.Entry entry : messageList) { try { - output.write(message); + output.writeMessageEntry(entry); } catch(RuntimeException ignore) { } } @@ -108,11 +112,11 @@ public void writeDoesNotFlushIfClusterIsUnhealthy() throws Exception { final BlockingBatchedESOutput output = new BlockingBatchedESOutput(metricRegistry, messages, cluster, config, journal); - final List messageList = buildMessages(config.getOutputBatchSize()); + final List> messageList = buildMessages(config.getOutputBatchSize()); - for (Message message : messageList) { + for (Map.Entry entry : messageList) { try { - output.write(message); + output.writeMessageEntry(entry); } catch(RuntimeException ignore) { } } @@ -127,10 +131,10 @@ public void forceFlushIfTimedOut() throws Exception { final BlockingBatchedESOutput output = new BlockingBatchedESOutput(metricRegistry, messages, cluster, config, journal); - final List messageList = buildMessages(config.getOutputBatchSize() - 1); + final List> messageList = buildMessages(config.getOutputBatchSize() - 1); - for (Message message : messageList) { - output.write(message); + for (Map.Entry entry : messageList) { + output.writeMessageEntry(entry); } // Should flush the buffer even though the batch size is not reached yet @@ -139,10 +143,10 @@ public void forceFlushIfTimedOut() throws Exception { verify(messages, times(1)).bulkIndex(eq(messageList)); } - private List buildMessages(final int count) { - final ImmutableList.Builder builder = ImmutableList.builder(); + private List> buildMessages(final int count) { + final ImmutableList.Builder> builder = ImmutableList.builder(); for (int i = 0; i < count; i++) { - builder.add(new Message("message" + i, "test", Tools.nowUTC())); + builder.add(Maps.immutableEntry(mock(IndexSet.class), new Message("message" + i, "test", Tools.nowUTC()))); } return builder.build(); diff --git a/graylog2-server/src/test/java/org/graylog2/plugin/MessageTest.java b/graylog2-server/src/test/java/org/graylog2/plugin/MessageTest.java index 028ab92d8408..58569a8db7e1 100644 --- a/graylog2-server/src/test/java/org/graylog2/plugin/MessageTest.java +++ b/graylog2-server/src/test/java/org/graylog2/plugin/MessageTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.graylog2.indexer.IndexSet; import org.graylog2.plugin.streams.Stream; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -247,6 +248,38 @@ public void testStreamMutators() { assertThat(message.getStreams()).containsOnly(stream1, stream3); } + @Test + public void testStreamMutatorsWithIndexSets() { + final Stream stream1 = mock(Stream.class); + final Stream stream2 = mock(Stream.class); + final Stream stream3 = mock(Stream.class); + + final IndexSet indexSet1 = mock(IndexSet.class); + final IndexSet indexSet2 = mock(IndexSet.class); + + assertThat(message.getIndexSets()).isEmpty(); + + when(stream1.getIndexSets()).thenReturn(Collections.singleton(indexSet1)); + when(stream2.getIndexSets()).thenReturn(Collections.emptySet()); + when(stream3.getIndexSets()).thenReturn(Sets.newHashSet(indexSet1, indexSet2)); + + message.addStream(stream1); + message.addStreams(Sets.newHashSet(stream2, stream3)); + + assertThat(message.getIndexSets()).containsOnly(indexSet1, indexSet2); + + message.removeStream(stream3); + + assertThat(message.getIndexSets()).containsOnly(indexSet1); + + final Set indexSets = message.getIndexSets(); + + message.addStream(stream3); + + // getIndexSets is a copy and doesn't change after mutations + assertThat(indexSets).containsOnly(indexSet1); + } + @Test public void testGetStreamIds() throws Exception { message.addField("streams", Lists.newArrayList("stream-id")); diff --git a/graylog2-server/src/test/java/org/graylog2/streams/StreamListFingerprintTest.java b/graylog2-server/src/test/java/org/graylog2/streams/StreamListFingerprintTest.java index ba5dc38cb281..f48d8ac160da 100644 --- a/graylog2-server/src/test/java/org/graylog2/streams/StreamListFingerprintTest.java +++ b/graylog2-server/src/test/java/org/graylog2/streams/StreamListFingerprintTest.java @@ -74,7 +74,7 @@ private static Stream makeStream(int id, String title, StreamRule[] rules, Outpu final HashMap fields = Maps.newHashMap(); fields.put(StreamImpl.FIELD_TITLE, title); return new StreamImpl(new ObjectId(String.format(Locale.ENGLISH, "%024d", id)), fields, Lists.newArrayList(rules), Sets.newHashSet( - outputs)); + outputs), Collections.emptySet()); } private static StreamRule makeStreamRule(int id, String field) { diff --git a/graylog2-server/src/test/java/org/graylog2/streams/StreamMock.java b/graylog2-server/src/test/java/org/graylog2/streams/StreamMock.java index 5abba19e7592..c190471ea24a 100644 --- a/graylog2-server/src/test/java/org/graylog2/streams/StreamMock.java +++ b/graylog2-server/src/test/java/org/graylog2/streams/StreamMock.java @@ -20,6 +20,7 @@ import com.google.common.base.MoreObjects; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.graylog2.indexer.IndexSet; import org.graylog2.plugin.database.validators.Validator; import org.graylog2.plugin.streams.Output; import org.graylog2.plugin.streams.Stream; @@ -203,4 +204,9 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(id, title, description, streamRules, matchingType, defaultStream); } + + @Override + public Set getIndexSets() { + return Collections.emptySet(); + } }