diff --git a/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorModule.java b/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorModule.java index 0d67b8279371..ec8c5493c60a 100644 --- a/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorModule.java +++ b/src/main/java/org/graylog/plugins/pipelineprocessor/PipelineProcessorModule.java @@ -19,7 +19,7 @@ import org.graylog.plugins.pipelineprocessor.functions.ProcessorFunctionsModule; import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter; import org.graylog.plugins.pipelineprocessor.rest.PipelineResource; -import org.graylog.plugins.pipelineprocessor.rest.PipelineStreamResource; +import org.graylog.plugins.pipelineprocessor.rest.PipelineConnectionsResource; import org.graylog.plugins.pipelineprocessor.rest.RuleResource; import org.graylog2.plugin.PluginConfigBean; import org.graylog2.plugin.PluginModule; @@ -40,7 +40,7 @@ protected void configure() { addMessageProcessor(PipelineInterpreter.class); addRestResource(RuleResource.class); addRestResource(PipelineResource.class); - addRestResource(PipelineStreamResource.class); + addRestResource(PipelineConnectionsResource.class); install(new ProcessorFunctionsModule()); } diff --git a/src/main/java/org/graylog/plugins/pipelineprocessor/db/PipelineStreamAssignmentService.java b/src/main/java/org/graylog/plugins/pipelineprocessor/db/PipelineStreamConnectionsService.java similarity index 59% rename from src/main/java/org/graylog/plugins/pipelineprocessor/db/PipelineStreamAssignmentService.java rename to src/main/java/org/graylog/plugins/pipelineprocessor/db/PipelineStreamConnectionsService.java index f698189e795f..c8e66b5834d2 100644 --- a/src/main/java/org/graylog/plugins/pipelineprocessor/db/PipelineStreamAssignmentService.java +++ b/src/main/java/org/graylog/plugins/pipelineprocessor/db/PipelineStreamConnectionsService.java @@ -17,8 +17,9 @@ package org.graylog.plugins.pipelineprocessor.db; import com.google.common.collect.Sets; +import com.mongodb.BasicDBObject; import com.mongodb.MongoException; -import org.graylog.plugins.pipelineprocessor.rest.PipelineStreamAssignment; +import org.graylog.plugins.pipelineprocessor.rest.PipelineStreamConnection; import org.graylog2.bindings.providers.MongoJackObjectMapperProvider; import org.graylog2.database.MongoConnection; import org.graylog2.database.NotFoundException; @@ -34,50 +35,50 @@ import java.util.Collections; import java.util.Set; -public class PipelineStreamAssignmentService { - private static final Logger log = LoggerFactory.getLogger(PipelineStreamAssignmentService.class); +public class PipelineStreamConnectionsService { + private static final Logger log = LoggerFactory.getLogger(PipelineStreamConnectionsService.class); public static final String COLLECTION = "pipeline_processor_pipelines_streams"; - private final JacksonDBCollection dbCollection; + private final JacksonDBCollection dbCollection; @Inject - public PipelineStreamAssignmentService(MongoConnection mongoConnection, MongoJackObjectMapperProvider mapper) { + public PipelineStreamConnectionsService(MongoConnection mongoConnection, MongoJackObjectMapperProvider mapper) { dbCollection = JacksonDBCollection.wrap( mongoConnection.getDatabase().getCollection(COLLECTION), - PipelineStreamAssignment.class, + PipelineStreamConnection.class, String.class, mapper.get()); - dbCollection.createIndex(DBSort.asc("stream_id")); + dbCollection.createIndex(DBSort.asc("stream_id"), new BasicDBObject("unique", true)); } - public PipelineStreamAssignment save(PipelineStreamAssignment assignment) { - PipelineStreamAssignment existingAssignment = dbCollection.findOne(DBQuery.is("stream_id", assignment.streamId())); - if (existingAssignment == null) { - existingAssignment = PipelineStreamAssignment.create(null, assignment.streamId(), Collections.emptySet()); + public PipelineStreamConnection save(PipelineStreamConnection connections) { + PipelineStreamConnection existingConnections = dbCollection.findOne(DBQuery.is("stream_id", connections.streamId())); + if (existingConnections == null) { + existingConnections = PipelineStreamConnection.create(null, connections.streamId(), Collections.emptySet()); } - final PipelineStreamAssignment toSave = existingAssignment.toBuilder() - .pipelineIds(assignment.pipelineIds()).build(); - final WriteResult save = dbCollection.save(toSave); + final PipelineStreamConnection toSave = existingConnections.toBuilder() + .pipelineIds(connections.pipelineIds()).build(); + final WriteResult save = dbCollection.save(toSave); return save.getSavedObject(); } - public PipelineStreamAssignment load(String streamId) throws NotFoundException { - final PipelineStreamAssignment oneById = dbCollection.findOne(DBQuery.is("stream_id", streamId)); + public PipelineStreamConnection load(String streamId) throws NotFoundException { + final PipelineStreamConnection oneById = dbCollection.findOne(DBQuery.is("stream_id", streamId)); if (oneById == null) { - throw new NotFoundException("No pipeline assignments with for stream " + streamId); + throw new NotFoundException("No pipeline connections with for stream " + streamId); } return oneById; } - public Set loadAll() { + public Set loadAll() { try { - final DBCursor assignments = dbCollection.find(); - return Sets.newHashSet(assignments.iterator()); + final DBCursor connections = dbCollection.find(); + return Sets.newHashSet(connections.iterator()); } catch (MongoException e) { - log.error("Unable to load pipelines", e); + log.error("Unable to load pipeline connections", e); return Collections.emptySet(); } } diff --git a/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java b/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java index 0364cff45de1..91db60221e4b 100644 --- a/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java +++ b/src/main/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreter.java @@ -35,14 +35,14 @@ import org.graylog.plugins.pipelineprocessor.ast.statements.Statement; import org.graylog.plugins.pipelineprocessor.db.PipelineDao; import org.graylog.plugins.pipelineprocessor.db.PipelineService; -import org.graylog.plugins.pipelineprocessor.db.PipelineStreamAssignmentService; +import org.graylog.plugins.pipelineprocessor.db.PipelineStreamConnectionsService; import org.graylog.plugins.pipelineprocessor.db.RuleDao; import org.graylog.plugins.pipelineprocessor.db.RuleService; import org.graylog.plugins.pipelineprocessor.events.PipelinesChangedEvent; import org.graylog.plugins.pipelineprocessor.events.RulesChangedEvent; import org.graylog.plugins.pipelineprocessor.parser.ParseException; import org.graylog.plugins.pipelineprocessor.parser.PipelineRuleParser; -import org.graylog.plugins.pipelineprocessor.rest.PipelineStreamAssignment; +import org.graylog.plugins.pipelineprocessor.rest.PipelineStreamConnection; import org.graylog2.events.ClusterEventBus; import org.graylog2.plugin.Message; import org.graylog2.plugin.MessageCollection; @@ -75,19 +75,19 @@ public class PipelineInterpreter implements MessageProcessor { private final RuleService ruleService; private final PipelineService pipelineService; - private final PipelineStreamAssignmentService pipelineStreamAssignmentService; + private final PipelineStreamConnectionsService pipelineStreamConnectionsService; private final PipelineRuleParser pipelineRuleParser; private final Journal journal; private final ScheduledExecutorService scheduler; private final Meter filteredOutMessages; private final AtomicReference> currentPipelines = new AtomicReference<>(ImmutableMap.of()); - private final AtomicReference> streamPipelineAssignments = new AtomicReference<>(ImmutableSetMultimap.of()); + private final AtomicReference> streamPipelineConnections = new AtomicReference<>(ImmutableSetMultimap.of()); @Inject public PipelineInterpreter(RuleService ruleService, PipelineService pipelineService, - PipelineStreamAssignmentService pipelineStreamAssignmentService, + PipelineStreamConnectionsService pipelineStreamConnectionsService, PipelineRuleParser pipelineRuleParser, Journal journal, MetricRegistry metricRegistry, @@ -95,14 +95,14 @@ public PipelineInterpreter(RuleService ruleService, @ClusterEventBus EventBus clusterBus) { this.ruleService = ruleService; this.pipelineService = pipelineService; - this.pipelineStreamAssignmentService = pipelineStreamAssignmentService; + this.pipelineStreamConnectionsService = pipelineStreamConnectionsService; this.pipelineRuleParser = pipelineRuleParser; this.journal = journal; this.scheduler = scheduler; this.filteredOutMessages = metricRegistry.meter(name(ProcessBufferProcessor.class, "filteredOutMessages")); - // listens to cluster wide Rule, Pipeline and pipeline stream assignment changes + // listens to cluster wide Rule, Pipeline and pipeline stream connection changes clusterBus.register(this); reload(); @@ -155,15 +155,15 @@ private synchronized void reload() { }); currentPipelines.set(ImmutableMap.copyOf(pipelineIdMap)); - // read all stream assignments of those pipelines to allow processing messages through them - final HashMultimap assignments = HashMultimap.create(); - for (PipelineStreamAssignment streamAssignment : pipelineStreamAssignmentService.loadAll()) { - streamAssignment.pipelineIds().stream() + // read all stream connections of those pipelines to allow processing messages through them + final HashMultimap connections = HashMultimap.create(); + for (PipelineStreamConnection streamConnection : pipelineStreamConnectionsService.loadAll()) { + streamConnection.pipelineIds().stream() .map(pipelineIdMap::get) .filter(Objects::nonNull) - .forEach(pipeline -> assignments.put(streamAssignment.streamId(), pipeline)); + .forEach(pipeline -> connections.put(streamConnection.streamId(), pipeline)); } - streamPipelineAssignments.set(ImmutableSetMultimap.copyOf(assignments)); + streamPipelineConnections.set(ImmutableSetMultimap.copyOf(connections)); } @@ -194,7 +194,7 @@ public Messages process(Messages messages) { // this makes a copy of the list! final Set initialStreamIds = message.getStreams().stream().map(Stream::getId).collect(Collectors.toSet()); - final ImmutableSetMultimap streamAssignment = streamPipelineAssignments.get(); + final ImmutableSetMultimap streamConnection = streamPipelineConnections.get(); if (initialStreamIds.isEmpty()) { if (processingBlacklist.contains(tuple(msgId, "default"))) { @@ -202,8 +202,8 @@ public Messages process(Messages messages) { pipelinesToRun = ImmutableSet.of(); log.debug("[{}] already processed default stream, skipping", msgId); } else { - // get the default stream pipeline assignments for this message - pipelinesToRun = streamAssignment.get("default"); + // get the default stream pipeline connections for this message + pipelinesToRun = streamConnection.get("default"); log.debug("[{}] running default stream pipelines: [{}]", msgId, pipelinesToRun.stream().map(Pipeline::name).toArray()); @@ -212,10 +212,10 @@ public Messages process(Messages messages) { // 2. if a message-stream combination has already been processed (is in the set), skip that execution final Set streamsIds = initialStreamIds.stream() .filter(streamId -> !processingBlacklist.contains(tuple(msgId, streamId))) - .filter(streamAssignment::containsKey) + .filter(streamConnection::containsKey) .collect(Collectors.toSet()); pipelinesToRun = ImmutableSet.copyOf(streamsIds.stream() - .flatMap(streamId -> streamAssignment.get(streamId).stream()) + .flatMap(streamId -> streamConnection.get(streamId).stream()) .collect(Collectors.toSet())); log.debug("[{}] running pipelines {} for streams {}", msgId, pipelinesToRun, streamsIds); } @@ -337,8 +337,8 @@ public void handlePipelineChanges(PipelinesChangedEvent event) { } @Subscribe - public void handlePipelineAssignmentChanges(PipelineStreamAssignment assignment) { - log.debug("Pipeline stream assignment changed: {}", assignment); + public void handlePipelineConnectionChanges(PipelineStreamConnection connection) { + log.debug("Pipeline stream connection changed: {}", connection); scheduler.schedule((Runnable) this::reload, 0, TimeUnit.SECONDS); } diff --git a/src/main/java/org/graylog/plugins/pipelineprocessor/rest/PipelineStreamResource.java b/src/main/java/org/graylog/plugins/pipelineprocessor/rest/PipelineConnectionsResource.java similarity index 61% rename from src/main/java/org/graylog/plugins/pipelineprocessor/rest/PipelineStreamResource.java rename to src/main/java/org/graylog/plugins/pipelineprocessor/rest/PipelineConnectionsResource.java index 7f9a45da43fd..1a8a0bba6289 100644 --- a/src/main/java/org/graylog/plugins/pipelineprocessor/rest/PipelineStreamResource.java +++ b/src/main/java/org/graylog/plugins/pipelineprocessor/rest/PipelineConnectionsResource.java @@ -21,7 +21,7 @@ import com.wordnik.swagger.annotations.ApiOperation; import com.wordnik.swagger.annotations.ApiParam; import org.graylog.plugins.pipelineprocessor.db.PipelineService; -import org.graylog.plugins.pipelineprocessor.db.PipelineStreamAssignmentService; +import org.graylog.plugins.pipelineprocessor.db.PipelineStreamConnectionsService; import org.graylog2.database.NotFoundException; import org.graylog2.events.ClusterEventBus; import org.graylog2.plugin.rest.PluginRestResource; @@ -39,56 +39,56 @@ import javax.ws.rs.core.MediaType; import java.util.Set; -@Api(value = "Pipelines/Streams", description = "Stream assignment of processing pipelines") -@Path("/system/pipelines/streams") +@Api(value = "Pipelines/Connections", description = "Stream connections of processing pipelines") +@Path("/system/pipelines/connections") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) -public class PipelineStreamResource extends RestResource implements PluginRestResource { +public class PipelineConnectionsResource extends RestResource implements PluginRestResource { - private final PipelineStreamAssignmentService assignmentService; + private final PipelineStreamConnectionsService connectionsService; private final PipelineService pipelineService; private final StreamService streamService; private final EventBus clusterBus; @Inject - public PipelineStreamResource(PipelineStreamAssignmentService assignmentService, - PipelineService pipelineService, - StreamService streamService, - @ClusterEventBus EventBus clusterBus) { - this.assignmentService = assignmentService; + public PipelineConnectionsResource(PipelineStreamConnectionsService connectionsService, + PipelineService pipelineService, + StreamService streamService, + @ClusterEventBus EventBus clusterBus) { + this.connectionsService = connectionsService; this.pipelineService = pipelineService; this.streamService = streamService; this.clusterBus = clusterBus; } - @ApiOperation(value = "Attach a processing pipeline to a stream", notes = "") + @ApiOperation(value = "Connect processing pipelines to a stream", notes = "") @POST - public PipelineStreamAssignment assignPipelines(@ApiParam(name = "Json body", required = true) @NotNull PipelineStreamAssignment assignment) throws NotFoundException { - final String streamId = assignment.streamId(); + public PipelineStreamConnection connectPipelines(@ApiParam(name = "Json body", required = true) @NotNull PipelineStreamConnection connection) throws NotFoundException { + final String streamId = connection.streamId(); // the default stream doesn't exist as an entity if (!streamId.equalsIgnoreCase("default")) { streamService.load(streamId); } // verify the pipelines exist - for (String s : assignment.pipelineIds()) { + for (String s : connection.pipelineIds()) { pipelineService.load(s); } - final PipelineStreamAssignment save = assignmentService.save(assignment); + final PipelineStreamConnection save = connectionsService.save(connection); clusterBus.post(save); return save; } - @ApiOperation("Get pipeline attachments for the given stream") + @ApiOperation("Get pipeline connections for the given stream") @GET @Path("/{streamId}") - public PipelineStreamAssignment getPipelinesForStream(@ApiParam(name = "streamId") @PathParam("streamId") String streamId) throws NotFoundException { - return assignmentService.load(streamId); + public PipelineStreamConnection getPipelinesForStream(@ApiParam(name = "streamId") @PathParam("streamId") String streamId) throws NotFoundException { + return connectionsService.load(streamId); } - @ApiOperation("Get all pipeline attachments") + @ApiOperation("Get all pipeline connections") @GET - public Set getAllAttachments() throws NotFoundException { - return assignmentService.loadAll(); + public Set getAll() throws NotFoundException { + return connectionsService.loadAll(); } } diff --git a/src/main/java/org/graylog/plugins/pipelineprocessor/rest/PipelineStreamAssignment.java b/src/main/java/org/graylog/plugins/pipelineprocessor/rest/PipelineStreamConnection.java similarity index 90% rename from src/main/java/org/graylog/plugins/pipelineprocessor/rest/PipelineStreamAssignment.java rename to src/main/java/org/graylog/plugins/pipelineprocessor/rest/PipelineStreamConnection.java index 8b74bee4c21e..5d82b54bc6d3 100644 --- a/src/main/java/org/graylog/plugins/pipelineprocessor/rest/PipelineStreamAssignment.java +++ b/src/main/java/org/graylog/plugins/pipelineprocessor/rest/PipelineStreamConnection.java @@ -28,7 +28,7 @@ @AutoValue @JsonAutoDetect -public abstract class PipelineStreamAssignment { +public abstract class PipelineStreamConnection { @JsonProperty("id") @Nullable @@ -43,7 +43,7 @@ public abstract class PipelineStreamAssignment { public abstract Set pipelineIds(); @JsonCreator - public static PipelineStreamAssignment create(@Id @ObjectId @JsonProperty("id") @Nullable String _id, + public static PipelineStreamConnection create(@Id @ObjectId @JsonProperty("id") @Nullable String _id, @JsonProperty("stream_id") String streamId, @JsonProperty("pipeline_ids") Set pipelineIds) { return builder() @@ -54,14 +54,14 @@ public static PipelineStreamAssignment create(@Id @ObjectId @JsonProperty("id") } public static Builder builder() { - return new AutoValue_PipelineStreamAssignment.Builder(); + return new AutoValue_PipelineStreamConnection.Builder(); } public abstract Builder toBuilder(); @AutoValue.Builder public abstract static class Builder { - public abstract PipelineStreamAssignment build(); + public abstract PipelineStreamConnection build(); public abstract Builder _id(String _id); diff --git a/src/test/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreterTest.java b/src/test/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreterTest.java index 60d5d9d232ec..1e2b59330532 100644 --- a/src/test/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreterTest.java +++ b/src/test/java/org/graylog/plugins/pipelineprocessor/processors/PipelineInterpreterTest.java @@ -23,14 +23,14 @@ import org.graylog.plugins.pipelineprocessor.ast.functions.Function; import org.graylog.plugins.pipelineprocessor.db.PipelineDao; import org.graylog.plugins.pipelineprocessor.db.PipelineService; -import org.graylog.plugins.pipelineprocessor.db.PipelineStreamAssignmentService; +import org.graylog.plugins.pipelineprocessor.db.PipelineStreamConnectionsService; import org.graylog.plugins.pipelineprocessor.db.RuleDao; import org.graylog.plugins.pipelineprocessor.db.RuleService; import org.graylog.plugins.pipelineprocessor.functions.conversion.StringConversion; import org.graylog.plugins.pipelineprocessor.functions.messages.CreateMessage; import org.graylog.plugins.pipelineprocessor.parser.FunctionRegistry; import org.graylog.plugins.pipelineprocessor.parser.PipelineRuleParser; -import org.graylog.plugins.pipelineprocessor.rest.PipelineStreamAssignment; +import org.graylog.plugins.pipelineprocessor.rest.PipelineStreamConnection; import org.graylog2.plugin.Message; import org.graylog2.plugin.Messages; import org.graylog2.plugin.Tools; @@ -75,12 +75,12 @@ public void testCreateMessage() { null) )); - final PipelineStreamAssignmentService pipelineStreamAssignmentService = mock(PipelineStreamAssignmentService.class); - final PipelineStreamAssignment pipelineStreamAssignment = PipelineStreamAssignment.create(null, + final PipelineStreamConnectionsService pipelineStreamConnectionsService = mock(PipelineStreamConnectionsService.class); + final PipelineStreamConnection pipelineStreamConnection = PipelineStreamConnection.create(null, "default", newHashSet("cde")); - when(pipelineStreamAssignmentService.loadAll()).thenReturn( - newHashSet(pipelineStreamAssignment) + when(pipelineStreamConnectionsService.loadAll()).thenReturn( + newHashSet(pipelineStreamConnection) ); final Map> functions = Maps.newHashMap(); @@ -92,7 +92,7 @@ public void testCreateMessage() { final PipelineInterpreter interpreter = new PipelineInterpreter( ruleService, pipelineService, - pipelineStreamAssignmentService, + pipelineStreamConnectionsService, parser, mock(Journal.class), mock(MetricRegistry.class),