Skip to content

Commit

Permalink
Merge branch 'master' into wu-sheng-3.2-readme
Browse files Browse the repository at this point in the history
  • Loading branch information
wu-sheng committed Aug 31, 2017
2 parents 1b8db8c + 61b4a08 commit 0003f7a
Show file tree
Hide file tree
Showing 134 changed files with 2,794 additions and 1,891 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private void senToInstanceHeartBeatPersistenceWorker(StreamModuleContext context
long heartBeatTime) {
InstanceHeartBeatDataDefine.InstanceHeartBeat heartBeat = new InstanceHeartBeatDataDefine.InstanceHeartBeat();
heartBeat.setId(String.valueOf(applicationInstanceId));
heartBeat.setHeartBeatTime(heartBeatTime);
heartBeat.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime));
heartBeat.setInstanceId(applicationInstanceId);
try {
logger.debug("send to instance heart beat persistence worker, id: {}", heartBeat.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import com.google.gson.JsonObject;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agentregister.instance.InstanceIDService;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ApplicationInstance;
import org.skywalking.apm.network.proto.ApplicationInstanceHeartbeat;
import org.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.skywalking.apm.network.proto.ApplicationInstanceRecover;
import org.skywalking.apm.network.proto.Downstream;
Expand All @@ -26,23 +26,19 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp

@Override
public void register(ApplicationInstance request, StreamObserver<ApplicationInstanceMapping> responseObserver) {
int instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), request.getRegisterTime(), buildOsInfo(request.getOsinfo()));
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(request.getRegisterTime());
int instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), timeBucket, buildOsInfo(request.getOsinfo()));
ApplicationInstanceMapping.Builder builder = ApplicationInstanceMapping.newBuilder();
builder.setApplicationId(request.getApplicationId());
builder.setApplicationInstanceId(instanceId);
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}

@Override public void heartbeat(ApplicationInstanceHeartbeat request, StreamObserver<Downstream> responseObserver) {
instanceIDService.heartBeat(request.getApplicationInstanceId(), request.getHeartbeatTime());
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}

@Override
public void registerRecover(ApplicationInstanceRecover request, StreamObserver<Downstream> responseObserver) {
instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), request.getRegisterTime(), buildOsInfo(request.getOsinfo()));
long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(request.getRegisterTime());
instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), timeBucket, buildOsInfo(request.getOsinfo()));
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package org.skywalking.apm.collector.agentregister.instance;

import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
Expand Down Expand Up @@ -36,12 +36,6 @@ public int getOrCreate(int applicationId, String agentUUID, long registerTime, S
return applicationId;
}

public void heartBeat(int instanceId, long heartbeatTime) {
logger.debug("instance heart beat, instance id: {}, heartbeat time: {}", instanceId, heartbeatTime);
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
dao.updateHeartbeatTime(instanceId, heartbeatTime);
}

