diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 1579815e8a18..8428f3370066 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -16,6 +16,7 @@ package v3rpc import ( "io" + "time" "github.com/coreos/etcd/etcdserver" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" @@ -59,7 +60,11 @@ type serverWatchStream struct { gRPCStream pb.Watch_WatchServer watchStream storage.WatchStream - ctrlStream chan *pb.WatchResponse + // ctrlStream is a channel for sending control response like watcher created and canceled. + ctrlStream chan *pb.WatchResponse + // reportStream is a channel for sending progress report messages. + reportStream chan *pb.WatchResponse + reportDone chan struct{} // closec indicates the stream is closed. closec chan struct{} @@ -67,14 +72,15 @@ type serverWatchStream struct { func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error { sws := serverWatchStream{ - clusterID: ws.clusterID, - memberID: ws.memberID, - raftTimer: ws.raftTimer, - gRPCStream: stream, - watchStream: ws.watchable.NewWatchStream(), - // chan for sending control response like watcher created and canceled. - ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen), - closec: make(chan struct{}), + clusterID: ws.clusterID, + memberID: ws.memberID, + raftTimer: ws.raftTimer, + gRPCStream: stream, + watchStream: ws.watchable.NewWatchStream(), + ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen), + reportStream: make(chan *pb.WatchResponse), + reportDone: make(chan struct{}), + closec: make(chan struct{}), } defer sws.close() @@ -117,10 +123,30 @@ func (sws *serverWatchStream) recvLoop() error { id = sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev) } sws.ctrlStream <- &pb.WatchResponse{ - Header: sws.newResponseHeader(wsrev), - WatchId: int64(id), - Created: true, - Canceled: futureRev, + Header: sws.newResponseHeader(wsrev), + WatchId: int64(id), + Created: true, + Canceled: futureRev, + StartRevision: rev, + } + + if uv.CreateRequest.ProgressReport { + go func() { + for { + select { + case <-sws.reportDone: + close(sws.reportStream) + return + case <-time.After(time.Duration(uv.CreateRequest.ReportInterval) * time.Second): + sws.reportStream <- &pb.WatchResponse{ + Header: sws.newResponseHeader(sws.watchStream.Rev()), + WatchId: int64(id), + ProgressReport: uv.CreateRequest.ProgressReport, + StartRevision: rev, + } + } + } + }() } case *pb.WatchRequest_CancelRequest: if uv.CancelRequest != nil { @@ -143,7 +169,7 @@ func (sws *serverWatchStream) recvLoop() error { func (sws *serverWatchStream) sendLoop() { // watch ids that are currently active - ids := make(map[storage.WatchID]struct{}) + ids := make(map[storage.WatchID]int64) // watch responses pending on a watch id creation message pending := make(map[storage.WatchID][]*pb.WatchResponse) @@ -170,11 +196,13 @@ func (sws *serverWatchStream) sendLoop() { CompactRevision: wresp.CompactRevision, } - if _, hasId := ids[wresp.WatchID]; !hasId { + if rev, hasId := ids[wresp.WatchID]; !hasId { // buffer if id not yet announced wrs := append(pending[wresp.WatchID], wr) pending[wresp.WatchID] = wrs continue + } else { + wr.StartRevision = rev } storage.ReportEventReceived() @@ -199,7 +227,7 @@ func (sws *serverWatchStream) sendLoop() { } if c.Created { // flush buffered events - ids[wid] = struct{}{} + ids[wid] = c.StartRevision for _, v := range pending[wid] { storage.ReportEventReceived() if err := sws.gRPCStream.Send(v); err != nil { @@ -208,6 +236,15 @@ func (sws *serverWatchStream) sendLoop() { } delete(pending, wid) } + + case c, ok := <-sws.reportStream: + if !ok { + return + } + if err := sws.gRPCStream.Send(c); err != nil { + return + } + case <-sws.closec: // drain the chan to clean up pending events for range sws.watchStream.Chan() { @@ -226,6 +263,7 @@ func (sws *serverWatchStream) close() { sws.watchStream.Close() close(sws.closec) close(sws.ctrlStream) + close(sws.reportDone) } func (sws *serverWatchStream) newResponseHeader(rev int64) *pb.ResponseHeader { diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index b3fdeb1741e0..6ea52497e3d8 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -875,6 +875,10 @@ type WatchCreateRequest struct { RangeEnd []byte `protobuf:"bytes,2,opt,name=range_end,proto3" json:"range_end,omitempty"` // start_revision is an optional revision (including) to watch from. No start_revision is "now". StartRevision int64 `protobuf:"varint,3,opt,name=start_revision,proto3" json:"start_revision,omitempty"` + // ProgressReport 'true' subscribes to periodic watch status updates. + ProgressReport bool `protobuf:"varint,4,opt,name=progress_report,proto3" json:"progress_report,omitempty"` + // ReportInterval is progress report frequency in seconds. If 5, it notifies every 5 seconds. + ReportInterval int64 `protobuf:"varint,5,opt,name=report_interval,proto3" json:"report_interval,omitempty"` } func (m *WatchCreateRequest) Reset() { *m = WatchCreateRequest{} } @@ -909,8 +913,12 @@ type WatchResponse struct { // // Client should treat the watching as canceled and should not try to create any // watching with same start_revision again. - CompactRevision int64 `protobuf:"varint,5,opt,name=compact_revision,proto3" json:"compact_revision,omitempty"` - Events []*storagepb.Event `protobuf:"bytes,11,rep,name=events" json:"events,omitempty"` + CompactRevision int64 `protobuf:"varint,5,opt,name=compact_revision,proto3" json:"compact_revision,omitempty"` + // ProgressReport is 'true' if the response is the report (not watch event). + ProgressReport bool `protobuf:"varint,6,opt,name=progress_report,proto3" json:"progress_report,omitempty"` + // StartRevision is the watch start revision of the request. + StartRevision int64 `protobuf:"varint,7,opt,name=start_revision,proto3" json:"start_revision,omitempty"` + Events []*storagepb.Event `protobuf:"bytes,11,rep,name=events" json:"events,omitempty"` } func (m *WatchResponse) Reset() { *m = WatchResponse{} } @@ -2602,6 +2610,21 @@ func (m *WatchCreateRequest) MarshalTo(data []byte) (int, error) { i++ i = encodeVarintRpc(data, i, uint64(m.StartRevision)) } + if m.ProgressReport { + data[i] = 0x20 + i++ + if m.ProgressReport { + data[i] = 1 + } else { + data[i] = 0 + } + i++ + } + if m.ReportInterval != 0 { + data[i] = 0x28 + i++ + i = encodeVarintRpc(data, i, uint64(m.ReportInterval)) + } return i, nil } @@ -2683,6 +2706,21 @@ func (m *WatchResponse) MarshalTo(data []byte) (int, error) { i++ i = encodeVarintRpc(data, i, uint64(m.CompactRevision)) } + if m.ProgressReport { + data[i] = 0x30 + i++ + if m.ProgressReport { + data[i] = 1 + } else { + data[i] = 0 + } + i++ + } + if m.StartRevision != 0 { + data[i] = 0x38 + i++ + i = encodeVarintRpc(data, i, uint64(m.StartRevision)) + } if len(m.Events) > 0 { for _, msg := range m.Events { data[i] = 0x5a @@ -3602,6 +3640,12 @@ func (m *WatchCreateRequest) Size() (n int) { if m.StartRevision != 0 { n += 1 + sovRpc(uint64(m.StartRevision)) } + if m.ProgressReport { + n += 2 + } + if m.ReportInterval != 0 { + n += 1 + sovRpc(uint64(m.ReportInterval)) + } return n } @@ -3633,6 +3677,12 @@ func (m *WatchResponse) Size() (n int) { if m.CompactRevision != 0 { n += 1 + sovRpc(uint64(m.CompactRevision)) } + if m.ProgressReport { + n += 2 + } + if m.StartRevision != 0 { + n += 1 + sovRpc(uint64(m.StartRevision)) + } if len(m.Events) > 0 { for _, e := range m.Events { l = e.Size() @@ -6053,6 +6103,45 @@ func (m *WatchCreateRequest) Unmarshal(data []byte) error { break } } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ProgressReport", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.ProgressReport = bool(v != 0) + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ReportInterval", wireType) + } + m.ReportInterval = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.ReportInterval |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipRpc(data[iNdEx:]) @@ -6283,6 +6372,45 @@ func (m *WatchResponse) Unmarshal(data []byte) error { break } } + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ProgressReport", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.ProgressReport = bool(v != 0) + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StartRevision", wireType) + } + m.StartRevision = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.StartRevision |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } case 11: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Events", wireType) diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index f70152bba0ff..c6cfc608fded 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -267,6 +267,10 @@ message WatchCreateRequest { bytes range_end = 2; // start_revision is an optional revision (including) to watch from. No start_revision is "now". int64 start_revision = 3; + // ProgressReport 'true' subscribes to periodic watch status updates. + bool progress_report = 4; + // ReportInterval is progress report frequency in seconds. If 5, it notifies every 5 seconds. + int64 report_interval = 5; } message WatchCancelRequest { @@ -295,6 +299,11 @@ message WatchResponse { // watching with same start_revision again. int64 compact_revision = 5; + // ProgressReport is 'true' if the response is the report (not watch event). + bool progress_report = 6; + // StartRevision is the watch start revision of the request. + int64 start_revision = 7; + repeated storagepb.Event events = 11; }