Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: fix StatementStatistics.Nodes list #106587

Merged
merged 1 commit into from Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/ccl/testccl/sqlstatsccl/BUILD.bazel
Expand Up @@ -18,13 +18,15 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/appstatspb",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)
174 changes: 134 additions & 40 deletions pkg/ccl/testccl/sqlstatsccl/sql_stats_test.go
Expand Up @@ -13,26 +13,33 @@ import (
gosql "database/sql"
"encoding/json"
"fmt"
"strconv"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSQLStatsRegions(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t, "test is to slow for race")
skip.UnderStress(t, "test is too heavy to run under stress")

// We build a small multiregion cluster, with the proper settings for
Expand All @@ -44,30 +51,78 @@ func TestSQLStatsRegions(t *testing.T) {
sql.SecondaryTenantsMultiRegionAbstractionsEnabled.Override(ctx, &st.SV, true)
sql.SecondaryTenantZoneConfigsEnabled.Override(ctx, &st.SV, true)

numServers := 9
numServers := 3
regionNames := []string{
"gcp-us-west1",
"gcp-us-central1",
"gcp-us-east1",
}

serverArgs := make(map[int]base.TestServerArgs)
signalAfter := make([]chan struct{}, numServers)
for i := 0; i < numServers; i++ {
serverArgs[i] = base.TestServerArgs{
signalAfter[i] = make(chan struct{})
args := base.TestServerArgs{
Settings: st,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: regionNames[i%len(regionNames)]}},
},
// We'll start our own test tenant manually below.
DefaultTestTenant: base.TestTenantDisabled,
}

serverKnobs := &server.TestingKnobs{
SignalAfterGettingRPCAddress: signalAfter[i],
}

args.Knobs.Server = serverKnobs
serverArgs[i] = args
}

host := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{
ServerArgsPerNode: serverArgs,
ParallelStart: true,
})
defer host.Stopper().Stop(ctx)

go func() {
for _, c := range signalAfter {
<-c
}
}()

tdb := sqlutils.MakeSQLRunner(host.ServerConn(1))

