Skip to content

Commit

Permalink
Merge pull request #690 from apache/5.0-network-protocol
Browse files Browse the repository at this point in the history
Change network protocol for 5.0
  • Loading branch information
peng-yongsheng committed Dec 18, 2017
2 parents 07ad767 + 6f1f4cf commit 5319d72
Show file tree
Hide file tree
Showing 15 changed files with 43 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import org.apache.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.apache.skywalking.apm.network.proto.Application;
import org.apache.skywalking.apm.network.proto.ApplicationMapping;
import org.apache.skywalking.apm.network.proto.ApplicationMappings;
import org.apache.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.apache.skywalking.apm.network.proto.Applications;
import org.apache.skywalking.apm.network.proto.KeyWithIntegerValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,18 +45,18 @@ public ApplicationRegisterServiceHandler(ModuleManager moduleManager) {
applicationIDService = moduleManager.find(AgentStreamModule.NAME).getService(IApplicationIDService.class);
}

@Override public void register(Application request, StreamObserver<ApplicationMapping> responseObserver) {
@Override public void batchRegister(Applications request, StreamObserver<ApplicationMappings> responseObserver) {
logger.debug("register application");
ProtocolStringList applicationCodes = request.getApplicationCodeList();
ProtocolStringList applicationCodes = request.getApplicationCodesList();

ApplicationMapping.Builder builder = ApplicationMapping.newBuilder();
ApplicationMappings.Builder builder = ApplicationMappings.newBuilder();
for (int i = 0; i < applicationCodes.size(); i++) {
String applicationCode = applicationCodes.get(i);
int applicationId = applicationIDService.getOrCreate(applicationCode);

if (applicationId != 0) {
KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(applicationCode).setValue(applicationId).build();
builder.addApplication(value);
builder.addApplications(value);
}
}
responseObserver.onNext(builder.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.skywalking.apm.network.proto.Application;
import org.apache.skywalking.apm.network.proto.ApplicationMapping;
import org.apache.skywalking.apm.network.proto.ApplicationMappings;
import org.apache.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.apache.skywalking.apm.network.proto.Applications;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,8 +40,8 @@ public void testRegister() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
stub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);

Application application = Application.newBuilder().addApplicationCode("test141").build();
ApplicationMapping mapping = stub.register(application);
logger.debug(mapping.getApplication(0).getKey() + ", " + mapping.getApplication(0).getValue());
Applications application = Applications.newBuilder().addApplicationCodes("test141").build();
ApplicationMappings mapping = stub.batchRegister(application);
logger.debug(mapping.getApplications(0).getKey() + ", " + mapping.getApplications(0).getValue());
}
}
10 changes: 5 additions & 5 deletions apm-network/src/main/proto/ApplicationRegisterService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import "KeyWithIntegerValue.proto";

