Skip to content

Commit

Permalink
[GIE Compiler] add QueryLogger to log per query
Browse files Browse the repository at this point in the history
  • Loading branch information
shirly121 committed Jul 5, 2023
1 parent 545bf43 commit 76fda65
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.alibaba.graphscope.common.store.IrMeta;
import com.alibaba.graphscope.gremlin.integration.result.GraphProperties;
import com.alibaba.graphscope.gremlin.integration.result.GremlinTestResultProcessor;
import com.alibaba.graphscope.gremlin.plugin.QueryStatusCallback;
import com.alibaba.graphscope.gremlin.plugin.processor.IrStandardOpProcessor;
import com.alibaba.graphscope.gremlin.plugin.script.AntlrGremlinScriptEngine;

Expand Down Expand Up @@ -93,21 +94,21 @@ public ThrowingConsumer<Context> select(Context ctx) {
Traversal traversal =
(Traversal) scriptEngine.eval(script, this.context);
applyStrategies(traversal);

long jobId = JOB_ID_COUNTER.incrementAndGet();
IrMeta irMeta = metaQueryCallback.beforeExec();
QueryStatusCallback statusCallback =
createQueryStatusCallback(script, jobId);
processTraversal(
traversal,
new GremlinTestResultProcessor(
ctx,
traversal,
createMetricsPrinter(jobId, script),
statusCallback,
testGraph,
this.configs),
jobId,
script,
irMeta,
new QueryTimeoutConfig(ctx.getRequestTimeout()));
new QueryTimeoutConfig(ctx.getRequestTimeout()),
statusCallback.getQueryLogger());
metaQueryCallback.afterExec(irMeta);
});
return op;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package com.alibaba.graphscope.gremlin.integration.result;

import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.gremlin.plugin.QueryStatusCallback;
import com.alibaba.graphscope.gremlin.result.processor.GremlinResultProcessor;
import com.alibaba.graphscope.gremlin.service.MetricsPrinter;
import com.google.common.collect.ImmutableMap;

