Skip to content

Commit

Permalink
ccl/backupccl,ccl/importccl: disable merges during IMPORT and RESTORE
Browse files Browse the repository at this point in the history
Add a facility for periodically gossiping keys which disable range
merges for a set of tables. This is used by IMPORT and RESTORE to
disable range merges on tables being imported / restored. This is done
to prevent the merge queue from fighting against the splitting performed
by IMPORT and RESTORE.

Fixes #29268

Release note (enterprise change): Disable range merges on tables that
are being restored or imported into.
  • Loading branch information
petermattis committed Nov 21, 2018
1 parent cfd53f6 commit 80eba6d
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 1 deletion.
13 changes: 13 additions & 0 deletions pkg/ccl/backupccl/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"

"github.com/cockroachdb/cockroach/pkg/ccl/gossipccl"
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl/intervalccl"
Expand Down Expand Up @@ -1007,6 +1008,18 @@ func restore(
return mu.res, nil, nil, err
}

{
// Disable merging for the table IDs being restored into. We don't want the
// merge queue undoing the splits performed during RESTORE.
tableIDs := make([]uint32, 0, len(tables))
for _, t := range tables {
tableIDs = append(tableIDs, uint32(t.ID))
}
disableCtx, cancel := context.WithCancel(restoreCtx)
defer cancel()
gossipccl.DisableMerges(disableCtx, gossip, tableIDs)
}

// Get TableRekeys to use when importing raw data.
var rekeys []roachpb.ImportRequest_TableRekey
for i := range tables {
Expand Down
63 changes: 63 additions & 0 deletions pkg/ccl/gossipccl/disable_merges.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package gossipccl

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

const (
disableMergesInterval = 10 * time.Second
)

// DisableMerges starts a goroutine which periodically gossips keys that
// disable merging for the specified table IDs. The goroutine until the
// associated context is done (usually via cancellation).
func DisableMerges(ctx context.Context, g *gossip.Gossip, tableIDs []uint32) {
if len(tableIDs) == 0 {
// Nothing to do.
return
}

disable := func() {
for _, id := range tableIDs {
key := gossip.MakeTableDisableMergesKey(id)
err := g.AddInfo(key, nil /* value */, disableMergesInterval*2 /* ttl */)
if err != nil {
log.Infof(ctx, "failed to gossip: %s: %v", key, err)
}
}
}

// Disable merging synchronously before we start the periodic loop below.
disable()

s := g.Stopper()
// We don't care if this task can't be started as that only occurs if the
// stopper is stopping.
_ = s.RunAsyncTask(ctx, "disable-merges", func(ctx context.Context) {
ticker := time.NewTicker(disableMergesInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
disable()
case <-ctx.Done():
return
case <-s.ShouldQuiesce():
return
}
}
})
}
49 changes: 49 additions & 0 deletions pkg/ccl/gossipccl/disable_merges_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package gossipccl

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
)

func TestDisableMerges(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

testCases := []struct {
tableIDs []uint32
}{
{tableIDs: nil},
{tableIDs: []uint32{0}},
{tableIDs: []uint32{1, 2, 9, 10}},
}
for _, c := range testCases {
t.Run("", func(t *testing.T) {
g := gossip.NewTest(1, nil /* rpcContext */, nil, /* grpcServer */
stopper, metric.NewRegistry())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

DisableMerges(ctx, g, c.tableIDs)
for _, id := range c.tableIDs {
key := gossip.MakeTableDisableMergesKey(id)
if _, err := g.GetInfo(key); err != nil {
t.Fatalf("expected to find %s, but got %v", key, err)
}
}
})
}
}
13 changes: 13 additions & 0 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
"github.com/cockroachdb/cockroach/pkg/ccl/gossipccl"
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
Expand Down Expand Up @@ -1039,6 +1040,18 @@ func (r *importResumer) Resume(
}
}

{
// Disable merging for the table IDs being imported into. We don't want the
// merge queue undoing the splits performed during IMPORT.
tableIDs := make([]uint32, 0, len(tables))
for _, t := range tables {
tableIDs = append(tableIDs, uint32(t.ID))
}
disableCtx, cancel := context.WithCancel(ctx)
defer cancel()
gossipccl.DisableMerges(disableCtx, p.ExecCfg().Gossip, tableIDs)
}

res, err := doDistributedCSVTransform(
ctx, job, files, p, parentID, tables, transform, format, walltime, sstSize, oversample,
)
Expand Down
5 changes: 5 additions & 0 deletions pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -1233,6 +1233,11 @@ func (g *Gossip) Start(advertAddr net.Addr, resolvers []resolver.Resolver) {
g.manage() // manage gossip clients
}

// Stopper returns the stopper for this gossip instance.
func (g *Gossip) Stopper() *stop.Stopper {
return g.server.stopper
}

