
Commit

Providing a message decorator that uses pipelines.
dennisoelkers committed Jun 14, 2016
1 parent 6b35cec commit 69b334b
Showing 3 changed files with 128 additions and 91 deletions.
@@ -0,0 +1,27 @@
package org.graylog.plugins.pipelineprocessor;

import com.google.common.collect.ImmutableSet;
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
import org.graylog2.indexer.results.ResultMessage;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.decorators.MessageDecorator;

import javax.inject.Inject;
import java.util.List;

public class PipelineProcessorMessageDecorator implements MessageDecorator {
    private final PipelineInterpreter pipelineInterpreter;

    @Inject
    public PipelineProcessorMessageDecorator(PipelineInterpreter pipelineInterpreter) {
        this.pipelineInterpreter = pipelineInterpreter;
    }

    @Override
    public ResultMessage apply(ResultMessage resultMessage) {
        final Message message = resultMessage.getMessage();
        // Run the interpreter against the wrapped message. The pipeline ID is hardcoded for now.
        // Changes made by pipeline rules are applied to the message object directly (see the TODOs
        // in PipelineInterpreter); messages created by rules are returned but not used here yet.
        final List<Message> messages = pipelineInterpreter.processForPipelines(message, message.getId(), ImmutableSet.of("575ea96d726bd1e03ebb63e7"));
        resultMessage.setMessage(message);
        return resultMessage;
    }
}
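For orientation (not part of this commit), here is a minimal sketch of how the decorator could be exercised in isolation. It assumes JUnit and Mockito are available and that PipelineInterpreter, ResultMessage and Message are open to mocking; the test class name and message id are illustrative only:

import com.google.common.collect.ImmutableSet;
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
import org.graylog2.indexer.results.ResultMessage;
import org.graylog2.plugin.Message;
import org.junit.Test;

import static org.junit.Assert.assertSame;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class PipelineProcessorMessageDecoratorTest {
    @Test
    public void applyDelegatesToThePipelineInterpreter() {
        final PipelineInterpreter interpreter = mock(PipelineInterpreter.class);
        final ResultMessage resultMessage = mock(ResultMessage.class);
        final Message message = mock(Message.class);
        when(resultMessage.getMessage()).thenReturn(message);
        when(message.getId()).thenReturn("illustrative-message-id");

        final PipelineProcessorMessageDecorator decorator = new PipelineProcessorMessageDecorator(interpreter);
        final ResultMessage decorated = decorator.apply(resultMessage);

        // The decorator delegates to the interpreter with the (currently hardcoded) pipeline id
        // and hands back the same ResultMessage instance.
        verify(interpreter).processForPipelines(message, "illustrative-message-id", ImmutableSet.of("575ea96d726bd1e03ebb63e7"));
        assertSame(resultMessage, decorated);
    }
}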
@@ -46,5 +46,7 @@ protected void configure() {
        addPermissions(PipelineRestPermissions.class);

        install(new ProcessorFunctionsModule());

        installMessageDecorator(messageDecoratorBinder(), PipelineProcessorMessageDecorator.class);
    }
}
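The new installMessageDecorator(messageDecoratorBinder(), ...) line registers the decorator with the server via Guice. As a rough sketch of the kind of set binding such a helper is assumed to create (the actual Graylog helper may differ), the wiring boils down to a Guice Multibinder contribution; the module name below is hypothetical:

import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
import org.graylog2.plugin.decorators.MessageDecorator;

// Hypothetical stand-alone module, for illustration only.
public class MessageDecoratorBindingSketch extends AbstractModule {
    @Override
    protected void configure() {
        // Contribute the decorator to the set of all MessageDecorator bindings,
        // which is what installMessageDecorator(...) is assumed to wrap.
        Multibinder.newSetBinder(binder(), MessageDecorator.class)
                .addBinding()
                .to(PipelineProcessorMessageDecorator.class);
    }
}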
@@ -226,98 +226,8 @@ public Messages process(Messages messages) {
log.debug("[{}] running pipelines {} for streams {}", msgId, pipelinesToRun, streamsIds);
}

// record execution of pipeline in metrics
pipelinesToRun.stream().forEach(pipeline -> metricRegistry.counter(name(Pipeline.class, pipeline.id(), "executed")).inc());

final StageIterator stages = new StageIterator(pipelinesToRun);
final Set<Pipeline> pipelinesToSkip = Sets.newHashSet();

// iterate through all stages for all matching pipelines, per "stage slice" instead of per pipeline.
// pipeline execution ordering is not guaranteed
while (stages.hasNext()) {
final Set<Tuple2<Stage, Pipeline>> stageSet = stages.next();
for (Tuple2<Stage, Pipeline> pair : stageSet) {
final Stage stage = pair.v1();
final Pipeline pipeline = pair.v2();
if (pipelinesToSkip.contains(pipeline)) {
log.debug("[{}] previous stage result prevents further processing of pipeline `{}`",
msgId,
pipeline.name());
continue;
}
metricRegistry.counter(name(Pipeline.class, pipeline.id(), "stage", String.valueOf(stage.stage()), "executed")).inc();
log.debug("[{}] evaluating rule conditions in stage {}: match {}",
msgId,
stage.stage(),
stage.matchAll() ? "all" : "either");

// TODO the message should be decorated to allow layering changes and isolate stages
final EvaluationContext context = new EvaluationContext(message);

// 3. iterate over all the stages in these pipelines and execute them in order
final ArrayList<Rule> rulesToRun = Lists.newArrayListWithCapacity(stage.getRules().size());
boolean anyRulesMatched = false;
for (Rule rule : stage.getRules()) {
if (rule.when().evaluateBool(context)) {
anyRulesMatched = true;
countRuleExecution(rule, pipeline, stage, "matched");

if (context.hasEvaluationErrors()) {
final EvaluationContext.EvalError lastError = Iterables.getLast(context.evaluationErrors());
appendProcessingError(rule, message, lastError.toString());
log.debug("Encountered evaluation error during condition, skipping rule actions: {}",
lastError);
continue;
}
log.debug("[{}] rule `{}` matches, scheduling to run", msgId, rule.name());
rulesToRun.add(rule);
} else {
countRuleExecution(rule, pipeline, stage, "not-matched");
log.debug("[{}] rule `{}` does not match", msgId, rule.name());
}
}
RULES:
for (Rule rule : rulesToRun) {
countRuleExecution(rule, pipeline, stage, "executed");
log.debug("[{}] rule `{}` matched running actions", msgId, rule.name());
for (Statement statement : rule.then()) {
statement.evaluate(context);
if (context.hasEvaluationErrors()) {
// if the last statement resulted in an error, do not continue to execute this rules
final EvaluationContext.EvalError lastError = Iterables.getLast(context.evaluationErrors());
appendProcessingError(rule, message, lastError.toString());
log.debug("Encountered evaluation error, skipping rest of the rule: {}",
lastError);
countRuleExecution(rule, pipeline, stage, "failed");
break RULES;
}
}
}
// stage needed to match all rule conditions to enable the next stage,
// record that it is ok to proceed with this pipeline
// OR
// any rule could match, but at least one had to,
// record that it is ok to proceed with the pipeline
if ((stage.matchAll() && (rulesToRun.size() == stage.getRules().size()))
|| (rulesToRun.size() > 0 && anyRulesMatched)) {
log.debug("[{}] stage {} for pipeline `{}` required match: {}, ok to proceed with next stage",
msgId, stage.stage(), pipeline.name(), stage.matchAll() ? "all" : "either");
} else {
// no longer execute stages from this pipeline, the guard prevents it
log.debug("[{}] stage {} for pipeline `{}` required match: {}, NOT ok to proceed with next stage",
msgId, stage.stage(), pipeline.name(), stage.matchAll() ? "all" : "either");
pipelinesToSkip.add(pipeline);
}

// 4. after each complete stage run, merge the processing changes, stages are isolated from each other
// TODO message changes become visible immediately for now
toProcess.addAll(processForPipelines(message, msgId, pipelinesToRun.stream().map(Pipeline::id).collect(Collectors.toSet())));

// 4a. also add all new messages from the context to the toProcess work list
Iterables.addAll(toProcess, context.createdMessages());
context.clearCreatedMessages();
}

}
boolean addedStreams = false;
// 5. add each message-stream combination to the blacklist set
for (Stream stream : message.getStreams()) {
@@ -350,6 +260,104 @@ public Messages process(Messages messages) {
        return new MessageCollection(fullyProcessed);
    }

    public List<Message> processForPipelines(Message message, String msgId, Set<String> pipelines) {
        final ImmutableSet<Pipeline> pipelinesToRun = ImmutableSet.copyOf(pipelines.stream().map(pipelineId -> this.currentPipelines.get().get(pipelineId)).collect(Collectors.toSet()));
        final List<Message> result = new ArrayList<>();
        // record execution of pipeline in metrics
        pipelinesToRun.stream().forEach(pipeline -> metricRegistry.counter(name(Pipeline.class, pipeline.id(), "executed")).inc());

        final StageIterator stages = new StageIterator(pipelinesToRun);
        final Set<Pipeline> pipelinesToSkip = Sets.newHashSet();

        // iterate through all stages for all matching pipelines, per "stage slice" instead of per pipeline.
        // pipeline execution ordering is not guaranteed
        while (stages.hasNext()) {
            final Set<Tuple2<Stage, Pipeline>> stageSet = stages.next();
            for (Tuple2<Stage, Pipeline> pair : stageSet) {
                final Stage stage = pair.v1();
                final Pipeline pipeline = pair.v2();
                if (pipelinesToSkip.contains(pipeline)) {
                    log.debug("[{}] previous stage result prevents further processing of pipeline `{}`",
                            msgId,
                            pipeline.name());
                    continue;
                }
                metricRegistry.counter(name(Pipeline.class, pipeline.id(), "stage", String.valueOf(stage.stage()), "executed")).inc();
                log.debug("[{}] evaluating rule conditions in stage {}: match {}",
                        msgId,
                        stage.stage(),
                        stage.matchAll() ? "all" : "either");

                // TODO the message should be decorated to allow layering changes and isolate stages
                final EvaluationContext context = new EvaluationContext(message);

                // 3. iterate over all the stages in these pipelines and execute them in order
                final ArrayList<Rule> rulesToRun = Lists.newArrayListWithCapacity(stage.getRules().size());
                boolean anyRulesMatched = false;
                for (Rule rule : stage.getRules()) {
                    if (rule.when().evaluateBool(context)) {
                        anyRulesMatched = true;
                        countRuleExecution(rule, pipeline, stage, "matched");

                        if (context.hasEvaluationErrors()) {
                            final EvaluationContext.EvalError lastError = Iterables.getLast(context.evaluationErrors());
                            appendProcessingError(rule, message, lastError.toString());
                            log.debug("Encountered evaluation error during condition, skipping rule actions: {}",
                                    lastError);
                            continue;
                        }
                        log.debug("[{}] rule `{}` matches, scheduling to run", msgId, rule.name());
                        rulesToRun.add(rule);
                    } else {
                        countRuleExecution(rule, pipeline, stage, "not-matched");
                        log.debug("[{}] rule `{}` does not match", msgId, rule.name());
                    }
                }
                RULES:
                for (Rule rule : rulesToRun) {
                    countRuleExecution(rule, pipeline, stage, "executed");
                    log.debug("[{}] rule `{}` matched, running actions", msgId, rule.name());
                    for (Statement statement : rule.then()) {
                        statement.evaluate(context);
                        if (context.hasEvaluationErrors()) {
                            // if the last statement resulted in an error, do not continue executing this rule
                            final EvaluationContext.EvalError lastError = Iterables.getLast(context.evaluationErrors());
                            appendProcessingError(rule, message, lastError.toString());
                            log.debug("Encountered evaluation error, skipping rest of the rule: {}",
                                    lastError);
                            countRuleExecution(rule, pipeline, stage, "failed");
                            break RULES;
                        }
                    }
                }
                // stage needed to match all rule conditions to enable the next stage,
                // record that it is ok to proceed with this pipeline
                // OR
                // any rule could match, but at least one had to,
                // record that it is ok to proceed with the pipeline
                if ((stage.matchAll() && (rulesToRun.size() == stage.getRules().size()))
                        || (rulesToRun.size() > 0 && anyRulesMatched)) {
                    log.debug("[{}] stage {} for pipeline `{}` required match: {}, ok to proceed with next stage",
                            msgId, stage.stage(), pipeline.name(), stage.matchAll() ? "all" : "either");
                } else {
                    // no longer execute stages from this pipeline, the guard prevents it
                    log.debug("[{}] stage {} for pipeline `{}` required match: {}, NOT ok to proceed with next stage",
                            msgId, stage.stage(), pipeline.name(), stage.matchAll() ? "all" : "either");
                    pipelinesToSkip.add(pipeline);
                }

                // 4. after each complete stage run, merge the processing changes, stages are isolated from each other
                // TODO message changes become visible immediately for now

                // 4a. also add all new messages from the context to the result list
                Iterables.addAll(result, context.createdMessages());
                context.clearCreatedMessages();
            }
        }

        return result;
    }

    private void countRuleExecution(Rule rule, Pipeline pipeline, Stage stage, String type) {
        metricRegistry.counter(name(Rule.class, rule.id(), type)).inc();
        metricRegistry.counter(name(Rule.class, rule.id(), pipeline.id(), String.valueOf(stage.stage()), type)).inc();
