diff --git a/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorMessageDecorator.java b/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorMessageDecorator.java index 3ddc67f..3f7381b 100644 --- a/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorMessageDecorator.java +++ b/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorMessageDecorator.java @@ -1,7 +1,9 @@ package org.graylog.plugins.pipelineprocessor; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; import com.google.inject.assistedinject.Assisted; import org.graylog.plugins.pipelineprocessor.db.PipelineDao; import org.graylog.plugins.pipelineprocessor.db.PipelineService; @@ -15,6 +17,8 @@ import org.graylog2.plugin.decorators.MessageDecorator; import javax.inject.Inject; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -80,17 +84,23 @@ public PipelineProcessorMessageDecorator(PipelineInterpreter pipelineInterpreter @Override public List apply(List resultMessages) { + final List results = new ArrayList<>(); if (pipelines.isEmpty()) { return resultMessages; } - return resultMessages.stream() - .map((resultMessage) -> { - final Message message = resultMessage.getMessage(); - // discarding return type, as we cannot add more messages to the result message set yet. - pipelineInterpreter.processForPipelines(message, message.getId(), pipelines); - resultMessage.setMessage(message); - return resultMessage; - }) - .collect(Collectors.toList()); + resultMessages.stream() + .forEach((inMessage) -> { + final Message message = inMessage.getMessage(); + final List additionalCreatedMessages = pipelineInterpreter.processForPipelines(message, message.getId(), pipelines); + final ResultMessage outMessage = ResultMessage.fromMessage(message, inMessage.getIndex(), inMessage.getHighlightRanges()); + + results.add(outMessage); + additionalCreatedMessages.stream().forEach((additionalMessage) -> { + // TODO: pass proper highlight ranges. Need to rebuild them for new messages. + results.add(ResultMessage.fromMessage(additionalMessage, "[created from decorator]", ImmutableMultimap.of())); + }); + }); + + return results; } }