From e6be738e5c7df9380b5d336e2359b5a88181ff10 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Tue, 5 Jan 2016 21:33:50 -0800 Subject: [PATCH] *: fill in WatchResponse.Header Related to coreos#3848. --- etcdmain/etcd.go | 2 +- etcdserver/api/v3rpc/watch.go | 46 +++++++++++++++++++++++++++++++---- storage/watcher.go | 13 ++++++++++ 3 files changed, 55 insertions(+), 6 deletions(-) diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 1ab0067cffd8..3f83c379da47 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -323,7 +323,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { // set up v3 demo rpc grpcServer := grpc.NewServer() etcdserverpb.RegisterKVServer(grpcServer, v3rpc.NewKVServer(s)) - etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s.Watchable())) + etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s)) go func() { plog.Fatal(grpcServer.Serve(v3l)) }() } diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 4cd8c9e994aa..3804a4f6406c 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -17,17 +17,23 @@ package v3rpc import ( "io" + "github.com/coreos/etcd/etcdserver" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/storage" "github.com/coreos/etcd/storage/storagepb" ) type watchServer struct { + server etcdserver.Server watchable storage.Watchable } -func NewWatchServer(w storage.Watchable) pb.WatchServer { - return &watchServer{w} +func NewWatchServer(s etcdserver.Server) pb.WatchServer { + sv, ok := s.(*etcdserver.EtcdServer) + if !ok { + return nil + } + return &watchServer{s, sv.Watchable()} } const ( @@ -44,6 +50,7 @@ const ( // and creates responses that forwarded to gRPC stream. // It also forwards control message like watch created and canceled. type serverWatchStream struct { + server etcdserver.Server gRPCStream pb.Watch_WatchServer watchStream storage.WatchStream ctrlStream chan *pb.WatchResponse @@ -54,6 +61,7 @@ type serverWatchStream struct { func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error { sws := serverWatchStream{ + server: ws.server, gRPCStream: stream, watchStream: ws.watchable.NewWatchStream(), // chan for sending control response like watcher created and canceled. @@ -66,7 +74,32 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error { return sws.recvLoop() } +func (sws *serverWatchStream) newResponseHeaderWithoutRevision() *pb.ResponseHeader { + sv, ok := sws.server.(*etcdserver.EtcdServer) + if !ok { + return nil + } + return &pb.ResponseHeader{ + ClusterId: uint64(sv.Cluster().ID()), + MemberId: uint64(sv.ID()), + RaftTerm: sv.Term(), + } +} + +// attachRevision copies the ResponseHeader and updates it with the latest +// revision. We must attach the revision when this response is generated. +// If the rev in the ResponseHeader is same as the one in the kv notification, +// it means that this watcher is in sync. If the rev in the ResponseHeader is +// greater, it means the watch is unsynced(slow) because kv notification +// revision got behind the current watch revision. +func (sws *serverWatchStream) attachRevision(rh *pb.ResponseHeader) *pb.ResponseHeader { + nh := *rh + nh.Revision = sws.watchStream.Revision() + return &nh +} + func (sws *serverWatchStream) recvLoop() error { + rh := sws.newResponseHeaderWithoutRevision() for { req, err := sws.gRPCStream.Recv() if err == io.EOF { @@ -87,7 +120,7 @@ func (sws *serverWatchStream) recvLoop() error { } id := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision) sws.ctrlStream <- &pb.WatchResponse{ - // TODO: fill in response header. + Header: sws.attachRevision(rh), WatchId: int64(id), Created: true, } @@ -96,7 +129,7 @@ func (sws *serverWatchStream) recvLoop() error { err := sws.watchStream.Cancel(storage.WatchID(id)) if err == nil { sws.ctrlStream <- &pb.WatchResponse{ - // TODO: fill in response header. + Header: sws.attachRevision(rh), WatchId: id, Canceled: true, } @@ -109,6 +142,7 @@ func (sws *serverWatchStream) recvLoop() error { } func (sws *serverWatchStream) sendLoop() { + rh := sws.newResponseHeaderWithoutRevision() for { select { case wresp, ok := <-sws.watchStream.Chan(): @@ -126,8 +160,10 @@ func (sws *serverWatchStream) sendLoop() { } err := sws.gRPCStream.Send(&pb.WatchResponse{ + Header: sws.attachRevision(rh), WatchId: int64(wresp.WatchID), - Events: events}) + Events: events, + }) storage.ReportEventReceived() if err != nil { return diff --git a/storage/watcher.go b/storage/watcher.go index acee2cfbc5fa..343f8eaccd6e 100644 --- a/storage/watcher.go +++ b/storage/watcher.go @@ -49,6 +49,9 @@ type WatchStream interface { // Close closes the WatchChan and release all related resources. Close() + + // Revision returns the latest revison of the storage. + Revision() int64 } type WatchResponse struct { @@ -113,3 +116,13 @@ func (ws *watchStream) Close() { close(ws.ch) watchStreamGauge.Dec() } + +func (ws *watchStream) Revision() int64 { + ws.mu.Lock() + defer ws.mu.Unlock() + s, ok := ws.watchable.(*watchableStore) + if !ok { + return 0 + } + return s.Rev() +}