Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.1: sql: cluster setting for persisting gateway node ID #88634

Merged
merged 1 commit into from Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Expand Up @@ -150,6 +150,7 @@ sql.metrics.max_mem_stmt_fingerprints integer 100000 the maximum number of state
sql.metrics.max_mem_txn_fingerprints integer 100000 the maximum number of transaction fingerprints stored in memory
sql.metrics.statement_details.dump_to_logs boolean false dump collected statement statistics to node logs when periodically cleared
sql.metrics.statement_details.enabled boolean true collect per-statement query statistics
sql.metrics.statement_details.gateway_node.enabled boolean true save the gateway node for each statement fingerprint. If false, the value will be stored as 0.
sql.metrics.statement_details.plan_collection.enabled boolean false periodically save a logical plan for each fingerprint
sql.metrics.statement_details.plan_collection.period duration 5m0s the time until a new logical plan is collected
sql.metrics.statement_details.threshold duration 0s minimum execution time to cause statement statistics to be collected. If configured, no transaction stats are collected.
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Expand Up @@ -166,6 +166,7 @@
<tr><td><code>sql.metrics.max_mem_txn_fingerprints</code></td><td>integer</td><td><code>100000</code></td><td>the maximum number of transaction fingerprints stored in memory</td></tr>
<tr><td><code>sql.metrics.statement_details.dump_to_logs</code></td><td>boolean</td><td><code>false</code></td><td>dump collected statement statistics to node logs when periodically cleared</td></tr>
<tr><td><code>sql.metrics.statement_details.enabled</code></td><td>boolean</td><td><code>true</code></td><td>collect per-statement query statistics</td></tr>
<tr><td><code>sql.metrics.statement_details.gateway_node.enabled</code></td><td>boolean</td><td><code>true</code></td><td>save the gateway node for each statement fingerprint. If false, the value will be stored as 0.</td></tr>
<tr><td><code>sql.metrics.statement_details.plan_collection.enabled</code></td><td>boolean</td><td><code>false</code></td><td>periodically save a logical plan for each fingerprint</td></tr>
<tr><td><code>sql.metrics.statement_details.plan_collection.period</code></td><td>duration</td><td><code>5m0s</code></td><td>the time until a new logical plan is collected</td></tr>
<tr><td><code>sql.metrics.statement_details.threshold</code></td><td>duration</td><td><code>0s</code></td><td>minimum execution time to cause statement statistics to be collected. If configured, no transaction stats are collected.</td></tr>
Expand Down
11 changes: 10 additions & 1 deletion pkg/sql/executor_statement_metrics.go
Expand Up @@ -12,11 +12,13 @@ package sql

