forked from etcd-io/etcd
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
etcdserver: add ProgressReport to Watch API
For etcd-io#4628.
- Loading branch information
Showing
3 changed files
with
203 additions
and
28 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ package v3rpc | |
|
||
import ( | ||
"io" | ||
"time" | ||
|
||
"github.com/coreos/etcd/etcdserver" | ||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" | ||
|
@@ -59,22 +60,27 @@ type serverWatchStream struct { | |
|
||
gRPCStream pb.Watch_WatchServer | ||
watchStream storage.WatchStream | ||
ctrlStream chan *pb.WatchResponse | ||
// ctrlStream is a channel for sending control response like watcher created and canceled. | ||
ctrlStream chan *pb.WatchResponse | ||
// reportStream is a channel for sending progress report messages. | ||
reportStream chan *pb.WatchResponse | ||
reportDone chan struct{} | ||
|
||
// closec indicates the stream is closed. | ||
closec chan struct{} | ||
} | ||
|
||
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error { | ||
sws := serverWatchStream{ | ||
clusterID: ws.clusterID, | ||
memberID: ws.memberID, | ||
raftTimer: ws.raftTimer, | ||
gRPCStream: stream, | ||
watchStream: ws.watchable.NewWatchStream(), | ||
// chan for sending control response like watcher created and canceled. | ||
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen), | ||
closec: make(chan struct{}), | ||
clusterID: ws.clusterID, | ||
memberID: ws.memberID, | ||
raftTimer: ws.raftTimer, | ||
gRPCStream: stream, | ||
watchStream: ws.watchable.NewWatchStream(), | ||
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen), | ||
reportStream: make(chan *pb.WatchResponse), | ||
reportDone: make(chan struct{}), | ||
closec: make(chan struct{}), | ||
} | ||
defer sws.close() | ||
|
||
|
@@ -117,10 +123,30 @@ func (sws *serverWatchStream) recvLoop() error { | |
id = sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev) | ||
} | ||
sws.ctrlStream <- &pb.WatchResponse{ | ||
Header: sws.newResponseHeader(wsrev), | ||
WatchId: int64(id), | ||
Created: true, | ||
Canceled: futureRev, | ||
Header: sws.newResponseHeader(wsrev), | ||
WatchId: int64(id), | ||
Created: true, | ||
Canceled: futureRev, | ||
StartRevision: rev, | ||
} | ||
|
||
if uv.CreateRequest.ProgressReport { | ||
go func() { | ||
for { | ||
select { | ||
case <-sws.reportDone: | ||
close(sws.reportStream) | ||
return | ||
case <-time.After(time.Duration(uv.CreateRequest.ReportInterval) * time.Second): | ||
sws.reportStream <- &pb.WatchResponse{ | ||
Header: sws.newResponseHeader(sws.watchStream.Rev()), | ||
WatchId: int64(id), | ||
ProgressReport: uv.CreateRequest.ProgressReport, | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
gyuho
via email
Author
Owner
|
||
StartRevision: rev, | ||
} | ||
} | ||
} | ||
}() | ||
} | ||
case *pb.WatchRequest_CancelRequest: | ||
if uv.CancelRequest != nil { | ||
|
@@ -143,7 +169,7 @@ func (sws *serverWatchStream) recvLoop() error { | |
|
||
func (sws *serverWatchStream) sendLoop() { | ||
// watch ids that are currently active | ||
ids := make(map[storage.WatchID]struct{}) | ||
ids := make(map[storage.WatchID]int64) | ||
// watch responses pending on a watch id creation message | ||
pending := make(map[storage.WatchID][]*pb.WatchResponse) | ||
|
||
|
@@ -170,11 +196,13 @@ func (sws *serverWatchStream) sendLoop() { | |
CompactRevision: wresp.CompactRevision, | ||
} | ||
|
||
if _, hasId := ids[wresp.WatchID]; !hasId { | ||
if rev, hasId := ids[wresp.WatchID]; !hasId { | ||
// buffer if id not yet announced | ||
wrs := append(pending[wresp.WatchID], wr) | ||
pending[wresp.WatchID] = wrs | ||
continue | ||
} else { | ||
wr.StartRevision = rev | ||
} | ||
|
||
storage.ReportEventReceived() | ||
|
@@ -199,7 +227,7 @@ func (sws *serverWatchStream) sendLoop() { | |
} | ||
if c.Created { | ||
// flush buffered events | ||
ids[wid] = struct{}{} | ||
ids[wid] = c.StartRevision | ||
for _, v := range pending[wid] { | ||
storage.ReportEventReceived() | ||
if err := sws.gRPCStream.Send(v); err != nil { | ||
|
@@ -208,6 +236,15 @@ func (sws *serverWatchStream) sendLoop() { | |
} | ||
delete(pending, wid) | ||
} | ||
|
||
case c, ok := <-sws.reportStream: | ||
if !ok { | ||
return | ||
} | ||
if err := sws.gRPCStream.Send(c); err != nil { | ||
return | ||
} | ||
|
||
case <-sws.closec: | ||
// drain the chan to clean up pending events | ||
for range sws.watchStream.Chan() { | ||
|
@@ -226,6 +263,7 @@ func (sws *serverWatchStream) close() { | |
sws.watchStream.Close() | ||
close(sws.closec) | ||
close(sws.ctrlStream) | ||
close(sws.reportDone) | ||
} | ||
|
||
func (sws *serverWatchStream) newResponseHeader(rev int64) *pb.ResponseHeader { | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Instead of having a separate ProgressReport / StartRevision, can this use
CompactRevision
andCanceled=false
? As far as I can tell, a progress report is equivalent to a compaction that doesn't cancel the watcher.