// hasIncomingLocked returns whether the server has an incoming gossip
// client matching the provided node ID. Mutex should be held by
// caller.
Expand Down
12 changes: 12 additions & 0 deletions pkg/gossip/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ const (
// the keys are used to notify nodes to invalidate table statistic caches.
KeyTableStatAddedPrefix = "table-stat-added"

// KeyTableDisableMergesPrefix is the prefix for keys that indicate range
// merges for the specified table ID should be disabled. This is used by
// IMPORT and RESTORE to disable range merging while those operations are in
// progress.
KeyTableDisableMergesPrefix = "table-disable-merges"

// KeyGossipClientsPrefix is the prefix for keys that indicate which gossip
// client connections a node has open. This is used by other nodes in the
// cluster to build a map of the gossip network.
Expand Down Expand Up @@ -210,6 +216,12 @@ func TableIDFromTableStatAddedKey(key string) (uint32, error) {
return uint32(tableID), nil
}

// MakeTableDisableMergesKey returns the gossip key used to disable merges for
// the specified table ID.
func MakeTableDisableMergesKey(tableID uint32) string {
return MakeKey(KeyTableDisableMergesPrefix, strconv.FormatUint(uint64(tableID), 10 /* base */))
}

// removePrefixFromKey removes the key prefix and separator and returns what's
// left. Returns an error if the key doesn't have this prefix.
func removePrefixFromKey(key, prefix string) (string, error) {
Expand Down
18 changes: 18 additions & 0 deletions pkg/gossip/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,21 @@ func TestStoreIDFromKey(t *testing.T) {
})
}
}

func TestMakeTableDisableMergesKey(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
tableID uint32
expected string
}{
{0, "table-disable-merges:0"},
{123, "table-disable-merges:123"},
}
for _, c := range testCases {
key := MakeTableDisableMergesKey(c.tableID)
if c.expected != key {
t.Fatalf("expected %s, but found %s", c.expected, key)
}
}
}
21 changes: 20 additions & 1 deletion pkg/storage/merge_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -123,6 +124,15 @@ func (mq *mergeQueue) enabled() bool {
return st.Version.IsMinSupported(cluster.VersionRangeMerges) && storagebase.MergeQueueEnabled.Get(&st.SV)
}

func (mq *mergeQueue) mergesDisabled(desc *roachpb.RangeDescriptor) bool {
_, tableID, err := keys.DecodeTablePrefix(desc.StartKey.AsRawKey())
if err != nil {
return false
}
_, err = mq.gossip.GetInfo(gossip.MakeTableDisableMergesKey(uint32(tableID)))
return err == nil
}

func (mq *mergeQueue) shouldQueue(
ctx context.Context, now hlc.Timestamp, repl *Replica, sysCfg *config.SystemConfig,
) (shouldQ bool, priority float64) {
Expand Down Expand Up @@ -151,6 +161,10 @@ func (mq *mergeQueue) shouldQueue(
return false, 0
}

if mq.mergesDisabled(desc) {
return false, 0
}

// Invert sizeRatio to compute the priority so that smaller ranges are merged
// before larger ranges.
priority = 1 - sizeRatio
Expand Down Expand Up @@ -194,6 +208,12 @@ func (mq *mergeQueue) process(
return nil
}

lhsDesc := lhsRepl.Desc()
if mq.mergesDisabled(lhsDesc) {
log.VEventf(ctx, 2, "skipping merge: merges are temporarily disabled for this table")
return nil
}

lhsStats := lhsRepl.GetMVCCStats()
minBytes := lhsRepl.GetMinBytes()
if lhsStats.Total() >= minBytes {
Expand All @@ -202,7 +222,6 @@ func (mq *mergeQueue) process(
return nil
}

lhsDesc := lhsRepl.Desc()
lhsQPS := lhsRepl.GetSplitQPS()
timeSinceLastReq := lhsRepl.store.Clock().PhysicalTime().Sub(lhsRepl.GetLastRequestTime())
rhsDesc, rhsStats, rhsQPS, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey())
Expand Down
15 changes: 15 additions & 0 deletions pkg/storage/merge_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/gogo/protobuf/proto"

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
Expand Down Expand Up @@ -49,6 +50,13 @@ func TestMergeQueueShouldQueue(t *testing.T) {
config.TestingSetZoneConfig(keys.MaxReservedDescID+1, *config.NewZoneConfig())
config.TestingSetZoneConfig(keys.MaxReservedDescID+2, *config.NewZoneConfig())

// Disable merges for table ID 3.
if err := testCtx.gossip.AddInfo(
gossip.MakeTableDisableMergesKey(keys.MaxReservedDescID+3),
nil /* value */, 0 /* ttl */); err != nil {
t.Fatal(err)
}

type testCase struct {
startKey, endKey []byte
minBytes int64
Expand Down Expand Up @@ -147,6 +155,13 @@ func TestMergeQueueShouldQueue(t *testing.T) {
expShouldQ: true,
expPriority: 0.25,
},

// Merges disabled for a table via gossip.
{
startKey: tableKey(3),
endKey: tableKey(4),
minBytes: 1,
},
}

for _, tc := range testCases {
Expand Down

0 comments on commit 80eba6d

Please sign in to comment.