Skip to content

Commit

Permalink
Provider agent register and trace stream by http json request.
Browse files Browse the repository at this point in the history
  • Loading branch information
peng-yongsheng committed Nov 16, 2017
1 parent 1936716 commit 8c3fbe4
Show file tree
Hide file tree
Showing 11 changed files with 263 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationMapping;
Expand Down Expand Up @@ -52,12 +50,7 @@ public ApplicationRegisterServiceHandler(ModuleManager moduleManager) {
ApplicationMapping.Builder builder = ApplicationMapping.newBuilder();
for (int i = 0; i < applicationCodes.size(); i++) {
String applicationCode = applicationCodes.get(i);
int applicationId = 0;
try {
applicationId = applicationIDService.getOrCreate(applicationCode);
} catch (ModuleNotFoundException | ServiceNotProvidedException e) {
logger.error(e.getMessage(), e);
}
int applicationId = applicationIDService.getOrCreate(applicationCode);

if (applicationId != 0) {
KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(applicationCode).setValue(applicationId).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import java.util.Properties;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.jetty.handler.ApplicationRegisterServletHandler;
import org.skywalking.apm.collector.agent.jetty.handler.InstanceDiscoveryServletHandler;
import org.skywalking.apm.collector.agent.jetty.handler.ServiceNameDiscoveryServiceHandler;
import org.skywalking.apm.collector.agent.jetty.handler.TraceSegmentServletHandler;
import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingHandler;
import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingListener;
Expand Down Expand Up @@ -89,6 +92,9 @@ public class AgentModuleJettyProvider extends ModuleProvider {
}

private void addHandlers(Server jettyServer) {
jettyServer.addHandler(new TraceSegmentServletHandler());
jettyServer.addHandler(new TraceSegmentServletHandler(getManager()));
jettyServer.addHandler(new ApplicationRegisterServletHandler(getManager()));
jettyServer.addHandler(new InstanceDiscoveryServletHandler(getManager()));
jettyServer.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/

package org.skywalking.apm.collector.agent.jetty.handler;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author peng-yongsheng
*/
public class ApplicationRegisterServletHandler extends JettyHandler {

private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServletHandler.class);

private final ApplicationIDService applicationIDService;
private Gson gson = new Gson();
private static final String APPLICATION_CODE = "c";
private static final String APPLICATION_ID = "i";

public ApplicationRegisterServletHandler(ModuleManager moduleManager) {
this.applicationIDService = new ApplicationIDService(moduleManager);
}

@Override public String pathSpec() {
return "/application/register";
}

@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}

@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
JsonArray responseArray = new JsonArray();
try {
JsonArray applicationCodes = gson.fromJson(req.getReader(), JsonArray.class);
for (int i = 0; i < applicationCodes.size(); i++) {
String applicationCode = applicationCodes.get(i).getAsString();
int applicationId = applicationIDService.getOrCreate(applicationCode);
JsonObject mapping = new JsonObject();
mapping.addProperty(APPLICATION_CODE, applicationCode);
mapping.addProperty(APPLICATION_ID, applicationId);
responseArray.add(mapping);
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
return responseArray;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/

package org.skywalking.apm.collector.agent.jetty.handler;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author peng-yongsheng
*/
public class InstanceDiscoveryServletHandler extends JettyHandler {

private final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServletHandler.class);

private final InstanceIDService instanceIDService;
private Gson gson = new Gson();

private static final String APPLICATION_ID = "ai";
private static final String AGENT_UUID = "au";
private static final String REGISTER_TIME = "rt";
private static final String INSTANCE_ID = "ii";
private static final String OS_INFO = "oi";

public InstanceDiscoveryServletHandler(ModuleManager moduleManager) {
this.instanceIDService = new InstanceIDService(moduleManager);
}

@Override public String pathSpec() {
return "/instance/register";
}

@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}

@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
JsonObject responseJson = new JsonObject();
try {
JsonObject instance = gson.fromJson(req.getReader(), JsonObject.class);
int applicationId = instance.get(APPLICATION_ID).getAsInt();
String agentUUID = instance.get(AGENT_UUID).getAsString();
long registerTime = instance.get(REGISTER_TIME).getAsLong();
JsonObject osInfo = instance.get(OS_INFO).getAsJsonObject();

int instanceId = instanceIDService.getOrCreate(applicationId, agentUUID, registerTime, osInfo.toString());
responseJson.addProperty(APPLICATION_ID, applicationId);
responseJson.addProperty(INSTANCE_ID, instanceId);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
return responseJson;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/

package org.skywalking.apm.collector.agent.jetty.handler;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author peng-yongsheng
*/
public class ServiceNameDiscoveryServiceHandler extends JettyHandler {

private final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class);

private final ServiceNameService serviceNameService;
private Gson gson = new Gson();

private static final String APPLICATION_ID = "ai";
private static final String SERVICE_NAME = "sn";
private static final String SERVICE_ID = "si";
private static final String ELEMENT = "el";

public ServiceNameDiscoveryServiceHandler(ModuleManager moduleManager) {
this.serviceNameService = new ServiceNameService(moduleManager);
}

@Override public String pathSpec() {
return "/servicename/discovery";
}

@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
throw new UnsupportedOperationException();
}

@Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
JsonArray responseArray = new JsonArray();
try {
JsonArray services = gson.fromJson(req.getReader(), JsonArray.class);
for (JsonElement service : services) {
int applicationId = service.getAsJsonObject().get(APPLICATION_ID).getAsInt();
String serviceName = service.getAsJsonObject().get(SERVICE_NAME).getAsString();

int serviceId = serviceNameService.getOrCreate(applicationId, serviceName);
if (serviceId != 0) {
JsonObject responseJson = new JsonObject();
responseJson.addProperty(SERVICE_ID, serviceId);
responseJson.add(ELEMENT, service);
responseArray.add(responseJson);
}
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
return responseArray;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.skywalking.apm.collector.agent.jetty.handler.reader.TraceSegment;
import org.skywalking.apm.collector.agent.jetty.handler.reader.TraceSegmentJsonReader;
import org.skywalking.apm.collector.agent.stream.parser.SegmentParse;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
Expand All @@ -38,6 +39,12 @@ public class TraceSegmentServletHandler extends JettyHandler {

private final Logger logger = LoggerFactory.getLogger(TraceSegmentServletHandler.class);

private final ModuleManager moduleManager;

public TraceSegmentServletHandler(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}

@Override public String pathSpec() {
return "/segments";
}
Expand All @@ -64,7 +71,7 @@ private void read(BufferedReader bufferedReader) throws IOException {

reader.beginArray();
while (reader.hasNext()) {
SegmentParse segmentParse = new SegmentParse(null);
SegmentParse segmentParse = new SegmentParse(moduleManager);
TraceSegment traceSegment = jsonReader.read(reader);
segmentParse.parse(traceSegment.getUpstreamSegment(), SegmentParse.Source.Agent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
Expand All @@ -39,7 +40,7 @@ public enum SegmentBufferManager {
public static final String DATA_FILE_PREFIX = "data";
private FileOutputStream outputStream;

public synchronized void initialize() {
public synchronized void initialize(ModuleManager moduleManager) {
logger.info("segment buffer initialize");
try {
OffsetManager.INSTANCE.initialize();
Expand All @@ -58,7 +59,7 @@ public synchronized void initialize() {
newDataFile();
}
}
SegmentBufferReader.INSTANCE.initialize();
SegmentBufferReader.INSTANCE.initialize(moduleManager);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.collector.agent.stream.parser.SegmentParse;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.StringUtils;
Expand All @@ -42,8 +43,10 @@ public enum SegmentBufferReader {

private final Logger logger = LoggerFactory.getLogger(SegmentBufferReader.class);
private InputStream inputStream;
private ModuleManager moduleManager;

public void initialize() {
public void initialize(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::preRead, 3, 3, TimeUnit.SECONDS);
}

Expand Down Expand Up @@ -117,7 +120,7 @@ private boolean read(File readFile, long readFileOffset) {

while (readFile.length() > readFileOffset && readFileOffset < endPoint) {
UpstreamSegment upstreamSegment = UpstreamSegment.parser().parseDelimitedFrom(inputStream);
SegmentParse parse = new SegmentParse(null);
SegmentParse parse = new SegmentParse(moduleManager);
if (!parse.parse(upstreamSegment, SegmentParse.Source.Buffer)) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker<Segme

public SegmentStandardizationWorker(ModuleManager moduleManager) {
super(moduleManager);
SegmentBufferManager.INSTANCE.initialize();
SegmentBufferManager.INSTANCE.initialize(moduleManager);
}

@Override public int id() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,7 +43,7 @@ public ApplicationIDService(ModuleManager moduleManager) {
this.applicationRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID, Application.class);
}

public int getOrCreate(String applicationCode) throws ModuleNotFoundException, ServiceNotProvidedException {
public int getOrCreate(String applicationCode) {
ApplicationCacheService service = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
int applicationId = service.get(applicationCode);

Expand Down

0 comments on commit 8c3fbe4

Please sign in to comment.