Skip to content

Commit

Permalink
support the future stub of grpc framework
Browse files Browse the repository at this point in the history
  • Loading branch information
ascrutae committed Dec 8, 2017
1 parent b7dfd59 commit c17f955
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 99 deletions.
Expand Up @@ -19,46 +19,31 @@
package org.skywalking.apm.plugin.grpc.v1;

import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import java.lang.reflect.Method;
import org.skywalking.apm.agent.core.context.CarrierItem;
import org.skywalking.apm.agent.core.context.ContextCarrier;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.skywalking.apm.network.trace.component.ComponentsDefine;
import org.skywalking.apm.plugin.grpc.v1.vo.GRPCDynamicFields;

/**
* {@link ClientCallOnNextInterceptor} create a exist span when the grpc start call. it will stop span when the method
* type is non-unary.
*
* @author zhangxin
*/
public class ClientCallStartInterceptor
implements InstanceMethodsAroundInterceptor {

public class ClientCallStartInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
GRPCDynamicFields cachedObjects = (GRPCDynamicFields)objInst.getSkyWalkingDynamicField();
final Metadata headers = (Metadata)allArguments[1];
final AbstractSpan span = ContextManager.createExitSpan(cachedObjects.getRequestMethodName(), cachedObjects.getAuthority());
span.setComponent(ComponentsDefine.GRPC);
SpanLayer.asRPCFramework(span);

final ContextCarrier contextCarrier = new ContextCarrier();
ContextManager.inject(contextCarrier);

CarrierItem contextItem = contextCarrier.items();
while (contextItem.hasNext()) {
contextItem = contextItem.next();
Metadata.Key<String> headerKey = Metadata.Key.of(contextItem.getHeadKey(), Metadata.ASCII_STRING_MARSHALLER);
headers.put(headerKey, contextItem.getHeadValue());
}

GRPCDynamicFields cachedObjects = (GRPCDynamicFields)objInst.getSkyWalkingDynamicField();
GRPCDynamicFields listenerCachedObject = new GRPCDynamicFields();
listenerCachedObject.setSnapshot(ContextManager.capture());
listenerCachedObject.setDescriptor(cachedObjects.getDescriptor());
Expand All @@ -68,15 +53,11 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {

if (((GRPCDynamicFields)objInst.getSkyWalkingDynamicField()).getMethodType() != MethodDescriptor.MethodType.UNARY) {
ContextManager.stopSpan();
}
return ret;
}

@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);

}
}
Expand Up @@ -18,45 +18,44 @@

package org.skywalking.apm.plugin.grpc.v1;

import io.grpc.Metadata;
import io.grpc.Status;
import java.lang.reflect.Method;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.tag.Tags;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.StaticMethodsAroundInterceptor;
import org.skywalking.apm.network.trace.component.ComponentsDefine;
import org.skywalking.apm.plugin.grpc.v1.vo.GRPCDynamicFields;

/**
* {@link UnaryClientOnCloseInterceptor} stop the active span when the call end.
* {@link ClientCallOnNextInterceptor} create a exist span when the grpc start call. it will stop span when the method
* type is non-unary.
*
* @author zhangxin
*/
public class UnaryClientOnCloseInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {

}
public class ClientCallsMethodInterceptor
implements StaticMethodsAroundInterceptor {

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
AbstractSpan activeSpan = ContextManager.activeSpan();
Status status = (Status)allArguments[0];
@Override public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
MethodInterceptResult result) {
EnhancedInstance clientCall = (EnhancedInstance)allArguments[0];
GRPCDynamicFields cachedObjects = (GRPCDynamicFields)clientCall.getSkyWalkingDynamicField();

if (status != Status.OK) {
activeSpan.errorOccurred().log(status.asRuntimeException((Metadata)allArguments[1]));
Tags.STATUS_CODE.set(activeSpan, status.getCode().toString());
}
final AbstractSpan span = ContextManager.createExitSpan(cachedObjects.getRequestMethodName(), cachedObjects.getAuthority());
span.setComponent(ComponentsDefine.GRPC);
SpanLayer.asRPCFramework(span);
}

@Override public Object afterMethod(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
Object ret) {
ContextManager.stopSpan();
return ret;
}

