From 83340fddd41bbe2853b8132f103f727e282a01d6 Mon Sep 17 00:00:00 2001 From: Dennis Oelkers Date: Wed, 15 Jun 2016 16:52:39 +0200 Subject: [PATCH] Making decorator configurable. --- .../PipelineProcessorMessageDecorator.java | 81 +++++++++++++++++-- .../PipelineProcessorModule.java | 2 +- 2 files changed, 76 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorMessageDecorator.java b/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorMessageDecorator.java index 74407b9..3ddc67f 100644 --- a/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorMessageDecorator.java +++ b/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorMessageDecorator.java @@ -1,27 +1,96 @@ package org.graylog.plugins.pipelineprocessor; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; +import com.google.inject.assistedinject.Assisted; +import org.graylog.plugins.pipelineprocessor.db.PipelineDao; +import org.graylog.plugins.pipelineprocessor.db.PipelineService; import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter; +import org.graylog2.decorators.Decorator; import org.graylog2.indexer.results.ResultMessage; import org.graylog2.plugin.Message; +import org.graylog2.plugin.configuration.ConfigurationRequest; +import org.graylog2.plugin.configuration.fields.ConfigurationField; +import org.graylog2.plugin.configuration.fields.DropdownField; import org.graylog2.plugin.decorators.MessageDecorator; import javax.inject.Inject; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; public class PipelineProcessorMessageDecorator implements MessageDecorator { + private static final String CONFIG_FIELD_PIPELINE = "pipeline"; + private final PipelineInterpreter pipelineInterpreter; + private final ImmutableSet pipelines; + + public interface Factory extends MessageDecorator.Factory { + @Override + PipelineProcessorMessageDecorator create(Decorator decorator); + + @Override + Config getConfig(); + + @Override + Descriptor getDescriptor(); + } + + public static class Config implements MessageDecorator.Config { + private final PipelineService pipelineService; + + @Inject + public Config(PipelineService pipelineService) { + this.pipelineService = pipelineService; + } + + @Override + public ConfigurationRequest getRequestedConfiguration() { + final Map pipelineOptions = this.pipelineService.loadAll().stream() + .sorted((o1, o2) -> o1.title().compareTo(o2.title())) + .collect(Collectors.toMap(PipelineDao::id, PipelineDao::title)); + return new ConfigurationRequest() {{ + addField(new DropdownField(CONFIG_FIELD_PIPELINE, + "Pipeline", + "", + pipelineOptions, + "Which pipeline to use for message decoration", + ConfigurationField.Optional.NOT_OPTIONAL)); + }}; + }; + } + + public static class Descriptor extends MessageDecorator.Descriptor { + public Descriptor() { + super("Pipeline Processor Decorator", false, "http://docs.graylog.org/en/2.0/pages/pipelines.html", "Pipeline Processor Decorator"); + } + } @Inject - public PipelineProcessorMessageDecorator(PipelineInterpreter pipelineInterpreter) { + public PipelineProcessorMessageDecorator(PipelineInterpreter pipelineInterpreter, + @Assisted Decorator decorator) { this.pipelineInterpreter = pipelineInterpreter; + final String pipelineId = (String)decorator.config().get(CONFIG_FIELD_PIPELINE); + if (Strings.isNullOrEmpty(pipelineId)) { + this.pipelines = ImmutableSet.of(); + } else { + this.pipelines = ImmutableSet.of(pipelineId); + } } @Override - public ResultMessage apply(ResultMessage resultMessage) { - final Message message = resultMessage.getMessage(); - final List messages = pipelineInterpreter.processForPipelines(message, message.getId(), ImmutableSet.of("575ea96d726bd1e03ebb63e7")); - resultMessage.setMessage(message); - return resultMessage; + public List apply(List resultMessages) { + if (pipelines.isEmpty()) { + return resultMessages; + } + return resultMessages.stream() + .map((resultMessage) -> { + final Message message = resultMessage.getMessage(); + // discarding return type, as we cannot add more messages to the result message set yet. + pipelineInterpreter.processForPipelines(message, message.getId(), pipelines); + resultMessage.setMessage(message); + return resultMessage; + }) + .collect(Collectors.toList()); } } diff --git a/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorModule.java b/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorModule.java index d227d83..578c83e 100644 --- a/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorModule.java +++ b/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorModule.java @@ -47,6 +47,6 @@ protected void configure() { install(new ProcessorFunctionsModule()); - installMessageDecorator(messageDecoratorBinder(), PipelineProcessorMessageDecorator.class); + installMessageDecorator(messageDecoratorBinder(), PipelineProcessorMessageDecorator.class, PipelineProcessorMessageDecorator.Factory.class); } }