Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-3399. Update JaegerTracing #835

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.function.SupplierWithIOException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
Expand All @@ -57,6 +58,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.Status;
Expand Down Expand Up @@ -265,14 +267,18 @@ public ContainerCommandResponseProto sendCommand(
private XceiverClientReply sendCommandWithTraceIDAndRetry(
ContainerCommandRequestProto request, List<CheckedBiFunction> validators)
throws IOException {
try (Scope scope = GlobalTracer.get()
.buildSpan("XceiverClientGrpc." + request.getCmdType().name())
.startActive(true)) {
ContainerCommandRequestProto finalPayload =
ContainerCommandRequestProto.newBuilder(request)
.setTraceID(TracingUtil.exportCurrentSpan()).build();
return sendCommandWithRetry(finalPayload, validators);
}

String spanName = "XceiverClientGrpc." + request.getCmdType().name();

return TracingUtil.executeInNewSpan(spanName,
(SupplierWithIOException<XceiverClientReply>) () -> {

ContainerCommandRequestProto finalPayload =
ContainerCommandRequestProto.newBuilder(request)
.setTraceID(TracingUtil.exportCurrentSpan()).build();
return sendCommandWithRetry(finalPayload, validators);

});
}

private XceiverClientReply sendCommandWithRetry(
Expand Down Expand Up @@ -387,9 +393,11 @@ private XceiverClientReply sendCommandWithRetry(
public XceiverClientReply sendCommandAsync(
ContainerCommandRequestProto request)
throws IOException, ExecutionException, InterruptedException {
try (Scope scope = GlobalTracer.get()
.buildSpan("XceiverClientGrpc." + request.getCmdType().name())
.startActive(true)) {

Span span = GlobalTracer.get()
.buildSpan("XceiverClientGrpc." + request.getCmdType().name()).start();

try (Scope scope = GlobalTracer.get().activateSpan(span)) {

ContainerCommandRequestProto finalPayload =
ContainerCommandRequestProto.newBuilder(request)
Expand All @@ -405,6 +413,9 @@ public XceiverClientReply sendCommandAsync(
asyncReply.getResponse().get();
}
return asyncReply;

} finally {
span.finish();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import com.google.common.base.Supplier;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
Expand Down Expand Up @@ -208,24 +207,27 @@ public ConcurrentMap<UUID, Long> getCommitInfoMap() {

private CompletableFuture<RaftClientReply> sendRequestAsync(
ContainerCommandRequestProto request) {
try (Scope scope = GlobalTracer.get()
.buildSpan("XceiverClientRatis." + request.getCmdType().name())
.startActive(true)) {
final ContainerCommandRequestMessage message
= ContainerCommandRequestMessage.toMessage(
return TracingUtil.executeInNewSpan(
"XceiverClientRatis." + request.getCmdType().name(),
(Supplier<CompletableFuture<RaftClientReply>>) () -> {
final ContainerCommandRequestMessage message
= ContainerCommandRequestMessage.toMessage(
request, TracingUtil.exportCurrentSpan());
if (HddsUtils.isReadOnly(request)) {
if (LOG.isDebugEnabled()) {
LOG.debug("sendCommandAsync ReadOnly {}", message);
}
return getClient().sendReadOnlyAsync(message);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("sendCommandAsync {}", message);
if (HddsUtils.isReadOnly(request)) {
if (LOG.isDebugEnabled()) {
LOG.debug("sendCommandAsync ReadOnly {}", message);
}
return getClient().sendReadOnlyAsync(message);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("sendCommandAsync {}", message);
}
return getClient().sendAsync(message);
}

}
return getClient().sendAsync(message);
}
}

);
}

// gets the minimum log index replicated to all servers
Expand Down
2 changes: 1 addition & 1 deletion hadoop-hdds/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-util</artifactId>
<version>0.31.0</version>
<version>0.33.0</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.function;

import java.io.IOException;

/**
* Functional interface like java.util.function.Supplier but with
* checked exception.
*/
@FunctionalInterface
public interface SupplierWithIOException<T> {

/**
* Return the given output..
*
* @return the function result
*/
T get() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hdds.tracing;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import org.apache.ratis.thirdparty.io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener;
import org.apache.ratis.thirdparty.io.grpc.Metadata;
import org.apache.ratis.thirdparty.io.grpc.ServerCall;
Expand All @@ -39,11 +41,14 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(
next.startCall(call, headers)) {
@Override
public void onMessage(ReqT message) {
try (Scope scope = TracingUtil
.importAndCreateScope(
Span span = TracingUtil
.importAndCreateSpan(
call.getMethodDescriptor().getFullMethodName(),
headers.get(GrpcClientInterceptor.TRACING_HEADER))) {
headers.get(GrpcClientInterceptor.TRACING_HEADER));
try (Scope scope = GlobalTracer.get().activateSpan(span)) {
super.onMessage(message);
} finally {
span.finish();
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map.Entry;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;

/**
Expand Down Expand Up @@ -64,9 +65,10 @@ public Object invoke(Object proxy, Method method, Object[] args)
method.getName());
}

try (Scope scope = GlobalTracer.get().buildSpan(
Span span = GlobalTracer.get().buildSpan(
name + "." + method.getName())
.startActive(true)) {
.start();
try (Scope scope = GlobalTracer.get().activateSpan(span)) {
try {
return delegateMethod.invoke(delegate, args);
} catch (Exception ex) {
Expand All @@ -75,6 +77,8 @@ public Object invoke(Object proxy, Method method, Object[] args)
} else {
throw ex;
}
} finally {
span.finish();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@
*/
package org.apache.hadoop.hdds.tracing;

import java.io.IOException;
import java.lang.reflect.Proxy;
import java.util.function.Supplier;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.function.SupplierWithIOException;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;

import io.jaegertracing.Configuration;
import io.jaegertracing.internal.JaegerTracer;
Expand All @@ -27,9 +33,6 @@
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;

/**
* Utility class to collect all the tracing helper methods.
*/
Expand Down Expand Up @@ -86,11 +89,11 @@ public static String exportSpan(Span span) {
*
* @return OpenTracing scope.
*/
public static Scope importAndCreateScope(String name, String encodedParent) {
public static Span importAndCreateSpan(String name, String encodedParent) {
Tracer tracer = GlobalTracer.get();
return tracer.buildSpan(name)
.asChildOf(extractParent(encodedParent, tracer))
.startActive(true);
.start();
}

private static SpanContext extractParent(String parent, Tracer tracer) {
Expand Down Expand Up @@ -130,8 +133,57 @@ public static <T> T createProxy(
private static boolean isTracingEnabled(
ConfigurationSource conf) {
return conf.getBoolean(
ScmConfigKeys.HDDS_TRACING_ENABLED,
ScmConfigKeys.HDDS_TRACING_ENABLED_DEFAULT);
ScmConfigKeys.HDDS_TRACING_ENABLED,
ScmConfigKeys.HDDS_TRACING_ENABLED_DEFAULT);
}

/**
* Execute a new function inside an activated span.
*/
public static <R> R executeInNewSpan(String spanName,
SupplierWithIOException<R> supplier)
throws IOException {
Span span = GlobalTracer.get()
.buildSpan(spanName).start();
try (Scope scope = GlobalTracer.get().activateSpan(span)) {
return supplier.get();
} catch (Exception ex) {
span.setTag("failed", true);
throw ex;
} finally {
span.finish();
}
}

/**
* Execute a new function inside an activated span.
*/
public static <R> R executeInNewSpan(String spanName,
Supplier<R> supplier) {
Span span = GlobalTracer.get()
.buildSpan(spanName).start();
try (Scope scope = GlobalTracer.get().activateSpan(span)) {
return supplier.get();
} catch (Exception ex) {
span.setTag("failed", true);
throw ex;
} finally {
span.finish();
}
}

/**
* Create an active span with auto-close at finish.
* <p>
* This is a simplified way to use span as there is no way to add any tag
* in case of Exceptions.
*/
public static AutoCloseable createActivatedSpan(String spanName) {
Span span = GlobalTracer.get().buildSpan(spanName).start();
Scope scope = GlobalTracer.get().activateSpan(span);
return () -> {
scope.close();
span.finish();
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum;
Expand Down Expand Up @@ -157,10 +159,12 @@ public ContainerCommandResponseProto dispatch(
ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
String spanName = "HddsDispatcher." + msg.getCmdType().name();
long startTime = System.nanoTime();
try (Scope scope = TracingUtil
.importAndCreateScope(spanName, msg.getTraceID())) {
Span span = TracingUtil
.importAndCreateSpan(spanName, msg.getTraceID());
try (Scope scope = GlobalTracer.get().activateSpan(span)) {
return dispatchRequest(msg, dispatcherContext);
} finally {
span.finish();
protocolMetrics
.increment(msg.getCmdType(), System.nanoTime() - startTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@

import com.google.common.base.Preconditions;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import org.apache.ratis.thirdparty.io.grpc.BindableService;
import org.apache.ratis.thirdparty.io.grpc.Server;
import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
Expand Down Expand Up @@ -170,16 +172,19 @@ public void stop() {
@Override
public void submitRequest(ContainerCommandRequestProto request,
HddsProtos.PipelineID pipelineID) throws IOException {
try (Scope scope = TracingUtil
.importAndCreateScope(
Span span = TracingUtil
.importAndCreateSpan(
"XceiverServerGrpc." + request.getCmdType().name(),
request.getTraceID())) {
request.getTraceID());
try (Scope scope = GlobalTracer.get().activateSpan(span)) {
ContainerProtos.ContainerCommandResponseProto response =
storageContainer.dispatch(request, null);
if (response.getResult() != ContainerProtos.Result.SUCCESS) {
throw new StorageContainerException(response.getMessage(),
response.getResult());
}
} finally {
span.finish();
}
}

Expand Down
Loading