Skip to content

Commit

Permalink
AggregateGlobalStep acceps all pre-defined Operators
Browse files Browse the repository at this point in the history
  • Loading branch information
rdtr committed May 22, 2024
1 parent 225aeff commit 6c8e1c3
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,15 @@ public GraphTraversalSource visitTraversalSourceSelfMethod_withSack(final Gremli
*/
@Override
public GraphTraversalSource visitTraversalSourceSelfMethod_withSideEffect(final GremlinParser.TraversalSourceSelfMethod_withSideEffectContext ctx) {
return source.withSideEffect(antlr.argumentVisitor.parseString(ctx.stringArgument()),
antlr.argumentVisitor.visitGenericLiteralArgument(ctx.genericLiteralArgument()));
if (ctx.getChildCount() < 8) {
// with 4 children withSideEffect() was called without a reducer specified.
return source.withSideEffect(antlr.argumentVisitor.parseString(ctx.stringArgument()),
antlr.argumentVisitor.visitGenericLiteralArgument(ctx.genericLiteralArgument()));
} else {
return source.withSideEffect(antlr.argumentVisitor.parseString(ctx.stringArgument()),
antlr.argumentVisitor.visitGenericLiteralArgument(ctx.genericLiteralArgument()),
(BinaryOperator) antlr.argumentVisitor.visitTraversalBiFunctionArgument(ctx.traversalBiFunctionArgument()));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.tinkerpop.gremlin.process.traversal.Operator;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
Expand All @@ -35,11 +36,7 @@
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.function.BulkSetSupplier;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.*;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -122,8 +119,16 @@ protected Traverser.Admin<S> processNextStart() {

@Override
public void processAllStarts() {
final TraversalSideEffects sideEffects = this.getTraversal().getSideEffects();

// Pre-defined Operator such as addAll and assign will reduce over the whole input set, rather than
// applying a single input one by one.
final boolean isOperatorForBulkSet = sideEffects.getReducer(sideEffectKey) == Operator.addAll ||
sideEffects.getReducer(sideEffectKey) == Operator.assign;

if (this.starts.hasNext()) {
final BulkSet<Object> bulkSet = new BulkSet<>();

while (this.starts.hasNext()) {
final Traverser.Admin<S> traverser = this.starts.next();
TraversalUtil.produce(traverser, aggregateTraversal).ifProductive(p -> bulkSet.add(p, traverser.bulk()));
Expand All @@ -133,7 +138,12 @@ public void processAllStarts() {
// when barrier is reloaded, the traversers should be at the next step
this.barrier.add(traverser);
}
this.getTraversal().getSideEffects().add(this.sideEffectKey, bulkSet);

if (isOperatorForBulkSet) {
sideEffects.add(this.sideEffectKey, bulkSet);
} else {
bulkSet.forEach(p -> sideEffects.add(sideEffectKey, p));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public static Iterable<Object[]> generateTestParameters() {
{"withSack('hello')", g.withSack("hello")},
{"withSack('hello', addAll)", g.withSack("hello", Operator.addAll)},
{"withSideEffect('hello', 12)", g.withSideEffect("hello", 12)},
{"withSideEffect('hello', 12, sum)", g.withSideEffect("hello", 12, Operator.sum)},
{"withStrategies(ReadOnlyStrategy)", g.withStrategies(ReadOnlyStrategy.instance())},
{"withStrategies(new EdgeLabelVerificationStrategy(logWarning: true, throwException: true))", g.withStrategies(EdgeLabelVerificationStrategy.build().logWarning(true).throwException(true).create())},
{"withStrategies(ReadOnlyStrategy, new EdgeLabelVerificationStrategy(logWarning: true, throwException: true))", g.withStrategies(ReadOnlyStrategy.instance(), EdgeLabelVerificationStrategy.build().logWarning(true).throwException(true).create())},
Expand All @@ -65,6 +66,7 @@ public static Iterable<Object[]> generateTestParameters() {
{"with('requestId')", g.with("requestId")},
{"withSideEffect('hello', ['one':1])", g.withSideEffect("hello", map)},
{"withSideEffect('hello', ['one' : 1])", g.withSideEffect("hello", map)},
{"withSideEffect('hello', ['one' : 1], Operator.addAll)", g.withSideEffect("hello", map, Operator.addAll)},
{"withSideEffect('hello', ['one':1, 'two':2])", g.withSideEffect("hello", new HashMap<String, Integer>() {{
put("one", 1);
put("two", 2);
Expand Down
1 change: 1 addition & 0 deletions gremlin-language/src/main/antlr4/Gremlin.g4
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ traversalSourceSelfMethod_withSack

traversalSourceSelfMethod_withSideEffect
: 'withSideEffect' LPAREN stringArgument COMMA genericLiteralArgument RPAREN
| 'withSideEffect' LPAREN stringArgument COMMA genericLiteralArgument COMMA traversalBiFunctionArgument RPAREN
;

traversalSourceSelfMethod_withStrategies
Expand Down
Loading

0 comments on commit 6c8e1c3

Please sign in to comment.