Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport-2.1: sql: fix UPSERT RETURNING in presence of schema changes #29543

Merged
merged 1 commit into from
Sep 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions pkg/sql/descriptor_mutation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,63 @@ func (mt mutationTest) writeMutation(m sqlbase.DescriptorMutation) {
}
}

// Test that UPSERT with a column mutation that has a default value with a
// NOT NULL constraint can handle the null input to its row fetcher, and
// produces output rows of the correct shape.
// Regression test for #29436.
func TestUpsertWithColumnMutationAndNotNullDefault(t *testing.T) {
defer leaktest.AfterTest(t)()
// The descriptor changes made must have an immediate effect
// so disable leases on tables.
defer sql.TestDisableTableLeases()()
// Disable external processing of mutations.
params, _ := tests.CreateTestServerParams()
params.Knobs.SQLSchemaChanger = &sql.SchemaChangerTestingKnobs{
AsyncExecNotification: asyncSchemaChangerDisabled,
}
server, sqlDB, kvDB := serverutils.StartServer(t, params)
defer server.Stopper().Stop(context.TODO())

if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k VARCHAR PRIMARY KEY DEFAULT 'default', v VARCHAR);
INSERT INTO t.test VALUES('a', 'foo');
ALTER TABLE t.test ADD COLUMN i VARCHAR NOT NULL DEFAULT 'i';
`); err != nil {
t.Fatal(err)
}

// read table descriptor
tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test")

mTest := makeMutationTest(t, kvDB, sqlDB, tableDesc)
// Add column "i" as a mutation in delete/write.
mTest.writeColumnMutation("i", sqlbase.DescriptorMutation{State: sqlbase.DescriptorMutation_DELETE_AND_WRITE_ONLY})

// This row will conflict with the original row, and should insert an `i`
// into the new column.
mTest.Exec(t, `UPSERT INTO t.test VALUES('a', 'bar') RETURNING k`)

// These rows will not conflict.
mTest.Exec(t, `UPSERT INTO t.test VALUES('b', 'bar') RETURNING k`)
mTest.Exec(t, `INSERT INTO t.test VALUES('c', 'bar') RETURNING k, v`)
mTest.Exec(t, `INSERT INTO t.test VALUES('c', 'bar') ON CONFLICT(k) DO UPDATE SET v='qux' RETURNING k`)

mTest.CheckQueryResults(t, `SELECT * FROM t.test`, [][]string{
{"a", "bar"},
{"b", "bar"},
{"c", "qux"},
})

mTest.makeMutationsActive()

mTest.CheckQueryResults(t, `SELECT * FROM t.test`, [][]string{
{"a", "bar", "i"},
{"b", "bar", "i"},
{"c", "qux", "i"},
})
}

// Test INSERT, UPDATE, UPSERT, and DELETE operations with a column schema
// change.
func TestOperationsWithColumnMutation(t *testing.T) {
Expand Down Expand Up @@ -232,6 +289,14 @@ CREATE INDEX allidx ON t.test (k, v);
if !testutils.IsError(err, `column "i" does not exist`) {
t.Fatal(err)
}
if useUpsert {
_, err = sqlDB.Exec(`UPSERT INTO t.test (k, v) VALUES ('b', 'y') RETURNING i`)
} else {
_, err = sqlDB.Exec(`INSERT INTO t.test (k, v) VALUES ('b', 'y') RETURNING i`)
}
if !testutils.IsError(err, `column "i" does not exist`) {
t.Fatal(err)
}

// Repeating the same without specifying the columns results in a different error.
if useUpsert {
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/upsert
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ UPSERT INTO kv VALUES (11, 10), (11, 12) RETURNING k, v
11 10
11 12

query I
UPSERT INTO kv VALUES (11) RETURNING k
----
11

query I
UPSERT INTO kv VALUES (11, 12) RETURNING v
----
12

statement count 1
INSERT INTO kv VALUES (13, 13), (7, 8) ON CONFLICT (k) DO NOTHING RETURNING *

Expand Down
43 changes: 34 additions & 9 deletions pkg/sql/tablewriter_upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ type tableUpserterBase struct {
// rowTemplate is used to prepare rows to add to rowsUpserted.
rowTemplate tree.Datums
// rowIdxToRetIdx maps the indices in the inserted rows
// back to indices in rowTemplate.
// back to indices in rowTemplate. Contains dontReturnCol (-1) for indices
// in the inserted rows that are mutation columns which shouldn't be returned
// to the user.
rowIdxToRetIdx []int
// resultCount is the number of upserts. Mirrors rowsUpserted.Len() if
// collectRows is set, counted separately otherwise.
Expand All @@ -52,6 +54,10 @@ type tableUpserterBase struct {
indexKeyPrefix []byte
}

// dontReturnCol is stored in rowIdxToRetIdx to indicate indices in the inserted
// rows that are mutation columns which shouldn't be returned to the user.
const dontReturnCol = -1

func (tu *tableUpserterBase) init(txn *client.Txn, evalCtx *tree.EvalContext) error {
tu.tableWriterBase.init(txn)
tableDesc := tu.tableDesc()
Expand All @@ -77,13 +83,22 @@ func (tu *tableUpserterBase) init(txn *client.Txn, evalCtx *tree.EvalContext) er
}

// Create the map from insert rows to returning rows.
// Note that this map will *not* contain any mutation columns - that's because
// even though we might insert values into mutation columns, we never return
// them back to the user.
colIDToRetIndex := map[sqlbase.ColumnID]int{}
for i, col := range tableDesc.Columns {
colIDToRetIndex[col.ID] = i
}
tu.rowIdxToRetIdx = make([]int, len(tu.ri.InsertCols))
for i, col := range tu.ri.InsertCols {
tu.rowIdxToRetIdx[i] = colIDToRetIndex[col.ID]
retID, ok := colIDToRetIndex[col.ID]
if !ok {
// If the column is missing, it means it's a mutation column. Put -1 in
// the map to signal that we don't want it in the returning row.
retID = dontReturnCol
}
tu.rowIdxToRetIdx[i] = retID
}

tu.insertRows.Init(
Expand Down Expand Up @@ -149,16 +164,25 @@ func (tu *tableUpserterBase) finalize(
func (tu *tableUpserterBase) makeResultFromInsertRow(
insertRow tree.Datums, cols []sqlbase.ColumnDescriptor,
) tree.Datums {
resultRow := insertRow
if len(resultRow) < len(cols) {
resultRow = make(tree.Datums, len(cols))
// Pre-fill with NULLs.
if len(insertRow) == len(cols) {
// The row we inserted was already the right shape.
return insertRow
}
resultRow := make(tree.Datums, len(cols))
if len(insertRow) < len(cols) {
// The row we inserted didn't have all columns filled out. Fill the columns
// that weren't included with NULLs.
for i := range resultRow {
resultRow[i] = tree.DNull
}
// Fill the other values from insertRow.
for i, val := range insertRow {
resultRow[tu.rowIdxToRetIdx[i]] = val
}
// Now, fill the other values from insertRow.
for i, val := range insertRow {
retIdx := tu.rowIdxToRetIdx[i]
if retIdx != dontReturnCol {
// We don't want to return values we wrote into non-public columns.
// These values have retIdx = dontReturnCol. Skip them.
resultRow[retIdx] = val
}
}
return resultRow
Expand Down Expand Up @@ -430,6 +454,7 @@ func (tu *tableUpserter) atBatchEnd(ctx context.Context, traceKV bool) error {
// Do we need to remember a result for RETURNING?
if tu.collectRows {
// Yes, collect it.
resultRow = tu.makeResultFromInsertRow(resultRow, tableDesc.Columns)
_, err = tu.rowsUpserted.AddRow(ctx, resultRow)
if err != nil {
return err
Expand Down