Skip to content

Commit

Permalink
Add gzip compression option to metrics and logs (#576)
Browse files Browse the repository at this point in the history
* Add gzip compression option to metrics and logs

* Disallow trace compression config, use gzip.Name

* make gotidy

* Add test
  • Loading branch information
damemi committed Jan 27, 2023
1 parent 720e1b5 commit c6c6d07
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 2 deletions.
2 changes: 2 additions & 0 deletions exporter/collector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ The following configuration options are supported:
- `user_agent` (optional): Override the user agent string sent on requests to Cloud Monitoring (currently only applies to metrics). Specify `{{version}}` to include the application version number. Defaults to `opentelemetry-collector-contrib {{version}}`.
- `use_insecure` (optional): If true. use gRPC as their communication transport. Only has effect if Endpoint is not "".
- `timeout` (optional): Timeout for all API calls. If not set, defaults to 12 seconds.
- `compression` (optional): Enable gzip compression on gRPC calls for Metrics or Logs. Valid values: `gzip`.
- `resource_mappings` (optional): ResourceMapping defines mapping of resources from source (OpenCensus) to target (Google Cloud).
- `label_mappings` (optional): Optional flag signals whether we can proceed with transformation if a label is missing in the resource.
- `retry_on_failure` (optional): Configuration for how to handle retries when sending data to Google Cloud fails.
Expand Down Expand Up @@ -185,6 +186,7 @@ exporters:
metric:
prefix: prefix
skip_create_descriptor: true
compression: gzip

log:
default_log_name: my-app
Expand Down
15 changes: 15 additions & 0 deletions exporter/collector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
)

