Skip to content

Commit

Permalink
Rename pipeline assignments to connections
Browse files Browse the repository at this point in the history
  • Loading branch information
Edmundo Alvarez committed Mar 3, 2016
1 parent 611349a commit 151f9e6
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 75 deletions.
Expand Up @@ -19,7 +19,7 @@
import org.graylog.plugins.pipelineprocessor.functions.ProcessorFunctionsModule;
import org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter;
import org.graylog.plugins.pipelineprocessor.rest.PipelineResource;
import org.graylog.plugins.pipelineprocessor.rest.PipelineStreamResource;
import org.graylog.plugins.pipelineprocessor.rest.PipelineConnectionsResource;
import org.graylog.plugins.pipelineprocessor.rest.RuleResource;
import org.graylog2.plugin.PluginConfigBean;
import org.graylog2.plugin.PluginModule;
Expand All @@ -40,7 +40,7 @@ protected void configure() {
addMessageProcessor(PipelineInterpreter.class);
addRestResource(RuleResource.class);
addRestResource(PipelineResource.class);
addRestResource(PipelineStreamResource.class);
addRestResource(PipelineConnectionsResource.class);

install(new ProcessorFunctionsModule());
}
Expand Down
Expand Up @@ -17,8 +17,9 @@
package org.graylog.plugins.pipelineprocessor.db;

import com.google.common.collect.Sets;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoException;
import org.graylog.plugins.pipelineprocessor.rest.PipelineStreamAssignment;
import org.graylog.plugins.pipelineprocessor.rest.PipelineStreamConnection;
import org.graylog2.bindings.providers.MongoJackObjectMapperProvider;
import org.graylog2.database.MongoConnection;
import org.graylog2.database.NotFoundException;
Expand All @@ -34,50 +35,50 @@
import java.util.Collections;
import java.util.Set;

