Skip to content

Commit

Permalink
storage: range all unsynced at once
Browse files Browse the repository at this point in the history
This is for etcd-io#3848
to batch RangeHistory for all watchings at once.
  • Loading branch information
gyuho committed Dec 28, 2015
1 parent b79dae2 commit cbdef07
Showing 1 changed file with 80 additions and 35 deletions.
115 changes: 80 additions & 35 deletions storage/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package storage
import (
"fmt"
"log"
"math"
"sync"
"time"

Expand Down Expand Up @@ -241,57 +242,101 @@ func (s *watchableStore) syncWatchingsLoop() {
}
}

// syncWatchings syncs the watchings in the unsynced map.
// syncWatchings periodically syncs unsynced watchings. A watching becomes
// unsynced if its chan buffer is full and it is behind the current revision.
func (s *watchableStore) syncWatchings() {
_, curRev, _ := s.store.Range(nil, nil, 0, 0)
s.store.mu.Lock()
defer s.store.mu.Unlock()

if len(s.unsynced) == 0 {
return
}

// in order to find key-value pairs from unsynced watchings, we need to
// find min revision index, and these revisions can be used to
// query the backend store of key-value pairs
minRev := int64(math.MaxInt64)

curRev := s.store.currentRev.main
compactionRev := s.store.compactMainRev

keyToUnsynced := make(map[string]map[*watching]struct{})

for w := range s.unsynced {
var end []byte
if w.prefix {
end = make([]byte, len(w.key))
copy(end, w.key)
end[len(w.key)-1]++
k := string(w.key)

if w.cur > curRev {
panic("watching current revision should not exceed current revision")
}
limit := cap(w.ch) - len(w.ch)
// the channel is full, try it in the next round
if limit == 0 {

if w.cur < compactionRev {
// TODO: return error compacted to that watching instead of
// just removing it silently from unsynced.
delete(s.unsynced, w)
continue
}
revbs, kvs, nextRev, err := s.store.RangeHistory(w.key, end, int64(limit), w.cur)
if err != nil {
// TODO: send error event to watching
delete(s.unsynced, w)

if minRev >= w.cur {
minRev = w.cur
}

if _, ok := keyToUnsynced[k]; !ok {
keyToUnsynced[k] = make(map[*watching]struct{})
}
keyToUnsynced[k][w] = struct{}{}
}

minBytes, maxBytes := newRevBytes(), newRevBytes()
revToBytes(revision{main: minRev}, minBytes)
revToBytes(revision{main: curRev + 1}, maxBytes)

// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
// values are actual key-value pairs in backend.
tx := s.store.b.BatchTx()
tx.Lock()
ks, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
tx.Unlock()

for i, v := range vs {
var kv storagepb.KeyValue
if err := kv.Unmarshal(v); err != nil {
log.Panicf("storage: cannot unmarshal event: %v", err)
}

k := string(kv.Key)
wm, ok := keyToUnsynced[k]
if !ok {
continue
}

// push events to the channel
for i, kv := range kvs {
var evt storagepb.Event_EventType
switch {
case isTombstone(revbs[i]):
evt = storagepb.DELETE
default:
evt = storagepb.PUT
}
var ev storagepb.Event
switch {
case isTombstone(ks[i]):
ev.Type = storagepb.DELETE
default:
ev.Type = storagepb.PUT
}
ev.Kv = &kv

for w := range wm {
ev.WatchID = w.id

w.ch <- storagepb.Event{
Type: evt,
Kv: &kv,
WatchID: w.id,
select {
case w.ch <- ev:
pendingEventsGauge.Inc()
default:
// TODO: handle the full unsynced watchings.
// continue to process other watchings for now; the full ones
// will be processed next time and hopefully they will not be full.
continue
}
pendingEventsGauge.Inc()
}
// switch to tracking future events if needed
if nextRev > curRev {
k := string(w.key)
if err := unsafeAddWatching(&s.synced, k, w); err != nil {
log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
}
delete(s.unsynced, w)
continue
}
// put it back to try it in the next round
w.cur = nextRev
}

slowWatchingGauge.Set(float64(len(s.unsynced)))
}

Expand Down

0 comments on commit cbdef07

Please sign in to comment.