import (
"context"
"strconv"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -167,6 +169,13 @@ func (ex *connExecutor) recordStatementSummary(
PlanHash: planner.instrumentation.planGist.Hash(),
}

// We only have node information when it was collected with trace, but we know at least the current
// node should be on the list.
nodeID, err := strconv.ParseInt(ex.server.sqlStats.GetSQLInstanceID().String(), 10, 64)
if err != nil {
log.Warningf(ctx, "failed to convert node ID to int: %s", err)
}

recordedStmtStats := sqlstats.RecordedStmtStats{
AutoRetryCount: automaticRetryCount,
RowsAffected: rowsAffected,
Expand All @@ -178,7 +187,7 @@ func (ex *connExecutor) recordStatementSummary(
BytesRead: stats.bytesRead,
RowsRead: stats.rowsRead,
RowsWritten: stats.rowsWritten,
Nodes: getNodesFromPlanner(planner),
Nodes: util.CombineUniqueInt64(getNodesFromPlanner(planner), []int64{nodeID}),
StatementType: stmt.AST.StatementType(),
Plan: planner.instrumentation.PlanForStats(ctx),
PlanGist: planner.instrumentation.planGist.String(),
Expand Down
22 changes: 16 additions & 6 deletions pkg/sql/sqlstats/cluster_settings.go
Expand Up @@ -133,12 +133,12 @@ var MaxMemReportedSQLStatsTxnFingerprints = settings.RegisterIntSetting(
// This results in 4 statement fingerprints and 1 txn fingerprint.
// Let's suppose currently our statement fingerprint limit is 6.
// If we are to execute the same statement again:
// * BEGIN; <- this increments current statement fingerprint count to 5
// since we hold statement stats for explicit transaction in a
// temporary container before we can perform the upsert.
// * SELECT 1; <- this increments the count to 6
// * SELECT 1, 1; <- ERR: this causes the count to exceed our stmt fingerprint
// limit before we can perform the upsert.
// - BEGIN; <- this increments current statement fingerprint count to 5
// since we hold statement stats for explicit transaction in a
// temporary container before we can perform the upsert.
// - SELECT 1; <- this increments the count to 6
// - SELECT 1, 1; <- ERR: this causes the count to exceed our stmt fingerprint
// limit before we can perform the upsert.
//
// The total amount of memory consumed will still be constrained by the
// top-level memory monitor created for SQL Stats.
Expand All @@ -159,3 +159,13 @@ var MaxSQLStatReset = settings.RegisterDurationSetting(
time.Hour*2,
settings.NonNegativeDurationWithMaximum(time.Hour*24),
).WithPublic()

// GatewayNodeEnabled is the public, tenant-writable cluster setting
// "sql.metrics.statement_details.gateway_node.enabled". It specifies whether
// the gateway node ID is saved for each statement fingerprint during SQL
// stats collection. When the setting is false, the value is stored as 0
// instead of the actual node ID. The setting defaults to true.
var GatewayNodeEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"sql.metrics.statement_details.gateway_node.enabled",
"save the gateway node for each statement fingerprint. If false, the value will "+
"be stored as 0.",
true,
).WithPublic()
103 changes: 53 additions & 50 deletions pkg/sql/sqlstats/persistedsqlstats/flush.go
Expand Up @@ -323,6 +323,7 @@ DO NOTHING
}
statistics := tree.NewDJSON(statisticsJSON)

nodeID := s.GetEnabledSQLInstanceID()
rowsAffected, err = s.cfg.InternalExecutor.ExecEx(
ctx,
"insert-txn-stats",
Expand All @@ -331,13 +332,13 @@ DO NOTHING
User: security.NodeUserName(),
},
insertStmt,
aggregatedTs, // aggregated_ts
serializedFingerprintID, // fingerprint_id
stats.App, // app_name
s.cfg.SQLIDContainer.SQLInstanceID(), // node_id
aggInterval, // agg_interval
metadata, // metadata
statistics, // statistics
aggregatedTs, // aggregated_ts
serializedFingerprintID, // fingerprint_id
stats.App, // app_name
nodeID, // node_id
aggInterval, // agg_interval
metadata, // metadata
statistics, // statistics
)

return rowsAffected, err
Expand All @@ -364,6 +365,7 @@ WHERE fingerprint_id = $2
}
statistics := tree.NewDJSON(statisticsJSON)

nodeID := s.GetEnabledSQLInstanceID()
rowsAffected, err := s.cfg.InternalExecutor.ExecEx(
ctx,
"update-stmt-stats",
Expand All @@ -372,11 +374,11 @@ WHERE fingerprint_id = $2
User: security.NodeUserName(),
},
updateStmt,
statistics, // statistics
serializedFingerprintID, // fingerprint_id
aggregatedTs, // aggregated_ts
stats.App, // app_name
s.cfg.SQLIDContainer.SQLInstanceID(), // node_id
statistics, // statistics
serializedFingerprintID, // fingerprint_id
aggregatedTs, // aggregated_ts
stats.App, // app_name
nodeID, // node_id
)

if err != nil {
Expand All @@ -385,8 +387,7 @@ WHERE fingerprint_id = $2

if rowsAffected == 0 {
return errors.AssertionFailedf("failed to update transaction statistics for fingerprint_id: %s, app: %s, aggregated_ts: %s, node_id: %d",
serializedFingerprintID, stats.App, aggregatedTs,
s.cfg.SQLIDContainer.SQLInstanceID())
serializedFingerprintID, stats.App, aggregatedTs, nodeID)
}

