Skip to content

Commit

Permalink
[GIE Compiler] Unify Gremlin Timeout Configurations (#2953)
Browse files Browse the repository at this point in the history
<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?
Unify the Gremlin timeout configurations. With this PR, the console will
output the following if the Gremlin query execution timeout is set to 2000 ms.

<img width="1627" alt="image"
src="https://github.com/alibaba/GraphScope/assets/22363306/922d0702-1a7f-4904-814c-0e9ebd6e7102">


The timeout can be set in two ways:
1. system configuration, set `query.execution.timeout.ms: 2000` in
`conf/ir.compiler.properties`
2. set per query in the Gremlin query: `g.with(ARGS_EVAL_TIMEOUT, 2000).V()`
or `g.with(Tokens.ARGS_EVAL_TIMEOUT, 2000).V()`. If the timeout value is of
long type, suffix it with 'L' (e.g. `2000L`).

<!-- Please give a short brief about these changes. -->

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes #2854

---------

Co-authored-by: Longbin Lai <longbin.lailb@alibaba-inc.com>
  • Loading branch information
shirly121 and longbinlai committed Jul 3, 2023
1 parent 4694963 commit 1e2c8f6
Show file tree
Hide file tree
Showing 18 changed files with 156 additions and 44 deletions.
3 changes: 3 additions & 0 deletions interactive_engine/compiler/conf/ir.compiler.properties
Expand Up @@ -32,3 +32,6 @@ graph.planner: {"isOn":true,"opt":"RBO","rules":["FilterMatchRule"]}
# neo4j.bolt.server.disabled: true
# set neo4j server port if neo4j server is enabled
# neo4j.bolt.server.port: 7687

# set timeout in system config, can be overridden by session config per query
# query.execution.timeout.ms: 3000000
19 changes: 17 additions & 2 deletions interactive_engine/compiler/src/main/antlr4/GremlinGS.g4
Expand Up @@ -25,8 +25,11 @@ query
;

// g
// g.with(ARGS_EVAL_TIMEOUT, 2000L)
// g.with(Tokens.ARGS_EVAL_TIMEOUT, 2000L)
// g.with('evaluationTimeout', 2000L)
traversalSource
: TRAVERSAL_ROOT
: TRAVERSAL_ROOT (DOT traversalMethod_with) ?
;

// g.rootTraversal()
Expand Down Expand Up @@ -165,8 +168,20 @@ traversalMethod_bothE
// with('PATH_OPT', 'SIMPLE' | 'ARBITRARY')
// with('RESULT_OPT', 'ALL_V' | 'END_V')
// with('UNTIL', expression)
// with(ARGS_EVAL_TIMEOUT, 2000L) // set evaluation timeout to 2 seconds (key is a bare token, not a string)
// with(Tokens.ARGS_EVAL_TIMEOUT, 2000L) // set evaluation timeout to 2 seconds (key is a bare token, not a string)
// with('evaluationTimeout', 2000L) // set evaluation timeout to 2 seconds
traversalMethod_with
: 'with' LPAREN stringLiteral COMMA stringLiteral RPAREN
: 'with' LPAREN stringLiteral COMMA genericLiteral RPAREN
| 'with' LPAREN evaluationTimeoutKey COMMA evaluationTimeoutValue RPAREN
;

// Bare (unquoted) keyword forms accepted as the first argument of with(..) when it is used
// to set the evaluation timeout, e.g. with(ARGS_EVAL_TIMEOUT, 2000L) or
// with(Tokens.ARGS_EVAL_TIMEOUT, 2000L).
evaluationTimeoutKey
: 'ARGS_EVAL_TIMEOUT' | 'Tokens.ARGS_EVAL_TIMEOUT'
;

// Value paired with evaluationTimeoutKey inside with(..); only an integer literal is accepted
// (e.g. 2000L, which sets the evaluation timeout to 2 seconds).
evaluationTimeoutValue
: integerLiteral
;

// outV()
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.alibaba.graphscope.common.client.type.ExecutionRequest;
import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;

/**
* client to submit request to remote engine service
Expand All @@ -33,7 +34,10 @@ public ExecutionClient(ChannelFetcher<C> channelFetcher) {
this.channelFetcher = channelFetcher;
}

public abstract void submit(ExecutionRequest request, ExecutionResponseListener listener)
public abstract void submit(
ExecutionRequest request,
ExecutionResponseListener listener,
QueryTimeoutConfig timeoutConfig)
throws Exception;

public abstract void close() throws Exception;
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.HiactorConfig;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.common.ir.tools.LogicalPlan;
import com.alibaba.graphscope.gaia.proto.IrResult;
import com.google.common.collect.Lists;
Expand All @@ -36,6 +37,7 @@
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
* http client to send request to hqps engine service
Expand All @@ -58,7 +60,10 @@ public HttpExecutionClient(Configs graphConfig, ChannelFetcher<URI> channelFetch
}

@Override
public void submit(ExecutionRequest request, ExecutionResponseListener listener)
public void submit(
ExecutionRequest request,
ExecutionResponseListener listener,
QueryTimeoutConfig timeoutConfig)
throws Exception {
List<CompletableFuture> responseFutures = Lists.newArrayList();
for (URI httpURI : channelFetcher.fetch()) {
Expand All @@ -73,6 +78,7 @@ public void submit(ExecutionRequest request, ExecutionResponseListener listener)
CompletableFuture<HttpResponse<byte[]>> responseFuture =
httpClient
.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray())
.orTimeout(timeoutConfig.getChannelTimeoutMS(), TimeUnit.MILLISECONDS)
.whenComplete(
(bytes, exception) -> {
if (exception != null) {
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.graphscope.common.client.type.ExecutionResponseListener;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.PegasusConfig;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.gaia.proto.IrResult;
import com.alibaba.pegasus.RpcChannel;
import com.alibaba.pegasus.RpcClient;
Expand All @@ -46,14 +47,13 @@ public RpcExecutionClient(Configs graphConfig, ChannelFetcher<RpcChannel> channe
}

@Override
public void submit(ExecutionRequest request, ExecutionResponseListener listener)
public void submit(
ExecutionRequest request,
ExecutionResponseListener listener,
QueryTimeoutConfig timeoutConfig)
throws Exception {
if (rpcClientRef.get() == null) {
rpcClientRef.compareAndSet(
null,
new RpcClient(
PegasusConfig.PEGASUS_GRPC_TIMEOUT.get(graphConfig),
channelFetcher.fetch()));
rpcClientRef.compareAndSet(null, new RpcClient(channelFetcher.fetch()));
}
RpcClient rpcClient = rpcClientRef.get();
PegasusClient.JobRequest jobRequest =
Expand All @@ -68,7 +68,7 @@ public void submit(ExecutionRequest request, ExecutionResponseListener listener)
.setBatchSize(PegasusConfig.PEGASUS_BATCH_SIZE.get(graphConfig))
.setMemoryLimit(PegasusConfig.PEGASUS_MEMORY_LIMIT.get(graphConfig))
.setBatchCapacity(PegasusConfig.PEGASUS_OUTPUT_CAPACITY.get(graphConfig))
.setTimeLimit(PegasusConfig.PEGASUS_TIMEOUT.get(graphConfig))
.setTimeLimit(timeoutConfig.getEngineTimeoutMS())
.setAll(
com.alibaba.pegasus.service.protocol.PegasusClient.Empty
.newBuilder()
Expand Down Expand Up @@ -97,7 +97,8 @@ public void finish() {
public void error(Status status) {
listener.onError(status.asException());
}
});
},
timeoutConfig.getChannelTimeoutMS());
}

@Override
Expand Down
Expand Up @@ -29,5 +29,8 @@ public class FrontendConfig {
public static final Config<Integer> NEO4J_BOLT_SERVER_PORT =
Config.intConfig("neo4j.bolt.server.port", 7687);

public static final Config<Integer> QUERY_EXECUTION_TIMEOUT_MS =
Config.intConfig("query.execution.timeout.ms", 3000000);

public static final Config<String> ENGINE_TYPE = Config.stringConfig("engine.type", "pegasus");
}
@@ -0,0 +1,57 @@
/*
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.common.config;

/**
 * Immutable bundle of the three timeouts, all in milliseconds, that apply to a single query.
 *
 * <p>The caller supplies only the engine timeout. The channel timeout and the total execution
 * timeout are derived from it by adding one and two {@code GRADUAL_FACTOR}-sized margins
 * respectively, so for a non-negative engine timeout the values never decrease in the order
 * engine, channel, execution.
 */
public class QueryTimeoutConfig {
    // fraction of the engine timeout added per layer (channel: one step, execution: two steps)
    private static final double GRADUAL_FACTOR = 0.1d;

    // timeout in milliseconds for engine execution
    private final long engineTimeoutMS;
    // timeout in milliseconds for channel communication
    private final long channelTimeoutMS;
    // timeout in milliseconds for total query execution
    private final long executionTimeoutMS;

    /**
     * Creates a configuration whose channel and execution timeouts are derived from the
     * given engine timeout.
     *
     * @param engineTimeoutMS timeout in milliseconds for engine execution
     */
    public QueryTimeoutConfig(long engineTimeoutMS) {
        this.engineTimeoutMS = engineTimeoutMS;
        this.channelTimeoutMS = widen(engineTimeoutMS, 1);
        this.executionTimeoutMS = widen(engineTimeoutMS, 2);
    }

    /**
     * Scales {@code baseMS} by {@code 1 + steps * GRADUAL_FACTOR}, truncating the result to a
     * {@code long}.
     */
    private static long widen(long baseMS, int steps) {
        return (long) (baseMS * (1 + steps * GRADUAL_FACTOR));
    }

    /** @return timeout in milliseconds for total query execution */
    public long getExecutionTimeoutMS() {
        return this.executionTimeoutMS;
    }

    /** @return timeout in milliseconds for channel communication */
    public long getChannelTimeoutMS() {
        return this.channelTimeoutMS;
    }

    /** @return timeout in milliseconds for engine execution */
    public long getEngineTimeoutMS() {
        return this.engineTimeoutMS;
    }

    /** Renders all three timeouts, widest first, for logging and debugging. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("QueryTimeoutConfig{");
        text.append("executionTimeoutMS=").append(executionTimeoutMS);
        text.append(", channelTimeoutMS=").append(channelTimeoutMS);
        text.append(", engineTimeoutMS=").append(engineTimeoutMS);
        text.append('}');
        return text.toString();
    }
}
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.graphscope.common.client.ExecutionClient;
import com.alibaba.graphscope.common.client.type.ExecutionRequest;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.common.ir.tools.AliasInference;
import com.alibaba.graphscope.common.ir.tools.GraphPlanner;
import com.alibaba.graphscope.common.ir.tools.LogicalPlan;
Expand All @@ -41,10 +42,15 @@
public class GraphPlanExecution<C> implements StatementResults.SubscribableExecution {
private final ExecutionClient<C> client;
private final GraphPlanner.Summary planSummary;
private final QueryTimeoutConfig timeoutConfig;

public GraphPlanExecution(ExecutionClient<C> client, GraphPlanner.Summary planSummary) {
public GraphPlanExecution(
ExecutionClient<C> client,
GraphPlanner.Summary planSummary,
QueryTimeoutConfig timeoutConfig) {
this.client = client;
this.planSummary = planSummary;
this.timeoutConfig = timeoutConfig;
}

@Override
Expand All @@ -60,7 +66,7 @@ public QueryExecution subscribe(QuerySubscriber querySubscriber) {
new CypherRecordProcessor(
new CypherRecordParser(getOutputType(planSummary.getLogicalPlan())),
querySubscriber);
this.client.submit(request, recordProcessor);
this.client.submit(request, recordProcessor, timeoutConfig);
return recordProcessor;
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.alibaba.graphscope.common.antlr4.Antlr4Parser;
import com.alibaba.graphscope.common.client.ExecutionClient;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.common.ir.runtime.PhysicalBuilder;
import com.alibaba.graphscope.common.ir.tools.GraphPlanner;
import com.alibaba.graphscope.common.manager.IrMetaQueryCallback;
Expand Down Expand Up @@ -53,6 +54,7 @@ public class GraphQueryExecutor extends FabricExecutor {
private final ExecutionClient client;

private final GraphPlanner graphPlanner;
private final FabricConfig fabricConfig;

public GraphQueryExecutor(
FabricConfig config,
Expand All @@ -75,6 +77,7 @@ public GraphQueryExecutor(
internalLog,
statementLifecycles,
fabricWorkerExecutor);
this.fabricConfig = config;
this.graphConfig = graphConfig;
this.antlr4Parser = antlr4Parser;
this.graphPlanner = graphPlanner;
Expand Down Expand Up @@ -118,7 +121,7 @@ public StatementResult run(
}
QuerySubject querySubject = new QuerySubject.BasicQuerySubject();
StatementResults.SubscribableExecution execution =
new GraphPlanExecution(this.client, planSummary);
new GraphPlanExecution(this.client, planSummary, getQueryTimeoutConfig());
metaQueryCallback.afterExec(irMeta);
StatementResult result = StatementResults.connectVia(execution, querySubject);
return result;
Expand All @@ -131,4 +134,8 @@ public StatementResult run(
}
}
}

/**
 * Builds the timeout configuration for a query from the transaction timeout held by
 * {@code fabricConfig}, converted to milliseconds and used as the engine timeout.
 *
 * @return a new {@link QueryTimeoutConfig} based on the configured transaction timeout
 */
private QueryTimeoutConfig getQueryTimeoutConfig() {
    final long transactionTimeoutMS = fabricConfig.getTransactionTimeout().toMillis();
    return new QueryTimeoutConfig(transactionTimeoutMS);
}
}
Expand Up @@ -96,7 +96,9 @@ public void request(long l) throws Exception {
}

@Override
public void cancel() {}
public void cancel() {
this.recordIterator.close();
}

@Override
public boolean await() throws Exception {
Expand Down
Expand Up @@ -31,9 +31,9 @@ public GraphTraversalSourceVisitor(GraphTraversalSource g) {

@Override
public GraphTraversalSource visitTraversalSource(GremlinGSParser.TraversalSourceContext ctx) {
if (ctx.getChildCount() != 1) {
if (ctx.getChildCount() > 3) {
throw new UnsupportedEvalException(
ctx.getClass(), "supported pattern of source is [g]");
ctx.getClass(), "supported pattern of source is [g] or [g.with(..)]");
}
return g;
}
Expand Down
Expand Up @@ -756,10 +756,13 @@ public Traversal visitTraversalMethod_with(GremlinGSParser.TraversalMethod_withC
Step endStep = graphTraversal.asAdmin().getEndStep();
if (!(endStep instanceof PathExpandStep)) {
throw new UnsupportedEvalException(
ctx.getClass(), "with should follow path expand, i.e. out('1..2').with(..)");
ctx.getClass(),
"with should follow source or path expand, i.e. g.with(..) or"
+ " out('1..2').with(..)");
}
String optKey = GenericLiteralVisitor.getStringLiteral(ctx.stringLiteral(0));
String optValue = GenericLiteralVisitor.getStringLiteral(ctx.stringLiteral(1));
String optKey = GenericLiteralVisitor.getStringLiteral(ctx.stringLiteral());
Object optValue =
GenericLiteralVisitor.getInstance().visitGenericLiteral(ctx.genericLiteral());
return graphTraversal.with(optKey, optValue);
}

Expand Down
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.graphscope.common.client.channel.ChannelFetcher;
import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.QueryTimeoutConfig;
import com.alibaba.graphscope.common.ir.tools.GraphPlanner;
import com.alibaba.graphscope.common.manager.IrMetaQueryCallback;
import com.alibaba.graphscope.common.store.IrMeta;
Expand Down Expand Up @@ -101,7 +102,8 @@ public ThrowingConsumer<Context> select(Context ctx) {
ctx, traversal, testGraph, this.configs),
jobId,
script,
irMeta);
irMeta,
new QueryTimeoutConfig(ctx.getRequestTimeout()));
metaQueryCallback.afterExec(irMeta);
});
return op;
Expand Down

0 comments on commit 1e2c8f6

Please sign in to comment.