mvcc: fix panic by allowing future revision watcher from restore operation #9775

Merged (2 commits) on May 31, 2018
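For context: chooseAll assumes every watcher in the "unsynced" group satisfies minRev <= curRev, but a restore from a leader snapshot moves all "synced" watchers, including ones parked at a future revision, into the "unsynced" group. The sketch below — hypothetical, stripped-down types, not etcd's actual mvcc implementation — models that sequence and the one-shot restore flag this PR introduces:

package main

import "fmt"

// watcher and watcherGroup are simplified stand-ins that only model the
// minRev/restore bookkeeping relevant to this fix.
type watcher struct {
	minRev  int64 // minimum revision the watcher will accept
	restore bool  // set when moved from "synced" to "unsynced" by a restore
}

type watcherGroup map[*watcher]struct{}

// chooseAll mirrors the fixed invariant check: an unsynced watcher's minRev
// must not exceed curRev unless the watcher was just moved by a restore.
func chooseAll(unsynced watcherGroup, curRev int64) {
	for w := range unsynced {
		if w.minRev > curRev {
			if !w.restore {
				panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
			}
			w.restore = false // one-shot: cleared once the watcher is chosen
		}
	}
}

func main() {
	synced, unsynced := watcherGroup{}, watcherGroup{}

	// A watch at a future revision starts in the "synced" group.
	w := &watcher{minRev: 4}
	synced[w] = struct{}{}

	// Restoring a snapshot at revision 3 moves every synced watcher to
	// "unsynced" and tags it, so the invariant check tolerates it once.
	curRev := int64(3)
	for wa := range synced {
		wa.restore = true
		unsynced[wa] = struct{}{}
		delete(synced, wa)
	}

	chooseAll(unsynced, curRev) // panicked here before this PR
	fmt.Println("watcher still waiting for revision", w.minRev)
}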
2 changes: 0 additions & 2 deletions integration/v3_watch_restore_test.go
@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !cluster_proxy

package integration

import (
9 changes: 9 additions & 0 deletions mvcc/watchable_store.go
@@ -193,6 +193,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
}

for wa := range s.synced.watchers {
wa.restore = true
s.unsynced.add(wa)
}
s.synced = newWatcherGroup()
@@ -500,6 +501,14 @@ type watcher struct {
// compacted is set when the watcher is removed because of compaction
compacted bool

// restore is true when the watcher is being restored from a leader snapshot,
// which means that this watcher has just been moved from the "synced" to the
// "unsynced" watcher group, possibly with a future revision from when it was
// first added to the "synced" watcher group.
// An "unsynced" watcher's revision must always be <= the current revision,
// except when the watcher has just been moved from the "synced" watcher group.
restore bool

// minRev is the minimum revision update the watcher will accept
minRev int64
id WatchID
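The reason a future-revision watcher sits in the "synced" group at all: a watch starting past the current revision has no past events to catch up on. The following is a simplified paraphrase of that placement rule — not etcd's exact watch() code — to show why the proxy's "lostleader" watch always starts out synced:

package main

import (
	"fmt"
	"math"
)

// placeWatcher paraphrases the placement decision: a watch starting beyond
// the store's current revision has nothing to catch up on, so it is parked
// in the "synced" group until events reach its start revision.
func placeWatcher(startRev, currentRev int64) string {
	if startRev == 0 || startRev > currentRev {
		return "synced"
	}
	return "unsynced"
}

func main() {
	// The proxy's "lostleader" watch uses revision math.MaxInt64 - 2, so it
	// always lands in "synced" -- and stays there until a restore forcibly
	// moves it to "unsynced", which is the path this PR makes safe.
	fmt.Println(placeWatcher(math.MaxInt64-2, 1)) // synced
	fmt.Println(placeWatcher(1, 5))               // unsynced
}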
56 changes: 56 additions & 0 deletions mvcc/watchable_store_test.go
@@ -339,6 +339,62 @@ func TestWatchRestore(t *testing.T) {
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
}

// TestWatchRestoreSyncedWatcher tests the following case:
// 1. a watcher is created with a future revision "math.MaxInt64 - 2"
// 2. the watcher with a future revision is added to the "synced" watcher group
// 3. the storage is restored/overwritten with a snapshot of a higher last revision
// 4. the restore operation moves the watcher from the "synced" to the "unsynced" group
// 5. the watcher from step 1 is chosen, without panic
func TestWatchRestoreSyncedWatcher(t *testing.T) {
b1, b1Path := backend.NewDefaultTmpBackend()
s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil)
defer cleanup(s1, b1, b1Path)

b2, b2Path := backend.NewDefaultTmpBackend()
s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil)
defer cleanup(s2, b2, b2Path)

testKey, testValue := []byte("foo"), []byte("bar")
rev := s1.Put(testKey, testValue, lease.NoLease)
startRev := rev + 2

// create a watcher with a future revision
// add to "synced" watcher group (startRev > s.store.currentRev)
w1 := s1.NewWatchStream()
w1.Watch(0, testKey, nil, startRev)

// make "s2" ends up with a higher last revision
s2.Put(testKey, testValue, lease.NoLease)
s2.Put(testKey, testValue, lease.NoLease)

// overwrite storage with higher revisions
if err := s1.Restore(b2); err != nil {
t.Fatal(err)
}

// wait for next "syncWatchersLoop" iteration
// and the unsynced watcher should be chosen
time.Sleep(2 * time.Second)

// trigger events for "startRev"
s1.Put(testKey, testValue, lease.NoLease)

select {
case resp := <-w1.Chan():
if resp.Revision != startRev {
t.Fatalf("resp.Revision expect %d, got %d", startRev, resp.Revision)
}
if len(resp.Events) != 1 {
t.Fatalf("len(resp.Events) expect 1, got %d", len(resp.Events))
}
if resp.Events[0].Kv.ModRevision != startRev {
t.Fatalf("resp.Events[0].Kv.ModRevision expect %d, got %d", startRev, resp.Events[0].Kv.ModRevision)
}
case <-time.After(time.Second):
t.Fatal("failed to receive event in 1 second")
}
}

// TestWatchBatchUnsynced tests batching on unsynced watchers
func TestWatchBatchUnsynced(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
10 changes: 9 additions & 1 deletion mvcc/watcher_group.go
@@ -239,7 +239,15 @@ func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
minRev := int64(math.MaxInt64)
for w := range wg.watchers {
if w.minRev > curRev {
panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
// after a network partition, a future revision watcher from a restore operation
// may be chosen here, e.g. the watch on key "proxy-namespace__lostleader" with
// revision "math.MaxInt64 - 2"; do not panic when such a watcher has just been
// moved from the "synced" watcher group during the restore operation
if !w.restore {
panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
}

// mark 'restore' done, since it's chosen
w.restore = false
}
if w.minRev < compactRev {
select {
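Note the restore flag is deliberately one-shot: it is cleared the first time the watcher is chosen, so the panic keeps serving as a fail-fast assertion for any other path that would put a future-revision watcher into the "unsynced" group. The watcher itself simply stays unsynced until the store's current revision catches up to its minRev, at which point it is synced through the normal unsynced path.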