Skip to content

Commit

Permalink
inject the pipeline interpreter directly (#44)
Browse files Browse the repository at this point in the history
this still is a problem, because the interpreter leaks references (it's missing lifecycle handling)
  • Loading branch information
bernd authored and edmundoa committed Jul 19, 2016
1 parent 88a9ed3 commit 8140cac
Showing 1 changed file with 11 additions and 17 deletions.
Expand Up @@ -24,13 +24,10 @@
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter; import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
import org.graylog.plugins.pipelineprocessor.simulator.PipelineInterpreterTracer; import org.graylog.plugins.pipelineprocessor.simulator.PipelineInterpreterTracer;
import org.graylog2.database.NotFoundException; import org.graylog2.database.NotFoundException;
import org.graylog2.messageprocessors.OrderedMessageProcessors;
import org.graylog2.plugin.Message; import org.graylog2.plugin.Message;
import org.graylog2.plugin.messageprocessors.MessageProcessor;
import org.graylog2.plugin.rest.PluginRestResource; import org.graylog2.plugin.rest.PluginRestResource;
import org.graylog2.plugin.streams.Stream; import org.graylog2.plugin.streams.Stream;
import org.graylog2.rest.models.messages.responses.ResultMessageSummary; import org.graylog2.rest.models.messages.responses.ResultMessageSummary;
import org.graylog2.rest.resources.messages.MessageResource;
import org.graylog2.shared.rest.resources.RestResource; import org.graylog2.shared.rest.resources.RestResource;
import org.graylog2.shared.security.RestPermissions; import org.graylog2.shared.security.RestPermissions;
import org.graylog2.streams.StreamService; import org.graylog2.streams.StreamService;
Expand All @@ -51,14 +48,13 @@
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
@RequiresAuthentication @RequiresAuthentication
public class SimulatorResource extends RestResource implements PluginRestResource { public class SimulatorResource extends RestResource implements PluginRestResource {
private final OrderedMessageProcessors orderedMessageProcessors;
private final MessageResource messageResource;
private final StreamService streamService; private final StreamService streamService;
private final PipelineInterpreter pipelineInterpreter;


@Inject @Inject
public SimulatorResource(OrderedMessageProcessors orderedMessageProcessors, MessageResource messageResource, StreamService streamService) { public SimulatorResource(PipelineInterpreter pipelineInterpreter,
this.orderedMessageProcessors = orderedMessageProcessors; StreamService streamService) {
this.messageResource = messageResource; this.pipelineInterpreter = pipelineInterpreter;
this.streamService = streamService; this.streamService = streamService;
} }


Expand All @@ -77,15 +73,13 @@ public SimulationResponse simulate(@ApiParam(name = "simulation", required = tru
final List<ResultMessageSummary> simulationResults = new ArrayList<>(); final List<ResultMessageSummary> simulationResults = new ArrayList<>();
final PipelineInterpreterTracer pipelineInterpreterTracer = new PipelineInterpreterTracer(); final PipelineInterpreterTracer pipelineInterpreterTracer = new PipelineInterpreterTracer();


for (MessageProcessor messageProcessor : orderedMessageProcessors) { org.graylog2.plugin.Messages processedMessages = pipelineInterpreter.process(message,
if (messageProcessor instanceof PipelineInterpreter) { pipelineInterpreterTracer.getSimulatorInterpreterListener());
org.graylog2.plugin.Messages processedMessages = ((PipelineInterpreter) messageProcessor).process(message, pipelineInterpreterTracer.getSimulatorInterpreterListener()); for (Message processedMessage : processedMessages) {
for (Message processedMessage : processedMessages) { simulationResults.add(ResultMessageSummary.create(null, processedMessage.getFields(), ""));
simulationResults.add(ResultMessageSummary.create(null, processedMessage.getFields(), ""));
}
}
} }

return SimulationResponse.create(simulationResults,
return SimulationResponse.create(simulationResults, pipelineInterpreterTracer.getExecutionTrace(), pipelineInterpreterTracer.took()); pipelineInterpreterTracer.getExecutionTrace(),
pipelineInterpreterTracer.took());
} }
} }

0 comments on commit 8140cac

Please sign in to comment.