Skip to content

Commit

Permalink
sql: fix query start time for begin statements
Browse files Browse the repository at this point in the history
Previously, the query age for BEGIN statements appearing in logs
(sql-exec, sql-audit, telemetry) was being incorrectly recorded.
This is because the start time being passed was from the stats
collector, which was not up to date with the session phase times
at the time of executing BEGIN. This patch fixes the query age
for BEGIN stmt logs by using the session phase times directly
from the conn executor for BEGIN stmt logging instead reading
them from the stats collector.

Epic: none
Fixes: #114571

Release note (bug fix): BEGIN statement logs now have correct
query `Age` fields. Previously, these fields were being recorded
incorrectly which would lead to some statements also appearing
erroneously in the slow query logs.
  • Loading branch information
xinhaoz committed Nov 21, 2023
1 parent c7d1155 commit 3dc6dc5
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 2 deletions.
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Expand Up @@ -707,6 +707,7 @@ go_test(
"split_test.go",
"sql_activity_update_job_test.go",
"sql_cursor_test.go",
"sql_exec_log_test.go",
"sql_prepare_test.go",
"statement_mark_redaction_test.go",
"table_ref_test.go",
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
Expand Up @@ -2427,7 +2427,7 @@ func (ex *connExecutor) execStmtInNoTxnState(
0, /* stmtCount */
0, /* bulkJobId */
execErr,
ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),
ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionQueryReceived),
&ex.extraTxnState.hasAdminRoleCache,
ex.server.TelemetryLoggingMetrics,
stmtFingerprintID,
Expand Down
134 changes: 134 additions & 0 deletions pkg/sql/sql_exec_log_test.go
@@ -0,0 +1,134 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql

import (
"context"
"encoding/json"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/stretchr/testify/require"
)

// sqlExecLogInterceptor is used to intercept sql exec logs.
// Intercepted QueryExecute log events will be sent to the logsChan.
// Additional log filtering criteria can be specified in the filter fn.
type sqlExecLogSpy struct {
logs []eventpb.QueryExecute
filter func(eventpb.QueryExecute) bool
}

func (s *sqlExecLogSpy) Intercept(entry []byte) {
var rawLog logpb.Entry
var qe eventpb.QueryExecute

if err := json.Unmarshal(entry, &rawLog); err != nil {
return
}

if rawLog.Channel != logpb.Channel_SQL_EXEC {
return
}

if err := json.Unmarshal([]byte(rawLog.Message[rawLog.StructuredStart:rawLog.StructuredEnd]), &qe); err != nil {
return
}

if s.filter != nil && !s.filter(qe) {
return
}

s.logs = append(s.logs, qe)
}

func TestSqlExecLog(t *testing.T) {
defer leaktest.AfterTest(t)()

sc := log.ScopeWithoutShowLogs(t)
defer sc.Close(t)

ctx := context.Background()

s := serverutils.StartServerOnly(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

appLogsSpy := sqlExecLogSpy{
logs: make([]eventpb.QueryExecute, 0, 5),
filter: func(qe eventpb.QueryExecute) bool {
// We only care about non-internal applications.
return qe.ExecMode != executorTypeInternal.logLabel()
},
}

cleanup := log.InterceptWith(ctx, &appLogsSpy)
defer cleanup()

// This connection will be used to set cluster settings.
setupConn := sqlutils.MakeSQLRunner(s.ApplicationLayer().SQLConn(t))
setupConn.Exec(t, "SET application_name = 'setupConnApp'")
// These logs should not appear since we haven't turned logging on.
setupConn.Exec(t, "SELECT 'hello'")
setupConn.Exec(t, "SELECT 1, 2")
setupConn.Exec(t, "SELECT 3")

// Turn sql execs log on.
setupConn.Exec(t, "SET CLUSTER SETTING sql.trace.log_statement_execute = true")

// In #114571, it was found that the query age was not properly recorded for BEGIN statements.
// This lead to the query age being a noticeably large value when it was the first statement
// in a session, as the session query received time was not yet set. We can easily verify
// if this is fixed by executing BEGIN as the first stmt in the connection below, and
// observing the recorded query age.
conn := sqlutils.MakeSQLRunner(s.ApplicationLayer().SQLConn(t))
queries := []string{"BEGIN", "SELECT 1", "COMMIT"}
for _, q := range queries {
conn.Exec(t, q)
}

log.FlushAllSync()

capturedLogs := appLogsSpy.logs
expectedIdx := 0

require.GreaterOrEqual(t, len(capturedLogs), len(queries))
for _, qe := range capturedLogs {
if qe.Tag == "SET CLUSTER SETTING" {
// Skip setting cluster settings statement.
continue
}

if qe.ApplicationName == "setupConnApp" {
// Other than set cluster setting, we shouldn't have seen any
// logs from setupConnApp.
t.Fatalf("unexpected log from setupConnApp: %v", qe)
}

// Verify expected log.
expectedStmt := queries[expectedIdx]
require.Containsf(t, qe.Statement.StripMarkers(), expectedStmt, "entry: %v", qe)
// Query age should be less than 3 seconds.
require.Lessf(t, qe.Age, float32(3000), "entry: %v", qe)
expectedIdx++
}

// We should have error'd earlier if we didn't find an expected query, but we'll verify
// that again here we found them all.
require.Equal(t, expectedIdx, len(queries),
"Didn't find all expected queries in logs, found %d / %d", expectedIdx, len(queries))

}
4 changes: 3 additions & 1 deletion pkg/util/log/logtestutils/log_test_utils.go
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
)

// InstallLogFileSink installs a file sink for telemetry logging tests.
// InstallLogFileSink installs a file sink for logging tests.
func InstallLogFileSink(sc *log.TestLogScope, t *testing.T, channel logpb.Channel) func() {
// Enable logging channels.
log.TestingResetActive()
Expand All @@ -31,6 +31,8 @@ func InstallLogFileSink(sc *log.TestLogScope, t *testing.T, channel logpb.Channe
cfg.Sinks.FileGroups["telemetry"] = &fileSinkConfig
case logpb.Channel_SENSITIVE_ACCESS:
cfg.Sinks.FileGroups["sql-audit"] = &fileSinkConfig
case logpb.Channel_SQL_EXEC:
cfg.Sinks.FileGroups["sql-exec"] = &fileSinkConfig
default:
panic("unrecognized logging channel")
}
Expand Down

0 comments on commit 3dc6dc5

Please sign in to comment.