// Shorten the closed timestamp target duration so that span configs
// propagate more rapidly.
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '200ms'`)
tdb.Exec(t, "SET CLUSTER SETTING kv.allocator.load_based_rebalancing = off")
tdb.Exec(t, "SET CLUSTER SETTING kv.allocator.min_lease_transfer_interval = '10ms'")
// Lengthen the lead time for the global tables to prevent overload from
// resulting in delays in propagating closed timestamps and, ultimately
// forcing requests from being redirected to the leaseholder. Without this
// change, the test sometimes is flakey because the latency budget allocated
// to closed timestamp propagation proves to be insufficient. This value is
// very cautious, and makes this already slow test even slower.
tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '50 ms'")
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.lead_for_global_reads_override = '1500ms'`)
tdb.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING spanconfig.reconciliation_job.checkpoint_interval = '500ms'`)

tdb.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true")
tdb.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.multi_region.allow_abstractions_for_secondary_tenants.enabled = true")
tdb.Exec(t, `ALTER RANGE meta configure zone using constraints = '{"+region=gcp-us-west1": 1, "+region=gcp-us-central1": 1, "+region=gcp-us-east1": 1}';`)

// Create secondary tenants
var tenantDbs []*gosql.DB
for _, server := range host.Servers {
_, tenantDb := serverutils.StartTenant(t, server, base.TestTenantArgs{
Settings: st,
TenantID: roachpb.MustMakeTenantID(11),
Locality: *server.Locality(),
})
tenantDbs = append(tenantDbs, tenantDb)
}

testCases := []struct {
name string
db func(t *testing.T, host *testcluster.TestCluster, st *cluster.Settings) *sqlutils.SQLRunner
Expand All @@ -84,24 +139,16 @@ func TestSQLStatsRegions(t *testing.T) {
// connection to the first one.
name: "secondary tenant",
db: func(t *testing.T, host *testcluster.TestCluster, st *cluster.Settings) *sqlutils.SQLRunner {
var dbs []*gosql.DB
for _, server := range host.Servers {
_, db := serverutils.StartTenant(t, server, base.TestTenantArgs{
Settings: st,
TenantID: roachpb.MustMakeTenantID(11),
Locality: *server.Locality(),
})
dbs = append(dbs, db)
}
return sqlutils.MakeSQLRunner(dbs[0])
return sqlutils.MakeSQLRunner(tenantDbs[1])
},
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
db := tc.db(t, host, st)

// Create a multi-region database.
db.Exec(t, "SET enable_multiregion_placement_policy = true")
db.Exec(t, `SET CLUSTER SETTING sql.txn_stats.sample_rate = 1;`)

db.Exec(t, fmt.Sprintf(`CREATE DATABASE testdb PRIMARY REGION "%s" PLACEMENT RESTRICTED`, regionNames[0]))
for i := 1; i < len(regionNames); i++ {
db.Exec(t, fmt.Sprintf(`ALTER DATABASE testdb ADD region "%s"`, regionNames[i]))
Expand All @@ -113,39 +160,86 @@ func TestSQLStatsRegions(t *testing.T) {

// Add some data to each region.
for i, regionName := range regionNames {
db.Exec(t, "INSERT INTO test (crdb_region, a) VALUES ($1, $2)", regionName, i)
db.Exec(t, "INSERT INTO test (a, crdb_region) VALUES ($1, $2)", i, regionName)
}

// Select from the table and see what statement statistics were written.
db.Exec(t, "SET application_name = $1", t.Name())
db.Exec(t, "SELECT * FROM test")
row := db.QueryRow(t, `
// It takes a while for the region replication to complete.
testutils.SucceedsWithin(t, func() error {
var expectedNodes []int64
var expectedRegions []string
_, err := db.DB.ExecContext(ctx, `USE testdb`)
if err != nil {
return err
}

// Use EXPLAIN ANALYSE (DISTSQL) to get the accurate list of nodes.
explainInfo, err := db.DB.QueryContext(ctx, `EXPLAIN ANALYSE (DISTSQL) SELECT * FROM test`)
if err != nil {
return err
}
for explainInfo.Next() {
var explainStr string
if err := explainInfo.Scan(&explainStr); err != nil {
t.Fatal(err)
}

explainStr = strings.ReplaceAll(explainStr, " ", "")
// Example str " regions: cp-us-central1,gcp-us-east1,gcp-us-west1"
if strings.HasPrefix(explainStr, "regions:") {
explainStr = strings.ReplaceAll(explainStr, "regions:", "")
explainStr = strings.ReplaceAll(explainStr, " ", "")
expectedRegions = strings.Split(explainStr, ",")
if len(expectedRegions) < len(regionNames) {
return fmt.Errorf("rows are not replicated to all regions %s\n", expectedRegions)
}
}

// Example str " nodes: n1, n2, n4, n9"
if strings.HasPrefix(explainStr, "nodes:") {
explainStr = strings.ReplaceAll(explainStr, "nodes:", "")
explainStr = strings.ReplaceAll(explainStr, "n", "")

split := strings.Split(explainStr, ",")
if len(split) < len(regionNames) {
return fmt.Errorf("rows are not replicated to all regions %s\n", split)
}

// Gateway node was not included in the explain plan. Add it to the list
if split[0] != "1" {
expectedNodes = append(expectedNodes, int64(1))
}

for _, val := range split {
node, err := strconv.Atoi(val)
require.NoError(t, err)
expectedNodes = append(expectedNodes, int64(node))
}
}
}

// Select from the table and see what statement statistics were written.
db.Exec(t, "SET application_name = $1", t.Name())
db.Exec(t, "SELECT * FROM test")
row := db.QueryRow(t, `
SELECT statistics->>'statistics'
FROM crdb_internal.statement_statistics
WHERE app_name = $1`, t.Name())

var actualJSON string
row.Scan(&actualJSON)
var actual appstatspb.StatementStatistics
err := json.Unmarshal([]byte(actualJSON), &actual)
require.NoError(t, err)

require.Equal(t,
appstatspb.StatementStatistics{
// TODO(todd): It appears we do not yet reliably record
// the nodes for the statement. (I have manually verified
// that the above query does indeed fan out across the
// regions, via EXPLAIN (DISTSQL).) Filed as #96647.
//Nodes: []int64{1, 2, 3},
//Regions: regionNames,
Nodes: []int64{1},
Regions: []string{regionNames[0]},
},
appstatspb.StatementStatistics{
Nodes: actual.Nodes,
Regions: actual.Regions,
},
)
var actualJSON string
row.Scan(&actualJSON)
var actual appstatspb.StatementStatistics
err = json.Unmarshal([]byte(actualJSON), &actual)
require.NoError(t, err)

// Replication to all regions can take some time to complete. During
// this time a incomplete list will be returned.
if !assert.ObjectsAreEqual(expectedNodes, actual.Nodes) {
return fmt.Errorf("nodes are not equal. Expected: %d, Actual: %d", expectedNodes, actual.Nodes)
}

require.Equal(t, expectedRegions, actual.Regions)
return nil
}, 3*time.Minute)
})
}
}
9 changes: 3 additions & 6 deletions pkg/sql/colflow/vectorized_flow.go
Expand Up @@ -19,7 +19,6 @@ import (
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coldataext"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -467,9 +466,7 @@ func (s *vectorizedFlowCreator) wrapWithNetworkVectorizedStatsCollector(
// statistics that the outbox is responsible for, nil is returned if stats are
// not being collected.
func (s *vectorizedFlowCreator) makeGetStatsFnForOutbox(
flowCtx *execinfra.FlowCtx,
statsCollectors []colexecop.VectorizedStatsCollector,
originSQLInstanceID base.SQLInstanceID,
flowCtx *execinfra.FlowCtx, statsCollectors []colexecop.VectorizedStatsCollector,
) func(context.Context) []*execinfrapb.ComponentStats {
if !s.recordingStats {
return nil
Expand All @@ -489,7 +486,7 @@ func (s *vectorizedFlowCreator) makeGetStatsFnForOutbox(
// whole flow from parent monitors. These stats are added to a
// flow-level span.
result = append(result, &execinfrapb.ComponentStats{
Component: execinfrapb.FlowComponentID(originSQLInstanceID, flowCtx.ID),
Component: flowCtx.FlowComponentID(),
FlowStats: execinfrapb.FlowStats{
MaxMemUsage: optional.MakeUint(uint64(flowCtx.Mon.MaximumBytes())),
MaxDiskUsage: optional.MakeUint(uint64(flowCtx.DiskMonitor.MaximumBytes())),
Expand Down Expand Up @@ -1069,7 +1066,7 @@ func (s *vectorizedFlowCreator) setupOutput(
// Set up an Outbox.
outbox, err := s.setupRemoteOutputStream(
ctx, flowCtx, pspec.ProcessorID, opWithMetaInfo, opOutputTypes, outputStream, factory,
s.makeGetStatsFnForOutbox(flowCtx, opWithMetaInfo.StatsCollectors, outputStream.OriginNodeID),
s.makeGetStatsFnForOutbox(flowCtx, opWithMetaInfo.StatsCollectors),
)
if err != nil {
return err
Expand Down
13 changes: 6 additions & 7 deletions pkg/sql/conn_executor_exec.go
Expand Up @@ -1691,15 +1691,15 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{
fName: "populate query level stats and regions",
f: func() {
populateQueryLevelStatsAndRegions(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector)
populateQueryLevelStats(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector)
ppInfo.dispatchToExecutionEngine.stmtFingerprintID = ex.recordStatementSummary(
ctx, &curPlanner,
int(ex.state.mu.autoRetryCounter), ppInfo.dispatchToExecutionEngine.rowsAffected, ppInfo.curRes.ErrAllowReleased(), *ppInfo.dispatchToExecutionEngine.queryStats,
)
},
})
} else {
populateQueryLevelStatsAndRegions(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector)
populateQueryLevelStats(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector)
stmtFingerprintID = ex.recordStatementSummary(
ctx, planner,
int(ex.state.mu.autoRetryCounter), res.RowsAffected(), res.Err(), stats,
Expand Down Expand Up @@ -1747,12 +1747,11 @@ func (ex *connExecutor) dispatchToExecutionEngine(
return err
}

// populateQueryLevelStatsAndRegions collects query-level execution statistics
// populateQueryLevelStats collects query-level execution statistics
// and populates it in the instrumentationHelper's queryLevelStatsWithErr field.
// Query-level execution statistics are collected using the statement's trace
// and the plan's flow metadata. It also populates the regions field and
// annotates the explainPlan field of the instrumentationHelper.
func populateQueryLevelStatsAndRegions(
// and the plan's flow metadata.
func populateQueryLevelStats(
ctx context.Context,
p *planner,
cfg *ExecutorConfig,
Expand Down Expand Up @@ -1794,7 +1793,7 @@ func populateQueryLevelStatsAndRegions(
}
}
if ih.traceMetadata != nil && ih.explainPlan != nil {
ih.regions = ih.traceMetadata.annotateExplain(
ih.traceMetadata.annotateExplain(
ih.explainPlan,
trace,
cfg.TestingKnobs.DeterministicExplain,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec_log.go
Expand Up @@ -358,7 +358,7 @@ func (p *planner) maybeLogStatementInternal(
ApplyJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ApplyJoin]),
ZigZagJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ZigZagJoin]),
ContentionNanos: queryLevelStats.ContentionTime.Nanoseconds(),
Regions: p.curPlan.instrumentation.regions,
Regions: queryLevelStats.Regions,
NetworkBytesSent: queryLevelStats.NetworkBytesSent,
MaxMemUsage: queryLevelStats.MaxMemUsage,
MaxDiskUsage: queryLevelStats.MaxDiskUsage,
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/execinfra/flow_context.go
Expand Up @@ -184,3 +184,9 @@ func (flowCtx *FlowCtx) ProcessorComponentID(procID int32) execinfrapb.Component
func (flowCtx *FlowCtx) StreamComponentID(streamID execinfrapb.StreamID) execinfrapb.ComponentID {
return execinfrapb.StreamComponentID(flowCtx.NodeID.SQLInstanceID(), flowCtx.ID, streamID)
}

// FlowComponentID returns a ComponentID for the given flow.
func (flowCtx *FlowCtx) FlowComponentID() execinfrapb.ComponentID {
region, _ := flowCtx.Cfg.Locality.Find("region")
return execinfrapb.FlowComponentID(flowCtx.NodeID.SQLInstanceID(), flowCtx.ID, region)
}
3 changes: 2 additions & 1 deletion pkg/sql/execinfrapb/component_stats.go
Expand Up @@ -50,11 +50,12 @@ func StreamComponentID(
}

// FlowComponentID returns a ComponentID for the given flow.
func FlowComponentID(instanceID base.SQLInstanceID, flowID FlowID) ComponentID {
func FlowComponentID(instanceID base.SQLInstanceID, flowID FlowID, region string) ComponentID {
return ComponentID{
FlowID: flowID,
Type: ComponentID_FLOW,
SQLInstanceID: instanceID,
Region: region,
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/execinfrapb/component_stats.proto
Expand Up @@ -54,6 +54,10 @@ message ComponentID {
(gogoproto.nullable) = false,
(gogoproto.customname) = "SQLInstanceID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/base.SQLInstanceID"];

// The region the component is associated with.
// For the initial implementation only ComponentIDs of Flow type might have this set.
optional string region = 5 [(gogoproto.nullable) = false];
}

// ComponentStats contains statistics for an execution component. A component is
Expand Down