diff --git a/docs/en/setup/backend/backend-receivers.md b/docs/en/setup/backend/backend-receivers.md index 768020f10059..b808acbc50b6 100644 --- a/docs/en/setup/backend/backend-receivers.md +++ b/docs/en/setup/backend/backend-receivers.md @@ -10,23 +10,32 @@ We have following receivers, and `default` implementors are provided in our Apac 1. **service-mesh**. gRPC services accept data from inbound mesh probes. 1. **istio-telemetry**. Istio telemetry is from Istio official bypass adaptor, this receiver match its gRPC services. 1. **receiver-jvm**. gRPC services accept JVM metric data. +1. **receiver_zipkin**. HTTP service accepts Span in Zipkin v1 and v2 formats. Notice, this receiver only +works as expected in backend single node mode. Cluster mode is not supported. Welcome anyone to improve this. The sample settings of these receivers should be already in default `application.yml`, and also list here ```yaml +receiver-register: + default: receiver-trace: default: - bufferPath: ../buffer/ # Path to trace buffer files, suggest to use absolute path + bufferPath: ../trace-buffer/ # Path to trace buffer files, suggest to use absolute path bufferOffsetMaxFileSize: 100 # Unit is MB bufferDataMaxFileSize: 500 # Unit is MB - bufferFileCleanWhenRestart: false # Clean buffer file when backend restart. + bufferFileCleanWhenRestart: false receiver-jvm: default: service-mesh: default: - bufferPath: ../mesh-buffer/ # Path to mesh telemetry data buffer files, suggest to use absolute path + bufferPath: ../mesh-buffer/ # Path to trace buffer files, suggest to use absolute path bufferOffsetMaxFileSize: 100 # Unit is MB bufferDataMaxFileSize: 500 # Unit is MB bufferFileCleanWhenRestart: false istio-telemetry: default: +receiver_zipkin: + default: + host: 0.0.0.0 + port: 9411 + contextPath: / ``` \ No newline at end of file diff --git a/oap-server/pom.xml b/oap-server/pom.xml index 86f5d816c09f..d5ce3c74e8a0 100644 --- a/oap-server/pom.xml +++ b/oap-server/pom.xml @@ -61,6 +61,8 @@ 2.9.9 2.0.0 3.1.0 + 2.9.1 + 2.6.2 @@ -277,6 +279,18 @@ HikariCP ${hikaricp.version} + + + io.zipkin.zipkin2 + zipkin + ${zipkin.version} + + + com.github.ben-manes.caffeine + caffeine + ${caffeine.version} + + \ No newline at end of file diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java index 887850e43eb8..aab042864c99 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/module/TraceModule.java @@ -19,17 +19,19 @@ package org.apache.skywalking.oap.server.receiver.trace.module; import org.apache.skywalking.oap.server.library.module.ModuleDefine; +import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService; /** * @author peng-yongsheng */ public class TraceModule extends ModuleDefine { + public static final String NAME = "receiver-trace"; public TraceModule() { - super("receiver-trace"); + super(NAME); } @Override public Class[] services() { - return new Class[0]; + return new Class[] {ISegmentParserService.class}; } } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java index de32457fe65c..462fbf2096db 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java @@ -37,6 +37,7 @@ public class TraceModuleProvider extends ModuleProvider { private final TraceServiceModuleConfig moduleConfig; + private SegmentParse.Producer segmentProducer; public TraceModuleProvider() { this.moduleConfig = new TraceServiceModuleConfig(); @@ -54,19 +55,21 @@ public TraceModuleProvider() { return moduleConfig; } - @Override public void prepare() { - } - - @Override public void start() throws ModuleStartException { + @Override public void prepare() throws ServiceNotProvidedException { SegmentParserListenerManager listenerManager = new SegmentParserListenerManager(); listenerManager.add(new MultiScopesSpanListener.Factory()); listenerManager.add(new ServiceMappingSpanListener.Factory()); listenerManager.add(new SegmentSpanListener.Factory()); + segmentProducer = new SegmentParse.Producer(getManager(), listenerManager); + this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducer)); + } + + @Override public void start() throws ModuleStartException { GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).provider().getService(GRPCHandlerRegister.class); JettyHandlerRegister jettyHandlerRegister = getManager().find(CoreModule.NAME).provider().getService(JettyHandlerRegister.class); try { - SegmentParse.Producer segmentProducer = new SegmentParse.Producer(getManager(), listenerManager); + grpcHandlerRegister.addHandler(new TraceSegmentServiceHandler(segmentProducer)); jettyHandlerRegister.addHandler(new TraceSegmentServletHandler(segmentProducer)); diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java new file mode 100644 index 000000000000..91b05aeef3ce --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserListenerManager.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.trace.provider.parser; + +import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory; + +/** + * @author wusheng + */ +public interface ISegmentParserListenerManager { + void add(SpanListenerFactory spanListenerFactory); +} diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserService.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserService.java new file mode 100644 index 000000000000..4579d4829641 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/ISegmentParserService.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.trace.provider.parser; + +import org.apache.skywalking.apm.network.language.agent.UpstreamSegment; +import org.apache.skywalking.oap.server.library.module.Service; + +/** + * @author wusheng + */ +public interface ISegmentParserService extends Service { + void send(UpstreamSegment segment); +} diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserListenerManager.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserListenerManager.java index a2dd3036faa2..36a80a467f7d 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserListenerManager.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserListenerManager.java @@ -18,13 +18,14 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser; -import java.util.*; +import java.util.LinkedList; +import java.util.List; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.SpanListenerFactory; /** * @author peng-yongsheng */ -public class SegmentParserListenerManager { +public class SegmentParserListenerManager implements ISegmentParserListenerManager { private List spanListenerFactories; @@ -32,6 +33,7 @@ public SegmentParserListenerManager() { this.spanListenerFactories = new LinkedList<>(); } + @Override public void add(SpanListenerFactory spanListenerFactory) { spanListenerFactories.add(spanListenerFactory); } diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserServiceImpl.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserServiceImpl.java new file mode 100644 index 000000000000..2d0ab6d8f6bf --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParserServiceImpl.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.trace.provider.parser; + +import org.apache.skywalking.apm.network.language.agent.UpstreamSegment; + +/** + * @author wusheng + */ +public class SegmentParserServiceImpl implements ISegmentParserService { + private final SegmentParse.Producer segmentProducer; + + public SegmentParserServiceImpl( + SegmentParse.Producer segmentProducer) { + this.segmentProducer = segmentProducer; + } + + @Override + public void send(UpstreamSegment segment) { + segmentProducer.send(segment, SegmentParse.Source.Agent); + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java index 305878a959dd..4d8e65ec4be4 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java @@ -35,9 +35,7 @@ * * v5 | v6 * - * 1. Application == Service - * 2. Server == Service Instance - * 3. Service == Endpoint + * 1. Application == Service 2. Server == Service Instance 3. Service == Endpoint * * @author peng-yongsheng, wusheng */ @@ -120,6 +118,9 @@ public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreI SourceBuilder sourceBuilder = new SourceBuilder(); int peerId = spanDecorator.getPeerId(); + if (peerId == 0) { + return; + } int destServiceId = serviceInventoryCache.getServiceId(peerId); int mappingServiceId = serviceInventoryCache.get(destServiceId).getMappingServiceId(); int destInstanceId = instanceInventoryCache.getServiceInstanceId(destServiceId, peerId); diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml index 58699c128e75..f171ab04c823 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/pom.xml @@ -17,7 +17,8 @@ ~ --> - + server-receiver-plugin org.apache.skywalking @@ -28,4 +29,28 @@ zipkin-receiver-plugin jar + + + org.apache.skywalking + skywalking-trace-receiver-plugin + ${project.version} + + + org.apache.skywalking + skywalking-register-receiver-plugin + ${project.version} + + + com.github.ben-manes.caffeine + caffeine + + + io.zipkin.zipkin2 + zipkin + + + com.github.ben-manes.caffeine + caffeine + + \ No newline at end of file diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java new file mode 100644 index 000000000000..24591999dcd6 --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/CoreRegisterLinker.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin; + +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister; +import org.apache.skywalking.oap.server.library.module.ModuleManager; + +public class CoreRegisterLinker { + private static volatile ModuleManager MODULE_MANAGER; + private static volatile IServiceInventoryRegister SERVICE_INVENTORY_REGISTER; + private static volatile IServiceInstanceInventoryRegister SERVICE_INSTANCE_INVENTORY_REGISTER; + + public static void setModuleManager(ModuleManager moduleManager) { + CoreRegisterLinker.MODULE_MANAGER = moduleManager; + } + + public static IServiceInventoryRegister getServiceInventoryRegister() { + if (SERVICE_INVENTORY_REGISTER == null) { + SERVICE_INVENTORY_REGISTER = MODULE_MANAGER.find(CoreModule.NAME).provider().getService(IServiceInventoryRegister.class); + } + return SERVICE_INVENTORY_REGISTER; + } + + public static IServiceInstanceInventoryRegister getServiceInstanceInventoryRegister() { + if (SERVICE_INSTANCE_INVENTORY_REGISTER == null) { + SERVICE_INSTANCE_INVENTORY_REGISTER = MODULE_MANAGER.find(CoreModule.NAME).provider().getService(IServiceInstanceInventoryRegister.class); + } + return SERVICE_INSTANCE_INVENTORY_REGISTER; + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java new file mode 100644 index 000000000000..1381da3ed44e --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/Receiver2AnalysisBridge.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin; + +import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService; +import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace; +import org.apache.skywalking.oap.server.receiver.zipkin.transform.SegmentListener; +import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer; + +/** + * Send the segments to Analysis module, like receiving segments from native SkyWalking agents. + */ +public class Receiver2AnalysisBridge implements SegmentListener { + private ISegmentParserService segmentParseService; + + public Receiver2AnalysisBridge(ISegmentParserService segmentParseService) { + this.segmentParseService = segmentParseService; + } + + /** + * Add this bridge as listener to Zipkin span transfer. + */ + public void build() { + Zipkin2SkyWalkingTransfer.INSTANCE.addListener(this); + } + + @Override + public void notify(SkyWalkingTrace trace) { + trace.toUpstreamSegment().forEach(upstream -> segmentParseService.send(upstream.build())); + + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java new file mode 100644 index 000000000000..a5e14af22bf9 --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin; + +import org.apache.skywalking.oap.server.library.module.ModuleConfig; + +/** + * @author wusheng + */ +public class ZipkinReceiverConfig extends ModuleConfig { + private String host; + private int port; + private String contextPath; + + private int expireTime = 20; + + private int maxCacheSize = 1_000_000; + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getContextPath() { + return contextPath; + } + + public void setContextPath(String contextPath) { + this.contextPath = contextPath; + } + + public int getExpireTime() { + return expireTime; + } + + public void setExpireTime(int expireTime) { + this.expireTime = expireTime; + } + + public int getMaxCacheSize() { + return maxCacheSize; + } + + public void setMaxCacheSize(int maxCacheSize) { + this.maxCacheSize = maxCacheSize; + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverModule.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverModule.java new file mode 100644 index 000000000000..abf1f094e283 --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverModule.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin; + +import org.apache.skywalking.oap.server.library.module.ModuleDefine; + +/** + * Zipkin receiver module provides the HTTP, protoc serve for any SDK or agent by following Zipkin format. + * + * At this moment, Zipkin format is not compatible with SkyWalking, especially HEADERs. Please don't consider this as a + * Zipkin-SkyWalking integration, it is provided for adding analysis, aggregation and visualization capabilities to + * zipkin backend. + * + * @author wusheng + */ +public class ZipkinReceiverModule extends ModuleDefine { + public static final String NAME = "receiver_zipkin"; + + public ZipkinReceiverModule() { + super(NAME); + } + + @Override public Class[] services() { + return new Class[0]; + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java new file mode 100644 index 000000000000..2eb20791b699 --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin; + +import org.apache.skywalking.oap.server.library.module.ModuleConfig; +import org.apache.skywalking.oap.server.library.module.ModuleDefine; +import org.apache.skywalking.oap.server.library.module.ModuleProvider; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; +import org.apache.skywalking.oap.server.library.server.ServerException; +import org.apache.skywalking.oap.server.library.server.jetty.JettyServer; +import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule; +import org.apache.skywalking.oap.server.receiver.trace.provider.parser.ISegmentParserService; +import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV1JettyHandler; +import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanV2JettyHandler; +import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer; + +/** + * @author wusheng + */ +public class ZipkinReceiverProvider extends ModuleProvider { + public static final String NAME = "default"; + private ZipkinReceiverConfig config; + private JettyServer jettyServer; + + public ZipkinReceiverProvider() { + config = new ZipkinReceiverConfig(); + } + + @Override public String name() { + return NAME; + } + + @Override public Class module() { + return ZipkinReceiverModule.class; + } + + @Override public ModuleConfig createConfigBeanIfAbsent() { + return config; + } + + @Override public void prepare() throws ServiceNotProvidedException { + + } + + @Override public void start() throws ServiceNotProvidedException, ModuleStartException { + CoreRegisterLinker.setModuleManager(getManager()); + + jettyServer = new JettyServer(config.getHost(), config.getPort(), config.getContextPath()); + jettyServer.initialize(); + + jettyServer.addHandler(new SpanV1JettyHandler(config)); + jettyServer.addHandler(new SpanV2JettyHandler(config)); + + ISegmentParserService segmentParseService = getManager().find(TraceModule.NAME).provider().getService(ISegmentParserService.class); + Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService); + Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge); + } + + @Override public void notifyAfterCompleted() throws ModuleStartException { + try { + jettyServer.start(); + } catch (ServerException e) { + throw new ModuleStartException(e.getMessage(), e); + } + } + + @Override public String[] requiredModules() { + return new String[] {TraceModule.NAME}; + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java new file mode 100644 index 000000000000..cf929f5462a6 --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinTraceOSInfoBuilder.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin; + +import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory; + +/** + * @author wusheng + */ +public class ZipkinTraceOSInfoBuilder { + + public static ServiceInstanceInventory.AgentOsInfo getOSInfoForZipkin(String instanceName) { + ServiceInstanceInventory.AgentOsInfo osInfo = new ServiceInstanceInventory.AgentOsInfo(); + osInfo.setHostname(instanceName); + return osInfo; + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java new file mode 100644 index 000000000000..c4d5b06140da --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/CacheFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin.cache; + +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; +import org.apache.skywalking.oap.server.receiver.zipkin.cache.caffeine.CaffeineSpanCache; + +/** + * @author wusheng + */ +public class CacheFactory { + public static final CacheFactory INSTANCE = new CacheFactory(); + + private ISpanCache implementor; + + private CacheFactory() { + } + + public ISpanCache get(ZipkinReceiverConfig config) { + if (implementor == null) { + synchronized (INSTANCE) { + if (implementor == null) { + implementor = new CaffeineSpanCache(config); + } + } + } + return implementor; + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java new file mode 100644 index 000000000000..b122bcfefb5f --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/ISpanCache.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin.cache; + +import zipkin2.Span; + +/** + * @author wusheng + */ +public interface ISpanCache { + void addSpan(Span span); +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java new file mode 100644 index 000000000000..c42e7105157f --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/cache/caffeine/CaffeineSpanCache.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin.cache.caffeine; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; +import org.apache.skywalking.oap.server.receiver.zipkin.cache.ISpanCache; +import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace; +import org.apache.skywalking.oap.server.receiver.zipkin.transform.Zipkin2SkyWalkingTransfer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import zipkin2.Span; + +/** + * NOTICE: FROM my test, Caffeine cache triggers/checks expire only face write/read op. + * In order to make trace finish in time, I have to set a timer to write a meaningless trace, for active expire. + * + * @author wusheng + */ +public class CaffeineSpanCache implements ISpanCache, RemovalListener { + private static final Logger logger = LoggerFactory.getLogger(CaffeineSpanCache.class); + private Cache inProcessSpanCache; + private ReentrantLock newTraceLock; + + public CaffeineSpanCache(ZipkinReceiverConfig config) { + newTraceLock = new ReentrantLock(); + inProcessSpanCache = Caffeine.newBuilder() + .expireAfterWrite(config.getExpireTime(), TimeUnit.SECONDS) + .maximumSize(config.getMaxCacheSize()) + .removalListener(this) + .build(); + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { + inProcessSpanCache.put("ACTIVE", new ZipkinTrace.TriggerTrace()); + }, 2, 3, TimeUnit.SECONDS); + } + + /** + * Zipkin trace finished by the expired rule. + * + * @param key + * @param trace + * @param cause + */ + @Override + public void onRemoval(@Nullable String key, @Nullable ZipkinTrace trace, @Nonnull RemovalCause cause) { + if (trace instanceof ZipkinTrace.TriggerTrace) { + return; + } + try { + Zipkin2SkyWalkingTransfer.INSTANCE.transfer(trace); + } catch (Exception e) { + logger.error(e.getMessage(), e); + logger.warn("Zipkin trace:" + trace); + } + } + + @Override + public void addSpan(Span span) { + ZipkinTrace trace = inProcessSpanCache.getIfPresent(span.traceId()); + if (trace == null) { + newTraceLock.lock(); + try { + trace = inProcessSpanCache.getIfPresent(span.traceId()); + if (trace == null) { + trace = new ZipkinTrace(); + inProcessSpanCache.put(span.traceId(), trace); + } + } finally { + newTraceLock.unlock(); + } + } + trace.addSpan(span); + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java new file mode 100644 index 000000000000..9eca022bdda4 --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/SkyWalkingTrace.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin.data; + +import java.util.LinkedList; +import java.util.List; +import org.apache.skywalking.apm.network.language.agent.TraceSegmentObject; +import org.apache.skywalking.apm.network.language.agent.UniqueId; +import org.apache.skywalking.apm.network.language.agent.UpstreamSegment; + +/** + * Each SkyWalkingTrace consists of segments in each application, original from {@link ZipkinTrace}s + */ +public class SkyWalkingTrace { + private UniqueId globalTraceId; + private List segmentList; + + public SkyWalkingTrace(UniqueId globalTraceId, List segmentList) { + this.globalTraceId = globalTraceId; + this.segmentList = segmentList; + } + + public List toUpstreamSegment() { + List newUpstreamList = new LinkedList<>(); + segmentList.forEach(segment -> { + UpstreamSegment.Builder builder = UpstreamSegment.newBuilder(); + builder.addGlobalTraceIds(globalTraceId); + builder.setSegment(segment.build().toByteString()); + newUpstreamList.add(builder); + }); + return newUpstreamList; + } + + public UniqueId getGlobalTraceId() { + return globalTraceId; + } + + public List getSegmentList() { + return segmentList; + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java new file mode 100644 index 000000000000..d12beb8b6085 --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/data/ZipkinTrace.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin.data; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.locks.ReentrantLock; +import zipkin2.Span; + +/** + * @author wusheng + */ +public class ZipkinTrace { + private List spans; + private ReentrantLock spanWriteLock; + + public ZipkinTrace() { + spans = new LinkedList<>(); + spanWriteLock = new ReentrantLock(); + } + + public void addSpan(Span span) { + spanWriteLock.lock(); + try { + spans.add(span); + } finally { + spanWriteLock.unlock(); + } + } + + public List getSpans() { + return spans; + } + + @Override + public String toString() { + return "ZipkinTrace{" + + "spans=" + spans + + '}'; + } + + public static class TriggerTrace extends ZipkinTrace { + + + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java new file mode 100644 index 000000000000..f5e97be51cab --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanProcessor.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin.handler; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.zip.GZIPInputStream; +import javax.servlet.http.HttpServletRequest; +import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinTraceOSInfoBuilder; +import org.apache.skywalking.oap.server.receiver.zipkin.cache.CacheFactory; +import zipkin2.Span; +import zipkin2.codec.SpanBytesDecoder; + +public class SpanProcessor { + void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletRequest request) throws IOException { + InputStream inputStream = getInputStream(request); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + byte[] buffer = new byte[2048]; + int readCntOnce; + + while ((readCntOnce = inputStream.read(buffer)) >= 0) { + out.write(buffer, 0, readCntOnce); + } + + List spanList = decoder.decodeList(out.toByteArray()); + + spanList.forEach(span -> { + // In Zipkin, the local service name represents the application owner. + String applicationCode = span.localServiceName(); + if (applicationCode != null) { + int applicationId = CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(applicationCode); + if (applicationId != 0) { + CoreRegisterLinker.getServiceInstanceInventoryRegister().getOrCreate(applicationId, applicationCode, applicationCode, + span.timestampAsLong(), + ZipkinTraceOSInfoBuilder.getOSInfoForZipkin(applicationCode)); + } + } + + CacheFactory.INSTANCE.get(config).addSpan(span); + }); + } + + private InputStream getInputStream(HttpServletRequest request) throws IOException { + InputStream requestInStream; + + String headEncoding = request.getHeader("accept-encoding"); + if (headEncoding != null && (headEncoding.indexOf("gzip") != -1)) { + requestInStream = new GZIPInputStream(request.getInputStream()); + } else { + requestInStream = request.getInputStream(); + } + + return requestInStream; + } + +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java new file mode 100644 index 000000000000..77f38e48c2b7 --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin.handler; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import zipkin2.codec.SpanBytesDecoder; + +public class SpanV1JettyHandler extends JettyHandler { + private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class); + + private ZipkinReceiverConfig config; + + public SpanV1JettyHandler(ZipkinReceiverConfig config) { + this.config = config; + } + + @Override + public String pathSpec() { + return "/api/v1/spans"; + } + + @Override + protected void doPost(HttpServletRequest request, HttpServletResponse response) { + response.setContentType("application/json"); + response.setCharacterEncoding("utf-8"); + + try { + String type = request.getHeader("Content-Type"); + + SpanBytesDecoder decoder = type != null && type.contains("/x-thrift") + ? SpanBytesDecoder.THRIFT + : SpanBytesDecoder.JSON_V1; + + SpanProcessor processor = new SpanProcessor(); + processor.convert(config, decoder, request); + + response.setStatus(202); + } catch (Exception e) { + response.setStatus(500); + + logger.error(e.getMessage(), e); + } + } + +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java new file mode 100644 index 000000000000..7c8705a284bb --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin.handler; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import zipkin2.codec.SpanBytesDecoder; + +/** + * @author wusheng + */ +public class SpanV2JettyHandler extends JettyHandler { + private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class); + + private ZipkinReceiverConfig config; + + public SpanV2JettyHandler(ZipkinReceiverConfig config) { + this.config = config; + } + + @Override + public String pathSpec() { + return "/api/v2/spans"; + } + + @Override + protected void doPost(HttpServletRequest request, HttpServletResponse response) { + response.setContentType("application/json"); + response.setCharacterEncoding("utf-8"); + + try { + String type = request.getHeader("Content-Type"); + + SpanBytesDecoder decoder = type != null && type.contains("/x-protobuf") + ? SpanBytesDecoder.PROTO3 + : SpanBytesDecoder.JSON_V2; + + SpanProcessor processor = new SpanProcessor(); + processor.convert(config, decoder, request); + + response.setStatus(202); + } catch (Exception e) { + response.setStatus(500); + + logger.error(e.getMessage(), e); + } + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java new file mode 100644 index 000000000000..3b06584e4fdc --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentBuilder.java @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin.transform; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.logging.log4j.util.Strings; +import org.apache.skywalking.apm.network.language.agent.KeyWithStringValue; +import org.apache.skywalking.apm.network.language.agent.LogMessage; +import org.apache.skywalking.apm.network.language.agent.RefType; +import org.apache.skywalking.apm.network.language.agent.SpanObject; +import org.apache.skywalking.apm.network.language.agent.SpanType; +import org.apache.skywalking.apm.network.language.agent.TraceSegmentObject; +import org.apache.skywalking.apm.network.language.agent.TraceSegmentReference; +import org.apache.skywalking.apm.network.language.agent.UniqueId; +import org.apache.skywalking.oap.server.library.util.StringUtils; +import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker; +import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinTraceOSInfoBuilder; +import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace; +import org.eclipse.jetty.util.StringUtil; +import zipkin2.Endpoint; +import zipkin2.Span; + +/** + * @author wusheng + */ +public class SegmentBuilder { + private Context context; + private LinkedList segments; + private Map clientPartSpan; + + private SegmentBuilder() { + segments = new LinkedList<>(); + context = new Context(); + clientPartSpan = new HashMap<>(); + } + + public static SkyWalkingTrace build(List traceSpans) throws Exception { + SegmentBuilder builder = new SegmentBuilder(); + // This map groups the spans by their parent id, in order to assist to build tree. + // key: parentId + // value: span + Map> childSpanMap = new HashMap<>(); + AtomicReference root = new AtomicReference<>(); + traceSpans.forEach(span -> { + if (span.parentId() == null) { + root.set(span); + } + List spanList = childSpanMap.get(span.parentId()); + if (spanList == null) { + spanList = new LinkedList<>(); + spanList.add(span); + childSpanMap.put(span.parentId(), spanList); + } else { + spanList.add(span); + } + }); + + Span rootSpan = root.get(); + long timestamp = 0; + if (rootSpan != null) { + String applicationCode = rootSpan.localServiceName(); + // If root span doesn't include applicationCode, a.k.a local service name, + // Segment can't be built + // Ignore the whole trace. + // :P Hope anyone could provide better solution. + // Wu Sheng. + if (StringUtils.isNotEmpty(applicationCode)) { + timestamp = rootSpan.timestampAsLong(); + builder.context.addApp(applicationCode, rootSpan.timestampAsLong() / 1000); + + SpanObject.Builder rootSpanBuilder = builder.initSpan(null, null, rootSpan, true); + builder.context.currentSegment().addSpan(rootSpanBuilder); + builder.scanSpansFromRoot(rootSpanBuilder, rootSpan, childSpanMap); + + builder.segments.add(builder.context.removeApp()); + } + } + + List segmentBuilders = new LinkedList<>(); + // microseconds -> million seconds + long finalTimestamp = timestamp / 1000; + builder.segments.forEach(segment -> { + TraceSegmentObject.Builder traceSegmentBuilder = segment.freeze(); + segmentBuilders.add(traceSegmentBuilder); + CoreRegisterLinker.getServiceInventoryRegister().heartbeat(traceSegmentBuilder.getApplicationId(), finalTimestamp); + CoreRegisterLinker.getServiceInstanceInventoryRegister().heartbeat(traceSegmentBuilder.getApplicationInstanceId(), finalTimestamp); + }); + return new SkyWalkingTrace(builder.generateTraceOrSegmentId(), segmentBuilders); + } + + private void scanSpansFromRoot(SpanObject.Builder parentSegmentSpan, Span parent, + Map> childSpanMap) throws Exception { + String parentId = parent.id(); + // get child spans by parent span id + List spanList = childSpanMap.get(parentId); + if (spanList == null) { + return; + } + for (Span childSpan : spanList) { + String localServiceName = childSpan.localServiceName(); + boolean isNewApp = false; + if (StringUtil.isNotBlank(localServiceName)) { + if (context.isAppChanged(localServiceName)) { + isNewApp = true; + } + } + + try { + if (isNewApp) { + context.addApp(localServiceName, childSpan.timestampAsLong() / 1000); + } + SpanObject.Builder childSpanBuilder = initSpan(parentSegmentSpan, parent, childSpan, isNewApp); + + context.currentSegment().addSpan(childSpanBuilder); + scanSpansFromRoot(childSpanBuilder, childSpan, childSpanMap); + + } finally { + if (isNewApp) { + segments.add(context.removeApp()); + } + } + } + } + + private SpanObject.Builder initSpan(SpanObject.Builder parentSegmentSpan, Span parentSpan, Span span, + boolean isSegmentRoot) { + SpanObject.Builder spanBuilder = SpanObject.newBuilder(); + spanBuilder.setSpanId(context.currentIDs().nextSpanId()); + if (isSegmentRoot) { + // spanId = -1, means no parent span + // spanId is considered unique, and from a positive sequence in each segment. + spanBuilder.setParentSpanId(-1); + } + if (!isSegmentRoot && parentSegmentSpan != null) { + spanBuilder.setParentSpanId(parentSegmentSpan.getSpanId()); + } + Span.Kind kind = span.kind(); + String opName = Strings.isBlank(span.name()) ? "-" : span.name(); + spanBuilder.setOperationName(opName); + ClientSideSpan clientSideSpan; + switch (kind) { + case CLIENT: + spanBuilder.setSpanType(SpanType.Exit); + String peer = getPeer(parentSpan, span); + if (peer != null) { + spanBuilder.setPeer(peer); + } + clientSideSpan = new ClientSideSpan(span, spanBuilder); + clientPartSpan.put(span.id(), clientSideSpan); + break; + case SERVER: + spanBuilder.setSpanType(SpanType.Entry); + this.buildRef(spanBuilder, span, parentSegmentSpan, parentSpan); + break; + case CONSUMER: + spanBuilder.setSpanType(SpanType.Entry); + this.buildRef(spanBuilder, span, parentSegmentSpan, parentSpan); + break; + case PRODUCER: + spanBuilder.setSpanType(SpanType.Exit); + peer = getPeer(parentSpan, span); + if (peer != null) { + spanBuilder.setPeer(peer); + } + clientSideSpan = new ClientSideSpan(span, spanBuilder); + clientPartSpan.put(span.id(), clientSideSpan); + break; + default: + spanBuilder.setSpanType(SpanType.Local); + } + // microseconds in Zipkin -> milliseconds in SkyWalking + long startTime = span.timestamp() / 1000; + // Some implement of zipkin client not include duration field in its report + // package when duration's value be 0ms, Causing a null pointer exception here. + Long durationObj = span.duration(); + long duration = (durationObj == null) ? 0 : durationObj.longValue() / 1000; + spanBuilder.setStartTime(startTime); + spanBuilder.setEndTime(startTime + duration); + + span.tags().forEach((tagKey, tagValue) -> spanBuilder.addTags( + KeyWithStringValue.newBuilder().setKey(tagKey).setValue(tagValue).build()) + ); + + span.annotations().forEach(annotation -> + spanBuilder.addLogs(LogMessage.newBuilder().setTime(annotation.timestamp() / 1000).addData( + KeyWithStringValue.newBuilder().setKey("zipkin.annotation").setValue(annotation.value()).build() + )) + ); + + return spanBuilder; + } + + private void buildRef(SpanObject.Builder spanBuilder, Span span, SpanObject.Builder parentSegmentSpan, + Span parentSpan) { + Segment parentSegment = context.parentSegment(); + if (parentSegment == null) { + return; + } + Segment rootSegment = context.rootSegment(); + if (rootSegment == null) { + return; + } + + if (span.shared() != null && span.shared()) { + // using same span id in client and server for RPC + // SkyWalking will build both sides of span + ClientSideSpan clientSideSpan = clientPartSpan.get(span.id()); + if (clientSideSpan != null) { + // For the root span, there may be no ref, because of no parent. + parentSegmentSpan = clientSideSpan.getBuilder(); + parentSpan = clientSideSpan.getSpan(); + } + } + + String peer = getPeer(parentSpan, span); + if (StringUtil.isBlank(peer)) { + //The IP is the most important for building the ref at both sides. + return; + } + + TraceSegmentReference.Builder refBuilder = TraceSegmentReference.newBuilder(); + refBuilder.setEntryApplicationInstanceId(rootSegment.builder().getApplicationInstanceId()); + int serviceId = rootSegment.getEntryServiceId(); + if (serviceId == 0) { + refBuilder.setEntryServiceName(rootSegment.getEntryServiceName()); + } else { + refBuilder.setEntryServiceId(serviceId); + } + refBuilder.setEntryApplicationInstanceId(rootSegment.builder().getApplicationInstanceId()); + + // parent ref info + refBuilder.setNetworkAddress(peer); + parentSegmentSpan.setPeer(refBuilder.getNetworkAddress()); + refBuilder.setParentApplicationInstanceId(parentSegment.builder().getApplicationInstanceId()); + refBuilder.setParentSpanId(parentSegmentSpan.getSpanId()); + refBuilder.setParentTraceSegmentId(parentSegment.builder().getTraceSegmentId()); + int parentServiceId = parentSegment.getEntryServiceId(); + if (parentServiceId == 0) { + refBuilder.setParentServiceName(parentSegment.getEntryServiceName()); + } else { + refBuilder.setParentServiceId(parentServiceId); + } + refBuilder.setRefType(RefType.CrossProcess); + + spanBuilder.addRefs(refBuilder); + } + + private String getPeer(Span parentSpan, Span childSpan) { + String peer; + + Endpoint serverEndpoint = childSpan == null ? null : childSpan.localEndpoint(); + peer = endpoint2Peer(serverEndpoint); + + if (peer == null) { + Endpoint clientEndpoint = parentSpan == null ? null : parentSpan.remoteEndpoint(); + peer = endpoint2Peer(clientEndpoint); + } + + return peer; + } + + private String endpoint2Peer(Endpoint endpoint) { + String ip = null; + Integer port = 0; + + if (endpoint != null) { + if (StringUtils.isNotEmpty(endpoint.ipv4())) { + ip = endpoint.ipv4(); + port = endpoint.port(); + } else if (StringUtils.isNotEmpty(endpoint.ipv6())) { + ip = endpoint.ipv6(); + port = endpoint.port(); + } + } + if (ip == null) { + return null; + } else { + return port == null || port == 0 ? ip : ip + ":" + port; + } + } + + /** + * Context holds the values in build process. + */ + private class Context { + private LinkedList segmentsStack = new LinkedList<>(); + + private boolean isAppChanged(String applicationCode) { + return StringUtils.isNotEmpty(applicationCode) && !applicationCode.equals(currentIDs().applicationCode); + } + + private Segment addApp(String applicationCode, long registerTime) throws Exception { + int serviceId = waitForExchange(() -> + CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(applicationCode), + 10 + ); + + int serviceInstanceId = waitForExchange(() -> + CoreRegisterLinker.getServiceInstanceInventoryRegister().getOrCreate(serviceId, applicationCode, applicationCode, + registerTime, ZipkinTraceOSInfoBuilder.getOSInfoForZipkin(applicationCode)), + 10 + ); + + Segment segment = new Segment(applicationCode, serviceId, serviceInstanceId); + segmentsStack.add(segment); + return segment; + } + + private IDCollection currentIDs() { + return segmentsStack.getLast().ids; + } + + private Segment currentSegment() { + return segmentsStack.getLast(); + } + + private Segment parentSegment() { + if (segmentsStack.size() < 2) { + return null; + } else { + return segmentsStack.get(segmentsStack.size() - 2); + } + + } + + private Segment rootSegment() { + if (segmentsStack.size() < 2) { + return null; + } else { + return segmentsStack.getFirst(); + } + } + + private Segment removeApp() { + return segmentsStack.removeLast(); + } + + private int waitForExchange(Callable callable, int retry) throws Exception { + for (int i = 0; i < retry; i++) { + Integer id = callable.call(); + if (id == 0) { + Thread.sleep(1000L); + } else { + return id; + } + } + throw new TimeoutException("ID exchange costs more than expected."); + } + } + + private class Segment { + private TraceSegmentObject.Builder segmentBuilder; + private IDCollection ids; + private int entryServiceId = 0; + private String entryServiceName = null; + private List spans; + private long endTime = 0; + + private Segment(String applicationCode, int serviceId, int serviceInstanceId) { + ids = new IDCollection(applicationCode, serviceId, serviceInstanceId); + spans = new LinkedList<>(); + segmentBuilder = TraceSegmentObject.newBuilder(); + segmentBuilder.setApplicationId(serviceId); + segmentBuilder.setApplicationInstanceId(serviceInstanceId); + segmentBuilder.setTraceSegmentId(generateTraceOrSegmentId()); + } + + private TraceSegmentObject.Builder builder() { + return segmentBuilder; + } + + private void addSpan(SpanObject.Builder spanBuilder) { + String operationName = spanBuilder.getOperationName(); + if (entryServiceId == 0 && StringUtils.isNotEmpty(operationName)) { + if (SpanType.Entry == spanBuilder.getSpanType()) { + if (StringUtils.isNotEmpty(operationName)) { + entryServiceName = operationName; + } else { + entryServiceId = spanBuilder.getOperationNameId(); + } + } + } + + // init by root span + if (spanBuilder.getSpanId() == 1 && entryServiceId == 0) { + if (StringUtils.isNotEmpty(operationName)) { + entryServiceName = operationName; + } else { + entryServiceId = spanBuilder.getOperationNameId(); + } + } + + spans.add(spanBuilder); + if (spanBuilder.getEndTime() > endTime) { + endTime = spanBuilder.getEndTime(); + } + } + + public int getEntryServiceId() { + return entryServiceId; + } + + public String getEntryServiceName() { + return entryServiceName; + } + + private IDCollection ids() { + return ids; + } + + public TraceSegmentObject.Builder freeze() { + for (SpanObject.Builder span : spans) { + segmentBuilder.addSpans(span); + } + return segmentBuilder; + } + + public long getEndTime() { + return endTime; + } + } + + private class IDCollection { + private String applicationCode; + private int appId; + private int instanceId; + private int spanIdSeq; + + private IDCollection(String applicationCode, int appId, int instanceId) { + this.applicationCode = applicationCode; + this.appId = appId; + this.instanceId = instanceId; + this.spanIdSeq = 0; + } + + private int nextSpanId() { + return spanIdSeq++; + } + } + + private UniqueId generateTraceOrSegmentId() { + return UniqueId.newBuilder() + .addIdParts(ThreadLocalRandom.current().nextLong()) + .addIdParts(ThreadLocalRandom.current().nextLong()) + .addIdParts(ThreadLocalRandom.current().nextLong()) + .build(); + } + + private class ClientSideSpan { + private Span span; + private SpanObject.Builder builder; + + public ClientSideSpan(Span span, SpanObject.Builder builder) { + this.span = span; + this.builder = builder; + } + + public Span getSpan() { + return span; + } + + public SpanObject.Builder getBuilder() { + return builder; + } + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java new file mode 100644 index 000000000000..9a0b7c728992 --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SegmentListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin.transform; + +import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace; + +public interface SegmentListener { + void notify(SkyWalkingTrace trace); +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java new file mode 100644 index 000000000000..b41e50ebfc33 --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/Zipkin2SkyWalkingTransfer.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin.transform; + +import java.util.LinkedList; +import java.util.List; +import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace; +import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace; +import zipkin2.Span; + +/** + * @author wusheng + */ +public class Zipkin2SkyWalkingTransfer { + public static Zipkin2SkyWalkingTransfer INSTANCE = new Zipkin2SkyWalkingTransfer(); + private List listeners = new LinkedList<>(); + + private Zipkin2SkyWalkingTransfer() { + } + + public void addListener(SegmentListener listener) { + listeners.add(listener); + } + + public void transfer(ZipkinTrace trace) throws Exception { + List traceSpans = trace.getSpans(); + + if (traceSpans.size() > 0) { + SkyWalkingTrace skyWalkingTrace = SegmentBuilder.build(traceSpans); + + listeners.forEach(listener -> + listener.notify(skyWalkingTrace) + ); + + } + } +} diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine new file mode 100644 index 000000000000..2be863b31a39 --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider new file mode 100644 index 000000000000..349942c20056 --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverProvider diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java new file mode 100644 index 000000000000..59f1cb2ac557 --- /dev/null +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/zipkin/transform/SpringSleuthSegmentBuilderTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.zipkin.transform; + +import org.apache.skywalking.apm.network.language.agent.SpanObject; +import org.apache.skywalking.apm.network.language.agent.SpanType; +import org.apache.skywalking.apm.network.language.agent.TraceSegmentObject; +import org.apache.skywalking.apm.network.language.agent.TraceSegmentReference; +import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory; +import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister; +import org.apache.skywalking.oap.server.receiver.zipkin.CoreRegisterLinker; +import org.apache.skywalking.oap.server.receiver.zipkin.data.SkyWalkingTrace; +import org.apache.skywalking.oap.server.receiver.zipkin.data.ZipkinTrace; +import org.junit.Assert; +import org.junit.Test; +import org.powermock.reflect.Whitebox; +import zipkin2.Span; +import zipkin2.codec.SpanBytesDecoder; + +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * @author wusheng + */ +public class SpringSleuthSegmentBuilderTest implements SegmentListener { + private Map applicationInstRegister = new HashMap<>(); + private Map applicationRegister = new HashMap<>(); + private int appIdSeg = 1; + private int appInstIdSeq = 1; + + @Test + public void testTransform() throws Exception { + + IServiceInventoryRegister applicationIDService = new IServiceInventoryRegister() { + @Override public int getOrCreate(String serviceName) { + String key = "AppCode:" + serviceName; + if (applicationRegister.containsKey(key)) { + return applicationRegister.get(key); + } else { + int id = appIdSeg++; + applicationRegister.put(key, id); + return id; + } + } + + @Override public int getOrCreate(int addressId, String serviceName) { + String key = "Address:" + serviceName; + if (applicationRegister.containsKey(key)) { + return applicationRegister.get(key); + } else { + int id = appIdSeg++; + applicationRegister.put(key, id); + return id; + } + } + + @Override public void heartbeat(int serviceId, long heartBeatTime) { + + } + + @Override public void updateMapping(int serviceId, int mappingServiceId) { + + } + + + }; + + IServiceInstanceInventoryRegister instanceIDService = new IServiceInstanceInventoryRegister() { + @Override public int getOrCreate(int serviceId, String serviceInstanceName, String uuid, long registerTime, + ServiceInstanceInventory.AgentOsInfo osInfo) { + String key = "AppCode:" + serviceId + ",UUID:" + uuid; + if (applicationInstRegister.containsKey(key)) { + return applicationInstRegister.get(key); + } else { + int id = appInstIdSeq++; + applicationInstRegister.put(key, id); + return id; + } + } + + @Override public int getOrCreate(int serviceId, int addressId, long registerTime) { + String key = "VitualAppCode:" + serviceId + ",address:" + addressId; + if (applicationInstRegister.containsKey(key)) { + return applicationInstRegister.get(key); + } else { + int id = appInstIdSeq++; + applicationInstRegister.put(key, id); + return id; + } + } + + @Override public void heartbeat(int serviceInstanceId, long heartBeatTime) { + + } + }; + + Whitebox.setInternalState(CoreRegisterLinker.class, "SERVICE_INVENTORY_REGISTER", applicationIDService); + Whitebox.setInternalState(CoreRegisterLinker.class, "SERVICE_INSTANCE_INVENTORY_REGISTER", instanceIDService); + + Zipkin2SkyWalkingTransfer.INSTANCE.addListener(this); + + List spanList = buildSpringSleuthExampleTrace(); + Assert.assertEquals(3, spanList.size()); + + ZipkinTrace trace = new ZipkinTrace(); + spanList.forEach(span -> trace.addSpan(span)); + + Zipkin2SkyWalkingTransfer.INSTANCE.transfer(trace); + } + + private List buildSpringSleuthExampleTrace() throws UnsupportedEncodingException { + List spans = new LinkedList<>(); + String span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"id\":\"1a8a1b5bdd791b8a\",\"kind\":\"SERVER\",\"name\":\"get /\",\"timestamp\":1527669813700123,\"duration\":11295,\"localEndpoint\":{\"serviceName\":\"frontend\",\"ipv4\":\"192.168.72.220\"},\"remoteEndpoint\":{\"ipv6\":\"::1\",\"port\":55146},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/\",\"mvc.controller.class\":\"Frontend\",\"mvc.controller.method\":\"callBackend\"}}"; + spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8"))); + span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"parentId\":\"1a8a1b5bdd791b8a\",\"id\":\"d7d5b93dcda767c8\",\"kind\":\"CLIENT\",\"name\":\"get\",\"timestamp\":1527669813702456,\"duration\":6672,\"localEndpoint\":{\"serviceName\":\"frontend\",\"ipv4\":\"192.168.72.220\"},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/api\"}}"; + spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8"))); + span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"parentId\":\"1a8a1b5bdd791b8a\",\"id\":\"d7d5b93dcda767c8\",\"kind\":\"SERVER\",\"name\":\"get /api\",\"timestamp\":1527669813705106,\"duration\":4802,\"localEndpoint\":{\"serviceName\":\"backend\",\"ipv4\":\"192.168.72.220\"},\"remoteEndpoint\":{\"ipv4\":\"127.0.0.1\",\"port\":55147},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/api\",\"mvc.controller.class\":\"Backend\",\"mvc.controller.method\":\"printDate\"},\"shared\":true}"; + spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8"))); + + return SpanBytesDecoder.JSON_V2.decodeList(spans.toString().getBytes("UTF-8")); + } + + @Override + public void notify(SkyWalkingTrace trace) { + List segments = trace.getSegmentList(); + Assert.assertEquals(2, segments.size()); + TraceSegmentObject.Builder builder = segments.get(0); + TraceSegmentObject.Builder builder1 = segments.get(1); + TraceSegmentObject.Builder front, end; + if (builder.getApplicationId() == applicationRegister.get("AppCode:frontend")) { + front = builder; + end = builder1; + Assert.assertEquals(applicationRegister.get("AppCode:backend").longValue(), builder1.getApplicationId()); + } else if (builder.getApplicationId() == applicationRegister.get("AppCode:backend")) { + end = builder; + front = builder1; + Assert.assertEquals(applicationRegister.get("AppCode:frontend").longValue(), builder1.getApplicationId()); + } else { + Assert.fail("Can't find frontend and backend applications. "); + return; + } + + Assert.assertEquals(2, front.getSpansCount()); + Assert.assertEquals(1, end.getSpansCount()); + + front.getSpansList().forEach(spanObject -> { + if (spanObject.getSpanId() == 0) { + // span id = 1, means incoming http of frontend + Assert.assertEquals(SpanType.Entry, spanObject.getSpanType()); + Assert.assertEquals("get /", spanObject.getOperationName()); + Assert.assertEquals(0, spanObject.getSpanId()); + Assert.assertEquals(-1, spanObject.getParentSpanId()); + } else if (spanObject.getSpanId() == 1) { + Assert.assertEquals("192.168.72.220", spanObject.getPeer()); + Assert.assertEquals(SpanType.Exit, spanObject.getSpanType()); + Assert.assertEquals(1, spanObject.getSpanId()); + Assert.assertEquals(0, spanObject.getParentSpanId()); + } else { + Assert.fail("Only two spans expected"); + } + Assert.assertTrue(spanObject.getTagsCount() > 0); + }); + + SpanObject spanObject = end.getSpans(0); + + Assert.assertEquals(1, spanObject.getRefsCount()); + TraceSegmentReference spanObjectRef = spanObject.getRefs(0); + Assert.assertEquals("get", spanObjectRef.getEntryServiceName()); + Assert.assertEquals("get", spanObjectRef.getParentServiceName()); + //Assert.assertEquals("192.168.72.220", spanObjectRef.getNetworkAddress()); + Assert.assertEquals(1, spanObjectRef.getParentSpanId()); + Assert.assertEquals(front.getTraceSegmentId(), spanObjectRef.getParentTraceSegmentId()); + + Assert.assertTrue(spanObject.getTagsCount() > 0); + + Assert.assertEquals("get /api", spanObject.getOperationName()); + Assert.assertEquals(0, spanObject.getSpanId()); + Assert.assertEquals(-1, spanObject.getParentSpanId()); + Assert.assertEquals(SpanType.Entry, spanObject.getSpanType()); + } +} \ No newline at end of file diff --git a/oap-server/server-starter/pom.xml b/oap-server/server-starter/pom.xml index 153a8713f0d4..f1295a5926c4 100644 --- a/oap-server/server-starter/pom.xml +++ b/oap-server/server-starter/pom.xml @@ -78,6 +78,11 @@ skywalking-trace-receiver-plugin ${project.version} + + org.apache.skywalking + zipkin-receiver-plugin + ${project.version} + diff --git a/oap-server/server-starter/src/main/assembly/application.yml b/oap-server/server-starter/src/main/assembly/application.yml index 312f35673ba0..f6612620a348 100644 --- a/oap-server/server-starter/src/main/assembly/application.yml +++ b/oap-server/server-starter/src/main/assembly/application.yml @@ -77,6 +77,11 @@ service-mesh: bufferFileCleanWhenRestart: false istio-telemetry: default: +#receiver_zipkin: +# default: +# host: 0.0.0.0 +# port: 9411 +# contextPath: / query: graphql: path: /graphql diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml index cd57703df5a7..16219bbee75d 100644 --- a/oap-server/server-starter/src/main/resources/application.yml +++ b/oap-server/server-starter/src/main/resources/application.yml @@ -77,6 +77,11 @@ service-mesh: bufferFileCleanWhenRestart: false istio-telemetry: default: +receiver_zipkin: + default: + host: 0.0.0.0 + port: 9411 + contextPath: / query: graphql: path: /graphql