Skip to content

Commit

Permalink
Fix stream and default stream creation (#3152)
Browse files Browse the repository at this point in the history
* Make sure that the stream object is created with an index set

This makes it possible to use `#getIndexSet()` on the newly created
stream.

* Unbreak default stream creation migration

The default stream couldn't be created in a new installation because the
required `index_set_id` field was missing.

Change order of migrations so the default stream migration runs after
the default index set has been created.

Also avoid using the `Persisted#saveWithoutValidation()` method because
it doesn't do what it says. It just ignores the validation exception
thrown by `#save()`.

* Fix creation time of default stream migration
  • Loading branch information
bernd authored and joschi committed Dec 6, 2016
1 parent 0e00c33 commit 9f211ca
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 16 deletions.
Expand Up @@ -24,8 +24,8 @@ public class MigrationsModule extends AbstractModule {
protected void configure() {
final Multibinder<Migration> binder = Multibinder.newSetBinder(binder(), Migration.class);
binder.addBinding().to(V20151210140600_ElasticsearchConfigMigration.class);
binder.addBinding().to(V20160929120500_CreateDefaultStreamMigration.class);
binder.addBinding().to(V20161116172100_DefaultIndexSetMigration.class);
binder.addBinding().to(V20161116172200_CreateDefaultStreamMigration.class);
binder.addBinding().to(V20161122174500_AssignIndexSetsToStreamsMigration.class);
binder.addBinding().to(V20161124104700_AddRetentionRotationAndDefaultFlagToIndexSetMigration.class);
binder.addBinding().to(V20161125142400_EmailAlarmCallbackMigration.class);
Expand Down
Expand Up @@ -20,6 +20,9 @@
import org.bson.types.ObjectId;
import org.graylog2.database.NotFoundException;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.IndexSetRegistry;
import org.graylog2.plugin.database.ValidationException;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.streams.StreamImpl;
import org.graylog2.streams.StreamService;
Expand All @@ -37,21 +40,25 @@
/**
* Migration creating the default stream if it doesn't exist.
*/
public class V20160929120500_CreateDefaultStreamMigration extends Migration {
private static final Logger LOG = LoggerFactory.getLogger(V20160929120500_CreateDefaultStreamMigration.class);
public class V20161116172200_CreateDefaultStreamMigration extends Migration {
private static final Logger LOG = LoggerFactory.getLogger(V20161116172200_CreateDefaultStreamMigration.class);

private final StreamService streamService;
private final ClusterEventBus clusterEventBus;
private final IndexSetRegistry indexSetRegistry;

@Inject
public V20160929120500_CreateDefaultStreamMigration(StreamService streamService, ClusterEventBus clusterEventBus) {
public V20161116172200_CreateDefaultStreamMigration(StreamService streamService,
ClusterEventBus clusterEventBus,
IndexSetRegistry indexSetRegistry) {
this.streamService = streamService;
this.clusterEventBus = clusterEventBus;
this.indexSetRegistry = indexSetRegistry;
}

@Override
public ZonedDateTime createdAt() {
return ZonedDateTime.parse("2016-09-29T12:05:00Z");
return ZonedDateTime.parse("2016-11-16T17:22:00Z");
}

@Override
Expand All @@ -64,6 +71,9 @@ public void upgrade() {
}

private void createDefaultStream() {
final IndexSet indexSet = indexSetRegistry.getDefault()
.orElseThrow(() -> new IllegalStateException("Couldn't find default index set! This is a bug!"));

final ObjectId id = new ObjectId(Stream.DEFAULT_STREAM_ID);
final Map<String, Object> fields = ImmutableMap.<String, Object>builder()
.put(StreamImpl.FIELD_TITLE, "All messages")
Expand All @@ -74,13 +84,16 @@ private void createDefaultStream() {
.put(StreamImpl.FIELD_MATCHING_TYPE, StreamImpl.MatchingType.DEFAULT.name())
.put(StreamImpl.FIELD_REMOVE_MATCHES_FROM_DEFAULT_STREAM, false)
.put(StreamImpl.FIELD_DEFAULT_STREAM, true)
.put(StreamImpl.FIELD_INDEX_SET_ID, indexSet.getConfig().id())
.build();
final Stream stream = new StreamImpl(id, fields, Collections.emptyList(), Collections.emptySet(), null);
final Stream stream = new StreamImpl(id, fields, Collections.emptyList(), Collections.emptySet(), indexSet);

// Save without validations here to avoid failing the index set validation which has been added at a later
// point in time.
streamService.saveWithoutValidation(stream);
LOG.info("Successfully created default stream: {}", stream.getTitle());
try {
streamService.save(stream);
LOG.info("Successfully created default stream: {}", stream.getTitle());
} catch (ValidationException e) {
LOG.error("Couldn't create default stream! This is a bug!");
}

clusterEventBus.post(StreamsChangedEvent.create(stream.getId()));
}
Expand Down
Expand Up @@ -73,6 +73,13 @@ public StreamImpl(Map<String, Object> fields) {
this.indexSet = null;
}

public StreamImpl(Map<String, Object> fields, IndexSet indexSet) {
super(fields);
this.streamRules = null;
this.outputs = null;
this.indexSet = indexSet;
}

protected StreamImpl(ObjectId id, Map<String, Object> fields) {
super(id, fields);
this.streamRules = null;
Expand Down
Expand Up @@ -60,6 +60,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.google.common.base.Strings.isNullOrEmpty;

public class StreamServiceImpl extends PersistedServiceImpl implements StreamService {
private static final Logger LOG = LoggerFactory.getLogger(StreamServiceImpl.class);
private final StreamRuleService streamRuleService;
Expand Down Expand Up @@ -91,8 +93,12 @@ public StreamServiceImpl(MongoConnection mongoConnection,

@Nullable
private IndexSet getIndexSet(DBObject dbObject) {
final String id = (String) dbObject.get(StreamImpl.FIELD_INDEX_SET_ID);
if (id == null) {
return getIndexSet((String) dbObject.get(StreamImpl.FIELD_INDEX_SET_ID));
}

@Nullable
private IndexSet getIndexSet(String id) {
if (isNullOrEmpty(id)) {
return null;
}
final Optional<IndexSetConfig> indexSetConfig = indexSetService.get(id);
Expand All @@ -117,7 +123,7 @@ public Stream load(ObjectId id) throws NotFoundException {

@Override
public Stream create(Map<String, Object> fields) {
return new StreamImpl(fields);
return new StreamImpl(fields, getIndexSet((String) fields.get(StreamImpl.FIELD_INDEX_SET_ID)));
}

@Override
Expand Down
Expand Up @@ -18,6 +18,9 @@

import org.graylog2.database.NotFoundException;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.IndexSetRegistry;
import org.graylog2.indexer.indexset.IndexSetConfig;
import org.graylog2.plugin.database.ValidationException;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.streams.StreamImpl;
Expand All @@ -26,11 +29,14 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
Expand All @@ -39,36 +45,58 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

public class V20160929120500_CreateDefaultStreamMigrationTest {
public class V20161116172200_CreateDefaultStreamMigrationTest {
@Rule
public final MockitoRule mockitoRule = MockitoJUnit.rule();
@Rule
public final ExpectedException expectedException = ExpectedException.none();

private Migration migration;
@Mock
private StreamService streamService;
@Mock
private ClusterEventBus clusterEventBus;
@Mock
private IndexSetRegistry indexSetRegistry;
@Mock
private IndexSet indexSet;
@Mock
private IndexSetConfig indexSetConfig;

@Before
public void setUpService() throws Exception {
migration = new V20160929120500_CreateDefaultStreamMigration(streamService, clusterEventBus);
migration = new V20161116172200_CreateDefaultStreamMigration(streamService, clusterEventBus, indexSetRegistry);

when(indexSet.getConfig()).thenReturn(indexSetConfig);
when(indexSetConfig.id()).thenReturn("abc123");
}

@Test
public void upgrade() throws Exception {
final ArgumentCaptor<Stream> streamArgumentCaptor = ArgumentCaptor.forClass(Stream.class);
when(streamService.load("000000000000000000000001")).thenThrow(NotFoundException.class);
when(indexSetRegistry.getDefault()).thenReturn(Optional.of(indexSet));

migration.upgrade();

verify(streamService).saveWithoutValidation(streamArgumentCaptor.capture());
verify(streamService).save(streamArgumentCaptor.capture());

final Stream stream = streamArgumentCaptor.getValue();
assertThat(stream.getTitle()).isEqualTo("All messages");
assertThat(stream.getDisabled()).isFalse();
assertThat(stream.getMatchingType()).isEqualTo(StreamImpl.MatchingType.DEFAULT);
}

@Test
public void upgradeWithoutDefaultIndexSet() throws Exception {
when(streamService.load("000000000000000000000001")).thenThrow(NotFoundException.class);
when(indexSetRegistry.getDefault()).thenReturn(Optional.empty());

expectedException.expect(IllegalStateException.class);

migration.upgrade();
}

@Test
public void upgradeDoesNotRunIfDefaultStreamExists() throws Exception {
when(streamService.load("000000000000000000000001")).thenReturn(mock(Stream.class));
Expand All @@ -80,6 +108,7 @@ public void upgradeDoesNotRunIfDefaultStreamExists() throws Exception {

@Test
public void upgradePostsStreamsChangedEvent() throws Exception {
when(indexSetRegistry.getDefault()).thenReturn(Optional.of(indexSet));
when(streamService.load("000000000000000000000001")).thenThrow(NotFoundException.class);
final ArgumentCaptor<StreamsChangedEvent> argumentCaptor = ArgumentCaptor.forClass(StreamsChangedEvent.class);
migration.upgrade();
Expand Down

0 comments on commit 9f211ca

Please sign in to comment.