Skip to content

Commit

Permalink
gossip: replace DeprecatedGossip with OptionalGossip
Browse files Browse the repository at this point in the history
Addresses a TODO and simplifies code.
  • Loading branch information
nvanbenschoten committed Aug 3, 2020
1 parent 8759499 commit 6c1a90b
Show file tree
Hide file tree
Showing 21 changed files with 49 additions and 69 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func splitAndFilterSpans(
}

// clusterNodeCount returns the approximate number of nodes in the cluster.
func clusterNodeCount(gw gossip.DeprecatedGossip) (int, error) {
func clusterNodeCount(gw gossip.OptionalGossip) (int, error) {
g, err := gw.OptionalErr(47970)
if err != nil {
return 0, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func createBenchmarkChangefeed(
Settings: settings,
DB: s.DB(),
Clock: feedClock,
Gossip: gossip.MakeExposedGossip(s.GossipI().(*gossip.Gossip)),
Gossip: gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
Spans: spans,
Targets: details.Targets,
Sink: buf,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Config struct {
Settings *cluster.Settings
DB *kv.DB
Clock *hlc.Clock
Gossip gossip.DeprecatedGossip
Gossip gossip.OptionalGossip
Spans []roachpb.Span
Targets jobspb.ChangefeedTargets
Sink EventBufferWriter
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/kvfeed/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type kvScanner interface {

type scanRequestScanner struct {
settings *cluster.Settings
gossip gossip.DeprecatedGossip
gossip gossip.OptionalGossip
db *kv.DB
}

Expand Down Expand Up @@ -244,7 +244,7 @@ func allRangeDescriptors(ctx context.Context, txn *kv.Txn) ([]roachpb.RangeDescr
}

// clusterNodeCount returns the approximate number of nodes in the cluster.
func clusterNodeCount(gw gossip.DeprecatedGossip) (int, error) {
func clusterNodeCount(gw gossip.OptionalGossip) (int, error) {
g, err := gw.OptionalErr(47971)
if err != nil {
return 0, err
Expand Down
42 changes: 11 additions & 31 deletions pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -1617,46 +1617,26 @@ func (g *Gossip) OnFirstRangeChanged(cb func(*roachpb.RangeDescriptor)) {
})
}

// MakeExposedGossip initializes a DeprecatedGossip instance which exposes a
// wrapped Gossip instance via Optional(). This is used on SQL servers running
// inside of a KV server (i.e. single-tenant deployments).
// MakeOptionalGossip initializes an OptionalGossip instance wrapping a
// (possibly nil) *Gossip.
//
// Use of Gossip from within the SQL layer is **deprecated**. Please do not
// introduce new uses of it.
//
// See TenantSQLDeprecatedWrapper for details.
func MakeExposedGossip(g *Gossip) DeprecatedGossip {
const exposed = true
return DeprecatedGossip{
w: errorutil.MakeTenantSQLDeprecatedWrapper(g, exposed),
func MakeOptionalGossip(g *Gossip) OptionalGossip {
return OptionalGossip{
w: errorutil.MakeTenantSQLDeprecatedWrapper(g, g != nil),
}
}

// MakeUnexposedGossip initializes a DeprecatedGossip instance for which
// Optional() does not return the wrapped Gossip instance. This is used on
// SQL servers not running as part of a KV server, i.e. with multi-tenancy.
// OptionalGossip is a Gossip instance in a SQL tenant server.
//
// Use of Gossip from within the SQL layer is **deprecated**. Please do not
// introduce new uses of it.
//
// See TenantSQLDeprecatedWrapper for details.
//
// TODO(tbg): once we can start a SQL tenant without gossip, remove this method
// and rename DeprecatedGossip to OptionalGossip.
func MakeUnexposedGossip(g *Gossip) DeprecatedGossip {
const exposed = false
return DeprecatedGossip{
w: errorutil.MakeTenantSQLDeprecatedWrapper(g, exposed),
}
}

// DeprecatedGossip is a Gossip instance in a SQL tenant server.
//
// Use of Gossip from within the SQL layer is **deprecated**. Please do not
// introduce new uses of it.
//
// See TenantSQLDeprecatedWrapper for details.
type DeprecatedGossip struct {
type OptionalGossip struct {
w errorutil.TenantSQLDeprecatedWrapper
}

Expand All @@ -1666,8 +1646,8 @@ type DeprecatedGossip struct {
//
// Use of Gossip from within the SQL layer is **deprecated**. Please do not
// introduce new uses of it.
func (dg DeprecatedGossip) OptionalErr(issueNos ...int) (*Gossip, error) {
v, err := dg.w.OptionalErr(issueNos...)
func (og OptionalGossip) OptionalErr(issueNos ...int) (*Gossip, error) {
v, err := og.w.OptionalErr(issueNos...)
if err != nil {
return nil, err
}
Expand All @@ -1680,8 +1660,8 @@ func (dg DeprecatedGossip) OptionalErr(issueNos ...int) (*Gossip, error) {
//
// Use of Gossip from within the SQL layer is **deprecated**. Please do not
// introduce new uses of it.
func (dg DeprecatedGossip) Optional(issueNos ...int) (*Gossip, bool) {
v, ok := dg.w.Optional()
func (og OptionalGossip) Optional(issueNos ...int) (*Gossip, bool) {
v, ok := og.w.Optional()
if !ok {
return nil, false
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{
statusServer: serverpb.MakeOptionalStatusServer(sStatus),
nodeLiveness: sqlbase.MakeOptionalNodeLiveness(nodeLiveness),
gossip: gossip.MakeExposedGossip(g),
gossip: gossip.MakeOptionalGossip(g),
grpcServer: grpc.Server,
recorder: recorder,
nodeIDContainer: idContainer,
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type sqlServerOptionalKVArgs struct {
// Gossip is relied upon by distSQLCfg (execinfra.ServerConfig), the executor
// config, the DistSQL planner, the table statistics cache, the statements
// diagnostics registry, and the lease manager.
gossip gossip.DeprecatedGossip
gossip gossip.OptionalGossip
// To register blob and DistSQL servers.
grpcServer *grpc.Server
// Used by executorConfig.
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func makeSQLServerArgs(
sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{
statusServer: serverpb.MakeOptionalStatusServer(nil),
nodeLiveness: sqlbase.MakeOptionalNodeLiveness(nil),
gossip: gossip.MakeUnexposedGossip(nil),
gossip: gossip.MakeOptionalGossip(nil),
grpcServer: dummyRPCServer,
recorder: dummyRecorder,
isMeta1Leaseholder: func(_ context.Context, timestamp hlc.Timestamp) (bool, error) {
Expand Down
14 changes: 7 additions & 7 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -1829,15 +1829,15 @@ func (m *Manager) findDescriptorState(id sqlbase.ID, create bool) *descriptorSta
// rangefeeds. This function must be passed a non-nil gossip if
// VersionRangefeedLeases is not active.
func (m *Manager) RefreshLeases(
ctx context.Context, s *stop.Stopper, db *kv.DB, g gossip.DeprecatedGossip,
ctx context.Context, s *stop.Stopper, db *kv.DB, g gossip.OptionalGossip,
) {
s.RunWorker(ctx, func(ctx context.Context) {
m.refreshLeases(ctx, g, db, s)
})
}

func (m *Manager) refreshLeases(
ctx context.Context, g gossip.DeprecatedGossip, db *kv.DB, s *stop.Stopper,
ctx context.Context, g gossip.OptionalGossip, db *kv.DB, s *stop.Stopper,
) {
descUpdateCh := make(chan *sqlbase.Descriptor)
m.watchForUpdates(ctx, s, db, g, descUpdateCh)
Expand Down Expand Up @@ -1887,7 +1887,7 @@ func (m *Manager) watchForUpdates(
ctx context.Context,
s *stop.Stopper,
db *kv.DB,
g gossip.DeprecatedGossip,
g gossip.OptionalGossip,
descUpdateCh chan *sqlbase.Descriptor,
) {
useRangefeeds := m.testingKnobs.AlwaysUseRangefeeds ||
Expand Down Expand Up @@ -1929,7 +1929,7 @@ func (m *Manager) watchForUpdates(
func (m *Manager) watchForGossipUpdates(
ctx context.Context,
s *stop.Stopper,
g gossip.DeprecatedGossip,
g gossip.OptionalGossip,
descUpdateCh chan<- *sqlbase.Descriptor,
) {
rawG, err := g.OptionalErr(47150)
Expand All @@ -1942,9 +1942,9 @@ func (m *Manager) watchForGossipUpdates(

s.RunWorker(ctx, func(ctx context.Context) {
descKeyPrefix := m.storage.codec.TablePrefix(uint32(sqlbase.DescriptorTable.ID))
// TODO(ajwerner): Add a mechanism to unregister this channel upon return.
// NB: this call is allowed to bypass DeprecatedGossip because we'll never
// get here after VersionRangefeedLeases.
// TODO(ajwerner): Add a mechanism to unregister this channel upon
// return. NB: this call is allowed to bypass OptionalGossip because
// we'll never get here after VersionRangefeedLeases.
gossipUpdateC := rawG.RegisterSystemConfigChannel()
filter := gossip.MakeSystemConfigDeltaFilter(descKeyPrefix)

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func startConnExecutor(
st := cluster.MakeTestingClusterSettings()
nodeID := base.TestingIDContainer
distSQLMetrics := execinfra.MakeDistSQLMetrics(time.Hour /* histogramWindow */)
gw := gossip.MakeExposedGossip(nil)
gw := gossip.MakeOptionalGossip(nil)
cfg := &ExecutorConfig{
AmbientCtx: testutils.MakeAmbientCtx(),
Settings: st,
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type DistSQLPlanner struct {
runnerChan chan runnerRequest

// gossip handle used to check node version compatibility.
gossip gossip.DeprecatedGossip
gossip gossip.OptionalGossip

nodeDialer *nodedialer.Dialer

Expand Down Expand Up @@ -128,7 +128,7 @@ func NewDistSQLPlanner(
distSQLSrv *distsql.ServerImpl,
distSender *kvcoord.DistSender,
nodeDescs kvcoord.NodeDescStore,
gw gossip.DeprecatedGossip,
gw gossip.OptionalGossip,
stopper *stop.Stopper,
isLive func(roachpb.NodeID) (bool, error),
nodeDialer *nodedialer.Dialer,
Expand Down Expand Up @@ -716,7 +716,7 @@ type SpanPartition struct {
}

type distSQLNodeHealth struct {
gossip gossip.DeprecatedGossip
gossip gossip.OptionalGossip
isLive func(roachpb.NodeID) (bool, error)
connHealth func(roachpb.NodeID, rpc.ConnectionClass) error
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/distsql_physical_planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ func TestPartitionSpans(t *testing.T) {
ranges: tc.ranges,
}

gw := gossip.MakeExposedGossip(mockGossip)
gw := gossip.MakeOptionalGossip(mockGossip)
dsp := DistSQLPlanner{
planVersion: execinfra.Version,
st: cluster.MakeTestingClusterSettings(),
Expand Down Expand Up @@ -1008,7 +1008,7 @@ func TestPartitionSpansSkipsIncompatibleNodes(t *testing.T) {
ranges: ranges,
}

gw := gossip.MakeExposedGossip(mockGossip)
gw := gossip.MakeOptionalGossip(mockGossip)
dsp := DistSQLPlanner{
planVersion: tc.planVersion,
st: cluster.MakeTestingClusterSettings(),
Expand Down Expand Up @@ -1107,7 +1107,7 @@ func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) {
ranges: ranges,
}

gw := gossip.MakeExposedGossip(mockGossip)
gw := gossip.MakeOptionalGossip(mockGossip)
dsp := DistSQLPlanner{
planVersion: execinfra.Version,
st: cluster.MakeTestingClusterSettings(),
Expand Down Expand Up @@ -1213,7 +1213,7 @@ func TestCheckNodeHealth(t *testing.T) {
{notLive, "not using n5 due to liveness: node n5 is not live"},
}

gw := gossip.MakeExposedGossip(mockGossip)
gw := gossip.MakeOptionalGossip(mockGossip)
for _, test := range livenessTests {
t.Run("liveness", func(t *testing.T) {
h := distSQLNodeHealth{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ type ExecutorConfig struct {
Locality roachpb.Locality
AmbientCtx log.AmbientContext
DB *kv.DB
Gossip gossip.DeprecatedGossip
Gossip gossip.OptionalGossip
SystemConfig config.SystemConfigProvider
DistSender *kvcoord.DistSender
RPCContext *rpc.Context
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/execinfra/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ type ServerConfig struct {

// A handle to gossip used to broadcast the node's DistSQL version and
// draining state.
Gossip gossip.DeprecatedGossip
Gossip gossip.OptionalGossip

NodeDialer *nodedialer.Dialer

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/sample_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestSampleAggregator(t *testing.T) {
Settings: st,
DB: kvDB,
Executor: server.InternalExecutor().(sqlutil.InternalExecutor),
Gossip: gossip.MakeExposedGossip(server.GossipI().(*gossip.Gossip)),
Gossip: gossip.MakeOptionalGossip(server.GossipI().(*gossip.Gossip)),
},
}
// Override the default memory limit. If memLimitBytes is small but
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/stats/automatic_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestMaybeRefreshStats(t *testing.T) {
descA := sqlbase.TestingGetTableDescriptor(s.DB(), keys.SystemSQLCodec, "t", "a")
cache := NewTableStatisticsCache(
10, /* cacheSize */
gossip.MakeExposedGossip(s.GossipI().(*gossip.Gossip)),
gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
kvDB,
executor,
keys.SystemSQLCodec,
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestAverageRefreshTime(t *testing.T) {
tableID := sqlbase.TestingGetTableDescriptor(s.DB(), keys.SystemSQLCodec, "t", "a").ID
cache := NewTableStatisticsCache(
10, /* cacheSize */
gossip.MakeExposedGossip(s.GossipI().(*gossip.Gossip)),
gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
kvDB,
executor,
keys.SystemSQLCodec,
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestAutoStatsReadOnlyTables(t *testing.T) {
executor := s.InternalExecutor().(sqlutil.InternalExecutor)
cache := NewTableStatisticsCache(
10, /* cacheSize */
gossip.MakeExposedGossip(s.GossipI().(*gossip.Gossip)),
gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
kvDB,
executor,
keys.SystemSQLCodec,
Expand Down Expand Up @@ -411,7 +411,7 @@ func TestNoRetryOnFailure(t *testing.T) {
executor := s.InternalExecutor().(sqlutil.InternalExecutor)
cache := NewTableStatisticsCache(
10, /* cacheSize */
gossip.MakeExposedGossip(s.GossipI().(*gossip.Gossip)),
gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
kvDB,
executor,
keys.SystemSQLCodec,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/stats/delete_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestDeleteOldStatsForColumns(t *testing.T) {
ex := s.InternalExecutor().(sqlutil.InternalExecutor)
cache := NewTableStatisticsCache(
10, /* cacheSize */
gossip.MakeExposedGossip(s.GossipI().(*gossip.Gossip)),
gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
db,
ex,
keys.SystemSQLCodec,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/stats/gossip_invalidation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestGossipInvalidation(t *testing.T) {

sc := stats.NewTableStatisticsCache(
10, /* cacheSize */
gossip.MakeExposedGossip(tc.Server(0).GossipI().(*gossip.Gossip)),
gossip.MakeOptionalGossip(tc.Server(0).GossipI().(*gossip.Gossip)),
tc.Server(0).DB(),
tc.Server(0).InternalExecutor().(sqlutil.InternalExecutor),
keys.SystemSQLCodec,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/stats/stats_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type cacheEntry struct {
// statistics for <cacheSize> tables.
func NewTableStatisticsCache(
cacheSize int,
gw gossip.DeprecatedGossip,
gw gossip.OptionalGossip,
db *kv.DB,
sqlExecutor sqlutil.InternalExecutor,
codec keys.SQLCodec,
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/stats/stats_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func TestCacheBasic(t *testing.T) {
// exceeded, entries should be evicted according to the LRU policy.
sc := NewTableStatisticsCache(
2, /* cacheSize */
gossip.MakeExposedGossip(s.GossipI().(*gossip.Gossip)),
gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
db,
ex,
keys.SystemSQLCodec,
Expand Down Expand Up @@ -323,7 +323,7 @@ CREATE STATISTICS s FROM tt;
// Make a stats cache.
sc := NewTableStatisticsCache(
1,
gossip.MakeExposedGossip(s.GossipI().(*gossip.Gossip)),
gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
kvDB,
s.InternalExecutor().(sqlutil.InternalExecutor),
keys.SystemSQLCodec,
Expand Down Expand Up @@ -362,7 +362,7 @@ func TestCacheWait(t *testing.T) {
sort.Sort(tableIDs)
sc := NewTableStatisticsCache(
len(tableIDs), /* cacheSize */
gossip.MakeExposedGossip(s.GossipI().(*gossip.Gossip)),
gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
db,
ex,
keys.SystemSQLCodec,
Expand Down
Loading

0 comments on commit 6c1a90b

Please sign in to comment.