Skip to content

Commit

Permalink
storage: range all unsynced at once
Browse files Browse the repository at this point in the history
This is for etcd-io#3848
to batch RangeHistory for all watchings at once.
  • Loading branch information
gyuho committed Dec 22, 2015
1 parent b79dae2 commit 3849241
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 0 deletions.
66 changes: 66 additions & 0 deletions storage/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package storage
import (
"fmt"
"log"
"math"
"sync"
"time"

Expand Down Expand Up @@ -241,6 +242,71 @@ func (s *watchableStore) syncWatchingsLoop() {
}
}

// RangeAllUnsynced ranges on them all
// finding minimum and maximum current revision index.
func (s *watchableStore) RangeAllUnsynced() (revbs [][]byte, kvs []storagepb.KeyValue, nextRev int64, err error) {
totalLimit := 0
minRev, maxRev := int64(math.MaxInt64), int64(math.MinInt64)
startKey, endKey := make([]byte, 0), make([]byte, 0)
for w := range s.unsynced {
totalLimit += cap(w.ch) - len(w.ch)
if minRev >= w.cur {
minRev = w.cur
startKey = w.key
}
if maxRev <= w.cur {
maxRev = w.cur
endKey = w.key
}
}

fmt.Println("startKey:", string(startKey))
fmt.Println("endKey:", string(endKey))
fmt.Println("minRev:", minRev)
fmt.Println("maxRev:", maxRev)
fmt.Println("totalLimit:", totalLimit)

// startRev must be:
// s.store.compactMainRev < startRev <= s.store.currentRev.main
startRev := minRev

if startRev > 0 && startRev <= s.store.compactMainRev {
return nil, nil, 0, ErrCompacted
}
if startRev > s.store.currentRev.main {
return nil, nil, 0, ErrFutureRev
}

revs := s.store.kvindex.RangeSince(startKey, endKey, startRev)
if len(revs) == 0 {
return nil, nil, s.store.currentRev.main + 1, nil
}

tx := s.store.b.BatchTx()
tx.Lock()
defer tx.Unlock()
// fetch events from the backend using revisions
for _, rev := range revs {
start, end := revBytesRange(rev)

ks, vs := tx.UnsafeRange(keyBucketName, start, end, 0)
if len(vs) != 1 {
log.Fatalf("storage: range cannot find rev (%d,%d)", rev.main, rev.sub)
}

var kv storagepb.KeyValue
if err := kv.Unmarshal(vs[0]); err != nil {
log.Fatalf("storage: cannot unmarshal event: %v", err)
}
revbs = append(revbs, ks[0])
kvs = append(kvs, kv)
if totalLimit > 0 && len(kvs) >= int(totalLimit) {
return revbs, kvs, rev.main + 1, nil
}
}
return revbs, kvs, s.store.currentRev.main + 1, nil
}

// syncWatchings syncs the watchings in the unsyncd map.
func (s *watchableStore) syncWatchings() {
_, curRev, _ := s.store.Range(nil, nil, 0, 0)
Expand Down
34 changes: 34 additions & 0 deletions storage/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package storage

import (
"fmt"
"os"
"testing"
)
Expand Down Expand Up @@ -185,6 +186,39 @@ func TestSyncWatchings(t *testing.T) {
}
}

// TestRangeAllUnsynced ...
func TestRangeAllUnsynced(t *testing.T) {
s := &watchableStore{
store: newStore(tmpPath),
unsynced: make(map[*watching]struct{}),
synced: make(map[string]map[*watching]struct{}),
}

defer func() {
s.store.Close()
os.Remove(tmpPath)
}()

for k := 0; k < 2; k++ {
testKey := []byte(fmt.Sprintf("foo%d", k))
testValue := []byte("bar")
s.Put(testKey, testValue)

w := s.NewWatcher()

watcherN := 10
for i := 0; i < watcherN; i++ {
// use 1 to keep watchers in unsynced
w.Watch(testKey, true, 1)
}
}
a, b, c, d := s.RangeAllUnsynced()
fmt.Println(a)
fmt.Println(b)
fmt.Println(c)
fmt.Println(d)
}

func TestUnsafeAddWatching(t *testing.T) {
s := newWatchableStore(tmpPath)
defer func() {
Expand Down

0 comments on commit 3849241

Please sign in to comment.