Skip to content

Commit

Permalink
Change apm-collector-stream from collector module to common codes.
Browse files Browse the repository at this point in the history
Make agent gRPC to be a collector module.
Make agent jetty to be a collector module.
Make agent stream to be a collector module.
  • Loading branch information
peng-yongsheng committed Nov 17, 2017
1 parent 8658ca7 commit f3e85ef
Show file tree
Hide file tree
Showing 177 changed files with 1,218 additions and 373 deletions.
Expand Up @@ -21,13 +21,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-agent</artifactId>
<artifactId>apm-collector-agent-grpc</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>collector-agent-define</artifactId>
<artifactId>collector-agent-grpc-define</artifactId>
<packaging>jar</packaging>

</project>
</project>
Expand Up @@ -16,16 +16,16 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/

package org.skywalking.apm.collector.agent;
package org.skywalking.apm.collector.agent.grpc;

import org.skywalking.apm.collector.core.module.Module;

/**
* @author peng-yongsheng
*/
public class AgentModule extends Module {
public class AgentGRPCModule extends Module {

public static final String NAME = "agent";
public static final String NAME = "agent_gRPC";

@Override public String name() {
return NAME;
Expand Down
Expand Up @@ -16,4 +16,4 @@
# Project repository: https://github.com/OpenSkywalking/skywalking
#

org.skywalking.apm.collector.stream.StreamModule
org.skywalking.apm.collector.agent.grpc.AgentGRPCModule
@@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2017, OpenSkywalking Organization All rights reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
~ Project repository: https://github.com/OpenSkywalking/skywalking
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-agent-grpc</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>collector-agent-grpc-provider</artifactId>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-grpc-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-grpc-manager-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-cluster-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-naming-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-stream-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
Expand Up @@ -19,16 +19,14 @@
package org.skywalking.apm.collector.agent.grpc;

import java.util.Properties;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.grpc.handler.ApplicationRegisterServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.InstanceDiscoveryServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.JVMMetricsServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.ServiceNameDiscoveryServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.TraceSegmentServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.naming.AgentGRPCNamingHandler;
import org.skywalking.apm.collector.agent.grpc.handler.naming.AgentGRPCNamingListener;
import org.skywalking.apm.collector.agent.stream.AgentStreamSingleton;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
Expand All @@ -39,10 +37,7 @@
import org.skywalking.apm.collector.grpc.manager.service.GRPCManagerService;
import org.skywalking.apm.collector.naming.NamingModule;
import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.stream.StreamModule;

/**
* @author peng-yongsheng
Expand All @@ -58,7 +53,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
}

@Override public Class<? extends Module> module() {
return AgentModule.class;
return AgentGRPCModule.class;
}

@Override public void prepare(Properties config) throws ServiceNotProvidedException {
Expand All @@ -70,7 +65,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
Integer port = (Integer)config.get(PORT);

ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
moduleRegisterService.register(AgentModule.NAME, this.name(), new AgentModuleGRPCRegistration(host, port));
moduleRegisterService.register(AgentGRPCModule.NAME, this.name(), new AgentModuleGRPCRegistration(host, port));

AgentGRPCNamingListener namingListener = new AgentGRPCNamingListener();
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
Expand All @@ -82,7 +77,6 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
Server gRPCServer = managerService.createIfAbsent(host, port);

AgentStreamSingleton.createInstanceIfAbsent(getManager());
addHandlers(gRPCServer);
}

Expand All @@ -91,14 +85,14 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
}

@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME, StreamModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, GRPCManagerModule.NAME, AgentStreamModule.NAME};
}

private void addHandlers(Server gRPCServer) {
gRPCServer.addHandler(new ApplicationRegisterServiceHandler(getManager()));
gRPCServer.addHandler(new InstanceDiscoveryServiceHandler(getManager()));
gRPCServer.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
gRPCServer.addHandler(new JVMMetricsServiceHandler());
gRPCServer.addHandler(new JVMMetricsServiceHandler(getManager()));
gRPCServer.addHandler(new TraceSegmentServiceHandler(getManager()));
}
}
Expand Up @@ -20,7 +20,8 @@

import com.google.protobuf.ProtocolStringList;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Application;
Expand All @@ -37,10 +38,10 @@ public class ApplicationRegisterServiceHandler extends ApplicationRegisterServic

private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServiceHandler.class);

private final ApplicationIDService applicationIDService;
private final IApplicationIDService applicationIDService;

public ApplicationRegisterServiceHandler(ModuleManager moduleManager) {
applicationIDService = new ApplicationIDService(moduleManager);
applicationIDService = moduleManager.find(AgentStreamModule.NAME).getService(IApplicationIDService.class);
}

@Override public void register(Application request, StreamObserver<ApplicationMapping> responseObserver) {
Expand Down
Expand Up @@ -21,7 +21,8 @@
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
Expand All @@ -41,10 +42,10 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp

private final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServiceHandler.class);

private final InstanceIDService instanceIDService;
private final IInstanceIDService instanceIDService;

public InstanceDiscoveryServiceHandler(ModuleManager moduleManager) {
this.instanceIDService = new InstanceIDService(moduleManager);
this.instanceIDService = moduleManager.find(AgentStreamModule.NAME).getService(IInstanceIDService.class);
}

@Override
Expand Down
@@ -0,0 +1,101 @@
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/

package org.skywalking.apm.collector.agent.grpc.handler;

import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.jvm.ICpuMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IInstanceHeartBeatService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryPoolMetricService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.CPU;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.GC;
import org.skywalking.apm.network.proto.JVMMetrics;
import org.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
import org.skywalking.apm.network.proto.Memory;
import org.skywalking.apm.network.proto.MemoryPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author peng-yongsheng
*/
public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase implements GRPCHandler {

private final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandler.class);

private final ICpuMetricService cpuMetricService;
private final IGCMetricService gcMetricService;
private final IMemoryMetricService memoryMetricService;
private final IMemoryPoolMetricService memoryPoolMetricService;
private final IInstanceHeartBeatService instanceHeartBeatService;

public JVMMetricsServiceHandler(ModuleManager moduleManager) {
this.cpuMetricService = moduleManager.find(AgentStreamModule.NAME).getService(ICpuMetricService.class);
this.gcMetricService = moduleManager.find(AgentStreamModule.NAME).getService(IGCMetricService.class);
this.memoryMetricService = moduleManager.find(AgentStreamModule.NAME).getService(IMemoryMetricService.class);
this.memoryPoolMetricService = moduleManager.find(AgentStreamModule.NAME).getService(IMemoryPoolMetricService.class);
this.instanceHeartBeatService = moduleManager.find(AgentStreamModule.NAME).getService(IInstanceHeartBeatService.class);
}

@Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) {
int instanceId = request.getApplicationInstanceId();
logger.debug("receive the jvm metric from application instance, id: {}", instanceId);

request.getMetricsList().forEach(metric -> {
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
sendToInstanceHeartBeatService(instanceId, metric.getTime());
sendToCpuMetricService(instanceId, time, metric.getCpu());
sendToMemoryMetricService(instanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricService(instanceId, time, metric.getMemoryPoolList());
sendToGCMetricService(instanceId, time, metric.getGcList());
});

responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}

private void sendToInstanceHeartBeatService(int instanceId, long heartBeatTime) {
instanceHeartBeatService.send(instanceId, heartBeatTime);
}

private void sendToMemoryMetricService(int instanceId, long timeBucket, List<Memory> memories) {
memories.forEach(memory -> memoryMetricService.send(instanceId, timeBucket, memory.getIsHeap(), memory.getInit(), memory.getMax(), memory.getUsed(), memory.getCommitted()));
}

private void sendToMemoryPoolMetricService(int instanceId, long timeBucket,
List<MemoryPool> memoryPools) {

memoryPools.forEach(memoryPool -> memoryPoolMetricService.send(instanceId, timeBucket, memoryPool.getType().getNumber(), memoryPool.getInit(), memoryPool.getMax(), memoryPool.getUsed(), memoryPool.getCommited()));
}

private void sendToCpuMetricService(int instanceId, long timeBucket, CPU cpu) {
cpuMetricService.send(instanceId, timeBucket, cpu.getUsagePercent());
}

private void sendToGCMetricService(int instanceId, long timeBucket, List<GC> gcs) {
gcs.forEach(gc -> gcMetricService.send(instanceId, timeBucket, gc.getPhraseValue(), gc.getCount(), gc.getTime()));
}
}
Expand Up @@ -20,7 +20,8 @@

import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IServiceNameService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ServiceNameCollection;
Expand All @@ -38,10 +39,10 @@ public class ServiceNameDiscoveryServiceHandler extends ServiceNameDiscoveryServ

private final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class);

private final ServiceNameService serviceNameService;
private final IServiceNameService serviceNameService;

public ServiceNameDiscoveryServiceHandler(ModuleManager moduleManager) {
this.serviceNameService = new ServiceNameService(moduleManager);
this.serviceNameService = moduleManager.find(AgentStreamModule.NAME).getService(IServiceNameService.class);
}

@Override public void discovery(ServiceNameCollection request,
Expand Down
Expand Up @@ -19,7 +19,8 @@
package org.skywalking.apm.collector.agent.grpc.handler;

import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.parser.SegmentParse;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.trace.ITraceSegmentService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Downstream;
Expand All @@ -35,18 +36,17 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg

private final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);

private final ModuleManager moduleManager;
private final ITraceSegmentService traceSegmentService;

public TraceSegmentServiceHandler(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.traceSegmentService = moduleManager.find(AgentStreamModule.NAME).getService(ITraceSegmentService.class);
}

@Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
return new StreamObserver<UpstreamSegment>() {
@Override public void onNext(UpstreamSegment segment) {
logger.debug("receive segment");
SegmentParse segmentParse = new SegmentParse(moduleManager);
segmentParse.parse(segment, SegmentParse.Source.Agent);
traceSegmentService.send(segment);
}

@Override public void onError(Throwable throwable) {
Expand Down
Expand Up @@ -37,8 +37,7 @@ public AgentGRPCNamingHandler(AgentGRPCNamingListener namingListener) {
}

@Override public String pathSpec() {
// return "/agent/gRPC";
return "/agentstream/grpc";
return "/agent/gRPC";
}

@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
Expand Down

0 comments on commit f3e85ef

Please sign in to comment.