Skip to content

Commit

Permalink
sql: fix UPSERT fast path regression
Browse files Browse the repository at this point in the history
And add a test to prevent future regressions.

A second bug was unearthed which caused the upsert fast path to be
incorrect during schema changes. This was caught by the schema change
tests when the regression was fixed.

Closes #13437.
  • Loading branch information
danhhz committed Feb 8, 2017
1 parent 43aae29 commit 6295cc4
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 12 deletions.
5 changes: 0 additions & 5 deletions pkg/sql/parser/name_part.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ func (n Name) Normalize() string {
return norm.NFC.String(lower)
}

// Equal returns true iff the normalizations of a and b are equal.
func (n Name) Equal(b Name) bool {
return n.Normalize() == b.Normalize()
}

// ReNormalizeName performs the same work as NormalizeName but when
// the string originates from the database. We define a different
// function so as to be able to track usage of this function (cf. #8200).
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/tablewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,10 @@ func (tu *tableUpserter) init(txn *client.Txn) error {
tu.indexKeyPrefix = sqlbase.MakeIndexKeyPrefix(tu.tableDesc, tu.tableDesc.PrimaryIndex.ID)

allColsIdentityExpr := len(tu.ri.insertCols) == len(tu.tableDesc.Columns) &&
tu.evaler != nil && tu.evaler.isIdentityEvaler()
tu.evaler != nil && tu.evaler.isIdentityEvaler() &&
// Disable the upsert fast path when columns or indexes are being added
// or dropped.
len(tu.tableDesc.Mutations) == 0
if len(tu.tableDesc.Indexes) == 0 && allColsIdentityExpr {
tu.fastPathBatch = tu.txn.NewBatch()
tu.fastPathKeys = make(map[string]struct{})
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,16 @@ func (p *planner) makeUpsertHelper(
helper.evalExprs = evalExprs

helper.allExprsIdentity = true
for i, expr := range evalExprs {
// analyzeExpr above has normalized all direct column names to ColumnItems.
c, ok := expr.(*parser.ColumnItem)
for _, expr := range evalExprs {
ivar, ok := expr.(*parser.IndexedVar)
if !ok {
helper.allExprsIdentity = false
break
}
if len(c.Selector) > 0 ||
!c.TableName.TableName.Equal(upsertExcludedTable.TableName) ||
c.ColumnName.Normalize() != parser.ReNormalizeName(updateCols[i].Name) {
// Idx on the IndexedVar references a columns from one of the two
// sources. They're ordered table source then excluded source. If any
// expr references a table column, then the fast path doesn't apply.
if ivar.Idx < len(sourceInfo.sourceColumns) {
helper.allExprsIdentity = false
break
}
Expand Down
86 changes: 86 additions & 0 deletions pkg/sql/upsert_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2017 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Daniel Harrison (dan@cockroachlabs.com)

package sql_test

import (
"bytes"
"sync/atomic"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)

func TestUpsertFastPath(t *testing.T) {
defer leaktest.AfterTest(t)()

// This filter increments scans for every ScanRequest that hits user table
// data.
var scans uint64
filter := func(filterArgs storagebase.FilterArgs) *roachpb.Error {
if filterArgs.Req.Method() == roachpb.Scan &&
bytes.Compare(filterArgs.Req.Header().Key, keys.UserTableDataMin) >= 0 {
atomic.AddUint64(&scans, 1)
}
return nil
}

s, conn, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{Store: &storage.StoreTestingKnobs{
TestingCommandFilter: filter,
}},
})
defer s.Stopper().Stop()
sqlDB := sqlutils.MakeSQLRunner(t, conn)
sqlDB.Exec(`CREATE DATABASE d`)
sqlDB.Exec(`CREATE TABLE d.kv (k INT PRIMARY KEY, v INT)`)

// This should hit the fast path.
atomic.StoreUint64(&scans, 0)
sqlDB.Exec(`UPSERT INTO d.kv VALUES (1, 1)`)
if s := atomic.LoadUint64(&scans); s != 0 {
t.Errorf("expected no scans (the upsert fast path) but got %d", s)
}

// This should hit the fast path.
atomic.StoreUint64(&scans, 0)
sqlDB.Exec(`INSERT INTO d.kv VALUES (1, 1) ON CONFLICT (k) DO UPDATE SET v=excluded.v`)
if s := atomic.LoadUint64(&scans); s != 0 {
t.Errorf("expected no scans (the upsert fast path) but got %d", s)
}

// This should not hit the fast path because it doesn't set every column.
atomic.StoreUint64(&scans, 0)
sqlDB.Exec(`UPSERT INTO d.kv (k) VALUES (1)`)
if s := atomic.LoadUint64(&scans); s != 1 {
t.Errorf("expected 1 scans (no upsert fast path) but got %d", s)
}

// This should not hit the fast path because kv has a secondary index.
sqlDB.Exec(`CREATE INDEX vidx ON d.kv (v)`)
atomic.StoreUint64(&scans, 0)
sqlDB.Exec(`UPSERT INTO d.kv VALUES (1, 1)`)
if s := atomic.LoadUint64(&scans); s != 1 {
t.Errorf("expected 1 scans (no upsert fast path) but got %d", s)
}
}

0 comments on commit 6295cc4

Please sign in to comment.