Skip to content

Commit

Permalink
rewrite StageIterator to do less on-the-fly work
Browse files Browse the repository at this point in the history
when the iterator is created the entire set of stages is known, don't try to be clever and pre-compute the set instead of having to do computation to solve it "elegantly"
  • Loading branch information
kroepke committed Oct 18, 2016
1 parent 754b348 commit 70777c0
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 46 deletions.
Expand Up @@ -29,6 +29,8 @@
@AutoValue
public abstract class Stage implements Comparable<Stage> {
private List<Rule> rules;
// not an autovalue property, because it introduces a cycle in hashCode() and we have no way of excluding it
private transient Pipeline pipeline;
private transient Meter executed;
private transient String meterName;

Expand Down Expand Up @@ -84,6 +86,14 @@ public void markExecution() {
}
}

public Pipeline getPipeline() {
return pipeline;
}

public void setPipeline(Pipeline pipeline) {
this.pipeline = pipeline;
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Stage build();
Expand Down
Expand Up @@ -185,6 +185,7 @@ private Pipeline resolvePipeline(Pipeline pipeline, Map<String, Rule> ruleNameMa
})
.collect(Collectors.toList());
stage.setRules(resolvedRules);
stage.setPipeline(pipeline);
stage.registerMetrics(metricRegistry, pipeline.id());
});

