Skip to content

Commit

Permalink
Merge branch 'feature/collector-modelization' into feature/agent-memo…
Browse files Browse the repository at this point in the history
…ry-control
  • Loading branch information
wu-sheng committed Nov 15, 2017
2 parents 41e459d + b744d40 commit 2d80d8d
Show file tree
Hide file tree
Showing 39 changed files with 414 additions and 145 deletions.
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -21,8 +21,8 @@ Sky Walking | [中文](README_ZH.md)
* The UI released on [skywalking-ui](https://github.com/OpenSkywalking/sky-walking-ui)

# Architecture
* Architecture graph for 3.2+
<img src="https://skywalkingtest.github.io/page-resources/3.x-architecture.jpg"/>
* Architecture graph for 3.2.5+
<img src="https://skywalkingtest.github.io/page-resources/3.2.5%2b_architecture.jpg"/>

# Document
[![EN doc](https://img.shields.io/badge/document-English-blue.svg)](docs/README.md) [![cn doc](https://img.shields.io/badge/document-中文-blue.svg)](docs/README_ZH.md)
Expand Down
2 changes: 1 addition & 1 deletion README_ZH.md
Expand Up @@ -25,7 +25,7 @@ Sky Walking | [English](README.md)

# Architecture
* 3.2+版本架构图
<img src="https://skywalkingtest.github.io/page-resources/3.x-architecture.jpg"/>
<img src="https://skywalkingtest.github.io/page-resources/3.2.5%2b_architecture.jpg"/>

# Document
[![EN doc](https://img.shields.io/badge/document-English-blue.svg)](docs/README.md) [![cn doc](https://img.shields.io/badge/document-中文-blue.svg)](docs/README_ZH.md)
Expand Down
Expand Up @@ -42,7 +42,7 @@
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
import org.skywalking.apm.collector.stream.StreamModule;

/**
* @author peng-yongsheng
Expand Down Expand Up @@ -82,7 +82,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
Server gRPCServer = managerService.createIfAbsent(host, port);

AgentStreamSingleton.getInstance(getManager(), new WorkerCreateListener());
AgentStreamSingleton.createInstanceIfAbsent(getManager());
addHandlers(gRPCServer);
}

Expand All @@ -91,7 +91,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
}

@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME, StreamModule.NAME};
}

private void addHandlers(Server gRPCServer) {
Expand Down
Expand Up @@ -68,7 +68,7 @@ public JVMMetricsServiceHandler() {

request.getMetricsList().forEach(metric -> {
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
senToInstanceHeartBeatPersistenceWorker(instanceId, metric.getTime());
sendToInstanceHeartBeatPersistenceWorker(instanceId, metric.getTime());
sendToCpuMetricPersistenceWorker(instanceId, time, metric.getCpu());
sendToMemoryMetricPersistenceWorker(instanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricPersistenceWorker(instanceId, time, metric.getMemoryPoolList());
Expand All @@ -79,7 +79,7 @@ public JVMMetricsServiceHandler() {
responseObserver.onCompleted();
}

private void senToInstanceHeartBeatPersistenceWorker(int instanceId, long heartBeatTime) {
private void sendToInstanceHeartBeatPersistenceWorker(int instanceId, long heartBeatTime) {
Instance instance = new Instance(String.valueOf(instanceId));
instance.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime));
instance.setInstanceId(instanceId);
Expand Down
Expand Up @@ -35,7 +35,6 @@ public class ApplicationRegisterServiceHandlerTestCase {

private ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub stub;

//@Test
public void testRegister() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
stub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
Expand Down
Expand Up @@ -36,7 +36,7 @@
import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.stream.StreamModule;

/**
* @author peng-yongsheng
Expand Down Expand Up @@ -75,22 +75,20 @@ public class AgentModuleJettyProvider extends ModuleProvider {
NamingHandlerRegisterService namingHandlerRegisterService = getManager().find(NamingModule.NAME).getService(NamingHandlerRegisterService.class);
namingHandlerRegisterService.register(new AgentJettyNamingHandler(namingListener));

DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class);

JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class);
Server jettyServer = managerService.createIfAbsent(host, port, contextPath);
addHandlers(daoService, jettyServer);
addHandlers(jettyServer);
}

@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {

}

@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME, CacheModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME, CacheModule.NAME, StreamModule.NAME};
}

private void addHandlers(DAOService daoService, Server jettyServer) {
private void addHandlers(Server jettyServer) {
jettyServer.addHandler(new TraceSegmentServletHandler());
}
}
Expand Up @@ -18,9 +18,12 @@

package org.skywalking.apm.collector.agent.stream;

import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.stream.timer.PersistenceTimer;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;

/**
Expand All @@ -33,24 +36,35 @@ public class AgentStreamSingleton {
private final ModuleManager moduleManager;
private final WorkerCreateListener workerCreateListener;

public AgentStreamSingleton(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) {
private AgentStreamSingleton(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.workerCreateListener = workerCreateListener;
createJVMGraph();
createRegisterGraph();
createTraceGraph();
this.workerCreateListener = new WorkerCreateListener();
this.create();
}

public static synchronized AgentStreamSingleton getInstance(ModuleManager moduleManager,
WorkerCreateListener workerCreateListener) {
public static synchronized AgentStreamSingleton createInstanceIfAbsent(ModuleManager moduleManager) {
if (ObjectUtils.isEmpty(INSTANCE)) {
INSTANCE = new AgentStreamSingleton(moduleManager, workerCreateListener);
INSTANCE = new AgentStreamSingleton(moduleManager);
}
return INSTANCE;
}

private void createJVMGraph() {
private void create() {
createJVMGraph();
createRegisterGraph();
createTraceGraph();

PersistenceTimer timer = new PersistenceTimer();
timer.start(moduleManager, workerCreateListener.getPersistenceWorkers());
}

private void createJVMGraph() {
JvmMetricStreamGraph jvmMetricStreamGraph = new JvmMetricStreamGraph(moduleManager, workerCreateListener);
jvmMetricStreamGraph.createCpuMetricGraph();
jvmMetricStreamGraph.createGcMetricGraph();
jvmMetricStreamGraph.createMemoryMetricGraph();
jvmMetricStreamGraph.createMemoryPoolMetricGraph();
jvmMetricStreamGraph.createHeartBeatGraph();
}

private void createRegisterGraph() {
Expand All @@ -61,6 +75,16 @@ private void createRegisterGraph() {
}

private void createTraceGraph() {

TraceStreamGraph traceStreamGraph = new TraceStreamGraph(moduleManager, workerCreateListener);
traceStreamGraph.createSegmentStandardizationGraph();
traceStreamGraph.createGlobalTraceGraph();
traceStreamGraph.createInstPerformanceGraph();
traceStreamGraph.createNodeComponentGraph();
traceStreamGraph.createNodeMappingGraph();
traceStreamGraph.createNodeReferenceGraph();
traceStreamGraph.createServiceEntryGraph();
traceStreamGraph.createServiceReferenceGraph();
traceStreamGraph.createSegmentGraph();
traceStreamGraph.createSegmentCostGraph();
}
}
Expand Up @@ -18,14 +18,22 @@

package org.skywalking.apm.collector.agent.stream.graph;

import org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.GCMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.InstHeartBeatPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryPoolMetricPersistenceWorker;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.queue.QueueModule;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.stream.worker.base.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;

/**
* @author peng-yongsheng
Expand All @@ -38,28 +46,56 @@ public class JvmMetricStreamGraph {
public static final int CPU_METRIC_GRAPH_ID = 103;
public static final int INST_HEART_BEAT_GRAPH_ID = 104;

private final ModuleManager moduleManager;
private final WorkerCreateListener workerCreateListener;

public JvmMetricStreamGraph(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) {
this.moduleManager = moduleManager;
this.workerCreateListener = workerCreateListener;
}

@SuppressWarnings("unchecked")
public Graph<GCMetric> createGcMetricGraph() {
QueueCreatorService<GCMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

Graph<GCMetric> graph = GraphManager.INSTANCE.createIfAbsent(GC_METRIC_GRAPH_ID, GCMetric.class);
graph.addNode(new GCMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}

public Graph<CpuMetric> createCpuMetricGraph() throws ProviderNotFoundException {
@SuppressWarnings("unchecked")
public Graph<CpuMetric> createCpuMetricGraph() {
QueueCreatorService<CpuMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

Graph<CpuMetric> graph = GraphManager.INSTANCE.createIfAbsent(CPU_METRIC_GRAPH_ID, CpuMetric.class);
graph.addNode(new CpuMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}

@SuppressWarnings("unchecked")
public Graph<MemoryMetric> createMemoryMetricGraph() {
QueueCreatorService<MemoryMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

Graph<MemoryMetric> graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_METRIC_GRAPH_ID, MemoryMetric.class);
graph.addNode(new MemoryMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}

@SuppressWarnings("unchecked")
public Graph<MemoryPoolMetric> createMemoryPoolMetricGraph() {
QueueCreatorService<MemoryPoolMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

Graph<MemoryPoolMetric> graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class);
graph.addNode(new MemoryPoolMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}

@SuppressWarnings("unchecked")
public Graph<Instance> createHeartBeatGraph() {
QueueCreatorService<Instance> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

Graph<Instance> graph = GraphManager.INSTANCE.createIfAbsent(INST_HEART_BEAT_GRAPH_ID, Instance.class);
graph.addNode(new InstHeartBeatPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
}
Expand Up @@ -31,8 +31,6 @@
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
Expand All @@ -57,7 +55,6 @@ public RegisterStreamGraph(ModuleManager moduleManager, WorkerCreateListener wor

@SuppressWarnings("unchecked")
public Graph<Application> createApplicationRegisterGraph() {
DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);

QueueCreatorService<Application> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Expand All @@ -70,7 +67,6 @@ public Graph<Application> createApplicationRegisterGraph() {

@SuppressWarnings("unchecked")
public Graph<Instance> createInstanceRegisterGraph() {
DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);

QueueCreatorService<Instance> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Expand All @@ -83,7 +79,6 @@ public Graph<Instance> createInstanceRegisterGraph() {

@SuppressWarnings("unchecked")
public Graph<ServiceName> createServiceNameRegisterGraph() {
DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);

QueueCreatorService<ServiceName> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Expand Down

0 comments on commit 2d80d8d

Please sign in to comment.