diff --git a/storage/watchable_store.go b/storage/watchable_store.go index ebad2a64de9f..2c6e8ee2844e 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -17,6 +17,7 @@ package storage import ( "fmt" "log" + "math" "sync" "time" @@ -241,57 +242,101 @@ func (s *watchableStore) syncWatchingsLoop() { } } -// syncWatchings syncs the watchings in the unsyncd map. +// syncWatchings periodically syncs unsynced watchings. A watching becomes +// unsynced if its chan buffer is full and it is behind the current revision. func (s *watchableStore) syncWatchings() { - _, curRev, _ := s.store.Range(nil, nil, 0, 0) + s.store.mu.Lock() + defer s.store.mu.Unlock() + + if len(s.unsynced) == 0 { + return + } + + // in order to find key-value pairs from unsynced watchings, we need to + // find min revision index, and these revisions can be used to + // query the backend store of key-value pairs + minRev := int64(math.MaxInt64) + + curRev := s.store.currentRev.main + compactionRev := s.store.compactMainRev + + keyToUnsynced := make(map[string]map[*watching]struct{}) + for w := range s.unsynced { - var end []byte - if w.prefix { - end = make([]byte, len(w.key)) - copy(end, w.key) - end[len(w.key)-1]++ + k := string(w.key) + + if w.cur > curRev { + panic("watching current revision should not exceed current revision") } - limit := cap(w.ch) - len(w.ch) - // the channel is full, try it in the next round - if limit == 0 { + + if w.cur < compactionRev { + // TODO: return error compacted to that watching instead of + // just removing it sliently from unsynced. + delete(s.unsynced, w) continue } - revbs, kvs, nextRev, err := s.store.RangeHistory(w.key, end, int64(limit), w.cur) - if err != nil { - // TODO: send error event to watching - delete(s.unsynced, w) + + if minRev >= w.cur { + minRev = w.cur + } + + if _, ok := keyToUnsynced[k]; !ok { + keyToUnsynced[k] = make(map[*watching]struct{}) + } + keyToUnsynced[k][w] = struct{}{} + } + + minBytes, maxBytes := newRevBytes(), newRevBytes() + revToBytes(revision{main: minRev}, minBytes) + revToBytes(revision{main: curRev + 1}, maxBytes) + + // UnsafeRange returns keys and values. And in boltdb, keys are revisions. + // values are actual key-value pairs in backend. + tx := s.store.b.BatchTx() + tx.Lock() + ks, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0) + tx.Unlock() + + for i, v := range vs { + var kv storagepb.KeyValue + if err := kv.Unmarshal(v); err != nil { + log.Panicf("storage: cannot unmarshal event: %v", err) + } + + k := string(kv.Key) + wm, ok := keyToUnsynced[k] + if !ok { continue } - // push events to the channel - for i, kv := range kvs { - var evt storagepb.Event_EventType - switch { - case isTombstone(revbs[i]): - evt = storagepb.DELETE - default: - evt = storagepb.PUT - } + var ev storagepb.Event + switch { + case isTombstone(ks[i]): + ev.Type = storagepb.DELETE + default: + ev.Type = storagepb.PUT + } + ev.Kv = &kv + + for w := range wm { + ev.WatchID = w.id - w.ch <- storagepb.Event{ - Type: evt, - Kv: &kv, - WatchID: w.id, + select { + case w.ch <- ev: + pendingEventsGauge.Inc() + default: + // TODO: handle the full unsynced watchings. + // continue to process other watchings for now, the full ones + // will be processed next time and hopefully it will not be full. + continue } - pendingEventsGauge.Inc() - } - // switch to tracking future events if needed - if nextRev > curRev { - k := string(w.key) if err := unsafeAddWatching(&s.synced, k, w); err != nil { log.Panicf("error unsafeAddWatching (%v) for key %s", err, k) } delete(s.unsynced, w) - continue } - // put it back to try it in the next round - w.cur = nextRev } + slowWatchingGauge.Set(float64(len(s.unsynced))) }