diff --git a/iam/go_mod_tidy_hack.go b/iam/go_mod_tidy_hack.go index fbdd65f60c08..c3f49e045d25 100644 --- a/iam/go_mod_tidy_hack.go +++ b/iam/go_mod_tidy_hack.go @@ -14,6 +14,7 @@ // This file, and the cloud.google.com/go import, won't actually become part of // the resultant binary. +//go:build modhack // +build modhack package iam diff --git a/spanner/batch.go b/spanner/batch.go index a7e3a5a63052..6c999da23740 100644 --- a/spanner/batch.go +++ b/spanner/batch.go @@ -143,7 +143,7 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex PartitionOptions: opt.toProto(), }, gax.WithGRPCOptions(grpc.Header(&md))) - if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if getGFELatencyMetricsFlag() && md != nil && t.ct != nil { if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "PartitionReadUsingIndexWithOptions"); err != nil { trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) } @@ -204,7 +204,7 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement } resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md))) - if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if getGFELatencyMetricsFlag() && md != nil && t.ct != nil { if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "partitionQuery"); err != nil { trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) } @@ -273,7 +273,7 @@ func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) { var md metadata.MD err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid}, gax.WithGRPCOptions(grpc.Header(&md))) - if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if getGFELatencyMetricsFlag() && md != nil && t.ct != nil { if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Cleanup"); err != nil { trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) } @@ -322,7 +322,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R return client, err } md, err := client.Header() - if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if getGFELatencyMetricsFlag() && md != nil && t.ct != nil { if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Execute"); err != nil { trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) } @@ -347,7 +347,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R } md, err := client.Header() - if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if getGFELatencyMetricsFlag() && md != nil && t.ct != nil { if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Execute"); err != nil { trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) } diff --git a/spanner/integration_test.go b/spanner/integration_test.go index f7018c846da0..2a05cd677040 100644 --- a/spanner/integration_test.go +++ b/spanner/integration_test.go @@ -3271,7 +3271,7 @@ func TestIntegration_GFE_Latency(t *testing.T) { defer cancel() te := testutil.NewTestExporter(GFEHeaderMissingCountView, GFELatencyView) - GFELatencyMetricsEnabled = true + setGFELatencyMetricsFlag(true) client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) defer cleanup() diff --git a/spanner/oc_test.go b/spanner/oc_test.go index d4ddc2baeba3..aac2fb916c9a 100644 --- a/spanner/oc_test.go +++ b/spanner/oc_test.go @@ -268,7 +268,7 @@ func TestOCStats_GFE_Latency(t *testing.T) { te := testutil.NewTestExporter([]*view.View{GFELatencyView, GFEHeaderMissingCountView}...) defer te.Unregister() - GFELatencyMetricsEnabled = true + setGFELatencyMetricsFlag(true) server, client, teardown := setupMockedTestServer(t) defer teardown() diff --git a/spanner/pdml.go b/spanner/pdml.go index 18b494ac9565..9b5b92c4e37f 100644 --- a/spanner/pdml.go +++ b/spanner/pdml.go @@ -119,7 +119,7 @@ func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlReq Selector: &sppb.TransactionSelector_Id{Id: res.Id}, } resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md))) - if GFELatencyMetricsEnabled && md != nil && sh.session.pool != nil { + if getGFELatencyMetricsFlag() && md != nil && sh.session.pool != nil { err := captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "executePdml_ExecuteSql") if err != nil { trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) diff --git a/spanner/sessionclient.go b/spanner/sessionclient.go index 22838e43e968..2ebef2b5b5e4 100644 --- a/spanner/sessionclient.go +++ b/spanner/sessionclient.go @@ -138,7 +138,7 @@ func (sc *sessionClient) createSession(ctx context.Context) (*session, error) { Session: &sppb.Session{Labels: sc.sessionLabels}, }, gax.WithGRPCOptions(grpc.Header(&md))) - if GFELatencyMetricsEnabled && md != nil { + if getGFELatencyMetricsFlag() && md != nil { _, instance, database, err := parseDatabaseName(sc.database) if err != nil { return nil, ToSpannerError(err) @@ -260,7 +260,7 @@ func (sc *sessionClient) executeBatchCreateSessions(client *vkit.Client, createC SessionTemplate: &sppb.Session{Labels: labels}, }, gax.WithGRPCOptions(grpc.Header(&mdForGFELatency))) - if GFELatencyMetricsEnabled && mdForGFELatency != nil { + if getGFELatencyMetricsFlag() && mdForGFELatency != nil { _, instance, database, err := parseDatabaseName(sc.database) if err != nil { trace.TracePrintf(ctx, nil, "Error getting instance and database name: %v", err) diff --git a/spanner/stats.go b/spanner/stats.go index 09a4b91697d9..0515b5d8fc20 100644 --- a/spanner/stats.go +++ b/spanner/stats.go @@ -18,6 +18,7 @@ import ( "context" "strconv" "strings" + "sync" "testing" "cloud.google.com/go/internal/version" @@ -42,8 +43,10 @@ var ( tagNumReadSessions = tag.Tag{Key: tagKeyType, Value: "num_read_sessions"} tagNumWriteSessions = tag.Tag{Key: tagKeyType, Value: "num_write_prepared_sessions"} tagKeyMethod = tag.MustNewKey("grpc_client_method") - // GFELatencyMetricsEnabled is used to track if GFELatency and GFEHeaderMissingCount need to be recorded - GFELatencyMetricsEnabled = false + // gfeLatencyMetricsEnabled is used to track if GFELatency and GFEHeaderMissingCount need to be recorded + gfeLatencyMetricsEnabled = false + // mutex to avoid data race in reading/writing the above flag + statsMu = sync.RWMutex{} ) func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) { @@ -213,28 +216,40 @@ func EnableStatViews() error { // EnableGfeLatencyView enables GFELatency metric func EnableGfeLatencyView() error { - GFELatencyMetricsEnabled = true + setGFELatencyMetricsFlag(true) return view.Register(GFELatencyView) } // EnableGfeHeaderMissingCountView enables GFEHeaderMissingCount metric func EnableGfeHeaderMissingCountView() error { - GFELatencyMetricsEnabled = true + setGFELatencyMetricsFlag(true) return view.Register(GFEHeaderMissingCountView) } // EnableGfeLatencyAndHeaderMissingCountViews enables GFEHeaderMissingCount and GFELatency metric func EnableGfeLatencyAndHeaderMissingCountViews() error { - GFELatencyMetricsEnabled = true + setGFELatencyMetricsFlag(true) return view.Register( GFELatencyView, GFEHeaderMissingCountView, ) } +func getGFELatencyMetricsFlag() bool { + statsMu.RLock() + defer statsMu.RUnlock() + return gfeLatencyMetricsEnabled +} + +func setGFELatencyMetricsFlag(enable bool) { + statsMu.Lock() + gfeLatencyMetricsEnabled = enable + statsMu.Unlock() +} + // DisableGfeLatencyAndHeaderMissingCountViews disables GFEHeaderMissingCount and GFELatency metric func DisableGfeLatencyAndHeaderMissingCountViews() { - GFELatencyMetricsEnabled = false + setGFELatencyMetricsFlag(false) view.Unregister( GFELatencyView, GFEHeaderMissingCountView, @@ -267,12 +282,6 @@ func checkCommonTagsGFELatency(t *testing.T, m map[tag.Key]string) { if !strings.HasPrefix(m[tagKeyClientID], "client") { t.Fatalf("Incorrect client ID: %v", m[tagKeyClientID]) } - if !strings.HasPrefix(m[tagKeyInstance], "gotest") { - t.Fatalf("Incorrect instance ID: %v", m[tagKeyInstance]) - } - if !strings.HasPrefix(m[tagKeyDatabase], "gotest") { - t.Fatalf("Incorrect database ID: %v", m[tagKeyDatabase]) - } if m[tagKeyLibVersion] != version.Repo { t.Fatalf("Incorrect library version: %v", m[tagKeyLibVersion]) } diff --git a/spanner/transaction.go b/spanner/transaction.go index 6a9702cfc31a..cfa40eb03d76 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -194,7 +194,7 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key return client, err } md, err := client.Header() - if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if getGFELatencyMetricsFlag() && md != nil && t.ct != nil { if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "ReadWithOptions"); err != nil { trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) } @@ -402,7 +402,7 @@ func (t *txReadOnly) query(ctx context.Context, statement Statement, options Que return client, err } md, err := client.Header() - if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if getGFELatencyMetricsFlag() && md != nil && t.ct != nil { if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "query"); err != nil { trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) } @@ -577,7 +577,7 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error { }, }, gax.WithGRPCOptions(grpc.Header(&md))) - if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if getGFELatencyMetricsFlag() && md != nil && t.ct != nil { if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "begin_BeginTransaction"); err != nil { trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) } @@ -931,7 +931,7 @@ func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts var md metadata.MD resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md))) - if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if getGFELatencyMetricsFlag() && md != nil && t.ct != nil { if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "update"); err != nil { trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) } @@ -1006,7 +1006,7 @@ func (t *ReadWriteTransaction) batchUpdateWithOptions(ctx context.Context, stmts RequestOptions: createRequestOptions(opts.Priority, opts.RequestTag, t.txOpts.TransactionTag), }, gax.WithGRPCOptions(grpc.Header(&md))) - if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if getGFELatencyMetricsFlag() && md != nil && t.ct != nil { if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "batchUpdateWithOptions"); err != nil { trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", ToSpannerError(err)) }