From 3d64d9ffe80280fbaf31b6547361b7bb1234cab7 Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Wed, 12 Sep 2018 21:24:13 -0400 Subject: [PATCH 1/2] changefeedccl: deflake TestChangefeedRetryableSinkError TestChangefeedRetryableSinkError was performing many rounds of bcrypt, which is excruciatingly slow when the race detector is on. Put its test cluster into insecure mode to skip bcrypt hashing. Fix #30156. Fix #30161. Release note: None --- pkg/ccl/changefeedccl/changefeed_test.go | 9 +++++++-- pkg/ccl/changefeedccl/sink.go | 5 +++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 2dd2de6584f8..dead9db0b718 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -770,6 +770,11 @@ func TestChangefeedRetryableSinkError(t *testing.T) { return nil } s, sqlDBRaw, _ := serverutils.StartServer(t, base.TestServerArgs{ + // This test causes a lot of pgwire connection attempts which, in secure + // mode, results in many rounds of bcrypt hashing. This is excruciatingly + // slow with the race detector on. Just use insecure mode, which avoids + // bcrypt. + Insecure: true, Knobs: base.TestingKnobs{ DistSQL: &distsqlrun.TestingKnobs{ Changefeed: &TestingKnobs{ @@ -785,11 +790,11 @@ func TestChangefeedRetryableSinkError(t *testing.T) { // Create original data table. sqlDB.Exec(t, `CREATE DATABASE d`) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) - sqlDB.Exec(t, `CREATE USER sinkuser WITH PASSWORD sink`) + sqlDB.Exec(t, `CREATE USER sinkuser`) sqlDB.Exec(t, `GRANT ALL ON DATABASE d TO sinkuser`) // Create changefeed into SQL Sink. - row := sqlDB.QueryRow(t, fmt.Sprintf(`CREATE CHANGEFEED FOR foo INTO 'experimental-sql://sinkuser:sink@%s/d'`, s.ServingAddr())) + row := sqlDB.QueryRow(t, fmt.Sprintf(`CREATE CHANGEFEED FOR foo INTO 'experimental-sql://sinkuser@%s/d?sslmode=disable'`, s.ServingAddr())) var jobID string row.Scan(&jobID) diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 98a02fca69b0..0774e6784c31 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -87,6 +87,11 @@ func getSink(sinkURI string, targets jobspb.ChangefeedTargets) (Sink, error) { if err != nil { return nil, err } + // Mark all query parameters as consumed; since the connection succeeded, + // we assume they were valid SQL connection parameters. + for k := range q { + q.Del(k) + } default: return nil, errors.Errorf(`unsupported sink: %s`, u.Scheme) } From 4ee48ef4c71986401f8958e296097bc53c849aed Mon Sep 17 00:00:00 2001 From: Nikhil Benesch Date: Wed, 12 Sep 2018 21:32:14 -0400 Subject: [PATCH 2/2] storage: fix race when draining rangefeed channel The previous pattern used to drain a channel if len(ch) > 0 { <-ch } is considered a race by the race detector if close(ch) occurs at the same time. Adjust the channel-draining code to be race-safe by instead perfomring repeated nonblocking reads. Fix #30159. Release note: None --- pkg/storage/store.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 54b2ad9e0f2a..0329d785491f 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -1753,8 +1753,16 @@ func (s *Store) startClosedTimestampRangefeedSubscriber(ctx context.Context) { select { case <-ch: // Drain all notifications from the channel. - for len(ch) > 0 { - <-ch + loop: + for { + select { + case _, ok := <-ch: + if !ok { + break loop + } + default: + break loop + } } // Gather replicas to notify under lock.