
Commit

Merge pull request #377 from wu-sheng/feature/373
Feature/373 service reference finish
wu-sheng committed Aug 30, 2017
2 parents a04843a + 76a8a12 commit 9b97018
Showing 101 changed files with 1,904 additions and 1,528 deletions.
@@ -59,7 +59,7 @@ private void senToInstanceHeartBeatPersistenceWorker(StreamModuleContext context
long heartBeatTime) {
InstanceHeartBeatDataDefine.InstanceHeartBeat heartBeat = new InstanceHeartBeatDataDefine.InstanceHeartBeat();
heartBeat.setId(String.valueOf(applicationInstanceId));
- heartBeat.setHeartBeatTime(heartBeatTime);
+ heartBeat.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime));
heartBeat.setInstanceId(applicationInstanceId);
try {
logger.debug("send to instance heart beat persistence worker, id: {}", heartBeat.getId());
@@ -4,9 +4,9 @@
import com.google.gson.JsonObject;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agentregister.instance.InstanceIDService;
+ import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ApplicationInstance;
- import org.skywalking.apm.network.proto.ApplicationInstanceHeartbeat;
import org.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.skywalking.apm.network.proto.ApplicationInstanceRecover;
import org.skywalking.apm.network.proto.Downstream;
@@ -26,23 +26,19 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp

@Override
public void register(ApplicationInstance request, StreamObserver<ApplicationInstanceMapping> responseObserver) {
- int instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), request.getRegisterTime(), buildOsInfo(request.getOsinfo()));
+ long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(request.getRegisterTime());
+ int instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), timeBucket, buildOsInfo(request.getOsinfo()));
ApplicationInstanceMapping.Builder builder = ApplicationInstanceMapping.newBuilder();
builder.setApplicationId(request.getApplicationId());
builder.setApplicationInstanceId(instanceId);
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}

- @Override public void heartbeat(ApplicationInstanceHeartbeat request, StreamObserver<Downstream> responseObserver) {
- instanceIDService.heartBeat(request.getApplicationInstanceId(), request.getHeartbeatTime());
- responseObserver.onNext(Downstream.newBuilder().build());
- responseObserver.onCompleted();
- }
-
@Override
public void registerRecover(ApplicationInstanceRecover request, StreamObserver<Downstream> responseObserver) {
- instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), request.getRegisterTime(), buildOsInfo(request.getOsinfo()));
+ long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(request.getRegisterTime());
+ instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), timeBucket, buildOsInfo(request.getOsinfo()));
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
@@ -1,10 +1,10 @@
package org.skywalking.apm.collector.agentregister.instance;

import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker;
- import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
+ import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
@@ -36,12 +36,6 @@ public int getOrCreate(int applicationId, String agentUUID, long registerTime, S
return applicationId;
}

