Skip to content
This repository has been archived by the owner on Mar 21, 2023. It is now read-only.

Commit

Permalink
Unregister PipelineInterpreter from event bus (#79)
Browse files Browse the repository at this point in the history
Message decorators and the pipeline simulator create new instances of
PipelineInterpreters that never get garbage collected, as they are still
registered in the event bus.

These changes add a simple workaround for that. We should probably
refactor the lifecycle of the PipelineInterpreter, but this is probably
not the best time to do it.
  • Loading branch information
edmundoa authored and bernd committed Aug 16, 2016
1 parent d5cdf0a commit 944c7b9
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 1 deletion.
Expand Up @@ -17,7 +17,6 @@
package org.graylog.plugins.pipelineprocessor;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.assistedinject.Assisted;
Expand Down Expand Up @@ -123,6 +122,8 @@ public SearchResponse apply(SearchResponse searchResponse) {
});
});

pipelineInterpreter.stop();

return searchResponse.toBuilder().messages(results).build();
}
}
Expand Up @@ -85,6 +85,7 @@ public class PipelineInterpreter implements MessageProcessor {
private final MetricRegistry metricRegistry;
private final ScheduledExecutorService scheduler;
private final Meter filteredOutMessages;
private EventBus serverEventBus;

private final AtomicReference<ImmutableMap<String, Pipeline>> currentPipelines = new AtomicReference<>(ImmutableMap.of());
private final AtomicReference<ImmutableSetMultimap<String, Pipeline>> streamPipelineConnections = new AtomicReference<>(ImmutableSetMultimap.of());
Expand All @@ -107,13 +108,22 @@ public PipelineInterpreter(RuleService ruleService,
this.metricRegistry = metricRegistry;
this.scheduler = scheduler;
this.filteredOutMessages = metricRegistry.meter(name(ProcessBufferProcessor.class, "filteredOutMessages"));
this.serverEventBus = serverEventBus;

// listens to cluster wide Rule, Pipeline and pipeline stream connection changes
serverEventBus.register(this);

reload();
}

/*
* Allow to unregister PipelineInterpreter from the event bus, allowing the object to be garbage collected.
* This is needed in some classes, when new PipelineInterpreter instances are created per request.
*/
public void stop() {
serverEventBus.unregister(this);
}

// this should not run in parallel
private synchronized void reload() {
// read all rules and compile them
Expand Down
Expand Up @@ -82,6 +82,8 @@ public SimulationResponse simulate(@ApiParam(name = "simulation", required = tru
for (Message processedMessage : processedMessages) {
simulationResults.add(ResultMessageSummary.create(null, processedMessage.getFields(), ""));
}

pipelineInterpreter.stop();
return SimulationResponse.create(simulationResults,
pipelineInterpreterTracer.getExecutionTrace(),
pipelineInterpreterTracer.took());
Expand Down

0 comments on commit 944c7b9

Please sign in to comment.