public void recover(int instanceId, int applicationId, long registerTime, String osInfo) {
logger.debug("instance recover, instance id: {}, application id: {}, register time: {}", instanceId, applicationId, registerTime);
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class TraceSegmentServletHandler extends JettyHandler {

private void read(BufferedReader bufferedReader) throws IOException {
JsonReader reader = new JsonReader(bufferedReader);

reader.beginArray();
while (reader.hasNext()) {
SegmentParse segmentParse = new SegmentParse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,25 @@

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.agentstream.worker.register.servicename.dao.IServiceNameDAO;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.dao.DAOContainer;

/**
* @author pengys5
*/
public class ServiceNameCache {
public class ServiceCache {

private static Cache<String, Integer> CACHE = CacheBuilder.newBuilder().maximumSize(2000).build();
private static Cache<Integer, String> CACHE = CacheBuilder.newBuilder().maximumSize(10000).build();

public static int get(int applicationId, String serviceName) {
public static String getServiceName(int serviceId) {
try {
return CACHE.get(applicationId + Const.ID_SPLIT + serviceName, () -> {
return CACHE.get(serviceId, () -> {
IServiceNameDAO dao = (IServiceNameDAO)DAOContainer.INSTANCE.get(IServiceNameDAO.class.getName());
return dao.getServiceId(applicationId, serviceName);
return dao.getServiceName(serviceId);
});
} catch (Throwable e) {
return 0;
return Const.EMPTY_STRING;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@

import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.define.node.NodeComponentDataDefine;
import org.skywalking.apm.collector.agentstream.worker.segment.EntrySpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.ExitSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.FirstSpanListener;
import org.skywalking.apm.collector.agentstream.worker.segment.LocalSpanListener;
import org.skywalking.apm.collector.agentstream.worker.util.ExchangeMarkUtils;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.define.node.NodeComponentDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
Expand All @@ -22,48 +20,59 @@
/**
* @author pengys5
*/
public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener, LocalSpanListener {
public class NodeComponentSpanListener implements EntrySpanListener, ExitSpanListener, FirstSpanListener {

private final Logger logger = LoggerFactory.getLogger(NodeComponentSpanListener.class);

private List<String> nodeComponents = new ArrayList<>();
private List<NodeComponentDataDefine.NodeComponent> nodeComponents = new ArrayList<>();
private long timeBucket;

@Override
public void parseExit(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
String componentName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getComponentId());
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setComponentId(spanObject.getComponentId());

String id;
if (spanObject.getComponentId() == 0) {
componentName = spanObject.getComponent();
nodeComponent.setComponentName(spanObject.getComponent());
id = nodeComponent.getComponentName();
} else {
nodeComponent.setComponentName(Const.EMPTY_STRING);
id = String.valueOf(nodeComponent.getComponentId());
}
String peer = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getPeerId());

nodeComponent.setPeerId(spanObject.getPeerId());
if (spanObject.getPeerId() == 0) {
peer = spanObject.getPeer();
nodeComponent.setPeer(spanObject.getPeer());
id = id + Const.ID_SPLIT + nodeComponent.getPeer();
} else {
nodeComponent.setPeer(Const.EMPTY_STRING);
id = id + Const.ID_SPLIT + nodeComponent.getPeerId();
}

String agg = peer + Const.ID_SPLIT + componentName;
nodeComponents.add(agg);
nodeComponent.setId(id);
nodeComponents.add(nodeComponent);
}

@Override
public void parseEntry(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
buildEntryOrLocal(spanObject, applicationId);
}

@Override
public void parseLocal(SpanObject spanObject, int applicationId, int applicationInstanceId, String segmentId) {
buildEntryOrLocal(spanObject, applicationId);
}

private void buildEntryOrLocal(SpanObject spanObject, int applicationId) {
String componentName = ExchangeMarkUtils.INSTANCE.buildMarkedID(spanObject.getComponentId());
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setComponentId(spanObject.getComponentId());

String id;
if (spanObject.getComponentId() == 0) {
componentName = spanObject.getComponent();
nodeComponent.setComponentName(spanObject.getComponent());
id = nodeComponent.getComponentName();
} else {
id = String.valueOf(nodeComponent.getComponentId());
nodeComponent.setComponentName(Const.EMPTY_STRING);
}

String peer = ExchangeMarkUtils.INSTANCE.buildMarkedID(applicationId);
String agg = peer + Const.ID_SPLIT + componentName;
nodeComponents.add(agg);
nodeComponent.setPeerId(applicationId);
nodeComponent.setPeer(Const.EMPTY_STRING);
id = id + Const.ID_SPLIT + String.valueOf(applicationId);
nodeComponent.setId(id);

nodeComponents.add(nodeComponent);
}

@Override
Expand All @@ -74,10 +83,8 @@ public void parseFirst(SpanObject spanObject, int applicationId, int application
@Override public void build() {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);

nodeComponents.forEach(agg -> {
NodeComponentDataDefine.NodeComponent nodeComponent = new NodeComponentDataDefine.NodeComponent();
nodeComponent.setId(timeBucket + Const.ID_SPLIT + agg);
nodeComponent.setAgg(agg);
nodeComponents.forEach(nodeComponent -> {
nodeComponent.setId(timeBucket + Const.ID_SPLIT + nodeComponent.getId());
nodeComponent.setTimeBucket(timeBucket);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;

/**
* @author pengys5
Expand All @@ -21,7 +21,10 @@ public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO, IPer
if (getResponse.isExists()) {
Data data = dataDefine.build(id);
Map<String, Object> source = getResponse.getSource();
data.setDataString(1, (String)source.get(NodeComponentTable.COLUMN_AGG));
data.setDataInteger(0, ((Number)source.get(NodeComponentTable.COLUMN_COMPONENT_ID)).intValue());
data.setDataString(1, (String)source.get(NodeComponentTable.COLUMN_COMPONENT_NAME));
data.setDataInteger(1, ((Number)source.get(NodeComponentTable.COLUMN_PEER_ID)).intValue());
data.setDataString(2, (String)source.get(NodeComponentTable.COLUMN_PEER));
data.setDataLong(0, (Long)source.get(NodeComponentTable.COLUMN_TIME_BUCKET));
return data;
} else {
Expand All @@ -31,15 +34,21 @@ public class NodeComponentEsDAO extends EsDAO implements INodeComponentDAO, IPer

@Override public IndexRequestBuilder prepareBatchInsert(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeComponentTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_COMPONENT_ID, data.getDataInteger(0));
source.put(NodeComponentTable.COLUMN_COMPONENT_NAME, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_PEER_ID, data.getDataInteger(1));
source.put(NodeComponentTable.COLUMN_PEER, data.getDataString(2));
source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0));

return getClient().prepareIndex(NodeComponentTable.TABLE, data.getDataString(0)).setSource(source);
}

@Override public UpdateRequestBuilder prepareBatchUpdate(Data data) {
Map<String, Object> source = new HashMap<>();
source.put(NodeComponentTable.COLUMN_AGG, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_COMPONENT_ID, data.getDataInteger(0));
source.put(NodeComponentTable.COLUMN_COMPONENT_NAME, data.getDataString(1));
source.put(NodeComponentTable.COLUMN_PEER_ID, data.getDataInteger(1));
source.put(NodeComponentTable.COLUMN_PEER, data.getDataString(2));
source.put(NodeComponentTable.COLUMN_TIME_BUCKET, data.getDataLong(0));

return getClient().prepareUpdate(NodeComponentTable.TABLE, data.getDataString(0)).setDoc(source);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.skywalking.apm.collector.agentstream.worker.node.component.define;

import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;

/**
* @author pengys5
Expand All @@ -26,7 +26,10 @@ public NodeComponentEsTableDefine() {
}

@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_PEER_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_PEER, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.skywalking.apm.collector.agentstream.worker.node.component.define;

import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
import org.skywalking.apm.collector.storage.define.node.NodeComponentTable;

/**
* @author pengys5
Expand All @@ -15,7 +15,10 @@ public NodeComponentH2TableDefine() {

@Override public void initialize() {
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_AGG, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_COMPONENT_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_COMPONENT_NAME, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_PEER_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_PEER, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeComponentTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}

0 comments on commit 0003f7a

Please sign in to comment.