Skip to content

Commit

Permalink
pinpoint-apm#1708 Enable handling of message queue service types
Browse files Browse the repository at this point in the history
  • Loading branch information
Xylus committed May 3, 2016
1 parent 1518c5a commit 9c73e86
Show file tree
Hide file tree
Showing 13 changed files with 185 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,50 @@ public void handleSimple(TBase<?, ?> tbase) {

private void insertSpanStat(TSpan span) {
final ServiceType applicationServiceType = getApplicationServiceType(span);
final ServiceType spanServiceType = registry.findServiceType(span.getServiceType());
// TODO consider to change span.isSetErr();
final boolean isError = span.getErr() != 0;
int bugCheck = 0;
if (span.getParentSpanId() == -1) {
if (spanServiceType.isQueue()) {
// create virtual queue node
statisticsHandler.updateCaller(span.getAcceptorHost(), spanServiceType, span.getRemoteAddr(), span.getApplicationName(), applicationServiceType, span.getEndPoint(), span.getElapsed(), isError);

// create virtual user
statisticsHandler.updateCaller(span.getApplicationName(), ServiceType.USER, span.getAgentId(), span.getApplicationName(), applicationServiceType, span.getAgentId(), span.getElapsed(), isError);
statisticsHandler.updateCallee(span.getApplicationName(), applicationServiceType, span.getAcceptorHost(), spanServiceType, span.getAgentId(), span.getElapsed(), isError);
} else {
// create virtual user
statisticsHandler.updateCaller(span.getApplicationName(), ServiceType.USER, span.getAgentId(), span.getApplicationName(), applicationServiceType, span.getAgentId(), span.getElapsed(), isError);

// update the span information of the current node (self)
statisticsHandler.updateCallee(span.getApplicationName(), applicationServiceType, span.getApplicationName(), ServiceType.USER, span.getAgentId(), span.getElapsed(), isError);
// update the span information of the current node (self)
statisticsHandler.updateCallee(span.getApplicationName(), applicationServiceType, span.getApplicationName(), ServiceType.USER, span.getAgentId(), span.getElapsed(), isError);
}
bugCheck++;
}

// save statistics info only when parentApplicationContext exists
// when drawing server map based on statistics info, you must know the application name of the previous node.
if (span.getParentApplicationName() != null) {
logger.debug("Received parent application name. {}", span.getParentApplicationName());
String parentApplicationName = span.getParentApplicationName();
logger.debug("Received parent application name. {}", parentApplicationName);

ServiceType parentApplicationType = registry.findServiceType(span.getParentApplicationType());

// create virtual queue node if current' span's service type is a queue AND :
// 1. parent node's application service type is not a queue (it may have come from a queue that is traced)
// 2. current node's application service type is not a queue (current node may be a queue that is traced)
if (spanServiceType.isQueue()) {
if (!applicationServiceType.isQueue() && !parentApplicationType.isQueue()) {
// emulate virtual queue node's accept Span and record it's acceptor host
hostApplicationMapDao.insert(span.getRemoteAddr(), span.getAcceptorHost(), spanServiceType.getCode(), parentApplicationName, parentApplicationType.getCode());
// emulate virtual queue node's send SpanEvent
statisticsHandler.updateCaller(span.getAcceptorHost(), spanServiceType, span.getRemoteAddr(), span.getApplicationName(), applicationServiceType, span.getEndPoint(), span.getElapsed(), isError);

parentApplicationName = span.getAcceptorHost();
parentApplicationType = spanServiceType;
}
}

final ServiceType parentApplicationType = registry.findServiceType(span.getParentApplicationType());
statisticsHandler.updateCallee(span.getApplicationName(), applicationServiceType, span.getParentApplicationName(), parentApplicationType, span.getAgentId(), span.getElapsed(), isError);
statisticsHandler.updateCallee(span.getApplicationName(), applicationServiceType, parentApplicationName, parentApplicationType, span.getAgentId(), span.getElapsed(), isError);
bugCheck++;
}

Expand Down Expand Up @@ -145,10 +169,10 @@ private void insertSpanEventStat(TSpan span) {
/**
* save information to draw a server map based on statistics
*/
// save the information of caller (the spanevent that span called )
// save the information of caller (the spanevent that called span)
statisticsHandler.updateCaller(span.getApplicationName(), applicationServiceType, span.getAgentId(), spanEvent.getDestinationId(), spanEventType, spanEvent.getEndPoint(), elapsed, hasException);

// save the information of callee (the span that called spanevent)
// save the information of callee (the span that spanevent called)
statisticsHandler.updateCallee(spanEvent.getDestinationId(), spanEventType, span.getApplicationName(), applicationServiceType, span.getEndPoint(), elapsed, hasException);
}
}
Expand All @@ -165,7 +189,13 @@ private void insertAcceptorHost(TSpan span) {

final String parentApplicationName = span.getParentApplicationName();
final short parentServiceType = span.getParentApplicationType();
hostApplicationMapDao.insert(acceptorHost, spanApplicationName, applicationServiceTypeCode, parentApplicationName, parentServiceType);

final ServiceType spanServiceType = registry.findServiceType(span.getServiceType());
if (spanServiceType.isQueue()) {
hostApplicationMapDao.insert(span.getEndPoint(), spanApplicationName, applicationServiceTypeCode, parentApplicationName, parentServiceType);
} else {
hostApplicationMapDao.insert(acceptorHost, spanApplicationName, applicationServiceTypeCode, parentApplicationName, parentServiceType);
}
}

private ServiceType getApplicationServiceType(TSpan span) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class SpanBo implements Span {

private Short applicationServiceType;


private String acceptorHost;
private String remoteAddr; // optional

private byte loggingTransactionInfo; //optional
Expand Down Expand Up @@ -107,7 +107,8 @@ public SpanBo(TSpan span) {
this.apiId = span.getApiId();

this.errCode = span.getErr();


this.acceptorHost = span.getAcceptorHost();
this.remoteAddr = span.getRemoteAddr();

this.loggingTransactionInfo = span.getLoggingTransactionInfo();
Expand Down Expand Up @@ -334,6 +335,14 @@ public void setErrCode(int errCode) {
this.errCode = errCode;
}

public String getAcceptorHost() {
return acceptorHost;
}

public void setAcceptorHost(String acceptorHost) {
this.acceptorHost = acceptorHost;
}

public String getRemoteAddr() {
return remoteAddr;
}
Expand Down Expand Up @@ -456,6 +465,8 @@ public byte[] writeValue() {

buffer.put(loggingTransactionInfo);

buffer.putPrefixedString(acceptorHost);

return buffer.getBuffer();
}

Expand Down Expand Up @@ -511,6 +522,10 @@ public int readValue(byte[] bytes, int offset) {
this.loggingTransactionInfo = buffer.readByte();
}

if (buffer.limit() > 0) {
this.acceptorHost = buffer.readPrefixedString();
}

return buffer.getOffset();
}

Expand All @@ -532,6 +547,7 @@ public String toString() {
sb.append(", elapsed=").append(elapsed);
sb.append(", rpc='").append(rpc).append('\'');
sb.append(", serviceType=").append(serviceType);
sb.append(", acceptorHost=").append(acceptorHost);
sb.append(", endPoint='").append(endPoint).append('\'');
sb.append(", apiId=").append(apiId);
sb.append(", annotationBoList=").append(annotationBoList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ public interface AnnotationKey {
AnnotationKey HTTP_IO = AnnotationKeyFactory.of(49, "http.io", VIEW_IN_RECORD_SET);
// post method parameter of httpclient

AnnotationKey MESSAGE_QUEUE_URI = AnnotationKeyFactory.of(100, "message.queue.url");

AnnotationKey ARGS0 = AnnotationKeyFactory.of(-1, "args[0]");
AnnotationKey ARGS1 = AnnotationKeyFactory.of(-2, "args[1]");
AnnotationKey ARGS2 = AnnotationKeyFactory.of(-3, "args[2]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class DefaultServiceType implements ServiceType {
private final String name;
private final String desc;
private final boolean terminal;
private final boolean queue;

// FIXME record statistics of only rpc call currently. so is it all right to chane into isRecordRpc()
private final boolean recordStatistics;
Expand All @@ -49,6 +50,7 @@ class DefaultServiceType implements ServiceType {
this.category = ServiceTypeCategory.findCategory((short)code);

boolean terminal = false;
boolean queue = false;
boolean recordStatistics = false;
boolean includeDestinationId = false;

Expand All @@ -57,6 +59,10 @@ class DefaultServiceType implements ServiceType {
case TERMINAL:
terminal = true;
break;

case QUEUE:
queue = true;
break;

case RECORD_STATISTICS:
recordStatistics = true;
Expand All @@ -71,6 +77,7 @@ class DefaultServiceType implements ServiceType {
}

this.terminal = terminal;
this.queue = queue;
this.recordStatistics = recordStatistics;
this.includeDestinationId = includeDestinationId;
}
Expand Down Expand Up @@ -123,6 +130,11 @@ public boolean isTerminal() {
return terminal;
}

@Override
public boolean isQueue() {
return queue;
}

@Override
public boolean isIncludeDestinationId() {
return includeDestinationId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ public interface ServiceType {

boolean isTerminal();

boolean isQueue();

boolean isIncludeDestinationId();

ServiceTypeCategory getCategory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/
public enum ServiceTypeProperty {
TERMINAL,
QUEUE,
RECORD_STATISTICS,
INCLUDE_DESTINATION_ID
}
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,10 @@ public void appendNodeResponseTime(NodeList nodeList, LinkList linkList,
} else if (nodeType.isTerminal() || nodeType.isUnknown()) {
final NodeHistogram nodeHistogram = createTerminalNodeHistogram(node, linkList);
node.setNodeHistogram(nodeHistogram);
} else if (nodeType.isQueue()) {
// Virtual queue node - queues with agent installed will be handled above as a WAS node
final NodeHistogram nodeHistogram = createTerminalNodeHistogram(node, linkList);
node.setNodeHistogram(nodeHistogram);
} else if (nodeType.isUser()) {
// for User nodes, find its source link and create the histogram
Application userNode = node.getApplication();
Expand Down Expand Up @@ -409,7 +413,7 @@ private NodeHistogram createTerminalNodeHistogram(Node node, LinkList linkList)
nodeHistogram.setApplicationTimeHistogram(applicationTimeHistogram);

// for Terminal nodes, create AgentLevel histogram
if (nodeApplication.getServiceType().isTerminal()) {
if (nodeApplication.getServiceType().isTerminal() || nodeApplication.getServiceType().isQueue()) {
final Map<String, Histogram> agentHistogramMap = new HashMap<>();

for (Link link : toLinkList) {
Expand Down Expand Up @@ -454,7 +458,9 @@ private void appendServerInfo(Node node, LinkDataDuplexMap linkDataDuplexMap, Ag
return;
}

if (nodeServiceType.isTerminal()) {
if (nodeServiceType.isWas()) {
agentInfoPopulator.addAgentInfos(node);
} else if (nodeServiceType.isTerminal() || nodeServiceType.isQueue()) {
// extract information about the terminal node
ServerBuilder builder = new ServerBuilder();
for (LinkData linkData : linkDataDuplexMap.getSourceLinkDataList()) {
Expand All @@ -465,8 +471,6 @@ private void appendServerInfo(Node node, LinkDataDuplexMap linkDataDuplexMap, Ag
}
ServerInstanceList serverInstanceList = builder.build();
node.setServerInstanceList(serverInstanceList);
} else if (nodeServiceType.isWas()) {
agentInfoPopulator.addAgentInfos(node);
} else {
// add empty information
node.setServerInstanceList(new ServerInstanceList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public Application getFilterApplication() {
if (fromNode.getServiceType() == ServiceType.USER) {
return toNode.getApplication();
}
// same goes for virtual queue nodes
if (!fromNode.getServiceType().isWas() && fromNode.getServiceType().isQueue()) {
return toNode.getApplication();
}
return fromNode.getApplication();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ enum FilterType {
USER_TO_WAS,
WAS_TO_UNKNOWN,
WAS_TO_BACKEND,
WAS_TO_QUEUE,
QUEUE_TO_WAS,
UNSUPPORTED
}

Expand All @@ -165,6 +167,12 @@ private FilterType getFilterType() {
if (includeWas(fromServiceDescList) && includeUnknown(toServiceDescList)) {
return FilterType.WAS_TO_UNKNOWN;
}
if (includeWas(fromServiceDescList) && includeQueue(toServiceDescList)) {
return FilterType.WAS_TO_QUEUE;
}
if (includeQueue(fromServiceDescList) && includeWas(toServiceDescList)) {
return FilterType.QUEUE_TO_WAS;
}
// TODO toServiceDescList check logic not exist.
// if (includeWas(fromServiceDescList) && isBackEnd????()) {
if (includeWas(fromServiceDescList)) {
Expand Down Expand Up @@ -214,6 +222,12 @@ public boolean include(List<SpanBo> transaction) {
case WAS_TO_WAS: {
return wasToWasFilter(transaction);
}
case WAS_TO_QUEUE: {
return wasToQueueFilter(transaction);
}
case QUEUE_TO_WAS: {
return queueToWasFilter(transaction);
}
case WAS_TO_BACKEND: {
return wasToBackendFilter(transaction);
}
Expand Down Expand Up @@ -329,6 +343,30 @@ private boolean wasToWasFilter(List<SpanBo> transaction) {
return fromBaseFilter(fromSpanList);
}

/**
* WAS -> Queue (virtual)
* Should be the same as {@link #wasToBackendFilter}
*/
private boolean wasToQueueFilter(List<SpanBo> transaction) {
return wasToBackendFilter(transaction);
}

/**
* Queue (virtual) -> WAS
*/
private boolean queueToWasFilter(List<SpanBo> transaction) {
final List<SpanBo> toNode = findToNode(transaction);
logger.debug("matching toNode spans: {}", toNode);
for (SpanBo span : toNode) {
if (fromApplicationName.equals(span.getAcceptorHost())) {
if (checkResponseCondition(span.getElapsed(), isError(span))) {
return true;
}
}
}
return false;
}

private boolean fromBaseFilter(List<SpanBo> fromSpanList) {
// from base filter. hint base filter
// exceptional case
Expand All @@ -341,7 +379,7 @@ private boolean fromBaseFilter(List<SpanBo> fromSpanList) {
}
for (SpanEventBo event : eventBoList) {
final ServiceType eventServiceType = serviceTypeRegistryService.findServiceType(event.getServiceType());
if (!eventServiceType.isRpcClient()) {
if (!eventServiceType.isRpcClient() || !eventServiceType.isQueue()) {
continue;
}
if (!eventServiceType.isRecordStatistics()) {
Expand Down Expand Up @@ -407,16 +445,16 @@ private List<SpanBo> findToNode(List<SpanBo> transaction) {
private List<SpanBo> findNode(List<SpanBo> nodeList, String findApplicationName, List<ServiceType> findServiceCode, AgentFilter agentFilter) {
List<SpanBo> findList = null;
for (SpanBo span : nodeList) {
final ServiceType spanServiceType = serviceTypeRegistryService.findServiceType(span.getServiceType());
if (findApplicationName.equals(span.getApplicationId()) && includeServiceType(findServiceCode, spanServiceType)) {
// apply preAgentFilter
if (agentFilter.accept(span.getAgentId())) {
if (findList == null) {
findList = new ArrayList<>();
final ServiceType applicationServiceType = serviceTypeRegistryService.findServiceType(span.getApplicationServiceType());
if (findApplicationName.equals(span.getApplicationId()) && includeServiceType(findServiceCode, applicationServiceType)) {
// apply preAgentFilter
if (agentFilter.accept(span.getAgentId())) {
if (findList == null) {
findList = new ArrayList<>();
}
findList.add(span);
}
findList.add(span);
}
}
}
if (findList == null) {
return Collections.emptyList();
Expand Down Expand Up @@ -448,6 +486,15 @@ private boolean includeWas(List<ServiceType> serviceTypeList) {
return false;
}

private boolean includeQueue(List<ServiceType> serviceTypeList) {
for (ServiceType serviceType : serviceTypeList) {
if (serviceType.isQueue()) {
return true;
}
}
return false;
}

private boolean includeServiceType(List<ServiceType> serviceTypeList, ServiceType targetServiceType) {
for (ServiceType serviceType : serviceTypeList) {
if (serviceType == targetServiceType) {
Expand Down

0 comments on commit 9c73e86

Please sign in to comment.