Skip to content

Commit

Permalink
Trace segment stream finish.
Browse files Browse the repository at this point in the history
  • Loading branch information
peng-yongsheng committed Nov 14, 2017
1 parent 34aad0f commit b744d40
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.stream.timer.PersistenceTimer;
Expand Down Expand Up @@ -74,6 +75,16 @@ private void createRegisterGraph() {
}

private void createTraceGraph() {

TraceStreamGraph traceStreamGraph = new TraceStreamGraph(moduleManager, workerCreateListener);
traceStreamGraph.createSegmentStandardizationGraph();
traceStreamGraph.createGlobalTraceGraph();
traceStreamGraph.createInstPerformanceGraph();
traceStreamGraph.createNodeComponentGraph();
traceStreamGraph.createNodeMappingGraph();
traceStreamGraph.createNodeReferenceGraph();
traceStreamGraph.createServiceEntryGraph();
traceStreamGraph.createServiceReferenceGraph();
traceStreamGraph.createSegmentGraph();
traceStreamGraph.createSegmentCostGraph();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/

package org.skywalking.apm.collector.agent.stream.graph;

import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentStandardizationWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.global.GlobalTracePersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstPerformancePersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentAggregationWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeComponentRemoteWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingAggregationWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.node.NodeMappingRemoteWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferenceAggregationWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferencePersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.noderef.NodeReferenceRemoteWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryAggregationWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryPersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntryRemoteWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferenceAggregationWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferencePersistenceWorker;
import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferenceRemoteWorker;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.queue.QueueModule;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.storage.table.global.GlobalTrace;
import org.skywalking.apm.collector.storage.table.instance.InstPerformance;
import org.skywalking.apm.collector.storage.table.node.NodeComponent;
import org.skywalking.apm.collector.storage.table.node.NodeMapping;
import org.skywalking.apm.collector.storage.table.noderef.NodeReference;
import org.skywalking.apm.collector.storage.table.segment.Segment;
import org.skywalking.apm.collector.storage.table.segment.SegmentCost;
import org.skywalking.apm.collector.storage.table.service.ServiceEntry;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
import org.skywalking.apm.network.proto.UpstreamSegment;

/**
* @author peng-yongsheng
*/
public class TraceStreamGraph {

public static final int GLOBAL_TRACE_GRAPH_ID = 300;
public static final int INST_PERFORMANCE_GRAPH_ID = 301;
public static final int NODE_COMPONENT_GRAPH_ID = 302;
public static final int NODE_MAPPING_GRAPH_ID = 303;
public static final int NODE_REFERENCE_GRAPH_ID = 304;
public static final int SERVICE_ENTRY_GRAPH_ID = 305;
public static final int SERVICE_REFERENCE_GRAPH_ID = 306;
public static final int SEGMENT_GRAPH_ID = 307;
public static final int SEGMENT_COST_GRAPH_ID = 308;
public static final int SEGMENT_STANDARDIZATION_GRAPH_ID = 309;

private final ModuleManager moduleManager;
private final WorkerCreateListener workerCreateListener;

public TraceStreamGraph(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) {
this.moduleManager = moduleManager;
this.workerCreateListener = workerCreateListener;
}

@SuppressWarnings("unchecked")
public Graph<UpstreamSegment> createSegmentStandardizationGraph() {
QueueCreatorService<UpstreamSegment> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

Graph<UpstreamSegment> graph = GraphManager.INSTANCE.createIfAbsent(SEGMENT_STANDARDIZATION_GRAPH_ID, UpstreamSegment.class);
graph.addNode(new SegmentStandardizationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}

@SuppressWarnings("unchecked")
public Graph<GlobalTrace> createGlobalTraceGraph() {
QueueCreatorService<GlobalTrace> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

Graph<GlobalTrace> graph = GraphManager.INSTANCE.createIfAbsent(GLOBAL_TRACE_GRAPH_ID, GlobalTrace.class);
graph.addNode(new GlobalTracePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}

@SuppressWarnings("unchecked")
public Graph<InstPerformance> createInstPerformanceGraph() {
QueueCreatorService<InstPerformance> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

Graph<InstPerformance> graph = GraphManager.INSTANCE.createIfAbsent(INST_PERFORMANCE_GRAPH_ID, InstPerformance.class);
graph.addNode(new InstPerformancePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}

@SuppressWarnings("unchecked")
public Graph<NodeComponent> createNodeComponentGraph() {
QueueCreatorService<NodeComponent> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);

Graph<NodeComponent> graph = GraphManager.INSTANCE.createIfAbsent(NODE_COMPONENT_GRAPH_ID, NodeComponent.class);
graph.addNode(new NodeComponentAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener))
.addNext(new NodeComponentRemoteWorker.Factory(moduleManager, remoteSenderService, NODE_COMPONENT_GRAPH_ID).create(workerCreateListener))
.addNext(new NodeComponentPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}

@SuppressWarnings("unchecked")
public Graph<NodeMapping> createNodeMappingGraph() {
QueueCreatorService<NodeMapping> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);

Graph<NodeMapping> graph = GraphManager.INSTANCE.createIfAbsent(NODE_MAPPING_GRAPH_ID, NodeMapping.class);
graph.addNode(new NodeMappingAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener))
.addNext(new NodeMappingRemoteWorker.Factory(moduleManager, remoteSenderService, NODE_MAPPING_GRAPH_ID).create(workerCreateListener))
.addNext(new NodeMappingPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}

@SuppressWarnings("unchecked")
public Graph<NodeReference> createNodeReferenceGraph() {
QueueCreatorService<NodeReference> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);

Graph<NodeReference> graph = GraphManager.INSTANCE.createIfAbsent(NODE_REFERENCE_GRAPH_ID, NodeReference.class);
graph.addNode(new NodeReferenceAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener))
.addNext(new NodeReferenceRemoteWorker.Factory(moduleManager, remoteSenderService, NODE_REFERENCE_GRAPH_ID).create(workerCreateListener))
.addNext(new NodeReferencePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}

@SuppressWarnings("unchecked")
public Graph<ServiceEntry> createServiceEntryGraph() {
QueueCreatorService<ServiceEntry> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);

Graph<ServiceEntry> graph = GraphManager.INSTANCE.createIfAbsent(SERVICE_ENTRY_GRAPH_ID, ServiceEntry.class);
graph.addNode(new ServiceEntryAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener))
.addNext(new ServiceEntryRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_ENTRY_GRAPH_ID).create(workerCreateListener))
.addNext(new ServiceEntryPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}

