Skip to content

Commit

Permalink
*: fill in WatchResponse.Header
Browse files Browse the repository at this point in the history
Related to coreos#3848.

remove interface type assertion

use interface, Revision in storage.WatchResponse

use interface raftTimer, and revision from WatchResponse

use EtcdServer to get raft terms, remove new header function

fix deadlock

add interface
  • Loading branch information
gyuho committed Jan 6, 2016
1 parent 82f2cd6 commit dde6c6f
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 23 deletions.
3 changes: 3 additions & 0 deletions etcdctlv3/command/watch_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,13 @@ func recvLoop(wStream pb.Watch_WatchClient) {
switch {
// TODO: handle canceled/compacted and other control response types
case resp.Created:
fmt.Printf("[revision: %d]\n", resp.Header.Revision)
fmt.Printf("watcher created: id %08x\n", resp.WatchId)
case resp.Canceled:
fmt.Printf("[revision: %d]\n", resp.Header.Revision)
fmt.Printf("watcher canceled: id %08x\n", resp.WatchId)
default:
fmt.Printf("[revision: %d]\n", resp.Header.Revision)
for _, ev := range resp.Events {
fmt.Printf("%s: %s %s\n", ev.Type, string(ev.Kv.Key), string(ev.Kv.Value))
}
Expand Down
2 changes: 1 addition & 1 deletion etcdmain/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
// set up v3 demo rpc
grpcServer := grpc.NewServer()
etcdserverpb.RegisterKVServer(grpcServer, v3rpc.NewKVServer(s))
etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s.Watchable()))
etcdserverpb.RegisterWatchServer(grpcServer, v3rpc.NewWatchServer(s, s, s.Watchable()))
go func() { plog.Fatal(grpcServer.Serve(v3l)) }()
}

Expand Down
22 changes: 17 additions & 5 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ package v3rpc
import (
"io"

"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/storage"
"github.com/coreos/etcd/storage/storagepb"
)

type watchServer struct {
server etcdserver.Server
raftTimer *etcdserver.EtcdServer
watchable storage.Watchable
}

