From b320b51b1c7ca68f2f23b181feb78bb034f4f0d9 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 5 Nov 2025 12:07:12 -0800 Subject: [PATCH] copy: step txn unconditionally Currently, we have an issue with COPY FROM command if it runs within the txn which had just been rolled back to a savepoint - we'd hit an assertion error in the KV layer. This commit fixes the oversight by stepping the txn, similar to what we do on the main query path. Additionally, this commit explicitly prohibits running COPY FROM via the internal executor (previously it could lead to undefined behavior or internal errors) and adds a test with savepoints for COPY TO (which works because it uses different execution machinery than COPY FROM). Release note (bug fix): Previously, CockroachDB could encounter an internal error when evaluating COPY FROM command in a transaction after it's been rolled back to a savepoint. The bug has been present since before 23.2 version and is now fixed. --- pkg/sql/conn_executor.go | 6 ++ pkg/sql/copy/copy_in_test.go | 64 ++++++++++--------- pkg/sql/copy/copy_out_test.go | 24 +++++-- pkg/sql/internal.go | 12 ++-- .../testdata/execute_internally_builtin | 3 + 5 files changed, 67 insertions(+), 42 deletions(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index e298e1d5b2ee..94ee3b62047d 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -3228,6 +3228,12 @@ func (ex *connExecutor) execCopyIn( // Disable the buffered writes for COPY since there is no benefit in this // ability here. ex.state.mu.txn.SetBufferedWritesEnabled(false /* enabled */) + // Step the txn in case it had just been rolled back to a savepoint (if it + // wasn't, this is harmless). This also matches what we do unconditionally + // on the main query path. + if err := ex.state.mu.txn.Step(ctx, false /* allowReadTimestampStep */); err != nil { + return ex.makeErrEvent(err, cmd.ParsedStmt.AST) + } txnOpt := copyTxnOpt{ txn: ex.state.mu.txn, txnTimestamp: ex.state.sqlTimestamp, diff --git a/pkg/sql/copy/copy_in_test.go b/pkg/sql/copy/copy_in_test.go index 74965f7bef28..31ab288c1c42 100644 --- a/pkg/sql/copy/copy_in_test.go +++ b/pkg/sql/copy/copy_in_test.go @@ -646,49 +646,51 @@ func TestCopyTransaction(t *testing.T) { s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(context.Background()) - if _, err := db.Exec(` + _, err := db.Exec(` CREATE DATABASE d; SET DATABASE = d; CREATE TABLE t ( i INT PRIMARY KEY ); - `); err != nil { - t.Fatal(err) - } + `) + require.NoError(t, err) txn, err := db.Begin() - if err != nil { - t.Fatal(err) - } - - // Note that, at least with lib/pq, this doesn't actually send a Parse msg - // (which we wouldn't support, as we don't support Copy-in in extended - // protocol mode). lib/pq has magic for recognizing a Copy. - stmt, err := txn.Prepare(pq.CopyIn("t", "i")) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) - const val = 2 + // Run COPY twice with the first one being rolled back via savepoints. + for val, doSavepoint := range []bool{true, false} { + func() { + if doSavepoint { + _, err = txn.Exec("SAVEPOINT s") + require.NoError(t, err) + defer func() { + _, err = txn.Exec("ROLLBACK TO SAVEPOINT s") + require.NoError(t, err) + }() + } + // Note that, at least with lib/pq, this doesn't actually send a + // Parse msg (which we wouldn't support, as we don't support Copy-in + // in extended protocol mode). lib/pq has magic for recognizing a + // Copy. + stmt, err := txn.Prepare(pq.CopyIn("t", "i")) + require.NoError(t, err) - _, err = stmt.Exec(val) - if err != nil { - t.Fatal(err) - } + _, err = stmt.Exec(val) + require.NoError(t, err) - if err = stmt.Close(); err != nil { - t.Fatal(err) - } + err = stmt.Close() + require.NoError(t, err) - var i int - if err := txn.QueryRow("SELECT i FROM d.t").Scan(&i); err != nil { - t.Fatal(err) - } else if i != val { - t.Fatalf("expected 1, got %d", i) - } - if err := txn.Commit(); err != nil { - t.Fatal(err) + var i int + err = txn.QueryRow("SELECT i FROM d.t").Scan(&i) + require.NoError(t, err) + if i != val { + t.Fatalf("expected %d, got %d", val, i) + } + }() } + require.NoError(t, txn.Commit()) } // TestCopyFromFKCheck verifies that foreign keys are checked during COPY. diff --git a/pkg/sql/copy/copy_out_test.go b/pkg/sql/copy/copy_out_test.go index 1905b90bf8b5..e7e67d6228ff 100644 --- a/pkg/sql/copy/copy_out_test.go +++ b/pkg/sql/copy/copy_out_test.go @@ -61,14 +61,26 @@ func TestCopyOutTransaction(t *testing.T) { tx, err := conn.Begin(ctx) require.NoError(t, err) - _, err = tx.Exec(ctx, "INSERT INTO t VALUES (1)") - require.NoError(t, err) + for val, doSavepoint := range []bool{true, false} { + func() { + if doSavepoint { + _, err = tx.Exec(ctx, "SAVEPOINT s") + require.NoError(t, err) + defer func() { + _, err = tx.Exec(ctx, "ROLLBACK TO SAVEPOINT s") + require.NoError(t, err) + }() + } - var buf bytes.Buffer - _, err = tx.Conn().PgConn().CopyTo(ctx, &buf, "COPY t TO STDOUT") - require.NoError(t, err) - require.Equal(t, "1\n", buf.String()) + _, err = tx.Exec(ctx, "INSERT INTO t VALUES ($1)", val) + require.NoError(t, err) + var buf bytes.Buffer + _, err = tx.Conn().PgConn().CopyTo(ctx, &buf, "COPY t TO STDOUT") + require.NoError(t, err) + require.Equal(t, fmt.Sprintf("%d\n", val), buf.String()) + }() + } require.NoError(t, tx.Rollback(ctx)) } diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index fdf1ceeffd8a..deff12d41f48 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -1472,15 +1472,17 @@ func (ie *InternalExecutor) commitTxn(ctx context.Context) error { return ex.commitSQLTransactionInternal(ctx) } -// checkIfStmtIsAllowed returns an error if the internal executor is not bound -// with the outer-txn-related info but is used to run DDL statements within an -// outer txn. -// TODO (janexing): this will be deprecate soon since it's not a good idea -// to have `extraTxnState` to store the info from a outer txn. +// checkIfStmtIsAllowed returns an error if the internal executor cannot execute +// the given stmt. func (ie *InternalExecutor) checkIfStmtIsAllowed(stmt tree.Statement, txn *kv.Txn) error { if stmt == nil { return nil } + if _, ok := stmt.(*tree.CopyFrom); ok { + // COPY FROM has special handling in the connExecutor, so we can't run + // it via the internal executor. + return errors.New("COPY cannot be run via the internal executor") + } if tree.CanModifySchema(stmt) && txn != nil && ie.extraTxnState == nil { return errors.New("DDL statement is disallowed if internal " + "executor is not bound with txn metadata") diff --git a/pkg/sql/opt/exec/execbuilder/testdata/execute_internally_builtin b/pkg/sql/opt/exec/execbuilder/testdata/execute_internally_builtin index e76fda1928bb..0bee198befb5 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/execute_internally_builtin +++ b/pkg/sql/opt/exec/execbuilder/testdata/execute_internally_builtin @@ -298,3 +298,6 @@ subtest end # Regression test for crashing with SHOW COMMIT TIMESTAMP. statement error this statement is disallowed SELECT crdb_internal.execute_internally('SHOW COMMIT TIMESTAMP;', true); + +statement error COPY cannot be run via the internal executor +SELECT crdb_internal.execute_internally('COPY t FROM STDIN');