Skip to content

Commit

Permalink
Switch message filters from polling to subscribing to change events (#…
Browse files Browse the repository at this point in the history
…2496)

* Switch stream router from polling to subscription for updates

Previously every StreamRouter instance was polling for stream changes
every second. Now it's listening on the server event bus for updates.

Stream related REST endpoints are now posting events when updating
stream configuration.

* Switch extractor filter from polling to subscription for updates

Before this, every ExtractorFilter was using a cache which expired after
1 second for the extractors. So it was basically polling the extractors
every second.

* Switch static fields filter from polling to subscription for updates

Before this, every StaticFieldFilter instance was using a cache which
expired after 1 second. It was basically polling the static fields
configuration every second.

* Switch rules filter from polling to subscription for updates

Before this, every RulesFilter instance was using a cache which expired
after 1 second so it was basically polling for new rules every second.

Fixes #2391
  • Loading branch information
bernd authored and joschi committed Jul 22, 2016
1 parent cc5902b commit e6b859c
Show file tree
Hide file tree
Showing 13 changed files with 348 additions and 134 deletions.
Expand Up @@ -16,39 +16,50 @@
*/ */
package org.graylog2.filters; package org.graylog2.filters;


import com.google.common.cache.Cache; import com.google.common.collect.ImmutableList;
import com.google.common.cache.CacheBuilder; import com.google.common.eventbus.EventBus;
import com.google.common.collect.Lists; import com.google.common.eventbus.Subscribe;
import org.graylog2.database.NotFoundException; import org.graylog2.database.NotFoundException;
import org.graylog2.inputs.Input; import org.graylog2.inputs.Input;
import org.graylog2.inputs.InputService; import org.graylog2.inputs.InputService;
import org.graylog2.plugin.Message; import org.graylog2.plugin.Message;
import org.graylog2.plugin.filters.MessageFilter; import org.graylog2.plugin.filters.MessageFilter;
import org.graylog2.plugin.inputs.Extractor; import org.graylog2.plugin.inputs.Extractor;
import org.graylog2.rest.models.system.inputs.responses.InputCreated;
import org.graylog2.rest.models.system.inputs.responses.InputDeleted;
import org.graylog2.rest.models.system.inputs.responses.InputUpdated;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Named;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;


