Skip to content
This repository has been archived by the owner on Mar 21, 2023. It is now read-only.

Commit

Permalink
Adapt to changed decorators interface (#43)
Browse files Browse the repository at this point in the history
* Providing a message decorator that uses pipelines.
* Making decorator configurable.
* Allow adding new messages by pipeline decorator.
* Adding changes related due to introduced listener.
* Adapt to naming changes, using easier forEach idiom.
* Changing decorator to work on SearchResponse instead of message list.
  • Loading branch information
dennisoelkers authored and kroepke committed Jul 20, 2016
1 parent cf55dcb commit 32d18fa
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
Expand Up @@ -19,34 +19,33 @@
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.inject.assistedinject.Assisted;
import org.graylog.plugins.pipelineprocessor.db.PipelineDao;
import org.graylog.plugins.pipelineprocessor.db.PipelineService;
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
import org.graylog.plugins.pipelineprocessor.processors.listeners.NoopInterpreterListener;
import org.graylog2.decorators.Decorator;
import org.graylog2.indexer.results.ResultMessage;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
import org.graylog2.plugin.configuration.fields.DropdownField;
import org.graylog2.plugin.decorators.MessageDecorator;
import org.graylog2.plugin.decorators.SearchResponseDecorator;
import org.graylog2.rest.models.messages.responses.ResultMessageSummary;
import org.graylog2.rest.resources.search.responses.SearchResponse;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PipelineProcessorMessageDecorator implements MessageDecorator {
public class PipelineProcessorMessageDecorator implements SearchResponseDecorator {
private static final String CONFIG_FIELD_PIPELINE = "pipeline";

private final PipelineInterpreter pipelineInterpreter;
private final ImmutableSet<String> pipelines;

public interface Factory extends MessageDecorator.Factory {
public interface Factory extends SearchResponseDecorator.Factory {
@Override
PipelineProcessorMessageDecorator create(Decorator decorator);

Expand All @@ -57,7 +56,7 @@ public interface Factory extends MessageDecorator.Factory {
Descriptor getDescriptor();
}

public static class Config implements MessageDecorator.Config {
public static class Config implements SearchResponseDecorator.Config {
private final PipelineService pipelineService;

@Inject
Expand All @@ -81,7 +80,7 @@ public ConfigurationRequest getRequestedConfiguration() {
};
}

public static class Descriptor extends MessageDecorator.Descriptor {
public static class Descriptor extends SearchResponseDecorator.Descriptor {
public Descriptor() {
super("Pipeline Processor Decorator", false, "http://docs.graylog.org/en/2.0/pages/pipelines.html", "Pipeline Processor Decorator");
}
Expand All @@ -100,26 +99,25 @@ public PipelineProcessorMessageDecorator(PipelineInterpreter pipelineInterpreter
}

@Override
public List<ResultMessage> apply(List<ResultMessage> resultMessages) {
final List<ResultMessage> results = new ArrayList<>();
public SearchResponse apply(SearchResponse searchResponse) {
final List<ResultMessageSummary> results = new ArrayList<>();
if (pipelines.isEmpty()) {
return resultMessages;
return searchResponse;
}
resultMessages.forEach((inMessage) -> {
final Message message = inMessage.getMessage();
searchResponse.messages().forEach((inMessage) -> {
final Message message = new Message(inMessage.message());
final List<Message> additionalCreatedMessages = pipelineInterpreter.processForPipelines(message,
message.getId(),
pipelines,
new NoopInterpreterListener());
final ResultMessage outMessage = ResultMessage.createFromMessage(message, inMessage.getIndex(), inMessage.getHighlightRanges());

results.add(outMessage);
results.add(ResultMessageSummary.create(inMessage.highlightRanges(), message.getFields(), inMessage.index()));
additionalCreatedMessages.forEach((additionalMessage) -> {
// TODO: pass proper highlight ranges. Need to rebuild them for new messages.
results.add(ResultMessage.createFromMessage(additionalMessage, "[created from decorator]", ImmutableMultimap.of()));
results.add(ResultMessageSummary.create(ImmutableMultimap.of(), additionalMessage.getFields(), "[created from decorator]"));
});
});

return results;
return searchResponse.toBuilder().messages(results).build();
}
}
Expand Up @@ -47,6 +47,8 @@ protected void configure() {

install(new ProcessorFunctionsModule());

installMessageDecorator(messageDecoratorBinder(), PipelineProcessorMessageDecorator.class, PipelineProcessorMessageDecorator.Factory.class);
installSearchResponseDecorator(searchResponseDecoratorBinder(),
PipelineProcessorMessageDecorator.class,
PipelineProcessorMessageDecorator.Factory.class);
}
}

0 comments on commit 32d18fa

Please sign in to comment.