@SuppressWarnings("unchecked")
public Graph<ServiceReference> createServiceReferenceGraph() {
QueueCreatorService<ServiceReference> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);

Graph<ServiceReference> graph = GraphManager.INSTANCE.createIfAbsent(SERVICE_REFERENCE_GRAPH_ID, ServiceReference.class);
graph.addNode(new ServiceReferenceAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener))
.addNext(new ServiceReferenceRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_REFERENCE_GRAPH_ID).create(workerCreateListener))
.addNext(new ServiceReferencePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}

@SuppressWarnings("unchecked")
public Graph<Segment> createSegmentGraph() {
QueueCreatorService<Segment> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

Graph<Segment> graph = GraphManager.INSTANCE.createIfAbsent(SEGMENT_GRAPH_ID, Segment.class);
graph.addNode(new SegmentPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}

@SuppressWarnings("unchecked")
public Graph<SegmentCost> createSegmentCostGraph() {
QueueCreatorService<SegmentCost> queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class);

Graph<SegmentCost> graph = GraphManager.INSTANCE.createIfAbsent(SEGMENT_COST_GRAPH_ID, SegmentCost.class);
graph.addNode(new SegmentCostPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener));
return graph;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator;
import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceIdExchanger;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentDecorator;
Expand All @@ -34,6 +35,8 @@
import org.skywalking.apm.collector.agent.stream.worker.trace.segment.SegmentCostSpanListener;
import org.skywalking.apm.collector.agent.stream.worker.trace.service.ServiceEntrySpanListener;
import org.skywalking.apm.collector.agent.stream.worker.trace.serviceref.ServiceReferenceSpanListener;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.storage.table.segment.Segment;
import org.skywalking.apm.network.proto.SpanType;
Expand Down Expand Up @@ -150,11 +153,14 @@ private boolean preBuild(List<UniqueId> traceIds, SegmentDecorator segmentDecora
private void buildSegment(String id, byte[] dataBinary) {
Segment segment = new Segment(id);
segment.setDataBinary(dataBinary);
Graph<Segment> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_GRAPH_ID, Segment.class);
graph.start(segment);
}

private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) {
logger.debug("send to segment buffer write worker, id: {}", id);
// context.getClusterWorkerContext().lookup(SegmentStandardizationWorker.WorkerRole.INSTANCE).tell(upstreamSegment);
Graph<UpstreamSegment> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_STANDARDIZATION_GRAPH_ID, UpstreamSegment.class);
graph.start(upstreamSegment);
}

private void notifyListenerToBuild() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public SegmentStandardizationWorker(ModuleManager moduleManager) {
}

@Override public int id() {
return 0;
return SegmentStandardizationWorker.class.hashCode();
}

@Override protected void onWork(UpstreamSegment upstreamSegment) throws WorkerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@

import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.GlobalTraceIdsListener;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.table.global.GlobalTrace;
Expand Down Expand Up @@ -63,11 +66,13 @@ public void parseFirst(SpanDecorator spanDecorator, int applicationId, int insta
@Override public void build() {
logger.debug("global trace listener build");

Graph<GlobalTrace> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.GLOBAL_TRACE_GRAPH_ID, GlobalTrace.class);
for (String globalTraceId : globalTraceIds) {
GlobalTrace globalTrace = new GlobalTrace(segmentId + Const.ID_SPLIT + globalTraceId);
globalTrace.setGlobalTraceId(globalTraceId);
globalTrace.setSegmentId(segmentId);
globalTrace.setTimeBucket(timeBucket);
graph.start(globalTrace);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@

package org.skywalking.apm.collector.agent.stream.worker.trace.instance;

import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener;
import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.table.instance.InstPerformance;
Expand Down Expand Up @@ -60,5 +63,8 @@ public void parseFirst(SpanDecorator spanDecorator, int applicationId, int insta
instPerformance.setCalls(1);
instPerformance.setCostTotal(cost);
instPerformance.setTimeBucket(timeBucket);

Graph<InstPerformance> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.INST_PERFORMANCE_GRAPH_ID, InstPerformance.class);
graph.start(instPerformance);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@

import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.agent.stream.parser.EntrySpanListener;
import org.skywalking.apm.collector.agent.stream.parser.ExitSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.FirstSpanListener;
import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.table.node.NodeComponent;
Expand Down Expand Up @@ -89,9 +92,12 @@ public void parseFirst(SpanDecorator spanDecorator, int applicationId, int insta
}

@Override public void build() {
Graph<NodeComponent> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.NODE_COMPONENT_GRAPH_ID, NodeComponent.class);

nodeComponents.forEach(nodeComponent -> {
nodeComponent.setId(timeBucket + Const.ID_SPLIT + nodeComponent.getId());
nodeComponent.setTimeBucket(timeBucket);
graph.start(nodeComponent);
});
}
}

0 comments on commit b744d40

Please sign in to comment.