public class ExtractorFilter implements MessageFilter { public class ExtractorFilter implements MessageFilter {
private static final Logger LOG = LoggerFactory.getLogger(ExtractorFilter.class); private static final Logger LOG = LoggerFactory.getLogger(ExtractorFilter.class);
private static final String NAME = "Extractor"; private static final String NAME = "Extractor";


private Cache<String, List<Extractor>> cache = CacheBuilder.newBuilder() private final ConcurrentMap<String, List<Extractor>> extractors = new ConcurrentHashMap<>();
.expireAfterWrite(1, TimeUnit.SECONDS)
.build();


private final InputService inputService; private final InputService inputService;
private final ScheduledExecutorService scheduler;


@Inject @Inject
public ExtractorFilter(InputService inputService) { public ExtractorFilter(InputService inputService,
EventBus serverEventBus,
@Named("daemonScheduler") ScheduledExecutorService scheduler) {
this.inputService = inputService; this.inputService = inputService;
this.scheduler = scheduler;

loadAllExtractors();

// TODO: This class needs lifecycle management to avoid leaking objects in the EventBus
serverEventBus.register(this);
} }


@Override @Override
Expand All @@ -57,7 +68,7 @@ public boolean filter(Message msg) {
return false; return false;
} }


for (final Extractor extractor : loadExtractors(msg.getSourceInputId())) { for (final Extractor extractor : extractors.getOrDefault(msg.getSourceInputId(), Collections.emptyList())) {
try { try {
extractor.runExtractor(msg); extractor.runExtractor(msg);
} catch (Exception e) { } catch (Exception e) {
Expand All @@ -70,34 +81,47 @@ public boolean filter(Message msg) {
return false; return false;
} }


private List<Extractor> loadExtractors(final String inputId) { @Subscribe
@SuppressWarnings("unused")
public void handleInputCreate(final InputCreated event) {
LOG.debug("Load extractors for input <{}>", event.id());
scheduler.submit(() -> loadExtractors(event.id()));
}

@Subscribe
@SuppressWarnings("unused")
public void handleInputDelete(final InputDeleted event) {
LOG.debug("Removing input from extractors cache <{}>", event.id());
extractors.remove(event.id());
}

@Subscribe
@SuppressWarnings("unused")
public void handleInputUpdate(final InputUpdated event) {
scheduler.submit(() -> loadExtractors(event.id()));
}

private void loadAllExtractors() {
try {
inputService.all().forEach(input -> loadExtractors(input.getId()));
} catch (Exception e) {
LOG.error("Unable to load extractors for all inputs", e);
}
}

private void loadExtractors(final String inputId) {
LOG.debug("Re-loading extractors for input <{}>", inputId);

try { try {
return cache.get(inputId, new Callable<List<Extractor>>() { final Input input = inputService.find(inputId);
@Override final List<Extractor> sortedExtractors = inputService.getExtractors(input)
public List<Extractor> call() throws Exception { .stream()
LOG.debug("Re-loading extractors for input <{}> into cache.", inputId); .sorted((e1, e2) -> e1.getOrder().intValue() - e2.getOrder().intValue())

.collect(Collectors.toList());
try {
Input input = inputService.find(inputId); extractors.put(inputId, ImmutableList.copyOf(sortedExtractors));

} catch (NotFoundException e) {
List<Extractor> sorted = Lists.newArrayList(inputService.getExtractors(input)); LOG.warn("Unable to load input <{}>: {}", inputId, e.getMessage());

Collections.sort(sorted, new Comparator<Extractor>() {
public int compare(Extractor e1, Extractor e2) {
return e1.getOrder().intValue() - e2.getOrder().intValue();
}
});

return sorted;
} catch (NotFoundException e) {
LOG.warn("Unable to load input: {}", e.getMessage());
return Collections.emptyList();
}
}
});
} catch (ExecutionException e) {
LOG.error("Could not load extractors into cache. Returning empty list.", e);
return Collections.emptyList();
} }
} }


Expand Down
99 changes: 44 additions & 55 deletions graylog2-server/src/main/java/org/graylog2/filters/RulesFilter.java
Expand Up @@ -16,90 +16,79 @@
*/ */
package org.graylog2.filters; package org.graylog2.filters;


import com.google.common.cache.Cache; import com.google.common.eventbus.EventBus;
import com.google.common.cache.CacheBuilder; import com.google.common.eventbus.Subscribe;
import com.google.common.collect.Sets; import org.graylog2.database.NotFoundException;
import org.graylog2.filters.blacklist.FilterDescription; import org.graylog2.filters.events.FilterDescriptionUpdateEvent;
import org.graylog2.plugin.Message; import org.graylog2.plugin.Message;
import org.graylog2.plugin.RulesEngine; import org.graylog2.plugin.RulesEngine;
import org.graylog2.plugin.filters.MessageFilter; import org.graylog2.plugin.filters.MessageFilter;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import javax.inject.Inject; import javax.inject.Inject;
import java.util.Set; import javax.inject.Named;
import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.TimeUnit;


/** /**
* @author Lennart Koopmann <lennart@socketfeed.com> * @author Lennart Koopmann <lennart@socketfeed.com>
*/ */
public class RulesFilter implements MessageFilter { public class RulesFilter implements MessageFilter {
private static final Logger LOG = LoggerFactory.getLogger(RulesFilter.class); private static final Logger LOG = LoggerFactory.getLogger(RulesFilter.class);


private final RulesEngine rulesEngine;
private final FilterService filterService; private final FilterService filterService;
private final RulesEngine.RulesSession privateSession; private final ScheduledExecutorService scheduler;
private final Cache<String, Set<FilterDescription>> cache; private final AtomicReference<RulesEngine.RulesSession> privateSession = new AtomicReference<>(null);
private Set<FilterDescription> currentFilterSet;


@Inject @Inject
public RulesFilter(RulesEngine rulesEngine, final FilterService filterService) { public RulesFilter(final RulesEngine rulesEngine,
final FilterService filterService,
final EventBus serverEventBus,
@Named("daemonScheduler") ScheduledExecutorService scheduler) {
this.rulesEngine = rulesEngine;
this.filterService = filterService; this.filterService = filterService;
this.scheduler = scheduler;


currentFilterSet = Sets.newHashSet(); loadRules();
cache = CacheBuilder.newBuilder()
.expireAfterWrite(1, TimeUnit.SECONDS) // TODO: This class needs lifecycle management to avoid leaking objects in the EventBus
.build(); serverEventBus.register(this);
privateSession = rulesEngine.createPrivateSession();
} }


@Override @Override
public boolean filter(Message msg) { public boolean filter(Message msg) {
final Set<FilterDescription> filters;
try {
filters = cache.get("filters", new Callable<Set<FilterDescription>>() {
@Override
public Set<FilterDescription> call() throws Exception {
// TODO this should be improved by computing the difference between the filter sets
// most of the time nothing changes at all.
final Set<FilterDescription> newFilters = filterService.loadAll();
final Sets.SetView<FilterDescription> difference =
Sets.symmetricDifference(currentFilterSet, newFilters);
if (difference.isEmpty()) {
// there wasn't any change, simply return the current filter set
LOG.debug("Filter sets are identical, not updating rules engine.");
return currentFilterSet;
}

// something changed, we simply update everything, not trying to do the minimal changes yet
// should this become too expensive we need to do something smarter
LOG.debug("Updating rules engine, filter sets differ: {}", difference);
// retract all current filter facts
for (FilterDescription filterDescription : currentFilterSet) {
privateSession.deleteFact(filterDescription);
}
// state all new filter facts
for (FilterDescription filterDescription : newFilters) {
privateSession.insertFact(filterDescription);
}
currentFilterSet.clear();
// remember currently stated facts
currentFilterSet.addAll(newFilters);
return currentFilterSet;
}
});
} catch (ExecutionException ignored) {
return false;
}

// Always run the rules engine to make sure rules from the external rules file will be run. // Always run the rules engine to make sure rules from the external rules file will be run.
privateSession.evaluate(msg, true); privateSession.get().evaluate(msg, true);


// false if not explicitly set to true in the rules. // false if not explicitly set to true in the rules.
return msg.getFilterOut(); return msg.getFilterOut();
} }


@Subscribe
@SuppressWarnings("unused")
public void handleRulesUpdate(FilterDescriptionUpdateEvent ignored) {
LOG.debug("Updating filter descriptions: {}", ignored);
scheduler.submit(this::loadRules);
}

private void loadRules() {
LOG.debug("Loading rule filters");
try {
final RulesEngine.RulesSession newSession = rulesEngine.createPrivateSession();

filterService.loadAll().forEach(filterDescription -> {
LOG.debug("Insert filter description: {}", filterDescription);
newSession.insertFact(filterDescription);
});

privateSession.set(newSession);
} catch (NotFoundException e) {
LOG.error("No filters found", e);
}
}

@Override @Override
public String getName() { public String getName() {
return "Rulesfilter"; return "Rulesfilter";
Expand Down

0 comments on commit e6b859c

Please sign in to comment.