Skip to content

Commit

Permalink
Assign messages to the default stream (#3098)
Browse files Browse the repository at this point in the history
* add default stream to each message when entering the processing engine

Unify legacy ProcessBufferProcessor class with its sole subclass
turn its provider into a factory to save mutable state
create singleton provider for default stream

* prevent updates (create, edit, delete) to default stream or its stream rules

the default stream it doesn't have rules and the UI will not show the actions necessary to modify it, this prevents using the API to mutate it

* the default stream doesn't have stream rules, don't check the service in tests

* Remove obsolete DefaultStreamCreated marker
  • Loading branch information
kroepke authored and joschi committed Nov 18, 2016
1 parent 21cc21e commit 13b23a3
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 165 deletions.
Expand Up @@ -20,6 +20,7 @@
import com.google.inject.assistedinject.FactoryModuleBuilder; import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.MapBinder; import com.google.inject.multibindings.MapBinder;
import com.google.inject.multibindings.Multibinder; import com.google.inject.multibindings.Multibinder;

import org.apache.shiro.mgt.DefaultSecurityManager; import org.apache.shiro.mgt.DefaultSecurityManager;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
Expand All @@ -30,13 +31,13 @@
import org.graylog2.bindings.providers.BundleImporterProvider; import org.graylog2.bindings.providers.BundleImporterProvider;
import org.graylog2.bindings.providers.ClusterEventBusProvider; import org.graylog2.bindings.providers.ClusterEventBusProvider;
import org.graylog2.bindings.providers.DefaultSecurityManagerProvider; import org.graylog2.bindings.providers.DefaultSecurityManagerProvider;
import org.graylog2.bindings.providers.DefaultStreamProvider;
import org.graylog2.bindings.providers.EsClientProvider; import org.graylog2.bindings.providers.EsClientProvider;
import org.graylog2.bindings.providers.EsNodeProvider; import org.graylog2.bindings.providers.EsNodeProvider;
import org.graylog2.bindings.providers.MongoConnectionProvider; import org.graylog2.bindings.providers.MongoConnectionProvider;
import org.graylog2.bindings.providers.RulesEngineProvider; import org.graylog2.bindings.providers.RulesEngineProvider;
import org.graylog2.bindings.providers.SystemJobFactoryProvider; import org.graylog2.bindings.providers.SystemJobFactoryProvider;
import org.graylog2.bindings.providers.SystemJobManagerProvider; import org.graylog2.bindings.providers.SystemJobManagerProvider;
import org.graylog2.buffers.processors.ServerProcessBufferProcessor;
import org.graylog2.bundles.BundleService; import org.graylog2.bundles.BundleService;
import org.graylog2.cluster.ClusterConfigServiceImpl; import org.graylog2.cluster.ClusterConfigServiceImpl;
import org.graylog2.dashboards.widgets.WidgetCacheTime; import org.graylog2.dashboards.widgets.WidgetCacheTime;
Expand All @@ -61,6 +62,8 @@
import org.graylog2.plugin.cluster.ClusterConfigService; import org.graylog2.plugin.cluster.ClusterConfigService;
import org.graylog2.plugin.decorators.SearchResponseDecorator; import org.graylog2.plugin.decorators.SearchResponseDecorator;
import org.graylog2.plugin.inject.Graylog2Module; import org.graylog2.plugin.inject.Graylog2Module;
import org.graylog2.plugin.streams.DefaultStream;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.rest.NotFoundExceptionMapper; import org.graylog2.rest.NotFoundExceptionMapper;
import org.graylog2.rest.ScrollChunkWriter; import org.graylog2.rest.ScrollChunkWriter;
import org.graylog2.rest.ValidationExceptionMapper; import org.graylog2.rest.ValidationExceptionMapper;
Expand Down Expand Up @@ -134,6 +137,9 @@ private void bindFactoryModules() {
install(new FactoryModuleBuilder().build(LdapSettingsImpl.Factory.class)); install(new FactoryModuleBuilder().build(LdapSettingsImpl.Factory.class));
install(new FactoryModuleBuilder().build(WidgetCacheTime.Factory.class)); install(new FactoryModuleBuilder().build(WidgetCacheTime.Factory.class));
install(new FactoryModuleBuilder().build(UserImpl.Factory.class)); install(new FactoryModuleBuilder().build(UserImpl.Factory.class));

install(new FactoryModuleBuilder().build(ProcessBufferProcessor.Factory.class));
bind(Stream.class).annotatedWith(DefaultStream.class).toProvider(DefaultStreamProvider.class);
} }


