Skip to content

Commit

Permalink
sql: fix cleanup on connection close
Browse files Browse the repository at this point in the history
Before this patch, if a pgwire connection was closed while in the
Aborted state, we weren't rolling back the underlying KV txn. Its locks
were never getting released.
I think until recently we only had this bug in the RetryWait state, but
it got worse when we merged the RetryWait and Aborted states.

This patch fixes it by properly handling the case of the connection
closing in the Aborted state by handling the
NonRetriableError(IsCommit:True) event.

Release note: None

Release justification: bug fix
  • Loading branch information
andreimatei committed Mar 10, 2020
1 parent 1d2541c commit 3639ec5
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 3 deletions.
10 changes: 9 additions & 1 deletion pkg/sql/conn_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,8 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
return ts.finishTxn(args.Payload.(eventTxnFinishPayload))
},
},
eventNonRetriableErr{IsCommit: fsm.Any}: {
// Any statement.
eventNonRetriableErr{IsCommit: fsm.False}: {
// This event doesn't change state, but it returns a skipBatch code.
Description: "any other statement",
Next: stateAborted{},
Expand All @@ -360,6 +361,13 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
return nil
},
},
// ConnExecutor closing.
eventNonRetriableErr{IsCommit: fsm.True}: {
// This event doesn't change state, but it returns a skipBatch code.
Description: "ConnExecutor closing",
Next: stateAborted{},
Action: cleanupAndFinishOnError,
},
// ROLLBACK TO SAVEPOINT <not cockroach_restart> success.
eventSavepointRollback{}: {
Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) success",
Expand Down
65 changes: 64 additions & 1 deletion pkg/sql/pgwire/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,69 @@ func (l pgxTestLogger) Log(level pgx.LogLevel, msg string, data map[string]inter
// pgxTestLogger implements pgx.Logger.
var _ pgx.Logger = pgxTestLogger{}

// Test that closing a pgwire connection causes transactions to be rolled back
// and release their locks.
func TestConnCloseReleasesLocks(t *testing.T) {
defer leaktest.AfterTest(t)()
// We're going to test closing the connection in both the Open and Aborted
// state.
testutils.RunTrueAndFalse(t, "open state", func(t *testing.T, open bool) {
s, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
ctx := context.TODO()
defer s.Stopper().Stop(ctx)

pgURL, cleanupFunc := sqlutils.PGUrl(
t, s.ServingSQLAddr(), "testConnClose" /* prefix */, url.User(security.RootUser),
)
defer cleanupFunc()
db, err := gosql.Open("postgres", pgURL.String())
require.NoError(t, err)
defer db.Close()

r := sqlutils.MakeSQLRunner(db)
r.Exec(t, "CREATE DATABASE test")
r.Exec(t, "CREATE TABLE test.t (x int primary key)")

pgxConfig, err := pgx.ParseConnectionString(pgURL.String())
if err != nil {
t.Fatal(err)
}

conn, err := pgx.Connect(pgxConfig)
require.NoError(t, err)
tx, err := conn.Begin()
require.NoError(t, err)
_, err = tx.Exec("INSERT INTO test.t(x) values (1)")
require.NoError(t, err)
readCh := make(chan error)
go func() {
conn2, err := pgx.Connect(pgxConfig)
require.NoError(t, err)
_, err = conn2.Exec("SELECT * FROM test.t")
readCh <- err
}()

select {
case err := <-readCh:
t.Fatalf("unexpected read unblocked: %v", err)
case <-time.After(10 * time.Millisecond):
}

if !open {
_, err = tx.Exec("bogus")
require.NotNil(t, err)
}
err = conn.Close()
require.NoError(t, err)
select {
case readErr := <-readCh:
require.NoError(t, readErr)
case <-time.After(10 * time.Second):
t.Fatal("read not unblocked in a timely manner")
}
})
}

// Test that closing a client connection such that producing results rows
// encounters network errors doesn't crash the server (#23694).
//
Expand All @@ -649,7 +712,7 @@ var _ pgx.Logger = pgxTestLogger{}
// b) the connection's server-side results buffer has overflowed, and so
// attempting to produce results (through CommandResult.AddRow()) observes
// network errors.
func TestConnClose(t *testing.T) {
func TestConnCloseWhileProducingRows(t *testing.T) {
defer leaktest.AfterTest(t)()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
ctx := context.TODO()
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/txnstatetransitions_diagram.gv
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ digraph finite_state_machine {

node [shape = circle];
"Aborted{}" -> "Aborted{}" [label = <NonRetriableErr{IsCommit:false}<BR/><I>any other statement</I>>]
"Aborted{}" -> "Aborted{}" [label = <NonRetriableErr{IsCommit:true}<BR/><I>any other statement</I>>]
"Aborted{}" -> "Aborted{}" [label = <NonRetriableErr{IsCommit:true}<BR/><I>ConnExecutor closing</I>>]
"Aborted{}" -> "Aborted{}" [label = <RetriableErr{CanAutoRetry:false, IsCommit:false}<BR/><I>ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart</I>>]
"Aborted{}" -> "Aborted{}" [label = <RetriableErr{CanAutoRetry:false, IsCommit:true}<BR/><I>ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart</I>>]
"Aborted{}" -> "Aborted{}" [label = <RetriableErr{CanAutoRetry:true, IsCommit:false}<BR/><I>ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart</I>>]
Expand Down
Binary file added pkg/sql/txnstatetransitions_diagram.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 3639ec5

Please sign in to comment.