This repository has been archived by the owner on Mar 21, 2023. It is now read-only.
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
69b334b
commit 83340fd
Showing
2 changed files
with
76 additions
and
7 deletions.
There are no files selected for viewing
81 changes: 75 additions & 6 deletions
81
src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorMessageDecorator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,96 @@ | ||
package org.graylog.plugins.pipelineprocessor; | ||
|
||
import com.google.common.base.Strings; | ||
import com.google.common.collect.ImmutableSet; | ||
import com.google.inject.assistedinject.Assisted; | ||
import org.graylog.plugins.pipelineprocessor.db.PipelineDao; | ||
import org.graylog.plugins.pipelineprocessor.db.PipelineService; | ||
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter; | ||
import org.graylog2.decorators.Decorator; | ||
import org.graylog2.indexer.results.ResultMessage; | ||
import org.graylog2.plugin.Message; | ||
import org.graylog2.plugin.configuration.ConfigurationRequest; | ||
import org.graylog2.plugin.configuration.fields.ConfigurationField; | ||
import org.graylog2.plugin.configuration.fields.DropdownField; | ||
import org.graylog2.plugin.decorators.MessageDecorator; | ||
|
||
import javax.inject.Inject; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
public class PipelineProcessorMessageDecorator implements MessageDecorator { | ||
private static final String CONFIG_FIELD_PIPELINE = "pipeline"; | ||
|
||
private final PipelineInterpreter pipelineInterpreter; | ||
private final ImmutableSet<String> pipelines; | ||
|
||
public interface Factory extends MessageDecorator.Factory { | ||
@Override | ||
PipelineProcessorMessageDecorator create(Decorator decorator); | ||
|
||
@Override | ||
Config getConfig(); | ||
|
||
@Override | ||
Descriptor getDescriptor(); | ||
} | ||
|
||
public static class Config implements MessageDecorator.Config { | ||
private final PipelineService pipelineService; | ||
|
||
@Inject | ||
public Config(PipelineService pipelineService) { | ||
this.pipelineService = pipelineService; | ||
} | ||
|
||
@Override | ||
public ConfigurationRequest getRequestedConfiguration() { | ||
final Map<String, String> pipelineOptions = this.pipelineService.loadAll().stream() | ||
.sorted((o1, o2) -> o1.title().compareTo(o2.title())) | ||
.collect(Collectors.toMap(PipelineDao::id, PipelineDao::title)); | ||
return new ConfigurationRequest() {{ | ||
addField(new DropdownField(CONFIG_FIELD_PIPELINE, | ||
"Pipeline", | ||
"", | ||
pipelineOptions, | ||
"Which pipeline to use for message decoration", | ||
ConfigurationField.Optional.NOT_OPTIONAL)); | ||
}}; | ||
}; | ||
} | ||
|
||
public static class Descriptor extends MessageDecorator.Descriptor { | ||
public Descriptor() { | ||
super("Pipeline Processor Decorator", false, "http://docs.graylog.org/en/2.0/pages/pipelines.html", "Pipeline Processor Decorator"); | ||
} | ||
} | ||
|
||
@Inject | ||
public PipelineProcessorMessageDecorator(PipelineInterpreter pipelineInterpreter) { | ||
public PipelineProcessorMessageDecorator(PipelineInterpreter pipelineInterpreter, | ||
@Assisted Decorator decorator) { | ||
this.pipelineInterpreter = pipelineInterpreter; | ||
final String pipelineId = (String)decorator.config().get(CONFIG_FIELD_PIPELINE); | ||
if (Strings.isNullOrEmpty(pipelineId)) { | ||
this.pipelines = ImmutableSet.of(); | ||
} else { | ||
this.pipelines = ImmutableSet.of(pipelineId); | ||
} | ||
} | ||
|
||
@Override | ||
public ResultMessage apply(ResultMessage resultMessage) { | ||
final Message message = resultMessage.getMessage(); | ||
final List<Message> messages = pipelineInterpreter.processForPipelines(message, message.getId(), ImmutableSet.of("575ea96d726bd1e03ebb63e7")); | ||
resultMessage.setMessage(message); | ||
return resultMessage; | ||
public List<ResultMessage> apply(List<ResultMessage> resultMessages) { | ||
if (pipelines.isEmpty()) { | ||
return resultMessages; | ||
} | ||
return resultMessages.stream() | ||
.map((resultMessage) -> { | ||
final Message message = resultMessage.getMessage(); | ||
// discarding return type, as we cannot add more messages to the result message set yet. | ||
pipelineInterpreter.processForPipelines(message, message.getId(), pipelines); | ||
resultMessage.setMessage(message); | ||
return resultMessage; | ||
}) | ||
.collect(Collectors.toList()); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters