diff --git a/storage/watchable_store.go b/storage/watchable_store.go index ebad2a64de9f..c776986b3d98 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -17,6 +17,7 @@ package storage import ( "fmt" "log" + "math" "sync" "time" @@ -241,6 +242,71 @@ func (s *watchableStore) syncWatchingsLoop() { } } +// RangeAllUnsynced ranges on them all +// finding minimum and maximum current revision index. +func (s *watchableStore) RangeAllUnsynced() (revbs [][]byte, kvs []storagepb.KeyValue, nextRev int64, err error) { + totalLimit := 0 + minRev, maxRev := int64(math.MaxInt64), int64(math.MinInt64) + startKey, endKey := make([]byte, 0), make([]byte, 0) + for w := range s.unsynced { + totalLimit += cap(w.ch) - len(w.ch) + if minRev >= w.cur { + minRev = w.cur + startKey = w.key + } + if maxRev <= w.cur { + maxRev = w.cur + endKey = w.key + } + } + + fmt.Println("startKey:", string(startKey)) + fmt.Println("endKey:", string(endKey)) + fmt.Println("minRev:", minRev) + fmt.Println("maxRev:", maxRev) + fmt.Println("totalLimit:", totalLimit) + + // startRev must be: + // s.store.compactMainRev < startRev <= s.store.currentRev.main + startRev := minRev + + if startRev > 0 && startRev <= s.store.compactMainRev { + return nil, nil, 0, ErrCompacted + } + if startRev > s.store.currentRev.main { + return nil, nil, 0, ErrFutureRev + } + + revs := s.store.kvindex.RangeSince(startKey, endKey, startRev) + if len(revs) == 0 { + return nil, nil, s.store.currentRev.main + 1, nil + } + + tx := s.store.b.BatchTx() + tx.Lock() + defer tx.Unlock() + // fetch events from the backend using revisions + for _, rev := range revs { + start, end := revBytesRange(rev) + + ks, vs := tx.UnsafeRange(keyBucketName, start, end, 0) + if len(vs) != 1 { + log.Fatalf("storage: range cannot find rev (%d,%d)", rev.main, rev.sub) + } + + var kv storagepb.KeyValue + if err := kv.Unmarshal(vs[0]); err != nil { + log.Fatalf("storage: cannot unmarshal event: %v", err) + } + revbs = append(revbs, ks[0]) + kvs = append(kvs, kv) + if totalLimit > 0 && len(kvs) >= int(totalLimit) { + return revbs, kvs, rev.main + 1, nil + } + } + return revbs, kvs, s.store.currentRev.main + 1, nil +} + // syncWatchings syncs the watchings in the unsyncd map. func (s *watchableStore) syncWatchings() { _, curRev, _ := s.store.Range(nil, nil, 0, 0) diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index 9fe9600dc63c..346958b1bccd 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -15,6 +15,7 @@ package storage import ( + "fmt" "os" "testing" ) @@ -185,6 +186,39 @@ func TestSyncWatchings(t *testing.T) { } } +// TestRangeAllUnsynced ... +func TestRangeAllUnsynced(t *testing.T) { + s := &watchableStore{ + store: newStore(tmpPath), + unsynced: make(map[*watching]struct{}), + synced: make(map[string]map[*watching]struct{}), + } + + defer func() { + s.store.Close() + os.Remove(tmpPath) + }() + + for k := 0; k < 2; k++ { + testKey := []byte(fmt.Sprintf("foo%d", k)) + testValue := []byte("bar") + s.Put(testKey, testValue) + + w := s.NewWatcher() + + watcherN := 10 + for i := 0; i < watcherN; i++ { + // use 1 to keep watchers in unsynced + w.Watch(testKey, true, 1) + } + } + a, b, c, d := s.RangeAllUnsynced() + fmt.Println(a) + fmt.Println(b) + fmt.Println(c) + fmt.Println(d) +} + func TestUnsafeAddWatching(t *testing.T) { s := newWatchableStore(tmpPath) defer func() {