Skip to content

Commit

Permalink
etcdserver: add ProgressReport to Watch API
Browse files Browse the repository at this point in the history
  • Loading branch information
gyuho committed Mar 1, 2016
1 parent f0dbd0b commit a6c5877
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 18 deletions.
70 changes: 54 additions & 16 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package v3rpc

import (
"io"
"time"

"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
Expand Down Expand Up @@ -59,22 +60,27 @@ type serverWatchStream struct {

gRPCStream pb.Watch_WatchServer
watchStream storage.WatchStream
ctrlStream chan *pb.WatchResponse
// ctrlStream is a channel for sending control response like watcher created and canceled.
ctrlStream chan *pb.WatchResponse
// reportStream is a channel for sending progress report messages.
reportStream chan *pb.WatchResponse
reportDone chan struct{}

// closec indicates the stream is closed.
closec chan struct{}
}

func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
sws := serverWatchStream{
clusterID: ws.clusterID,
memberID: ws.memberID,
raftTimer: ws.raftTimer,
gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
// chan for sending control response like watcher created and canceled.
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
closec: make(chan struct{}),
clusterID: ws.clusterID,
memberID: ws.memberID,
raftTimer: ws.raftTimer,
gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
reportStream: make(chan *pb.WatchResponse),
reportDone: make(chan struct{}),
closec: make(chan struct{}),
}
defer sws.close()

Expand Down Expand Up @@ -117,10 +123,30 @@ func (sws *serverWatchStream) recvLoop() error {
id = sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
}
sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(wsrev),
WatchId: int64(id),
Created: true,
Canceled: futureRev,
Header: sws.newResponseHeader(wsrev),
WatchId: int64(id),
Created: true,
Canceled: futureRev,
StartRevision: rev,
}

if uv.CreateRequest.ProgressReport {
go func() {
for {
select {
case <-sws.reportDone:
close(sws.reportStream)
return
case <-time.After(time.Duration(uv.CreateRequest.ReportInterval) * time.Second):
sws.reportStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: int64(id),
ProgressReport: uv.CreateRequest.ProgressReport,
StartRevision: rev,
}
}
}
}()
}
case *pb.WatchRequest_CancelRequest:
if uv.CancelRequest != nil {
Expand All @@ -143,7 +169,7 @@ func (sws *serverWatchStream) recvLoop() error {

func (sws *serverWatchStream) sendLoop() {
// watch ids that are currently active
ids := make(map[storage.WatchID]struct{})
ids := make(map[storage.WatchID]int64)
// watch responses pending on a watch id creation message
pending := make(map[storage.WatchID][]*pb.WatchResponse)

Expand All @@ -170,11 +196,13 @@ func (sws *serverWatchStream) sendLoop() {
CompactRevision: wresp.CompactRevision,
}

if _, hasId := ids[wresp.WatchID]; !hasId {
if rev, hasId := ids[wresp.WatchID]; !hasId {
// buffer if id not yet announced
wrs := append(pending[wresp.WatchID], wr)
pending[wresp.WatchID] = wrs
continue
} else {
wr.StartRevision = rev
}

storage.ReportEventReceived()
Expand All @@ -199,7 +227,7 @@ func (sws *serverWatchStream) sendLoop() {
}
if c.Created {
// flush buffered events
ids[wid] = struct{}{}
ids[wid] = c.StartRevision
for _, v := range pending[wid] {
storage.ReportEventReceived()
if err := sws.gRPCStream.Send(v); err != nil {
Expand All @@ -208,6 +236,15 @@ func (sws *serverWatchStream) sendLoop() {
}
delete(pending, wid)
}

case c, ok := <-sws.reportStream:
if !ok {
return
}
if err := sws.gRPCStream.Send(c); err != nil {
return
}

case <-sws.closec:
// drain the chan to clean up pending events
for range sws.watchStream.Chan() {
Expand All @@ -226,6 +263,7 @@ func (sws *serverWatchStream) close() {
sws.watchStream.Close()
close(sws.closec)
close(sws.ctrlStream)
close(sws.reportDone)
}

func (sws *serverWatchStream) newResponseHeader(rev int64) *pb.ResponseHeader {
Expand Down
132 changes: 130 additions & 2 deletions etcdserver/etcdserverpb/rpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions etcdserver/etcdserverpb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ message WatchCreateRequest {
bytes range_end = 2;
// start_revision is an optional revision (including) to watch from. No start_revision is "now".
int64 start_revision = 3;
// ProgressReport 'true' subscribes to periodic watch status updates.
bool progress_report = 4;
// ReportInterval is progress report frequency in seconds. If 5, it notifies every 5 seconds.
int64 report_interval = 5;
}

message WatchCancelRequest {
Expand Down Expand Up @@ -295,6 +299,11 @@ message WatchResponse {
// watching with same start_revision again.
int64 compact_revision = 5;

// ProgressReport is 'true' if the response is the report (not watch event).
bool progress_report = 6;
// StartRevision is the watch start revision of the request.
int64 start_revision = 7;

repeated storagepb.Event events = 11;
}

Expand Down

0 comments on commit a6c5877

Please sign in to comment.