Expand Down Expand Up @@ -314,10 +315,9 @@ private List<Message> processForResolvedPipelines(Message message,
// iterate through all stages for all matching pipelines, per "stage slice" instead of per pipeline.
// pipeline execution ordering is not guaranteed
while (stages.hasNext()) {
final Set<Tuple2<Stage, Pipeline>> stageSet = stages.next();
for (Tuple2<Stage, Pipeline> pair : stageSet) {
final Stage stage = pair.v1();
final Pipeline pipeline = pair.v2();
final List<Stage> stageSet = stages.next();
for (final Stage stage : stageSet) {
final Pipeline pipeline = stage.getPipeline();
if (pipelinesToSkip.contains(pipeline)) {
log.debug("[{}] previous stage result prevents further processing of pipeline `{}`",
msgId,
Expand Down
Expand Up @@ -17,47 +17,61 @@
package org.graylog.plugins.pipelineprocessor.processors;

import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.PeekingIterator;
import com.google.common.collect.ArrayListMultimap;
import org.graylog.plugins.pipelineprocessor.ast.Pipeline;
import org.graylog.plugins.pipelineprocessor.ast.Stage;
import org.jooq.lambda.tuple.Tuple;
import org.jooq.lambda.tuple.Tuple2;

import java.util.OptionalInt;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static com.google.common.collect.Iterators.peekingIterator;
public class StageIterator extends AbstractIterator<List<Stage>> {

public class StageIterator extends AbstractIterator<Set<Tuple2<Stage, Pipeline>>> {
// first and last stage for the given pipelines
private final int[] extent = new int[]{Integer.MAX_VALUE, Integer.MIN_VALUE};

// the currentStage is always one before the next one to be returned
private int currentStage;

private final ArrayListMultimap<Integer, Stage> stageMultimap = ArrayListMultimap.create();

private final ImmutableList<Tuple2<PeekingIterator<Stage>, Pipeline>> stageIterators;

public StageIterator(Set<Pipeline> pipelines) {
stageIterators = ImmutableList.copyOf(pipelines.stream()
.map(p -> Tuple.tuple(peekingIterator(p.stages().iterator()), p))
.iterator());
if (pipelines.isEmpty()) {
currentStage = extent[0] = extent[1] = 0;
return;
}
pipelines.forEach(pipeline -> {
// skip pipelines without any stages, they don't contribute any rules to run
if (pipeline.stages().isEmpty()) {
return;
}
extent[0] = Math.min(extent[0], pipeline.stages().first().stage());
extent[1] = Math.max(extent[1], pipeline.stages().last().stage());
});

// map each stage number to the corresponding stages
pipelines.stream()
.flatMap(pipeline -> pipeline.stages().stream())
.forEach(stage -> stageMultimap.put(stage.stage(), stage));

if (extent[0] == Integer.MIN_VALUE) {
throw new IllegalArgumentException("First stage cannot be at " + Integer.MIN_VALUE);
}
// the stage before the first stage.
currentStage = extent[0] - 1;
}

@Override
protected Set<Tuple2<Stage, Pipeline>> computeNext() {
final OptionalInt min = stageIterators.stream()
.filter(pair -> pair.v1().hasNext()) // only iterators that have remaining elements
.mapToInt(pair -> pair.v1().peek().stage()) // get the stage of each remaining element
.min(); // we want the minimum stage number of them all

if (!min.isPresent()) {
protected List<Stage> computeNext() {
if (currentStage == extent[1]) {
return endOfData();

}
final int currStage = min.getAsInt();

return stageIterators.stream()
.filter(pair -> pair.v1().hasNext()) // only iterators that have remaining elements
.filter(pair -> pair.v1().peek().stage() == currStage) // only elements for the current stage
.map(pair -> Tuple.tuple(pair.v1().next(), pair.v2()))
.collect(Collectors.toSet());

do {
currentStage++;
if (currentStage > extent[1]) {
return endOfData();
}
} while (!stageMultimap.containsKey(currentStage));
return stageMultimap.get(currentStage);
}
}
Expand Up @@ -24,13 +24,11 @@
import org.graylog.plugins.pipelineprocessor.ast.Pipeline;
import org.graylog.plugins.pipelineprocessor.ast.Stage;
import org.jooq.lambda.Seq;
import org.jooq.lambda.tuple.Tuple2;
import org.junit.Test;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import static com.google.common.collect.ImmutableSortedSet.of;
import static org.junit.Assert.assertArrayEquals;
Expand Down Expand Up @@ -62,11 +60,11 @@ public void singlePipelineNoStage() {
.build());
final StageIterator iterator = new StageIterator(input);
assertTrue(iterator.hasNext());
final Set<Tuple2<Stage, Pipeline>> nextStages = iterator.next();
final List<Stage> nextStages = iterator.next();
assertEquals(1, nextStages.size());

final Tuple2<Stage, Pipeline> stage = Iterables.getOnlyElement(nextStages);
assertEquals(0, stage.v1.ruleReferences().size());
final Stage stage = Iterables.getOnlyElement(nextStages);
assertEquals(0, stage.ruleReferences().size());
}

@Test
Expand All @@ -87,13 +85,13 @@ public void singlePipelineTwoStages() {
)).build());
final StageIterator iterator = new StageIterator(input);
//noinspection unchecked
final Set<Tuple2<Stage, Pipeline>>[] stages = Iterators.toArray(iterator, Set.class);
final List<Stage>[] stages = Iterators.toArray(iterator, List.class);

assertEquals(2, stages.length);
assertEquals(1, stages[0].size());
assertEquals("last set of stages are on stage 0", 0, Iterables.getOnlyElement(stages[0]).v1.stage());
assertEquals("last set of stages are on stage 0", 0, Iterables.getOnlyElement(stages[0]).stage());
assertEquals(1, stages[1].size());
assertEquals("last set of stages are on stage 1", 10, Iterables.getOnlyElement(stages[1]).v1.stage());
assertEquals("last set of stages are on stage 1", 10, Iterables.getOnlyElement(stages[1]).stage());
}


Expand Down Expand Up @@ -142,23 +140,23 @@ public void multiplePipelines() {
Pipeline.builder()
.name("p2")
.stages(stages2).build()
,Pipeline.builder()
,Pipeline.builder()
.name("p3")
.stages(stages3).build()
);
final StageIterator iterator = new StageIterator(input);

final List<Set<Tuple2<Stage, Pipeline>>> stageSets = Lists.newArrayList(iterator);
final List<List<Stage>> stageSets = Lists.newArrayList(iterator);

assertEquals("5 different stages to execute", 5, stageSets.size());

for (Set<Tuple2<Stage, Pipeline>> stageSet : stageSets) {
for (List<Stage> stageSet : stageSets) {
assertEquals("Each stage set should only contain stages with the same number",
1,
Seq.seq(stageSet).map(Tuple2::v1).groupBy(Stage::stage).keySet().size());
Seq.seq(stageSet).groupBy(Stage::stage).keySet().size());
}
assertArrayEquals("Stages must be sorted numerically",
new int[] {-1, 0, 4, 10, 11},
stageSets.stream().flatMap(Collection::stream).map(Tuple2::v1).mapToInt(Stage::stage).distinct().toArray());
stageSets.stream().flatMap(Collection::stream).mapToInt(Stage::stage).distinct().toArray());
}
}
}

0 comments on commit 70777c0

Please sign in to comment.