Skip to content

Commit

Permalink
[GIE-IR] support sending back a page of results if batch size met (#2076
Browse files Browse the repository at this point in the history
)
  • Loading branch information
shirly121 committed Sep 27, 2022
1 parent a7462ae commit 2d57a3d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.alibaba.graphscope.gremlin.result.GremlinResultProcessor;
import com.google.common.collect.ImmutableMap;

import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser;
import org.apache.tinkerpop.gremlin.server.Context;
import org.apache.tinkerpop.gremlin.structure.Edge;
Expand Down Expand Up @@ -48,18 +47,6 @@ public GremlinTestResultProcessor(
this.cachedProperties = testGraph.getProperties();
}

@Override
public void finish() {
synchronized (this) {
if (!locked) {
logger.debug("process finish");
formatResultIfNeed();
writeResultList(writeResult, resultCollectors, ResponseStatusCode.SUCCESS);
locked = true;
}
}
}

@Override
protected void formatResultIfNeed() {
super.formatResultIfNeed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import io.netty.channel.ChannelHandlerContext;

import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
import org.apache.tinkerpop.gremlin.driver.Tokens;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.server.Context;
import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.server.handler.Frame;
import org.apache.tinkerpop.gremlin.server.handler.StateKey;
import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;
Expand All @@ -40,20 +42,38 @@
public class GremlinResultProcessor extends StandardOpProcessor implements ResultProcessor {
private static Logger logger = LoggerFactory.getLogger(GremlinResultProcessor.class);
protected Context writeResult;
protected List<Object> resultCollectors = new ArrayList<>();
protected List<Object> resultCollectors;
protected int resultCollectorsBatchSize;
protected boolean locked = false;
protected ResultParser resultParser;

public GremlinResultProcessor(Context writeResult, ResultParser resultParser) {
this.writeResult = writeResult;
this.resultParser = resultParser;
RequestMessage msg = writeResult.getRequestMessage();
Settings settings = writeResult.getSettings();
// init batch size from resultIterationBatchSize in conf/gremlin-server.yaml,
// or args in RequestMessage which is originate from gremlin client
this.resultCollectorsBatchSize =
(Integer)
msg.optionalArgs(Tokens.ARGS_BATCH_SIZE)
.orElse(settings.resultIterationBatchSize);
this.resultCollectors = new ArrayList<>(this.resultCollectorsBatchSize);
}

@Override
public void process(PegasusClient.JobResponse response) {
synchronized (this) {
try {
if (!locked) {
// send back a page of results if batch size is met and then reset the
// resultCollectors
if (this.resultCollectors.size() >= this.resultCollectorsBatchSize) {
formatResultIfNeed();
writeResultList(
writeResult, resultCollectors, ResponseStatusCode.PARTIAL_CONTENT);
this.resultCollectors = new ArrayList<>(this.resultCollectorsBatchSize);
}
resultCollectors.addAll(resultParser.parseFrom(response));
}
} catch (Exception e) {
Expand Down

0 comments on commit 2d57a3d

Please sign in to comment.