Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.1: sql: fix exec+audit logs for BEGIN, COMMIT, SET stmts #108411

Merged
merged 3 commits into from Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 18 additions & 2 deletions pkg/ccl/auditloggingccl/audit_logging_test.go
Expand Up @@ -164,6 +164,22 @@ func TestSingleRoleAuditLogging(t *testing.T) {
`GRANT SELECT ON TABLE u TO root`,
// DML statement
`SELECT * FROM u`,
// The following statements are all executed specially by the conn_executor.
`SET application_name = 'test'`,
`SET CLUSTER SETTING sql.defaults.vectorize = 'on'`,
`BEGIN`,
`SHOW application_name`,
`SAVEPOINT s`,
`RELEASE SAVEPOINT s`,
`SAVEPOINT t`,
`ROLLBACK TO SAVEPOINT t`,
`COMMIT`,
`SHOW COMMIT TIMESTAMP`,
`BEGIN TRANSACTION PRIORITY LOW`,
`ROLLBACK`,
`PREPARE q AS SELECT 1`,
`EXECUTE q`,
`DEALLOCATE q`,
}
testData := []struct {
name string
Expand All @@ -175,7 +191,7 @@ func TestSingleRoleAuditLogging(t *testing.T) {
name: "test-all-stmt-types",
role: allStmtTypesRole,
queries: testQueries,
expectedNumLogs: 3,
expectedNumLogs: len(testQueries),
},
{
name: "test-no-stmt-types",
Expand All @@ -189,7 +205,7 @@ func TestSingleRoleAuditLogging(t *testing.T) {
role: "testuser",
queries: testQueries,
// One for each test query.
expectedNumLogs: 3,
expectedNumLogs: len(testQueries),
},
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/server/server_sql.go
Expand Up @@ -979,6 +979,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
SessionInitCache: sessioninit.NewCache(
serverCacheMemoryMonitor.MakeBoundAccount(), cfg.stopper,
),
AuditConfig: &auditlogging.AuditConfigLock{
Config: auditlogging.EmptyAuditConfig(),
},
ClientCertExpirationCache: security.NewClientCertExpirationCache(
ctx, cfg.Settings, cfg.stopper, &timeutil.DefaultTimeSource{}, rootSQLMemoryMonitor,
),
Expand Down Expand Up @@ -1373,7 +1376,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
vmoduleSetting.SetOnChange(&cfg.Settings.SV, fn)
fn(ctx)

auditlogging.ConfigureRoleBasedAuditClusterSettings(ctx, execCfg.SessionInitCache.AuditConfig, execCfg.Settings, &execCfg.Settings.SV)
auditlogging.ConfigureRoleBasedAuditClusterSettings(ctx, execCfg.AuditConfig, execCfg.Settings, &execCfg.Settings.SV)

return &SQLServer{
ambientCtx: cfg.BaseConfig.AmbientCtx,
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/conn_executor.go
Expand Up @@ -2751,7 +2751,6 @@ func (ex *connExecutor) execCopyOut(
ex.planner.maybeLogStatement(
ctx,
ex.executorType,
true, /* isCopy */
int(ex.state.mu.autoRetryCounter),
ex.extraTxnState.txnCounter,
numOutputRows,
Expand Down Expand Up @@ -3002,7 +3001,7 @@ func (ex *connExecutor) execCopyIn(
ex.planner.CurrentDatabase(),
)
var stats topLevelQueryStats
ex.planner.maybeLogStatement(ctx, ex.executorType, true,
ex.planner.maybeLogStatement(ctx, ex.executorType,
int(ex.state.mu.autoRetryCounter), ex.extraTxnState.txnCounter,
numInsertedRows, 0, /* bulkJobId */
copyErr,
Expand Down
157 changes: 129 additions & 28 deletions pkg/sql/conn_executor_exec.go
Expand Up @@ -131,7 +131,7 @@ func (ex *connExecutor) execStmt(
// Note: when not using explicit transactions, we go through this transition
// for every statement. It is important to minimize the amount of work and
// allocations performed up to this point.
ev, payload = ex.execStmtInNoTxnState(ctx, ast, res)
ev, payload = ex.execStmtInNoTxnState(ctx, parserStmt, res)

case stateOpen:
var preparedStmt *PreparedStatement
Expand Down Expand Up @@ -749,6 +749,70 @@ func (ex *connExecutor) execStmtInOpenState(
}
}(ctx)

// If adminAuditLogging is enabled, we want to check for HasAdminRole
// before maybeLogStatement.
// We must check prior to execution in the case the txn is aborted due to
// an error. HasAdminRole can only be checked in a valid txn.
if adminAuditLog := adminAuditLogEnabled.Get(
&ex.planner.execCfg.Settings.SV,
); adminAuditLog {
if !ex.extraTxnState.hasAdminRoleCache.IsSet {
hasAdminRole, err := ex.planner.HasAdminRole(ctx)
if err != nil {
return makeErrEvent(err)
}
ex.extraTxnState.hasAdminRoleCache.HasAdminRole = hasAdminRole
ex.extraTxnState.hasAdminRoleCache.IsSet = true
}
}

p.stmt = stmt
p.semaCtx.Annotations = tree.MakeAnnotations(stmt.NumAnnotations)
p.extendedEvalCtx.Annotations = &p.semaCtx.Annotations
if err := p.semaCtx.Placeholders.Assign(pinfo, stmt.NumPlaceholders); err != nil {
return makeErrEvent(err)
}
p.extendedEvalCtx.Placeholders = &p.semaCtx.Placeholders

shouldLogToExecAndAudit := true
defer func() {
if !shouldLogToExecAndAudit {
// We don't want to log this statement, since another layer of the
// conn_executor will handle the logging for this statement.
return
}

p.curPlan.init(&p.stmt, &p.instrumentation)
var execErr error
if p, ok := retPayload.(payloadWithError); ok {
execErr = p.errorCause()
}
f := tree.NewFmtCtx(tree.FmtHideConstants)
f.FormatNode(ast)
stmtFingerprintID := appstatspb.ConstructStatementFingerprintID(
f.CloseAndGetString(),
execErr != nil,
ex.implicitTxn(),
p.CurrentDatabase(),
)

p.maybeLogStatement(
ctx,
ex.executorType,
int(ex.state.mu.autoRetryCounter),
ex.extraTxnState.txnCounter,
0, /* rowsAffected */
0, /* bulkJobId */
execErr,
ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),
&ex.extraTxnState.hasAdminRoleCache,
ex.server.TelemetryLoggingMetrics,
stmtFingerprintID,
&topLevelQueryStats{},
ex.statsCollector,
)
}()

switch s := ast.(type) {
case *tree.BeginTransaction:
// BEGIN is only allowed if we are in an implicit txn.
Expand Down Expand Up @@ -829,6 +893,18 @@ func (ex *connExecutor) execStmtInOpenState(
ex.server.cfg.GenerateID(),
)
var rawTypeHints []oid.Oid

// Placeholders should be part of the statement being prepared, not the
// PREPARE statement itself.
oldPlaceholders := p.extendedEvalCtx.Placeholders
p.extendedEvalCtx.Placeholders = nil
defer func() {
// The call to addPreparedStmt changed the planner stmt to the statement
// being prepared. Set it back to the PREPARE statement, so that it's
// logged correctly.
p.stmt = stmt
p.extendedEvalCtx.Placeholders = oldPlaceholders
}()
if _, err := ex.addPreparedStmt(
ctx, name, prepStmt, typeHints, rawTypeHints, PreparedStatementOriginSQL,
); err != nil {
Expand All @@ -837,7 +913,9 @@ func (ex *connExecutor) execStmtInOpenState(
return nil, nil, nil
}

p.semaCtx.Annotations = tree.MakeAnnotations(stmt.NumAnnotations)
// Don't write to the exec/audit logs here; it will be handled in
// dispatchToExecutionEngine.
shouldLogToExecAndAudit = false

// For regular statements (the ones that get to this point), we
// don't return any event unless an error happens.
Expand Down Expand Up @@ -890,12 +968,6 @@ func (ex *connExecutor) execStmtInOpenState(
return makeErrEvent(err)
}

if err := p.semaCtx.Placeholders.Assign(pinfo, stmt.NumPlaceholders); err != nil {
return makeErrEvent(err)
}
p.extendedEvalCtx.Placeholders = &p.semaCtx.Placeholders
p.extendedEvalCtx.Annotations = &p.semaCtx.Annotations
p.stmt = stmt
if isPausablePortal() {
p.pausablePortal = portal
}
Expand Down Expand Up @@ -1449,23 +1521,6 @@ func (ex *connExecutor) dispatchToExecutionEngine(
}
}

// If adminAuditLogging is enabled, we want to check for HasAdminRole
// before the deferred maybeLogStatement.
// We must check prior to execution in the case the txn is aborted due to
// an error. HasAdminRole can only be checked in a valid txn.
if adminAuditLog := adminAuditLogEnabled.Get(
&ex.planner.execCfg.Settings.SV,
); adminAuditLog {
if !ex.extraTxnState.hasAdminRoleCache.IsSet {
hasAdminRole, err := ex.planner.HasAdminRole(ctx)
if err != nil {
return err
}
ex.extraTxnState.hasAdminRoleCache.HasAdminRole = hasAdminRole
ex.extraTxnState.hasAdminRoleCache.IsSet = true
}
}

var err error
if ppInfo := getPausablePortalInfo(); ppInfo != nil {
if !ppInfo.dispatchToExecutionEngine.cleanup.isComplete {
Expand Down Expand Up @@ -1508,7 +1563,6 @@ func (ex *connExecutor) dispatchToExecutionEngine(
planner.maybeLogStatement(
ctx,
ex.executorType,
false, /* isCopy */
int(ex.state.mu.autoRetryCounter),
ex.extraTxnState.txnCounter,
ppInfo.dispatchToExecutionEngine.rowsAffected,
Expand All @@ -1535,7 +1589,6 @@ func (ex *connExecutor) dispatchToExecutionEngine(
planner.maybeLogStatement(
ctx,
ex.executorType,
false, /* isCopy */
int(ex.state.mu.autoRetryCounter),
ex.extraTxnState.txnCounter,
nonBulkJobNumRows,
Expand Down Expand Up @@ -2167,8 +2220,55 @@ var eventStartExplicitTxn fsm.Event = eventTxnStart{ImplicitTxn: fsm.False}
// the cursor is not advanced. This means that the statement will run again in
// stateOpen, at each point its results will also be flushed.
func (ex *connExecutor) execStmtInNoTxnState(
ctx context.Context, ast tree.Statement, res RestrictedCommandResult,
ctx context.Context, parserStmt statements.Statement[tree.Statement], res RestrictedCommandResult,
) (_ fsm.Event, payload fsm.EventPayload) {
shouldLogToExecAndAudit := true
defer func() {
if !shouldLogToExecAndAudit {
// We don't want to log this statement, since another layer of the
// conn_executor will handle the logging for this statement.
return
}

p := &ex.planner
stmt := makeStatement(parserStmt, ex.server.cfg.GenerateID())
p.stmt = stmt
p.semaCtx.Annotations = tree.MakeAnnotations(stmt.NumAnnotations)
p.extendedEvalCtx.Annotations = &p.semaCtx.Annotations
p.extendedEvalCtx.Placeholders = &tree.PlaceholderInfo{}
p.curPlan.init(&p.stmt, &p.instrumentation)
var execErr error
if p, ok := payload.(payloadWithError); ok {
execErr = p.errorCause()
}

f := tree.NewFmtCtx(tree.FmtHideConstants)
f.FormatNode(stmt.AST)
stmtFingerprintID := appstatspb.ConstructStatementFingerprintID(
f.CloseAndGetString(),
execErr != nil,
ex.implicitTxn(),
p.CurrentDatabase(),
)

p.maybeLogStatement(
ctx,
ex.executorType,
int(ex.state.mu.autoRetryCounter),
ex.extraTxnState.txnCounter,
0, /* rowsAffected */
0, /* bulkJobId */
execErr,
ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),
&ex.extraTxnState.hasAdminRoleCache,
ex.server.TelemetryLoggingMetrics,
stmtFingerprintID,
&topLevelQueryStats{},
ex.statsCollector,
)
}()

ast := parserStmt.AST
switch s := ast.(type) {
case *tree.BeginTransaction:
ex.incrementStartedStmtCounter(ast)
Expand Down Expand Up @@ -2200,6 +2300,7 @@ func (ex *connExecutor) execStmtInNoTxnState(
// historical timestamp even though the statement itself might contain
// an AOST clause. In these cases the clause is evaluated and applied
// execStmtInOpenState.
shouldLogToExecAndAudit = false
noBeginStmt := (*tree.BeginTransaction)(nil)
mode, sqlTs, historicalTs, err := ex.beginTransactionTimestampsAndReadMode(ctx, noBeginStmt)
if err != nil {
Expand Down
7 changes: 2 additions & 5 deletions pkg/sql/event_log.go
Expand Up @@ -167,9 +167,6 @@ type eventLogOptions struct {

// Additional redaction options, if necessary.
rOpts redactionOptions

// isCopy notes whether the current event is related to COPY.
isCopy bool
}

// redactionOptions contains instructions on how to redact the SQL
Expand Down Expand Up @@ -282,8 +279,8 @@ func logEventInternalForSQLStatements(
) error {
// Inject the common fields into the payload provided by the caller.
injectCommonFields := func(event logpb.EventPayload) error {
if opts.isCopy {
// No txn is set for COPY, so use now instead.
if txn == nil {
// No txn is set (e.g. for COPY or BEGIN), so use now instead.
event.CommonDetails().Timestamp = timeutil.Now().UnixNano()
} else {
event.CommonDetails().Timestamp = txn.KV().ReadTimestamp().WallTime
Expand Down