Skip to content

Commit

Permalink
clientv3: support "watch fragment"
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
  • Loading branch information
gyuho committed Feb 7, 2018
1 parent fc90887 commit 742810c
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 19 deletions.
78 changes: 78 additions & 0 deletions clientv3/integration/watch_fragment_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
)

func TestWatchFragmentEnable(t *testing.T) { testWatchFragment(t, true) }
func TestWatchFragmentDisable(t *testing.T) { testWatchFragment(t, false) }

func testWatchFragment(t *testing.T, fragment bool) {
// when two events are combined, response exceeds limit
clus := integration.NewClusterV3(t,
&integration.ClusterConfig{
Size: 1,
MaxRequestBytes: 1.5 * 1024 * 1024,
},
)
defer clus.Terminate(t)

cli := clus.Client(0)
errc := make(chan error)
for i := 0; i < 100; i++ {
go func(i int) {
_, err := cli.Put(context.TODO(),
fmt.Sprint("foo", i),
strings.Repeat("a", 1024*1024),
)
errc <- err
}(i)
}
for i := 0; i < 100; i++ {
if err := <-errc; err != nil {
t.Errorf("failed to put: %v", err)
}
}

opts := []clientv3.OpOption{
clientv3.WithPrefix(),
clientv3.WithRev(1),
}
if !fragment {
opts = append(opts, clientv3.WithNoFragment())
}
wch := cli.Watch(context.TODO(), "foo", opts...)
select {
case ws := <-wch:
if fragment && len(ws.Events) != 100 {
t.Errorf("expected 100 events with watch fragmentation, got %d", len(ws.Events))
}
if !fragment && len(ws.Events) == 100 {
t.Errorf("expected <100 events with no watch fragmentation, got %d", len(ws.Events))
}
case <-time.After(testutil.RequestTimeout):
t.Fatalf("took too long to receive events")
}
}
9 changes: 9 additions & 0 deletions clientv3/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Op struct {
// for watch, put, delete
prevKV bool

// for watch
noFragment bool

// for put
ignoreValue bool
ignoreLease bool
Expand Down Expand Up @@ -466,6 +469,12 @@ func WithPrevKV() OpOption {
}
}

// WithNoFragment to receive raw watch response
// without fragmentation.
func WithNoFragment() OpOption {
return func(op *Op) { op.noFragment = true }
}

// WithIgnoreValue updates the key using its current value.
// This option can not be combined with non-empty values.
// Returns an error if the key does not exist.
Expand Down
66 changes: 47 additions & 19 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ type watchGrpcStream struct {
resumec chan struct{}
// closeErr is the error that closed the watch stream
closeErr error

// expectFragments is "true" to indicate client expects
// fragmented watch response from server
expectFragments bool
}

// watchRequest is issued by the subscriber to start a new watcher
Expand All @@ -178,6 +182,10 @@ type watchRequest struct {
createdNotify bool
// progressNotify is for progress updates
progressNotify bool
// noFragment to receive raw watch response without fragment
// watch fragment "true" by default in case watch response exceeds
// server request limit, which is default 1.5 MiB
noFragment bool
// filters is the list of events to filter out
filters []pb.WatchCreateRequest_FilterType
// get the previous key-value pair before the event happens
Expand Down Expand Up @@ -232,22 +240,23 @@ func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
func (vc *valCtx) Done() <-chan struct{} { return valCtxCh }
func (vc *valCtx) Err() error { return nil }

func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
func (w *watcher) newWatcherGrpcStream(inctx context.Context, op Op) *watchGrpcStream {
ctx, cancel := context.WithCancel(&valCtx{inctx})
wgs := &watchGrpcStream{
owner: w,
remote: w.remote,
callOpts: w.callOpts,
ctx: ctx,
ctxKey: streamKeyFromCtx(inctx),
cancel: cancel,
substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
resumec: make(chan struct{}),
owner: w,
remote: w.remote,
callOpts: w.callOpts,
ctx: ctx,
ctxKey: streamKeyFromCtx(inctx),
cancel: cancel,
substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
resumec: make(chan struct{}),
expectFragments: !op.noFragment,
}
go wgs.run()
return wgs
Expand All @@ -274,6 +283,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
progressNotify: ow.progressNotify,
filters: filters,
prevKV: ow.prevKV,
noFragment: ow.noFragment,
retc: make(chan chan WatchResponse, 1),
}

Expand All @@ -291,7 +301,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
}
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
wgs = w.newWatcherGrpcStream(ctx, ow)
w.streams[ctxKey] = wgs
}
donec := wgs.donec
Expand Down Expand Up @@ -450,7 +460,7 @@ func (w *watchGrpcStream) run() {
}

cancelSet := make(map[int64]struct{})

var cur *pb.WatchResponse
for {
select {
// Watch() requested
Expand All @@ -477,12 +487,25 @@ func (w *watchGrpcStream) run() {
}
// New events from the watch client
case pbresp := <-w.respc:
if !w.expectFragments {
cur = pbresp
} else {
if cur != nil {
// combine events
cur.Events = append(cur.Events, pbresp.Events...)
cur.Fragment = pbresp.Fragment
} else {
cur = pbresp
}
}

switch {
case pbresp.Created:
// response to head of queue creation
if ws := w.resuming[0]; ws != nil {
w.addSubstream(pbresp, ws)
w.dispatchEvent(pbresp)
w.addSubstream(cur, ws)
w.dispatchEvent(cur)
cur = nil // reset
w.resuming[0] = nil
}
if ws := w.nextResume(); ws != nil {
Expand All @@ -495,9 +518,12 @@ func (w *watchGrpcStream) run() {
close(ws.recvc)
closing[ws] = struct{}{}
}
case cur != nil && cur.Fragment:
continue
default:
// dispatch to appropriate watch stream
if ok := w.dispatchEvent(pbresp); ok {
if ok := w.dispatchEvent(cur); ok {
cur = nil
break
}
// watch response on unexpected watch id; cancel id
Expand Down Expand Up @@ -820,7 +846,9 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
ProgressNotify: wr.progressNotify,
Filters: wr.filters,
PrevKv: wr.prevKV,
Fragment: !wr.noFragment,
}

cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
return &pb.WatchRequest{RequestUnion: cr}
}
Expand Down

0 comments on commit 742810c

Please sign in to comment.