From 8c3fbe4c03196419d3fe8b70d9e3104e74af5d18 Mon Sep 17 00:00:00 2001 From: peng-yongsheng <8082209@qq.com> Date: Thu, 16 Nov 2017 15:13:05 +0800 Subject: [PATCH] Provider agent register and trace stream by http json request. --- .../ApplicationRegisterServiceHandler.java | 9 +- .../agent/jetty/AgentModuleJettyProvider.java | 8 +- .../ApplicationRegisterServletHandler.java | 75 +++++++++++++++++ .../InstanceDiscoveryServletHandler.java | 78 ++++++++++++++++++ .../ServiceNameDiscoveryServiceHandler.java | 82 +++++++++++++++++++ .../handler/TraceSegmentServletHandler.java | 9 +- .../stream/buffer/SegmentBufferManager.java | 5 +- .../stream/buffer/SegmentBufferReader.java | 7 +- .../SegmentStandardizationWorker.java | 2 +- .../worker/register/ApplicationIDService.java | 4 +- .../worker/register/InstanceIDService.java | 8 +- 11 files changed, 263 insertions(+), 24 deletions(-) create mode 100644 apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/ApplicationRegisterServletHandler.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/InstanceDiscoveryServletHandler.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/ServiceNameDiscoveryServiceHandler.java diff --git a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/ApplicationRegisterServiceHandler.java b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/ApplicationRegisterServiceHandler.java index d6f6649a850d..1008f22c8d38 100644 --- a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/ApplicationRegisterServiceHandler.java +++ b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/ApplicationRegisterServiceHandler.java @@ -22,8 +22,6 @@ import io.grpc.stub.StreamObserver; import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService; import org.skywalking.apm.collector.core.module.ModuleManager; -import org.skywalking.apm.collector.core.module.ModuleNotFoundException; -import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.skywalking.apm.collector.server.grpc.GRPCHandler; import org.skywalking.apm.network.proto.Application; import org.skywalking.apm.network.proto.ApplicationMapping; @@ -52,12 +50,7 @@ public ApplicationRegisterServiceHandler(ModuleManager moduleManager) { ApplicationMapping.Builder builder = ApplicationMapping.newBuilder(); for (int i = 0; i < applicationCodes.size(); i++) { String applicationCode = applicationCodes.get(i); - int applicationId = 0; - try { - applicationId = applicationIDService.getOrCreate(applicationCode); - } catch (ModuleNotFoundException | ServiceNotProvidedException e) { - logger.error(e.getMessage(), e); - } + int applicationId = applicationIDService.getOrCreate(applicationCode); if (applicationId != 0) { KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(applicationCode).setValue(applicationId).build(); diff --git a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/AgentModuleJettyProvider.java b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/AgentModuleJettyProvider.java index 68ac58805317..75d911b443ff 100644 --- a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/AgentModuleJettyProvider.java +++ b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/AgentModuleJettyProvider.java @@ -20,6 +20,9 @@ import java.util.Properties; import org.skywalking.apm.collector.agent.AgentModule; +import org.skywalking.apm.collector.agent.jetty.handler.ApplicationRegisterServletHandler; +import org.skywalking.apm.collector.agent.jetty.handler.InstanceDiscoveryServletHandler; +import org.skywalking.apm.collector.agent.jetty.handler.ServiceNameDiscoveryServiceHandler; import org.skywalking.apm.collector.agent.jetty.handler.TraceSegmentServletHandler; import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingHandler; import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingListener; @@ -89,6 +92,9 @@ public class AgentModuleJettyProvider extends ModuleProvider { } private void addHandlers(Server jettyServer) { - jettyServer.addHandler(new TraceSegmentServletHandler()); + jettyServer.addHandler(new TraceSegmentServletHandler(getManager())); + jettyServer.addHandler(new ApplicationRegisterServletHandler(getManager())); + jettyServer.addHandler(new InstanceDiscoveryServletHandler(getManager())); + jettyServer.addHandler(new ServiceNameDiscoveryServiceHandler(getManager())); } } diff --git a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/ApplicationRegisterServletHandler.java b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/ApplicationRegisterServletHandler.java new file mode 100644 index 000000000000..5fb5db4b6a74 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/ApplicationRegisterServletHandler.java @@ -0,0 +1,75 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.jetty.handler; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import java.io.IOException; +import javax.servlet.http.HttpServletRequest; +import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService; +import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.server.jetty.ArgumentsParseException; +import org.skywalking.apm.collector.server.jetty.JettyHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class ApplicationRegisterServletHandler extends JettyHandler { + + private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServletHandler.class); + + private final ApplicationIDService applicationIDService; + private Gson gson = new Gson(); + private static final String APPLICATION_CODE = "c"; + private static final String APPLICATION_ID = "i"; + + public ApplicationRegisterServletHandler(ModuleManager moduleManager) { + this.applicationIDService = new ApplicationIDService(moduleManager); + } + + @Override public String pathSpec() { + return "/application/register"; + } + + @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { + throw new UnsupportedOperationException(); + } + + @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException { + JsonArray responseArray = new JsonArray(); + try { + JsonArray applicationCodes = gson.fromJson(req.getReader(), JsonArray.class); + for (int i = 0; i < applicationCodes.size(); i++) { + String applicationCode = applicationCodes.get(i).getAsString(); + int applicationId = applicationIDService.getOrCreate(applicationCode); + JsonObject mapping = new JsonObject(); + mapping.addProperty(APPLICATION_CODE, applicationCode); + mapping.addProperty(APPLICATION_ID, applicationId); + responseArray.add(mapping); + } + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + return responseArray; + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/InstanceDiscoveryServletHandler.java b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/InstanceDiscoveryServletHandler.java new file mode 100644 index 000000000000..658d7d419d7e --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/InstanceDiscoveryServletHandler.java @@ -0,0 +1,78 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.jetty.handler; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import java.io.IOException; +import javax.servlet.http.HttpServletRequest; +import org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService; +import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.server.jetty.ArgumentsParseException; +import org.skywalking.apm.collector.server.jetty.JettyHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class InstanceDiscoveryServletHandler extends JettyHandler { + + private final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServletHandler.class); + + private final InstanceIDService instanceIDService; + private Gson gson = new Gson(); + + private static final String APPLICATION_ID = "ai"; + private static final String AGENT_UUID = "au"; + private static final String REGISTER_TIME = "rt"; + private static final String INSTANCE_ID = "ii"; + private static final String OS_INFO = "oi"; + + public InstanceDiscoveryServletHandler(ModuleManager moduleManager) { + this.instanceIDService = new InstanceIDService(moduleManager); + } + + @Override public String pathSpec() { + return "/instance/register"; + } + + @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { + throw new UnsupportedOperationException(); + } + + @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException { + JsonObject responseJson = new JsonObject(); + try { + JsonObject instance = gson.fromJson(req.getReader(), JsonObject.class); + int applicationId = instance.get(APPLICATION_ID).getAsInt(); + String agentUUID = instance.get(AGENT_UUID).getAsString(); + long registerTime = instance.get(REGISTER_TIME).getAsLong(); + JsonObject osInfo = instance.get(OS_INFO).getAsJsonObject(); + + int instanceId = instanceIDService.getOrCreate(applicationId, agentUUID, registerTime, osInfo.toString()); + responseJson.addProperty(APPLICATION_ID, applicationId); + responseJson.addProperty(INSTANCE_ID, instanceId); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + return responseJson; + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/ServiceNameDiscoveryServiceHandler.java b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/ServiceNameDiscoveryServiceHandler.java new file mode 100644 index 000000000000..b842e17c9f0e --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/ServiceNameDiscoveryServiceHandler.java @@ -0,0 +1,82 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.jetty.handler; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import java.io.IOException; +import javax.servlet.http.HttpServletRequest; +import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService; +import org.skywalking.apm.collector.core.module.ModuleManager; +import org.skywalking.apm.collector.server.jetty.ArgumentsParseException; +import org.skywalking.apm.collector.server.jetty.JettyHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class ServiceNameDiscoveryServiceHandler extends JettyHandler { + + private final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class); + + private final ServiceNameService serviceNameService; + private Gson gson = new Gson(); + + private static final String APPLICATION_ID = "ai"; + private static final String SERVICE_NAME = "sn"; + private static final String SERVICE_ID = "si"; + private static final String ELEMENT = "el"; + + public ServiceNameDiscoveryServiceHandler(ModuleManager moduleManager) { + this.serviceNameService = new ServiceNameService(moduleManager); + } + + @Override public String pathSpec() { + return "/servicename/discovery"; + } + + @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException { + throw new UnsupportedOperationException(); + } + + @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException { + JsonArray responseArray = new JsonArray(); + try { + JsonArray services = gson.fromJson(req.getReader(), JsonArray.class); + for (JsonElement service : services) { + int applicationId = service.getAsJsonObject().get(APPLICATION_ID).getAsInt(); + String serviceName = service.getAsJsonObject().get(SERVICE_NAME).getAsString(); + + int serviceId = serviceNameService.getOrCreate(applicationId, serviceName); + if (serviceId != 0) { + JsonObject responseJson = new JsonObject(); + responseJson.addProperty(SERVICE_ID, serviceId); + responseJson.add(ELEMENT, service); + responseArray.add(responseJson); + } + } + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + return responseArray; + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/TraceSegmentServletHandler.java b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/TraceSegmentServletHandler.java index dd3ac8ed2943..6c873bd77cfa 100644 --- a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/TraceSegmentServletHandler.java +++ b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/src/main/java/org/skywalking/apm/collector/agent/jetty/handler/TraceSegmentServletHandler.java @@ -26,6 +26,7 @@ import org.skywalking.apm.collector.agent.jetty.handler.reader.TraceSegment; import org.skywalking.apm.collector.agent.jetty.handler.reader.TraceSegmentJsonReader; import org.skywalking.apm.collector.agent.stream.parser.SegmentParse; +import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.server.jetty.ArgumentsParseException; import org.skywalking.apm.collector.server.jetty.JettyHandler; import org.slf4j.Logger; @@ -38,6 +39,12 @@ public class TraceSegmentServletHandler extends JettyHandler { private final Logger logger = LoggerFactory.getLogger(TraceSegmentServletHandler.class); + private final ModuleManager moduleManager; + + public TraceSegmentServletHandler(ModuleManager moduleManager) { + this.moduleManager = moduleManager; + } + @Override public String pathSpec() { return "/segments"; } @@ -64,7 +71,7 @@ private void read(BufferedReader bufferedReader) throws IOException { reader.beginArray(); while (reader.hasNext()) { - SegmentParse segmentParse = new SegmentParse(null); + SegmentParse segmentParse = new SegmentParse(moduleManager); TraceSegment traceSegment = jsonReader.read(reader); segmentParse.parse(traceSegment.getUpstreamSegment(), SegmentParse.Source.Agent); } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferManager.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferManager.java index 938d0e98038d..2e2a3d86c342 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferManager.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferManager.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.StringUtils; import org.skywalking.apm.collector.core.util.TimeBucketUtils; @@ -39,7 +40,7 @@ public enum SegmentBufferManager { public static final String DATA_FILE_PREFIX = "data"; private FileOutputStream outputStream; - public synchronized void initialize() { + public synchronized void initialize(ModuleManager moduleManager) { logger.info("segment buffer initialize"); try { OffsetManager.INSTANCE.initialize(); @@ -58,7 +59,7 @@ public synchronized void initialize() { newDataFile(); } } - SegmentBufferReader.INSTANCE.initialize(); + SegmentBufferReader.INSTANCE.initialize(moduleManager); } catch (IOException e) { logger.error(e.getMessage(), e); } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferReader.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferReader.java index a073b212b739..148b33d9b06a 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferReader.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/buffer/SegmentBufferReader.java @@ -27,6 +27,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.skywalking.apm.collector.agent.stream.parser.SegmentParse; +import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.CollectionUtils; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.StringUtils; @@ -42,8 +43,10 @@ public enum SegmentBufferReader { private final Logger logger = LoggerFactory.getLogger(SegmentBufferReader.class); private InputStream inputStream; + private ModuleManager moduleManager; - public void initialize() { + public void initialize(ModuleManager moduleManager) { + this.moduleManager = moduleManager; Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::preRead, 3, 3, TimeUnit.SECONDS); } @@ -117,7 +120,7 @@ private boolean read(File readFile, long readFileOffset) { while (readFile.length() > readFileOffset && readFileOffset < endPoint) { UpstreamSegment upstreamSegment = UpstreamSegment.parser().parseDelimitedFrom(inputStream); - SegmentParse parse = new SegmentParse(null); + SegmentParse parse = new SegmentParse(moduleManager); if (!parse.parse(upstreamSegment, SegmentParse.Source.Buffer)) { return false; } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java index 8fe297ad253d..0383d9327351 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java @@ -38,7 +38,7 @@ public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker