Skip to content

Commit

Permalink
Merge #110898
Browse files Browse the repository at this point in the history
110898: sql, insights: record failed transactions due to commit r=xinhaoz a=xinhaoz

Previously we did not record failed transactions due to a failed
commit in the insights system. This commit captures those transactions
in the insights system by propagating the payload error associated with
a failed commit to the stats recording step. If the transaction error
is nil, we will attempt to find the last statement error for scenarios
where the error is not propagated to the payload of TxnCommit/Aborted
states.

In addition, the `Status` (completed or failed) of a txn is now based
on whether the transaction has committed at the time of insights recording.
Previously it was also based on the presence of failed stmts.

Fixes: #110846

Release note (ui change): On the Insights page in DB Console, users can now see
failed transactions and their error messages when the transaction fails at the
COMMIT stage.

<img width="1909" alt="Pasted Graphic" src="https://github.com/cockroachdb/cockroach/assets/20136951/c75ca5b2-5c19-4f24-b66f-a0e1df381d9c">


Co-authored-by: Xin Hao Zhang <xzhang@cockroachlabs.com>
  • Loading branch information
craig[bot] and xinhaoz committed Sep 27, 2023
2 parents bb21bea + 7397a27 commit ed6b6d7
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 38 deletions.
24 changes: 14 additions & 10 deletions pkg/sql/conn_executor.go
Expand Up @@ -1222,12 +1222,13 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)

var payloadErr error
if closeType == normalClose {
// We'll cleanup the SQL txn by creating a non-retriable (commit:true) event.
// This event is guaranteed to be accepted in every state.
ev := eventNonRetriableErr{IsCommit: fsm.True}
payload := eventNonRetriableErrPayload{err: pgerror.Newf(pgcode.AdminShutdown,
"connExecutor closing")}
payloadErr = pgerror.Newf(pgcode.AdminShutdown, "connExecutor closing")
payload := eventNonRetriableErrPayload{err: payloadErr}
if err := ex.machine.ApplyWithPayload(ctx, ev, payload); err != nil {
log.Warningf(ctx, "error while cleaning up connExecutor: %s", err)
}
Expand All @@ -1251,7 +1252,7 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.state.finishExternalTxn()
}