private void bindSingletons() { private void bindSingletons() {
Expand Down Expand Up @@ -176,7 +182,6 @@ private void bindInterfaces() {
bind(ActivityWriter.class).to(SystemMessageActivityWriter.class); bind(ActivityWriter.class).to(SystemMessageActivityWriter.class);
bind(PersistedInputs.class).to(PersistedInputsImpl.class); bind(PersistedInputs.class).to(PersistedInputsImpl.class);


bind(ProcessBufferProcessor.class).to(ServerProcessBufferProcessor.class);
bind(RoleService.class).to(RoleServiceImpl.class).in(Scopes.SINGLETON); bind(RoleService.class).to(RoleServiceImpl.class).in(Scopes.SINGLETON);
} }


Expand Down
@@ -0,0 +1,71 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.bindings.providers;

import org.graylog2.database.NotFoundException;
import org.graylog2.gelfclient.util.Uninterruptibles;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.streams.StreamService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import javax.inject.Inject;
import javax.inject.Provider;
import javax.inject.Singleton;

@Singleton
public class DefaultStreamProvider implements Provider<Stream> {
private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamProvider.class);

private final StreamService service;

private AtomicReference<Stream> sharedInstance = new AtomicReference<>();

@Inject
private DefaultStreamProvider(StreamService service) {
this.service = service;
}

@Override
public Stream get() {
Stream defaultStream = sharedInstance.get();
if (defaultStream != null) {
return defaultStream;
}

synchronized (this) {
defaultStream = sharedInstance.get();
if (defaultStream != null) {
return defaultStream;
}
do {
try {
LOG.debug("Loading shared default stream instance");
defaultStream = service.load(Stream.DEFAULT_STREAM_ID);
} catch (NotFoundException ignored) {
LOG.warn("Unable to load default stream, retrying. Processing is blocked until this succeeds.");
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
}
} while (defaultStream == null);
sharedInstance.set(defaultStream);
}
return defaultStream;
}
}

This file was deleted.

Expand Up @@ -18,18 +18,14 @@


import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.bson.types.ObjectId; import org.bson.types.ObjectId;
import org.graylog2.database.NotFoundException;
import org.graylog2.events.ClusterEventBus; import org.graylog2.events.ClusterEventBus;
import org.graylog2.plugin.cluster.ClusterConfigService; import org.graylog2.plugin.cluster.ClusterConfigService;
import org.graylog2.plugin.database.ValidationException; import org.graylog2.plugin.database.ValidationException;
import org.graylog2.plugin.periodical.Periodical; import org.graylog2.plugin.periodical.Periodical;
import org.graylog2.plugin.streams.Stream; import org.graylog2.plugin.streams.Stream;
import org.graylog2.plugin.streams.StreamRule;
import org.graylog2.plugin.streams.StreamRuleType;
import org.graylog2.streams.StreamImpl; import org.graylog2.streams.StreamImpl;
import org.graylog2.streams.StreamRuleImpl;
import org.graylog2.streams.StreamRuleService;
import org.graylog2.streams.StreamService; import org.graylog2.streams.StreamService;
import org.graylog2.streams.config.DefaultStreamCreated;
import org.graylog2.streams.events.StreamsChangedEvent; import org.graylog2.streams.events.StreamsChangedEvent;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
Expand All @@ -47,17 +43,14 @@ public class DefaultStreamMigrationPeriodical extends Periodical {
private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamMigrationPeriodical.class); private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamMigrationPeriodical.class);


