Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql, insights: record failed transactions due to commit #110898

Merged
merged 1 commit into from Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 14 additions & 10 deletions pkg/sql/conn_executor.go
Expand Up @@ -1222,12 +1222,13 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)

var payloadErr error
if closeType == normalClose {
// We'll cleanup the SQL txn by creating a non-retriable (commit:true) event.
// This event is guaranteed to be accepted in every state.
ev := eventNonRetriableErr{IsCommit: fsm.True}
payload := eventNonRetriableErrPayload{err: pgerror.Newf(pgcode.AdminShutdown,
"connExecutor closing")}
payloadErr = pgerror.Newf(pgcode.AdminShutdown, "connExecutor closing")
payload := eventNonRetriableErrPayload{err: payloadErr}
if err := ex.machine.ApplyWithPayload(ctx, ev, payload); err != nil {
log.Warningf(ctx, "error while cleaning up connExecutor: %s", err)
}
Expand All @@ -1251,7 +1252,7 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.state.finishExternalTxn()
}

ex.resetExtraTxnState(ctx, txnEvent{eventType: txnEvType})
ex.resetExtraTxnState(ctx, txnEvent{eventType: txnEvType}, payloadErr)
if ex.hasCreatedTemporarySchema && !ex.server.cfg.TestingKnobs.DisableTempObjectsCleanupOnSessionExit {
err := cleanupSessionTempObjects(
ctx,
Expand Down Expand Up @@ -1964,9 +1965,10 @@ func (ns *prepStmtNamespace) resetTo(

// resetExtraTxnState resets the fields of ex.extraTxnState when a transaction
// finishes execution (either commits, rollbacks or restarts). Based on the
// transaction event, resetExtraTxnState invokes corresponding callbacks
// transaction event, resetExtraTxnState invokes corresponding callbacks.
// The payload error is included for statistics recording.
// (e.g. onTxnFinish() and onTxnRestart()).
func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) {
func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent, payloadErr error) {
ex.extraTxnState.numDDL = 0
ex.extraTxnState.firstStmtExecuted = false
ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{}
Expand Down Expand Up @@ -2003,7 +2005,7 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) {
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
ex.extraTxnState.savepoints.clear()
ex.onTxnFinish(ctx, ev)
ex.onTxnFinish(ctx, ev, payloadErr)
case txnRestart:
ex.onTxnRestart(ctx)
ex.state.mu.Lock()
Expand Down Expand Up @@ -3730,18 +3732,20 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(

advInfo := ex.state.consumeAdvanceInfo()

var payloadErr error
// If we had an error from DDL statement execution due to the presence of
// other concurrent schema changes when attempting a schema change, wait for
// the completion of those schema changes first.
if p, ok := payload.(payloadWithError); ok {
if descID := scerrors.ConcurrentSchemaChangeDescID(p.errorCause()); descID != descpb.InvalidID {
payloadErr = p.errorCause()
if descID := scerrors.ConcurrentSchemaChangeDescID(payloadErr); descID != descpb.InvalidID {
if err := ex.handleWaitingForConcurrentSchemaChanges(ex.Ctx(), descID); err != nil {
return advanceInfo{}, err
}
}
// Similarly, if the descriptor ID generator is not available because of
// an ongoing migration, wait for the migration to complete first.
if errors.Is(p.errorCause(), descidgen.ErrDescIDSequenceMigrationInProgress) {
if errors.Is(payloadErr, descidgen.ErrDescIDSequenceMigrationInProgress) {
if err := ex.handleWaitingForDescriptorIDGeneratorMigration(ex.Ctx()); err != nil {
return advanceInfo{}, err
}
Expand Down Expand Up @@ -3859,7 +3863,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(

fallthrough
case txnRollback:
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent)
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent, payloadErr)
// Since we're doing a complete rollback, there's no need to keep the
// prepared stmts for a txn rewind.
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
Expand All @@ -3869,7 +3873,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
case txnRestart:
// In addition to resetting the extraTxnState, the restart event may
// also need to reset the sqlliveness.Session.
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent)
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent, payloadErr)
if err := ex.maybeSetSQLLivenessSession(); err != nil {
return advanceInfo{}, err
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/conn_executor_exec.go
Expand Up @@ -2993,7 +2993,7 @@ func payloadHasError(payload fsm.EventPayload) bool {
return hasErr
}

func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent, txnErr error) {
if ex.extraTxnState.shouldExecuteOnTxnFinish {
ex.extraTxnState.shouldExecuteOnTxnFinish = false
txnStart := ex.extraTxnState.txnFinishClosure.txnStartTime
Expand Down Expand Up @@ -3022,7 +3022,7 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
)
}

err = ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart)
err = ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart, txnErr)
if err != nil {
if log.V(1) {
log.Warningf(ctx, "failed to record transaction stats: %s", err)
Expand Down Expand Up @@ -3109,6 +3109,7 @@ func (ex *connExecutor) recordTransactionFinish(
ev txnEvent,
implicit bool,
txnStart time.Time,
txnErr error,
) error {
recordingStart := timeutil.Now()
defer func() {
Expand Down Expand Up @@ -3175,6 +3176,7 @@ func (ex *connExecutor) recordTransactionFinish(
// TODO(107318): add asoftime or ishistorical
// TODO(107318): add readonly
SessionData: ex.sessionData(),
TxnErr: txnErr,
}

if ex.server.cfg.TestingKnobs.OnRecordTxnFinish != nil {
Expand Down
89 changes: 75 additions & 14 deletions pkg/sql/sqlstats/insights/integration/insights_test.go
Expand Up @@ -193,6 +193,7 @@ func TestFailedInsights(t *testing.T) {
defer log.Scope(t).Close(t)

const appName = "TestFailedInsights"
re := regexp.MustCompile(",?SlowExecution,?")

// Start the cluster. (One node is sufficient; the outliers system is currently in-memory only.)
ctx := context.Background()
Expand Down Expand Up @@ -275,12 +276,7 @@ FROM crdb_internal.node_execution_insights
WHERE query = $1 AND app_name = $2 `,
tc.fingerprint, appName)

err = row.Scan(&query, &status, &problem, &errorCode, &errorMsg)
if err != nil {
return err
}

return nil
return row.Scan(&query, &status, &problem, &errorCode, &errorMsg)
}, 1*time.Second)

require.Equal(t, tc.status, status)
Expand Down Expand Up @@ -329,7 +325,7 @@ WHERE query = $1 AND app_name = $2 `,
// Multi-statement txn with a slow stmt and then a failed execution.
stmts: "BEGIN; SELECT (pg_sleep(0.1)); CREATE TABLE exists(); CREATE TABLE exists();",
fingerprint: "SELECT (pg_sleep(_)) ; CREATE TABLE \"exists\" () ; CREATE TABLE \"exists\" ()",
problems: "{SlowExecution,FailedExecution}",
problems: "{FailedExecution,SlowExecution}",
errorCode: "42P07",
errorMsg: `relation ‹"defaultdb.public.\"exists\""› already exists`,
errorMsgRedacted: `relation ‹×› already exists`,
Expand Down Expand Up @@ -366,12 +362,7 @@ SELECT query,
FROM crdb_internal.node_txn_execution_insights
WHERE query = $1 AND app_name = $2`, tc.fingerprint, appName)

err = row.Scan(&query, &problems, &status, &errorCode, &errorMsg)

if err != nil {
return err
}
return nil
return row.Scan(&query, &problems, &status, &errorCode, &errorMsg)
}, 1*time.Second)

require.Equal(t, tc.txnStatus, status)
Expand All @@ -386,7 +377,6 @@ WHERE query = $1 AND app_name = $2`, tc.fingerprint, appName)
if problems != tc.problems {
// During tests some transactions can stay open for longer, adding an extra
// `SlowExecution` to the problems list. This checks for that possibility.
re := regexp.MustCompile(",?SlowExecution,?")
replacedSlowProblems = re.ReplaceAllString(replacedSlowProblems, "")
}
// Print the original problems if we did any replacements, for debugging.
Expand All @@ -397,6 +387,77 @@ WHERE query = $1 AND app_name = $2`, tc.fingerprint, appName)
})
}

// TestTransactionInsightsFailOnCommit specifically tests for the scenario where a transaction
// fails on COMMIT. COMMIT is executed specially and does not get stats recorded, skipping
// the insights recording step. We should ensure txns failing on COMMIT are also captured.
func TestTransactionInsightsFailOnCommit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const appName = "TestTransactionInsightsFailOnCommit"
re := regexp.MustCompile(",?SlowExecution,?")

// Start the cluster. (One node is sufficient; the outliers system is currently in-memory only.)
ctx := context.Background()
srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer srv.Stopper().Stop(ctx)

conn1 := sqlutils.MakeSQLRunner(db)
conn2 := sqlutils.MakeSQLRunner(srv.ApplicationLayer().SQLConn(t, ""))

// Set up myUsers table with 2 users.
conn1.Exec(t, "CREATE TABLE myUsers (name STRING, city STRING)")
conn1.Exec(t, "INSERT INTO myUsers VALUES ('WENDY', 'NYC'), ('NOVI', 'TORONTO')")

conn1.Exec(t, "SET SESSION application_name=$1", appName)
conn2.Exec(t, "SET SESSION application_name=$1", appName)

// We will simulate a 40001 transaction retry error due to conflicting locks.
// Transaction 1 will fail on COMMIT.
tx1 := conn1.Begin(t)
_, err := tx1.Exec("SELECT * FROM myUsers WHERE city = 'TORONTO'")
require.NoError(t, err)

tx2 := conn2.Begin(t)
_, err = tx2.Exec("UPDATE myUsers SET name = 'NOVI' WHERE city = 'TORONTO'")
require.NoError(t, err)

_, err = tx1.Exec("UPDATE myUsers SET name = 'WENDY' WHERE city = 'NYC'")
require.NoError(t, err)
require.Error(t, tx1.Commit())

require.NoError(t, tx2.Commit())

var query, problems, status, errorCode, errorMsg string
testutils.SucceedsSoon(t, func() error {
// Query the node txn execution insights table.
row := conn1.DB.QueryRowContext(ctx, `
SELECT query,
problems,
status,
COALESCE(last_error_code, '') last_error_code,
COALESCE(last_error_redactable, '') last_error
FROM crdb_internal.node_txn_execution_insights
WHERE app_name = $1`, appName)

return row.Scan(&query, &problems, &status, &errorCode, &errorMsg)
})

require.Equal(t, "SELECT * FROM myusers WHERE city = '_' ; UPDATE myusers SET name = '_' WHERE city = '_'", query)
expectedProblem := "{FailedExecution}"
replacedSlowProblems := problems
if problems != expectedProblem {
// During tests some transactions can stay open for longer, adding an extra
// `SlowExecution` to the problems list. This checks for that possibility.
replacedSlowProblems = re.ReplaceAllString(replacedSlowProblems, "")
}
// Print the original problems if we did any replacements, for debugging.
require.Equal(t, expectedProblem, replacedSlowProblems, "received: %s, used to compare: %s", problems, replacedSlowProblems)
require.Equal(t, "Failed", status)
require.Equal(t, "40001", errorCode)
require.Contains(t, errorMsg, "TransactionRetryWithProtoRefreshError")
}

func TestInsightsPriorityIntegration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
32 changes: 21 additions & 11 deletions pkg/sql/sqlstats/insights/registry.go
Expand Up @@ -114,8 +114,9 @@ func (r *lockingRegistry) ObserveTransaction(sessionID clusterunique.ID, transac
highContention = transaction.Contention.Seconds() >= LatencyThreshold.Get(&r.causes.st.SV).Seconds()
}

if slowOrFailedStatements.Empty() && !highContention {
// We only record an insight if we have slow or failed statements or high txn contention.
txnFailed := transaction.Status == Transaction_Failed
if slowOrFailedStatements.Empty() && !highContention && !txnFailed {
// We only record an insight if we have slow statements, high txn contention, or failed executions.
return
}

Expand All @@ -128,23 +129,27 @@ func (r *lockingRegistry) ObserveTransaction(sessionID clusterunique.ID, transac
insight.Transaction.Causes = addCause(insight.Transaction.Causes, Cause_HighContention)
}

var lastErrorCode string
var lastErrorMessage redact.RedactableString
if txnFailed {
insight.Transaction.Problems = addProblem(insight.Transaction.Problems, Problem_FailedExecution)
}

// The transaction status will reflect the status of its statements; it will
// default to completed unless a failed statement status is found. Note that
// this does not take into account the "Cancelled" transaction status.
var lastStatus = Transaction_Completed
var lastStmtErr redact.RedactableString
var lastStmtErrCode string
for i, s := range *statements {
if slowOrFailedStatements.Contains(i) {
switch s.Status {
case Statement_Completed:
s.Problem = Problem_SlowExecution
s.Causes = r.causes.examine(s.Causes, s)
case Statement_Failed:
lastErrorCode = s.ErrorCode
lastErrorMessage = s.ErrorMsg
lastStatus = Transaction_Status(s.Status)
s.Problem = Problem_FailedExecution
if transaction.LastErrorCode == "" {
lastStmtErr = s.ErrorMsg
lastStmtErrCode = s.ErrorCode
}
}

// Bubble up stmt problems and causes.
Expand All @@ -158,9 +163,14 @@ func (r *lockingRegistry) ObserveTransaction(sessionID clusterunique.ID, transac
insight.Statements = append(insight.Statements, s)
}

insight.Transaction.LastErrorCode = lastErrorCode
insight.Transaction.LastErrorMsg = lastErrorMessage
insight.Transaction.Status = lastStatus
if transaction.LastErrorCode == "" && lastStmtErrCode != "" {
// Stmt failure equates to transaction failure. Sometimes we
// can't propagate the error up to the transaction level so
// we manually set the transaction's failure info here.
transaction.LastErrorMsg = lastStmtErr
transaction.LastErrorCode = lastStmtErrCode
}

r.sink.AddInsight(insight)
}

Expand Down