- public void heartBeat(int instanceId, long heartbeatTime) {
- logger.debug("instance heart beat, instance id: {}, heartbeat time: {}", instanceId, heartbeatTime);
- IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
- dao.updateHeartbeatTime(instanceId, heartbeatTime);
- }
-
public void recover(int instanceId, int applicationId, long registerTime, String osInfo) {
logger.debug("instance recover, instance id: {}, application id: {}, register time: {}", instanceId, applicationId, registerTime);
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
@@ -43,6 +43,7 @@ public class TraceSegmentServletHandler extends JettyHandler {

private void read(BufferedReader bufferedReader) throws IOException {
JsonReader reader = new JsonReader(bufferedReader);
+
reader.beginArray();
while (reader.hasNext()) {
SegmentParse segmentParse = new SegmentParse();
@@ -2,25 +2,25 @@

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
- import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO;
+ import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.dao.DAOContainer;

/**
* @author pengys5
*/
- public class ServiceNameCache {
+ public class ServiceCache {

- private static Cache<String, Integer> CACHE = CacheBuilder.newBuilder().maximumSize(2000).build();
+ private static Cache<Integer, String> CACHE = CacheBuilder.newBuilder().maximumSize(10000).build();

- public static int get(int applicationId, String serviceName) {
+ public static String getServiceName(int serviceId) {
try {
- return CACHE.get(applicationId + Const.ID_SPLIT + serviceName, () -> {
+ return CACHE.get(serviceId, () -> {
IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
- return dao.getServiceId(applicationId, serviceName);
+ return dao.getServiceName(serviceId);
});
} catch (Throwable e) {
- return 0;
+ return Const.EMPTY_STRING;
}
}
}
@@ -1,6 +1,6 @@
- package org.skywalking.apm.collector.agentstream.worker.noderef.summary;
+ package org.skywalking.apm.collector.agentstream.worker.noderef;

- import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumDataDefine;
+ import org.skywalking.apm.collector.storage.define.noderef.NodeReferenceDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
@@ -15,9 +15,9 @@
/**
* @author pengys5
*/
- public class NodeRefSumAggregationWorker extends AggregationWorker {
+ public class NodeReferenceAggregationWorker extends AggregationWorker {

- public NodeRefSumAggregationWorker(Role role, ClusterWorkerContext clusterContext) {
+ public NodeReferenceAggregationWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}

@@ -26,18 +26,18 @@ public NodeRefSumAggregationWorker(Role role, ClusterWorkerContex
}

@Override protected WorkerRefs nextWorkRef(String id) throws WorkerNotFoundException {
- return getClusterContext().lookup(NodeRefSumRemoteWorker.WorkerRole.INSTANCE);
+ return getClusterContext().lookup(NodeReferenceRemoteWorker.WorkerRole.INSTANCE);
}

- public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefSumAggregationWorker> {
+ public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeReferenceAggregationWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}

@Override
- public NodeRefSumAggregationWorker workerInstance(ClusterWorkerContext clusterContext) {
- return new NodeRefSumAggregationWorker(role(), clusterContext);
+ public NodeReferenceAggregationWorker workerInstance(ClusterWorkerContext clusterContext) {
+ return new NodeReferenceAggregationWorker(role(), clusterContext);
}

@Override
@@ -51,7 +51,7 @@ public enum WorkerRole implements Role {

@Override
public String roleName() {
- return NodeRefSumAggregationWorker.class.getSimpleName();
+ return NodeReferenceAggregationWorker.class.getSimpleName();
}

@Override
@@ -60,7 +60,7 @@ public WorkerSelector workerSelector() {
}

@Override public DataDefine dataDefine() {
- return new NodeRefSumDataDefine();
+ return new NodeReferenceDataDefine();
}
}
}
@@ -1,24 +1,24 @@
- package org.skywalking.apm.collector.agentstream.worker.noderef.reference;
+ package org.skywalking.apm.collector.agentstream.worker.noderef;

- import org.skywalking.apm.collector.agentstream.worker.noderef.reference.dao.INodeReferenceDAO;
- import org.skywalking.apm.collector.storage.define.noderef.NodeRefDataDefine;
+ import org.skywalking.apm.collector.agentstream.worker.noderef.dao.INodeReferenceDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
+ import org.skywalking.apm.collector.storage.define.DataDefine;
+ import org.skywalking.apm.collector.storage.define.noderef.NodeReferenceDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
- import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;

/**
* @author pengys5
*/
- public class NodeRefPersistenceWorker extends PersistenceWorker {
+ public class NodeReferencePersistenceWorker extends PersistenceWorker {

- public NodeRefPersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
+ public NodeReferencePersistenceWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}

@@ -34,15 +34,15 @@ public NodeRefPersistenceWorker(Role role, ClusterWorkerContext clusterContext)
return (IPersistenceDAO)DAOContainer.INSTANCE.get(INodeReferenceDAO.class.getName());
}

- public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeRefPersistenceWorker> {
+ public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeReferencePersistenceWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}

@Override
- public NodeRefPersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
- return new NodeRefPersistenceWorker(role(), clusterContext);
+ public NodeReferencePersistenceWorker workerInstance(ClusterWorkerContext clusterContext) {
+ return new NodeReferencePersistenceWorker(role(), clusterContext);
}

@Override
@@ -56,7 +56,7 @@ public enum WorkerRole implements Role {

@Override
public String roleName() {
- return NodeRefPersistenceWorker.class.getSimpleName();
+ return NodeReferencePersistenceWorker.class.getSimpleName();
}

@Override
@@ -65,7 +65,7 @@ public WorkerSelector workerSelector() {
}

@Override public DataDefine dataDefine() {
- return new NodeRefDataDefine();
+ return new NodeReferenceDataDefine();
}
}
}
@@ -1,6 +1,6 @@
- package org.skywalking.apm.collector.agentstream.worker.noderef.summary;
+ package org.skywalking.apm.collector.agentstream.worker.noderef;

- import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumDataDefine;
+ import org.skywalking.apm.collector.storage.define.noderef.NodeReferenceDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
@@ -14,9 +14,9 @@
/**
* @author pengys5
*/
- public class NodeRefSumRemoteWorker extends AbstractRemoteWorker {
+ public class NodeReferenceRemoteWorker extends AbstractRemoteWorker {

- protected NodeRefSumRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
+ protected NodeReferenceRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}

@@ -25,18 +25,18 @@ protected NodeRefSumRemoteWorker(Role role, ClusterWorkerContext clusterContext)
}

@Override protected void onWork(Object message) throws WorkerException {
- getClusterContext().lookup(NodeRefSumPersistenceWorker.WorkerRole.INSTANCE).tell(message);
+ getClusterContext().lookup(NodeReferencePersistenceWorker.WorkerRole.INSTANCE).tell(message);
}

- public static class Factory extends AbstractRemoteWorkerProvider<NodeRefSumRemoteWorker> {
+ public static class Factory extends AbstractRemoteWorkerProvider<NodeReferenceRemoteWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}

@Override
- public NodeRefSumRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
- return new NodeRefSumRemoteWorker(role(), clusterContext);
+ public NodeReferenceRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
+ return new NodeReferenceRemoteWorker(role(), clusterContext);
}
}

@@ -45,7 +45,7 @@ public enum WorkerRole implements Role {

@Override
public String roleName() {
- return NodeRefSumRemoteWorker.class.getSimpleName();
+ return NodeReferenceRemoteWorker.class.getSimpleName();
}

@Override
@@ -54,7 +54,7 @@ public WorkerSelector workerSelector() {
}

@Override public DataDefine dataDefine() {
- return new NodeRefSumDataDefine();
+ return new NodeReferenceDataDefine();
}
}
}
@@ -1,4 +1,4 @@
- package org.skywalking.apm.collector.agentstream.worker.noderef.summary;
+ package org.skywalking.apm.collector.agentstream.worker.noderef;

import java.util.ArrayList;
import java.util.List;
@@ -10,7 +10,7 @@
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
- import org.skywalking.apm.collector.storage.define.noderef.NodeRefSumDataDefine;
+ import org.skywalking.apm.collector.storage.define.noderef.NodeReferenceDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
@@ -23,13 +23,13 @@
/**
* @author pengys5
*/
- public class NodeRefSumSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener, RefsListener {
+ public class NodeReferenceSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener, RefsListener {

- private final Logger logger = LoggerFactory.getLogger(NodeRefSumSpanListener.class);
+ private final Logger logger = LoggerFactory.getLogger(NodeReferenceSpanListener.class);

- private List<NodeRefSumDataDefine.NodeReferenceSum> nodeExitReferences = new ArrayList<>();
- private List<NodeRefSumDataDefine.NodeReferenceSum> nodeEntryReferences = new ArrayList<>();
- private List<NodeRefSumDataDefine.NodeReferenceSum> nodeReferences = new ArrayList<>();
+ private List<NodeReferenceDataDefine.NodeReferenceSum> nodeExitReferences = new ArrayList<>();
+ private List<NodeReferenceDataDefine.NodeReferenceSum> nodeEntryReferences = new ArrayList<>();
+ private List<NodeReferenceDataDefine.NodeReferenceSum> nodeReferences = new ArrayList<>();
private long timeBucket;
private boolean hasReference = false;
private long startTime;
@@ -38,7 +38,7 @@ public class NodeRefSumSpanListener implements EntrySpanListen

@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
- NodeRefSumDataDefine.NodeReferenceSum referenceSum = new NodeRefSumDataDefine.NodeReferenceSum();
+ NodeReferenceDataDefine.NodeReferenceSum referenceSum = new NodeReferenceDataDefine.NodeReferenceSum();
referenceSum.setApplicationId(applicationId);
referenceSum.setBehindApplicationId(spanObject.getPeerId());

@@ -56,7 +56,7 @@ public void parseExit(SpanObject spanObject, int applicationId, int applicationI

@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
- NodeRefSumDataDefine.NodeReferenceSum referenceSum = new NodeRefSumDataDefine.NodeReferenceSum();
+ NodeReferenceDataDefine.NodeReferenceSum referenceSum = new NodeReferenceDataDefine.NodeReferenceSum();
referenceSum.setApplicationId(Const.USER_ID);
referenceSum.setBehindApplicationId(applicationId);
referenceSum.setBehindPeer(Const.EMPTY_STRING);
@@ -66,7 +66,7 @@ public void parseEntry(SpanObject spanObject, int applicationId, int application
nodeEntryReferences.add(buildNodeRefSum(referenceSum, spanObject.getStartTime(), spanObject.getEndTime(), spanObject.getIsError()));
}

- private NodeRefSumDataDefine.NodeReferenceSum buildNodeRefSum(NodeRefSumDataDefine.NodeReferenceSum referenceSum,
+ private NodeReferenceDataDefine.NodeReferenceSum buildNodeRefSum(NodeReferenceDataDefine.NodeReferenceSum referenceSum,
long startTime, long endTime, boolean isError) {
long cost = endTime - startTime;
if (cost <= 1000 && !isError) {
@@ -96,7 +96,7 @@ public void parseFirst(SpanObject spanObject, int applicationId, int application
String segmentId) {
int parentApplicationId = InstanceCache.get(reference.getParentApplicationInstanceId());

- NodeRefSumDataDefine.NodeReferenceSum referenceSum = new NodeRefSumDataDefine.NodeReferenceSum();
+ NodeReferenceDataDefine.NodeReferenceSum referenceSum = new NodeReferenceDataDefine.NodeReferenceSum();
referenceSum.setApplicationId(parentApplicationId);
referenceSum.setBehindApplicationId(applicationId);
referenceSum.setBehindPeer(Const.EMPTY_STRING);
@@ -119,13 +119,13 @@ public void parseFirst(SpanObject spanObject, int applicationId, int application
});
}

- for (NodeRefSumDataDefine.NodeReferenceSum referenceSum : nodeExitReferences) {
+ for (NodeReferenceDataDefine.NodeReferenceSum referenceSum : nodeExitReferences) {
referenceSum.setId(timeBucket + Const.ID_SPLIT + referenceSum.getId());
referenceSum.setTimeBucket(timeBucket);

try {
logger.debug("send to node reference summary aggregation worker, id: {}", referenceSum.getId());
- context.getClusterWorkerContext().lookup(NodeRefSumAggregationWorker.WorkerRole.INSTANCE).tell(referenceSum.toData());
+ context.getClusterWorkerContext().lookup(NodeReferenceAggregationWorker.WorkerRole.INSTANCE).tell(referenceSum.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