func NewWatchServer(w storage.Watchable) pb.WatchServer {
return &watchServer{w}
func NewWatchServer(s etcdserver.Server, r *etcdserver.EtcdServer, w storage.Watchable) pb.WatchServer {
return &watchServer{s, r, w}
}

const (
Expand All @@ -44,6 +47,9 @@ const (
// and creates responses that forwarded to gRPC stream.
// It also forwards control message like watch created and canceled.
type serverWatchStream struct {
raftTimer *etcdserver.EtcdServer
clusterID int64
memberID int64
gRPCStream pb.Watch_WatchServer
watchStream storage.WatchStream
ctrlStream chan *pb.WatchResponse
Expand All @@ -54,6 +60,9 @@ type serverWatchStream struct {

func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
sws := serverWatchStream{
raftTimer: ws.raftTimer,
clusterID: int64(ws.server.ClusterID()),
memberID: int64(ws.server.ID()),
gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
// chan for sending control response like watcher created and canceled.
Expand All @@ -76,6 +85,7 @@ func (sws *serverWatchStream) recvLoop() error {
return err
}

rh := &pb.ResponseHeader{ClusterId: uint64(sws.clusterID), MemberId: uint64(sws.memberID), Revision: sws.watchStream.Rev(), RaftTerm: sws.raftTimer.Term()}
switch {
case req.CreateRequest != nil:
creq := req.CreateRequest
Expand All @@ -87,7 +97,7 @@ func (sws *serverWatchStream) recvLoop() error {
}
id := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision)
sws.ctrlStream <- &pb.WatchResponse{
// TODO: fill in response header.
Header: rh,
WatchId: int64(id),
Created: true,
}
Expand All @@ -96,7 +106,7 @@ func (sws *serverWatchStream) recvLoop() error {
err := sws.watchStream.Cancel(storage.WatchID(id))
if err == nil {
sws.ctrlStream <- &pb.WatchResponse{
// TODO: fill in response header.
Header: rh,
WatchId: id,
Canceled: true,
}
Expand Down Expand Up @@ -126,8 +136,10 @@ func (sws *serverWatchStream) sendLoop() {
}

err := sws.gRPCStream.Send(&pb.WatchResponse{
Header: &pb.ResponseHeader{ClusterId: uint64(sws.clusterID), MemberId: uint64(sws.memberID), Revision: wresp.Revision, RaftTerm: sws.raftTimer.Term()},
WatchId: int64(wresp.WatchID),
Events: events})
Events: events,
})
storage.ReportEventReceived()
if err != nil {
return
Expand Down
18 changes: 10 additions & 8 deletions etcdserver/etcdhttp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,11 @@ type serverRecorder struct {
actions []action
}

func (s *serverRecorder) Start() {}
func (s *serverRecorder) Stop() {}
func (s *serverRecorder) Leader() types.ID { return types.ID(1) }
func (s *serverRecorder) ID() types.ID { return types.ID(1) }
func (s *serverRecorder) Start() {}
func (s *serverRecorder) Stop() {}
func (s *serverRecorder) Leader() types.ID { return types.ID(1) }
func (s *serverRecorder) ID() types.ID { return types.ID(1) }
func (s *serverRecorder) ClusterID() types.ID { return types.ID(1) }
func (s *serverRecorder) Do(_ context.Context, r etcdserverpb.Request) (etcdserver.Response, error) {
s.actions = append(s.actions, action{name: "Do", params: []interface{}{r}})
return etcdserver.Response{}, nil
Expand Down Expand Up @@ -141,10 +142,11 @@ type resServer struct {
res etcdserver.Response
}

func (rs *resServer) Start() {}
func (rs *resServer) Stop() {}
func (rs *resServer) ID() types.ID { return types.ID(1) }
func (rs *resServer) Leader() types.ID { return types.ID(1) }
func (rs *resServer) Start() {}
func (rs *resServer) Stop() {}
func (rs *resServer) ID() types.ID { return types.ID(1) }
func (rs *resServer) ClusterID() types.ID { return types.ID(1) }
func (rs *resServer) Leader() types.ID { return types.ID(1) }
func (rs *resServer) Do(_ context.Context, _ etcdserverpb.Request) (etcdserver.Response, error) {
return rs.res, nil
}
Expand Down
9 changes: 5 additions & 4 deletions etcdserver/etcdhttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ type errServer struct {
err error
}

func (fs *errServer) Start() {}
func (fs *errServer) Stop() {}
func (fs *errServer) ID() types.ID { return types.ID(1) }
func (fs *errServer) Leader() types.ID { return types.ID(1) }
func (fs *errServer) Start() {}
func (fs *errServer) Stop() {}
func (fs *errServer) ID() types.ID { return types.ID(1) }
func (fs *errServer) ClusterID() types.ID { return types.ID(1) }
func (fs *errServer) Leader() types.ID { return types.ID(1) }
func (fs *errServer) Do(ctx context.Context, r etcdserverpb.Request) (etcdserver.Response, error) {
return etcdserver.Response{}, fs.err
}
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/etcdserverpb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ message WatchResponse {
// catch up with the progress of the KV.
//
// Client should treat the watching as canceled and should not try to create any
// watching with same start_revision again.
// watching with same start_revision again.
bool compacted = 5;

repeated storagepb.Event events = 11;
Expand Down
4 changes: 4 additions & 0 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ type Server interface {
Stop()
// ID returns the ID of the Server.
ID() types.ID
// ClusterID returns the ID of the cluster.
ClusterID() types.ID
// Leader returns the ID of the leader Server.
Leader() types.ID
// Do takes a request and attempts to fulfill it, returning a Response.
Expand Down Expand Up @@ -452,6 +454,8 @@ func (s *EtcdServer) ID() types.ID { return s.id }

func (s *EtcdServer) Cluster() Cluster { return s.cluster }

func (s *EtcdServer) ClusterID() types.ID { return s.cluster.id }

func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }

func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
Expand Down
7 changes: 3 additions & 4 deletions storage/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,9 @@ func (s *watchableStore) syncWatchers() {
}

for w, es := range newWatcherToEventMap(keyToUnsynced, evs) {
wr := WatchResponse{WatchID: w.id, Events: es}
select {
case w.ch <- wr:
case w.ch <- WatchResponse{WatchID: w.id, Events: es, Revision: s.store.currentRev.main}:
// s.store.Rev also uses Lock, so just return directly
pendingEventsGauge.Add(float64(len(es)))
default:
// TODO: handle the full unsynced watchers.
Expand Down Expand Up @@ -381,9 +381,8 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
if !ok {
continue
}
wr := WatchResponse{WatchID: w.id, Events: es}
select {
case w.ch <- wr:
case w.ch <- WatchResponse{WatchID: w.id, Events: es, Revision: s.Rev()}:
pendingEventsGauge.Add(float64(len(es)))
default:
// move slow watcher to unsynced
Expand Down
22 changes: 22 additions & 0 deletions storage/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,24 @@ type WatchStream interface {

// Close closes the WatchChan and release all related resources.
Close()

// Rev returns the current revision of the KV the stream watches on.
Rev() int64
}

// WatchResponse contains the WatchID that can be used to cancel Watch,
// and events that were watched, and Revision for clients to check the progress
// of the Watch operation. If the Revision in the WatchResponse is same as the one
// in the kv notification, it means that this watcher is in sync. If the Revision
// in the WatchResponse is greater the one in kv notifiction, it means the watch is
// unsynced; kv store is slow and not catching up with Watch with lower
type WatchResponse struct {
// WatchID is the WatchID of the watcher this response sent to.
WatchID WatchID
// Events contains all the events that needs to send.
Events []storagepb.Event
// Revision is the latest revision of the storage.
Revision int64
}

// watchStream contains a collection of watchers that share
Expand Down Expand Up @@ -113,3 +124,14 @@ func (ws *watchStream) Close() {
close(ws.ch)
watchStreamGauge.Dec()
}

func (ws *watchStream) Rev() int64 {
ws.mu.Lock()
defer ws.mu.Unlock()

s, ok := ws.watchable.(*watchableStore)
if !ok {
return 0
}
return s.Rev()
}

0 comments on commit dde6c6f

Please sign in to comment.