Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport-2.1: storage: delay manual splits that would result in more snapshots #33015

Merged
merged 1 commit into from Dec 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions pkg/base/config.go
Expand Up @@ -476,6 +476,14 @@ type RaftConfig struct {
// translates to ~1024 commands that might be executed in the handling of a
// single raft.Ready operation.
RaftMaxInflightMsgs int
// Splitting a range which has a replica needing a snapshot results in two
// ranges in that state. The delay configured here slows down splits when in
// that situation (limiting to those splits not run through the split
// queue). The most important target here are the splits performed by
// backup/restore.
//
// -1 to disable.
RaftDelaySplitToSuppressSnapshotTicks int
}

// SetDefaults initializes unset fields.
Expand Down Expand Up @@ -510,6 +518,13 @@ func (cfg *RaftConfig) SetDefaults() {
if cfg.RaftMaxInflightMsgs == 0 {
cfg.RaftMaxInflightMsgs = defaultRaftMaxInflightMsgs
}

if cfg.RaftDelaySplitToSuppressSnapshotTicks == 0 {
// A total of 100 ticks is >18s which experimentally has been shown to
// allow the small pile (<100) of Raft snapshots observed at the
// beginning of an import/restore to be resolved.
cfg.RaftDelaySplitToSuppressSnapshotTicks = 100
}
}

