Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compactor: adjust interval for period <1-hour #9485

Merged
merged 2 commits into from
Mar 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
98 changes: 77 additions & 21 deletions compactor/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,59 @@ func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compact
return t
}

// periodDivisor divides Periodic.period in into checkCompactInterval duration
const periodDivisor = 10
/*
Compaction period 1-hour:
1. compute compaction period, which is 1-hour
2. record revisions for every 1/10 of 1-hour (6-minute)
3. keep recording revisions with no compaction for first 1-hour
4. do compact with revs[0]
- success? contiue on for-loop and move sliding window; revs = revs[1:]
- failure? update revs, and retry after 1/10 of 1-hour (6-minute)

Compaction period 24-hour:
1. compute compaction period, which is 1-hour
2. record revisions for every 1/10 of 1-hour (6-minute)
3. keep recording revisions with no compaction for first 24-hour
4. do compact with revs[0]
- success? contiue on for-loop and move sliding window; revs = revs[1:]
- failure? update revs, and retry after 1/10 of 1-hour (6-minute)

Compaction period 59-min:
1. compute compaction period, which is 59-min
2. record revisions for every 1/10 of 59-min (5.9-min)
3. keep recording revisions with no compaction for first 59-min
4. do compact with revs[0]
- success? contiue on for-loop and move sliding window; revs = revs[1:]
- failure? update revs, and retry after 1/10 of 59-min (5.9-min)

Compaction period 5-sec:
1. compute compaction period, which is 5-sec
2. record revisions for every 1/10 of 5-sec (0.5-sec)
3. keep recording revisions with no compaction for first 5-sec
4. do compact with revs[0]
- success? contiue on for-loop and move sliding window; revs = revs[1:]
- failure? update revs, and retry after 1/10 of 5-sec (0.5-sec)
*/

// Run runs periodic compactor.
func (t *Periodic) Run() {
interval := t.period / time.Duration(periodDivisor)
compactInterval := t.getCompactInterval()
retryInterval := t.getRetryInterval()
retentions := t.getRetentions()

go func() {
initialWait := t.clock.Now()
lastSuccess := t.clock.Now()
baseInterval := t.period
for {
t.revs = append(t.revs, t.rg.Rev())
if len(t.revs) > retentions {
t.revs = t.revs[1:] // t.revs[0] is always the rev at t.period ago
}

select {
case <-t.ctx.Done():
return
case <-t.clock.After(interval):
case <-t.clock.After(retryInterval):
t.mu.Lock()
p := t.paused
t.mu.Unlock()
Expand All @@ -83,30 +122,55 @@ func (t *Periodic) Run() {
}
}

// wait up to initial given period
if t.clock.Now().Sub(initialWait) < t.period {
if t.clock.Now().Sub(lastSuccess) < baseInterval {
continue
}

rev, remaining := t.getRev()
if rev < 0 {
continue
// wait up to initial given period
if baseInterval == t.period {
baseInterval = compactInterval
}
rev := t.revs[0]

plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
// move to next sliding window
t.revs = remaining
lastSuccess = t.clock.Now()
plog.Noticef("Finished auto-compaction at revision %d", rev)
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
plog.Noticef("Retry after %v", interval)
plog.Noticef("Retry after %v", retryInterval)
}
}
}()
}

// if given compaction period x is <1-hour, compact every x duration.
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute)
// if given compaction period x is >1-hour, compact every hour.
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour)
func (t *Periodic) getCompactInterval() time.Duration {
itv := t.period
if itv > time.Hour {
itv = time.Hour
}
return itv
}

func (t *Periodic) getRetentions() int {
return int(t.period/t.getRetryInterval()) + 1
}

const retryDivisor = 10

func (t *Periodic) getRetryInterval() time.Duration {
itv := t.period
if itv > time.Hour {
itv = time.Hour
}
return itv / retryDivisor
}

