Skip to content

Commit b320b51

Browse files
committed
copy: step txn unconditionally
Currently, we have an issue with COPY FROM command if it runs within the txn which had just been rolled back to a savepoint - we'd hit an assertion error in the KV layer. This commit fixes the oversight by stepping the txn, similar to what we do on the main query path. Additionally, this commit explicitly prohibits running COPY FROM via the internal executor (previously it could lead to undefined behavior or internal errors) and adds a test with savepoints for COPY TO (which works because it uses different execution machinery than COPY FROM). Release note (bug fix): Previously, CockroachDB could encounter an internal error when evaluating COPY FROM command in a transaction after it's been rolled back to a savepoint. The bug has been present since before 23.2 version and is now fixed.
1 parent d83d1d5 commit b320b51

File tree

5 files changed

+67
-42
lines changed

5 files changed

+67
-42
lines changed

pkg/sql/conn_executor.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3228,6 +3228,12 @@ func (ex *connExecutor) execCopyIn(
32283228
// Disable the buffered writes for COPY since there is no benefit in this
32293229
// ability here.
32303230
ex.state.mu.txn.SetBufferedWritesEnabled(false /* enabled */)
3231+
// Step the txn in case it had just been rolled back to a savepoint (if it
3232+
// wasn't, this is harmless). This also matches what we do unconditionally
3233+
// on the main query path.
3234+
if err := ex.state.mu.txn.Step(ctx, false /* allowReadTimestampStep */); err != nil {
3235+
return ex.makeErrEvent(err, cmd.ParsedStmt.AST)
3236+
}
32313237
txnOpt := copyTxnOpt{
32323238
txn: ex.state.mu.txn,
32333239
txnTimestamp: ex.state.sqlTimestamp,

pkg/sql/copy/copy_in_test.go

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -646,49 +646,51 @@ func TestCopyTransaction(t *testing.T) {
646646
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
647647
defer s.Stopper().Stop(context.Background())
648648

649-
if _, err := db.Exec(`
649+
_, err := db.Exec(`
650650
CREATE DATABASE d;
651651
SET DATABASE = d;
652652
CREATE TABLE t (
653653
i INT PRIMARY KEY
654654
);
655-
`); err != nil {
656-
t.Fatal(err)
657-
}
655+
`)
656+
require.NoError(t, err)
658657

659658
txn, err := db.Begin()
660-
if err != nil {
661-
t.Fatal(err)
662-
}
663-
664-
// Note that, at least with lib/pq, this doesn't actually send a Parse msg
665-
// (which we wouldn't support, as we don't support Copy-in in extended
666-
// protocol mode). lib/pq has magic for recognizing a Copy.
667-
stmt, err := txn.Prepare(pq.CopyIn("t", "i"))
668-
if err != nil {
669-
t.Fatal(err)
670-
}
659+
require.NoError(t, err)
671660

672-
const val = 2
661+
// Run COPY twice with the first one being rolled back via savepoints.
662+
for val, doSavepoint := range []bool{true, false} {
663+
func() {
664+
if doSavepoint {
665+
_, err = txn.Exec("SAVEPOINT s")
666+
require.NoError(t, err)
667+
defer func() {
668+
_, err = txn.Exec("ROLLBACK TO SAVEPOINT s")
669+
require.NoError(t, err)
670+
}()
671+
}
672+
// Note that, at least with lib/pq, this doesn't actually send a
673+
// Parse msg (which we wouldn't support, as we don't support Copy-in
674+
// in extended protocol mode). lib/pq has magic for recognizing a
675+
// Copy.
676+
stmt, err := txn.Prepare(pq.CopyIn("t", "i"))
677+
require.NoError(t, err)
673678

674-
_, err = stmt.Exec(val)
675-
if err != nil {
676-
t.Fatal(err)
677-
}
679+
_, err = stmt.Exec(val)
680+
require.NoError(t, err)
678681

679-
if err = stmt.Close(); err != nil {
680-
t.Fatal(err)
681-
}
682+
err = stmt.Close()
683+
require.NoError(t, err)
682684

683-
var i int
684-
if err := txn.QueryRow("SELECT i FROM d.t").Scan(&i); err != nil {
685-
t.Fatal(err)
686-
} else if i != val {
687-
t.Fatalf("expected 1, got %d", i)
688-
}
689-
if err := txn.Commit(); err != nil {
690-
t.Fatal(err)
685+
var i int
686+
err = txn.QueryRow("SELECT i FROM d.t").Scan(&i)
687+
require.NoError(t, err)
688+
if i != val {
689+
t.Fatalf("expected %d, got %d", val, i)
690+
}
691+
}()
691692
}
693+
require.NoError(t, txn.Commit())
692694
}
693695

694696
// TestCopyFromFKCheck verifies that foreign keys are checked during COPY.

pkg/sql/copy/copy_out_test.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,26 @@ func TestCopyOutTransaction(t *testing.T) {
6161
tx, err := conn.Begin(ctx)
6262
require.NoError(t, err)
6363

64-
_, err = tx.Exec(ctx, "INSERT INTO t VALUES (1)")
65-
require.NoError(t, err)
64+
for val, doSavepoint := range []bool{true, false} {
65+
func() {
66+
if doSavepoint {
67+
_, err = tx.Exec(ctx, "SAVEPOINT s")
68+
require.NoError(t, err)
69+
defer func() {
70+
_, err = tx.Exec(ctx, "ROLLBACK TO SAVEPOINT s")
71+
require.NoError(t, err)
72+
}()
73+
}
6674

67-
var buf bytes.Buffer
68-
_, err = tx.Conn().PgConn().CopyTo(ctx, &buf, "COPY t TO STDOUT")
69-
require.NoError(t, err)
70-
require.Equal(t, "1\n", buf.String())
75+
_, err = tx.Exec(ctx, "INSERT INTO t VALUES ($1)", val)
76+
require.NoError(t, err)
7177

78+
var buf bytes.Buffer
79+
_, err = tx.Conn().PgConn().CopyTo(ctx, &buf, "COPY t TO STDOUT")
80+
require.NoError(t, err)
81+
require.Equal(t, fmt.Sprintf("%d\n", val), buf.String())
82+
}()
83+
}
7284
require.NoError(t, tx.Rollback(ctx))
7385
}
7486

pkg/sql/internal.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1472,15 +1472,17 @@ func (ie *InternalExecutor) commitTxn(ctx context.Context) error {
14721472
return ex.commitSQLTransactionInternal(ctx)
14731473
}
14741474

1475-
// checkIfStmtIsAllowed returns an error if the internal executor is not bound
1476-
// with the outer-txn-related info but is used to run DDL statements within an
1477-
// outer txn.
1478-
// TODO (janexing): this will be deprecate soon since it's not a good idea
1479-
// to have `extraTxnState` to store the info from a outer txn.
1475+
// checkIfStmtIsAllowed returns an error if the internal executor cannot execute
1476+
// the given stmt.
14801477
func (ie *InternalExecutor) checkIfStmtIsAllowed(stmt tree.Statement, txn *kv.Txn) error {
14811478
if stmt == nil {
14821479
return nil
14831480
}
1481+
if _, ok := stmt.(*tree.CopyFrom); ok {
1482+
// COPY FROM has special handling in the connExecutor, so we can't run
1483+
// it via the internal executor.
1484+
return errors.New("COPY cannot be run via the internal executor")
1485+
}
14841486
if tree.CanModifySchema(stmt) && txn != nil && ie.extraTxnState == nil {
14851487
return errors.New("DDL statement is disallowed if internal " +
14861488
"executor is not bound with txn metadata")

pkg/sql/opt/exec/execbuilder/testdata/execute_internally_builtin

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,3 +298,6 @@ subtest end
298298
# Regression test for crashing with SHOW COMMIT TIMESTAMP.
299299
statement error this statement is disallowed
300300
SELECT crdb_internal.execute_internally('SHOW COMMIT TIMESTAMP;', true);
301+
302+
statement error COPY cannot be run via the internal executor
303+
SELECT crdb_internal.execute_internally('COPY t FROM STDIN');

0 commit comments

Comments
 (0)