const (
Expand Down Expand Up @@ -62,6 +63,9 @@ type ClientConfig struct {
GetClientOptions func() []option.ClientOption

Endpoint string `mapstructure:"endpoint"`
// Compression specifies the compression format for Metrics and Logging gRPC requests.
// Supported values: gzip.
Compression string `mapstructure:"compression"`
// Only has effect if Endpoint is not ""
UseInsecure bool `mapstructure:"use_insecure"`
// GRPCPoolSize sets the size of the connection pool in the GCP client
Expand Down Expand Up @@ -211,6 +215,17 @@ func ValidateConfig(cfg Config) error {
return fmt.Errorf("unable to parse resource filter regex: %s", err.Error())
}
}

if len(cfg.LogConfig.ClientConfig.Compression) > 0 && cfg.LogConfig.ClientConfig.Compression != gzip.Name {
return fmt.Errorf("unknown compression option '%s', allowed values: '', 'gzip'", cfg.LogConfig.ClientConfig.Compression)
}
if len(cfg.MetricConfig.ClientConfig.Compression) > 0 && cfg.MetricConfig.ClientConfig.Compression != gzip.Name {
return fmt.Errorf("unknown compression option '%s', allowed values: '', 'gzip'", cfg.MetricConfig.ClientConfig.Compression)
}
if len(cfg.TraceConfig.ClientConfig.Compression) > 0 {
return fmt.Errorf("traces.compression invalid: compression is only available for logs and metrics")
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion exporter/collector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.34.2
github.com/census-instrumentation/opencensus-proto v0.4.1
github.com/google/go-cmp v0.5.9
github.com/googleapis/gax-go/v2 v2.7.0
github.com/stretchr/testify v1.8.1
go.opencensus.io v0.24.0
go.opentelemetry.io/collector/pdata v1.0.0-rc2
Expand Down Expand Up @@ -40,7 +41,6 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.1 // indirect
github.com/googleapis/gax-go/v2 v2.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,5 +218,13 @@ var MetricsTestCases = []TestCase{
ExpectFixturePath: "testdata/fixtures/metrics/prometheus_empty_buckets_expected.json",
SkipForSDK: true,
},
{
Name: "Basic Counter with gzip compression",
OTLPInputFixturePath: "testdata/fixtures/metrics/basic_counter_metrics.json",
ExpectFixturePath: "testdata/fixtures/metrics/basic_counter_metrics_expect.json",
ConfigureCollector: func(cfg *collector.Config) {
cfg.MetricConfig.ClientConfig.Compression = "gzip"
},
},
// TODO: Add integration tests for workload.googleapis.com metrics from the ops agent
}
9 changes: 9 additions & 0 deletions exporter/collector/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ import (
loggingv2 "cloud.google.com/go/logging/apiv2"
logpb "cloud.google.com/go/logging/apiv2/loggingpb"
"google.golang.org/genproto/googleapis/api/monitoredres"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/protobuf/proto"

"github.com/googleapis/gax-go/v2"

"github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping"

"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -140,6 +144,11 @@ func NewGoogleCloudLogsExporter(
return nil, err
}

if cfg.LogConfig.ClientConfig.Compression == gzip.Name {
loggingClient.CallOptions.WriteLogEntries = append(loggingClient.CallOptions.WriteLogEntries,
gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
}

obs := selfObservability{
log: log,
}
Expand Down
14 changes: 14 additions & 0 deletions exporter/collector/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,15 @@ import (
"google.golang.org/genproto/googleapis/api/label"
metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/googleapis/gax-go/v2"

"github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector/internal/datapointstorage"
"github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector/internal/normalization"
"github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping"
Expand Down Expand Up @@ -145,6 +149,16 @@ func NewGoogleCloudMetricsExporter(
if err != nil {
return nil, err
}

if cfg.MetricConfig.ClientConfig.Compression == gzip.Name {
client.CallOptions.CreateMetricDescriptor = append(client.CallOptions.CreateMetricDescriptor,
gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
client.CallOptions.CreateTimeSeries = append(client.CallOptions.CreateTimeSeries,
gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
client.CallOptions.CreateServiceTimeSeries = append(client.CallOptions.CreateServiceTimeSeries,
gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
}

obs := selfObservability{log: log}
shutdown := make(chan struct{})
normalizer := normalization.NewDisabledNormalizer()
Expand Down
2 changes: 1 addition & 1 deletion exporter/metric/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.34.2
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.34.2
github.com/googleapis/gax-go/v2 v2.7.0
github.com/stretchr/testify v1.8.1
go.uber.org/multierr v1.8.0
)
Expand All @@ -37,7 +38,6 @@ require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.1 // indirect
github.com/googleapis/gax-go/v2 v2.7.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel/trace v1.11.2 // indirect
Expand Down
13 changes: 13 additions & 0 deletions exporter/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ import (
"google.golang.org/genproto/googleapis/api/label"
googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/googleapis/gax-go/v2"

"github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping"
)

Expand Down Expand Up @@ -104,6 +108,15 @@ func newMetricExporter(o *options) (*metricExporter, error) {
return nil, err
}

if o.compression == "gzip" {
client.CallOptions.GetMetricDescriptor = append(client.CallOptions.CreateMetricDescriptor,
gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
client.CallOptions.CreateMetricDescriptor = append(client.CallOptions.CreateMetricDescriptor,
gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
client.CallOptions.CreateTimeSeries = append(client.CallOptions.CreateTimeSeries,
gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
}

cache := map[key]*googlemetricpb.MetricDescriptor{}
e := &metricExporter{
o: o,
Expand Down
9 changes: 9 additions & 0 deletions exporter/metric/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type options struct {
// resource if the resource does not inherently belong to a specific
// project, e.g. on-premise resource like k8s_container or generic_task.
projectID string
// compression enables gzip compression on gRPC calls.
compression string
// monitoringClientOptions are additional options to be passed
// to the underlying Stackdriver Monitoring API client.
// Optional.
Expand Down Expand Up @@ -138,3 +140,10 @@ func WithDisableCreateMetricDescriptors() func(o *options) {
o.disableCreateMetricDescriptors = true
}
}

// WithCompression sets the compression to use for gRPC requests.
func WithCompression(c string) func(o *options) {
return func(o *options) {
o.compression = c
}
}

0 comments on commit c6c6d07

Please sign in to comment.