Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3228,6 +3228,12 @@ func (ex *connExecutor) execCopyIn(
// Disable the buffered writes for COPY since there is no benefit in this
// ability here.
ex.state.mu.txn.SetBufferedWritesEnabled(false /* enabled */)
// Step the txn in case it had just been rolled back to a savepoint (if it
// wasn't, this is harmless). This also matches what we do unconditionally
// on the main query path.
if err := ex.state.mu.txn.Step(ctx, false /* allowReadTimestampStep */); err != nil {
return ex.makeErrEvent(err, cmd.ParsedStmt.AST)
}
txnOpt := copyTxnOpt{
txn: ex.state.mu.txn,
txnTimestamp: ex.state.sqlTimestamp,
Expand Down
64 changes: 33 additions & 31 deletions pkg/sql/copy/copy_in_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,49 +646,51 @@ func TestCopyTransaction(t *testing.T) {
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(context.Background())

if _, err := db.Exec(`
_, err := db.Exec(`
CREATE DATABASE d;
SET DATABASE = d;
CREATE TABLE t (
i INT PRIMARY KEY
);
`); err != nil {
t.Fatal(err)
}
`)
require.NoError(t, err)

txn, err := db.Begin()
if err != nil {
t.Fatal(err)
}

// Note that, at least with lib/pq, this doesn't actually send a Parse msg
// (which we wouldn't support, as we don't support Copy-in in extended
// protocol mode). lib/pq has magic for recognizing a Copy.
stmt, err := txn.Prepare(pq.CopyIn("t", "i"))
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

const val = 2
// Run COPY twice with the first one being rolled back via savepoints.
for val, doSavepoint := range []bool{true, false} {
func() {
if doSavepoint {
_, err = txn.Exec("SAVEPOINT s")
require.NoError(t, err)
defer func() {
_, err = txn.Exec("ROLLBACK TO SAVEPOINT s")
require.NoError(t, err)
}()
}
// Note that, at least with lib/pq, this doesn't actually send a
// Parse msg (which we wouldn't support, as we don't support Copy-in
// in extended protocol mode). lib/pq has magic for recognizing a
// Copy.
stmt, err := txn.Prepare(pq.CopyIn("t", "i"))
require.NoError(t, err)

_, err = stmt.Exec(val)
if err != nil {
t.Fatal(err)
}
_, err = stmt.Exec(val)
require.NoError(t, err)

if err = stmt.Close(); err != nil {
t.Fatal(err)
}
err = stmt.Close()
require.NoError(t, err)

var i int
if err := txn.QueryRow("SELECT i FROM d.t").Scan(&i); err != nil {
t.Fatal(err)
} else if i != val {
t.Fatalf("expected 1, got %d", i)
}
if err := txn.Commit(); err != nil {
t.Fatal(err)
var i int
err = txn.QueryRow("SELECT i FROM d.t").Scan(&i)
require.NoError(t, err)
if i != val {
t.Fatalf("expected %d, got %d", val, i)
}
}()
}
require.NoError(t, txn.Commit())
}

// TestCopyFromFKCheck verifies that foreign keys are checked during COPY.
Expand Down
24 changes: 18 additions & 6 deletions pkg/sql/copy/copy_out_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,26 @@ func TestCopyOutTransaction(t *testing.T) {
tx, err := conn.Begin(ctx)
require.NoError(t, err)

_, err = tx.Exec(ctx, "INSERT INTO t VALUES (1)")
require.NoError(t, err)
for val, doSavepoint := range []bool{true, false} {
func() {
if doSavepoint {
_, err = tx.Exec(ctx, "SAVEPOINT s")
require.NoError(t, err)
defer func() {
_, err = tx.Exec(ctx, "ROLLBACK TO SAVEPOINT s")
require.NoError(t, err)
}()
}

var buf bytes.Buffer
_, err = tx.Conn().PgConn().CopyTo(ctx, &buf, "COPY t TO STDOUT")
require.NoError(t, err)
require.Equal(t, "1\n", buf.String())
_, err = tx.Exec(ctx, "INSERT INTO t VALUES ($1)", val)
require.NoError(t, err)

var buf bytes.Buffer
_, err = tx.Conn().PgConn().CopyTo(ctx, &buf, "COPY t TO STDOUT")
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("%d\n", val), buf.String())
}()
}
require.NoError(t, tx.Rollback(ctx))
}

Expand Down
12 changes: 7 additions & 5 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1472,15 +1472,17 @@ func (ie *InternalExecutor) commitTxn(ctx context.Context) error {
return ex.commitSQLTransactionInternal(ctx)
}

// checkIfStmtIsAllowed returns an error if the internal executor is not bound
// with the outer-txn-related info but is used to run DDL statements within an
// outer txn.
// TODO (janexing): this will be deprecate soon since it's not a good idea
// to have `extraTxnState` to store the info from a outer txn.
// checkIfStmtIsAllowed returns an error if the internal executor cannot execute
// the given stmt.
func (ie *InternalExecutor) checkIfStmtIsAllowed(stmt tree.Statement, txn *kv.Txn) error {
if stmt == nil {
return nil
}
if _, ok := stmt.(*tree.CopyFrom); ok {
// COPY FROM has special handling in the connExecutor, so we can't run
// it via the internal executor.
return errors.New("COPY cannot be run via the internal executor")
}
if tree.CanModifySchema(stmt) && txn != nil && ie.extraTxnState == nil {
return errors.New("DDL statement is disallowed if internal " +
"executor is not bound with txn metadata")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,6 @@ subtest end
# Regression test for crashing with SHOW COMMIT TIMESTAMP.
statement error this statement is disallowed
SELECT crdb_internal.execute_internally('SHOW COMMIT TIMESTAMP;', true);

statement error COPY cannot be run via the internal executor
SELECT crdb_internal.execute_internally('COPY t FROM STDIN');
Loading