Skip to content

Commit

Permalink
[7.17](backport #33655) [metricbeat] [gcp] remove compute metadata ca…
Browse files Browse the repository at this point in the history
…che (#33948)

* [metricbeat] [gcp] remove compute metadata cache (#33655)

* remove compute metadata cache

* add logs and change map key type

* add import aliases

* add changelog entry

* cleanup comments

* remove comments

(cherry picked from commit 74edd05)

# Conflicts:
#	x-pack/metricbeat/module/gcp/metrics/compute/metadata.go

* fix CHANGELOG

* fix merge conflicts

* fix linter

* remove unused package

---------

Co-authored-by: Gabriel Pop <94497545+gpop63@users.noreply.github.com>
Co-authored-by: Edoardo Tenani <edoardo.tenani@elastic.co>
Co-authored-by: Edoardo Tenani <526307+endorama@users.noreply.github.com>
  • Loading branch information
4 people committed Feb 6, 2023
1 parent 3f3eb08 commit 6ab34dc
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -78,6 +78,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Metricbeat*

- Remove GCP Compute metadata cache {pull}33655[33655]

*Packetbeat*

Expand Down
100 changes: 61 additions & 39 deletions x-pack/metricbeat/module/gcp/metrics/compute/metadata.go
Expand Up @@ -6,41 +6,37 @@ package compute

import (
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
"google.golang.org/api/compute/v1"
compute "google.golang.org/api/compute/v1"
"google.golang.org/api/option"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/gcp"
)

// NewMetadataService returns the specific Metadata service for a GCP Compute resource
func NewMetadataService(projectID, zone string, region string, opt ...option.ClientOption) (gcp.MetadataService, error) {
return &metadataCollector{
projectID: projectID,
zone: zone,
region: region,
opt: opt,
instanceCache: common.NewCache(30*time.Second, 13),
logger: logp.NewLogger("metrics-compute"),
projectID: projectID,
zone: zone,
region: region,
opt: opt,
computeInstances: make(map[uint64]*compute.Instance),
logger: logp.NewLogger("metrics-compute"),
}, nil
}

// computeMetadata is an object to store data in between the extraction and the writing in the destination (to uncouple
// reading and writing in the same method)
type computeMetadata struct {
projectID string
zone string
instanceID string
machineType string

ts *monitoringpb.TimeSeries

User map[string]string
Metadata map[string]string
Metrics interface{}
Expand All @@ -55,8 +51,8 @@ type metadataCollector struct {

computeMetadata *computeMetadata

instanceCache *common.Cache
logger *logp.Logger
computeInstances map[uint64]*compute.Instance
logger *logp.Logger
}

// Metadata implements googlecloud.MetadataCollector to the known set of labels from a Compute TimeSeries single point of data.
Expand All @@ -75,16 +71,25 @@ func (s *metadataCollector) Metadata(ctx context.Context, resp *monitoringpb.Tim
}

if resp.Resource != nil && resp.Resource.Labels != nil {
metadataCollectorData.ECS.Put(gcp.ECSCloudInstanceIDKey, resp.Resource.Labels[gcp.TimeSeriesResponsePathForECSInstanceID])
_, err = metadataCollectorData.ECS.Put(gcp.ECSCloudInstanceIDKey, resp.Resource.Labels[gcp.TimeSeriesResponsePathForECSInstanceID])
if err != nil {
return gcp.MetadataCollectorData{}, err
}
}

if resp.Metric.Labels != nil {
metadataCollectorData.ECS.Put(gcp.ECSCloudInstanceNameKey, resp.Metric.Labels[gcp.TimeSeriesResponsePathForECSInstanceName])
_, err = metadataCollectorData.ECS.Put(gcp.ECSCloudInstanceNameKey, resp.Metric.Labels[gcp.TimeSeriesResponsePathForECSInstanceName])
if err != nil {
return gcp.MetadataCollectorData{}, err
}
}

if s.computeMetadata.machineType != "" {
lastIndex := strings.LastIndex(s.computeMetadata.machineType, "/")
metadataCollectorData.ECS.Put(gcp.ECSCloudMachineTypeKey, s.computeMetadata.machineType[lastIndex+1:])
_, err = metadataCollectorData.ECS.Put(gcp.ECSCloudMachineTypeKey, s.computeMetadata.machineType[lastIndex+1:])
if err != nil {
return gcp.MetadataCollectorData{}, err
}
}

s.computeMetadata.Metrics = metadataCollectorData.Labels[gcp.LabelMetrics]
Expand All @@ -108,9 +113,9 @@ func (s *metadataCollector) Metadata(ctx context.Context, resp *monitoringpb.Tim
// instanceMetadata returns the labels of an instance
func (s *metadataCollector) instanceMetadata(ctx context.Context, instanceID, zone string) (*computeMetadata, error) {
// FIXME: remove side effect on metadataCollector instance and use return value instead
i, err := s.instance(ctx, instanceID, zone)
i, err := s.instance(ctx, instanceID)
if err != nil {
return nil, errors.Wrapf(err, "error trying to get data from instance '%s' in zone '%s'", instanceID, zone)
return nil, fmt.Errorf("error trying to get data from instance '%s' in zone '%s': %w", instanceID, zone, err)
}

s.computeMetadata = &computeMetadata{
Expand All @@ -119,6 +124,7 @@ func (s *metadataCollector) instanceMetadata(ctx context.Context, instanceID, zo
}

if i == nil {
s.logger.Debugf("couldn't find instance %s, call Instances.AggregatedList", instanceID)
return s.computeMetadata, nil
}

Expand All @@ -142,28 +148,18 @@ func (s *metadataCollector) instanceMetadata(ctx context.Context, instanceID, zo
}

// instance returns data from an instance ID using the cache or making a request
func (s *metadataCollector) instance(ctx context.Context, instanceID, zone string) (*compute.Instance, error) {
service, err := compute.NewService(ctx, s.opt...)
if err != nil {
return nil, errors.Wrapf(err, "error getting client from Compute service")
}
func (s *metadataCollector) instance(ctx context.Context, instanceID string) (*compute.Instance, error) {
s.getComputeInstances(ctx)

instanceCachedData := s.instanceCache.Get(instanceID)
if instanceCachedData != nil {
if computeInstance, ok := instanceCachedData.(*compute.Instance); ok {
return computeInstance, nil
}
instanceIdInt, _ := strconv.Atoi(instanceID)
computeInstance, ok := s.computeInstances[uint64(instanceIdInt)]
if ok {
return computeInstance, nil
}

if zone != "" {
instanceData, err := service.Instances.Get(s.projectID, zone, instanceID).Do()
if err != nil {
s.logger.Warnf("failed to get instance information for instance '%s' in zone '%s', skipping metadata for instance", instanceID, zone)
return nil, nil
}
s.instanceCache.Put(instanceID, instanceData)
return instanceData, nil
}
// Remake the compute instances map to avoid having stale data.
s.computeInstances = make(map[uint64]*compute.Instance)

return nil, nil
}

Expand All @@ -182,3 +178,29 @@ func (s *metadataCollector) instanceZone(ts *monitoringpb.TimeSeries) string {

return ""
}

func (s *metadataCollector) getComputeInstances(ctx context.Context) {
if len(s.computeInstances) > 0 {
return
}

s.logger.Debug("Compute API Instances.AggregatedList")

computeService, err := compute.NewService(ctx, s.opt...)
if err != nil {
s.logger.Errorf("error getting client from Compute service: %v", err)
return
}

req := computeService.Instances.AggregatedList(s.projectID)
if err := req.Pages(ctx, func(page *compute.InstanceAggregatedList) error {
for _, instancesScopedList := range page.Items {
for _, instance := range instancesScopedList.Instances {
s.computeInstances[instance.Id] = instance
}
}
return nil
}); err != nil {
s.logger.Errorf("google Instances.AggregatedList error: %v", err)
}
}
8 changes: 2 additions & 6 deletions x-pack/metricbeat/module/gcp/metrics/compute/metadata_test.go
Expand Up @@ -6,15 +6,12 @@ package compute

import (
"testing"
"time"

"github.com/golang/protobuf/ptypes/timestamp"
"github.com/stretchr/testify/assert"
"google.golang.org/genproto/googleapis/api/metric"
"google.golang.org/genproto/googleapis/api/monitoredres"
"google.golang.org/genproto/googleapis/monitoring/v3"

"github.com/elastic/beats/v7/libbeat/common"
monitoring "google.golang.org/genproto/googleapis/monitoring/v3"
)

var fake = &monitoring.TimeSeries{
Expand Down Expand Up @@ -67,8 +64,7 @@ var fake = &monitoring.TimeSeries{
}

var m = &metadataCollector{
projectID: "projectID",
instanceCache: common.NewCache(30*time.Second, 13),
projectID: "projectID",
}

func TestInstanceID(t *testing.T) {
Expand Down

0 comments on commit 6ab34dc

Please sign in to comment.