diff --git a/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorMessageDecorator.java b/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorMessageDecorator.java
new file mode 100644
index 0000000..74407b9
--- /dev/null
+++ b/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorMessageDecorator.java
@@ -0,0 +1,27 @@
+package org.graylog.plugins.pipelineprocessor;
+
+import com.google.common.collect.ImmutableSet;
+import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
+import org.graylog2.indexer.results.ResultMessage;
+import org.graylog2.plugin.Message;
+import org.graylog2.plugin.decorators.MessageDecorator;
+
+import javax.inject.Inject;
+import java.util.List;
+
+public class PipelineProcessorMessageDecorator implements MessageDecorator {
+    private final PipelineInterpreter pipelineInterpreter;
+
+    @Inject
+    public PipelineProcessorMessageDecorator(PipelineInterpreter pipelineInterpreter) {
+        this.pipelineInterpreter = pipelineInterpreter;
+    }
+
+    @Override
+    public ResultMessage apply(ResultMessage resultMessage) {
+        final Message message = resultMessage.getMessage();
+        final List<Message> messages = pipelineInterpreter.processForPipelines(message, message.getId(), ImmutableSet.of("575ea96d726bd1e03ebb63e7"));
+        resultMessage.setMessage(message);
+        return resultMessage;
+    }
+}
diff --git a/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorModule.java b/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorModule.java
index 4ad14f0..d227d83 100644
--- a/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorModule.java
+++ b/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorModule.java
@@ -46,5 +46,7 @@ protected void configure() {
         addPermissions(PipelineRestPermissions.class);
 
         install(new ProcessorFunctionsModule());
+
+        installMessageDecorator(messageDecoratorBinder(), PipelineProcessorMessageDecorator.class);
     }
 }
diff --git a/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java b/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java
index 56dbe03..7ec2787 100644
--- a/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java
+++ b/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java
@@ -226,98 +226,8 @@ public Messages process(Messages messages) {
                     log.debug("[{}] running pipelines {} for streams {}", msgId, pipelinesToRun, streamsIds);
                 }
 
-                // record execution of pipeline in metrics
-                pipelinesToRun.stream().forEach(pipeline -> metricRegistry.counter(name(Pipeline.class, pipeline.id(), "executed")).inc());
-
-                final StageIterator stages = new StageIterator(pipelinesToRun);
-                final Set<Pipeline> pipelinesToSkip = Sets.newHashSet();
-
-                // iterate through all stages for all matching pipelines, per "stage slice" instead of per pipeline.
-                // pipeline execution ordering is not guaranteed
-                while (stages.hasNext()) {
-                    final Set<Tuple2<Stage, Pipeline>> stageSet = stages.next();
-                    for (Tuple2<Stage, Pipeline> pair : stageSet) {
-                        final Stage stage = pair.v1();
-                        final Pipeline pipeline = pair.v2();
-                        if (pipelinesToSkip.contains(pipeline)) {
-                            log.debug("[{}] previous stage result prevents further processing of pipeline `{}`",
-                                    msgId,
-                                    pipeline.name());
-                            continue;
-                        }
-                        metricRegistry.counter(name(Pipeline.class, pipeline.id(), "stage", String.valueOf(stage.stage()), "executed")).inc();
-                        log.debug("[{}] evaluating rule conditions in stage {}: match {}",
-                                msgId,
-                                stage.stage(),
-                                stage.matchAll() ? "all" : "either");
-
-                        // TODO the message should be decorated to allow layering changes and isolate stages
-                        final EvaluationContext context = new EvaluationContext(message);
-
-                        // 3. iterate over all the stages in these pipelines and execute them in order
-                        final ArrayList<Rule> rulesToRun = Lists.newArrayListWithCapacity(stage.getRules().size());
-                        boolean anyRulesMatched = false;
-                        for (Rule rule : stage.getRules()) {
-                            if (rule.when().evaluateBool(context)) {
-                                anyRulesMatched = true;
-                                countRuleExecution(rule, pipeline, stage, "matched");
-
-                                if (context.hasEvaluationErrors()) {
-                                    final EvaluationContext.EvalError lastError = Iterables.getLast(context.evaluationErrors());
-                                    appendProcessingError(rule, message, lastError.toString());
-                                    log.debug("Encountered evaluation error during condition, skipping rule actions: {}",
-                                            lastError);
-                                    continue;
-                                }
-                                log.debug("[{}] rule `{}` matches, scheduling to run", msgId, rule.name());
-                                rulesToRun.add(rule);
-                            } else {
-                                countRuleExecution(rule, pipeline, stage, "not-matched");
-                                log.debug("[{}] rule `{}` does not match", msgId, rule.name());
-                            }
-                        }
-                        RULES:
-                        for (Rule rule : rulesToRun) {
-                            countRuleExecution(rule, pipeline, stage, "executed");
-                            log.debug("[{}] rule `{}` matched running actions", msgId, rule.name());
-                            for (Statement statement : rule.then()) {
-                                statement.evaluate(context);
-                                if (context.hasEvaluationErrors()) {
-                                    // if the last statement resulted in an error, do not continue to execute this rules
-                                    final EvaluationContext.EvalError lastError = Iterables.getLast(context.evaluationErrors());
-                                    appendProcessingError(rule, message, lastError.toString());
-                                    log.debug("Encountered evaluation error, skipping rest of the rule: {}",
-                                            lastError);
-                                    countRuleExecution(rule, pipeline, stage, "failed");
-                                    break RULES;
-                                }
-                            }
-                        }
-                        // stage needed to match all rule conditions to enable the next stage,
-                        // record that it is ok to proceed with this pipeline
-                        // OR
-                        // any rule could match, but at least one had to,
-                        // record that it is ok to proceed with the pipeline
-                        if ((stage.matchAll() && (rulesToRun.size() == stage.getRules().size()))
-                                || (rulesToRun.size() > 0 && anyRulesMatched)) {
-                            log.debug("[{}] stage {} for pipeline `{}` required match: {}, ok to proceed with next stage",
-                                    msgId, stage.stage(), pipeline.name(), stage.matchAll() ? "all" : "either");
-                        } else {
-                            // no longer execute stages from this pipeline, the guard prevents it
-                            log.debug("[{}] stage {} for pipeline `{}` required match: {}, NOT ok to proceed with next stage",
-                                    msgId, stage.stage(), pipeline.name(), stage.matchAll() ? "all" : "either");
-                            pipelinesToSkip.add(pipeline);
-                        }
-
-                        // 4. after each complete stage run, merge the processing changes, stages are isolated from each other
-                        // TODO message changes become visible immediately for now
+                toProcess.addAll(processForPipelines(message, msgId, pipelinesToRun.stream().map(Pipeline::id).collect(Collectors.toSet())));
 
-                        // 4a. also add all new messages from the context to the toProcess work list
-                        Iterables.addAll(toProcess, context.createdMessages());
-                        context.clearCreatedMessages();
-                    }
-
-                }
                 boolean addedStreams = false;
                 // 5. add each message-stream combination to the blacklist set
                 for (Stream stream : message.getStreams()) {
@@ -350,6 +260,104 @@ public Messages process(Messages messages) {
         return new MessageCollection(fullyProcessed);
     }
 
+    public List<Message> processForPipelines(Message message, String msgId, Set<String> pipelines) {
+        final ImmutableSet<Pipeline> pipelinesToRun = ImmutableSet.copyOf(pipelines.stream().map(pipelineId -> this.currentPipelines.get().get(pipelineId)).collect(Collectors.toSet()));
+        final List<Message> result = new ArrayList<>();
+        // record execution of pipeline in metrics
+        pipelinesToRun.stream().forEach(pipeline -> metricRegistry.counter(name(Pipeline.class, pipeline.id(), "executed")).inc());
+
+        final StageIterator stages = new StageIterator(pipelinesToRun);
+        final Set<Pipeline> pipelinesToSkip = Sets.newHashSet();
+
+        // iterate through all stages for all matching pipelines, per "stage slice" instead of per pipeline.
+        // pipeline execution ordering is not guaranteed
+        while (stages.hasNext()) {
+            final Set<Tuple2<Stage, Pipeline>> stageSet = stages.next();
+            for (Tuple2<Stage, Pipeline> pair : stageSet) {
+                final Stage stage = pair.v1();
+                final Pipeline pipeline = pair.v2();
+                if (pipelinesToSkip.contains(pipeline)) {
+                    log.debug("[{}] previous stage result prevents further processing of pipeline `{}`",
+                            msgId,
+                            pipeline.name());
+                    continue;
+                }
+                metricRegistry.counter(name(Pipeline.class, pipeline.id(), "stage", String.valueOf(stage.stage()), "executed")).inc();
+                log.debug("[{}] evaluating rule conditions in stage {}: match {}",
+                        msgId,
+                        stage.stage(),
+                        stage.matchAll() ? "all" : "either");
+
+                // TODO the message should be decorated to allow layering changes and isolate stages
+                final EvaluationContext context = new EvaluationContext(message);
+
+                // 3. iterate over all the stages in these pipelines and execute them in order
+                final ArrayList<Rule> rulesToRun = Lists.newArrayListWithCapacity(stage.getRules().size());
+                boolean anyRulesMatched = false;
+                for (Rule rule : stage.getRules()) {
+                    if (rule.when().evaluateBool(context)) {
+                        anyRulesMatched = true;
+                        countRuleExecution(rule, pipeline, stage, "matched");
+
+                        if (context.hasEvaluationErrors()) {
+                            final EvaluationContext.EvalError lastError = Iterables.getLast(context.evaluationErrors());
+                            appendProcessingError(rule, message, lastError.toString());
+                            log.debug("Encountered evaluation error during condition, skipping rule actions: {}",
+                                    lastError);
+                            continue;
+                        }
+                        log.debug("[{}] rule `{}` matches, scheduling to run", msgId, rule.name());
+                        rulesToRun.add(rule);
+                    } else {
+                        countRuleExecution(rule, pipeline, stage, "not-matched");
+                        log.debug("[{}] rule `{}` does not match", msgId, rule.name());
+                    }
+                }
+                RULES:
+                for (Rule rule : rulesToRun) {
+                    countRuleExecution(rule, pipeline, stage, "executed");
+                    log.debug("[{}] rule `{}` matched running actions", msgId, rule.name());
+                    for (Statement statement : rule.then()) {
+                        statement.evaluate(context);
+                        if (context.hasEvaluationErrors()) {
+                            // if the last statement resulted in an error, do not continue to execute this rules
+                            final EvaluationContext.EvalError lastError = Iterables.getLast(context.evaluationErrors());
+                            appendProcessingError(rule, message, lastError.toString());
+                            log.debug("Encountered evaluation error, skipping rest of the rule: {}",
+                                    lastError);
+                            countRuleExecution(rule, pipeline, stage, "failed");
+                            break RULES;
+                        }
+                    }
+                }
+                // stage needed to match all rule conditions to enable the next stage,
+                // record that it is ok to proceed with this pipeline
+                // OR
+                // any rule could match, but at least one had to,
+                // record that it is ok to proceed with the pipeline
+                if ((stage.matchAll() && (rulesToRun.size() == stage.getRules().size()))
+                        || (rulesToRun.size() > 0 && anyRulesMatched)) {
+                    log.debug("[{}] stage {} for pipeline `{}` required match: {}, ok to proceed with next stage",
+                            msgId, stage.stage(), pipeline.name(), stage.matchAll() ? "all" : "either");
+                } else {
+                    // no longer execute stages from this pipeline, the guard prevents it
+                    log.debug("[{}] stage {} for pipeline `{}` required match: {}, NOT ok to proceed with next stage",
+                            msgId, stage.stage(), pipeline.name(), stage.matchAll() ? "all" : "either");
+                    pipelinesToSkip.add(pipeline);
+                }
+
+                // 4. after each complete stage run, merge the processing changes, stages are isolated from each other
+                // TODO message changes become visible immediately for now
+
+                // 4a. also add all new messages from the context to the toProcess work list
+                Iterables.addAll(result, context.createdMessages());
+                context.clearCreatedMessages();
+            }
+        }
+
+        return result;
+    }
+
     private void countRuleExecution(Rule rule, Pipeline pipeline, Stage stage, String type) {
         metricRegistry.counter(name(Rule.class, rule.id(), type)).inc();
         metricRegistry.counter(name(Rule.class, rule.id(), pipeline.id(), String.valueOf(stage.stage()), type)).inc();
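
Reviewer note on the decorator above: it wires the pipeline interpreter into search-time decoration, but the pipeline ID "575ea96d726bd1e03ebb63e7" is hard-coded and the List<Message> returned by processForPipelines() is dropped, so only in-place changes to the searched message become visible. As a minimal sketch of where this could go, the same decorator shape could take its pipeline IDs from configuration instead of a literal. The sketch only uses APIs visible in this diff; the class name ConfigurablePipelineDecorator and its constructor parameter are hypothetical, not existing code.

// Hypothetical sketch only -- not part of this change. Same decoration pattern as
// PipelineProcessorMessageDecorator, with the pipeline IDs supplied externally.
package org.graylog.plugins.pipelineprocessor;

import com.google.common.collect.ImmutableSet;
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
import org.graylog2.indexer.results.ResultMessage;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.decorators.MessageDecorator;

import java.util.List;
import java.util.Set;

public class ConfigurablePipelineDecorator implements MessageDecorator {
    private final PipelineInterpreter pipelineInterpreter;
    private final ImmutableSet<String> pipelineIds; // e.g. taken from decorator configuration

    public ConfigurablePipelineDecorator(PipelineInterpreter pipelineInterpreter, Set<String> pipelineIds) {
        this.pipelineInterpreter = pipelineInterpreter;
        this.pipelineIds = ImmutableSet.copyOf(pipelineIds);
    }

    @Override
    public ResultMessage apply(ResultMessage resultMessage) {
        final Message message = resultMessage.getMessage();
        // processForPipelines() mutates the message in place and returns any messages
        // created by rule actions; like the decorator in this diff, the sketch ignores them.
        final List<Message> created = pipelineInterpreter.processForPipelines(message, message.getId(), pipelineIds);
        resultMessage.setMessage(message);
        return resultMessage;
    }
}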