public class PipelineStreamAssignmentService {
private static final Logger log = LoggerFactory.getLogger(PipelineStreamAssignmentService.class);
public class PipelineStreamConnectionsService {
private static final Logger log = LoggerFactory.getLogger(PipelineStreamConnectionsService.class);

public static final String COLLECTION = "pipeline_processor_pipelines_streams";

private final JacksonDBCollection<PipelineStreamAssignment, String> dbCollection;
private final JacksonDBCollection<PipelineStreamConnection, String> dbCollection;

@Inject
public PipelineStreamConnectionsService(MongoConnection mongoConnection, MongoJackObjectMapperProvider mapper) {
    // Wrap the raw Mongo collection so documents are (de)serialized as
    // PipelineStreamConnection beans keyed by their String id.
    dbCollection = JacksonDBCollection.wrap(
            mongoConnection.getDatabase().getCollection(COLLECTION),
            PipelineStreamConnection.class,
            String.class,
            mapper.get());
    // stream_id is the lookup key used by load()/save(); the unique option
    // guarantees at most one connections document per stream.
    dbCollection.createIndex(DBSort.asc("stream_id"), new BasicDBObject("unique", true));
}


public PipelineStreamAssignment save(PipelineStreamAssignment assignment) {
PipelineStreamAssignment existingAssignment = dbCollection.findOne(DBQuery.is("stream_id", assignment.streamId()));
if (existingAssignment == null) {
existingAssignment = PipelineStreamAssignment.create(null, assignment.streamId(), Collections.emptySet());
public PipelineStreamConnection save(PipelineStreamConnection connections) {
PipelineStreamConnection existingConnections = dbCollection.findOne(DBQuery.is("stream_id", connections.streamId()));
if (existingConnections == null) {
existingConnections = PipelineStreamConnection.create(null, connections.streamId(), Collections.emptySet());
}

final PipelineStreamAssignment toSave = existingAssignment.toBuilder()
.pipelineIds(assignment.pipelineIds()).build();
final WriteResult<PipelineStreamAssignment, String> save = dbCollection.save(toSave);
final PipelineStreamConnection toSave = existingConnections.toBuilder()
.pipelineIds(connections.pipelineIds()).build();
final WriteResult<PipelineStreamConnection, String> save = dbCollection.save(toSave);
return save.getSavedObject();
}

public PipelineStreamAssignment load(String streamId) throws NotFoundException {
final PipelineStreamAssignment oneById = dbCollection.findOne(DBQuery.is("stream_id", streamId));
public PipelineStreamConnection load(String streamId) throws NotFoundException {
final PipelineStreamConnection oneById = dbCollection.findOne(DBQuery.is("stream_id", streamId));
if (oneById == null) {
throw new NotFoundException("No pipeline assignments with for stream " + streamId);
throw new NotFoundException("No pipeline connections with for stream " + streamId);
}
return oneById;
}

public Set<PipelineStreamAssignment> loadAll() {
public Set<PipelineStreamConnection> loadAll() {
try {
final DBCursor<PipelineStreamAssignment> assignments = dbCollection.find();
return Sets.newHashSet(assignments.iterator());
final DBCursor<PipelineStreamConnection> connections = dbCollection.find();
return Sets.newHashSet(connections.iterator());
} catch (MongoException e) {
log.error("Unable to load pipelines", e);
log.error("Unable to load pipeline connections", e);
return Collections.emptySet();
}
}
Expand Down
Expand Up @@ -35,14 +35,14 @@
import org.graylog.plugins.pipelineprocessor.ast.statements.Statement;
import org.graylog.plugins.pipelineprocessor.db.PipelineDao;
import org.graylog.plugins.pipelineprocessor.db.PipelineService;
import org.graylog.plugins.pipelineprocessor.db.PipelineStreamAssignmentService;
import org.graylog.plugins.pipelineprocessor.db.PipelineStreamConnectionsService;
import org.graylog.plugins.pipelineprocessor.db.RuleDao;
import org.graylog.plugins.pipelineprocessor.db.RuleService;
import org.graylog.plugins.pipelineprocessor.events.PipelinesChangedEvent;
import org.graylog.plugins.pipelineprocessor.events.RulesChangedEvent;
import org.graylog.plugins.pipelineprocessor.parser.ParseException;
import org.graylog.plugins.pipelineprocessor.parser.PipelineRuleParser;
import org.graylog.plugins.pipelineprocessor.rest.PipelineStreamAssignment;
import org.graylog.plugins.pipelineprocessor.rest.PipelineStreamConnection;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.MessageCollection;
Expand Down Expand Up @@ -75,34 +75,34 @@ public class PipelineInterpreter implements MessageProcessor {

private final RuleService ruleService;
private final PipelineService pipelineService;
private final PipelineStreamAssignmentService pipelineStreamAssignmentService;
private final PipelineStreamConnectionsService pipelineStreamConnectionsService;
private final PipelineRuleParser pipelineRuleParser;
private final Journal journal;
private final ScheduledExecutorService scheduler;
private final Meter filteredOutMessages;

private final AtomicReference<ImmutableMap<String, Pipeline>> currentPipelines = new AtomicReference<>(ImmutableMap.of());
private final AtomicReference<ImmutableSetMultimap<String, Pipeline>> streamPipelineAssignments = new AtomicReference<>(ImmutableSetMultimap.of());
private final AtomicReference<ImmutableSetMultimap<String, Pipeline>> streamPipelineConnections = new AtomicReference<>(ImmutableSetMultimap.of());

@Inject
public PipelineInterpreter(RuleService ruleService,
PipelineService pipelineService,
PipelineStreamAssignmentService pipelineStreamAssignmentService,
PipelineStreamConnectionsService pipelineStreamConnectionsService,
PipelineRuleParser pipelineRuleParser,
Journal journal,
MetricRegistry metricRegistry,
@Named("daemonScheduler") ScheduledExecutorService scheduler,
@ClusterEventBus EventBus clusterBus) {
this.ruleService = ruleService;
this.pipelineService = pipelineService;
this.pipelineStreamAssignmentService = pipelineStreamAssignmentService;
this.pipelineStreamConnectionsService = pipelineStreamConnectionsService;
this.pipelineRuleParser = pipelineRuleParser;

this.journal = journal;
this.scheduler = scheduler;
this.filteredOutMessages = metricRegistry.meter(name(ProcessBufferProcessor.class, "filteredOutMessages"));

// listens to cluster wide Rule, Pipeline and pipeline stream assignment changes
// listens to cluster wide Rule, Pipeline and pipeline stream connection changes
clusterBus.register(this);

reload();
Expand Down Expand Up @@ -155,15 +155,15 @@ private synchronized void reload() {
});
currentPipelines.set(ImmutableMap.copyOf(pipelineIdMap));

// read all stream assignments of those pipelines to allow processing messages through them
final HashMultimap<String, Pipeline> assignments = HashMultimap.create();
for (PipelineStreamAssignment streamAssignment : pipelineStreamAssignmentService.loadAll()) {
streamAssignment.pipelineIds().stream()
// read all stream connections of those pipelines to allow processing messages through them
final HashMultimap<String, Pipeline> connections = HashMultimap.create();
for (PipelineStreamConnection streamConnection : pipelineStreamConnectionsService.loadAll()) {
streamConnection.pipelineIds().stream()
.map(pipelineIdMap::get)
.filter(Objects::nonNull)
.forEach(pipeline -> assignments.put(streamAssignment.streamId(), pipeline));
.forEach(pipeline -> connections.put(streamConnection.streamId(), pipeline));
}
streamPipelineAssignments.set(ImmutableSetMultimap.copyOf(assignments));
streamPipelineConnections.set(ImmutableSetMultimap.copyOf(connections));

}

Expand Down Expand Up @@ -194,16 +194,16 @@ public Messages process(Messages messages) {
// this makes a copy of the list!
final Set<String> initialStreamIds = message.getStreams().stream().map(Stream::getId).collect(Collectors.toSet());

final ImmutableSetMultimap<String, Pipeline> streamAssignment = streamPipelineAssignments.get();
final ImmutableSetMultimap<String, Pipeline> streamConnection = streamPipelineConnections.get();

if (initialStreamIds.isEmpty()) {
if (processingBlacklist.contains(tuple(msgId, "default"))) {
// already processed default pipeline for this message
pipelinesToRun = ImmutableSet.of();
log.debug("[{}] already processed default stream, skipping", msgId);
} else {
// get the default stream pipeline assignments for this message
pipelinesToRun = streamAssignment.get("default");
// get the default stream pipeline connections for this message
pipelinesToRun = streamConnection.get("default");
log.debug("[{}] running default stream pipelines: [{}]",
msgId,
pipelinesToRun.stream().map(Pipeline::name).toArray());
Expand All @@ -212,10 +212,10 @@ public Messages process(Messages messages) {
// 2. if a message-stream combination has already been processed (is in the set), skip that execution
final Set<String> streamsIds = initialStreamIds.stream()
.filter(streamId -> !processingBlacklist.contains(tuple(msgId, streamId)))
.filter(streamAssignment::containsKey)
.filter(streamConnection::containsKey)
.collect(Collectors.toSet());
pipelinesToRun = ImmutableSet.copyOf(streamsIds.stream()
.flatMap(streamId -> streamAssignment.get(streamId).stream())
.flatMap(streamId -> streamConnection.get(streamId).stream())
.collect(Collectors.toSet()));
log.debug("[{}] running pipelines {} for streams {}", msgId, pipelinesToRun, streamsIds);
}
Expand Down Expand Up @@ -337,8 +337,8 @@ public void handlePipelineChanges(PipelinesChangedEvent event) {
}

@Subscribe
public void handlePipelineAssignmentChanges(PipelineStreamAssignment assignment) {
log.debug("Pipeline stream assignment changed: {}", assignment);
public void handlePipelineConnectionChanges(PipelineStreamConnection connection) {
log.debug("Pipeline stream connection changed: {}", connection);
scheduler.schedule((Runnable) this::reload, 0, TimeUnit.SECONDS);
}

Expand Down
Expand Up @@ -21,7 +21,7 @@
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import org.graylog.plugins.pipelineprocessor.db.PipelineService;
import org.graylog.plugins.pipelineprocessor.db.PipelineStreamAssignmentService;
import org.graylog.plugins.pipelineprocessor.db.PipelineStreamConnectionsService;
import org.graylog2.database.NotFoundException;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.plugin.rest.PluginRestResource;
Expand All @@ -39,56 +39,56 @@
import javax.ws.rs.core.MediaType;
import java.util.Set;

@Api(value = "Pipelines/Streams", description = "Stream assignment of processing pipelines")
@Path("/system/pipelines/streams")
@Api(value = "Pipelines/Connections", description = "Stream connections of processing pipelines")
@Path("/system/pipelines/connections")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class PipelineStreamResource extends RestResource implements PluginRestResource {
public class PipelineConnectionsResource extends RestResource implements PluginRestResource {

private final PipelineStreamAssignmentService assignmentService;
private final PipelineStreamConnectionsService connectionsService;
private final PipelineService pipelineService;
private final StreamService streamService;
private final EventBus clusterBus;

@Inject
public PipelineConnectionsResource(PipelineStreamConnectionsService connectionsService,
                                   PipelineService pipelineService,
                                   StreamService streamService,
                                   @ClusterEventBus EventBus clusterBus) {
    this.connectionsService = connectionsService;
    this.pipelineService = pipelineService;
    this.streamService = streamService;
    this.clusterBus = clusterBus;
}

@ApiOperation(value = "Attach a processing pipeline to a stream", notes = "")
@ApiOperation(value = "Connect processing pipelines to a stream", notes = "")
@POST
public PipelineStreamAssignment assignPipelines(@ApiParam(name = "Json body", required = true) @NotNull PipelineStreamAssignment assignment) throws NotFoundException {
final String streamId = assignment.streamId();
public PipelineStreamConnection connectPipelines(@ApiParam(name = "Json body", required = true) @NotNull PipelineStreamConnection connection) throws NotFoundException {
final String streamId = connection.streamId();
// the default stream doesn't exist as an entity
if (!streamId.equalsIgnoreCase("default")) {
streamService.load(streamId);
}
// verify the pipelines exist
for (String s : assignment.pipelineIds()) {
for (String s : connection.pipelineIds()) {
pipelineService.load(s);
}
final PipelineStreamAssignment save = assignmentService.save(assignment);
final PipelineStreamConnection save = connectionsService.save(connection);
clusterBus.post(save);
return save;
}

@ApiOperation("Get pipeline attachments for the given stream")
@ApiOperation("Get pipeline connections for the given stream")
@GET
@Path("/{streamId}")
public PipelineStreamAssignment getPipelinesForStream(@ApiParam(name = "streamId") @PathParam("streamId") String streamId) throws NotFoundException {
return assignmentService.load(streamId);
public PipelineStreamConnection getPipelinesForStream(@ApiParam(name = "streamId") @PathParam("streamId") String streamId) throws NotFoundException {
return connectionsService.load(streamId);
}

@ApiOperation("Get all pipeline attachments")
@ApiOperation("Get all pipeline connections")
@GET
public Set<PipelineStreamAssignment> getAllAttachments() throws NotFoundException {
return assignmentService.loadAll();
public Set<PipelineStreamConnection> getAll() throws NotFoundException {
return connectionsService.loadAll();
}

}
Expand Up @@ -28,7 +28,7 @@

@AutoValue
@JsonAutoDetect
public abstract class PipelineStreamAssignment {
public abstract class PipelineStreamConnection {

@JsonProperty("id")
@Nullable
Expand All @@ -43,7 +43,7 @@ public abstract class PipelineStreamAssignment {
public abstract Set<String> pipelineIds();

@JsonCreator
public static PipelineStreamAssignment create(@Id @ObjectId @JsonProperty("id") @Nullable String _id,
public static PipelineStreamConnection create(@Id @ObjectId @JsonProperty("id") @Nullable String _id,
@JsonProperty("stream_id") String streamId,
@JsonProperty("pipeline_ids") Set<String> pipelineIds) {
return builder()
Expand All @@ -54,14 +54,14 @@ public static PipelineStreamAssignment create(@Id @ObjectId @JsonProperty("id")
}

/** Creates a builder for a new {@code PipelineStreamConnection} (AutoValue-generated). */
public static Builder builder() {
    return new AutoValue_PipelineStreamConnection.Builder();
}

public abstract Builder toBuilder();

@AutoValue.Builder
public abstract static class Builder {
public abstract PipelineStreamAssignment build();
public abstract PipelineStreamConnection build();

public abstract Builder _id(String _id);

Expand Down

0 comments on commit 151f9e6

Please sign in to comment.