// RaftElectionTimeout returns the raft election timeout, as computed from the
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/client_split_test.go
Expand Up @@ -1348,6 +1348,9 @@ func runSetupSplitSnapshotRace(
sc.TestingKnobs.DisableAsyncIntentResolution = true
// Avoid fighting with the merge queue while trying to reproduce this race.
sc.TestingKnobs.DisableMergeQueue = true
// Disable the split delay mechanism, or it'll spend 10s going in circles.
// (We can't set it to zero as otherwise the default overrides us).
sc.RaftDelaySplitToSuppressSnapshotTicks = -1
mtc := &multiTestContext{storeConfig: &sc}
defer mtc.Stop()
mtc.Start(t, 6)
Expand Down
13 changes: 10 additions & 3 deletions pkg/storage/replica_command.go
Expand Up @@ -169,7 +169,7 @@ func (r *Replica) AdminSplit(
return roachpb.AdminSplitResponse{}, pErr
}

reply, lastErr = r.adminSplitWithDescriptor(ctx, args, r.Desc())
reply, lastErr = r.adminSplitWithDescriptor(ctx, args, r.Desc(), true /* delayable */)
// On seeing a ConditionFailedError or an AmbiguousResultError, retry
// the command with the updated descriptor.
if retry := causer.Visit(lastErr, func(err error) bool {
Expand Down Expand Up @@ -240,7 +240,10 @@ func splitSnapshotWarningStr(rangeID roachpb.RangeID, status *raft.Status) strin
//
// See the comment on splitTrigger for details on the complexities.
func (r *Replica) adminSplitWithDescriptor(
ctx context.Context, args roachpb.AdminSplitRequest, desc *roachpb.RangeDescriptor,
ctx context.Context,
args roachpb.AdminSplitRequest,
desc *roachpb.RangeDescriptor,
delayable bool,
) (roachpb.AdminSplitResponse, error) {
var reply roachpb.AdminSplitResponse

Expand Down Expand Up @@ -319,7 +322,11 @@ func (r *Replica) adminSplitWithDescriptor(
}
leftDesc.EndKey = splitKey

extra := splitSnapshotWarningStr(r.RangeID, r.RaftStatus())
var extra string
if delayable {
extra += maybeDelaySplitToAvoidSnapshot(ctx, (*splitDelayHelper)(r))
}
extra += splitSnapshotWarningStr(r.RangeID, r.RaftStatus())

log.Infof(ctx, "initiating a split of this range at key %s [r%d]%s",
splitKey, rightDesc.RangeID, extra)
Expand Down
157 changes: 157 additions & 0 deletions pkg/storage/split_delay_helper.go
@@ -0,0 +1,157 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package storage

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"go.etcd.io/etcd/raft"
)

type splitDelayHelperI interface {
RaftStatus(context.Context) (roachpb.RangeID, *raft.Status)
ProposeEmptyCommand(ctx context.Context)
NumAttempts() int
Sleep(context.Context) time.Duration
}

type splitDelayHelper Replica

func (sdh *splitDelayHelper) RaftStatus(ctx context.Context) (roachpb.RangeID, *raft.Status) {
r := (*Replica)(sdh)
r.mu.RLock()
raftStatus := r.raftStatusRLocked()
if raftStatus != nil {
updateRaftProgressFromActivity(
ctx, raftStatus.Progress, r.descRLocked().Replicas, r.mu.lastUpdateTimes, timeutil.Now(),
)
}
r.mu.RUnlock()
return r.RangeID, raftStatus
}

func (sdh *splitDelayHelper) ProposeEmptyCommand(ctx context.Context) {
r := (*Replica)(sdh)
r.raftMu.Lock()
_ = r.withRaftGroup(true /* campaignOnWake */, func(rawNode *raft.RawNode) (bool, error) {
// NB: intentionally ignore the error (which can be ErrProposalDropped
// when there's an SST inflight).
_ = rawNode.Propose(encodeRaftCommandV1(makeIDKey(), nil))
// NB: we need to unquiesce as the group might be quiesced.
return true /* unquiesceAndWakeLeader */, nil
})
r.raftMu.Unlock()
}

func (sdh *splitDelayHelper) NumAttempts() int {
return (*Replica)(sdh).store.cfg.RaftDelaySplitToSuppressSnapshotTicks
}

func (sdh *splitDelayHelper) Sleep(ctx context.Context) time.Duration {
tBegin := timeutil.Now()

r := (*Replica)(sdh)
select {
case <-time.After(r.store.cfg.RaftTickInterval):
case <-ctx.Done():
}

return timeutil.Since(tBegin)
}

func maybeDelaySplitToAvoidSnapshot(ctx context.Context, sdh splitDelayHelperI) string {
// We have an "optimization" to avoid Raft snapshots by dropping some
// outgoing MsgAppResp (see the _ assignment below) which takes effect for
// RaftPostSplitSuppressSnapshotTicks ticks after an uninitialized replica
// is created. This check can err, in which case the snapshot will be
// delayed for that many ticks, and so we want to delay by at least as much
// plus a bit of padding to give a snapshot a chance to catch the follower
// up. If we run out of time, we'll resume the split no matter what.
// _ = (*Replica)(nil).maybeDropMsgAppResp // guru assignment (not backported)
maxDelaySplitToAvoidSnapshotTicks := sdh.NumAttempts()

var slept time.Duration
var extra string
var succeeded bool
for ticks := 0; ticks < maxDelaySplitToAvoidSnapshotTicks; ticks++ {
succeeded = false
extra = ""
rangeID, raftStatus := sdh.RaftStatus(ctx)

if raftStatus == nil {
// Don't delay on followers (we don't know when to stop). This case
// is hit rarely enough to not matter.
extra += "; not Raft leader"
succeeded = true
break
}

done := true
for replicaID, pr := range raftStatus.Progress {
if replicaID == raftStatus.Lead {
// TODO(tschottdorf): remove this once we have picked up
// https://github.com/etcd-io/etcd/pull/10279
continue
}

if pr.State != raft.ProgressStateReplicate {
if !pr.RecentActive {
if ticks == 0 {
// Having set done = false, we make sure we're not exiting early.
// This is important because we sometimes need that Raft proposal
// below to make the followers active as there's no chatter on an
// idle range. (Note that there's a theoretical race in which the
// follower becomes inactive again during the sleep, but the
// inactivity interval is much larger than a tick).
//
// Don't do this more than once though: if a follower is down,
// we don't want to delay splits for it.
done = false
}
extra += fmt.Sprintf("; r%d/%d inactive", rangeID, replicaID)
continue
}
done = false
extra += fmt.Sprintf("; replica r%d/%d not caught up: %+v", rangeID, replicaID, &pr)
}
}
if done {
succeeded = true
break
}
// Propose an empty command which works around a Raft bug that can
// leave a follower in ProgressStateProbe even though it has caught
// up.
sdh.ProposeEmptyCommand(ctx)
slept += sdh.Sleep(ctx)

if ctx.Err() != nil {
return ""
}
}

if slept != 0 {
extra += fmt.Sprintf("; delayed split for %.1fs to avoid Raft snapshot", slept.Seconds())
if !succeeded {
extra += " (without success)"
}
}

return extra
}
166 changes: 166 additions & 0 deletions pkg/storage/split_delay_helper_test.go
@@ -0,0 +1,166 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package storage

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"go.etcd.io/etcd/raft"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)

type testSplitDelayHelper struct {
numAttempts int

rangeID roachpb.RangeID
raftStatus *raft.Status
sleep func()

slept, emptyProposed int
}

func (h *testSplitDelayHelper) RaftStatus(context.Context) (roachpb.RangeID, *raft.Status) {
return h.rangeID, h.raftStatus
}
func (h *testSplitDelayHelper) ProposeEmptyCommand(ctx context.Context) {
h.emptyProposed++
}
func (h *testSplitDelayHelper) NumAttempts() int {
return h.numAttempts
}
func (h *testSplitDelayHelper) Sleep(context.Context) time.Duration {
if h.sleep != nil {
h.sleep()
}
h.slept++
return time.Second
}

var _ splitDelayHelperI = (*testSplitDelayHelper)(nil)

func TestSplitDelayToAvoidSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()

t.Run("disabled", func(t *testing.T) {
// Should immediately bail out if told to run zero attempts.
h := &testSplitDelayHelper{
numAttempts: 0,
rangeID: 1,
raftStatus: nil,
}
s := maybeDelaySplitToAvoidSnapshot(ctx, h)
assert.Equal(t, "", s)
assert.Equal(t, 0, h.slept)
})

t.Run("follower", func(t *testing.T) {
// Should immediately bail out if run on non-leader.
h := &testSplitDelayHelper{
numAttempts: 5,
rangeID: 1,
raftStatus: nil,
}
s := maybeDelaySplitToAvoidSnapshot(ctx, h)
assert.Equal(t, "; not Raft leader", s)
assert.Equal(t, 0, h.slept)
})

t.Run("inactive", func(t *testing.T) {
h := &testSplitDelayHelper{
numAttempts: 5,
rangeID: 1,
raftStatus: &raft.Status{
Progress: map[uint64]raft.Progress{
2: {State: raft.ProgressStateProbe},
},
},
}
s := maybeDelaySplitToAvoidSnapshot(ctx, h)
// We try to wake up the follower once, but then give up on it.
assert.Equal(t, "; r1/2 inactive; delayed split for 1.0s to avoid Raft snapshot", s)
assert.Equal(t, 1, h.slept)
assert.Equal(t, 1, h.emptyProposed)
})

for _, state := range []raft.ProgressStateType{raft.ProgressStateProbe, raft.ProgressStateSnapshot} {
t.Run(state.String(), func(t *testing.T) {
h := &testSplitDelayHelper{
numAttempts: 5,
rangeID: 1,
raftStatus: &raft.Status{
Progress: map[uint64]raft.Progress{
2: {State: state, RecentActive: true, Paused: true /* unifies string output below */},
// Healthy follower just for kicks.
3: {State: raft.ProgressStateReplicate},
},
},
}
s := maybeDelaySplitToAvoidSnapshot(ctx, h)
assert.Equal(t, "; replica r1/2 not caught up: next = 0, match = 0, state = "+
state.String()+
", waiting = true, pendingSnapshot = 0; delayed split for 5.0s to avoid Raft snapshot (without success)", s)
assert.Equal(t, 5, h.slept)
assert.Equal(t, 5, h.emptyProposed)
})
}

t.Run("immediately-replicating", func(t *testing.T) {
h := &testSplitDelayHelper{
numAttempts: 5,
rangeID: 1,
raftStatus: &raft.Status{
Progress: map[uint64]raft.Progress{
2: {State: raft.ProgressStateReplicate}, // intentionally not recently active
},
},
}
s := maybeDelaySplitToAvoidSnapshot(ctx, h)
assert.Equal(t, "", s)
assert.Equal(t, 0, h.slept)
assert.Equal(t, 0, h.emptyProposed)
})

t.Run("becomes-replicating", func(t *testing.T) {
h := &testSplitDelayHelper{
numAttempts: 5,
rangeID: 1,
raftStatus: &raft.Status{
Progress: map[uint64]raft.Progress{
2: {State: raft.ProgressStateProbe, RecentActive: true},
},
},
}
// The fourth attempt will see the follower catch up.
h.sleep = func() {
if h.slept == 2 {
pr := h.raftStatus.Progress[2]
pr.State = raft.ProgressStateReplicate
h.raftStatus.Progress[2] = pr
}
}
s := maybeDelaySplitToAvoidSnapshot(ctx, h)
assert.Equal(t, "; delayed split for 3.0s to avoid Raft snapshot", s)
assert.Equal(t, 3, h.slept)
assert.Equal(t, 3, h.emptyProposed)
})
}