Skip to content

Commit

Permalink
gcp/observability: Add compressed metrics to observability module and…
Browse files Browse the repository at this point in the history
… synchronize View data with exporter (#6105)
  • Loading branch information
zasweq committed Mar 15, 2023
1 parent 52ca957 commit 36fd0a4
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 7 deletions.
12 changes: 12 additions & 0 deletions gcp/observability/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,18 @@ func (s) TestOpenCensusIntegration(t *testing.T) {
if value := fe.SeenViews["grpc.io/server/server_latency"]; value != TypeOpenCensusViewDistribution {
errs = append(errs, fmt.Errorf("grpc.io/server/server_latency: %s != %s", value, TypeOpenCensusViewDistribution))
}
if value := fe.SeenViews["grpc.io/client/sent_compressed_bytes_per_rpc"]; value != TypeOpenCensusViewDistribution {
errs = append(errs, fmt.Errorf("unexpected type for grpc.io/client/sent_compressed_bytes_per_rpc: %s != %s", value, TypeOpenCensusViewDistribution))
}
if value := fe.SeenViews["grpc.io/client/received_compressed_bytes_per_rpc"]; value != TypeOpenCensusViewDistribution {
errs = append(errs, fmt.Errorf("unexpected type for grpc.io/client/received_compressed_bytes_per_rpc: %s != %s", value, TypeOpenCensusViewDistribution))
}
if value := fe.SeenViews["grpc.io/server/sent_compressed_bytes_per_rpc"]; value != TypeOpenCensusViewDistribution {
errs = append(errs, fmt.Errorf("unexpected type for grpc.io/server/sent_compressed_bytes_per_rpc: %s != %s", value, TypeOpenCensusViewDistribution))
}
if value := fe.SeenViews["grpc.io/server/received_compressed_bytes_per_rpc"]; value != TypeOpenCensusViewDistribution {
errs = append(errs, fmt.Errorf("unexpected type for grpc.io/server/received_compressed_bytes_per_rpc: %s != %s", value, TypeOpenCensusViewDistribution))
}
if fe.SeenSpans <= 0 {
errs = append(errs, fmt.Errorf("unexpected number of seen spans: %v <= 0", fe.SeenSpans))
}
Expand Down
25 changes: 20 additions & 5 deletions gcp/observability/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@ import (
var (
// It's a variable instead of const to speed up testing
defaultMetricsReportingInterval = time.Second * 30
defaultViews = []*view.View{
opencensus.ClientStartedRPCsView,
opencensus.ClientCompletedRPCsView,
opencensus.ClientRoundtripLatencyView,
opencensus.ClientSentCompressedBytesPerRPCView,
opencensus.ClientReceivedCompressedBytesPerRPCView,
opencensus.ClientAPILatencyView,
opencensus.ServerStartedRPCsView,
opencensus.ServerCompletedRPCsView,
opencensus.ServerSentCompressedBytesPerRPCView,
opencensus.ServerReceivedCompressedBytesPerRPCView,
opencensus.ServerLatencyView,
}
)

func labelsToMonitoringLabels(labels map[string]string) *stackdriver.Labels {
Expand Down Expand Up @@ -106,11 +119,8 @@ func startOpenCensus(config *config) error {
}

if config.CloudMonitoring != nil {
if err := view.Register(opencensus.ClientAPILatencyView, opencensus.ClientStartedRPCsView, opencensus.ClientCompletedRPCsView, opencensus.ClientRoundtripLatencyView); err != nil {
return fmt.Errorf("failed to register default client views: %v", err)
}
if err := view.Register(opencensus.ServerStartedRPCsView, opencensus.ServerCompletedRPCsView, opencensus.ServerLatencyView); err != nil {
return fmt.Errorf("failed to register default server views: %v", err)
if err := view.Register(defaultViews...); err != nil {
return fmt.Errorf("failed to register observability views: %v", err)
}
view.SetReportingPeriod(defaultMetricsReportingInterval)
view.RegisterExporter(exporter.(view.Exporter))
Expand All @@ -130,11 +140,16 @@ func stopOpenCensus() {
if exporter != nil {
internal.ClearGlobalDialOptions()
internal.ClearGlobalServerOptions()
// This Unregister call guarantees the data recorded gets sent to
// exporter, synchronising the view package and exporter. Doesn't matter
// if views not registered, will be a noop if not registered.
view.Unregister(defaultViews...)
// Call these unconditionally, doesn't matter if not registered, will be
// a noop if not registered.
trace.UnregisterExporter(exporter)
view.UnregisterExporter(exporter)

// This Flush call makes sure recorded telemetry get sent to backend.
exporter.Flush()
exporter.Close()
}
Expand Down
4 changes: 2 additions & 2 deletions stats/opencensus/server_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ var (
// ServerSentBytesPerRPCView is the distribution of received bytes per RPC,
// keyed on method.
ServerSentBytesPerRPCView = &view.View{
Name: "grpc.io/server/sent_compressed_bytes_per_rpc",
Name: "grpc.io/server/sent_bytes_per_rpc",
Description: "Distribution of sent bytes per RPC, by method.",
Measure: serverSentBytesPerRPC,
TagKeys: []tag.Key{keyServerMethod},
Expand All @@ -72,7 +72,7 @@ var (
// ServerSentCompressedBytesPerRPCView is the distribution of received
// compressed bytes per RPC, keyed on method.
ServerSentCompressedBytesPerRPCView = &view.View{
Name: "grpc.io/server/sent_bytes_per_rpc",
Name: "grpc.io/server/sent_compressed_bytes_per_rpc",
Description: "Distribution of sent compressed bytes per RPC, by method.",
Measure: serverSentCompressedBytesPerRPC,
TagKeys: []tag.Key{keyServerMethod},
Expand Down

0 comments on commit 36fd0a4

Please sign in to comment.