Skip to content

Commit

Permalink
feat: request count for aliyun plugin (#815)
Browse files Browse the repository at this point in the history
  • Loading branch information
kongfei605 committed Mar 1, 2024
1 parent 9fcdccb commit f7d5c36
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 19 deletions.
62 changes: 54 additions & 8 deletions inputs/aliyun/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (ins *Instance) Init() error {

err := ins.initialize()
if err != nil {
log.Println("E! initialize error:", err)
return err
}

Expand Down Expand Up @@ -205,13 +206,13 @@ func (f *metricCache) isValid() bool {
}

// getFilteredMetrics returns metrics specified in the config file or metrics listed from Cloudwatch.
func (ins *Instance) getFilteredMetrics() ([]filteredMetric, error) {
func (ins *Instance) getFilteredMetrics(slist *types.SampleList) ([]filteredMetric, error) {
if ins.metricCache != nil && ins.metricCache.isValid() {
return ins.metricCache.metrics, nil
}
fMetrics := []filteredMetric{}

allMetrics, err := ins.fetchNamespaceMetrics(ins.Namespaces)
allMetrics, err := ins.fetchNamespaceMetrics(slist, ins.Namespaces)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -283,7 +284,7 @@ func (ins *Instance) Gather(slist *types.SampleList) {
}
}
} else {
filteredMetrics, err := ins.getFilteredMetrics()
filteredMetrics, err := ins.getFilteredMetrics(slist)
if err != nil {
log.Println("E!", err)
return
Expand Down Expand Up @@ -319,9 +320,14 @@ func (ins *Instance) sendMetrics(metric internalTypes.Metric, wg *sync.WaitGroup
if !ins.windowStart.IsZero() {
req.StartTime = tea.String(ins.windowStart.Format(timefmt))
}
points, err := ins.client.GetMetric(ctx, req)
n, points, err := ins.client.GetMetric(ctx, req)
slist.PushFront(types.NewSample(inputName, "cms_request_count", n, map[string]string{
"namespace": metric.Namespace,
"metric_name": metric.MetricName,
"callee": "DescribeMetricList",
}).SetTime(time.Now()))
if err != nil {
log.Println("E! get metrics error,", err)
log.Printf("E! get metrics %s::%s error, %s", metric.Namespace, metric.MetricName, err)
return
}
for _, point := range points {
Expand Down Expand Up @@ -364,9 +370,44 @@ func (ins *Instance) makeLabels(point internalTypes.Point, labels ...map[string]
if len(point.NodeID) != 0 {
result["node_id"] = point.NodeID
}
if len(point.ListenerPort) != 0 {
result["listener_port"] = point.ListenerPort
}
if len(point.ListenerProtocol) != 0 {
result["listener_protocol"] = point.ListenerProtocol
}
if len(point.LoadBalancerID) != 0 {
result["load_balancer_id"] = point.LoadBalancerID
}
if len(point.Device) != 0 {
result["device"] = point.Device
}
if len(point.CenID) != 0 {
result["cen_id"] = point.CenID
result["src_region_id"] = point.SrcRegion
result["dst_region_id"] = point.DstRegion
}
if len(point.GroupID) != 0 {
result["group_id"] = point.GroupID
}
if len(point.Topic) != 0 {
result["topic"] = point.Topic
}
if len(point.ExchangeName) != 0 {
result["exchange_name"] = point.ExchangeName
}
if len(point.VHostName) != 0 {
result["vhost_name"] = point.VHostName
}
if len(point.RegionID) != 0 {
result["region_id"] = point.RegionID
}
if len(point.QueueName) != 0 {
result["queue_name"] = point.QueueName
}
if len(point.VHostQueue) != 0 {
result["vhost_queue"] = point.VHostQueue
}
return result
}

Expand All @@ -385,8 +426,7 @@ func (ins *Instance) updateWindow(relativeTo time.Time) {
}

// fetchNamespaceMetrics retrieves available metrics for a given aliyun namespace.
func (ins *Instance) fetchNamespaceMetrics(namespaces []string) ([]internalTypes.Metric, error) {
// func (ins *Instance) fetchNamespaceMetrics() ([]*cms20190101.DescribeMetricMetaListResponseBodyResourcesResource, error) {
func (ins *Instance) fetchNamespaceMetrics(slist *types.SampleList, namespaces []string) ([]internalTypes.Metric, error) {
var params *cms20190101.DescribeMetricMetaListRequest
// namespaces := ins.Namespaces
if len(namespaces) == 0 {
Expand All @@ -398,7 +438,13 @@ func (ins *Instance) fetchNamespaceMetrics(namespaces []string) ([]internalTypes
params = &cms20190101.DescribeMetricMetaListRequest{
Namespace: tea.String(namespaces[i]),
}
resp, err := ins.client.ListMetrics(context.Background(), params)

n, resp, err := ins.client.ListMetrics(context.Background(), params)
slist.PushFront(types.NewSample(inputName, "cms_request_count", n, map[string]string{
"namespace": namespace,
"callee": "DescribeMetricMetaList",
}).SetTime(time.Now()))

if err != nil {
log.Printf("E! failed to list metrics with namespace %s: %v", namespace, err)
// skip problem namespace on error and continue to next namespace
Expand Down
29 changes: 19 additions & 10 deletions inputs/aliyun/internal/manager/cms.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ const (
MaxPageNum = 99
)

func (m *Manager) ListMetrics(ctx context.Context, req *cms20190101.DescribeMetricMetaListRequest) ([]*cms20190101.DescribeMetricMetaListResponseBodyResourcesResource, error) {
func (m *Manager) ListMetrics(ctx context.Context, req *cms20190101.DescribeMetricMetaListRequest) (int, []*cms20190101.DescribeMetricMetaListResponseBodyResourcesResource, error) {
count := 1
if req.PageNumber == nil {
req.SetPageNumber(DefaultPageNum)
}
Expand All @@ -39,11 +40,11 @@ func (m *Manager) ListMetrics(ctx context.Context, req *cms20190101.DescribeMetr
}
resp, err := m.cms.DescribeMetricMetaList(req)
if err != nil {
return nil, err
return count, nil, err
}
totalCount, err := strconv.Atoi(*resp.Body.TotalCount)
if err != nil {
return nil, err
return count, nil, err
}

pageSize, pageNum := pageCaculator(totalCount)
Expand All @@ -52,14 +53,15 @@ func (m *Manager) ListMetrics(ctx context.Context, req *cms20190101.DescribeMetr
resources = append(resources, resp.Body.Resources.Resource...)
var i int32
for i = 2; i < 2+pageNum; i++ {
count++
req.SetPageNumber(i)
resp, err := m.cms.DescribeMetricMetaList(req)
if err != nil {
return nil, err
return count, nil, err
}
resources = append(resources, resp.Body.Resources.Resource...)
}
return resources, nil
return count, resources, nil
}

func pageCaculator(totalCount int) (size, num int32) {
Expand Down Expand Up @@ -116,36 +118,43 @@ func (m *Manager) dataPointConverter(metricName, ns, datapoints string) ([]types
r.Value = tea.Float64(*point.Avg)
result = append(result, r)
}
if point.Sum != nil {
r.MetricName = fmt.Sprintf("%s_%s", stringx.SnakeCase(metricName), "sum")
r.Value = tea.Float64(*point.Sum)
result = append(result, r)
}
}
return result, nil
}
func (m *Manager) GetMetric(ctx context.Context, req *cms20190101.DescribeMetricListRequest) ([]types.Point, error) {
func (m *Manager) GetMetric(ctx context.Context, req *cms20190101.DescribeMetricListRequest) (int, []types.Point, error) {

count := 1
resp, err := m.cms.DescribeMetricList(req)
result := make([]types.Point, 0, 100)
if err != nil {
return nil, err
return count, nil, err
}
points, err := m.dataPointConverter(*req.MetricName, *req.Namespace, *resp.Body.Datapoints)
if err != nil {
return nil, err
return count, nil, err
}
result = append(result, points...)
for resp.Body != nil && resp.Body.NextToken != nil {
req.NextToken = resp.Body.NextToken
count++
resp, err = m.cms.DescribeMetricList(req)
if err != nil {
log.Println(err)
continue
}
points, err := m.dataPointConverter(*req.MetricName, *req.Namespace, *resp.Body.Datapoints)
if err != nil {
return nil, err
return count, nil, err
}
result = append(result, points...)
}

return result, nil
return count, result, nil
}

func (m *Manager) GetEcsHosts() ([]*cms20190101.DescribeMonitoringAgentHostsResponseBodyHostsHost, error) {
Expand Down
24 changes: 23 additions & 1 deletion inputs/aliyun/internal/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,35 @@ type (
Val *float64 `json:"Value,omitempty"`
Value *float64 `josn:"-"`

// alb
LoadBalancerID string `json:"loadBalancerId"`
ListenerProtocol string `json:"listenerProtocol"`
ListenerPort string `json:"listenerPort"`

Device string `json:"device"`

// acs_cen
CenID string `json:"cenId"`
SrcRegion string `json:"srcRegionId"`
DstRegion string `json:"dstRegionId"`

// filter
LabelStr string `json:"-"`
Dimensions string `json:"-"`
Namespace string `json:"-"`
MetricName string `json:"-"`

Device string `json:"device"`
// rocketMq
GroupID string `json:"groupId,omitempty"`
Sum *float64 `json:"Sum,omitempty"`
Topic string `json:"topic,omitempty"`

// rabbitMq
ExchangeName string `json:"exchangeName,omitempty"`
VHostName string `json:"vhostName,omitempty"`
RegionID string `json:"regionId,omitempty"`
QueueName string `json:"queueName,omitempty"`
VHostQueue string `json:"vhostQueue,omitempty"`
}
)

Expand Down

0 comments on commit f7d5c36

Please sign in to comment.