Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…into jedis

y
  • Loading branch information
lytscu committed Feb 28, 2018
2 parents 1596b61 + 8f61418 commit 350ddc1
Show file tree
Hide file tree
Showing 28 changed files with 1,269 additions and 24 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Apache SkyWalking | [中文](README_ZH.md)
<img src="https://skywalkingtest.github.io/page-resources/3.0/skywalking.png" alt="Sky Walking logo" height="90px" align="right" />

**SkyWalking**: APM (application performance monitor) tool for distributed systems, especially designed for
microservices, cloud native and container-based (Docker, K8s, Mesos) architectures.
microservices, cloud native and container-based (Docker, Kubernetes, Mesos) architectures.
Underlying technology is a distributed tracing system.

[![GitHub stars](https://img.shields.io/github/stars/apache/incubator-skywalking.svg?style=for-the-badge&label=Stars&logo=github)](https://github.com/apache/incubator-skywalking)
Expand Down
2 changes: 1 addition & 1 deletion README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Apache SkyWalking | [English](README.md)

<img src="https://skywalkingtest.github.io/page-resources/3.0/skywalking.png" alt="Sky Walking logo" height="90px" align="right" />

**SkyWalking**: 针对分布式系统的APM(应用性能监控)系统,特别针对微服务、cloud native和容器化(Docker, K8s, Mesos)架构,
**SkyWalking**: 针对分布式系统的APM(应用性能监控)系统,特别针对微服务、cloud native和容器化(Docker, Kubernetes, Mesos)架构,
其核心是个分布式追踪系统。

[![GitHub stars](https://img.shields.io/github/stars/apache/incubator-skywalking.svg?style=for-the-badge&label=Stars&logo=github)](https://github.com/apache/incubator-skywalking)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ public interface IServiceMetricUIDAO extends DAO {
List<Node> getServicesMetric(Step step, long startTime, long endTime,
MetricSource metricSource, Collection<Integer> serviceIds);

List<ServiceMetric> getSlowService(int applicationId, Step step, long start, long end,
Integer top, MetricSource metricSource);
List<ServiceMetric> getSlowService(int applicationId, Step step, long startTimeBucket, long endTimeBucket,
Integer topN, MetricSource metricSource);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
public class ServiceMetric {
private int id;
private String name;
private long calls;
private int avgResponseTime;
private int tps;
private int callsPerSec;

public int getId() {
return id;
Expand All @@ -51,11 +52,19 @@ public void setAvgResponseTime(int avgResponseTime) {
this.avgResponseTime = avgResponseTime;
}

public int getTps() {
return tps;
public int getCallsPerSec() {
return callsPerSec;
}

public void setTps(int tps) {
this.tps = tps;
public void setCallsPerSec(int callsPerSec) {
this.callsPerSec = callsPerSec;
}

public long getCalls() {
return calls;
}

public void setCalls(long calls) {
this.calls = calls;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ public List<Node> getServicesMetric(Step step, long startTime, long endTime, Met
return nodes;
}

@Override public List<ServiceMetric> getSlowService(int applicationId, Step step, long start, long end, Integer top,
@Override
public List<ServiceMetric> getSlowService(int applicationId, Step step, long startTimeBucket, long endTimeBucket,
Integer topN,
MetricSource metricSource) {
String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);

Expand All @@ -184,7 +186,7 @@ public List<Node> getServicesMetric(Step step, long startTime, long endTime, Met
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);

BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceMetricTable.COLUMN_TIME_BUCKET).gte(start).lte(end));
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceMetricTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket));
if (applicationId != 0) {
boolQuery.must().add(QueryBuilders.termQuery(ServiceMetricTable.COLUMN_APPLICATION_ID, applicationId));
}
Expand All @@ -193,7 +195,7 @@ public List<Node> getServicesMetric(Step step, long startTime, long endTime, Met
searchRequestBuilder.setQuery(boolQuery);
searchRequestBuilder.setSize(0);

TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ServiceMetricTable.COLUMN_SERVICE_ID).field(ServiceMetricTable.COLUMN_SERVICE_ID).size(top);
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ServiceMetricTable.COLUMN_SERVICE_ID).field(ServiceMetricTable.COLUMN_SERVICE_ID).size(topN);
aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_CALLS).field(ServiceMetricTable.COLUMN_TRANSACTION_CALLS));
aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS).field(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS));
aggregationBuilder.subAggregation(AggregationBuilders.sum(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM).field(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM));
Expand Down Expand Up @@ -221,7 +223,9 @@ public List<Node> getServicesMetric(Step step, long startTime, long endTime, Met

ServiceMetric serviceMetric = new ServiceMetric();
InternalSimpleValue simpleValue = serviceIdTerm.getAggregations().get(AVG_DURATION);
Sum calls = serviceIdTerm.getAggregations().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS);

serviceMetric.setCalls((long)calls.getValue());
serviceMetric.setId(serviceId);
serviceMetric.setAvgResponseTime((int)simpleValue.getValue());
serviceMetrics.add(serviceMetric);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public List<Integer> getServiceResponseTimeTrend(int serviceId, Step step, List<
return null;
}

@Override public List<ServiceMetric> getSlowService(int applicationId, Step step, long start, long end, Integer top,
@Override public List<ServiceMetric> getSlowService(int applicationId, Step step, long startTimeBucket, long endTimeBucket, Integer topN,
MetricSource metricSource) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,13 @@ public ConjecturalAppBrief getConjecturalApps(Duration duration) throws ParseExc
}

public List<ServiceMetric> getTopNSlowService(Duration duration, int topN) throws ParseException {
long start = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long end = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());

long startSecondTimeBucket = DurationUtils.INSTANCE.durationToSecondTimeBucket(duration.getStep(), duration.getStart());
long endSecondTimeBucket = DurationUtils.INSTANCE.durationToSecondTimeBucket(duration.getStep(), duration.getEnd());

return getServiceNameService().getSlowService(duration.getStep(), start, end, topN);
return getServiceNameService().getSlowService(duration.getStep(), startTimeBucket, endTimeBucket, startSecondTimeBucket, endSecondTimeBucket, topN);
}

