Skip to content

Commit

Permalink
jvm stream finish.
Browse files Browse the repository at this point in the history
  • Loading branch information
peng-yongsheng committed Nov 14, 2017
1 parent 714b4cc commit 34aad0f
Show file tree
Hide file tree
Showing 19 changed files with 126 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
import org.skywalking.apm.collector.stream.StreamModule;

/**
* @author peng-yongsheng
Expand Down Expand Up @@ -82,7 +82,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
Server gRPCServer = managerService.createIfAbsent(host, port);

AgentStreamSingleton.getInstance(getManager(), new WorkerCreateListener());
AgentStreamSingleton.createInstanceIfAbsent(getManager());
addHandlers(gRPCServer);
}

Expand All @@ -91,7 +91,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
}

@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME, StreamModule.NAME};
}

private void addHandlers(Server gRPCServer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public JVMMetricsServiceHandler() {

request.getMetricsList().forEach(metric -> {
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
senToInstanceHeartBeatPersistenceWorker(instanceId, metric.getTime());
sendToInstanceHeartBeatPersistenceWorker(instanceId, metric.getTime());
sendToCpuMetricPersistenceWorker(instanceId, time, metric.getCpu());
sendToMemoryMetricPersistenceWorker(instanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricPersistenceWorker(instanceId, time, metric.getMemoryPoolList());
Expand All @@ -79,7 +79,7 @@ public JVMMetricsServiceHandler() {
responseObserver.onCompleted();
}

private void senToInstanceHeartBeatPersistenceWorker(int instanceId, long heartBeatTime) {
private void sendToInstanceHeartBeatPersistenceWorker(int instanceId, long heartBeatTime) {
Instance instance = new Instance(String.valueOf(instanceId));
instance.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime));
instance.setInstanceId(instanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.stream.StreamModule;

/**
* @author peng-yongsheng
Expand Down Expand Up @@ -75,22 +75,20 @@ public class AgentModuleJettyProvider extends ModuleProvider {
NamingHandlerRegisterService namingHandlerRegisterService = getManager().find(NamingModule.NAME).getService(NamingHandlerRegisterService.class);
namingHandlerRegisterService.register(new AgentJettyNamingHandler(namingListener));

DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class);

JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class);
Server jettyServer = managerService.createIfAbsent(host, port, contextPath);
addHandlers(daoService, jettyServer);
addHandlers(jettyServer);
}

@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {

}

@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME, CacheModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME, CacheModule.NAME, StreamModule.NAME};
}

private void addHandlers(DAOService daoService, Server jettyServer) {
private void addHandlers(Server jettyServer) {
jettyServer.addHandler(new TraceSegmentServletHandler());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

package org.skywalking.apm.collector.agent.stream;

import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.stream.timer.PersistenceTimer;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;

/**
Expand All @@ -33,24 +35,35 @@ public class AgentStreamSingleton {
private final ModuleManager moduleManager;
private final WorkerCreateListener workerCreateListener;

public AgentStreamSingleton(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) {
private AgentStreamSingleton(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.workerCreateListener = workerCreateListener;
createJVMGraph();
createRegisterGraph();
createTraceGraph();
this.workerCreateListener = new WorkerCreateListener();
this.create();
}

public static synchronized AgentStreamSingleton getInstance(ModuleManager moduleManager,
WorkerCreateListener workerCreateListener) {
public static synchronized AgentStreamSingleton createInstanceIfAbsent(ModuleManager moduleManager) {
if (ObjectUtils.isEmpty(INSTANCE)) {
INSTANCE = new AgentStreamSingleton(moduleManager, workerCreateListener);
INSTANCE = new AgentStreamSingleton(moduleManager);
}
return INSTANCE;
}

private void createJVMGraph() {
private void create() {
createJVMGraph();
createRegisterGraph();
createTraceGraph();

PersistenceTimer timer = new PersistenceTimer();
timer.start(moduleManager, workerCreateListener.getPersistenceWorkers());
}

private void createJVMGraph() {
JvmMetricStreamGraph jvmMetricStreamGraph = new JvmMetricStreamGraph(moduleManager, workerCreateListener);
jvmMetricStreamGraph.createCpuMetricGraph();
jvmMetricStreamGraph.createGcMetricGraph();
jvmMetricStreamGraph.createMemoryMetricGraph();
jvmMetricStreamGraph.createMemoryPoolMetricGraph();
jvmMetricStreamGraph.createHeartBeatGraph();
}

private void createRegisterGraph() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,22 @@

package org.skywalking.apm.collector.agent.stream.graph;

import org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.GCMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.InstHeartBeatPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryMetricPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryPoolMetricPersistenceWorker;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.queue.QueueModule;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric;
import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.stream.worker.base.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;

/**
* @author peng-yongsheng
Expand All @@ -38,28 +46,56 @@ public class JvmMetricStreamGraph {
public static final int CPU_METRIC_GRAPH_ID = 103;
public static final int INST_HEART_BEAT_GRAPH_ID = 104;

private final ModuleManager moduleManager;
private final WorkerCreateListener workerCreateListener;

public JvmMetricStreamGraph(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) {
this.moduleManager = moduleManager;
this.workerCreateListener = workerCreateListener;
}

@SuppressWarnings("unchecked")
public Graph<GCMetric> createGcMetricGraph() {
QueueCreatorService<GCMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

Graph<GCMetric> graph = GraphManager.INSTANCE.createIfAbsent(GC_METRIC_GRAPH_ID, GCMetric.class);
graph.addNode(new GCMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}

public Graph<CpuMetric> createCpuMetricGraph() throws ProviderNotFoundException {
@SuppressWarnings("unchecked")
public Graph<CpuMetric> createCpuMetricGraph() {
QueueCreatorService<CpuMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

Graph<CpuMetric> graph = GraphManager.INSTANCE.createIfAbsent(CPU_METRIC_GRAPH_ID, CpuMetric.class);
graph.addNode(new CpuMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}

@SuppressWarnings("unchecked")
public Graph<MemoryMetric> createMemoryMetricGraph() {
QueueCreatorService<MemoryMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

Graph<MemoryMetric> graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_METRIC_GRAPH_ID, MemoryMetric.class);
graph.addNode(new MemoryMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}

@SuppressWarnings("unchecked")
public Graph<MemoryPoolMetric> createMemoryPoolMetricGraph() {
QueueCreatorService<MemoryPoolMetric> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

Graph<MemoryPoolMetric> graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class);
graph.addNode(new MemoryPoolMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}

@SuppressWarnings("unchecked")
public Graph<Instance> createHeartBeatGraph() {
QueueCreatorService<Instance> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

Graph<Instance> graph = GraphManager.INSTANCE.createIfAbsent(INST_HEART_BEAT_GRAPH_ID, Instance.class);
graph.addNode(new InstHeartBeatPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
Expand All @@ -57,7 +55,6 @@ public RegisterStreamGraph(ModuleManager moduleManager, WorkerCreateListener wor

@SuppressWarnings("unchecked")
public Graph<Application> createApplicationRegisterGraph() {
DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);

QueueCreatorService<Application> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Expand All @@ -70,7 +67,6 @@ public Graph<Application> createApplicationRegisterGraph() {

@SuppressWarnings("unchecked")
public Graph<Instance> createInstanceRegisterGraph() {
DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);

QueueCreatorService<Instance> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Expand All @@ -83,7 +79,6 @@ public Graph<Instance> createInstanceRegisterGraph() {

@SuppressWarnings("unchecked")
public Graph<ServiceName> createServiceNameRegisterGraph() {
DAOService daoService = moduleManager.find(StorageModule.NAME).getService(DAOService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);

QueueCreatorService<ServiceName> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.ApplicationCacheService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
Expand All @@ -37,12 +38,13 @@ public class ApplicationIDService {
private final Logger logger = LoggerFactory.getLogger(ApplicationIDService.class);

private final ModuleManager moduleManager;
private final Graph<Application> applicationRegisterGraph;

public ApplicationIDService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.applicationRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID, Application.class);
}

@SuppressWarnings("unchecked")
public int getOrCreate(String applicationCode) throws ModuleNotFoundException, ServiceNotProvidedException {
ApplicationCacheService service = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
int applicationId = service.get(applicationCode);
Expand All @@ -52,7 +54,7 @@ public int getOrCreate(String applicationCode) throws ModuleNotFoundException, S
application.setApplicationCode(applicationCode);
application.setApplicationId(0);

GraphManager.INSTANCE.findGraph(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID).start(application);
applicationRegisterGraph.start(application);
}
return applicationId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.InstanceCacheService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
Expand All @@ -40,12 +41,13 @@ public class InstanceIDService {
private final Logger logger = LoggerFactory.getLogger(InstanceIDService.class);

private final ModuleManager moduleManager;
private final Graph<Instance> instanceRegisterGraph;

public InstanceIDService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.instanceRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.INSTANCE_REGISTER_GRAPH_ID, Instance.class);
}

@SuppressWarnings("unchecked")
public int getOrCreate(int applicationId, String agentUUID, long registerTime,
String osInfo) throws ModuleNotFoundException, ServiceNotProvidedException {
logger.debug("get or create instance id, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo);
Expand All @@ -61,7 +63,7 @@ public int getOrCreate(int applicationId, String agentUUID, long registerTime,
instance.setInstanceId(0);
instance.setOsInfo(osInfo);

GraphManager.INSTANCE.findGraph(RegisterStreamGraph.INSTANCE_REGISTER_GRAPH_ID).start(instance);
instanceRegisterGraph.start(instance);
}
return instanceId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.cache.service.ServiceIdCacheService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
Expand All @@ -35,12 +36,13 @@ public class ServiceNameService {
private final Logger logger = LoggerFactory.getLogger(ServiceNameService.class);

private final ModuleManager moduleManager;
private final Graph<ServiceName> serviceNameRegisterGraph;

public ServiceNameService(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.serviceNameRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID, ServiceName.class);
}

@SuppressWarnings("unchecked")
public int getOrCreate(int applicationId, String serviceName) {
ServiceIdCacheService idCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceIdCacheService.class);
int serviceId = idCacheService.get(applicationId, serviceName);
Expand All @@ -51,7 +53,7 @@ public int getOrCreate(int applicationId, String serviceName) {
service.setServiceName(serviceName);
service.setServiceId(0);

GraphManager.INSTANCE.findGraph(RegisterStreamGraph.SERVICE_NAME_REGISTER_GRAPH_ID).start(service);
serviceNameRegisterGraph.start(service);
}
return serviceId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
~ Project repository: https://github.com/OpenSkywalking/skywalking
-->

<Configuration status="info">
<Configuration status="debug">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
Expand All @@ -26,9 +26,9 @@
<Loggers>
<logger name="org.eclipse.jetty" level="INFO"/>
<logger name="org.apache.zookeeper" level="INFO"/>
<logger name="org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer" level="INFO"/>
<logger name="io.grpc.netty.NettyServerHandler" level="INFO"/>
<Root level="info">
<logger name="org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer" level="debug"/>
<logger name="io.grpc.netty" level="INFO"/>
<Root level="debug">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ jetty_manager:
jetty:
gRPC_manager:
gRPC:
stream:
worker:
storage:
h2:
url: jdbc:h2:~/memorydb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.storage.service.DAOService;
import org.skywalking.apm.collector.storage.base.dao.IBatchDAO;

/**
* @author peng-yongsheng
Expand All @@ -33,6 +34,6 @@ public class StorageModule extends Module {
}

@Override public Class[] services() {
return new Class[] {DAOService.class};
return new Class[] {DAOService.class, IBatchDAO.class};
}
}

0 comments on commit 34aad0f

Please sign in to comment.