diff --git a/storage/watchable_store.go b/storage/watchable_store.go index ebad2a64de9f..2c6e8ee2844e 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -17,6 +17,7 @@ package storage import ( "fmt" "log" + "math" "sync" "time" @@ -241,57 +242,101 @@ func (s *watchableStore) syncWatchingsLoop() { } } -// syncWatchings syncs the watchings in the unsyncd map. +// syncWatchings periodically syncs unsynced watchings. A watching becomes +// unsynced if its chan buffer is full and it is behind the current revision. func (s *watchableStore) syncWatchings() { - _, curRev, _ := s.store.Range(nil, nil, 0, 0) + s.store.mu.Lock() + defer s.store.mu.Unlock() + + if len(s.unsynced) == 0 { + return + } + + // in order to find key-value pairs from unsynced watchings, we need to + // find min revision index, and these revisions can be used to + // query the backend store of key-value pairs + minRev := int64(math.MaxInt64) + + curRev := s.store.currentRev.main + compactionRev := s.store.compactMainRev + + keyToUnsynced := make(map[string]map[*watching]struct{}) + for w := range s.unsynced { - var end []byte - if w.prefix { - end = make([]byte, len(w.key)) - copy(end, w.key) - end[len(w.key)-1]++ + k := string(w.key) + + if w.cur > curRev { + panic("watching current revision should not exceed current revision") } - limit := cap(w.ch) - len(w.ch) - // the channel is full, try it in the next round - if limit == 0 { + + if w.cur < compactionRev { + // TODO: return error compacted to that watching instead of + // just removing it sliently from unsynced. + delete(s.unsynced, w) continue } - revbs, kvs, nextRev, err := s.store.RangeHistory(w.key, end, int64(limit), w.cur) - if err != nil { - // TODO: send error event to watching - delete(s.unsynced, w) + + if minRev >= w.cur { + minRev = w.cur + } + + if _, ok := keyToUnsynced[k]; !ok { + keyToUnsynced[k] = make(map[*watching]struct{}) + } + keyToUnsynced[k][w] = struct{}{} + } + + minBytes, maxBytes := newRevBytes(), newRevBytes() + revToBytes(revision{main: minRev}, minBytes) + revToBytes(revision{main: curRev + 1}, maxBytes) + + // UnsafeRange returns keys and values. And in boltdb, keys are revisions. + // values are actual key-value pairs in backend. + tx := s.store.b.BatchTx() + tx.Lock() + ks, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) + tx.Unlock() + + for i, v := range vs { + var kv storagepb.KeyValue + if err := kv.Unmarshal(v); err != nil { + log.Panicf("storage: cannot unmarshal event: %v", err) + } + + k := string(kv.Key) + wm, ok := keyToUnsynced[k] + if !ok { continue } - // push events to the channel - for i, kv := range kvs { - var evt storagepb.Event_EventType - switch { - case isTombstone(revbs[i]): - evt = storagepb.DELETE - default: - evt = storagepb.PUT - } + var ev storagepb.Event + switch { + case isTombstone(ks[i]): + ev.Type = storagepb.DELETE + default: + ev.Type = storagepb.PUT + } + ev.Kv = &kv + + for w := range wm { + ev.WatchID = w.id - w.ch <- storagepb.Event{ - Type: evt, - Kv: &kv, - WatchID: w.id, + select { + case w.ch <- ev: + pendingEventsGauge.Inc() + default: + // TODO: handle the full unsynced watchings. + // continue to process other watchings for now, the full ones + // will be processed next time and hopefully it will not be full. + continue } - pendingEventsGauge.Inc() - } - // switch to tracking future events if needed - if nextRev > curRev { - k := string(w.key) if err := unsafeAddWatching(&s.synced, k, w); err != nil { log.Panicf("error unsafeAddWatching (%v) for key %s", err, k) } delete(s.unsynced, w) - continue } - // put it back to try it in the next round - w.cur = nextRev } + slowWatchingGauge.Set(float64(len(s.unsynced))) } diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index 9fe9600dc63c..de1dc13e5452 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -61,6 +61,9 @@ func TestNewWatcherCancel(t *testing.T) { // TestCancelUnsynced tests if running CancelFunc removes watchings from unsynced. func TestCancelUnsynced(t *testing.T) { + // arbitrary number for watchers + watcherN := 100 + // manually create watchableStore instead of newWatchableStore // because newWatchableStore automatically calls syncWatchers // method to sync watchers in unsynced map. We want to keep watchers @@ -89,9 +92,6 @@ func TestCancelUnsynced(t *testing.T) { w := s.NewWatcher() - // arbitrary number for watcher - watcherN := 100 - // create watcherN of CancelFunc of // synced and unsynced cancels := make([]CancelFunc, watcherN) @@ -119,6 +119,9 @@ func TestCancelUnsynced(t *testing.T) { // events to channel of unsynced watchings and moves these // watchings to synced. func TestSyncWatchings(t *testing.T) { + // arbitrary number for watchers + watcherN := 150 + s := &watchableStore{ store: newStore(tmpPath), unsynced: make(map[*watching]struct{}), @@ -136,9 +139,6 @@ func TestSyncWatchings(t *testing.T) { w := s.NewWatcher() - // arbitrary number for watcher - watcherN := 100 - for i := 0; i < watcherN; i++ { // use 1 to keep watchers in unsynced w.Watch(testKey, true, 1) @@ -149,12 +149,12 @@ func TestSyncWatchings(t *testing.T) { // synced should be empty // because we manually populate unsynced only if len(s.synced[string(testKey)]) != 0 { - t.Fatalf("synced[string(testKey)] size = %d, want 0", len(s.synced[string(testKey)])) + t.Fatalf("[BEFORE syncWatchings] synced[string(testKey)] size = %d, want 0", len(s.synced[string(testKey)])) } // unsynced should not be empty // because we manually populated unsynced only if len(s.unsynced) == 0 { - t.Errorf("unsynced size = %d, want %d", len(s.unsynced), watcherN) + t.Errorf("[BEFORE syncWatchings] unsynced size = %d, want %d", len(s.unsynced), watcherN) } // this should move all unsynced watchings @@ -167,14 +167,12 @@ func TestSyncWatchings(t *testing.T) { // because syncWatchings populates synced // in this test case if len(s.synced[string(testKey)]) == 0 { - t.Errorf("synced[string(testKey)] size = 0, want %d", len(s.synced[string(testKey)])) + t.Errorf("[AFTER syncWatching] synced[string(testKey)] size = 0, want %d", len(s.synced[string(testKey)])) } - // unsynced should be empty - // because syncWatchings is expected to move - // all watchings from unsynced to synced - // in this test case + // unsynced should be empty because syncWatchings should have moved + // all watchings from unsynced to synced in this test case if len(s.unsynced) != 0 { - t.Errorf("unsynced size = %d, want 0", len(s.unsynced)) + t.Errorf("[AFTER syncWatchings] unsynced size = %d, want 0", len(s.unsynced)) } // All of the watchings actually share one channel