private final StreamService streamService; private final StreamService streamService;
private final StreamRuleService streamRuleService;
private final ClusterEventBus clusterEventBus; private final ClusterEventBus clusterEventBus;
private final ClusterConfigService clusterConfigService; private final ClusterConfigService clusterConfigService;


@Inject @Inject
public DefaultStreamMigrationPeriodical(final StreamService streamService, public DefaultStreamMigrationPeriodical(final StreamService streamService,
final StreamRuleService streamRuleService,
final ClusterEventBus clusterEventBus, final ClusterEventBus clusterEventBus,
final ClusterConfigService clusterConfigService) { final ClusterConfigService clusterConfigService) {
this.streamService = streamService; this.streamService = streamService;
this.streamRuleService = streamRuleService;
this.clusterEventBus = clusterEventBus; this.clusterEventBus = clusterEventBus;
this.clusterConfigService = clusterConfigService; this.clusterConfigService = clusterConfigService;
} }
Expand All @@ -75,21 +68,11 @@ public void doRun() {
.put(StreamImpl.FIELD_DEFAULT_STREAM, true) .put(StreamImpl.FIELD_DEFAULT_STREAM, true)
.build(); .build();
final Stream stream = new StreamImpl(id, fields, Collections.emptyList(), Collections.emptySet(), Collections.emptySet()); final Stream stream = new StreamImpl(id, fields, Collections.emptyList(), Collections.emptySet(), Collections.emptySet());
final StreamRule streamRule = new StreamRuleImpl(
ImmutableMap.<String, Object>builder()
.put(StreamRuleImpl.FIELD_TYPE, StreamRuleType.ALWAYS_MATCH.getValue())
.put(StreamRuleImpl.FIELD_FIELD, "timestamp")
.put(StreamRuleImpl.FIELD_INVERTED, false)
.put(StreamRuleImpl.FIELD_STREAM_ID, id)
.put(StreamRuleImpl.FIELD_DESCRIPTION, "Match all messages")
.build());
try { try {
streamService.save(stream); streamService.save(stream);
streamRuleService.save(streamRule);

LOG.info("Successfully created default stream: {}", stream.getTitle()); LOG.info("Successfully created default stream: {}", stream.getTitle());


clusterConfigService.write(DefaultStreamCreated.create());
clusterEventBus.post(StreamsChangedEvent.create(stream.getId())); clusterEventBus.post(StreamsChangedEvent.create(stream.getId()));
} catch (ValidationException e) { } catch (ValidationException e) {
LOG.error("Couldn't create default stream", e); LOG.error("Couldn't create default stream", e);
Expand All @@ -113,7 +96,12 @@ public boolean masterOnly() {


@Override @Override
public boolean startOnThisNode() { public boolean startOnThisNode() {
return clusterConfigService.get(DefaultStreamCreated.class) == null; try {
return streamService.load(Stream.DEFAULT_STREAM_ID) == null;
} catch (NotFoundException ignored) {
// if the stream cannot be found, recreate it
return true;
}
} }


@Override @Override
Expand Down
Expand Up @@ -14,17 +14,17 @@
* You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>. * along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/ */
package org.graylog2.streams.config; package org.graylog2.plugin.streams;


import com.fasterxml.jackson.annotation.JsonAutoDetect; import java.lang.annotation.ElementType;
import com.fasterxml.jackson.annotation.JsonCreator; import java.lang.annotation.Retention;
import com.google.auto.value.AutoValue; import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;


@JsonAutoDetect import javax.inject.Qualifier;
@AutoValue
public abstract class DefaultStreamCreated { @Retention(RetentionPolicy.RUNTIME)
@JsonCreator @Target(ElementType.PARAMETER)
public static DefaultStreamCreated create() { @Qualifier
return new AutoValue_DefaultStreamCreated(); public @interface DefaultStream {
}
} }

0 comments on commit 13b23a3

Please sign in to comment.