diff --git a/hedged.go b/hedged.go index 244dda6..e93d8b4 100644 --- a/hedged.go +++ b/hedged.go @@ -2,11 +2,10 @@ package hedgedgrpc import ( "context" + "errors" "fmt" - "net/http" "strings" "sync" - "sync/atomic" "time" "google.golang.org/grpc" @@ -14,21 +13,32 @@ import ( const infiniteTimeout = 30 * 24 * time.Hour // domain specific infinite -// UnaryClientInterceptor returns a new hedging unary client interceptor. -func UnaryClientInterceptor(timeout time.Duration, upto int) grpc.UnaryClientInterceptor { +// NewClient returns a new hedging unary client interceptor. +func NewClient(timeout time.Duration, upto int) (grpc.UnaryClientInterceptor, error) { + interceptor, _, err := NewClientWithStats(timeout, upto) + return interceptor, err +} + +// NewClientWithStats returns a new hedging unary client interceptor with stats. +func NewClientWithStats(timeout time.Duration, upto int) (grpc.UnaryClientInterceptor, *Stats, error) { switch { case timeout < 0: - panic("hedgedhttp: timeout cannot be negative") + return nil, nil, errors.New("timeout cannot be negative") case upto < 1: - panic("hedgedhttp: upto must be greater than 0") + return nil, nil, errors.New("upto must be greater than 0") + default: + return newHedgedGRPC(timeout, upto) } +} - return func(parentCtx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { +func newHedgedGRPC(timeout time.Duration, upto int) (grpc.UnaryClientInterceptor, *Stats, error) { + stats := &Stats{} + interceptor := func(parentCtx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { errOverall := &MultiError{} resultCh := make(chan indexedResp, upto) errorCh := make(chan error, upto) - // ht.metrics.requestedRoundTripsInc() + stats.requestedRoundTripsInc() resultIdx := -1 cancels := make([]func(), upto) @@ -36,7 +46,7 @@ func UnaryClientInterceptor(timeout time.Duration, upto int) grpc.UnaryClientInt defer runInPool(func() { for i, cancel := range cancels { if i != resultIdx && cancel != nil { - // ht.metrics.canceledSubRequestsInc() + stats.canceledSubRequestsInc() cancel() } } @@ -50,10 +60,10 @@ func UnaryClientInterceptor(timeout time.Duration, upto int) grpc.UnaryClientInt cancels[idx] = cancel runInPool(func() { - // ht.metrics.actualRoundTripsInc() + stats.actualRoundTripsInc() err := invoker(subCtx, method, req, reply, cc, opts...) if err != nil { - // ht.metrics.failedRoundTripsInc() + stats.failedRoundTripsInc() errorCh <- err } else { resultCh <- indexedResp{idx, reply} @@ -72,7 +82,7 @@ func UnaryClientInterceptor(timeout time.Duration, upto int) grpc.UnaryClientInt resultIdx = resp.Index return nil case parentCtx.Err() != nil: - // ht.metrics.canceledByUserRoundTripsInc() + stats.canceledByUserRoundTripsInc() return parentCtx.Err() case err != nil: errOverall.Errors = append(errOverall.Errors, err) @@ -82,6 +92,7 @@ func UnaryClientInterceptor(timeout time.Duration, upto int) grpc.UnaryClientInt // all request have returned errors return errOverall } + return interceptor, stats, nil } func waitResult(ctx context.Context, resultCh <-chan indexedResp, errorCh <-chan error, timeout time.Duration) (indexedResp, error) { @@ -125,81 +136,11 @@ func reqWithCtx(ctx context.Context, isHedged bool) (context.Context, context.Ca type hedgedRequest struct{} // IsHedgedRequest reports when a request is hedged. -func IsHedgedRequest(r *http.Request) bool { - val := r.Context().Value(hedgedRequest{}) +func IsHedgedRequest(ctx context.Context) bool { + val := ctx.Value(hedgedRequest{}) return val != nil } -// atomicCounter is a false sharing safe counter. -type atomicCounter struct { - count uint64 - _ [7]uint64 -} - -type cacheLine [64]byte - -// Stats object that can be queried to obtain certain metrics and get better observability. -type Stats struct { - _ cacheLine - requestedRoundTrips atomicCounter - actualRoundTrips atomicCounter - failedRoundTrips atomicCounter - canceledByUserRoundTrips atomicCounter - canceledSubRequests atomicCounter - _ cacheLine -} - -func (s *Stats) requestedRoundTripsInc() { atomic.AddUint64(&s.requestedRoundTrips.count, 1) } -func (s *Stats) actualRoundTripsInc() { atomic.AddUint64(&s.actualRoundTrips.count, 1) } -func (s *Stats) failedRoundTripsInc() { atomic.AddUint64(&s.failedRoundTrips.count, 1) } -func (s *Stats) canceledByUserRoundTripsInc() { atomic.AddUint64(&s.canceledByUserRoundTrips.count, 1) } -func (s *Stats) canceledSubRequestsInc() { atomic.AddUint64(&s.canceledSubRequests.count, 1) } - -// RequestedRoundTrips returns count of requests that were requested by client. -func (s *Stats) RequestedRoundTrips() uint64 { - return atomic.LoadUint64(&s.requestedRoundTrips.count) -} - -// ActualRoundTrips returns count of requests that were actually sent. -func (s *Stats) ActualRoundTrips() uint64 { - return atomic.LoadUint64(&s.actualRoundTrips.count) -} - -// FailedRoundTrips returns count of requests that failed. -func (s *Stats) FailedRoundTrips() uint64 { - return atomic.LoadUint64(&s.failedRoundTrips.count) -} - -// CanceledByUserRoundTrips returns count of requests that were canceled by user, using request context. -func (s *Stats) CanceledByUserRoundTrips() uint64 { - return atomic.LoadUint64(&s.canceledByUserRoundTrips.count) -} - -// CanceledSubRequests returns count of hedged sub-requests that were canceled by transport. -func (s *Stats) CanceledSubRequests() uint64 { - return atomic.LoadUint64(&s.canceledSubRequests.count) -} - -// StatsSnapshot is a snapshot of Stats. -type StatsSnapshot struct { - RequestedRoundTrips uint64 // count of requests that were requested by client - ActualRoundTrips uint64 // count of requests that were actually sent - FailedRoundTrips uint64 // count of requests that failed - CanceledByUserRoundTrips uint64 // count of requests that were canceled by user, using request context - CanceledSubRequests uint64 // count of hedged sub-requests that were canceled by transport -} - -// Snapshot of the stats. -func (s *Stats) Snapshot() StatsSnapshot { - return StatsSnapshot{ - RequestedRoundTrips: s.RequestedRoundTrips(), - ActualRoundTrips: s.ActualRoundTrips(), - FailedRoundTrips: s.FailedRoundTrips(), - CanceledByUserRoundTrips: s.CanceledByUserRoundTrips(), - CanceledSubRequests: s.CanceledSubRequests(), - } -} - var taskQueue = make(chan func()) func runInPool(task func()) { diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..983c796 --- /dev/null +++ b/stats.go @@ -0,0 +1,75 @@ +package hedgedgrpc + +import ( + "sync/atomic" +) + +// atomicCounter is a false sharing safe counter. +type atomicCounter struct { + count uint64 + _ [7]uint64 +} + +type cacheLine [64]byte + +// Stats object that can be queried to obtain certain metrics and get better observability. +type Stats struct { + _ cacheLine + requestedRoundTrips atomicCounter + actualRoundTrips atomicCounter + failedRoundTrips atomicCounter + canceledByUserRoundTrips atomicCounter + canceledSubRequests atomicCounter + _ cacheLine +} + +func (s *Stats) requestedRoundTripsInc() { atomic.AddUint64(&s.requestedRoundTrips.count, 1) } +func (s *Stats) actualRoundTripsInc() { atomic.AddUint64(&s.actualRoundTrips.count, 1) } +func (s *Stats) failedRoundTripsInc() { atomic.AddUint64(&s.failedRoundTrips.count, 1) } +func (s *Stats) canceledByUserRoundTripsInc() { atomic.AddUint64(&s.canceledByUserRoundTrips.count, 1) } +func (s *Stats) canceledSubRequestsInc() { atomic.AddUint64(&s.canceledSubRequests.count, 1) } + +// RequestedRoundTrips returns count of requests that were requested by client. +func (s *Stats) RequestedRoundTrips() uint64 { + return atomic.LoadUint64(&s.requestedRoundTrips.count) +} + +// ActualRoundTrips returns count of requests that were actually sent. +func (s *Stats) ActualRoundTrips() uint64 { + return atomic.LoadUint64(&s.actualRoundTrips.count) +} + +// FailedRoundTrips returns count of requests that failed. +func (s *Stats) FailedRoundTrips() uint64 { + return atomic.LoadUint64(&s.failedRoundTrips.count) +} + +// CanceledByUserRoundTrips returns count of requests that were canceled by user, using request context. +func (s *Stats) CanceledByUserRoundTrips() uint64 { + return atomic.LoadUint64(&s.canceledByUserRoundTrips.count) +} + +// CanceledSubRequests returns count of hedged sub-requests that were canceled by transport. +func (s *Stats) CanceledSubRequests() uint64 { + return atomic.LoadUint64(&s.canceledSubRequests.count) +} + +// StatsSnapshot is a snapshot of Stats. +type StatsSnapshot struct { + RequestedRoundTrips uint64 // count of requests that were requested by client + ActualRoundTrips uint64 // count of requests that were actually sent + FailedRoundTrips uint64 // count of requests that failed + CanceledByUserRoundTrips uint64 // count of requests that were canceled by user, using request context + CanceledSubRequests uint64 // count of hedged sub-requests that were canceled by transport +} + +// Snapshot of the stats. +func (s *Stats) Snapshot() StatsSnapshot { + return StatsSnapshot{ + RequestedRoundTrips: s.RequestedRoundTrips(), + ActualRoundTrips: s.ActualRoundTrips(), + FailedRoundTrips: s.FailedRoundTrips(), + CanceledByUserRoundTrips: s.CanceledByUserRoundTrips(), + CanceledSubRequests: s.CanceledSubRequests(), + } +}