Skip to content
This repository has been archived by the owner on Mar 21, 2023. It is now read-only.

add remove_from_default boolean option to route_to_stream function #220

Merged
merged 2 commits into from Oct 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -46,8 +46,6 @@
import org.graylog.plugins.pipelineprocessor.functions.encoding.Base32Encode;
import org.graylog.plugins.pipelineprocessor.functions.encoding.Base32HumanDecode;
import org.graylog.plugins.pipelineprocessor.functions.encoding.Base32HumanEncode;
import org.graylog.plugins.pipelineprocessor.functions.encoding.Base64Decode;
import org.graylog.plugins.pipelineprocessor.functions.encoding.Base64Encode;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uh oh, that's not a good sign.
That means someone forgot to add these functions…

Refs #190

import org.graylog.plugins.pipelineprocessor.functions.encoding.Base64UrlDecode;
import org.graylog.plugins.pipelineprocessor.functions.encoding.Base64UrlEncode;
import org.graylog.plugins.pipelineprocessor.functions.hashing.CRC32;
Expand All @@ -69,6 +67,7 @@
import org.graylog.plugins.pipelineprocessor.functions.messages.DropMessage;
import org.graylog.plugins.pipelineprocessor.functions.messages.HasField;
import org.graylog.plugins.pipelineprocessor.functions.messages.RemoveField;
import org.graylog.plugins.pipelineprocessor.functions.messages.RemoveFromStream;
import org.graylog.plugins.pipelineprocessor.functions.messages.RenameField;
import org.graylog.plugins.pipelineprocessor.functions.messages.RouteToStream;
import org.graylog.plugins.pipelineprocessor.functions.messages.SetField;
Expand Down Expand Up @@ -113,6 +112,7 @@ protected void configure() {
addMessageProcessorFunction(DropMessage.NAME, DropMessage.class);
addMessageProcessorFunction(CreateMessage.NAME, CreateMessage.class);
addMessageProcessorFunction(CloneMessage.NAME, CloneMessage.class);
addMessageProcessorFunction(RemoveFromStream.NAME, RemoveFromStream.class);
addMessageProcessorFunction(RouteToStream.NAME, RouteToStream.class);
// helper service for route_to_stream
serviceBinder().addBinding().to(StreamCacheService.class).in(Scopes.SINGLETON);
Expand Down
@@ -0,0 +1,105 @@
/**
* This file is part of Graylog Pipeline Processor.
*
* Graylog Pipeline Processor is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog Pipeline Processor is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog Pipeline Processor. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.pipelineprocessor.functions.messages;

import com.google.inject.Inject;
import org.graylog.plugins.pipelineprocessor.EvaluationContext;
import org.graylog.plugins.pipelineprocessor.ast.functions.AbstractFunction;
import org.graylog.plugins.pipelineprocessor.ast.functions.FunctionArgs;
import org.graylog.plugins.pipelineprocessor.ast.functions.FunctionDescriptor;
import org.graylog.plugins.pipelineprocessor.ast.functions.ParameterDescriptor;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.streams.DefaultStream;
import org.graylog2.plugin.streams.Stream;

import javax.inject.Provider;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;

import static com.google.common.collect.ImmutableList.of;
import static org.graylog.plugins.pipelineprocessor.ast.functions.ParameterDescriptor.string;
import static org.graylog.plugins.pipelineprocessor.ast.functions.ParameterDescriptor.type;

public class RemoveFromStream extends AbstractFunction<Void> {

public static final String NAME = "remove_from_stream";
private static final String ID_ARG = "id";
private static final String NAME_ARG = "name";
private final StreamCacheService streamCacheService;
private final Provider<Stream> defaultStreamProvider;
private final ParameterDescriptor<Message, Message> messageParam;
private final ParameterDescriptor<String, String> nameParam;
private final ParameterDescriptor<String, String> idParam;

@Inject
public RemoveFromStream(StreamCacheService streamCacheService, @DefaultStream Provider<Stream> defaultStreamProvider) {
this.streamCacheService = streamCacheService;
this.defaultStreamProvider = defaultStreamProvider;

messageParam = type("message", Message.class).optional().description("The message to use, defaults to '$message'").build();
nameParam = string(NAME_ARG).optional().description("The name of the stream to remove the message from, must match exactly").build();
idParam = string(ID_ARG).optional().description("The ID of the stream").build();
}

@Override
public Void evaluate(FunctionArgs args, EvaluationContext context) {
Optional<String> id = idParam.optional(args, context);

Collection<Stream> streams;
if (!id.isPresent()) {
final Optional<Collection<Stream>> foundStreams = nameParam.optional(args, context).map(streamCacheService::getByName);

if (!foundStreams.isPresent()) {
// TODO signal error somehow
return null;
} else {
streams = foundStreams.get();
}
} else {
final Stream stream = streamCacheService.getById(id.get());
if (stream == null) {
return null;
}
streams = Collections.singleton(stream);
}
final Message message = messageParam.optional(args, context).orElse(context.currentMessage());
streams.forEach(stream -> {
if (!stream.isPaused()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it important whether a stream is paused or not?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not particularly, but paused streams aren't taken into account when adding either.
Actually more of a cargo cult copy'n'waste artifact from the route_to_stream function and the StreamRouter.

message.removeStream(stream);
}
});
// always leave a message at least on the default stream if we removed the last stream it was on
if (message.getStreams().isEmpty()) {
message.addStream(defaultStreamProvider.get());
}
return null;
}

@Override
public FunctionDescriptor<Void> descriptor() {
return FunctionDescriptor.<Void>builder()
.name(NAME)
.returnType(Void.class)
.params(of(
nameParam,
idParam,
messageParam))
.description("Removes a message from a stream. Removing the last stream will put the message back onto the default stream. To complete drop a message use the drop_message function.")
.build();
}
}
Expand Up @@ -17,19 +17,21 @@
package org.graylog.plugins.pipelineprocessor.functions.messages;

import com.google.inject.Inject;

import org.graylog.plugins.pipelineprocessor.EvaluationContext;
import org.graylog.plugins.pipelineprocessor.ast.functions.AbstractFunction;
import org.graylog.plugins.pipelineprocessor.ast.functions.FunctionArgs;
import org.graylog.plugins.pipelineprocessor.ast.functions.FunctionDescriptor;
import org.graylog.plugins.pipelineprocessor.ast.functions.ParameterDescriptor;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.streams.DefaultStream;
import org.graylog2.plugin.streams.Stream;

import javax.inject.Provider;
import java.util.Collection;
import java.util.Collections;

import static com.google.common.collect.ImmutableList.of;
import static org.graylog.plugins.pipelineprocessor.ast.functions.ParameterDescriptor.bool;
import static org.graylog.plugins.pipelineprocessor.ast.functions.ParameterDescriptor.string;
import static org.graylog.plugins.pipelineprocessor.ast.functions.ParameterDescriptor.type;

Expand All @@ -38,18 +40,23 @@ public class RouteToStream extends AbstractFunction<Void> {
public static final String NAME = "route_to_stream";
private static final String ID_ARG = "id";
private static final String NAME_ARG = "name";
private static final String REMOVE_FROM_DEFAULT = "remove_from_default";
private final StreamCacheService streamCacheService;
private final Provider<Stream> defaultStreamProvider;
private final ParameterDescriptor<Message, Message> messageParam;
private final ParameterDescriptor<String, String> nameParam;
private final ParameterDescriptor<String, String> idParam;
private final ParameterDescriptor<Boolean, Boolean> removeFromDefault;

@Inject
public RouteToStream(StreamCacheService streamCacheService) {
public RouteToStream(StreamCacheService streamCacheService, @DefaultStream Provider<Stream> defaultStreamProvider) {
this.streamCacheService = streamCacheService;
this.defaultStreamProvider = defaultStreamProvider;

messageParam = type("message", Message.class).optional().description("The message to use, defaults to '$message'").build();
nameParam = string(NAME_ARG).optional().description("The name of the stream to route the message to, must match exactly").build();
idParam = string(ID_ARG).optional().description("The ID of the stream").build();
removeFromDefault = bool(REMOVE_FROM_DEFAULT).optional().description("After routing the message, remove it from the default stream").build();
}

@Override
Expand Down Expand Up @@ -80,6 +87,9 @@ public Void evaluate(FunctionArgs args, EvaluationContext context) {
message.addStream(stream);
}
});
if (removeFromDefault.optional(args, context).orElse(Boolean.FALSE)) {
message.removeStream(defaultStreamProvider.get());
}
return null;
}

Expand All @@ -91,7 +101,8 @@ public FunctionDescriptor<Void> descriptor() {
.params(of(
nameParam,
idParam,
messageParam))
messageParam,
removeFromDefault))
.description("Routes a message to a stream")
.build();
}
Expand Down
Expand Up @@ -18,7 +18,6 @@

import com.google.common.base.Charsets;
import com.google.common.collect.Maps;

import org.graylog.plugins.pipelineprocessor.ast.Rule;
import org.graylog.plugins.pipelineprocessor.ast.functions.AbstractFunction;
import org.graylog.plugins.pipelineprocessor.ast.functions.Function;
Expand All @@ -31,10 +30,13 @@
import org.graylog.plugins.pipelineprocessor.parser.FunctionRegistry;
import org.graylog.plugins.pipelineprocessor.parser.PipelineRuleParser;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.streams.Stream;
import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.rules.TestName;

import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
Expand All @@ -43,14 +45,16 @@
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Nullable;
import java.util.function.Consumer;

import static com.google.common.collect.ImmutableList.of;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class BaseParserTest {
protected static final AtomicBoolean actionsTriggered = new AtomicBoolean(false);
protected static FunctionRegistry functionRegistry;
protected static Stream defaultStream;

@org.junit.Rule
public TestName name = new TestName();
Expand All @@ -77,6 +81,14 @@ public FunctionDescriptor<Void> descriptor() {
return functions;
}

@BeforeClass
public static void init() {
defaultStream = mock(Stream.class, "Default stream");
when(defaultStream.isPaused()).thenReturn(false);
when(defaultStream.getTitle()).thenReturn("default stream");
when(defaultStream.getId()).thenReturn(Stream.DEFAULT_STREAM_ID);
}

@Before
public void setup() {
parser = new PipelineRuleParser(functionRegistry, new CodeGenerator(JavaCompiler::new));
Expand Down Expand Up @@ -125,7 +137,14 @@ protected Message evaluateRule(Rule rule, Message message) {

@Nullable
protected Message evaluateRule(Rule rule) {
return evaluateRule(rule, (msg) -> {});
}

@Nullable
protected Message evaluateRule(Rule rule, Consumer<Message> messageModifier) {
final Message message = new Message("hello test", "source", DateTime.now());
message.addStream(defaultStream);
messageModifier.accept(message);
return evaluateRule(rule, message);
}

Expand Down
Expand Up @@ -73,6 +73,7 @@
import org.graylog.plugins.pipelineprocessor.functions.messages.DropMessage;
import org.graylog.plugins.pipelineprocessor.functions.messages.HasField;
import org.graylog.plugins.pipelineprocessor.functions.messages.RemoveField;
import org.graylog.plugins.pipelineprocessor.functions.messages.RemoveFromStream;
import org.graylog.plugins.pipelineprocessor.functions.messages.RenameField;
import org.graylog.plugins.pipelineprocessor.functions.messages.RouteToStream;
import org.graylog.plugins.pipelineprocessor.functions.messages.SetField;
Expand Down Expand Up @@ -117,6 +118,7 @@
import org.junit.Test;
import org.mockito.ArgumentMatchers;

import javax.inject.Provider;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
Expand All @@ -134,6 +136,7 @@ public class FunctionsSnippetsTest extends BaseParserTest {
public static final DateTime GRAYLOG_EPOCH = DateTime.parse("2010-07-30T16:03:25Z");
private static final EventBus eventBus = new EventBus();
private static StreamCacheService streamCacheService;
private static Stream otherStream;

@BeforeClass
public static void registerFunctions() {
Expand All @@ -157,29 +160,26 @@ public static void registerFunctions() {

// route to stream mocks
final StreamService streamService = mock(StreamService.class);
final Stream stream = mock(Stream.class);
when(stream.isPaused()).thenReturn(false);
when(stream.getTitle()).thenReturn("some name");
when(stream.getId()).thenReturn("id");

final Stream stream2 = mock(Stream.class);
when(stream2.isPaused()).thenReturn(false);
when(stream2.getTitle()).thenReturn("some name");
when(stream2.getId()).thenReturn("id2");

when(streamService.loadAll()).thenReturn(Lists.newArrayList(stream, stream2));
when(streamService.loadAllEnabled()).thenReturn(Lists.newArrayList(stream, stream2));

otherStream = mock(Stream.class, "some stream id2");
when(otherStream.isPaused()).thenReturn(false);
when(otherStream.getTitle()).thenReturn("some name");
when(otherStream.getId()).thenReturn("id2");

when(streamService.loadAll()).thenReturn(Lists.newArrayList(defaultStream, otherStream));
when(streamService.loadAllEnabled()).thenReturn(Lists.newArrayList(defaultStream, otherStream));
try {
when(streamService.load(anyString())).thenThrow(new NotFoundException());
when(streamService.load(ArgumentMatchers.eq("id"))).thenReturn(stream);
when(streamService.load(ArgumentMatchers.eq("id2"))).thenReturn(stream2);
when(streamService.load(ArgumentMatchers.eq(Stream.DEFAULT_STREAM_ID))).thenReturn(defaultStream);
when(streamService.load(ArgumentMatchers.eq("id2"))).thenReturn(otherStream);
} catch (NotFoundException ignored) {
// oh well, checked exceptions <3
}
streamCacheService = new StreamCacheService(eventBus, streamService, null);
streamCacheService.startAsync().awaitRunning();
functions.put(RouteToStream.NAME, new RouteToStream(streamCacheService));

final Provider<Stream> defaultStreamProvider = () -> defaultStream;
functions.put(RouteToStream.NAME, new RouteToStream(streamCacheService, defaultStreamProvider));
functions.put(RemoveFromStream.NAME, new RemoveFromStream(streamCacheService, defaultStreamProvider));
// input related functions
// TODO needs mock
//functions.put(FromInput.NAME, new FromInput());
Expand Down Expand Up @@ -810,4 +810,38 @@ public void routeToStream() {
assertThat(message2).isNotNull();
assertThat(message2.getStreams().size()).isEqualTo(2);
}

@Test
public void routeToStreamRemoveDefault() {
final Rule rule = parser.parseRule(ruleForTest(), true);
final Message message = evaluateRule(rule);

assertThat(message).isNotNull();
assertThat(message.getStreams()).isNotEmpty();
assertThat(message.getStreams().size()).isEqualTo(1);

streamCacheService.updateStreams(ImmutableSet.of(Stream.DEFAULT_STREAM_ID));

final Message message2 = evaluateRule(rule);
assertThat(message2).isNotNull();
assertThat(message2.getStreams().size()).isEqualTo(1);
}

@Test
public void removeFromStream() {
final Rule rule = parser.parseRule(ruleForTest(), true);
final Message message = evaluateRule(rule, msg -> msg.addStream(otherStream));

assertThat(message).isNotNull();
assertThat(message.getStreams()).containsOnly(defaultStream);
}

@Test
public void removeFromStreamRetainDefault() {
final Rule rule = parser.parseRule(ruleForTest(), true);
final Message message = evaluateRule(rule, msg -> msg.addStream(otherStream));

assertThat(message).isNotNull();
assertThat(message.getStreams()).containsOnly(defaultStream);
}
}
@@ -0,0 +1,5 @@
rule "stream routing"
when true
then
remove_from_stream(name: "some name");
end
@@ -0,0 +1,7 @@
rule "stream routing"
when true
then
remove_from_stream(name: "some name");
// if a message is taken off all stream it was on, the default stream will be added back to avoid dropping the message
remove_from_stream(id: "000000000000000000000001");
end
@@ -0,0 +1,5 @@
rule "stream routing"
when true
then
route_to_stream(name: "some name", remove_from_default: true);
end