Skip to content

Commit

Permalink
chore: Export realtime write and submitting query metrics in groot (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 committed May 9, 2024
1 parent 8b8b4bb commit a8a164d
Show file tree
Hide file tree
Showing 28 changed files with 235 additions and 701 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gss.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ jobs:
export RUSTC_WRAPPER=/usr/local/bin/sccache
cd ${GITHUB_WORKSPACE}/interactive_engine
mvn clean install -P groot -Drust.compile.mode=debug -DskipTests --quiet
mvn clean install -Pgroot-data-load --quiet
sccache --show-stats
- name: Gremlin Test
Expand Down
5 changes: 0 additions & 5 deletions interactive_engine/assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,6 @@
<artifactId>executor</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.graphscope</groupId>
<artifactId>data-load-tool</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
Expand Down
21 changes: 0 additions & 21 deletions interactive_engine/compiler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,33 +146,12 @@
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-logging</artifactId>
</dependency>
<dependency>
<!-- Not managed by opentelemetry-bom -->
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.23.1-alpha</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure-spi</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-annotations</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.jgrapht</groupId>
<artifactId>jgrapht-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,25 @@

package com.alibaba.graphscope.gremlin.plugin;

import static io.opentelemetry.api.common.AttributeKey.*;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongHistogram;

import org.apache.commons.lang3.StringUtils;
import org.checkerframework.checker.nullness.qual.Nullable;

public class QueryStatusCallback {
private final MetricsCollector metricsCollector;
private final QueryLogger queryLogger;

public QueryStatusCallback(MetricsCollector metricsCollector, QueryLogger queryLogger) {
private LongHistogram queryHistogram;

public QueryStatusCallback(
MetricsCollector metricsCollector, LongHistogram histogram, QueryLogger queryLogger) {
this.metricsCollector = metricsCollector;
this.queryLogger = queryLogger;
this.queryHistogram = histogram;
}

public void onStart() {}
Expand All @@ -35,6 +44,16 @@ public void onEnd(boolean isSucceed, @Nullable String msg) {
if (isSucceed) {
queryLogger.info("total execution time is {} ms", metricsCollector.getElapsedMillis());
}

Attributes attrs =
Attributes.builder()
.put("id", queryLogger.getQueryId().toString())
.put("query", queryLogger.getQuery())
.put("success", isSucceed)
.put("message", msg != null ? msg : "")
.build();
this.queryHistogram.record(metricsCollector.getElapsedMillis(), attrs);

queryLogger.metricsInfo(
"{} | {} | {} | {}",
isSucceed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import com.google.protobuf.InvalidProtocolBufferException;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
Expand Down Expand Up @@ -110,6 +112,7 @@ public class IrStandardOpProcessor extends StandardOpProcessor {
protected final QueryCache queryCache;
protected final ExecutionClient executionClient;
Tracer tracer;
LongHistogram queryHistogram;

public IrStandardOpProcessor(
Configs configs,
Expand Down Expand Up @@ -138,7 +141,8 @@ public IrStandardOpProcessor(
this.idGenerator = idGenerator;
this.queryCache = queryCache;
this.executionClient = executionClient;
this.tracer = GlobalOpenTelemetry.getTracer("compiler");
initTracer();
initMetrics();
}

@Override
Expand Down Expand Up @@ -296,7 +300,7 @@ protected void evalOpInternal(

protected QueryStatusCallback createQueryStatusCallback(String query, BigInteger queryId) {
return new QueryStatusCallback(
new MetricsCollector(evalOpTimer), new QueryLogger(query, queryId));
new MetricsCollector(evalOpTimer), queryHistogram, new QueryLogger(query, queryId));
}

protected GremlinExecutor.LifeCycle createLifeCycle(
Expand Down Expand Up @@ -395,11 +399,11 @@ protected void processTraversal(
.build();
request = request.toBuilder().setConf(jobConfig).build();
Span outgoing =
tracer.spanBuilder("/evalOpInternal").setSpanKind(SpanKind.CLIENT).startSpan();
try (Scope scope = outgoing.makeCurrent()) {
tracer.spanBuilder("frontend/query").setSpanKind(SpanKind.CLIENT).startSpan();
try (Scope ignored = outgoing.makeCurrent()) {
outgoing.setAttribute("query.id", queryLogger.getQueryId().toString());
outgoing.setAttribute("query.statement", queryLogger.getQuery());
outgoing.setAttribute("query.plan.logical", irPlanStr);
outgoing.setAttribute("query.plan", irPlanStr);
this.rpcClient.submit(request, resultProcessor, timeoutConfig.getChannelTimeoutMS());
// request results from remote engine service in blocking way
resultProcessor.request();
Expand Down Expand Up @@ -453,4 +457,18 @@ public void close() throws Exception {
this.rpcClient.shutdown();
}
}

/**
 * Looks up the tracer registered under the instrumentation scope name {@code "default"} on
 * {@link GlobalOpenTelemetry} and stores it in {@link #tracer}.
 */
public void initTracer() {
    final Tracer defaultTracer = GlobalOpenTelemetry.getTracer("default");
    this.tracer = defaultTracer;
}

/**
 * Builds the long-valued histogram {@code groot.frontend.query.duration} (unit {@code ms}) from
 * the {@code "default"} meter of {@link GlobalOpenTelemetry} and stores it in
 * {@link #queryHistogram}.
 */
public void initMetrics() {
    final LongHistogram durationHistogram =
            GlobalOpenTelemetry.getMeter("default")
                    .histogramBuilder("groot.frontend.query.duration")
                    .setDescription("Duration of gremlin queries.")
                    .setUnit("ms")
                    .ofLongs()
                    .build();
    this.queryHistogram = durationHistogram;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
import com.alibaba.pegasus.service.protocol.PegasusClient.JobResponse;

import io.grpc.Status;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.grpc.v1_6.GrpcTelemetry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -51,12 +48,6 @@ public RpcClient(List<RpcChannel> channels) {
.collect(Collectors.toList());
}

/**
 * Attaches an OpenTelemetry gRPC client interceptor to the given channel builder.
 *
 * @param openTelemetry the OpenTelemetry instance used to create the {@code GrpcTelemetry}
 * @param nettyChannelBuilder the channel builder on which the interceptor is registered
 */
void configureClientInterceptor(
OpenTelemetry openTelemetry, NettyChannelBuilder nettyChannelBuilder) {
GrpcTelemetry grpcTelemetry = GrpcTelemetry.create(openTelemetry);
nettyChannelBuilder.intercept(grpcTelemetry.newClientInterceptor());
}

public void submit(JobRequest jobRequest, ResultProcessor processor, long rpcTimeoutMS) {
AtomicInteger counter = new AtomicInteger(this.channels.size());
AtomicBoolean finished = new AtomicBoolean(false);
Expand Down
5 changes: 3 additions & 2 deletions interactive_engine/executor/engine/pegasus/server/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ where
let parent_ctx = global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(req.metadata())));
let tracer = global::tracer("executor");
let _span = tracer
.span_builder("/JobServiceImpl/cancel")
.span_builder("JobService/cancel")
.with_kind(SpanKind::Server)
.start_with_context(&tracer, &parent_ctx);
let pb::CancelRequest { job_id } = req.into_inner();
Expand All @@ -203,6 +203,7 @@ where
debug!("accept new request from {:?};", req.remote_addr());
let parent_ctx = global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(req.metadata())));
let tracer = global::tracer("executor");

let pb::JobRequest { conf, source, plan, resource } = req.into_inner();
if conf.is_none() {
return Err(Status::new(Code::InvalidArgument, "job configuration not found"));
Expand All @@ -219,7 +220,7 @@ where
let job = JobDesc { input: source, plan, resource };

let mut span = tracer
.span_builder("/JobServiceImpl/submit")
.span_builder("JobService/submit")
.with_kind(SpanKind::Server)
.start_with_context(&tracer, &parent_ctx);
span.set_attributes(vec![
Expand Down
42 changes: 42 additions & 0 deletions interactive_engine/groot-module/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<exclusions>
<exclusion>
<groupId>com.squareup.okhttp</groupId>
<artifactId>okhttp</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
Expand Down Expand Up @@ -98,6 +108,38 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
</dependency>
<dependency>
<!-- Not managed by opentelemetry-bom -->
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.23.1-alpha</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure-spi</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-grpc-1.6</artifactId>
<version>2.1.0-alpha</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.alibaba.graphscope.groot;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;

/**
 * Static access point for the process-wide, auto-configured OpenTelemetry SDK.
 *
 * <p>The SDK is created on the first call to {@link #openTelemetry()} and reused afterwards.
 */
public class OTELUtils {
    /** Lazily created SDK instance; guarded by the class monitor. */
    private static OpenTelemetry instance;

    /**
     * Returns the auto-configured OpenTelemetry instance, initializing it on first use.
     *
     * <p>The result is cached so that repeated calls do not run
     * {@code AutoConfiguredOpenTelemetrySdk.initialize()} again. Per the SDK documentation that
     * call also registers the SDK as the global instance, which may only happen once per
     * process, so re-running it would build a second SDK and attempt a second registration.
     * If initialization throws, nothing is cached and the next call retries.
     *
     * @return the shared {@link OpenTelemetry} instance
     */
    public static synchronized OpenTelemetry openTelemetry() {
        if (instance == null) {
            instance = AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();
        }
        return instance;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.alibaba.graphscope.groot.common.schema.wrapper.*;
import com.alibaba.graphscope.groot.common.util.DataLoadTarget;
import com.alibaba.graphscope.groot.meta.MetaService;
import com.alibaba.graphscope.groot.metrics.MetricsAggregator;
import com.alibaba.graphscope.groot.rpc.RoleClients;
import com.alibaba.graphscope.groot.schema.request.AddEdgeKindRequest;
import com.alibaba.graphscope.groot.schema.request.CreateEdgeTypeRequest;
Expand All @@ -47,19 +46,16 @@ public class ClientService extends ClientGrpc.ClientImplBase {
private static final Logger logger = LoggerFactory.getLogger(ClientService.class);

private final SnapshotCache snapshotCache;
private final MetricsAggregator metricsAggregator;
private final RoleClients<FrontendStoreClient> frontendStoreClients;
private final MetaService metaService;
private final BatchDdlClient batchDdlClient;

public ClientService(
SnapshotCache snapshotCache,
MetricsAggregator metricsAggregator,
RoleClients<FrontendStoreClient> frontendStoreClients,
MetaService metaService,
BatchDdlClient batchDdlClient) {
this.snapshotCache = snapshotCache;
this.metricsAggregator = metricsAggregator;
this.frontendStoreClients = frontendStoreClients;
this.metaService = metaService;
this.batchDdlClient = batchDdlClient;
Expand Down Expand Up @@ -293,28 +289,6 @@ public void dropSchema(
}
}

/**
 * Handles the {@code getMetrics} RPC: asks the metrics aggregator for the metrics JSON of the
 * requested roles and streams the result back to the caller.
 *
 * @param request carries the role names whose metrics are requested
 * @param responseObserver receives one {@code GetMetricsResponse} followed by completion, or
 *     the error reported by the aggregator
 */
@Override
public void getMetrics(
GetMetricsRequest request, StreamObserver<GetMetricsResponse> responseObserver) {
String roleNames = request.getRoleNames();
// The aggregator reports its result through the callback rather than a return value.
this.metricsAggregator.aggregateMetricsJson(
roleNames,
new CompletionCallback<String>() {
@Override
public void onCompleted(String res) {
// Wrap the aggregated JSON in a single response message and close the stream.
responseObserver.onNext(
GetMetricsResponse.newBuilder().setMetricsJson(res).build());
responseObserver.onCompleted();
}

@Override
public void onError(Throwable t) {
// Log locally, then forward the failure to the RPC caller.
logger.error("get metrics failed", t);
responseObserver.onError(t);
}
});
}

@Override
public void getSchema(GetSchemaRequest request, StreamObserver<GetSchemaResponse> observer) {
GraphDef graphDef = this.snapshotCache.getSnapshotWithSchema().getGraphDef();
Expand Down
Loading

0 comments on commit a8a164d

Please sign in to comment.