@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
@Override
public void handleMethodException(Class clazz, Method method, Object[] allArguments, Class<?>[] parameterTypes,
Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
@@ -0,0 +1,34 @@
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/

package org.skywalking.apm.plugin.grpc.v1;

import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;

/**
* {@link UnaryStreamToFutureConstructorInterceptor} stop the active span when the call end.
*
* @author zhangxin
*/
public class UnaryStreamToFutureConstructorInterceptor implements InstanceConstructorInterceptor {

@Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {

}
}
Expand Up @@ -31,7 +31,7 @@

/**
* {@link ClientCallInstrumentation} presents that skywalking intercept the <code>start</code> method in
* <code>io.grpc.internal.ClientCallImpl</code> class by <code>org.skywalking.apm.plugin.grpc.v1.ClientCallStartInterceptor</code>
* <code>io.grpc.internal.ClientCallImpl</code> class by <code>org.skywalking.apm.plugin.grpc.v1.ClientCallsMethodInterceptor</code>
* and the constructor in <code>io.grpc.internal.ClientCallImpl</code> by <code>org.skywalking.apm.plugin.grpc.v1.ClientCallIConstructorInterceptor</code>
*
* @author zhangxin
Expand Down
@@ -0,0 +1,58 @@
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/

package org.skywalking.apm.plugin.grpc.v1.define;

import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassStaticMethodsEnhancePluginDefine;
import org.skywalking.apm.agent.core.plugin.match.ClassMatch;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static org.skywalking.apm.agent.core.plugin.match.NameMatch.byName;

public class ClientCallsInstrumentation extends ClassStaticMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "io.grpc.stub.ClientCalls";

@Override protected StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
return new StaticMethodsInterceptPoint[] {
new StaticMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return (named("asyncUnaryRequestCall").and(takesArgumentWithType(2,"io.grpc.ClientCall$Listener")))
.or(named("asyncStreamingRequestCall"))
.or(named("blockingUnaryCall"))
.or(named("futureUnaryCall"));
}

@Override public String getMethodsInterceptor() {
return "org.skywalking.apm.plugin.grpc.v1.ClientCallsMethodInterceptor";
}

@Override public boolean isOverrideArgs() {
return false;
}
}
};
}

@Override protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
Expand Up @@ -25,42 +25,37 @@
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.skywalking.apm.agent.core.plugin.match.ClassMatch;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.any;
import static org.skywalking.apm.agent.core.plugin.match.NameMatch.byName;

/**
* {@link UnaryClientCallListenerInstrumentation} indicates that skywalking enhance the <code>onClose</code> method in
* <code>io.grpc.stub.ClientCalls$UnaryStreamToFuture</code> class by <code>org.skywalking.apm.plugin.grpc.v1.UnaryClientOnCloseInterceptor</code>
* <code>io.grpc.stub.ClientCalls$UnaryStreamToFuture</code> class by <code>org.skywalking.apm.plugin.grpc.v1.UnaryStreamToFutureConstructorInterceptor</code>
*
* @author zhangxin
*/
public class UnaryClientCallListenerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "io.grpc.stub.ClientCalls$UnaryStreamToFuture";
private static final String ENHANCE_METHOD = "onClose";
public static final String INTERCEPT_CLASS = "org.skywalking.apm.plugin.grpc.v1.UnaryClientOnCloseInterceptor";
public static final String INTERCEPT_CLASS = "org.skywalking.apm.plugin.grpc.v1.UnaryStreamToFutureConstructorInterceptor";

@Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}

@Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD);
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
return any();
}

@Override public String getMethodsInterceptor() {
@Override public String getConstructorInterceptor() {
return INTERCEPT_CLASS;
}

@Override public boolean isOverrideArgs() {
return false;
}
}
};
}

@Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[0];
}

@Override protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
Expand Down
Expand Up @@ -2,6 +2,7 @@ grpc-1.x=org.skywalking.apm.plugin.grpc.v1.define.ClientCallInstrumentation
grpc-1.x=org.skywalking.apm.plugin.grpc.v1.define.UnaryClientCallListenerInstrumentation
grpc-1.x=org.skywalking.apm.plugin.grpc.v1.define.UnaryServerCallListenerInstrumentation
grpc-1.x=org.skywalking.apm.plugin.grpc.v1.define.UnaryServerCallHandlerInstrumentation
grpc-1.x=org.skywalking.apm.plugin.grpc.v1.define.ClientCallsInstrumentation
grpc-1.x=org.skywalking.apm.plugin.grpc.v1.define.ManagedChannelInstrumentation
grpc-1.x=org.skywalking.apm.plugin.grpc.v1.define.StreamingServerCallHandlerInstrumentation
grpc-1.x=org.skywalking.apm.plugin.grpc.v1.define.StreamingServerCallListenerInstrumentation
Expand Down

0 comments on commit c17f955

Please sign in to comment.