feat(interactive): Add Rate Limiter for Control Flow #3368

Merged on Nov 23, 2023 (12 commits)
2 changes: 2 additions & 0 deletions charts/gie-standalone/templates/frontend/statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ spec:
value: {{ .Values.pegasusBatchSize | quote }}
- name: OUTPUT_CAPACITY
value: {{ .Values.pegasusOutputCapacity | quote }}
- name: FRONTEND_QUERY_PER_SECOND_LIMIT
value: {{ .Values.frontendQueryPerSecondLimit | quote }}
ports:
- name: gremlin
containerPort: {{ .Values.frontend.service.gremlinPort }}
Expand Down
2 changes: 2 additions & 0 deletions charts/gie-standalone/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pegasusBatchSize: 1024

pegasusOutputCapacity: 16

frontendQueryPerSecondLimit: 2147483647

## needed by vineyard in distributed env
etcdEndpoint: "etcd-for-vineyard.default.svc.cluster.local:2379"

Expand Down
14 changes: 9 additions & 5 deletions docs/interactive_engine/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,12 @@ helm install [YOUR_RELEASE_NAME] graphscope/gie-standalone --set [key1]=[value1]
```
We've listed useful configuration keys in the following:

- gremlinPort: the port for accessing the Gremlin service (Default: 8182).
- pegasusWorkerNum: the number of working threads per each executor (Default: 2).
Obviously, the total number of working threads is: 'executor.replicaCount x pegasusWorkerNum'.
- pegasusTimeout: The maximum duration in `ms` you allow each query to run (Default: 24,000).

| Name                        | Description                                                                 | Default Value                   |
| :-------------------------- | --------------------------------------------------------------------------- | :-----------------------------: |
| gremlinPort                 | the port for accessing the Gremlin service                                  | 8182                            |
| cypherPort                  | the port for accessing the Cypher service                                   | 7687                            |
| pegasusWorkerNum            | the number of worker threads per executor                                   | 2                               |
| pegasusTimeout              | the maximum duration in `ms` that each query is allowed to run              | 24,000                          |
| pegasusBatchSize            | the maximum size of streaming records that can be output for an operator    | 1024                            |
| pegasusOutputCapacity       | the maximum number of streaming records that can be output for an operator  | 16                              |
| frontendQueryPerSecondLimit | the maximum QPS that the frontend service can handle                        | 2147483647 (no limit)           |
6 changes: 4 additions & 2 deletions docs/storage_engine/groot.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ You can also check the deployment status and get the connection address by using
helm status demo
```


### Common Configurations
| Name | Description | Default value |
| --- | --- | --- |
Expand All @@ -70,10 +71,11 @@ helm status demo
| auth.username | Username. If empty, then there's no authentication | "" |
| auth.password | Password | "" |
| store.replicaCount | Number of Store Pod | 2 |
| frontend.replicaCount | Number of Frontend | 1 |
| ingestor.replicaCount | Number of Ingestor Pod | 2 |
| frontend.service.type | Kubernetes Service type of frontend | NodePort |
| dataset.modern | Load [modern graph](https://tinkerpop.apache.org/docs/current/tutorials/getting-started/) dataset at the start | false |
| frontend.replicaCount | Number of Frontend | 1 |
| frontend.service.type | Kubernetes Service type of frontend | NodePort |
| frontend.query.per.second.limit | the maximum QPS that the frontend service can handle | 2147483647 (no limit) |


If Groot is launched with the default configuration, then two Store Pods, one Frontend Pod, one Ingestor Pod, and one Coordinator Pod will be started. The number of Coordinator nodes is fixed to 1.
Expand Down
2 changes: 2 additions & 0 deletions interactive_engine/compiler/conf/ir.compiler.properties
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,5 @@ graph.planner.rules: FilterIntoJoinRule, FilterMatchRule, NotMatchToAntiJoinRule
# query.execution.timeout.ms: 3000000

calcite.default.charset: UTF-8

# frontend.query.per.second.limit: 2147483647
4 changes: 3 additions & 1 deletion interactive_engine/compiler/set_properties.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ gremlin_server_port="gremlin.server.port: $GREMLIN_SERVER_PORT";

cypher_server_port="neo4j.bolt.server.port: $CYPHER_SERVER_PORT";

frontend_query_per_second_limit="frontend.query.per.second.limit: $FRONTEND_QUERY_PER_SECOND_LIMIT"

count=1;
while (($count<$SERVERSSIZE))
do
Expand All @@ -41,6 +43,6 @@ done

graph_schema="graph.schema: $GRAPH_SCHEMA"

properties="$worker_num\n$timeout\n$batch_size\n$output_capacity\n$hosts\n$server_num\n$graph_schema\n$gremlin_server_port\n$cypher_server_port"
properties="$worker_num\n$timeout\n$batch_size\n$output_capacity\n$hosts\n$server_num\n$graph_schema\n$gremlin_server_port\n$cypher_server_port\n$frontend_query_per_second_limit"

echo -e $properties > ./conf/ir.compiler.properties
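Review note: the script above writes a `frontend.query.per.second.limit: <value>` line into `ir.compiler.properties`. The following is a minimal sketch of how such a line could be read back, assuming the file is parsed with `java.util.Properties` (the PR does not show the loader, so that is an assumption). The class name `QpsLimitPropertySketch` and the helper `readLimit` are hypothetical and not part of the PR.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.util.Properties;

// Hypothetical illustration; not part of the PR.
public class QpsLimitPropertySketch {
    static final String KEY = "frontend.query.per.second.limit";
    // Same default as FrontendConfig.QUERY_PER_SECOND_LIMIT in the PR.
    static final int DEFAULT_LIMIT = Integer.MAX_VALUE; // 2147483647

    // Parses properties-file content and returns the configured limit,
    // falling back to the default when the key is absent or commented out.
    public static int readLimit(String fileContent) {
        Properties props = new Properties();
        try {
            // java.util.Properties accepts ':' as a key/value separator.
            props.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String raw = props.getProperty(KEY, String.valueOf(DEFAULT_LIMIT));
        // Same lenient numeric parsing as the updated Config.intConfig.
        return new BigDecimal(raw.trim()).intValue();
    }

    public static void main(String[] args) {
        System.out.println(readLimit("frontend.query.per.second.limit: 500\n"));
        // A commented-out key (as in the shipped ir.compiler.properties) yields the default.
        System.out.println(readLimit("# frontend.query.per.second.limit: 500\n"));
    }
}
```

Under this assumption, leaving the key commented out keeps the limit at `Integer.MAX_VALUE`, which matches the "no limit" default described in the docs.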
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.alibaba.graphscope.common.config;

import java.math.BigDecimal;
import java.util.function.Function;

public class Config<T> {
Expand Down Expand Up @@ -46,7 +47,7 @@ public static Config<Short> shortConfig(String key, short defaultVal) {
}

public static Config<Integer> intConfig(String key, int defaultVal) {
return new Config<>(key, String.valueOf(defaultVal), (s) -> Integer.parseInt(s));
return new Config<>(key, String.valueOf(defaultVal), (s) -> new BigDecimal(s).intValue());
}

public static Config<Long> longConfig(String key, long defaultVal) {
Expand Down
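Review note: `intConfig` now parses with `new BigDecimal(s).intValue()` instead of `Integer.parseInt(s)`. The PR does not state the motivation; one plausible reason (an assumption) is that a large default such as 2147483647 can arrive in scientific notation after passing through chart templating, which `Integer.parseInt` rejects. The sketch below compares the two parsers. The class `IntConfigParsingSketch` and its methods are hypothetical names used only for illustration.

```java
import java.math.BigDecimal;

// Hypothetical illustration; not part of the PR.
public class IntConfigParsingSketch {
    // Mirrors the new parser used by Config.intConfig in the PR.
    public static int parseLenient(String s) {
        return new BigDecimal(s).intValue();
    }

    // Mirrors the old parser; reports whether it accepts the input.
    public static boolean strictParseSucceeds(String s) {
        try {
            Integer.parseInt(s);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Plain integers behave the same under both parsers.
        System.out.println(parseLenient("2147483647"));
        // Scientific notation is accepted by BigDecimal only.
        System.out.println(parseLenient("2.147483647e+09"));
        System.out.println(strictParseSucceeds("2.147483647e+09"));
        // Caveat: intValue() narrows silently, so out-of-range input wraps around.
        System.out.println(parseLenient("2147483648"));
    }
}
```

One trade-off worth noting: `intValue()` silently drops fractional digits and wraps on overflow. If stricter validation is wanted, `BigDecimal.intValueExact()` throws `ArithmeticException` in those cases.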
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,7 @@ public class FrontendConfig {

public static final Config<Integer> QUERY_CACHE_SIZE =
Config.intConfig("query.cache.size", 100);

public static final Config<Integer> QUERY_PER_SECOND_LIMIT =
Config.intConfig("frontend.query.per.second.limit", 2147483647);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.common.manager;

import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.common.config.FrontendConfig;
import com.google.common.util.concurrent.RateLimiter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;

public class RateLimitExecutor extends ThreadPoolExecutor {
private static final Logger logger = LoggerFactory.getLogger(RateLimitExecutor.class);
private final RateLimiter rateLimiter;

public RateLimitExecutor(
Configs configs,
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
handler);
int permitsPerSecond = FrontendConfig.QUERY_PER_SECOND_LIMIT.get(configs);
this.rateLimiter = RateLimiter.create(permitsPerSecond);
}

@Override
public Future<?> submit(Runnable task) {
// tryAcquire() is non-blocking: reject immediately when no permit is available
if (rateLimiter.tryAcquire()) {
return super.submit(task);
}
throw new RejectedExecutionException(
"rate limit exceeded, current limit is "
+ rateLimiter.getRate()
+ " per second. Please increase the QPS limit via the config"
+ " 'frontend.query.per.second.limit' or send queries at a lower rate");
}
}
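Review note: `RateLimitExecutor` rejects a task up front when no permit is available, rather than queueing it. The sketch below shows the same reject-on-submit pattern using only the JDK. It swaps Guava's `RateLimiter` for a simple fixed-window counter, which is a different and cruder algorithm, and it injects a clock so the behavior is deterministic. `FixedWindowLimitExecutor`, `countRejected`, and the injected clock are hypothetical names, not part of the PR.

```java
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical illustration; a fixed-window counter stands in for Guava's RateLimiter.
public class FixedWindowLimitExecutor extends ThreadPoolExecutor {
    private final int limitPerSecond;
    private final LongSupplier nanoClock;
    private long windowStart;
    private int usedInWindow;

    public FixedWindowLimitExecutor(int poolSize, int limitPerSecond, LongSupplier nanoClock) {
        super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        this.limitPerSecond = limitPerSecond;
        this.nanoClock = nanoClock;
        this.windowStart = nanoClock.getAsLong();
    }

    // Non-blocking permit check: returns false instead of waiting.
    private synchronized boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        if (now - windowStart >= TimeUnit.SECONDS.toNanos(1)) {
            windowStart = now; // start a new one-second window
            usedInWindow = 0;
        }
        if (usedInWindow < limitPerSecond) {
            usedInWindow++;
            return true;
        }
        return false;
    }

    @Override
    public Future<?> submit(Runnable task) {
        if (tryAcquire()) {
            return super.submit(task);
        }
        throw new RejectedExecutionException(
                "rate limit exceeded, current limit is " + limitPerSecond + " per second");
    }

    // Submits `attempts` no-op tasks within one frozen window and counts rejections.
    public static int countRejected(int limit, int attempts) {
        long[] fakeNow = {0L}; // frozen clock keeps every submit in the same window
        FixedWindowLimitExecutor executor =
                new FixedWindowLimitExecutor(1, limit, () -> fakeNow[0]);
        int rejected = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                try {
                    executor.submit(() -> {});
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            executor.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(countRejected(3, 5));
    }
}
```

As in the PR's class, only `submit(Runnable)` is overridden here, so tasks handed over through `execute(Runnable)` or `submit(Callable)` would bypass the limiter. Whether that matters depends on which entry point the server uses, which this diff does not show.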
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.alibaba.graphscope.gremlin.plugin;

import org.apache.commons.lang3.StringUtils;
import org.checkerframework.checker.nullness.qual.Nullable;

public class QueryStatusCallback {
private final MetricsCollector metricsCollector;
private final QueryLogger queryLogger;
Expand All @@ -27,14 +30,15 @@ public QueryStatusCallback(MetricsCollector metricsCollector, QueryLogger queryL

public void onStart() {}

public void onEnd(boolean isSucceed) {
public void onEnd(boolean isSucceed, @Nullable String msg) {
this.metricsCollector.stop();
queryLogger.info("total execution time is {} ms", metricsCollector.getElapsedMillis());
queryLogger.metricsInfo(
"{} | {} | {}",
"{} | {} | {} | {}",
isSucceed,
metricsCollector.getElapsedMillis(),
metricsCollector.getStartMillis());
metricsCollector.getStartMillis(),
msg != null ? msg : StringUtils.EMPTY);
}

public QueryLogger getQueryLogger() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ protected void evalOpInternal(
(v, t) -> {
metaQueryCallback.afterExec(irMeta);
if (t != null) {
statusCallback.onEnd(false);
statusCallback.onEnd(false, null);
if (v instanceof AbstractResultProcessor) {
((AbstractResultProcessor) v).cancel();
}
Expand Down Expand Up @@ -245,10 +245,11 @@ protected void evalOpInternal(
return null;
});
} catch (RejectedExecutionException var17) {
statusCallback.getQueryLogger().error(var17.getMessage());
ctx.writeAndFlush(
ResponseMessage.build(msg)
.code(ResponseStatusCode.TOO_MANY_REQUESTS)
.statusMessage("Rate limiting")
.statusMessage(var17.getMessage())
.create());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public synchronized void process(PegasusClient.JobResponse response) {
statusCallback.getQueryLogger().error("process response from grpc fail", e);
// cannot write to this context any more
isContextWritable = false;
statusCallback.onEnd(false);
statusCallback.onEnd(false, null);
writeResultList(
writeResult,
Collections.singletonList(e.getMessage()),
Expand All @@ -104,7 +104,7 @@ public synchronized void process(PegasusClient.JobResponse response) {
public synchronized void finish() {
if (isContextWritable) {
isContextWritable = false;
statusCallback.onEnd(true);
statusCallback.onEnd(true, null);
aggregateResults();
writeResultList(writeResult, resultCollectors, ResponseStatusCode.SUCCESS);
}
Expand All @@ -115,7 +115,7 @@ public synchronized void error(Status status) {
logger.error("error return from grpc, status {}", status);
if (isContextWritable) {
isContextWritable = false;
statusCallback.onEnd(false);
statusCallback.onEnd(false, status.getDescription());
writeResultList(
writeResult,
Collections.singletonList(status.toString()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.graphscope.common.config.FrontendConfig;
import com.alibaba.graphscope.common.ir.tools.GraphPlanner;
import com.alibaba.graphscope.common.manager.IrMetaQueryCallback;
import com.alibaba.graphscope.common.manager.RateLimitExecutor;
import com.alibaba.graphscope.gremlin.Utils;
import com.alibaba.graphscope.gremlin.auth.AuthManager;
import com.alibaba.graphscope.gremlin.auth.AuthManagerReference;
Expand All @@ -36,10 +37,12 @@
import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.server.op.AbstractOpProcessor;
import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
import org.apache.tinkerpop.gremlin.server.util.ThreadFactoryUtil;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;

import java.io.InputStream;
import java.util.concurrent.*;

public class IrGremlinServer implements AutoCloseable {
private final Configs configs;
Expand Down Expand Up @@ -96,7 +99,7 @@ public void start() throws Exception {
AuthManager authManager = new DefaultAuthManager(configs);
AuthManagerReference.setAuthManager(authManager);

this.gremlinServer = new GremlinServer(settings);
this.gremlinServer = new GremlinServer(settings, createRateLimitExecutor());
ServerGremlinExecutor serverGremlinExecutor =
Utils.getFieldValue(
GremlinServer.class, this.gremlinServer, "serverGremlinExecutor");
Expand All @@ -106,6 +109,23 @@ public void start() throws Exception {
this.gremlinServer.start().join();
}

private ExecutorService createRateLimitExecutor() {
if (settings.gremlinPool == 0) {
settings.gremlinPool = Runtime.getRuntime().availableProcessors();
}
ThreadFactory threadFactoryGremlin = ThreadFactoryUtil.create("exec-%d");
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(settings.maxWorkQueueSize);
return new RateLimitExecutor(
configs,
settings.gremlinPool,
settings.gremlinPool,
0L,
TimeUnit.MILLISECONDS,
queue,
threadFactoryGremlin,
new ThreadPoolExecutor.AbortPolicy());
}

@Override
public void close() throws Exception {
if (this.gremlinServer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ private com.alibaba.graphscope.common.config.Configs getConfigs() {
addToConfigMapIfExist(FrontendConfig.FRONTEND_SERVER_ID.getKey(), configMap);
// add frontend server num
addToConfigMapIfExist(FrontendConfig.FRONTEND_SERVER_NUM.getKey(), configMap);
// add frontend qps limit
addToConfigMapIfExist(FrontendConfig.QUERY_PER_SECOND_LIMIT.getKey(), configMap);
return new com.alibaba.graphscope.common.config.Configs(configMap);
}

Expand Down