diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 2dd2de6584f8..dead9db0b718 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -770,6 +770,11 @@ func TestChangefeedRetryableSinkError(t *testing.T) { return nil } s, sqlDBRaw, _ := serverutils.StartServer(t, base.TestServerArgs{ + // This test causes a lot of pgwire connection attempts which, in secure + // mode, results in many rounds of bcrypt hashing. This is excruciatingly + // slow with the race detector on. Just use insecure mode, which avoids + // bcrypt. + Insecure: true, Knobs: base.TestingKnobs{ DistSQL: &distsqlrun.TestingKnobs{ Changefeed: &TestingKnobs{ @@ -785,11 +790,11 @@ func TestChangefeedRetryableSinkError(t *testing.T) { // Create original data table. sqlDB.Exec(t, `CREATE DATABASE d`) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) - sqlDB.Exec(t, `CREATE USER sinkuser WITH PASSWORD sink`) + sqlDB.Exec(t, `CREATE USER sinkuser`) sqlDB.Exec(t, `GRANT ALL ON DATABASE d TO sinkuser`) // Create changefeed into SQL Sink. - row := sqlDB.QueryRow(t, fmt.Sprintf(`CREATE CHANGEFEED FOR foo INTO 'experimental-sql://sinkuser:sink@%s/d'`, s.ServingAddr())) + row := sqlDB.QueryRow(t, fmt.Sprintf(`CREATE CHANGEFEED FOR foo INTO 'experimental-sql://sinkuser@%s/d?sslmode=disable'`, s.ServingAddr())) var jobID string row.Scan(&jobID) diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 98a02fca69b0..0774e6784c31 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -87,6 +87,11 @@ func getSink(sinkURI string, targets jobspb.ChangefeedTargets) (Sink, error) { if err != nil { return nil, err } + // Mark all query parameters as consumed; since the connection succeeded, + // we assume they were valid SQL connection parameters. + for k := range q { + q.Del(k) + } default: return nil, errors.Errorf(`unsupported sink: %s`, u.Scheme) } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 54b2ad9e0f2a..0329d785491f 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -1753,8 +1753,16 @@ func (s *Store) startClosedTimestampRangefeedSubscriber(ctx context.Context) { select { case <-ch: // Drain all notifications from the channel. - for len(ch) > 0 { - <-ch + loop: + for { + select { + case _, ok := <-ch: + if !ok { + break loop + } + default: + break loop + } } // Gather replicas to notify under lock.