Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver: fix incorrect metrics generated when clients cancel watches #12803

Merged
merged 1 commit into from
Apr 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 49 additions & 6 deletions etcdserver/api/v3rpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
return rpctypes.ErrGRPCNoLeader
}

cctx, cancel := context.WithCancel(ss.Context())
ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}
ctx := newCancellableContext(ss.Context())
ss = serverStreamWithCtx{ctx: ctx, ServerStream: ss}

smap.mu.Lock()
smap.streams[ss] = struct{}{}
Expand All @@ -228,7 +228,8 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
smap.mu.Lock()
delete(smap.streams, ss)
smap.mu.Unlock()
cancel()
// TODO: investigate whether the reason for cancellation here is useful to know
ctx.Cancel(nil)
}()
}
}
Expand All @@ -237,10 +238,52 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
}
}

// cancellableContext wraps a context with new cancellable context that allows a
// specific cancellation error to be preserved and later retrieved using the
// Context.Err() function. This is so downstream context users can disambiguate
// the reason for the cancellation which could be from the client (for example)
// or from this interceptor code.
type cancellableContext struct {
context.Context

lock sync.RWMutex
cancel context.CancelFunc
cancelReason error
}

func newCancellableContext(parent context.Context) *cancellableContext {
ctx, cancel := context.WithCancel(parent)
return &cancellableContext{
Context: ctx,
cancel: cancel,
}
}

// Cancel stores the cancellation reason and then delegates to context.WithCancel
// against the parent context.
func (c *cancellableContext) Cancel(reason error) {
c.lock.Lock()
c.cancelReason = reason
c.lock.Unlock()
c.cancel()
}

// Err will return the preserved cancel reason error if present, and will
// otherwise return the underlying error from the parent context.
func (c *cancellableContext) Err() error {
c.lock.RLock()
defer c.lock.RUnlock()
if c.cancelReason != nil {
return c.cancelReason
}
return c.Context.Err()
}

type serverStreamWithCtx struct {
grpc.ServerStream
ctx context.Context
cancel *context.CancelFunc

// ctx is used so that we can preserve a reason for cancellation.
ctx *cancellableContext
}

func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
Expand Down Expand Up @@ -272,7 +315,7 @@ func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
smap.mu.Lock()
for ss := range smap.streams {
if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
(*ssWithCtx.cancel)()
ssWithCtx.ctx.Cancel(rpctypes.ErrGRPCNoLeader)
<-ss.Context().Done()
}
}
Expand Down
2 changes: 2 additions & 0 deletions etcdserver/api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ var (
ErrGRPCLeaseExist = status.New(codes.FailedPrecondition, "etcdserver: lease already exists").Err()
ErrGRPCLeaseTTLTooLarge = status.New(codes.OutOfRange, "etcdserver: too large lease TTL").Err()

ErrGRPCWatchCanceled = status.New(codes.Canceled, "etcdserver: watch canceled").Err()

ErrGRPCMemberExist = status.New(codes.FailedPrecondition, "etcdserver: member ID already exist").Err()
ErrGRPCPeerURLExist = status.New(codes.FailedPrecondition, "etcdserver: Peer URLs already exists").Err()
ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err()
Expand Down
16 changes: 13 additions & 3 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,25 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
}
}()

// TODO: There's a race here. When a stream is closed (e.g. due to a cancellation),
// the underlying error (e.g. a gRPC stream error) may be returned and handled
// through errc if the recv goroutine finishes before the send goroutine.
// When the recv goroutine wins, the stream error is retained. When recv loses
// the race, the underlying error is lost (unless the root error is propagated
// through Context.Err() which is not always the case (as callers have to decide
// to implement a custom context to do so). The stdlib context package builtins
// may be insufficient to carry semantically useful errors around and should be
// revisited.
select {
case err = <-errc:
if err == context.Canceled {
err = rpctypes.ErrGRPCWatchCanceled
}
close(sws.ctrlStream)

case <-stream.Context().Done():
err = stream.Context().Err()
// the only server-side cancellation is noleader for now.
if err == context.Canceled {
err = rpctypes.ErrGRPCNoLeader
err = rpctypes.ErrGRPCWatchCanceled
}
}

Expand Down
5 changes: 4 additions & 1 deletion tests/e2e/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func metricsTest(cx ctlCtx) {
{"/metrics", fmt.Sprintf("etcd_mvcc_delete_total 3")},
{"/metrics", fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version)},
{"/metrics", fmt.Sprintf(`etcd_cluster_version{cluster_version="%s"} 1`, version.Cluster(version.Version))},
{"/metrics", fmt.Sprintf(`grpc_server_handled_total{grpc_code="Canceled",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"} 6`)},
{"/health", `{"health":"true"}`},
} {
i++
Expand All @@ -58,7 +59,9 @@ func metricsTest(cx ctlCtx) {
if err := ctlV3Del(cx, []string{fmt.Sprintf("%d", i)}, 1); err != nil {
cx.t.Fatal(err)
}

if err := ctlV3Watch(cx, []string{"k", "--rev", "1"}, []kvExec{{key: "k", val: "v"}}...); err != nil {
cx.t.Fatal(err)
}
if err := cURLGet(cx.epc, cURLReq{endpoint: test.endpoint, expected: test.expected, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
cx.t.Fatalf("failed get with curl (%v)", err)
}
Expand Down