Skip to content

Commit

Permalink
Merge #30058
Browse files Browse the repository at this point in the history
30058: storage: Fix AmbientCtx handling during Store construction/startup r=a-robinson a=a-robinson

The AmbientCtx present in NewStore() does not yet include the store ID,
but was getting passed to the Scanner and StoreRebalancer, leaving
their log tags without the store ID (or in practice for the store
rebalancer's case, with the store ID out of order).

This fixes that (albeit very awkwardly for the Scanner), and removes a
passing of the AmbientCtx to a constructor that didn't actually use it.

Release note: None

The improperly ordered store rebalancer log tags (`s1,n1,store-rebalancer`) were annoying me while looking at #29969, and the lack of a store tag could conceivably get confusing for the scanner on a node with multiple stores.

Co-authored-by: Alex Robinson <alexdwanerobinson@gmail.com>
  • Loading branch information
craig[bot] and a-robinson committed Sep 11, 2018
2 parents 35100f0 + 53ced2b commit 26bbb77
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 12 deletions.
3 changes: 1 addition & 2 deletions pkg/storage/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)
Expand Down Expand Up @@ -140,7 +139,7 @@ type raftScheduler struct {
}

func newRaftScheduler(
ambient log.AmbientContext, metrics *StoreMetrics, processor raftProcessor, numWorkers int,
metrics *StoreMetrics, processor raftProcessor, numWorkers int,
) *raftScheduler {
s := &raftScheduler{
processor: processor,
Expand Down
6 changes: 2 additions & 4 deletions pkg/storage/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

func TestRangeIDChunk(t *testing.T) {
Expand Down Expand Up @@ -196,7 +194,7 @@ func TestSchedulerLoop(t *testing.T) {
defer leaktest.AfterTest(t)()

p := newTestProcessor()
s := newRaftScheduler(log.AmbientContext{Tracer: tracing.NewTracer()}, nil, p, 1)
s := newRaftScheduler(nil, p, 1)
stopper := stop.NewStopper()
ctx := context.TODO()
defer stopper.Stop(ctx)
Expand All @@ -218,7 +216,7 @@ func TestSchedulerBuffering(t *testing.T) {
defer leaktest.AfterTest(t)()

p := newTestProcessor()
s := newRaftScheduler(log.AmbientContext{Tracer: tracing.NewTracer()}, nil, p, 1)
s := newRaftScheduler(nil, p, 1)
stopper := stop.NewStopper()
ctx := context.TODO()
defer stopper.Stop(ctx)
Expand Down
18 changes: 13 additions & 5 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript
s.intentResolver = newIntentResolver(s, cfg.IntentResolverTaskLimit)
s.raftEntryCache = newRaftEntryCache(cfg.RaftEntryCacheSize)
s.draining.Store(false)
s.scheduler = newRaftScheduler(s.cfg.AmbientCtx, s.metrics, s, storeSchedulerConcurrency)
s.scheduler = newRaftScheduler(s.metrics, s, storeSchedulerConcurrency)

s.coalescedMu.Lock()
s.coalescedMu.heartbeats = map[roachpb.StoreIdent][]RaftHeartbeat{}
Expand Down Expand Up @@ -1024,9 +1024,6 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript
)
s.scanner.AddQueues(s.tsMaintenanceQueue)
}

s.storeRebalancer = NewStoreRebalancer(
s.cfg.AmbientCtx, cfg.Settings, s.replicateQueue, s.replRankings)
}

if cfg.TestingKnobs.DisableGCQueue {
Expand Down Expand Up @@ -1389,6 +1386,15 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
ctx = s.AnnotateCtx(ctx)
log.Event(ctx, "read store identity")

// Add the store ID to the scanner's AmbientContext before starting it, since
// the AmbientContext provided during construction did not include it.
// Note that this is just a hacky way of getting around that without
// refactoring the scanner/queue construction/start logic more broadly, and
// depends on the scanner not having added its own log tag.
if s.scanner != nil {
s.scanner.AmbientContext.AddLogTag("s", s.StoreID())
}

// If the nodeID is 0, it has not be assigned yet.
if s.nodeDesc.NodeID != 0 && s.Ident.NodeID != s.nodeDesc.NodeID {
return errors.Errorf("node id:%d does not equal the one in node descriptor:%d", s.Ident.NodeID, s.nodeDesc.NodeID)
Expand Down Expand Up @@ -1557,7 +1563,9 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
// Connect rangefeeds to closed timestamp updates.
s.startClosedTimestampRangefeedSubscriber(ctx)

if s.storeRebalancer != nil {
if s.replicateQueue != nil {
s.storeRebalancer = NewStoreRebalancer(
s.cfg.AmbientCtx, s.cfg.Settings, s.replicateQueue, s.replRankings)
s.storeRebalancer.Start(ctx, s.stopper)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store_rebalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ func NewStoreRebalancer(
rq *replicateQueue,
replRankings *replicaRankings,
) *StoreRebalancer {
ambientCtx.AddLogTag("store-rebalancer", nil)
sr := &StoreRebalancer{
AmbientContext: ambientCtx,
metrics: makeStoreRebalancerMetrics(),
st: st,
rq: rq,
replRankings: replRankings,
}
sr.AddLogTag("store-rebalancer", nil)
sr.rq.store.metrics.registry.AddMetricStruct(&sr.metrics)
return sr
}
Expand Down

0 comments on commit 26bbb77

Please sign in to comment.