From e6b859cf0565ce74fd535bfee9f452e0a1817e53 Mon Sep 17 00:00:00 2001 From: Bernd Ahlers Date: Fri, 22 Jul 2016 10:39:39 +0200 Subject: [PATCH] Switch message filters from polling to subscribing to change events (#2496) * Switch stream router from polling to subscription for updates Previously every StreamRouter instance was polling for stream changes every second. Now it's listening on the server event bus for updates. Stream related REST endpoints are now posting events when updating stream configuration. * Switch extractor filter from polling to subscription for updates Before this, every ExtractorFilter was using a cache which expired after 1 second for the extractors. So it was basically polling the extractors every second. * Switch static fields filter from polling to subscription for updates Before this, every StaticFieldFilter instance was using a cache which expired after 1 second. It was basically polling the static fields configuration every second. * Switch rules filter from polling to subscription for updates Before this, every RulesFilter instance was using a cache which expired after 1 second so it was basically polling for new rules every second. Fixes #2391 --- .../org/graylog2/filters/ExtractorFilter.java | 102 +++++++++++------- .../org/graylog2/filters/RulesFilter.java | 99 ++++++++--------- .../graylog2/filters/StaticFieldFilter.java | 83 +++++++++----- .../events/FilterDescriptionUpdateEvent.java | 36 +++++++ .../org/graylog2/inputs/InputServiceImpl.java | 5 + .../system/inputs/responses/InputUpdated.java | 34 ++++++ .../filters/BlacklistSourceResource.java | 10 +- .../resources/streams/StreamResource.java | 15 ++- .../streams/outputs/StreamOutputResource.java | 11 +- .../streams/rules/StreamRuleResource.java | 11 +- .../org/graylog2/streams/StreamRouter.java | 26 +++-- .../streams/events/StreamsChangedEvent.java | 41 +++++++ .../filters/StaticFieldFilterTest.java | 9 +- 13 files changed, 348 insertions(+), 134 deletions(-) create mode 100644 graylog2-server/src/main/java/org/graylog2/filters/events/FilterDescriptionUpdateEvent.java create mode 100644 graylog2-server/src/main/java/org/graylog2/rest/models/system/inputs/responses/InputUpdated.java create mode 100644 graylog2-server/src/main/java/org/graylog2/streams/events/StreamsChangedEvent.java diff --git a/graylog2-server/src/main/java/org/graylog2/filters/ExtractorFilter.java b/graylog2-server/src/main/java/org/graylog2/filters/ExtractorFilter.java index 771f7fd4f8bd..feee0eb8f659 100644 --- a/graylog2-server/src/main/java/org/graylog2/filters/ExtractorFilter.java +++ b/graylog2-server/src/main/java/org/graylog2/filters/ExtractorFilter.java @@ -16,39 +16,50 @@ */ package org.graylog2.filters; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.collect.Lists; +import com.google.common.collect.ImmutableList; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; import org.graylog2.database.NotFoundException; import org.graylog2.inputs.Input; import org.graylog2.inputs.InputService; import org.graylog2.plugin.Message; import org.graylog2.plugin.filters.MessageFilter; import org.graylog2.plugin.inputs.Extractor; +import org.graylog2.rest.models.system.inputs.responses.InputCreated; +import org.graylog2.rest.models.system.inputs.responses.InputDeleted; +import org.graylog2.rest.models.system.inputs.responses.InputUpdated; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.inject.Inject; +import javax.inject.Named; import java.util.Collections; -import java.util.Comparator; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; public class ExtractorFilter implements MessageFilter { private static final Logger LOG = LoggerFactory.getLogger(ExtractorFilter.class); private static final String NAME = "Extractor"; - private Cache> cache = CacheBuilder.newBuilder() - .expireAfterWrite(1, TimeUnit.SECONDS) - .build(); + private final ConcurrentMap> extractors = new ConcurrentHashMap<>(); private final InputService inputService; + private final ScheduledExecutorService scheduler; @Inject - public ExtractorFilter(InputService inputService) { + public ExtractorFilter(InputService inputService, + EventBus serverEventBus, + @Named("daemonScheduler") ScheduledExecutorService scheduler) { this.inputService = inputService; + this.scheduler = scheduler; + + loadAllExtractors(); + + // TODO: This class needs lifecycle management to avoid leaking objects in the EventBus + serverEventBus.register(this); } @Override @@ -57,7 +68,7 @@ public boolean filter(Message msg) { return false; } - for (final Extractor extractor : loadExtractors(msg.getSourceInputId())) { + for (final Extractor extractor : extractors.getOrDefault(msg.getSourceInputId(), Collections.emptyList())) { try { extractor.runExtractor(msg); } catch (Exception e) { @@ -70,34 +81,47 @@ public boolean filter(Message msg) { return false; } - private List loadExtractors(final String inputId) { + @Subscribe + @SuppressWarnings("unused") + public void handleInputCreate(final InputCreated event) { + LOG.debug("Load extractors for input <{}>", event.id()); + scheduler.submit(() -> loadExtractors(event.id())); + } + + @Subscribe + @SuppressWarnings("unused") + public void handleInputDelete(final InputDeleted event) { + LOG.debug("Removing input from extractors cache <{}>", event.id()); + extractors.remove(event.id()); + } + + @Subscribe + @SuppressWarnings("unused") + public void handleInputUpdate(final InputUpdated event) { + scheduler.submit(() -> loadExtractors(event.id())); + } + + private void loadAllExtractors() { + try { + inputService.all().forEach(input -> loadExtractors(input.getId())); + } catch (Exception e) { + LOG.error("Unable to load extractors for all inputs", e); + } + } + + private void loadExtractors(final String inputId) { + LOG.debug("Re-loading extractors for input <{}>", inputId); + try { - return cache.get(inputId, new Callable>() { - @Override - public List call() throws Exception { - LOG.debug("Re-loading extractors for input <{}> into cache.", inputId); - - try { - Input input = inputService.find(inputId); - - List sorted = Lists.newArrayList(inputService.getExtractors(input)); - - Collections.sort(sorted, new Comparator() { - public int compare(Extractor e1, Extractor e2) { - return e1.getOrder().intValue() - e2.getOrder().intValue(); - } - }); - - return sorted; - } catch (NotFoundException e) { - LOG.warn("Unable to load input: {}", e.getMessage()); - return Collections.emptyList(); - } - } - }); - } catch (ExecutionException e) { - LOG.error("Could not load extractors into cache. Returning empty list.", e); - return Collections.emptyList(); + final Input input = inputService.find(inputId); + final List sortedExtractors = inputService.getExtractors(input) + .stream() + .sorted((e1, e2) -> e1.getOrder().intValue() - e2.getOrder().intValue()) + .collect(Collectors.toList()); + + extractors.put(inputId, ImmutableList.copyOf(sortedExtractors)); + } catch (NotFoundException e) { + LOG.warn("Unable to load input <{}>: {}", inputId, e.getMessage()); } } diff --git a/graylog2-server/src/main/java/org/graylog2/filters/RulesFilter.java b/graylog2-server/src/main/java/org/graylog2/filters/RulesFilter.java index 8d06bae33a96..2e028569b28d 100644 --- a/graylog2-server/src/main/java/org/graylog2/filters/RulesFilter.java +++ b/graylog2-server/src/main/java/org/graylog2/filters/RulesFilter.java @@ -16,10 +16,10 @@ */ package org.graylog2.filters; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.collect.Sets; -import org.graylog2.filters.blacklist.FilterDescription; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; +import org.graylog2.database.NotFoundException; +import org.graylog2.filters.events.FilterDescriptionUpdateEvent; import org.graylog2.plugin.Message; import org.graylog2.plugin.RulesEngine; import org.graylog2.plugin.filters.MessageFilter; @@ -27,10 +27,9 @@ import org.slf4j.LoggerFactory; import javax.inject.Inject; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import javax.inject.Named; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; /** * @author Lennart Koopmann @@ -38,68 +37,58 @@ public class RulesFilter implements MessageFilter { private static final Logger LOG = LoggerFactory.getLogger(RulesFilter.class); + private final RulesEngine rulesEngine; private final FilterService filterService; - private final RulesEngine.RulesSession privateSession; - private final Cache> cache; - private Set currentFilterSet; + private final ScheduledExecutorService scheduler; + private final AtomicReference privateSession = new AtomicReference<>(null); @Inject - public RulesFilter(RulesEngine rulesEngine, final FilterService filterService) { + public RulesFilter(final RulesEngine rulesEngine, + final FilterService filterService, + final EventBus serverEventBus, + @Named("daemonScheduler") ScheduledExecutorService scheduler) { + this.rulesEngine = rulesEngine; this.filterService = filterService; + this.scheduler = scheduler; - currentFilterSet = Sets.newHashSet(); - cache = CacheBuilder.newBuilder() - .expireAfterWrite(1, TimeUnit.SECONDS) - .build(); - privateSession = rulesEngine.createPrivateSession(); + loadRules(); + + // TODO: This class needs lifecycle management to avoid leaking objects in the EventBus + serverEventBus.register(this); } @Override public boolean filter(Message msg) { - final Set filters; - try { - filters = cache.get("filters", new Callable>() { - @Override - public Set call() throws Exception { - // TODO this should be improved by computing the difference between the filter sets - // most of the time nothing changes at all. - final Set newFilters = filterService.loadAll(); - final Sets.SetView difference = - Sets.symmetricDifference(currentFilterSet, newFilters); - if (difference.isEmpty()) { - // there wasn't any change, simply return the current filter set - LOG.debug("Filter sets are identical, not updating rules engine."); - return currentFilterSet; - } - - // something changed, we simply update everything, not trying to do the minimal changes yet - // should this become too expensive we need to do something smarter - LOG.debug("Updating rules engine, filter sets differ: {}", difference); - // retract all current filter facts - for (FilterDescription filterDescription : currentFilterSet) { - privateSession.deleteFact(filterDescription); - } - // state all new filter facts - for (FilterDescription filterDescription : newFilters) { - privateSession.insertFact(filterDescription); - } - currentFilterSet.clear(); - // remember currently stated facts - currentFilterSet.addAll(newFilters); - return currentFilterSet; - } - }); - } catch (ExecutionException ignored) { - return false; - } - // Always run the rules engine to make sure rules from the external rules file will be run. - privateSession.evaluate(msg, true); + privateSession.get().evaluate(msg, true); // false if not explicitly set to true in the rules. return msg.getFilterOut(); } + @Subscribe + @SuppressWarnings("unused") + public void handleRulesUpdate(FilterDescriptionUpdateEvent ignored) { + LOG.debug("Updating filter descriptions: {}", ignored); + scheduler.submit(this::loadRules); + } + + private void loadRules() { + LOG.debug("Loading rule filters"); + try { + final RulesEngine.RulesSession newSession = rulesEngine.createPrivateSession(); + + filterService.loadAll().forEach(filterDescription -> { + LOG.debug("Insert filter description: {}", filterDescription); + newSession.insertFact(filterDescription); + }); + + privateSession.set(newSession); + } catch (NotFoundException e) { + LOG.error("No filters found", e); + } + } + @Override public String getName() { return "Rulesfilter"; diff --git a/graylog2-server/src/main/java/org/graylog2/filters/StaticFieldFilter.java b/graylog2-server/src/main/java/org/graylog2/filters/StaticFieldFilter.java index e8f884b770e9..afedd4847026 100644 --- a/graylog2-server/src/main/java/org/graylog2/filters/StaticFieldFilter.java +++ b/graylog2-server/src/main/java/org/graylog2/filters/StaticFieldFilter.java @@ -16,23 +16,28 @@ */ package org.graylog2.filters; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; +import com.google.common.collect.ImmutableList; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; import org.graylog2.database.NotFoundException; import org.graylog2.inputs.Input; import org.graylog2.inputs.InputService; import org.graylog2.plugin.Message; import org.graylog2.plugin.filters.MessageFilter; +import org.graylog2.rest.models.system.inputs.responses.InputCreated; +import org.graylog2.rest.models.system.inputs.responses.InputDeleted; +import org.graylog2.rest.models.system.inputs.responses.InputUpdated; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.inject.Inject; +import javax.inject.Named; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; /** * @author Lennart Koopmann @@ -43,15 +48,22 @@ public class StaticFieldFilter implements MessageFilter { private static final String NAME = "Static field appender"; - private final Cache>> cache = CacheBuilder.newBuilder() - .expireAfterWrite(1, TimeUnit.SECONDS) - .build(); + private final ConcurrentMap>> staticFields = new ConcurrentHashMap<>(); private final InputService inputService; + private final ScheduledExecutorService scheduler; @Inject - public StaticFieldFilter(InputService inputService) { + public StaticFieldFilter(InputService inputService, + EventBus serverEventBus, + @Named("daemonScheduler") ScheduledExecutorService scheduler) { this.inputService = inputService; + this.scheduler = scheduler; + + loadAllStaticFields(); + + // TODO: This class needs lifecycle management to avoid leaking objects in the EventBus + serverEventBus.register(this); } @Override @@ -59,7 +71,7 @@ public boolean filter(Message msg) { if (msg.getSourceInputId() == null) return false; - for(final Map.Entry field : loadStaticFields(msg.getSourceInputId())) { + for(final Map.Entry field : staticFields.getOrDefault(msg.getSourceInputId(), Collections.emptyList())) { if(!msg.hasField(field.getKey())) { msg.addField(field.getKey(), field.getValue()); } else { @@ -70,24 +82,41 @@ public boolean filter(Message msg) { return false; } - private List> loadStaticFields(final String inputId) { + @Subscribe + @SuppressWarnings("unused") + public void handleInputCreate(final InputCreated event) { + LOG.debug("Load static fields for input <{}>", event.id()); + scheduler.submit(() -> loadStaticFields(event.id())); + } + + @Subscribe + @SuppressWarnings("unused") + public void handleInputDelete(final InputDeleted event) { + LOG.debug("Removing input from static fields cache <{}>", event.id()); + staticFields.remove(event.id()); + } + + @Subscribe + @SuppressWarnings("unused") + public void handleInputUpdate(final InputUpdated event) { + scheduler.submit(() -> loadStaticFields(event.id())); + } + + private void loadAllStaticFields() { + try { + inputService.all().forEach(input -> loadStaticFields(input.getId())); + } catch (Exception e) { + LOG.error("Unable to load static fields for all inputs", e); + } + } + + private void loadStaticFields(final String inputId) { + LOG.debug("Re-loading static fields for input <{}> into cache.", inputId); try { - return cache.get(inputId, new Callable>>() { - @Override - public List> call() throws Exception { - LOG.debug("Re-loading static fields for input <{}> into cache.", inputId); - try { - final Input input = inputService.find(inputId); - return inputService.getStaticFields(input); - } catch (NotFoundException e) { - LOG.warn("Unable to load input: {}", e.getMessage()); - return Collections.emptyList(); - } - } - }); - } catch (ExecutionException e) { - LOG.error("Could not load static fields into cache. Returning empty list.", e); - return Collections.emptyList(); + final Input input = inputService.find(inputId); + staticFields.put(inputId, ImmutableList.copyOf(inputService.getStaticFields(input))); + } catch (NotFoundException e) { + LOG.warn("Unable to load input: {}", e.getMessage()); } } diff --git a/graylog2-server/src/main/java/org/graylog2/filters/events/FilterDescriptionUpdateEvent.java b/graylog2-server/src/main/java/org/graylog2/filters/events/FilterDescriptionUpdateEvent.java new file mode 100644 index 000000000000..8b9d6566ec49 --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/filters/events/FilterDescriptionUpdateEvent.java @@ -0,0 +1,36 @@ +/** + * This file is part of Graylog. + * + * Graylog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog. If not, see . + */ +package org.graylog2.filters.events; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.auto.value.AutoValue; + +@JsonAutoDetect +@AutoValue +public abstract class FilterDescriptionUpdateEvent { + private static final String FIELD_ID = "id"; + + @JsonProperty(FIELD_ID) + public abstract String id(); + + @JsonCreator + public static FilterDescriptionUpdateEvent create(@JsonProperty(FIELD_ID) String id) { + return new AutoValue_FilterDescriptionUpdateEvent(id); + } +} \ No newline at end of file diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/InputServiceImpl.java b/graylog2-server/src/main/java/org/graylog2/inputs/InputServiceImpl.java index a1a1ea9a9072..9607a4068519 100644 --- a/graylog2-server/src/main/java/org/graylog2/inputs/InputServiceImpl.java +++ b/graylog2-server/src/main/java/org/graylog2/inputs/InputServiceImpl.java @@ -33,6 +33,7 @@ import org.graylog2.database.PersistedServiceImpl; import org.graylog2.events.ClusterEventBus; import org.graylog2.inputs.converters.ConverterFactory; +import org.graylog2.rest.models.system.inputs.responses.InputUpdated; import org.graylog2.inputs.extractors.ExtractorFactory; import org.graylog2.plugin.configuration.Configuration; import org.graylog2.plugin.database.EmbeddedPersistable; @@ -184,6 +185,7 @@ public Input findForThisNode(String nodeId, String id) throws NotFoundException, @Override public void addExtractor(Input input, Extractor extractor) throws ValidationException { embed(input, InputImpl.EMBEDDED_EXTRACTORS, extractor); + publishChange(InputUpdated.create(input.getId())); } @Override @@ -198,6 +200,7 @@ public Map getPersistedFields() { }; embed(input, InputImpl.EMBEDDED_STATIC_FIELDS, obj); + publishChange(InputUpdated.create(input.getId())); } @Override @@ -309,11 +312,13 @@ private List getConvertersOfExtractor(DBObject extractor) { @Override public void removeExtractor(Input input, String extractorId) { removeEmbedded(input, InputImpl.EMBEDDED_EXTRACTORS, extractorId); + publishChange(InputUpdated.create(input.getId())); } @Override public void removeStaticField(Input input, String key) { removeEmbedded(input, InputImpl.FIELD_STATIC_FIELD_KEY, InputImpl.EMBEDDED_STATIC_FIELDS, key); + publishChange(InputUpdated.create(input.getId())); } @Override diff --git a/graylog2-server/src/main/java/org/graylog2/rest/models/system/inputs/responses/InputUpdated.java b/graylog2-server/src/main/java/org/graylog2/rest/models/system/inputs/responses/InputUpdated.java new file mode 100644 index 000000000000..b743098ce80c --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/rest/models/system/inputs/responses/InputUpdated.java @@ -0,0 +1,34 @@ +/** + * This file is part of Graylog. + * + * Graylog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog. If not, see . + */ +package org.graylog2.rest.models.system.inputs.responses; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.auto.value.AutoValue; + +@JsonAutoDetect +@AutoValue +public abstract class InputUpdated { + @JsonProperty + public abstract String id(); + + @JsonCreator + public static InputUpdated create(@JsonProperty("id") String inputId) { + return new AutoValue_InputUpdated(inputId); + } +} \ No newline at end of file diff --git a/graylog2-server/src/main/java/org/graylog2/rest/resources/filters/BlacklistSourceResource.java b/graylog2-server/src/main/java/org/graylog2/rest/resources/filters/BlacklistSourceResource.java index 17c089158e15..685d395dfa69 100644 --- a/graylog2-server/src/main/java/org/graylog2/rest/resources/filters/BlacklistSourceResource.java +++ b/graylog2-server/src/main/java/org/graylog2/rest/resources/filters/BlacklistSourceResource.java @@ -22,6 +22,8 @@ import io.swagger.annotations.ApiParam; import org.apache.shiro.authz.annotation.RequiresAuthentication; import org.graylog2.auditlog.jersey.AuditLog; +import org.graylog2.events.ClusterEventBus; +import org.graylog2.filters.events.FilterDescriptionUpdateEvent; import org.graylog2.plugin.database.ValidationException; import org.graylog2.filters.FilterService; import org.graylog2.filters.blacklist.FilterDescription; @@ -58,10 +60,12 @@ public class BlacklistSourceResource extends RestResource { private static final Logger LOG = LoggerFactory.getLogger(BlacklistSourceResource.class); private FilterService filterService; + private final ClusterEventBus clusterEventBus; @Inject - public BlacklistSourceResource(FilterService filterService) { + public BlacklistSourceResource(FilterService filterService, ClusterEventBus clusterEventBus) { this.filterService = filterService; + this.clusterEventBus = clusterEventBus; } @POST @@ -84,6 +88,8 @@ public Response create(@ApiParam(name = "filterEntry", required = true) final FilterDescription savedFilter = filterService.save(filterDescription); + clusterEventBus.post(FilterDescriptionUpdateEvent.create(savedFilter._id.toHexString())); + final URI filterUri = getUriBuilderToSelf().path(BlacklistSourceResource.class) .path("{filterId}") .build(savedFilter._id); @@ -142,6 +148,7 @@ public void update(@ApiParam(name = "filterId", required = true) } filterService.save(filter); + clusterEventBus.post(FilterDescriptionUpdateEvent.create(filterId)); } @DELETE @@ -154,5 +161,6 @@ public void delete(@ApiParam(name = "filterId", required = true) if (filterService.delete(filterId) == 0) { throw new NotFoundException("Couldn't find filter with ID "+ filterId); } + clusterEventBus.post(FilterDescriptionUpdateEvent.create(filterId)); } } diff --git a/graylog2-server/src/main/java/org/graylog2/rest/resources/streams/StreamResource.java b/graylog2-server/src/main/java/org/graylog2/rest/resources/streams/StreamResource.java index 2dbb28a8af3d..00e8a5e51580 100644 --- a/graylog2-server/src/main/java/org/graylog2/rest/resources/streams/StreamResource.java +++ b/graylog2-server/src/main/java/org/graylog2/rest/resources/streams/StreamResource.java @@ -35,6 +35,7 @@ import org.graylog2.auditlog.Actions; import org.graylog2.auditlog.jersey.AuditLog; import org.graylog2.database.NotFoundException; +import org.graylog2.events.ClusterEventBus; import org.graylog2.plugin.Message; import org.graylog2.plugin.Tools; import org.graylog2.plugin.alarms.AlertCondition; @@ -59,6 +60,7 @@ import org.graylog2.streams.StreamRouterEngine; import org.graylog2.streams.StreamRuleService; import org.graylog2.streams.StreamService; +import org.graylog2.streams.events.StreamsChangedEvent; import org.hibernate.validator.constraints.NotEmpty; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -104,18 +106,21 @@ public class StreamResource extends RestResource { private final StreamRouterEngine.Factory streamRouterEngineFactory; private final AlarmCallbackConfigurationService alarmCallbackConfigurationService; private final AlertService alertService; + private final ClusterEventBus clusterEventBus; @Inject public StreamResource(StreamService streamService, StreamRuleService streamRuleService, StreamRouterEngine.Factory streamRouterEngineFactory, AlarmCallbackConfigurationService alarmCallbackConfigurationService, - AlertService alertService) { + AlertService alertService, + ClusterEventBus clusterEventBus) { this.streamService = streamService; this.streamRuleService = streamRuleService; this.streamRouterEngineFactory = streamRouterEngineFactory; this.alarmCallbackConfigurationService = alarmCallbackConfigurationService; this.alertService = alertService; + this.clusterEventBus = clusterEventBus; } @POST @@ -138,6 +143,8 @@ public Response create(@ApiParam(name = "JSON body", required = true) final Crea streamRuleService.save(streamRule); } + clusterEventBus.post(StreamsChangedEvent.create(stream.getId())); + final Map result = ImmutableMap.of("stream_id", id); final URI streamUri = getUriBuilderToSelf().path(StreamResource.class) .path("{streamId}") @@ -231,6 +238,7 @@ public StreamResponse update(@ApiParam(name = "streamId", required = true) } streamService.save(stream); + clusterEventBus.post(StreamsChangedEvent.create(stream.getId())); return streamToResponse(stream); } @@ -249,6 +257,7 @@ public void delete(@ApiParam(name = "streamId", required = true) @PathParam("str final Stream stream = streamService.load(streamId); streamService.destroy(stream); + clusterEventBus.post(StreamsChangedEvent.create(stream.getId())); } @POST @@ -266,6 +275,7 @@ public void pause(@ApiParam(name = "streamId", required = true) final Stream stream = streamService.load(streamId); streamService.pause(stream); + clusterEventBus.post(StreamsChangedEvent.create(stream.getId())); } @POST @@ -283,6 +293,7 @@ public void resume(@ApiParam(name = "streamId", required = true) final Stream stream = streamService.load(streamId); streamService.resume(stream); + clusterEventBus.post(StreamsChangedEvent.create(stream.getId())); } @POST @@ -389,6 +400,8 @@ public Response cloneStream(@ApiParam(name = "streamId", required = true) @PathP streamService.addOutput(stream, output); } + clusterEventBus.post(StreamsChangedEvent.create(stream.getId())); + final Map result = ImmutableMap.of("stream_id", id); final URI streamUri = getUriBuilderToSelf().path(StreamResource.class) .path("{streamId}") diff --git a/graylog2-server/src/main/java/org/graylog2/rest/resources/streams/outputs/StreamOutputResource.java b/graylog2-server/src/main/java/org/graylog2/rest/resources/streams/outputs/StreamOutputResource.java index dfacac6e6a84..666bf91dcfda 100644 --- a/graylog2-server/src/main/java/org/graylog2/rest/resources/streams/outputs/StreamOutputResource.java +++ b/graylog2-server/src/main/java/org/graylog2/rest/resources/streams/outputs/StreamOutputResource.java @@ -26,6 +26,7 @@ import org.apache.shiro.authz.annotation.RequiresPermissions; import org.graylog2.auditlog.jersey.AuditLog; import org.graylog2.database.NotFoundException; +import org.graylog2.events.ClusterEventBus; import org.graylog2.outputs.OutputRegistry; import org.graylog2.plugin.database.ValidationException; import org.graylog2.plugin.streams.Output; @@ -37,6 +38,7 @@ import org.graylog2.shared.security.RestPermissions; import org.graylog2.streams.OutputService; import org.graylog2.streams.StreamService; +import org.graylog2.streams.events.StreamsChangedEvent; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,12 +68,17 @@ public class StreamOutputResource extends RestResource { private final OutputService outputService; private final StreamService streamService; private final OutputRegistry outputRegistry; + private final ClusterEventBus clusterEventBus; @Inject - public StreamOutputResource(OutputService outputService, StreamService streamService, OutputRegistry outputRegistry) { + public StreamOutputResource(OutputService outputService, + StreamService streamService, + OutputRegistry outputRegistry, + ClusterEventBus clusterEventBus) { this.outputService = outputService; this.streamService = streamService; this.outputRegistry = outputRegistry; + this.clusterEventBus = clusterEventBus; } @GET @@ -143,6 +150,7 @@ public Response add(@ApiParam(name = "streamid", value = "The id of the stream w for (String outputId : aor.outputs()) { final Output output = outputService.load(outputId); streamService.addOutput(stream, output); + clusterEventBus.post(StreamsChangedEvent.create(stream.getId())); } return Response.accepted().build(); @@ -167,5 +175,6 @@ public void remove(@ApiParam(name = "streamid", value = "The id of the stream wh streamService.removeOutput(stream, output); outputRegistry.removeOutput(output); + clusterEventBus.post(StreamsChangedEvent.create(stream.getId())); } } diff --git a/graylog2-server/src/main/java/org/graylog2/rest/resources/streams/rules/StreamRuleResource.java b/graylog2-server/src/main/java/org/graylog2/rest/resources/streams/rules/StreamRuleResource.java index e3b2dfd9296a..461324cb1d6b 100644 --- a/graylog2-server/src/main/java/org/graylog2/rest/resources/streams/rules/StreamRuleResource.java +++ b/graylog2-server/src/main/java/org/graylog2/rest/resources/streams/rules/StreamRuleResource.java @@ -25,6 +25,7 @@ import org.apache.shiro.authz.annotation.RequiresAuthentication; import org.graylog2.auditlog.jersey.AuditLog; import org.graylog2.database.NotFoundException; +import org.graylog2.events.ClusterEventBus; import org.graylog2.plugin.database.ValidationException; import org.graylog2.plugin.streams.Stream; import org.graylog2.plugin.streams.StreamRule; @@ -37,6 +38,7 @@ import org.graylog2.shared.security.RestPermissions; import org.graylog2.streams.StreamRuleService; import org.graylog2.streams.StreamService; +import org.graylog2.streams.events.StreamsChangedEvent; import org.hibernate.validator.constraints.NotEmpty; import javax.inject.Inject; @@ -63,12 +65,15 @@ public class StreamRuleResource extends RestResource { private final StreamRuleService streamRuleService; private final StreamService streamService; + private final ClusterEventBus clusterEventBus; @Inject public StreamRuleResource(StreamRuleService streamRuleService, - StreamService streamService) { + StreamService streamService, + ClusterEventBus clusterEventBus) { this.streamRuleService = streamRuleService; this.streamService = streamService; + this.clusterEventBus = clusterEventBus; } @POST @@ -87,6 +92,8 @@ public Response create(@ApiParam(name = "streamid", value = "The stream id this final StreamRule streamRule = streamRuleService.create(streamId, cr); final String id = streamService.save(streamRule); + clusterEventBus.post(StreamsChangedEvent.create(stream.getId())); + final SingleStreamRuleSummaryResponse response = SingleStreamRuleSummaryResponse.create(id); final URI streamRuleUri = getUriBuilderToSelf().path(StreamRuleResource.class) @@ -134,6 +141,7 @@ public SingleStreamRuleSummaryResponse update(@ApiParam(name = "streamid", value streamRule.setDescription(cr.description()); streamRuleService.save(streamRule); + clusterEventBus.post(StreamsChangedEvent.create(streamid)); return SingleStreamRuleSummaryResponse.create(streamRule.getId()); } @@ -196,6 +204,7 @@ public void delete(@ApiParam(name = "streamid", value = "The stream id this new final StreamRule streamRule = streamRuleService.load(streamRuleId); if (streamRule.getStreamId().equals(streamid)) { streamRuleService.destroy(streamRule); + clusterEventBus.post(StreamsChangedEvent.create(streamid)); } else { throw new NotFoundException("Couldn't delete stream rule " + streamRuleId + "in stream " + streamid); } diff --git a/graylog2-server/src/main/java/org/graylog2/streams/StreamRouter.java b/graylog2-server/src/main/java/org/graylog2/streams/StreamRouter.java index 9beae52eab64..9b64bed16c0b 100644 --- a/graylog2-server/src/main/java/org/graylog2/streams/StreamRouter.java +++ b/graylog2-server/src/main/java/org/graylog2/streams/StreamRouter.java @@ -16,21 +16,23 @@ */ package org.graylog2.streams; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import javax.inject.Named; import org.graylog2.plugin.Message; import org.graylog2.plugin.ServerStatus; import org.graylog2.plugin.streams.Stream; +import org.graylog2.streams.events.StreamsChangedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.inject.Inject; +import javax.inject.Named; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** @@ -39,24 +41,34 @@ public class StreamRouter { private static final Logger LOG = LoggerFactory.getLogger(StreamRouter.class); - private static final long ENGINE_UPDATE_INTERVAL = 1L; - protected final StreamService streamService; private final ServerStatus serverStatus; + private final ScheduledExecutorService scheduler; private final AtomicReference routerEngine = new AtomicReference<>(null); + private final StreamRouterEngineUpdater engineUpdater; @Inject public StreamRouter(StreamService streamService, ServerStatus serverStatus, StreamRouterEngine.Factory routerEngineFactory, + EventBus serverEventBus, @Named("daemonScheduler") ScheduledExecutorService scheduler) { this.streamService = streamService; this.serverStatus = serverStatus; + this.scheduler = scheduler; + + this.engineUpdater = new StreamRouterEngineUpdater(routerEngine, routerEngineFactory, streamService, executorService()); + this.routerEngine.set(engineUpdater.getNewEngine()); + + // TODO: This class needs lifecycle management to avoid leaking objects in the EventBus + serverEventBus.register(this); + } - final StreamRouterEngineUpdater streamRouterEngineUpdater = new StreamRouterEngineUpdater(routerEngine, routerEngineFactory, streamService, executorService()); - this.routerEngine.set(streamRouterEngineUpdater.getNewEngine()); - scheduler.scheduleAtFixedRate(streamRouterEngineUpdater, 0, ENGINE_UPDATE_INTERVAL, TimeUnit.SECONDS); + @Subscribe + @SuppressWarnings("unused") + public void handleStreamsUpdate(StreamsChangedEvent event) { + scheduler.submit(engineUpdater); } private ExecutorService executorService() { diff --git a/graylog2-server/src/main/java/org/graylog2/streams/events/StreamsChangedEvent.java b/graylog2-server/src/main/java/org/graylog2/streams/events/StreamsChangedEvent.java new file mode 100644 index 000000000000..3ca3aecc5aa1 --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/streams/events/StreamsChangedEvent.java @@ -0,0 +1,41 @@ +/** + * This file is part of Graylog. + * + * Graylog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graylog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Graylog. If not, see . + */ +package org.graylog2.streams.events; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableSet; + +@JsonAutoDetect +@AutoValue +public abstract class StreamsChangedEvent { + private static final String FIELD_STREAM_IDS = "stream_ids"; + + @JsonProperty(FIELD_STREAM_IDS) + public abstract ImmutableSet streamIds(); + + @JsonCreator + public static StreamsChangedEvent create(@JsonProperty(FIELD_STREAM_IDS) ImmutableSet streamIds) { + return new AutoValue_StreamsChangedEvent(streamIds); + } + + public static StreamsChangedEvent create(String streamId) { + return create(ImmutableSet.of(streamId)); + } +} \ No newline at end of file diff --git a/graylog2-server/src/test/java/org/graylog2/filters/StaticFieldFilterTest.java b/graylog2-server/src/test/java/org/graylog2/filters/StaticFieldFilterTest.java index 787940943477..936f4fde79f1 100644 --- a/graylog2-server/src/test/java/org/graylog2/filters/StaticFieldFilterTest.java +++ b/graylog2-server/src/test/java/org/graylog2/filters/StaticFieldFilterTest.java @@ -18,6 +18,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.eventbus.EventBus; import org.graylog2.inputs.Input; import org.graylog2.inputs.InputService; import org.graylog2.plugin.Message; @@ -27,6 +28,8 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import java.util.concurrent.Executors; + import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; @@ -43,11 +46,13 @@ public void testFilter() throws Exception { Message msg = new Message("hello", "junit", Tools.nowUTC()); msg.setSourceInputId("someid"); + when(input.getId()).thenReturn("someid"); + when(inputService.all()).thenReturn(Lists.newArrayList(input)); when(inputService.find(eq("someid"))).thenReturn(input); when(inputService.getStaticFields(eq(input))) .thenReturn(Lists.newArrayList(Maps.immutableEntry("foo", "bar"))); - final StaticFieldFilter filter = new StaticFieldFilter(inputService); + final StaticFieldFilter filter = new StaticFieldFilter(inputService, new EventBus(), Executors.newSingleThreadScheduledExecutor()); filter.filter(msg); assertEquals("hello", msg.getMessage()); @@ -64,7 +69,7 @@ public void testFilterIsNotOverwritingExistingKeys() throws Exception { when(inputService.getStaticFields(eq(input))) .thenReturn(Lists.newArrayList(Maps.immutableEntry("foo", "bar"))); - final StaticFieldFilter filter = new StaticFieldFilter(inputService); + final StaticFieldFilter filter = new StaticFieldFilter(inputService, new EventBus(), Executors.newSingleThreadScheduledExecutor()); filter.filter(msg); assertEquals("hello", msg.getMessage());