Switch message filters from polling to subscribing to change events #2496

Merged
joschi merged 11 commits into master from issue-2391-events on Jul 22, 2016

3 participants
@bernd
Member

bernd commented Jul 18, 2016

Switch all MessageFilter implementations from polling for changes every second to subscribing to change events.

This avoids unneeded load on the Graylog and MongoDB servers.

Fixes #2391
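
For readers unfamiliar with the pattern, here is a minimal sketch of "subscribe instead of poll", using only the JDK. `SimpleEventBus`, `ConfigChanged` and `SubscribingFilter` are hypothetical names made up for illustration; they are not the Graylog classes touched by this PR, which rely on an event bus with `@Subscribe` handlers.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for an event bus (assumption, not Graylog code).
final class SimpleEventBus<E> {
    private final List<Consumer<E>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<E> subscriber) {
        subscribers.add(subscriber);
    }

    void post(E event) {
        // Deliver the event synchronously to every registered subscriber.
        for (Consumer<E> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }
}

// Hypothetical event announcing that some stream configuration changed.
final class ConfigChanged {
    final String streamId;

    ConfigChanged(String streamId) {
        this.streamId = streamId;
    }
}

// A filter that reloads its state only when an event arrives, not every second.
final class SubscribingFilter {
    private final AtomicInteger reloads = new AtomicInteger();

    SubscribingFilter(SimpleEventBus<ConfigChanged> bus) {
        reload(); // initial load at startup
        bus.subscribe(event -> reload());
    }

    private void reload() {
        // A real filter would query the database here; this sketch only counts calls.
        reloads.incrementAndGet();
    }

    int reloadCount() {
        return reloads.get();
    }
}
```

With this shape, an idle system performs one load at startup and no further database reads until something posts a `ConfigChanged` event.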

bernd added some commits Jul 17, 2016

Switch stream router from polling to subscription for updates
Previously every StreamRouter instance was polling for stream changes
every second. Now it's listening on the server event bus for updates.

Stream related REST endpoints are now posting events when updating
stream configuration.

Fixes #2391
Switch extractor filter from polling to subscription for updates
Before this, every ExtractorFilter was using a cache which expired after
1 second for the extractors. So it was basically polling the extractors
every second.

Refs #2391
Switch static fields filter from polling to subscription for updates
Before this, every StaticFieldFilter instance was using a cache which
expired after 1 second. It was basically polling the static fields
configuration every second.

Refs #2391
Switch rules filter from polling to subscription for updates
Before this, every RulesFilter instance was using a cache which expired
after 1 second so it was basically polling for new rules every second.

Refs #2391

@bernd bernd added this to the 2.1.0 milestone Jul 18, 2016

@pdepaepe

pdepaepe commented Jul 19, 2016

Event driven instead of arbitrary interval refresh is very smart, thanks!

May I suggest extending these events to almost anything touching Mongo add/update/delete?
It would help a lot when writing plugins based on @Subscribe events.

Example: custom filter message chain with a cache flushed by StreamRuleAdd/Delete events

import java.util.Set;

@JsonAutoDetect
@JsonDeserialize(builder = AutoValue_StreamsChangedEvent.Builder.class)

@joschi

joschi Jul 19, 2016

Contributor

Any specific reason to do it this way and not with a "normal" static factory method annotated with @JsonCreator?

@bernd

bernd Jul 19, 2016

Member

I like the way you can build up immutable collections this way.

@joschi

joschi Jul 19, 2016

Contributor

For the sake of consistency with the other classes, I'd prefer having a "stupid" static factory method with @JsonCreator in StreamsChangedEvent.

@bernd

bernd Jul 19, 2016

Member

I will change this instance because there is only one parameter, true. But in general I will keep using the builder pattern for this because the regular factory methods become really hard to work with once you have some more parameters.

And as said earlier, the builder pattern makes it easier to build up immutable collections. The factory pattern needs explicit conversion of collections to immutable collections to avoid having mutable collections in the value objects. I think we have some AutoValue instances in our code where this is broken.
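
To illustrate the "explicit conversion" point, a static factory has to copy and wrap its input itself, roughly as in the sketch below. `StreamsChanged` here is a hypothetical hand-written value class, not the AutoValue class from this PR, and it uses only JDK collections rather than Guava's immutable ones.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical value class for illustration only (assumption, not the PR's class).
final class StreamsChanged {
    private final Set<String> streamIds;

    private StreamsChanged(Set<String> streamIds) {
        this.streamIds = streamIds;
    }

    // Static factory: copy the caller's set, then wrap it so nobody can mutate it.
    static StreamsChanged create(Set<String> streamIds) {
        return new StreamsChanged(
                Collections.unmodifiableSet(new LinkedHashSet<>(streamIds)));
    }

    Set<String> streamIds() {
        return streamIds;
    }
}
```

If the factory stored the caller's set directly, later changes to that set would leak into the value object, which is the kind of breakage described above.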


@JsonAutoDetect
@AutoValue
public abstract class InputUpdated {

@joschi

joschi Jul 19, 2016

Contributor

All other event classes have the "Event" suffix, why not this one?

@bernd

bernd Jul 19, 2016

Member

For consistency with the existing InputCreated and InputDeleted classes.

@joschi

joschi Jul 19, 2016

Contributor

Hm, okay.

I don't really like binding the internal events to the public response format of the Graylog REST API (as which InputCreated and InputDeleted are being used), but I guess I can live with it right now.

@joschi

joschi Jul 19, 2016

Contributor

@bernd

bernd Jul 19, 2016

Member

> I don't really like binding the internal events to the public response format of the Graylog REST API (as which InputCreated and InputDeleted are being used), but I guess I can live with it right now.

Me neither, but I didn't want to change all of this in this PR.

@joschi

Contributor

joschi commented Jul 19, 2016

@bernd What's the worst case if a node is missing one or more of those events?

ClusterEventService doesn't strictly guarantee that nodes eventually process the events (or process them twice).

@joschi joschi self-assigned this Jul 19, 2016

import java.util.concurrent.TimeUnit;

public class ExtractorFilter implements MessageFilter {
    private static final Logger LOG = LoggerFactory.getLogger(ExtractorFilter.class);
    private static final String NAME = "Extractor";
    private static final List<Extractor> EMPTY_LIST = ImmutableList.of();

@joschi

joschi Jul 19, 2016

Contributor

ImmutableList.of() already returns a constant. No need to define another one for redirection. 😉
Alternatively use Collections.emptyList() if you want to make the intention totally clear.

}

@Subscribe
public void handleInputUpdate(final InputUpdated event) {

@joschi

joschi Jul 19, 2016

Contributor

Adding @SuppressWarnings("unused") would disable the compiler warnings (and make the IDE happy).

@bernd

bernd Jul 19, 2016

Member

I don't have any warnings in my IDE and also didn't see any warnings during the build.

@joschi

joschi Jul 19, 2016

Contributor

Maybe some check in my IntelliJ IDEA configuration or in newer versions of IDEA?

}

@Subscribe
public void handleInputDelete(final InputDeleted event) {

@joschi

joschi Jul 19, 2016

Contributor

Adding @SuppressWarnings("unused") would disable the compiler warnings (and make the IDE happy).

@@ -70,34 +83,43 @@ public boolean filter(Message msg) {
return false;
}

private List<Extractor> loadExtractors(final String inputId) {
@Subscribe
public void handleInputCreate(final InputCreated event) {

@joschi

joschi Jul 19, 2016

Contributor

Adding @SuppressWarnings("unused") would disable the compiler warnings (and make the IDE happy).


private final InputService inputService;
private final ScheduledExecutorService scheduler;

@joschi

joschi Jul 19, 2016

Contributor

Why are we using a ScheduledExecutorService here instead of a normal ExecutorService? We're always starting the jobs right away (i. e. without any delay).

@bernd

bernd Jul 19, 2016

Member

Because we do not have a regular ExecutorService pool. I want to use the existing daemonScheduler pool.

@joschi

joschi Jul 19, 2016

Contributor

Ah, got it.

In this case, we could simply use ExecutorService#submit(Runnable) instead of ScheduledExecutorService#schedule(Runnable, long, TimeUnit) in those places to get rid of the useless 0s delay.
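
A minimal sketch of the difference, assuming a single-threaded scheduler as a stand-in for the shared daemonScheduler pool (the class and method names are hypothetical). Since `ScheduledExecutorService` extends `ExecutorService`, the same pool accepts both calls:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SubmitVersusSchedule {
    // Runs the same task once per style and returns how often it ran.
    static int runBoth() {
        // Stand-in for a shared scheduler pool (assumption for this sketch).
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable task = runs::incrementAndGet;
        try {
            // Before: a scheduled job with a zero-second delay.
            Future<?> scheduled = scheduler.schedule(task, 0, TimeUnit.SECONDS);
            // After: plain submission on the very same pool.
            Future<?> submitted = scheduler.submit(task);
            scheduled.get();
            submitted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
        return runs.get();
    }
}
```

Both calls start the task right away; `submit` simply states that intent without the extra delay arguments.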

@bernd

bernd Jul 19, 2016

Member

Done.

}
}

private void loadExtractors(final String inputId) {

@joschi

joschi Jul 19, 2016

Contributor

Alternative suggestion (without having an intermediary list):

    private void loadExtractors(final String inputId) {
        LOG.debug("Re-loading extractors for input <{}>", inputId);

        try {
            final Input input = inputService.find(inputId);
            final List<Extractor> sortedExtractors = inputService.getExtractors(input)
                    .stream()
                    .sorted((e1, e2) -> e1.getOrder().intValue() - e2.getOrder().intValue())
                    .collect(Collectors.toList());

            extractors.put(inputId, sortedExtractors);
        } catch (NotFoundException e) {
            LOG.warn("Unable to load input <{}>: {}", inputId, e.getMessage());
        }
    }
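
A possible variation on the comparator above, assuming `getOrder()` returns a `Long`: comparing the values directly avoids the narrowing `intValue()` call and the subtraction, both of which can misorder very large values. `OrderedItem` and `ExtractorSorting` are hypothetical stand-ins, not Graylog classes.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical minimal stand-in for an extractor; only the order matters here.
final class OrderedItem {
    private final String title;
    private final Long order;

    OrderedItem(String title, Long order) {
        this.title = title;
        this.order = order;
    }

    String getTitle() {
        return title;
    }

    Long getOrder() {
        return order;
    }
}

final class ExtractorSorting {
    // Compares the Long values directly: no narrowing to int, no subtraction overflow.
    static List<OrderedItem> sortByOrder(List<OrderedItem> items) {
        return items.stream()
                .sorted(Comparator.comparing(OrderedItem::getOrder))
                .collect(Collectors.toList());
    }
}
```

For the small order values extractors normally have, both comparators should give the same result, so this is a robustness tweak rather than a behavior change.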

@joschi

joschi Jul 19, 2016

Contributor

We could also think about InputServiceImpl#getExtractors() returning a sorted list in the first place. It should be the default case anyway.

@bernd

bernd Jul 19, 2016

Member

I went with your implementation.


// false if not explicitly set to true in the rules.
return msg.getFilterOut();
}

@Subscribe
public void handleRulesUpdate(FilterDescriptionUpdateEvent ignored) {

@joschi

joschi Jul 19, 2016

Contributor

Adding @SuppressWarnings("unused") would disable the compiler warnings (and make the IDE happy).

}

@Override
public boolean filter(Message msg) {
final Set<FilterDescription> filters;

@joschi

joschi Jul 19, 2016

Contributor

❤️ the much simpler implementation!

private final RulesEngine.RulesSession privateSession;
private final Cache<String, Set<FilterDescription>> cache;
private Set<FilterDescription> currentFilterSet;
private final ScheduledExecutorService scheduler;

@joschi

joschi Jul 19, 2016

Contributor

Same as before. This could (should?) be a normal ExecutorService.

@bernd

bernd Jul 19, 2016

Member

See above.

private final Cache<String, List<Map.Entry<String, String>>> cache = CacheBuilder.newBuilder()
        .expireAfterWrite(1, TimeUnit.SECONDS)
        .build();
private static final List<Map.Entry<String, String>> EMPTY = ImmutableList.of();

@joschi

joschi Jul 19, 2016

Contributor

ImmutableList.of() already returns a constant. No need to define another one for redirection. 😉
Alternatively use Collections.emptyList() if you want to make the intention totally clear.


private final InputService inputService;
private final ScheduledExecutorService scheduler;

@joschi

joschi Jul 19, 2016

Contributor

Same as before. This could (should?) be a normal ExecutorService.

}

@Subscribe
public void handleInputDelete(final InputDeleted event) {

@joschi

joschi Jul 19, 2016

Contributor

Adding @SuppressWarnings("unused") would disable the compiler warnings (and make the IDE happy).

@@ -70,24 +84,38 @@ public boolean filter(Message msg) {
return false;
}

private List<Map.Entry<String, String>> loadStaticFields(final String inputId) {
@Subscribe
public void handleInputCreate(final InputCreated event) {

@joschi

joschi Jul 19, 2016

Contributor

Adding @SuppressWarnings("unused") would disable the compiler warnings (and make the IDE happy).

}

@Subscribe
public void handleInputUpdate(final InputUpdated event) {

@joschi

joschi Jul 19, 2016

Contributor

Adding @SuppressWarnings("unused") would disable the compiler warnings (and make the IDE happy).

this.routerEngine.set(streamRouterEngineUpdater.getNewEngine());
scheduler.scheduleAtFixedRate(streamRouterEngineUpdater, 0, ENGINE_UPDATE_INTERVAL, TimeUnit.SECONDS);
@Subscribe
public void handleStreamsUpdate(StreamsChangedEvent event) {

@joschi

joschi Jul 19, 2016

Contributor

Adding @SuppressWarnings("unused") would disable the compiler warnings (and make the IDE happy).

@joschi

Contributor

joschi commented Jul 19, 2016

Any chance for having tests for the event handlers (methods annotated with @Subscribe)?
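
As a rough idea of what such a test could look like: if the handler hands its work to an `Executor`, a check can pass in `Runnable::run` so the job runs on the calling thread and the outcome can be asserted immediately. Everything below is a hypothetical sketch; the real filters take a `ScheduledExecutorService` and their handlers carry the `@Subscribe` annotation.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical filter: the handler only hands the reload job to an executor.
final class ReloadOnEventFilter {
    private final Executor executor;
    private final AtomicReference<String> lastReloadedInput = new AtomicReference<>();

    ReloadOnEventFilter(Executor executor) {
        this.executor = executor;
    }

    // In the real classes this method would be annotated with @Subscribe.
    void handleInputUpdate(String inputId) {
        executor.execute(() -> lastReloadedInput.set(inputId));
    }

    String lastReloadedInput() {
        return lastReloadedInput.get();
    }
}

final class ReloadOnEventFilterCheck {
    static boolean handlerReloadsInput() {
        // Runnable::run executes the job synchronously, keeping the check deterministic.
        ReloadOnEventFilter filter = new ReloadOnEventFilter(Runnable::run);
        filter.handleInputUpdate("input-1");
        return "input-1".equals(filter.lastReloadedInput());
    }
}
```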

@bernd

Member

bernd commented Jul 19, 2016

> @bernd What's the worst case if a node is missing one or more of those events?
> ClusterEventService doesn't strictly guarantee that nodes eventually process the events (or process them twice).

Worst case is that the node that didn't receive the message keeps running with the old configuration.

@bernd

Member

bernd commented Jul 19, 2016

> May I suggest extending these events to almost anything touching Mongo add/update/delete?
> It would help a lot when writing plugins based on @Subscribe events.

@pdepaepe I actually tried to do that for this PR but the problem is that we use several different ways to access MongoDB so this is currently not really feasible.

@bernd

Member

bernd commented Jul 19, 2016

> Any chance for having tests for the event handlers (methods annotated with @Subscribe)?

I could do that, but I would rather spend my time on fixing more issues for 2.1 than writing tests to check if a job is submitted to the scheduler. 😉

@joschi

Contributor

joschi commented Jul 19, 2016

> Worst case is that the node that didn't receive the message keeps running with the old configuration.

The problem I see with this is that the configuration won't eventually converge to the correct one. Recovering would require restarting the out-of-sync Graylog node or making another change to the same type of entity.

Would it make sense to trigger the reloads periodically (e. g. every minute or so)? I'm well aware that we wanted to get rid of this, but I don't think that ClusterEventService is robust enough for this (without people actively checking for configuration drift in their Graylog cluster).
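
A sketch of how such a safety net could sit next to the event handlers, under the assumption that both paths call the same reload method. `ReloadingConfigHolder` and its methods are hypothetical names; nothing like this is part of the PR.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder; names are illustrative and not taken from Graylog.
final class ReloadingConfigHolder {
    private final AtomicInteger reloads = new AtomicInteger();

    // Fast path: called from an event handler whenever a change event arrives.
    void onChangeEvent() {
        reload();
    }

    // Safety net: reload at a low frequency so a missed event is eventually corrected.
    ScheduledFuture<?> startPeriodicReload(ScheduledExecutorService scheduler, long periodMinutes) {
        return scheduler.scheduleAtFixedRate(
                this::reload, periodMinutes, periodMinutes, TimeUnit.MINUTES);
    }

    private void reload() {
        // A real implementation would re-read the configuration from the database here.
        reloads.incrementAndGet();
    }

    int reloadCount() {
        return reloads.get();
    }
}
```

The trade-off is the one raised in this thread: the periodic task brings back some of the background load the PR removes, in exchange for bounded staleness after a lost event.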

@bernd

Member

bernd commented Jul 19, 2016

> Would it make sense to trigger the reloads periodically (e. g. every minute or so)? I'm well aware that we wanted to get rid of this, but I don't think that ClusterEventService is robust enough for this (without people actively checking for configuration drift in their Graylog cluster).

Then we already have a problem because we use this pattern in several places. (at least inputs, pipeline, map widget and probably more)

If the cluster event service is not reliable, we should improve this instead of adding another periodic reload. IMHO

@joschi

Contributor

joschi commented Jul 22, 2016

With the increased cleanup interval in #2509, this should be acceptable.

LGTM. 👍

@joschi joschi merged commit e6b859c into master Jul 22, 2016

4 checks passed

ci-server-integration Jenkins build graylog2-server-integration-pr 1114 has succeeded
Details
ci-web-linter Jenkins build graylog-pr-linter-check 600 has succeeded
Details
continuous-integration/travis-ci/pr The Travis CI build passed
Details
continuous-integration/travis-ci/push The Travis CI build passed
Details

@joschi joschi deleted the issue-2391-events branch Jul 22, 2016