public List<ApplicationTPS> getTopNApplicationThroughput(Duration duration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public List<ServiceMetric> getSlowService(int applicationId, Step step, long sta
slowServices.forEach(slowService -> {
slowService.setName(serviceNameCacheService.get(slowService.getId()).getServiceName());
//TODO
slowService.setTps(1);
slowService.setCallsPerSec(1);
});
return slowServices;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceNameServiceUIDAO;
import org.apache.skywalking.apm.collector.storage.table.MetricSource;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
import org.apache.skywalking.apm.collector.storage.ui.common.ResponseTimeTrend;
import org.apache.skywalking.apm.collector.storage.ui.common.SLATrend;
import org.apache.skywalking.apm.collector.storage.ui.common.Step;
Expand All @@ -35,20 +36,26 @@
import org.apache.skywalking.apm.collector.storage.ui.service.ServiceMetric;
import org.apache.skywalking.apm.collector.storage.utils.DurationPoint;
import org.apache.skywalking.apm.collector.ui.utils.DurationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author peng-yongsheng
*/
public class ServiceNameService {

private final Logger logger = LoggerFactory.getLogger(ServiceNameService.class);

private final IServiceNameServiceUIDAO serviceNameServiceUIDAO;
private final IServiceMetricUIDAO serviceMetricUIDAO;
private final ServiceNameCacheService serviceNameCacheService;
private final SecondBetweenService secondBetweenService;

public ServiceNameService(ModuleManager moduleManager) {
this.serviceNameServiceUIDAO = moduleManager.find(StorageModule.NAME).getService(IServiceNameServiceUIDAO.class);
this.serviceMetricUIDAO = moduleManager.find(StorageModule.NAME).getService(IServiceMetricUIDAO.class);
this.serviceNameCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceNameCacheService.class);
this.secondBetweenService = new SecondBetweenService(moduleManager);
}

public int getCount() {
Expand Down Expand Up @@ -85,13 +92,18 @@ public SLATrend getServiceSLATrend(int serviceId, Step step, long start, long en
return slaTrend;
}

public List<ServiceMetric> getSlowService(Step step, long start, long end,
Integer top) throws ParseException {
List<ServiceMetric> slowServices = serviceMetricUIDAO.getSlowService(0, step, start, end, top, MetricSource.Callee);
public List<ServiceMetric> getSlowService(Step step, long startTimeBucket, long endTimeBucket,
long startSecondTimeBucket, long endSecondTimeBucket,
Integer topN) throws ParseException {
List<ServiceMetric> slowServices = serviceMetricUIDAO.getSlowService(0, step, startTimeBucket, endTimeBucket, topN, MetricSource.Callee);
slowServices.forEach(slowService -> {
slowService.setName(serviceNameCacheService.get(slowService.getId()).getServiceName());
//TODO
slowService.setTps(1);
ServiceName serviceName = serviceNameCacheService.get(slowService.getId());
slowService.setName(serviceName.getServiceName());
try {
slowService.setCallsPerSec((int)(slowService.getCalls() / secondBetweenService.calculate(serviceName.getApplicationId(), startSecondTimeBucket, endSecondTimeBucket)));
} catch (ParseException e) {
logger.error(e.getMessage(), e);
}
});
return slowServices;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type ServiceMetric {
name: String
# The unit is millisecond.
avgResponseTime: Int!
tps: Int!
callsPerSec: Int!
}

type TraceItem {
Expand Down
1 change: 1 addition & 0 deletions apm-sniffer/apm-sdk-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
<module>mysql-5.x-plugin</module>
<module>h2-1.x-plugin</module>
<module>postgresql-8.x-plugin</module>
<module>rocketMQ-3.x-plugin</module>
<module>rocketMQ-4.x-plugin</module>
<module>elastic-job-2.x-plugin</module>
<module>mongodb-2.x-plugin</module>
Expand Down
68 changes: 68 additions & 0 deletions apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-sdk-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>5.0.0-alpha-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>apm-rocketmq-3.x-plugin</artifactId>
<name>rocketMQ-3.x-plugin</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.6.2.Final</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
</plugin>
<plugin>
<!-- 源码插件 -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<!-- 发布时自动将源码同时发布的配置 -->
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.rocketMQ.v3;

import java.lang.reflect.Method;
import java.util.List;
import com.alibaba.rocketmq.common.message.MessageExt;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;

/**
* {@link AbstractMessageConsumeInterceptor} create entry span when the <code>consumeMessage</code> in the {@link
* com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently} and {@link
* com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly} class.
*
* @author carlvine500
*/
public abstract class AbstractMessageConsumeInterceptor implements InstanceMethodsAroundInterceptor {

public static final String CONSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";

@Override
public final void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
List<MessageExt> msgs = (List<MessageExt>)allArguments[0];

ContextCarrier contextCarrier = getContextCarrierFromMessage(msgs.get(0));
AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + msgs.get(0).getTopic() + "/Consumer", contextCarrier);

span.setComponent(ComponentsDefine.ROCKET_MQ);
span.setLayer(SpanLayer.MQ);
for (int i = 1; i < msgs.size(); i++) {
ContextManager.extract(getContextCarrierFromMessage(msgs.get(i)));
}

}

@Override public final void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}

private ContextCarrier getContextCarrierFromMessage(MessageExt message) {
ContextCarrier contextCarrier = new ContextCarrier();

CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(message.getUserProperty(next.getHeadKey()));
}

return contextCarrier;
}
}

0 comments on commit 350ddc1

Please sign in to comment.