Skip to content

Commit

Permalink
[Metricbeat] Add support for multiple regions in GCP (#32964)
Browse files Browse the repository at this point in the history
* add regions config setting and pass it as argument

* add services region resource label constants

* add buildRegionsFilter and update getFilterForMetric logic

* add getFilterForMetric and buildRegionsFilter tests

* add changelog entry

* minor changes for golangci-lint

* add warn logs and remove redundant return

* add missing argument in warnf log
  • Loading branch information
gpop63 authored and chrisberkhout committed Jun 1, 2023
1 parent 3cd41e8 commit 986b2bf
Show file tree
Hide file tree
Showing 10 changed files with 269 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Add regex support for drop_fields processor.
- Improve compatibility and reduce flakyness of Python tests {pull}31588[31588]
- Added `.python-version` file {pull}32323[32323]
- Add support for multiple regions in GCP {pull}32964[32964]

==== Deprecated

Expand Down
9 changes: 8 additions & 1 deletion x-pack/metricbeat/module/gcp/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,14 @@ const (
LabelMetadata = "metadata"
)

// Available perSeriesAligner map
const (
DefaultResourceLabelZone = "resource.label.zone"
ComputeResourceLabelZone = "resource.labels.zone"
GKEResourceLabelLocation = "resource.label.location"
StorageResourceLabelLocation = "resource.label.location"
)

// AlignersMapToGCP map contains available perSeriesAligner
// https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.alertPolicies#Aligner
var AlignersMapToGCP = map[string]monitoringpb.Aggregation_Aligner{
"ALIGN_NONE": monitoringpb.Aggregation_ALIGN_NONE,
Expand Down
1 change: 1 addition & 0 deletions x-pack/metricbeat/module/gcp/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type MetadataCollectorInputData struct {
ProjectID string
Zone string
Region string
Regions []string
Point *monitoringpb.Point
Timestamp *time.Time
}
Expand Down
18 changes: 10 additions & 8 deletions x-pack/metricbeat/module/gcp/metrics/compute/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ package compute

import (
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
"google.golang.org/api/compute/v1"
"google.golang.org/api/option"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
Expand All @@ -26,11 +26,12 @@ const (
)

// NewMetadataService returns the specific Metadata service for a GCP Compute resource
func NewMetadataService(projectID, zone string, region string, opt ...option.ClientOption) (gcp.MetadataService, error) {
func NewMetadataService(projectID, zone string, region string, regions []string, opt ...option.ClientOption) (gcp.MetadataService, error) {
return &metadataCollector{
projectID: projectID,
zone: zone,
region: region,
regions: regions,
opt: opt,
instanceCache: common.NewCache(cacheTTL, initialCacheSize),
logger: logp.NewLogger("metrics-compute"),
Expand All @@ -40,12 +41,12 @@ func NewMetadataService(projectID, zone string, region string, opt ...option.Cli
// computeMetadata is an object to store data in between the extraction and the writing in the destination (to uncouple
// reading and writing in the same method)
type computeMetadata struct {
projectID string
// projectID string
zone string
instanceID string
machineType string

ts *monitoringpb.TimeSeries
// ts *monitoringpb.TimeSeries

User map[string]string
Metadata map[string]string
Expand All @@ -57,6 +58,7 @@ type metadataCollector struct {
projectID string
zone string
region string
regions []string
opt []option.ClientOption
instanceCache *common.Cache
logger *logp.Logger
Expand All @@ -75,16 +77,16 @@ func (s *metadataCollector) Metadata(ctx context.Context, resp *monitoringpb.Tim
}

if resp.Resource != nil && resp.Resource.Labels != nil {
metadataCollectorData.ECS.Put(gcp.ECSCloudInstanceIDKey, resp.Resource.Labels[gcp.TimeSeriesResponsePathForECSInstanceID])
_, _ = metadataCollectorData.ECS.Put(gcp.ECSCloudInstanceIDKey, resp.Resource.Labels[gcp.TimeSeriesResponsePathForECSInstanceID])
}

if resp.Metric.Labels != nil {
metadataCollectorData.ECS.Put(gcp.ECSCloudInstanceNameKey, resp.Metric.Labels[gcp.TimeSeriesResponsePathForECSInstanceName])
_, _ = metadataCollectorData.ECS.Put(gcp.ECSCloudInstanceNameKey, resp.Metric.Labels[gcp.TimeSeriesResponsePathForECSInstanceName])
}

if computeMetadata.machineType != "" {
lastIndex := strings.LastIndex(computeMetadata.machineType, "/")
metadataCollectorData.ECS.Put(gcp.ECSCloudMachineTypeKey, computeMetadata.machineType[lastIndex+1:])
_, _ = metadataCollectorData.ECS.Put(gcp.ECSCloudMachineTypeKey, computeMetadata.machineType[lastIndex+1:])
}

computeMetadata.Metrics = metadataCollectorData.Labels[gcp.LabelMetrics]
Expand All @@ -109,7 +111,7 @@ func (s *metadataCollector) Metadata(ctx context.Context, resp *monitoringpb.Tim
func (s *metadataCollector) instanceMetadata(ctx context.Context, instanceID, zone string) (*computeMetadata, error) {
instance, err := s.instance(ctx, instanceID)
if err != nil {
return nil, errors.Wrapf(err, "error trying to get data from instance '%s'", instanceID)
return nil, fmt.Errorf("error trying to get data from instance '%s' %w", instanceID, err)
}

computeMetadata := &computeMetadata{
Expand Down
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/gcp/metrics/metadata_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func NewMetadataServiceForConfig(c config, serviceName string) (gcp.MetadataService, error) {
switch serviceName {
case gcp.ServiceCompute:
return compute.NewMetadataService(c.ProjectID, c.Zone, c.Region, c.opt...)
return compute.NewMetadataService(c.ProjectID, c.Zone, c.Region, c.Regions, c.opt...)
default:
return nil, nil
}
Expand Down
86 changes: 82 additions & 4 deletions x-pack/metricbeat/module/gcp/metrics/metrics_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,81 @@ func (r *metricsRequester) Metrics(ctx context.Context, serviceName string, alig
return results, nil
}

func (r *metricsRequester) buildRegionsFilter(regions []string, label string) string {
if len(regions) == 0 {
return ""
}

var filter strings.Builder

// No. of regions added to the filter string.
var regionsCount uint

for _, region := range regions {
// If 1 region has been added and the iteration continues, add the OR operator.
if regionsCount > 0 {
filter.WriteString("OR")
filter.WriteString(" ")
}

filter.WriteString(fmt.Sprintf("%s = starts_with(\"%s\")", label, strings.TrimSuffix(region, "*")))
filter.WriteString(" ")

regionsCount++
}

switch {
// If the filter string has more than 1 region, parentheses are added for better filter readability.
case regionsCount > 1:
return fmt.Sprintf("(%s)", strings.TrimSpace(filter.String()))
default:
return strings.TrimSpace(filter.String())
}
}

// getFilterForMetric returns the filter associated with the corresponding filter. Some services like Pub/Sub fails
// if they have a region specified.
func (r *metricsRequester) getFilterForMetric(serviceName, m string) string {
f := fmt.Sprintf(`metric.type="%s"`, m)
if r.config.Zone == "" && r.config.Region == "" {
if r.config.Zone == "" && r.config.Region == "" && len(r.config.Regions) == 0 {
return f
}

switch serviceName {
case gcp.ServiceCompute:
if r.config.Region != "" && r.config.Zone != "" {
r.logger.Warnf("when region %s and zone %s config parameter "+
"both are provided, only use region", r.config.Regions, r.config.Zone)
}

if r.config.Region != "" && len(r.config.Regions) != 0 {
r.logger.Warnf("when region %s and regions config parameters are both provided, use region", r.config.Region)
}

if r.config.Region != "" {
f = fmt.Sprintf(
"%s AND %s = starts_with(\"%s\")",
f,
gcp.ComputeResourceLabelZone,
strings.TrimSuffix(r.config.Region, "*"),
)
break
}

if r.config.Zone != "" {
f = fmt.Sprintf(
"%s AND %s = starts_with(\"%s\")",
f,
gcp.ComputeResourceLabelZone,
strings.TrimSuffix(r.config.Zone, "*"),
)
break
}

if len(r.config.Regions) != 0 {
regionsFilter := r.buildRegionsFilter(r.config.Regions, gcp.ComputeResourceLabelZone)
f = fmt.Sprintf("%s AND %s", f, regionsFilter)
}
case gcp.ServiceGKE:
if r.config.Region != "" && r.config.Zone != "" {
r.logger.Warnf("when region %s and zone %s config parameter "+
Expand All @@ -131,15 +197,27 @@ func (r *metricsRequester) getFilterForMetric(serviceName, m string) string {
// }
zone = strings.TrimSuffix(zone, "*")
f = fmt.Sprintf("%s AND resource.label.location=starts_with(\"%s\")", f, zone)
break
}

if len(r.config.Regions) != 0 {
regionsFilter := r.buildRegionsFilter(r.config.Regions, gcp.GKEResourceLabelLocation)
f = fmt.Sprintf("%s AND %s", f, regionsFilter)
}
case gcp.ServicePubsub, gcp.ServiceLoadBalancing, gcp.ServiceCloudFunctions, gcp.ServiceFirestore, gcp.ServiceDataproc:
return f
case gcp.ServiceStorage:
if r.config.Region == "" {
return f
if r.config.Region != "" && len(r.config.Regions) != 0 {
r.logger.Warnf("when region %s and regions config parameters are both provided, use region", r.config.Region)
}

f = fmt.Sprintf(`%s AND resource.labels.location = "%s"`, f, r.config.Region)
switch {
case r.config.Region != "":
f = fmt.Sprintf(`%s AND resource.labels.location = "%s"`, f, r.config.Region)
case len(r.config.Regions) != 0:
regionsFilter := r.buildRegionsFilter(r.config.Regions, gcp.StorageResourceLabelLocation)
f = fmt.Sprintf("%s AND %s", f, regionsFilter)
}
default:
if r.config.Region != "" && r.config.Zone != "" {
r.logger.Warnf("when region %s and zone %s config parameter "+
Expand Down
Loading

0 comments on commit 986b2bf

Please sign in to comment.