import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser;
Expand Down Expand Up @@ -46,10 +46,10 @@ public class GremlinTestResultProcessor extends GremlinResultProcessor {
public GremlinTestResultProcessor(
Context writeResult,
Traversal traversal,
MetricsPrinter metricsPrinter,
QueryStatusCallback statusCallback,
GraphProperties testGraph,
Configs configs) {
super(writeResult, traversal, metricsPrinter);
super(writeResult, traversal, statusCallback);
this.cachedProperties = testGraph.getProperties(configs);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.gremlin.plugin;

import com.alibaba.graphscope.gremlin.Utils;
import com.codahale.metrics.Timer;

import java.util.concurrent.TimeUnit;

/**
 * Collects timing metrics for a single gremlin query.
 *
 * <p>Constructing an instance starts a {@link Timer.Context} on the supplied timer and records
 * the wall-clock time at that moment. Calling {@link #stop()} stops the timer context and
 * records the elapsed time.
 */
public class MetricsCollector {
    private final Timer.Context timeContext;
    // Wall-clock start (milliseconds since the epoch), captured at construction.
    private final long startMillis;
    // Elapsed time in milliseconds; stays 0 until stop() has been called.
    private long elapsedMillis;

    /**
     * Starts timing on the given timer.
     *
     * @param timer the timer on which a new timing context is started
     */
    public MetricsCollector(Timer timer) {
        // The timer context's internal start value is a monotonic clock tick
        // (System.nanoTime() with the default metrics Clock), whose origin is arbitrary.
        // It cannot be turned into a timestamp, so the wall clock is sampled here instead
        // of reading that private field reflectively.
        this.startMillis = System.currentTimeMillis();
        this.timeContext = timer.time();
    }

    /**
     * @return the wall-clock time, in milliseconds since the epoch, at which this collector
     *     was created
     */
    public long getStartMillis() {
        return this.startMillis;
    }

    /**
     * @return the elapsed time in milliseconds recorded by the last {@link #stop()} call, or 0
     *     if {@link #stop()} has not been called yet
     */
    public long getElapsedMillis() {
        return this.elapsedMillis;
    }

    /** Stops the timer context and stores the elapsed time, converted from nanoseconds. */
    public void stop() {
        this.elapsedMillis = TimeUnit.NANOSECONDS.toMillis(timeContext.stop());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.gremlin.plugin;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Logger bound to one query: every message is prefixed with the query text and its id.
 *
 * <p>The prefix is handed to SLF4J as leading <em>arguments</em> rather than being concatenated
 * into the format string. Query text is user input and may itself contain the <code>{}</code>
 * placeholder; if it were part of the pattern it would consume the caller's arguments.
 */
public class QueryLogger {
    private static final Logger defaultLogger = LoggerFactory.getLogger(QueryLogger.class);
    private static final Logger metricLogger = LoggerFactory.getLogger("MetricLog");

    private final String query;
    private final long queryId;

    /**
     * @param query the query text used as part of the log prefix
     * @param queryId the id of the query used as part of the log prefix
     */
    public QueryLogger(String query, long queryId) {
        this.query = query;
        this.queryId = queryId;
    }

    /**
     * Logs at INFO level as {@code <this> : <formatted message>}.
     *
     * @param format SLF4J message pattern
     * @param args pattern arguments; a trailing {@link Throwable} is still last in the argument
     *     list passed to SLF4J
     */
    public void info(String format, Object... args) {
        defaultLogger.info("{} : " + format, prepend(args, this));
    }

    /**
     * Logs at WARN level as {@code <this> : <formatted message>}.
     *
     * @param format SLF4J message pattern
     * @param args pattern arguments; a trailing {@link Throwable} is still last in the argument
     *     list passed to SLF4J
     */
    public void warn(String format, Object... args) {
        defaultLogger.warn("{} : " + format, prepend(args, this));
    }

    /**
     * Logs at ERROR level as {@code <this> : <formatted message>}.
     *
     * @param format SLF4J message pattern
     * @param args pattern arguments; a trailing {@link Throwable} is still last in the argument
     *     list passed to SLF4J
     */
    public void error(String format, Object... args) {
        defaultLogger.error("{} : " + format, prepend(args, this));
    }

    /**
     * Logs to the "MetricLog" logger at INFO level as
     * {@code <queryId> | <query> | <formatted message>}.
     *
     * @param format SLF4J message pattern
     * @param args pattern arguments
     */
    public void metricsInfo(String format, Object... args) {
        metricLogger.info("{} | {} | " + format, prepend(args, queryId, query));
    }

    @Override
    public String toString() {
        return "[" + "query='" + query + '\'' + ", queryId=" + queryId + ']';
    }

    public String getQuery() {
        return query;
    }

    public long getQueryId() {
        return queryId;
    }

    /**
     * Builds a new argument array consisting of {@code prefix} followed by {@code args}.
     *
     * @param args the caller's arguments; {@code null} is treated as no arguments
     * @param prefix values to place in front of {@code args}
     * @return the combined array; relative order of {@code args} is preserved
     */
    private static Object[] prepend(Object[] args, Object... prefix) {
        int tail = (args == null) ? 0 : args.length;
        Object[] merged = new Object[prefix.length + tail];
        System.arraycopy(prefix, 0, merged, 0, prefix.length);
        if (tail > 0) {
            System.arraycopy(args, 0, merged, prefix.length, tail);
        }
        return merged;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.gremlin.plugin;

/**
 * Callback notified about the lifecycle of a single query. It pairs the query's {@link
 * MetricsCollector} with its {@link QueryLogger} and reports the timing when the query ends.
 */
public class QueryStatusCallback {
    private final MetricsCollector metricsCollector;
    private final QueryLogger queryLogger;

    /**
     * @param metricsCollector collector that is stopped and read when the query ends
     * @param queryLogger logger that receives the end-of-query messages
     */
    public QueryStatusCallback(MetricsCollector metricsCollector, QueryLogger queryLogger) {
        this.metricsCollector = metricsCollector;
        this.queryLogger = queryLogger;
    }

    /** Invoked when the query starts; currently does nothing. */
    public void onStart() {}

    /**
     * Invoked when the query finishes: stops the metrics collector, logs the total execution
     * time, and writes one metrics record containing the success flag, the elapsed
     * milliseconds and the start milliseconds.
     *
     * @param isSucceed whether the query completed successfully
     */
    public void onEnd(boolean isSucceed) {
        metricsCollector.stop();
        final long elapsedMillis = metricsCollector.getElapsedMillis();
        queryLogger.info("total execution time is {} ms", elapsedMillis);
        queryLogger.metricsInfo(
                "{} | {} | {}", isSucceed, elapsedMillis, metricsCollector.getStartMillis());
    }

    /**
     * @return the logger bound to this query
     */
    public QueryLogger getQueryLogger() {
        return this.queryLogger;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@
import com.alibaba.graphscope.common.store.IrMeta;
import com.alibaba.graphscope.gremlin.InterOpCollectionBuilder;
import com.alibaba.graphscope.gremlin.Utils;
import com.alibaba.graphscope.gremlin.plugin.MetricsCollector;
import com.alibaba.graphscope.gremlin.plugin.QueryLogger;
import com.alibaba.graphscope.gremlin.plugin.QueryStatusCallback;
import com.alibaba.graphscope.gremlin.plugin.script.AntlrGremlinScriptEngineFactory;
import com.alibaba.graphscope.gremlin.plugin.strategy.ExpandFusionStepStrategy;
import com.alibaba.graphscope.gremlin.plugin.strategy.RemoveUselessStepStrategy;
import com.alibaba.graphscope.gremlin.plugin.strategy.ScanFusionStepStrategy;
import com.alibaba.graphscope.gremlin.result.processor.AbstractResultProcessor;
import com.alibaba.graphscope.gremlin.result.processor.GremlinResultProcessor;
import com.alibaba.graphscope.gremlin.service.MetricsPrinter;
import com.alibaba.pegasus.RpcClient;
import com.alibaba.pegasus.intf.ResultProcessor;
import com.alibaba.pegasus.service.protocol.PegasusClient;
Expand All @@ -66,8 +68,6 @@
import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.codehaus.groovy.control.MultipleCompilationErrorsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
Expand All @@ -82,9 +82,6 @@
import javax.script.SimpleBindings;

public class IrStandardOpProcessor extends StandardOpProcessor {
private static Logger metricLogger = LoggerFactory.getLogger("MetricLog");
private static Logger logger = LoggerFactory.getLogger(IrStandardOpProcessor.class);

protected static final AtomicLong JOB_ID_COUNTER = new AtomicLong(0L);
protected Graph graph;
protected GraphTraversalSource g;
Expand Down Expand Up @@ -126,24 +123,18 @@ protected void evalOpInternal(

long jobId = JOB_ID_COUNTER.incrementAndGet();
IrMeta irMeta = metaQueryCallback.beforeExec();
MetricsPrinter metricsPrinter = createMetricsPrinter(jobId, script);
QueryStatusCallback statusCallback = createQueryStatusCallback(script, jobId);
GremlinExecutor.LifeCycle lifeCycle =
createLifeCycle(
ctx,
gremlinExecutorSupplier,
bindingsSupplier,
jobId,
script,
irMeta,
metricsPrinter);
ctx, gremlinExecutorSupplier, bindingsSupplier, irMeta, statusCallback);
try {
CompletableFuture<Object> evalFuture =
gremlinExecutor.eval(script, language, new SimpleBindings(), lifeCycle);
evalFuture.handle(
(v, t) -> {
metaQueryCallback.afterExec(irMeta);
if (t != null) {
metricsPrinter.stop(false);
statusCallback.onEnd(false);
if (v instanceof AbstractResultProcessor) {
((AbstractResultProcessor) v).cancel();
}
Expand Down Expand Up @@ -174,7 +165,7 @@ protected void evalOpInternal(
+ " increasing the limit given to"
+ " TimedInterruptCustomizerProvider",
msg);
logger.warn(errorMessage);
statusCallback.getQueryLogger().warn(errorMessage);
ctx.writeAndFlush(
ResponseMessage.build(msg)
.code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
Expand All @@ -190,7 +181,7 @@ protected void evalOpInternal(
"Script evaluation exceeded the configured"
+ " threshold for request [%s]",
msg);
logger.warn(errorMessage, t);
statusCallback.getQueryLogger().warn(errorMessage, t);
ctx.writeAndFlush(
ResponseMessage.build(msg)
.code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
Expand All @@ -210,7 +201,7 @@ protected void evalOpInternal(
+ " allowed by the JVM, please split it"
+ " into multiple smaller statements - %s",
msg);
logger.warn(errorMessage);
statusCallback.getQueryLogger().warn(errorMessage);
ctx.writeAndFlush(
ResponseMessage.build(msg)
.code(
Expand All @@ -222,12 +213,14 @@ protected void evalOpInternal(
} else {
errorMessage =
t.getMessage() == null ? t.toString() : t.getMessage();
logger.warn(
String.format(
"Exception processing a script on request"
+ " [%s].",
msg),
t);
statusCallback
.getQueryLogger()
.warn(
String.format(
"Exception processing a script on"
+ " request [%s].",
msg),
t);
ctx.writeAndFlush(
ResponseMessage.build(msg)
.code(
Expand All @@ -250,18 +243,17 @@ protected void evalOpInternal(
}
}

protected MetricsPrinter createMetricsPrinter(long queryId, String script) {
return new MetricsPrinter(queryId, script, evalOpTimer, metricLogger);
protected QueryStatusCallback createQueryStatusCallback(String query, long queryId) {
return new QueryStatusCallback(
new MetricsCollector(evalOpTimer), new QueryLogger(query, queryId));
}

protected GremlinExecutor.LifeCycle createLifeCycle(
Context ctx,
Supplier<GremlinExecutor> gremlinExecutorSupplier,
BindingSupplier bindingsSupplier,
long jobId,
String script,
IrMeta irMeta,
MetricsPrinter metricsPrinter) {
QueryStatusCallback statusCallback) {
QueryTimeoutConfig timeoutConfig = new QueryTimeoutConfig(ctx.getRequestTimeout());
return GremlinExecutor.LifeCycle.build()
.evaluationTimeoutOverride(timeoutConfig.getExecutionTimeoutMS())
Expand Down Expand Up @@ -290,11 +282,10 @@ protected GremlinExecutor.LifeCycle createLifeCycle(
processTraversal(
traversal,
new GremlinResultProcessor(
ctx, traversal, metricsPrinter),
jobId,
script,
ctx, traversal, statusCallback),
irMeta,
timeoutConfig);
timeoutConfig,
statusCallback.getQueryLogger());
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -307,25 +298,21 @@ protected GremlinExecutor.LifeCycle createLifeCycle(
protected void processTraversal(
Traversal traversal,
ResultProcessor resultProcessor,
long jobId,
String script,
IrMeta irMeta,
QueryTimeoutConfig timeoutConfig)
QueryTimeoutConfig timeoutConfig,
QueryLogger queryLogger)
throws InvalidProtocolBufferException, IOException, RuntimeException {
InterOpCollection opCollection = (new InterOpCollectionBuilder(traversal)).build();
// fuse order with limit to topK
InterOpCollection.applyStrategies(opCollection);
// add sink operator
InterOpCollection.process(opCollection);

long jobId = queryLogger.getQueryId();
String jobName = "ir_plan_" + jobId;
IrPlan irPlan = new IrPlan(irMeta, opCollection);
// print script and jobName with ir plan
logger.info(
"gremlin query \"{}\", job conf name \"{}\", ir plan {}",
script,
jobName,
irPlan.getPlanAsJson());
queryLogger.info("ir plan {}", irPlan.getPlanAsJson());
byte[] physicalPlanBytes = irPlan.toPhysicalBytes(configs);
irPlan.close();

Expand Down

0 comments on commit 76fda65

Please sign in to comment.