ex.resetExtraTxnState(ctx, txnEvent{eventType: txnEvType})
ex.resetExtraTxnState(ctx, txnEvent{eventType: txnEvType}, payloadErr)
if ex.hasCreatedTemporarySchema && !ex.server.cfg.TestingKnobs.DisableTempObjectsCleanupOnSessionExit {
err := cleanupSessionTempObjects(
ctx,
Expand Down Expand Up @@ -1964,9 +1965,10 @@ func (ns *prepStmtNamespace) resetTo(

// resetExtraTxnState resets the fields of ex.extraTxnState when a transaction
// finishes execution (either commits, rollbacks or restarts). Based on the
// transaction event, resetExtraTxnState invokes corresponding callbacks
// transaction event, resetExtraTxnState invokes corresponding callbacks
// (e.g. onTxnFinish() and onTxnRestart()). The payload error is included
// for statistics recording.
func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) {
func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent, payloadErr error) {
ex.extraTxnState.numDDL = 0
ex.extraTxnState.firstStmtExecuted = false
ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{}
Expand Down Expand Up @@ -2003,7 +2005,7 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) {
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
ex.extraTxnState.savepoints.clear()
ex.onTxnFinish(ctx, ev)
ex.onTxnFinish(ctx, ev, payloadErr)
case txnRestart:
ex.onTxnRestart(ctx)
ex.state.mu.Lock()
Expand Down Expand Up @@ -3730,18 +3732,20 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(

advInfo := ex.state.consumeAdvanceInfo()

var payloadErr error
// If we had an error from DDL statement execution due to the presence of
// other concurrent schema changes when attempting a schema change, wait for
// the completion of those schema changes first.
if p, ok := payload.(payloadWithError); ok {
if descID := scerrors.ConcurrentSchemaChangeDescID(p.errorCause()); descID != descpb.InvalidID {
payloadErr = p.errorCause()
if descID := scerrors.ConcurrentSchemaChangeDescID(payloadErr); descID != descpb.InvalidID {
if err := ex.handleWaitingForConcurrentSchemaChanges(ex.Ctx(), descID); err != nil {
return advanceInfo{}, err
}
}
// Similarly, if the descriptor ID generator is not available because of
// an ongoing migration, wait for the migration to complete first.
if errors.Is(p.errorCause(), descidgen.ErrDescIDSequenceMigrationInProgress) {
if errors.Is(payloadErr, descidgen.ErrDescIDSequenceMigrationInProgress) {
if err := ex.handleWaitingForDescriptorIDGeneratorMigration(ex.Ctx()); err != nil {
return advanceInfo{}, err
}
Expand Down Expand Up @@ -3859,7 +3863,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(

fallthrough
case txnRollback:
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent)
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent, payloadErr)
// Since we're doing a complete rollback, there's no need to keep the
// prepared stmts for a txn rewind.
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
Expand All @@ -3869,7 +3873,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
case txnRestart:
// In addition to resetting the extraTxnState, the restart event may
// also need to reset the sqlliveness.Session.
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent)
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent, payloadErr)
if err := ex.maybeSetSQLLivenessSession(); err != nil {
return advanceInfo{}, err
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/conn_executor_exec.go
Expand Up @@ -2993,7 +2993,7 @@ func payloadHasError(payload fsm.EventPayload) bool {
return hasErr
}

func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent, txnErr error) {
if ex.extraTxnState.shouldExecuteOnTxnFinish {
ex.extraTxnState.shouldExecuteOnTxnFinish = false
txnStart := ex.extraTxnState.txnFinishClosure.txnStartTime
Expand Down Expand Up @@ -3022,7 +3022,7 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
)
}

err = ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart)
err = ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart, txnErr)
if err != nil {
if log.V(1) {
log.Warningf(ctx, "failed to record transaction stats: %s", err)
Expand Down Expand Up @@ -3109,6 +3109,7 @@ func (ex *connExecutor) recordTransactionFinish(
ev txnEvent,
implicit bool,
txnStart time.Time,
txnErr error,
) error {
recordingStart := timeutil.Now()
defer func() {
Expand Down Expand Up @@ -3175,6 +3176,7 @@ func (ex *connExecutor) recordTransactionFinish(
// TODO(107318): add asoftime or ishistorical
// TODO(107318): add readonly
SessionData: ex.sessionData(),
TxnErr: txnErr,
}

if ex.server.cfg.TestingKnobs.OnRecordTxnFinish != nil {
Expand Down
89 changes: 75 additions & 14 deletions pkg/sql/sqlstats/insights/integration/insights_test.go
Expand Up @@ -193,6 +193,7 @@ func TestFailedInsights(t *testing.T) {
defer log.Scope(t).Close(t)

const appName = "TestFailedInsights"
re := regexp.MustCompile(",?SlowExecution,?")

// Start the cluster. (One node is sufficient; the outliers system is currently in-memory only.)
ctx := context.Background()
Expand Down Expand Up @@ -275,12 +276,7 @@ FROM crdb_internal.node_execution_insights
WHERE query = $1 AND app_name = $2 `,
tc.fingerprint, appName)

err = row.Scan(&query, &status, &problem, &errorCode, &errorMsg)
if err != nil {
return err
}

return nil
return row.Scan(&query, &status, &problem, &errorCode, &errorMsg)
}, 1*time.Second)

require.Equal(t, tc.status, status)
Expand Down Expand Up @@ -329,7 +325,7 @@ WHERE query = $1 AND app_name = $2 `,
// Multi-statement txn with a slow stmt and then a failed execution.
stmts: "BEGIN; SELECT (pg_sleep(0.1)); CREATE TABLE exists(); CREATE TABLE exists();",
fingerprint: "SELECT (pg_sleep(_)) ; CREATE TABLE \"exists\" () ; CREATE TABLE \"exists\" ()",
problems: "{SlowExecution,FailedExecution}",
problems: "{FailedExecution,SlowExecution}",
errorCode: "42P07",
errorMsg: `relation ‹"defaultdb.public.\"exists\""› already exists`,
errorMsgRedacted: `relation ‹×› already exists`,
Expand Down Expand Up @@ -366,12 +362,7 @@ SELECT query,
FROM crdb_internal.node_txn_execution_insights
WHERE query = $1 AND app_name = $2`, tc.fingerprint, appName)

err = row.Scan(&query, &problems, &status, &errorCode, &errorMsg)

if err != nil {
return err
}
return nil
return row.Scan(&query, &problems, &status, &errorCode, &errorMsg)
}, 1*time.Second)

require.Equal(t, tc.txnStatus, status)
Expand All @@ -386,7 +377,6 @@ WHERE query = $1 AND app_name = $2`, tc.fingerprint, appName)
if problems != tc.problems {
// During tests some transactions can stay open for longer, adding an extra
// `SlowExecution` to the problems list. This checks for that possibility.
re := regexp.MustCompile(",?SlowExecution,?")
replacedSlowProblems = re.ReplaceAllString(replacedSlowProblems, "")
}
// Print the original problems if we did any replacements, for debugging.
Expand All @@ -397,6 +387,77 @@ WHERE query = $1 AND app_name = $2`, tc.fingerprint, appName)
})
}

// TestTransactionInsightsFailOnCommit specifically tests for the scenario where a transaction
// fails on COMMIT. COMMIT is executed specially and does not get stats recorded, skipping
// the insights recording step. We should ensure txns failing on COMMIT are also captured.
//
// The test opens two transactions on separate SQL handles that touch the same
// table, expects the first transaction's COMMIT to return an error, and then
// reads crdb_internal.node_txn_execution_insights for this test's application
// name. The row it finds must have status "Failed", error code 40001 and a
// FailedExecution problem.
func TestTransactionInsightsFailOnCommit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Both sessions below are tagged with this application name, and the insights
// query filters on it.
const appName = "TestTransactionInsightsFailOnCommit"
// Matches a SlowExecution entry together with any adjacent commas so it can be
// stripped from the reported problems list.
re := regexp.MustCompile(",?SlowExecution,?")

// Start the cluster. (One node is sufficient; the outliers system is currently in-memory only.)
ctx := context.Background()
srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer srv.Stopper().Stop(ctx)

// conn1 wraps the db handle returned by StartServer; conn2 wraps a second
// handle obtained from the application layer and drives the competing
// transaction.
conn1 := sqlutils.MakeSQLRunner(db)
conn2 := sqlutils.MakeSQLRunner(srv.ApplicationLayer().SQLConn(t, ""))

// Set up myUsers table with 2 users.
conn1.Exec(t, "CREATE TABLE myUsers (name STRING, city STRING)")
conn1.Exec(t, "INSERT INTO myUsers VALUES ('WENDY', 'NYC'), ('NOVI', 'TORONTO')")

conn1.Exec(t, "SET SESSION application_name=$1", appName)
conn2.Exec(t, "SET SESSION application_name=$1", appName)

// We will simulate a 40001 transaction retry error due to conflicting locks.
// Transaction 1 will fail on COMMIT.
// tx1 first reads the rows with city = 'TORONTO'.
tx1 := conn1.Begin(t)
_, err := tx1.Exec("SELECT * FROM myUsers WHERE city = 'TORONTO'")
require.NoError(t, err)

// tx2 then updates the rows matching the same predicate tx1 just read.
tx2 := conn2.Begin(t)
_, err = tx2.Exec("UPDATE myUsers SET name = 'NOVI' WHERE city = 'TORONTO'")
require.NoError(t, err)

// tx1 writes a different row. Every statement in tx1 succeeds; only its
// COMMIT is expected to return an error.
_, err = tx1.Exec("UPDATE myUsers SET name = 'WENDY' WHERE city = 'NYC'")
require.NoError(t, err)
require.Error(t, tx1.Commit())

require.NoError(t, tx2.Commit())

var query, problems, status, errorCode, errorMsg string
// row.Scan returns an error while no matching insight row exists yet;
// SucceedsSoon presumably retries the closure until it returns nil.
testutils.SucceedsSoon(t, func() error {
// Query the node txn execution insights table.
row := conn1.DB.QueryRowContext(ctx, `
SELECT query,
problems,
status,
COALESCE(last_error_code, '') last_error_code,
COALESCE(last_error_redactable, '') last_error
FROM crdb_internal.node_txn_execution_insights
WHERE app_name = $1`, appName)

return row.Scan(&query, &problems, &status, &errorCode, &errorMsg)
})

// The expected fingerprint is made of tx1's two statements (the SELECT and
// its UPDATE), i.e. the transaction whose COMMIT failed.
require.Equal(t, "SELECT * FROM myusers WHERE city = '_' ; UPDATE myusers SET name = '_' WHERE city = '_'", query)
expectedProblem := "{FailedExecution}"
replacedSlowProblems := problems
if problems != expectedProblem {
// During tests some transactions can stay open for longer, adding an extra
// `SlowExecution` to the problems list. This checks for that possibility.
replacedSlowProblems = re.ReplaceAllString(replacedSlowProblems, "")
}
// Print the original problems if we did any replacements, for debugging.
require.Equal(t, expectedProblem, replacedSlowProblems, "received: %s, used to compare: %s", problems, replacedSlowProblems)
// The recorded transaction must be marked failed and carry the retry error's
// code and message.
require.Equal(t, "Failed", status)
require.Equal(t, "40001", errorCode)
require.Contains(t, errorMsg, "TransactionRetryWithProtoRefreshError")
}

func TestInsightsPriorityIntegration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
32 changes: 21 additions & 11 deletions pkg/sql/sqlstats/insights/registry.go
Expand Up @@ -114,8 +114,9 @@ func (r *lockingRegistry) ObserveTransaction(sessionID clusterunique.ID, transac
highContention = transaction.Contention.Seconds() >= LatencyThreshold.Get(&r.causes.st.SV).Seconds()
}

if slowOrFailedStatements.Empty() && !highContention {
// We only record an insight if we have slow or failed statements or high txn contention.
txnFailed := transaction.Status == Transaction_Failed
if slowOrFailedStatements.Empty() && !highContention && !txnFailed {
// We only record an insight if we have slow statements, high txn contention, or failed executions.
return
}

Expand All @@ -128,23 +129,27 @@ func (r *lockingRegistry) ObserveTransaction(sessionID clusterunique.ID, transac
insight.Transaction.Causes = addCause(insight.Transaction.Causes, Cause_HighContention)
}

var lastErrorCode string
var lastErrorMessage redact.RedactableString
if txnFailed {
insight.Transaction.Problems = addProblem(insight.Transaction.Problems, Problem_FailedExecution)
}

// The transaction status will reflect the status of its statements; it will
// default to completed unless a failed statement status is found. Note that
// this does not take into account the "Cancelled" transaction status.
var lastStatus = Transaction_Completed
var lastStmtErr redact.RedactableString
var lastStmtErrCode string
for i, s := range *statements {
if slowOrFailedStatements.Contains(i) {
switch s.Status {
case Statement_Completed:
s.Problem = Problem_SlowExecution
s.Causes = r.causes.examine(s.Causes, s)
case Statement_Failed:
lastErrorCode = s.ErrorCode
lastErrorMessage = s.ErrorMsg
lastStatus = Transaction_Status(s.Status)
s.Problem = Problem_FailedExecution
if transaction.LastErrorCode == "" {
lastStmtErr = s.ErrorMsg
lastStmtErrCode = s.ErrorCode
}
}

// Bubble up stmt problems and causes.
Expand All @@ -158,9 +163,14 @@ func (r *lockingRegistry) ObserveTransaction(sessionID clusterunique.ID, transac
insight.Statements = append(insight.Statements, s)
}

insight.Transaction.LastErrorCode = lastErrorCode
insight.Transaction.LastErrorMsg = lastErrorMessage
insight.Transaction.Status = lastStatus
if transaction.LastErrorCode == "" && lastStmtErrCode != "" {
// Stmt failure equates to transaction failure. Sometimes we
// can't propagate the error up to the transaction level so
// we manually set the transaction's failure info here.
transaction.LastErrorMsg = lastStmtErr
transaction.LastErrorCode = lastStmtErrCode
}

r.sink.AddInsight(insight)
}

Expand Down

0 comments on commit ed6b6d7

Please sign in to comment.