// Stop stops periodic compactor.
func (t *Periodic) Stop() {
t.cancel()
Expand All @@ -125,11 +189,3 @@ func (t *Periodic) Resume() {
defer t.mu.Unlock()
t.paused = false
}

func (t *Periodic) getRev() (int64, []int64) {
i := len(t.revs) - periodDivisor
if i < 0 {
return -1, t.revs
}
return t.revs[i], t.revs[i+1:]
}
109 changes: 87 additions & 22 deletions compactor/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/jonboulle/clockwork"
)

func TestPeriodic(t *testing.T) {
func TestPeriodicHourly(t *testing.T) {
Copy link
Member

@fanminshi fanminshi Mar 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably adjust fc.Advance(checkCompactionInterval) to fc.Advance(6*time.Minute) .

retentionHours := 2
retentionDuration := time.Duration(retentionHours) * time.Hour

Expand All @@ -36,32 +36,94 @@ func TestPeriodic(t *testing.T) {

tb.Run()
defer tb.Stop()
checkCompactInterval := retentionDuration / time.Duration(periodDivisor)
n := periodDivisor
// simulate 5 hours worth of intervals.
for i := 0; i < n/retentionHours*5; i++ {

initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10

// compaction doesn't happen til 2 hours elapse
for i := 0; i < initialIntervals; i++ {
rg.Wait(1)
fc.Advance(checkCompactInterval)
// compaction doesn't happen til 2 hours elapses.
if i < n {
continue
fc.Advance(tb.getRetryInterval())
}

// very first compaction
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
expectedRevision := int64(1)
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}

// simulate 3 hours
// now compactor kicks in, every hour
for i := 0; i < 3; i++ {
// advance one hour, one revision for each interval
for j := 0; j < intervalsPerPeriod; j++ {
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}
// after 2 hours, compaction happens at every checkCompactInterval.
a, err := compactable.Wait(1)

a, err = compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
expectedRevision := int64(i + 1 - n)

expectedRevision = int64((i + 1) * 10)
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}
}
}

func TestPeriodicMinutes(t *testing.T) {
retentionMinutes := 5
retentionDuration := time.Duration(retentionMinutes) * time.Minute

fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := newPeriodic(fc, retentionDuration, rg, compactable)

tb.Run()
defer tb.Stop()

initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10

// compaction doesn't happen til 5 minutes elapse
for i := 0; i < initialIntervals; i++ {
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}

// unblock the rev getter, so we can stop the compactor routine.
_, err := rg.Wait(1)
// very first compaction
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}
expectedRevision := int64(1)
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}

// compaction happens at every interval
for i := 0; i < 5; i++ {
// advance 5-minute, one revision for each interval
for j := 0; j < intervalsPerPeriod; j++ {
rg.Wait(1)
fc.Advance(tb.getRetryInterval())
}

a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}

expectedRevision = int64((i + 1) * 10)
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
}
}
}

func TestPeriodicPause(t *testing.T) {
Expand All @@ -74,14 +136,14 @@ func TestPeriodicPause(t *testing.T) {
tb.Run()
tb.Pause()

n := tb.getRetentions()

// tb will collect 3 hours of revisions but not compact since paused
checkCompactInterval := retentionDuration / time.Duration(periodDivisor)
n := periodDivisor
for i := 0; i < 3*n; i++ {
for i := 0; i < n*3; i++ {
rg.Wait(1)
fc.Advance(checkCompactInterval)
fc.Advance(tb.getRetryInterval())
}
// tb ends up waiting for the clock
// t.revs = [21 22 23 24 25 26 27 28 29 30]

select {
case a := <-compactable.Chan():
Expand All @@ -91,14 +153,17 @@ func TestPeriodicPause(t *testing.T) {

// tb resumes to being blocked on the clock
tb.Resume()

// unblock clock, will kick off a compaction at hour 3:06
rg.Wait(1)
fc.Advance(checkCompactInterval)

// unblock clock, will kick off a compaction at T=3h6m by retry
fc.Advance(tb.getRetryInterval())

// T=3h6m
a, err := compactable.Wait(1)
if err != nil {
t.Fatal(err)
}

// compact the revision from hour 2:06
wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
if !reflect.DeepEqual(a[0].Params[0], wreq) {
Expand Down