//register service for ApplicationCode, this service is called when service starts.
service ApplicationRegisterService {
rpc register (Application) returns (ApplicationMapping) {
rpc batchRegister (Applications) returns (ApplicationMappings) {
}
}

message Application {
repeated string applicationCode = 1;
message Applications {
repeated string applicationCodes = 1;
}

message ApplicationMapping {
repeated KeyWithIntegerValue application = 1;
message ApplicationMappings {
repeated KeyWithIntegerValue applications = 1;
}
1 change: 1 addition & 0 deletions apm-network/src/main/proto/TraceSegmentService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ enum SpanLayer {
RPCFramework = 2;
Http = 3;
MQ = 4;
Cache = 5;
}

message LogMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*
*/


package org.apache.skywalking.apm.agent.core.context.trace;

/**
Expand All @@ -26,7 +25,8 @@ public enum SpanLayer {
DB(1),
RPC_FRAMEWORK(2),
HTTP(3),
MQ(4);
MQ(4),
CACHE(5);

private int code;

Expand All @@ -42,6 +42,10 @@ public static void asDB(AbstractSpan span) {
span.setLayer(SpanLayer.DB);
}

public static void asCache(AbstractSpan span) {
span.setLayer(SpanLayer.CACHE);
}

public static void asRPCFramework(AbstractSpan span) {
span.setLayer(SpanLayer.RPC_FRAMEWORK);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.skywalking.apm.network.proto.Application;
import org.apache.skywalking.apm.network.proto.ApplicationMapping;
import org.apache.skywalking.apm.network.proto.ApplicationMappings;
import org.apache.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.apache.skywalking.apm.network.proto.Applications;
import org.apache.skywalking.apm.network.proto.KeyWithIntegerValue;

import static org.apache.skywalking.apm.agent.core.conf.Config.Dictionary.APPLICATION_CODE_BUFFER_SIZE;
Expand Down Expand Up @@ -55,10 +55,10 @@ public PossibleFound find(String applicationCode) {
public void syncRemoteDictionary(
ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub applicationRegisterServiceBlockingStub) {
if (unRegisterApplications.size() > 0) {
ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.register(
Application.newBuilder().addAllApplicationCode(unRegisterApplications).build());
if (applicationMapping.getApplicationCount() > 0) {
for (KeyWithIntegerValue keyWithIntegerValue : applicationMapping.getApplicationList()) {
ApplicationMappings applicationMapping = applicationRegisterServiceBlockingStub.batchRegister(
Applications.newBuilder().addAllApplicationCodes(unRegisterApplications).build());
if (applicationMapping.getApplicationsCount() > 0) {
for (KeyWithIntegerValue keyWithIntegerValue : applicationMapping.getApplicationsList()) {
unRegisterApplications.remove(keyWithIntegerValue.getKey());
applicationDictionary.put(keyWithIntegerValue.getKey(), keyWithIntegerValue.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@
import org.apache.skywalking.apm.agent.core.dictionary.OperationNameDictionary;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.network.proto.Application;
import org.apache.skywalking.apm.network.proto.ApplicationInstance;
import org.apache.skywalking.apm.network.proto.ApplicationInstanceHeartbeat;
import org.apache.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.apache.skywalking.apm.network.proto.ApplicationInstanceRecover;
import org.apache.skywalking.apm.network.proto.ApplicationMapping;
import org.apache.skywalking.apm.network.proto.ApplicationMappings;
import org.apache.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.apache.skywalking.apm.network.proto.Applications;
import org.apache.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
import org.apache.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;

Expand Down Expand Up @@ -112,10 +112,10 @@ public void run() {
try {
if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
if (applicationRegisterServiceBlockingStub != null) {
ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.register(
Application.newBuilder().addApplicationCode(Config.Agent.APPLICATION_CODE).build());
if (applicationMapping.getApplicationCount() > 0) {
RemoteDownstreamConfig.Agent.APPLICATION_ID = applicationMapping.getApplication(0).getValue();
ApplicationMappings applicationMapping = applicationRegisterServiceBlockingStub.batchRegister(
Applications.newBuilder().addApplicationCodes(Config.Agent.APPLICATION_CODE).build());
if (applicationMapping.getApplicationsCount() > 0) {
RemoteDownstreamConfig.Agent.APPLICATION_ID = applicationMapping.getApplications(0).getValue();
shouldTry = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class JedisMethodInterceptor implements InstanceMethodsAroundInterceptor
AbstractSpan span = ContextManager.createExitSpan("Jedis/" + method.getName(), peer);
span.setComponent(ComponentsDefine.REDIS);
Tags.DB_TYPE.set(span, "Redis");
SpanLayer.asDB(span);
SpanLayer.asCache(span);

if (allArguments.length > 0 && allArguments[0] instanceof String) {
Tags.DB_STATEMENT.set(span, method.getName() + " " + allArguments[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private void assertRedisSpan(AbstractTracingSpan span) {
List<KeyValuePair> tags = SpanHelper.getTags(span);
assertThat(tags.get(0).getValue(), is("Redis"));
assertThat(tags.get(1).getValue(), is("set OperationKey"));
assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.DB));
assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.CACHE));
}

private Method getMockSetMethod() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public final void beforeMethod(EnhancedInstance objInst, Method method, Object[]
AbstractSpan span = ContextManager.createEntrySpan(COMSUMER_OPERATION_NAME_PREFIX + msgs.get(0).getTopic() + "/Consumer", contextCarrier);

span.setComponent(ComponentsDefine.ROCKET_MQ);
span.setLayer(SpanLayer.MQ);
SpanLayer.asMQ(span);
for (int i = 1; i < msgs.size(); i++) {
ContextManager.extract(getContextCarrierFromMessage(msgs.get(i)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
String namingServiceAddress = String.valueOf(objInst.getSkyWalkingDynamicField());
AbstractSpan span = ContextManager.createExitSpan(buildOperationName(message.getTopic()), contextCarrier, namingServiceAddress);
span.setComponent(ComponentsDefine.ROCKET_MQ);
span.setLayer(SpanLayer.MQ);
SpanLayer.asMQ(span);
span.tag("brokerName", (String)allArguments[1]);
span.tag("tags", message.getTags());
span.tag("communication.mode", ((CommunicationMode)allArguments[5]).name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
AbstractSpan span = ContextManager.createExitSpan(SPY_MEMCACHE + method.getName(), peer);
span.setComponent(ComponentsDefine.MEMCACHED);
Tags.DB_TYPE.set(span, ComponentsDefine.MEMCACHED.getName());
SpanLayer.asDB(span);
SpanLayer.asCache(span);
Tags.DB_STATEMENT.set(span, method.getName() + " " + allArguments[0]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private void assertMemcacheSpan(AbstractTracingSpan span) {
List<KeyValuePair> tags = SpanHelper.getTags(span);
assertThat(tags.get(0).getValue(), is("Memcached"));
assertThat(tags.get(1).getValue(), is("set OperationKey"));
MatcherAssert.assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.DB));
MatcherAssert.assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.CACHE));
}

private Method getMockSetMethod() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class XMemcachedMethodInterceptor implements InstanceMethodsAroundInterce
AbstractSpan span = ContextManager.createExitSpan(XMEMCACHED + method.getName(), peer);
span.setComponent(ComponentsDefine.MEMCACHED);
Tags.DB_TYPE.set(span, ComponentsDefine.MEMCACHED.getName());
SpanLayer.asDB(span);
SpanLayer.asCache(span);
Tags.DB_STATEMENT.set(span, method.getName() + " " + allArguments[0]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private void assertMemcacheSpan(AbstractTracingSpan span) {
List<KeyValuePair> tags = SpanHelper.getTags(span);
assertThat(tags.get(0).getValue(), is("Memcached"));
assertThat(tags.get(1).getValue(), is("set OperationKey"));
assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.DB));
assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.CACHE));
}

private Method getMockSetMethod() throws Exception {
Expand Down

0 comments on commit 5319d72

Please sign in to comment.