return nil
Expand All @@ -411,13 +412,13 @@ WHERE fingerprint_id = $2
AND plan_hash = $6
AND node_id = $7
`

statisticsJSON, err := sqlstatsutil.BuildStmtStatisticsJSON(&stats.Stats)
if err != nil {
return err
}
statistics := tree.NewDJSON(statisticsJSON)

nodeID := s.GetEnabledSQLInstanceID()
rowsAffected, err := s.cfg.InternalExecutor.ExecEx(
ctx,
"update-stmt-stats",
Expand All @@ -426,13 +427,13 @@ WHERE fingerprint_id = $2
User: security.NodeUserName(),
},
updateStmt,
statistics, // statistics
serializedFingerprintID, // fingerprint_id
serializedTransactionFingerprintID, // transaction_fingerprint_id
aggregatedTs, // aggregated_ts
stats.Key.App, // app_name
serializedPlanHash, // plan_hash
s.cfg.SQLIDContainer.SQLInstanceID(), // node_id
statistics, // statistics
serializedFingerprintID, // fingerprint_id
serializedTransactionFingerprintID, // transaction_fingerprint_id
aggregatedTs, // aggregated_ts
stats.Key.App, // app_name
serializedPlanHash, // plan_hash
nodeID, // node_id
)

if err != nil {
Expand All @@ -448,7 +449,7 @@ WHERE fingerprint_id = $2
"plan_hash: %d, "+
"node_id: %d",
serializedFingerprintID, serializedTransactionFingerprintID, stats.Key.App,
aggregatedTs, serializedPlanHash, s.cfg.SQLIDContainer.SQLInstanceID())
aggregatedTs, serializedPlanHash, nodeID)
}

return nil
Expand Down Expand Up @@ -486,6 +487,7 @@ DO NOTHING
statistics := tree.NewDJSON(statisticsJSON)

plan := tree.NewDJSON(sqlstatsutil.ExplainTreePlanNodeToJSON(&stats.Stats.SensitiveInfo.MostRecentPlanDescription))
nodeID := s.GetEnabledSQLInstanceID()

rowsAffected, err = s.cfg.InternalExecutor.ExecEx(
ctx,
Expand All @@ -495,16 +497,16 @@ DO NOTHING
User: security.NodeUserName(),
},
insertStmt,
aggregatedTs, // aggregated_ts
serializedFingerprintID, // fingerprint_id
serializedTransactionFingerprintID, // transaction_fingerprint_id
serializedPlanHash, // plan_hash
stats.Key.App, // app_name
s.cfg.SQLIDContainer.SQLInstanceID(), // node_id
aggInterval, // agg_interval
metadata, // metadata
statistics, // statistics
plan, // plan
aggregatedTs, // aggregated_ts
serializedFingerprintID, // fingerprint_id
serializedTransactionFingerprintID, // transaction_fingerprint_id
serializedPlanHash, // plan_hash
stats.Key.App, // app_name
nodeID, // node_id
aggInterval, // agg_interval
metadata, // metadata
statistics, // statistics
plan, // plan
)

return rowsAffected, err
Expand Down Expand Up @@ -532,18 +534,19 @@ WHERE fingerprint_id = $1
FOR UPDATE
`

nodeID := s.GetEnabledSQLInstanceID()
row, err := s.cfg.InternalExecutor.QueryRowEx(
ctx,
"fetch-txn-stats",
txn, /* txn */
sessiondata.InternalExecutorOverride{
User: security.NodeUserName(),
},
readStmt, // stmt
serializedFingerprintID, // fingerprint_id
appName, // app_name
aggregatedTs, // aggregated_ts
s.cfg.SQLIDContainer.SQLInstanceID(), // node_id
readStmt, // stmt
serializedFingerprintID, // fingerprint_id
appName, // app_name
aggregatedTs, // aggregated_ts
nodeID, // node_id
)

if err != nil {
Expand All @@ -553,13 +556,13 @@ FOR UPDATE
if row == nil {
return errors.AssertionFailedf("transaction statistics not found for fingerprint_id: %s, app: %s, aggregated_ts: %s, node_id: %d",
serializedFingerprintID, appName, aggregatedTs,
s.cfg.SQLIDContainer.SQLInstanceID())
nodeID)
}

if len(row) != 1 {
return errors.AssertionFailedf("unexpectedly found %d returning columns for fingerprint_id: %s, app: %s, aggregated_ts: %s, node_id: %d",
len(row), serializedFingerprintID, appName, aggregatedTs,
s.cfg.SQLIDContainer.SQLInstanceID())
nodeID)
}

