Skip to content

Commit

Permalink
*: fill in WatchResponse.Header
Browse files Browse the repository at this point in the history
Related to coreos#3848.
  • Loading branch information
gyuho committed Jan 6, 2016
1 parent 82f2cd6 commit e6be738
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 6 deletions.
2 changes: 1 addition & 1 deletion etcdmain/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
// set up v3 demo rpc
grpcServer := grpc.NewServer()
etcdserverpb.RegisterKVServer(grpcServer, v3rpc.NewKVServer(s))
etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s.Watchable()))
etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s))
go func() { plog.Fatal(grpcServer.Serve(v3l)) }()
}

Expand Down
46 changes: 41 additions & 5 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@ package v3rpc
import (
"io"

"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/storage"
"github.com/coreos/etcd/storage/storagepb"
)

type watchServer struct {
server etcdserver.Server
watchable storage.Watchable
}

func NewWatchServer(w storage.Watchable) pb.WatchServer {
return &watchServer{w}
func NewWatchServer(s etcdserver.Server) pb.WatchServer {
sv, ok := s.(*etcdserver.EtcdServer)
if !ok {
return nil
}
return &watchServer{s, sv.Watchable()}
}

const (
Expand All @@ -44,6 +50,7 @@ const (
// and creates responses that forwarded to gRPC stream.
// It also forwards control message like watch created and canceled.
type serverWatchStream struct {
server etcdserver.Server
gRPCStream pb.Watch_WatchServer
watchStream storage.WatchStream
ctrlStream chan *pb.WatchResponse
Expand All @@ -54,6 +61,7 @@ type serverWatchStream struct {

func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
sws := serverWatchStream{
server: ws.server,
gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
// chan for sending control response like watcher created and canceled.
Expand All @@ -66,7 +74,32 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
return sws.recvLoop()
}

func (sws *serverWatchStream) newResponseHeaderWithoutRevision() *pb.ResponseHeader {
sv, ok := sws.server.(*etcdserver.EtcdServer)
if !ok {
return nil
}
return &pb.ResponseHeader{
ClusterId: uint64(sv.Cluster().ID()),
MemberId: uint64(sv.ID()),
RaftTerm: sv.Term(),
}
}

// attachRevision copies the ResponseHeader and updates it with the latest
// revision. We must attach the revision when this response is generated.
// If the rev in the ResponseHeader is same as the one in the kv notification,
// it means that this watcher is in sync. If the rev in the ResponseHeader is
// greater, it means the watch is unsynced(slow) because kv notification
// revision got behind the current watch revision.
func (sws *serverWatchStream) attachRevision(rh *pb.ResponseHeader) *pb.ResponseHeader {
nh := *rh
nh.Revision = sws.watchStream.Revision()
return &nh
}

func (sws *serverWatchStream) recvLoop() error {
rh := sws.newResponseHeaderWithoutRevision()
for {
req, err := sws.gRPCStream.Recv()
if err == io.EOF {
Expand All @@ -87,7 +120,7 @@ func (sws *serverWatchStream) recvLoop() error {
}
id := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision)
sws.ctrlStream <- &pb.WatchResponse{
// TODO: fill in response header.
Header: sws.attachRevision(rh),
WatchId: int64(id),
Created: true,
}
Expand All @@ -96,7 +129,7 @@ func (sws *serverWatchStream) recvLoop() error {
err := sws.watchStream.Cancel(storage.WatchID(id))
if err == nil {
sws.ctrlStream <- &pb.WatchResponse{
// TODO: fill in response header.
Header: sws.attachRevision(rh),
WatchId: id,
Canceled: true,
}
Expand All @@ -109,6 +142,7 @@ func (sws *serverWatchStream) recvLoop() error {
}

func (sws *serverWatchStream) sendLoop() {
rh := sws.newResponseHeaderWithoutRevision()
for {
select {
case wresp, ok := <-sws.watchStream.Chan():
Expand All @@ -126,8 +160,10 @@ func (sws *serverWatchStream) sendLoop() {
}

err := sws.gRPCStream.Send(&pb.WatchResponse{
Header: sws.attachRevision(rh),
WatchId: int64(wresp.WatchID),
Events: events})
Events: events,
})
storage.ReportEventReceived()
if err != nil {
return
Expand Down
13 changes: 13 additions & 0 deletions storage/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type WatchStream interface {

// Close closes the WatchChan and release all related resources.
Close()

// Revision returns the latest revison of the storage.
Revision() int64
}

type WatchResponse struct {
Expand Down Expand Up @@ -113,3 +116,13 @@ func (ws *watchStream) Close() {
close(ws.ch)
watchStreamGauge.Dec()
}

func (ws *watchStream) Revision() int64 {
ws.mu.Lock()
defer ws.mu.Unlock()
s, ok := ws.watchable.(*watchableStore)
if !ok {
return 0
}
return s.Rev()
}

0 comments on commit e6be738

Please sign in to comment.