CBG-2704: [2.8.4] ISGR Sequence checkpointing maintains unnecessary entries (#6082)

* improve comments/logging

* naive forward iteration implementation

* iterate backwards to avoid creating new slice when compacting

* comment cleanup

* Update BenchmarkCheckpointerUpdateCheckpointLists to test combinations of skipped and _updateCheckpointLists iterations

* add defaultExpectedSeqCompactionThreshold const

* Add test case for non-sequential sequences (and also out of order)

* fix test case

Co-authored-by: Ben Brooks <ben.brooks@couchbase.com>
gregns1 and bbrks committed Feb 7, 2023
1 parent e4f46e5 commit 71f014e
Showing 3 changed files with 215 additions and 79 deletions.
64 changes: 46 additions & 18 deletions db/active_replicator_checkpointer.go
@@ -17,6 +17,8 @@ import (
"github.com/couchbase/sync_gateway/base"
)

const defaultExpectedSeqCompactionThreshold = 100

var DefaultCheckpointInterval = time.Second * 5

// Checkpointer implements replicator checkpointing, by keeping two lists of sequences. Those which we expect to be processing revs for (either push or pull), and a map for those which we have done so on.
@@ -57,6 +59,10 @@ type Checkpointer struct {
// remoteDBURL is used to fetch local SGR1 checkpoints.
remoteDBURL *url.URL

// expectedSeqCompactionThreshold is the number of expected sequences that we'll tolerate before considering compacting away already processed sequences
// time vs. space complexity tradeoff, since we need to iterate over the expectedSeqs slice to compact it
expectedSeqCompactionThreshold int

stats CheckpointerStats
}

@@ -79,21 +85,22 @@ type CheckpointerStats struct {

func NewCheckpointer(ctx context.Context, clientID string, configHash string, blipSender *blip.Sender, replicatorConfig *ActiveReplicatorConfig, statusCallback statusFunc) *Checkpointer {
return &Checkpointer{
clientID: clientID,
configHash: configHash,
blipSender: blipSender,
activeDB: replicatorConfig.ActiveDB,
expectedSeqs: make([]SequenceID, 0),
processedSeqs: make(map[SequenceID]struct{}),
idAndRevLookup: make(map[IDAndRev]SequenceID),
checkpointInterval: replicatorConfig.CheckpointInterval,
ctx: ctx,
stats: CheckpointerStats{},
statusCallback: statusCallback,
sgr1CheckpointID: replicatorConfig.SGR1CheckpointID,
sgr1CheckpointOnRemote: replicatorConfig.Direction == ActiveReplicatorTypePush,
remoteDBURL: replicatorConfig.RemoteDBURL,
sgr1RemoteInsecureSkipVerify: replicatorConfig.InsecureSkipVerify,
clientID: clientID,
configHash: configHash,
blipSender: blipSender,
activeDB: replicatorConfig.ActiveDB,
expectedSeqs: make([]SequenceID, 0),
processedSeqs: make(map[SequenceID]struct{}),
idAndRevLookup: make(map[IDAndRev]SequenceID),
checkpointInterval: replicatorConfig.CheckpointInterval,
ctx: ctx,
stats: CheckpointerStats{},
statusCallback: statusCallback,
sgr1CheckpointID: replicatorConfig.SGR1CheckpointID,
sgr1CheckpointOnRemote: replicatorConfig.Direction == ActiveReplicatorTypePush,
remoteDBURL: replicatorConfig.RemoteDBURL,
sgr1RemoteInsecureSkipVerify: replicatorConfig.InsecureSkipVerify,
expectedSeqCompactionThreshold: defaultExpectedSeqCompactionThreshold,
}
}

@@ -139,7 +146,10 @@ func (c *Checkpointer) AddProcessedSeqIDAndRev(seq *SequenceID, idAndRev IDAndRe
c.lock.Lock()

if seq == nil {
foundSeq, _ := c.idAndRevLookup[idAndRev]
foundSeq, ok := c.idAndRevLookup[idAndRev]
if !ok {
base.WarnfCtx(c.ctx, "Unable to find matching sequence for %q / %q", base.UD(idAndRev.DocID), idAndRev.RevID)
}
seq = &foundSeq
}
// should remove entry in the map even if we have a seq available
@@ -248,8 +258,9 @@ func (c *Checkpointer) Stats() CheckpointerStats {
}

// _updateCheckpointLists determines the highest checkpointable sequence, and trims the processedSeqs/expectedSeqs lists up to this point.
// We will also remove all but the last processed sequence as we know we're able to checkpoint safely up to that point without leaving any intermediate sequence numbers around.
func (c *Checkpointer) _updateCheckpointLists() (safeSeq *SequenceID) {
base.TracefCtx(c.ctx, base.KeyReplicate, "checkpointer: _updateCheckpointLists(expectedSeqs: %v, procssedSeqs: %v)", c.expectedSeqs, c.processedSeqs)
base.TracefCtx(c.ctx, base.KeyReplicate, "checkpointer: _updateCheckpointLists(expectedSeqs: %v, processedSeqs: %v)", c.expectedSeqs, c.processedSeqs)

c.stats.ExpectedSequenceLen = len(c.expectedSeqs)
c.stats.ProcessedSequenceLen = len(c.processedSeqs)
Expand All @@ -269,9 +280,26 @@ func (c *Checkpointer) _updateCheckpointLists() (safeSeq *SequenceID) {
base.TracefCtx(c.ctx, base.KeyReplicate, "checkpointer: _updateCheckpointLists removed seq %v from processedSeqs map", removeSeq)
}

// trim expectedSeqs list for all processed seqs
// trim expectedSeqs list from beginning up to first unprocessed seq
c.expectedSeqs = c.expectedSeqs[maxI+1:]

// if we have many remaining expectedSeqs, see if we can shrink the lists even more
// compact contiguous blocks of sequences by keeping only the last processed sequence in both lists
if len(c.expectedSeqs) > c.expectedSeqCompactionThreshold {
// start at the one before the end of the list (since we know we need to retain that one anyway, if it's processed)
for i := len(c.expectedSeqs) - 2; i >= 0; i-- {
current := c.expectedSeqs[i]
next := c.expectedSeqs[i+1]
_, processedCurrent := c.processedSeqs[current]
_, processedNext := c.processedSeqs[next]
if processedCurrent && processedNext {
// remove the current sequence from both sets, since we know we've also processed the next sequence and are able to checkpoint that
delete(c.processedSeqs, current)
c.expectedSeqs = append(c.expectedSeqs[:i], c.expectedSeqs[i+1:]...)
}
}
}

c.stats.ExpectedSequenceLenPostCleanup = len(c.expectedSeqs)
c.stats.ProcessedSequenceLenPostCleanup = len(c.processedSeqs)

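To make the new compaction pass easier to follow, here is a minimal standalone sketch of the same technique. It is not the Sync Gateway implementation: it assumes plain uint64 sequence numbers in place of SequenceID, and the compactProcessed helper is hypothetical, but it follows the same backward iteration so that entries can be removed in place without building a new slice.

package main

import "fmt"

// compactProcessed drops every expected sequence that has been processed and
// whose immediate successor in the expected list has also been processed.
// Keeping only the last sequence of each processed run is still enough to
// checkpoint safely later, mirroring the loop added to _updateCheckpointLists.
func compactProcessed(expected []uint64, processed map[uint64]struct{}) []uint64 {
	// Start one before the end of the list, since the final entry is always retained.
	for i := len(expected) - 2; i >= 0; i-- {
		_, processedCurrent := processed[expected[i]]
		_, processedNext := processed[expected[i+1]]
		if processedCurrent && processedNext {
			// The next sequence covers this one, so remove it from both sets.
			delete(processed, expected[i])
			expected = append(expected[:i], expected[i+1:]...)
		}
	}
	return expected
}

func main() {
	// Example from the "processed compaction" test case comment below:
	// expected [2 3 4 5 6] with processed [3 4 5] compacts to [2 5 6] / [5].
	expected := []uint64{2, 3, 4, 5, 6}
	processed := map[uint64]struct{}{3: {}, 4: {}, 5: {}}
	expected = compactProcessed(expected, processed)
	fmt.Println(expected)  // [2 5 6]
	fmt.Println(processed) // map[5:{}]
}

Because the loop walks from the end of the slice towards the start, each append-based deletion only shifts elements that have already been visited, and the existing backing array is reused rather than reallocated.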
210 changes: 159 additions & 51 deletions db/active_replicator_checkpointer_test.go
@@ -7,7 +7,32 @@ import (
"github.com/stretchr/testify/assert"
)

func genExpectedForTest(t testing.TB, seqs ...string) []SequenceID {
result := make([]SequenceID, 0, len(seqs))
for _, seq := range seqs {
s, err := ParsePlainSequenceID(seq)
if err != nil {
t.Fatalf("Error parsing sequence %q for test setup: %v", seq, err)
}
result = append(result, s)
}
return result
}

func genProcessedForTest(t testing.TB, seqs ...string) map[SequenceID]struct{} {
result := make(map[SequenceID]struct{}, len(seqs))
for _, seq := range seqs {
s, err := ParsePlainSequenceID(seq)
if err != nil {
t.Fatalf("Error parsing sequence %q for test setup: %v", seq, err)
}
result[s] = struct{}{}
}
return result
}

func TestCheckpointerSafeSeq(t *testing.T) {

tests := []struct {
name string
c *Checkpointer
Expand All @@ -19,90 +44,153 @@ func TestCheckpointerSafeSeq(t *testing.T) {
{
name: "empty",
c: &Checkpointer{
expectedSeqs: []SequenceID{},
processedSeqs: map[SequenceID]struct{}{},
expectedSeqs: genExpectedForTest(t),
processedSeqs: genProcessedForTest(t),
},
expectedSafeSeq: nil,
expectedExpectedSeqsIdx: -1,
expectedExpectedSeqs: []SequenceID{},
expectedProcessedSeqs: map[SequenceID]struct{}{},
expectedExpectedSeqs: genExpectedForTest(t),
expectedProcessedSeqs: genProcessedForTest(t),
},
{
name: "none processed",
c: &Checkpointer{
expectedSeqs: []SequenceID{{Seq: 1}, {Seq: 2}, {Seq: 3}},
processedSeqs: map[SequenceID]struct{}{},
expectedSeqs: genExpectedForTest(t, "1", "2", "3"),
processedSeqs: genProcessedForTest(t),
},
expectedSafeSeq: nil,
expectedExpectedSeqsIdx: -1,
expectedExpectedSeqs: []SequenceID{{Seq: 1}, {Seq: 2}, {Seq: 3}},
expectedProcessedSeqs: map[SequenceID]struct{}{},
expectedExpectedSeqs: genExpectedForTest(t, "1", "2", "3"),
expectedProcessedSeqs: genProcessedForTest(t),
},
{
name: "partial processed",
c: &Checkpointer{
expectedSeqs: []SequenceID{{Seq: 1}, {Seq: 2}, {Seq: 3}},
processedSeqs: map[SequenceID]struct{}{{Seq: 1}: {}},
expectedSeqs: genExpectedForTest(t, "1", "2", "3"),
processedSeqs: genProcessedForTest(t, "1"),
},
expectedSafeSeq: &SequenceID{Seq: 1},
expectedExpectedSeqsIdx: 0,
expectedExpectedSeqs: []SequenceID{{Seq: 2}, {Seq: 3}},
expectedProcessedSeqs: map[SequenceID]struct{}{},
expectedExpectedSeqs: genExpectedForTest(t, "2", "3"),
expectedProcessedSeqs: genProcessedForTest(t),
},
{
name: "partial processed with gap",
c: &Checkpointer{
expectedSeqs: []SequenceID{{Seq: 1}, {Seq: 2}, {Seq: 3}},
processedSeqs: map[SequenceID]struct{}{{Seq: 1}: {}, {Seq: 3}: {}},
expectedSeqs: genExpectedForTest(t, "1", "2", "3"),
processedSeqs: genProcessedForTest(t, "1", "3"),
},
expectedSafeSeq: &SequenceID{Seq: 1},
expectedExpectedSeqsIdx: 0,
expectedExpectedSeqs: []SequenceID{{Seq: 2}, {Seq: 3}},
expectedProcessedSeqs: map[SequenceID]struct{}{{Seq: 3}: {}},
expectedExpectedSeqs: genExpectedForTest(t, "2", "3"),
expectedProcessedSeqs: genProcessedForTest(t, "3"),
},
{
name: "fully processed",
c: &Checkpointer{
expectedSeqs: []SequenceID{{Seq: 1}, {Seq: 2}, {Seq: 3}},
processedSeqs: map[SequenceID]struct{}{{Seq: 1}: {}, {Seq: 2}: {}, {Seq: 3}: {}},
expectedSeqs: genExpectedForTest(t, "1", "2", "3"),
processedSeqs: genProcessedForTest(t, "1", "2", "3"),
},
expectedSafeSeq: &SequenceID{Seq: 3},
expectedExpectedSeqsIdx: 2,
expectedExpectedSeqs: []SequenceID{},
expectedProcessedSeqs: map[SequenceID]struct{}{},
expectedExpectedSeqs: genExpectedForTest(t),
expectedProcessedSeqs: genProcessedForTest(t),
},
{
name: "extra processed",
c: &Checkpointer{
expectedSeqs: []SequenceID{{Seq: 1}, {Seq: 2}, {Seq: 3}},
processedSeqs: map[SequenceID]struct{}{{Seq: 1}: {}, {Seq: 2}: {}, {Seq: 3}: {}, {Seq: 4}: {}, {Seq: 5}: {}},
expectedSeqs: genExpectedForTest(t, "1", "2", "3"),
processedSeqs: genProcessedForTest(t, "1", "2", "3", "4", "5"),
},
expectedSafeSeq: &SequenceID{Seq: 3},
expectedExpectedSeqsIdx: 2,
expectedExpectedSeqs: []SequenceID{},
expectedProcessedSeqs: map[SequenceID]struct{}{{Seq: 4}: {}, {Seq: 5}: {}},
expectedExpectedSeqs: genExpectedForTest(t),
expectedProcessedSeqs: genProcessedForTest(t, "4", "5"),
},
{
name: "out of order expected seqs",
c: &Checkpointer{
expectedSeqs: []SequenceID{{Seq: 3}, {Seq: 2}, {Seq: 1}},
processedSeqs: map[SequenceID]struct{}{{Seq: 1}: {}, {Seq: 2}: {}, {Seq: 3}: {}},
expectedSeqs: genExpectedForTest(t, "3", "2", "1"),
processedSeqs: genProcessedForTest(t, "1", "2", "3"),
},
expectedSafeSeq: &SequenceID{Seq: 3},
expectedExpectedSeqsIdx: 2,
expectedExpectedSeqs: []SequenceID{},
expectedProcessedSeqs: map[SequenceID]struct{}{},
expectedExpectedSeqs: genExpectedForTest(t),
expectedProcessedSeqs: genProcessedForTest(t),
},
{
name: "compound sequence",
c: &Checkpointer{
expectedSeqs: []SequenceID{{Seq: 1}, {Seq: 3, LowSeq: 1}},
processedSeqs: map[SequenceID]struct{}{{Seq: 1}: {}, {Seq: 3, LowSeq: 1}: {}},
expectedSeqs: genExpectedForTest(t, "1", "1::3"),
processedSeqs: genProcessedForTest(t, "1", "1::3"),
},
expectedSafeSeq: &SequenceID{Seq: 3, LowSeq: 1},
expectedExpectedSeqsIdx: 1,
expectedExpectedSeqs: []SequenceID{},
expectedProcessedSeqs: map[SequenceID]struct{}{},
expectedExpectedSeqs: genExpectedForTest(t),
expectedProcessedSeqs: genProcessedForTest(t),
},
{
name: "compound sequence triggered by",
c: &Checkpointer{
expectedSeqs: genExpectedForTest(t, "1", "1::3", "4:2"),
processedSeqs: genProcessedForTest(t, "1", "1::3", "4:2"),
},
expectedSafeSeq: &SequenceID{Seq: 2, TriggeredBy: 4},
expectedExpectedSeqsIdx: 2,
expectedExpectedSeqs: genExpectedForTest(t),
expectedProcessedSeqs: genProcessedForTest(t),
},
{
// ensure we maintain enough sequences that we can checkpoint expected but not yet processed without retaining the full list of processed sequences
// in most cases this will be keeping the last processed sequence and removing all prior ones, until the missing sequence in the list.
// e.g.
// expected: [2 3 4 5 6]
// processed: [ 3 4 5 ]
// can be safely compacted to:
// expected: [2 5 6]
// processed: [ 5 ]
name: "processed compaction",
c: &Checkpointer{
expectedSeqs: genExpectedForTest(t, "1", "2", "3", "4", "5", "6"),
processedSeqs: genProcessedForTest(t, "1", "3", "4", "5"),
expectedSeqCompactionThreshold: 3, // this many expected seqs to trigger compaction
},
expectedSafeSeq: &SequenceID{Seq: 1},
expectedExpectedSeqsIdx: 0,
expectedExpectedSeqs: genExpectedForTest(t, "2", "5", "6"),
expectedProcessedSeqs: genProcessedForTest(t, "5"),
},
{
// ensure we maintain enough sequences that we can checkpoint expected but not yet processed without retaining the full list of processed sequences
// in most cases this will be keeping the last processed sequence and removing all prior ones, until the missing sequence in the list.
// e.g.
// expected: [2 4 6 8 9]
// processed: [ 4 6 8 ]
// can be safely compacted to:
// expected: [2 8 9]
// processed: [ 8 ]
name: "processed compaction non-sequential (out of order)",
c: &Checkpointer{
expectedSeqs: genExpectedForTest(t, "2", "1", "6", "8", "4", "9"),
processedSeqs: genProcessedForTest(t, "4", "1", "6", "8"),
expectedSeqCompactionThreshold: 3, // this many expected seqs to trigger compaction
},
expectedSafeSeq: &SequenceID{Seq: 1},
expectedExpectedSeqsIdx: 0,
expectedExpectedSeqs: genExpectedForTest(t, "2", "8", "9"),
expectedProcessedSeqs: genProcessedForTest(t, "8"),
},
{
name: "multiple skipped processed compaction",
c: &Checkpointer{
expectedSeqs: genExpectedForTest(t, "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20"),
processedSeqs: genProcessedForTest(t, "1", "2", "4", "5", "7", "8", "9", "11", "12", "13", "15", "16", "17", "19"),
expectedSeqCompactionThreshold: 5, // this many expected seqs to trigger compaction
},
expectedSafeSeq: &SequenceID{Seq: 2},
expectedExpectedSeqsIdx: 1,
expectedExpectedSeqs: genExpectedForTest(t, "3", "5", "6", "9", "10", "13", "14", "17", "18", "19", "20"),
expectedProcessedSeqs: genProcessedForTest(t, "5", "9", "13", "17", "19"),
},
}
for _, tt := range tests {
@@ -139,29 +227,49 @@ func BenchmarkCheckpointerUpdateCheckpointLists(b *testing.B) {
processedSeqsLen int
}{
{expectedSeqsLen: 1, processedSeqsLen: 1},
{expectedSeqsLen: 100, processedSeqsLen: 100},
{expectedSeqsLen: 500, processedSeqsLen: 500},
{expectedSeqsLen: 50, processedSeqsLen: 50},
{expectedSeqsLen: 400, processedSeqsLen: 400}, // ~expected size (2x changes batch)
{expectedSeqsLen: 1000, processedSeqsLen: 1000},
{expectedSeqsLen: 10000, processedSeqsLen: 10000},
{expectedSeqsLen: 50000, processedSeqsLen: 50000},
{expectedSeqsLen: 1000, processedSeqsLen: 10000},
{expectedSeqsLen: 100000, processedSeqsLen: 100000},
{expectedSeqsLen: 1000000, processedSeqsLen: 1000000},
}
for _, test := range tests {
b.Run(fmt.Sprintf("expectedSeqsLen=%d,processedSeqsLen=%d", test.expectedSeqsLen, test.processedSeqsLen), func(b *testing.B) {
expectedSeqs := make([]SequenceID, 0, test.expectedSeqsLen)
for i := 0; i < test.expectedSeqsLen; i++ {
expectedSeqs = append(expectedSeqs, SequenceID{Seq: uint64(i)})
}
processedSeqs := make(map[SequenceID]struct{}, test.processedSeqsLen)
for i := 0; i < test.processedSeqsLen; i++ {
processedSeqs[SequenceID{Seq: uint64(i)}] = struct{}{}
}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
c := &Checkpointer{expectedSeqs: expectedSeqs, processedSeqs: processedSeqs}
_ = c._updateCheckpointLists()
// -1 no skip
// 0 skip first
// 1 skip last
for _, numCheckpoints := range []int{1, 10} {
for _, skipSeq := range []int{-1, 0, 1} {
bFunc := func(skipSeq, numCheckpoints int) func(b *testing.B) {
return func(b *testing.B) {
expectedSeqs := make([]SequenceID, 0, test.expectedSeqsLen)
for i := 0; i < test.expectedSeqsLen; i++ {
expectedSeqs = append(expectedSeqs, SequenceID{Seq: uint64(i)})
}
processedSeqs := make(map[SequenceID]struct{}, test.processedSeqsLen)
for i := 0; i < test.processedSeqsLen; i++ {
if (skipSeq == 0 && i == 0) || (skipSeq == 1 && i == test.processedSeqsLen-1) {
continue
}
processedSeqs[SequenceID{Seq: uint64(i)}] = struct{}{}
}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
c := &Checkpointer{expectedSeqs: expectedSeqs, processedSeqs: processedSeqs, expectedSeqCompactionThreshold: 100}
// run checkpointing multiple times to test pruning speedup
for j := 0; j < numCheckpoints; j++ {
_ = c._updateCheckpointLists()
}
}
}
}
b.Run(
fmt.Sprintf("expectedSeqsLen=%d,processedSeqsLen=%d,skipSeq=%d,numCheckpoints=%d",
test.expectedSeqsLen, test.processedSeqsLen, skipSeq, numCheckpoints),
bFunc(skipSeq, numCheckpoints),
)
}
})
}
}
}
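
As a usage note (the command below is an assumption based on the file paths shown in this diff and the standard Go toolchain, not something stated in the commit), the expanded benchmark matrix can be run on its own with:

	go test -run '^$' -bench BenchmarkCheckpointerUpdateCheckpointLists ./db/

The -run '^$' filter skips the regular tests so only the benchmark executes, and b.ReportAllocs() in the benchmark body includes allocation counts in the output, which is where the effect of compacting in place rather than allocating a new slice shows up.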