statistics := tree.MustBeDJSON(row[0])
Expand Down Expand Up @@ -589,20 +592,21 @@ WHERE fingerprint_id = $1
AND node_id = $6
FOR UPDATE
`
nodeID := s.GetEnabledSQLInstanceID()
row, err := s.cfg.InternalExecutor.QueryRowEx(
ctx,
"fetch-stmt-stats",
txn, /* txn */
sessiondata.InternalExecutorOverride{
User: security.NodeUserName(),
},
readStmt, // stmt
serializedFingerprintID, // fingerprint_id
serializedTransactionFingerprintID, // transaction_fingerprint_id
key.App, // app_name
aggregatedTs, // aggregated_ts
serializedPlanHash, // plan_hash
s.cfg.SQLIDContainer.SQLInstanceID(), // node_id
readStmt, // stmt
serializedFingerprintID, // fingerprint_id
serializedTransactionFingerprintID, // transaction_fingerprint_id
key.App, // app_name
aggregatedTs, // aggregated_ts
serializedPlanHash, // plan_hash
nodeID, // node_id
)

if err != nil {
Expand All @@ -612,13 +616,12 @@ FOR UPDATE
if row == nil {
return errors.AssertionFailedf(
"statement statistics not found fingerprint_id: %s, app: %s, aggregated_ts: %s, plan_hash: %d, node_id: %d",
serializedFingerprintID, key.App, aggregatedTs, serializedPlanHash, s.cfg.SQLIDContainer.SQLInstanceID())
serializedFingerprintID, key.App, aggregatedTs, serializedPlanHash, nodeID)
}

if len(row) != 1 {
return errors.AssertionFailedf("unexpectedly found %d returning columns for fingerprint_id: %s, app: %s, aggregated_ts: %s, plan_hash %d, node_id: %d",
len(row), serializedFingerprintID, key.App, aggregatedTs, serializedPlanHash,
s.cfg.SQLIDContainer.SQLInstanceID())
len(row), serializedFingerprintID, key.App, aggregatedTs, serializedPlanHash, nodeID)
}

statistics := tree.MustBeDJSON(row[0])
Expand Down
62 changes: 62 additions & 0 deletions pkg/sql/sqlstats/persistedsqlstats/flush_test.go
Expand Up @@ -426,6 +426,36 @@ func TestInMemoryStatsDiscard(t *testing.T) {
})
}

// TestSQLStatsGatewayNodeSetting exercises the
// sql.metrics.statement_details.gateway_node.enabled cluster setting in both
// of its states. For each state it runs a statement under a dedicated
// application name, flushes the SQL stats to persisted storage and then checks
// the persisted row through verifyNodeID.
func TestSQLStatsGatewayNodeSetting(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	params, _ := tests.CreateTestServerParams()
	s, conn, _ := serverutils.StartServer(t, params)
	defer s.Stopper().Stop(ctx)
	runner := sqlutils.MakeSQLRunner(conn)

	// flushStats forces the in-memory statistics out to the system tables so
	// that the verification query can observe them.
	flushStats := func() {
		provider := s.SQLServer().(*sql.Server).GetSQLStatsProvider()
		provider.(*persistedsqlstats.PersistedSQLStats).Flush(ctx)
	}

	phases := []struct {
		setClusterSetting string
		setAppName        string
		appName           string
		gatewayEnabled    bool
	}{
		{
			// Gateway Node ID enabled, so should persist the value.
			setClusterSetting: "SET CLUSTER SETTING sql.metrics.statement_details.gateway_node.enabled = true",
			setAppName:        "SET application_name = 'gateway_enabled'",
			appName:           "gateway_enabled",
			gatewayEnabled:    true,
		},
		{
			// Gateway Node ID disabled, so shouldn't persist the value on the
			// node_id column, but it should still store the value on the
			// statistics column.
			setClusterSetting: "SET CLUSTER SETTING sql.metrics.statement_details.gateway_node.enabled = false",
			setAppName:        "SET application_name = 'gateway_disabled'",
			appName:           "gateway_disabled",
			gatewayEnabled:    false,
		},
	}

	for _, phase := range phases {
		runner.Exec(t, phase.setClusterSetting)
		runner.Exec(t, phase.setAppName)
		runner.Exec(t, "SELECT 1")
		flushStats()

		verifyNodeID(t, runner, "SELECT _", phase.gatewayEnabled, phase.appName)
	}
}

type stubTime struct {
syncutil.RWMutex
t time.Time
Expand Down Expand Up @@ -575,3 +605,35 @@ func verifyInMemoryStatsEmpty(
require.NoError(t, err)
}
}

// verifyNodeID checks the row persisted in system.statement_statistics for the
// statement whose metadata query text equals fingerprint and whose app_name
// equals appName.
//
// The node_id column must be 1 when gatewayEnabled is true and 0 otherwise.
// Independently of gatewayEnabled, the 'nodes' entry inside the statistics
// JSON column must be "[1]".
//
// NOTE(review): the hard-coded 1 is presumably the node ID of the single test
// server started by the caller; confirm before reusing this helper against a
// multi-node cluster.
func verifyNodeID(
	t *testing.T,
	sqlConn *sqlutils.SQLRunner,
	fingerprint string,
	gatewayEnabled bool,
	appName string,
) {
	// This helper is invoked more than once per test; marking it as a helper
	// makes assertion failures point at the calling line, which identifies
	// the phase that failed.
	t.Helper()

	row := sqlConn.DB.QueryRowContext(context.Background(),
		`
SELECT
node_id,
statistics -> 'statistics' ->> 'nodes' as nodes
FROM
system.statement_statistics
WHERE
metadata ->> 'query' = $1 AND
app_name = $2
`, fingerprint, appName)

	var persistedNodeID int64
	var persistedNodes string
	err := row.Scan(&persistedNodeID, &persistedNodes)
	require.NoError(t, err)

	// With the setting disabled, 0 is stored in place of the gateway node ID.
	expectedNodeID := int64(1)
	if !gatewayEnabled {
		expectedNodeID = int64(0)
	}
	require.Equal(t, expectedNodeID, persistedNodeID, "Gateway NodeID")
	require.Equal(t, "[1]", persistedNodes, "All NodeIDs from statistics")
}