From 9f211cadb7c32a8d78aea21783822bedb221ac9d Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Tue, 6 Dec 2016 15:09:18 +0100 Subject: [PATCH] Fix stream and default stream creation (#3152) * Make sure that the stream object is created with an index set This makes it possible to use `#getIndexSet()` on the newly created stream. * Unbreak default stream creation migration The default stream couldn't be created in a new installation because the required `index_set_id` field was missing. Change order of migrations so the default stream migration runs after the default index set has been created. Also avoid using the `Persisted#saveWithoutValidation()` method because it doesn't do what it says. It just ignores the validation exception thrown by `#save()`. * Fix creation time of default stream migration --- .../graylog2/migrations/MigrationsModule.java | 2 +- ...6172200_CreateDefaultStreamMigration.java} | 31 +++++++++++----- .../java/org/graylog2/streams/StreamImpl.java | 7 ++++ .../graylog2/streams/StreamServiceImpl.java | 12 +++++-- ...200_CreateDefaultStreamMigrationTest.java} | 35 +++++++++++++++++-- 5 files changed, 71 insertions(+), 16 deletions(-) rename graylog2-server/src/main/java/org/graylog2/migrations/{V20160929120500_CreateDefaultStreamMigration.java => V20161116172200_CreateDefaultStreamMigration.java} (69%) rename graylog2-server/src/test/java/org/graylog2/migrations/{V20160929120500_CreateDefaultStreamMigrationTest.java => V20161116172200_CreateDefaultStreamMigrationTest.java} (72%) diff --git a/graylog2-server/src/main/java/org/graylog2/migrations/MigrationsModule.java b/graylog2-server/src/main/java/org/graylog2/migrations/MigrationsModule.java index 025a82578bf4..aa9035492e59 100644 --- a/graylog2-server/src/main/java/org/graylog2/migrations/MigrationsModule.java +++ b/graylog2-server/src/main/java/org/graylog2/migrations/MigrationsModule.java @@ -24,8 +24,8 @@ public class MigrationsModule extends AbstractModule { protected void configure() { final Multibinder binder = Multibinder.newSetBinder(binder(), Migration.class); binder.addBinding().to(V20151210140600_ElasticsearchConfigMigration.class); - binder.addBinding().to(V20160929120500_CreateDefaultStreamMigration.class); binder.addBinding().to(V20161116172100_DefaultIndexSetMigration.class); + binder.addBinding().to(V20161116172200_CreateDefaultStreamMigration.class); binder.addBinding().to(V20161122174500_AssignIndexSetsToStreamsMigration.class); binder.addBinding().to(V20161124104700_AddRetentionRotationAndDefaultFlagToIndexSetMigration.class); binder.addBinding().to(V20161125142400_EmailAlarmCallbackMigration.class); diff --git a/graylog2-server/src/main/java/org/graylog2/migrations/V20160929120500_CreateDefaultStreamMigration.java b/graylog2-server/src/main/java/org/graylog2/migrations/V20161116172200_CreateDefaultStreamMigration.java similarity index 69% rename from graylog2-server/src/main/java/org/graylog2/migrations/V20160929120500_CreateDefaultStreamMigration.java rename to graylog2-server/src/main/java/org/graylog2/migrations/V20161116172200_CreateDefaultStreamMigration.java index 1098caf9359c..f731735cf4b2 100644 --- a/graylog2-server/src/main/java/org/graylog2/migrations/V20160929120500_CreateDefaultStreamMigration.java +++ b/graylog2-server/src/main/java/org/graylog2/migrations/V20161116172200_CreateDefaultStreamMigration.java @@ -20,6 +20,9 @@ import org.bson.types.ObjectId; import org.graylog2.database.NotFoundException; import org.graylog2.events.ClusterEventBus; +import org.graylog2.indexer.IndexSet; +import org.graylog2.indexer.IndexSetRegistry; +import org.graylog2.plugin.database.ValidationException; import org.graylog2.plugin.streams.Stream; import org.graylog2.streams.StreamImpl; import org.graylog2.streams.StreamService; @@ -37,21 +40,25 @@ /** * Migration creating the default stream if it doesn't exist. */ -public class V20160929120500_CreateDefaultStreamMigration extends Migration { - private static final Logger LOG = LoggerFactory.getLogger(V20160929120500_CreateDefaultStreamMigration.class); +public class V20161116172200_CreateDefaultStreamMigration extends Migration { + private static final Logger LOG = LoggerFactory.getLogger(V20161116172200_CreateDefaultStreamMigration.class); private final StreamService streamService; private final ClusterEventBus clusterEventBus; + private final IndexSetRegistry indexSetRegistry; @Inject - public V20160929120500_CreateDefaultStreamMigration(StreamService streamService, ClusterEventBus clusterEventBus) { + public V20161116172200_CreateDefaultStreamMigration(StreamService streamService, + ClusterEventBus clusterEventBus, + IndexSetRegistry indexSetRegistry) { this.streamService = streamService; this.clusterEventBus = clusterEventBus; + this.indexSetRegistry = indexSetRegistry; } @Override public ZonedDateTime createdAt() { - return ZonedDateTime.parse("2016-09-29T12:05:00Z"); + return ZonedDateTime.parse("2016-11-16T17:22:00Z"); } @Override @@ -64,6 +71,9 @@ public void upgrade() { } private void createDefaultStream() { + final IndexSet indexSet = indexSetRegistry.getDefault() + .orElseThrow(() -> new IllegalStateException("Couldn't find default index set! This is a bug!")); + final ObjectId id = new ObjectId(Stream.DEFAULT_STREAM_ID); final Map fields = ImmutableMap.builder() .put(StreamImpl.FIELD_TITLE, "All messages") @@ -74,13 +84,16 @@ private void createDefaultStream() { .put(StreamImpl.FIELD_MATCHING_TYPE, StreamImpl.MatchingType.DEFAULT.name()) .put(StreamImpl.FIELD_REMOVE_MATCHES_FROM_DEFAULT_STREAM, false) .put(StreamImpl.FIELD_DEFAULT_STREAM, true) + .put(StreamImpl.FIELD_INDEX_SET_ID, indexSet.getConfig().id()) .build(); - final Stream stream = new StreamImpl(id, fields, Collections.emptyList(), Collections.emptySet(), null); + final Stream stream = new StreamImpl(id, fields, Collections.emptyList(), Collections.emptySet(), indexSet); - // Save without validations here to avoid failing the index set validation which has been added at a later - // point in time. - streamService.saveWithoutValidation(stream); - LOG.info("Successfully created default stream: {}", stream.getTitle()); + try { + streamService.save(stream); + LOG.info("Successfully created default stream: {}", stream.getTitle()); + } catch (ValidationException e) { + LOG.error("Couldn't create default stream! This is a bug!"); + } clusterEventBus.post(StreamsChangedEvent.create(stream.getId())); } diff --git a/graylog2-server/src/main/java/org/graylog2/streams/StreamImpl.java b/graylog2-server/src/main/java/org/graylog2/streams/StreamImpl.java index 62dfc7e77d5c..e04ac779dd64 100644 --- a/graylog2-server/src/main/java/org/graylog2/streams/StreamImpl.java +++ b/graylog2-server/src/main/java/org/graylog2/streams/StreamImpl.java @@ -73,6 +73,13 @@ public StreamImpl(Map fields) { this.indexSet = null; } + public StreamImpl(Map fields, IndexSet indexSet) { + super(fields); + this.streamRules = null; + this.outputs = null; + this.indexSet = indexSet; + } + protected StreamImpl(ObjectId id, Map fields) { super(id, fields); this.streamRules = null; diff --git a/graylog2-server/src/main/java/org/graylog2/streams/StreamServiceImpl.java b/graylog2-server/src/main/java/org/graylog2/streams/StreamServiceImpl.java index c0c28310571d..ab56d2af4f46 100644 --- a/graylog2-server/src/main/java/org/graylog2/streams/StreamServiceImpl.java +++ b/graylog2-server/src/main/java/org/graylog2/streams/StreamServiceImpl.java @@ -60,6 +60,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import static com.google.common.base.Strings.isNullOrEmpty; + public class StreamServiceImpl extends PersistedServiceImpl implements StreamService { private static final Logger LOG = LoggerFactory.getLogger(StreamServiceImpl.class); private final StreamRuleService streamRuleService; @@ -91,8 +93,12 @@ public StreamServiceImpl(MongoConnection mongoConnection, @Nullable private IndexSet getIndexSet(DBObject dbObject) { - final String id = (String) dbObject.get(StreamImpl.FIELD_INDEX_SET_ID); - if (id == null) { + return getIndexSet((String) dbObject.get(StreamImpl.FIELD_INDEX_SET_ID)); + } + + @Nullable + private IndexSet getIndexSet(String id) { + if (isNullOrEmpty(id)) { return null; } final Optional indexSetConfig = indexSetService.get(id); @@ -117,7 +123,7 @@ public Stream load(ObjectId id) throws NotFoundException { @Override public Stream create(Map fields) { - return new StreamImpl(fields); + return new StreamImpl(fields, getIndexSet((String) fields.get(StreamImpl.FIELD_INDEX_SET_ID))); } @Override diff --git a/graylog2-server/src/test/java/org/graylog2/migrations/V20160929120500_CreateDefaultStreamMigrationTest.java b/graylog2-server/src/test/java/org/graylog2/migrations/V20161116172200_CreateDefaultStreamMigrationTest.java similarity index 72% rename from graylog2-server/src/test/java/org/graylog2/migrations/V20160929120500_CreateDefaultStreamMigrationTest.java rename to graylog2-server/src/test/java/org/graylog2/migrations/V20161116172200_CreateDefaultStreamMigrationTest.java index 97092e2d06cf..ee1193da8107 100644 --- a/graylog2-server/src/test/java/org/graylog2/migrations/V20160929120500_CreateDefaultStreamMigrationTest.java +++ b/graylog2-server/src/test/java/org/graylog2/migrations/V20161116172200_CreateDefaultStreamMigrationTest.java @@ -18,6 +18,9 @@ import org.graylog2.database.NotFoundException; import org.graylog2.events.ClusterEventBus; +import org.graylog2.indexer.IndexSet; +import org.graylog2.indexer.IndexSetRegistry; +import org.graylog2.indexer.indexset.IndexSetConfig; import org.graylog2.plugin.database.ValidationException; import org.graylog2.plugin.streams.Stream; import org.graylog2.streams.StreamImpl; @@ -26,11 +29,14 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; +import java.util.Optional; + import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -39,29 +45,41 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -public class V20160929120500_CreateDefaultStreamMigrationTest { +public class V20161116172200_CreateDefaultStreamMigrationTest { @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + @Rule + public final ExpectedException expectedException = ExpectedException.none(); private Migration migration; @Mock private StreamService streamService; @Mock private ClusterEventBus clusterEventBus; + @Mock + private IndexSetRegistry indexSetRegistry; + @Mock + private IndexSet indexSet; + @Mock + private IndexSetConfig indexSetConfig; @Before public void setUpService() throws Exception { - migration = new V20160929120500_CreateDefaultStreamMigration(streamService, clusterEventBus); + migration = new V20161116172200_CreateDefaultStreamMigration(streamService, clusterEventBus, indexSetRegistry); + + when(indexSet.getConfig()).thenReturn(indexSetConfig); + when(indexSetConfig.id()).thenReturn("abc123"); } @Test public void upgrade() throws Exception { final ArgumentCaptor streamArgumentCaptor = ArgumentCaptor.forClass(Stream.class); when(streamService.load("000000000000000000000001")).thenThrow(NotFoundException.class); + when(indexSetRegistry.getDefault()).thenReturn(Optional.of(indexSet)); migration.upgrade(); - verify(streamService).saveWithoutValidation(streamArgumentCaptor.capture()); + verify(streamService).save(streamArgumentCaptor.capture()); final Stream stream = streamArgumentCaptor.getValue(); assertThat(stream.getTitle()).isEqualTo("All messages"); @@ -69,6 +87,16 @@ public void upgrade() throws Exception { assertThat(stream.getMatchingType()).isEqualTo(StreamImpl.MatchingType.DEFAULT); } + @Test + public void upgradeWithoutDefaultIndexSet() throws Exception { + when(streamService.load("000000000000000000000001")).thenThrow(NotFoundException.class); + when(indexSetRegistry.getDefault()).thenReturn(Optional.empty()); + + expectedException.expect(IllegalStateException.class); + + migration.upgrade(); + } + @Test public void upgradeDoesNotRunIfDefaultStreamExists() throws Exception { when(streamService.load("000000000000000000000001")).thenReturn(mock(Stream.class)); @@ -80,6 +108,7 @@ public void upgradeDoesNotRunIfDefaultStreamExists() throws Exception { @Test public void upgradePostsStreamsChangedEvent() throws Exception { + when(indexSetRegistry.getDefault()).thenReturn(Optional.of(indexSet)); when(streamService.load("000000000000000000000001")).thenThrow(NotFoundException.class); final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(StreamsChangedEvent.class); migration.upgrade();