Skip to content

Commit

Permalink
Simpler API
Browse files Browse the repository at this point in the history
  • Loading branch information
cristaloleg committed Dec 10, 2021
1 parent d859d10 commit 530f736
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 84 deletions.
109 changes: 25 additions & 84 deletions hedged.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,51 @@ package hedgedgrpc

import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"

"google.golang.org/grpc"
)

const infiniteTimeout = 30 * 24 * time.Hour // domain specific infinite

// UnaryClientInterceptor returns a new hedging unary client interceptor.
func UnaryClientInterceptor(timeout time.Duration, upto int) grpc.UnaryClientInterceptor {
// NewClient returns a new hedging unary client interceptor.
func NewClient(timeout time.Duration, upto int) (grpc.UnaryClientInterceptor, error) {
interceptor, _, err := NewClientWithStats(timeout, upto)
return interceptor, err
}

// NewClientWithStats returns a new hedging unary client interceptor with stats.
func NewClientWithStats(timeout time.Duration, upto int) (grpc.UnaryClientInterceptor, *Stats, error) {
switch {
case timeout < 0:
panic("hedgedhttp: timeout cannot be negative")
return nil, nil, errors.New("timeout cannot be negative")
case upto < 1:
panic("hedgedhttp: upto must be greater than 0")
return nil, nil, errors.New("upto must be greater than 0")
default:
return newHedgedGRPC(timeout, upto)
}
}

return func(parentCtx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
func newHedgedGRPC(timeout time.Duration, upto int) (grpc.UnaryClientInterceptor, *Stats, error) {
stats := &Stats{}
interceptor := func(parentCtx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
errOverall := &MultiError{}
resultCh := make(chan indexedResp, upto)
errorCh := make(chan error, upto)

// ht.metrics.requestedRoundTripsInc()
stats.requestedRoundTripsInc()

resultIdx := -1
cancels := make([]func(), upto)

defer runInPool(func() {
for i, cancel := range cancels {
if i != resultIdx && cancel != nil {
// ht.metrics.canceledSubRequestsInc()
stats.canceledSubRequestsInc()
cancel()
}
}
Expand All @@ -50,10 +60,10 @@ func UnaryClientInterceptor(timeout time.Duration, upto int) grpc.UnaryClientInt
cancels[idx] = cancel

runInPool(func() {
// ht.metrics.actualRoundTripsInc()
stats.actualRoundTripsInc()
err := invoker(subCtx, method, req, reply, cc, opts...)
if err != nil {
// ht.metrics.failedRoundTripsInc()
stats.failedRoundTripsInc()
errorCh <- err
} else {
resultCh <- indexedResp{idx, reply}
Expand All @@ -72,7 +82,7 @@ func UnaryClientInterceptor(timeout time.Duration, upto int) grpc.UnaryClientInt
resultIdx = resp.Index
return nil
case parentCtx.Err() != nil:
// ht.metrics.canceledByUserRoundTripsInc()
stats.canceledByUserRoundTripsInc()
return parentCtx.Err()
case err != nil:
errOverall.Errors = append(errOverall.Errors, err)
Expand All @@ -82,6 +92,7 @@ func UnaryClientInterceptor(timeout time.Duration, upto int) grpc.UnaryClientInt
// all request have returned errors
return errOverall
}
return interceptor, stats, nil
}

func waitResult(ctx context.Context, resultCh <-chan indexedResp, errorCh <-chan error, timeout time.Duration) (indexedResp, error) {
Expand Down Expand Up @@ -125,81 +136,11 @@ func reqWithCtx(ctx context.Context, isHedged bool) (context.Context, context.Ca
type hedgedRequest struct{}

// IsHedgedRequest reports when a request is hedged.
func IsHedgedRequest(r *http.Request) bool {
val := r.Context().Value(hedgedRequest{})
func IsHedgedRequest(ctx context.Context) bool {
val := ctx.Value(hedgedRequest{})
return val != nil
}

// atomicCounter is a false sharing safe counter.
type atomicCounter struct {
count uint64
_ [7]uint64
}

type cacheLine [64]byte

// Stats object that can be queried to obtain certain metrics and get better observability.
type Stats struct {
_ cacheLine
requestedRoundTrips atomicCounter
actualRoundTrips atomicCounter
failedRoundTrips atomicCounter
canceledByUserRoundTrips atomicCounter
canceledSubRequests atomicCounter
_ cacheLine
}

func (s *Stats) requestedRoundTripsInc() { atomic.AddUint64(&s.requestedRoundTrips.count, 1) }
func (s *Stats) actualRoundTripsInc() { atomic.AddUint64(&s.actualRoundTrips.count, 1) }
func (s *Stats) failedRoundTripsInc() { atomic.AddUint64(&s.failedRoundTrips.count, 1) }
func (s *Stats) canceledByUserRoundTripsInc() { atomic.AddUint64(&s.canceledByUserRoundTrips.count, 1) }
func (s *Stats) canceledSubRequestsInc() { atomic.AddUint64(&s.canceledSubRequests.count, 1) }

// RequestedRoundTrips returns count of requests that were requested by client.
func (s *Stats) RequestedRoundTrips() uint64 {
return atomic.LoadUint64(&s.requestedRoundTrips.count)
}

// ActualRoundTrips returns count of requests that were actually sent.
func (s *Stats) ActualRoundTrips() uint64 {
return atomic.LoadUint64(&s.actualRoundTrips.count)
}

// FailedRoundTrips returns count of requests that failed.
func (s *Stats) FailedRoundTrips() uint64 {
return atomic.LoadUint64(&s.failedRoundTrips.count)
}

// CanceledByUserRoundTrips returns count of requests that were canceled by user, using request context.
func (s *Stats) CanceledByUserRoundTrips() uint64 {
return atomic.LoadUint64(&s.canceledByUserRoundTrips.count)
}

// CanceledSubRequests returns count of hedged sub-requests that were canceled by transport.
func (s *Stats) CanceledSubRequests() uint64 {
return atomic.LoadUint64(&s.canceledSubRequests.count)
}

// StatsSnapshot is a snapshot of Stats.
type StatsSnapshot struct {
RequestedRoundTrips uint64 // count of requests that were requested by client
ActualRoundTrips uint64 // count of requests that were actually sent
FailedRoundTrips uint64 // count of requests that failed
CanceledByUserRoundTrips uint64 // count of requests that were canceled by user, using request context
CanceledSubRequests uint64 // count of hedged sub-requests that were canceled by transport
}

// Snapshot of the stats.
func (s *Stats) Snapshot() StatsSnapshot {
return StatsSnapshot{
RequestedRoundTrips: s.RequestedRoundTrips(),
ActualRoundTrips: s.ActualRoundTrips(),
FailedRoundTrips: s.FailedRoundTrips(),
CanceledByUserRoundTrips: s.CanceledByUserRoundTrips(),
CanceledSubRequests: s.CanceledSubRequests(),
}
}

var taskQueue = make(chan func())

func runInPool(task func()) {
Expand Down
75 changes: 75 additions & 0 deletions stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package hedgedgrpc

import (
"sync/atomic"
)

// atomicCounter is a false sharing safe counter.
type atomicCounter struct {
count uint64
_ [7]uint64
}

type cacheLine [64]byte

// Stats object that can be queried to obtain certain metrics and get better observability.
type Stats struct {
_ cacheLine
requestedRoundTrips atomicCounter
actualRoundTrips atomicCounter
failedRoundTrips atomicCounter
canceledByUserRoundTrips atomicCounter
canceledSubRequests atomicCounter
_ cacheLine
}

func (s *Stats) requestedRoundTripsInc() { atomic.AddUint64(&s.requestedRoundTrips.count, 1) }
func (s *Stats) actualRoundTripsInc() { atomic.AddUint64(&s.actualRoundTrips.count, 1) }
func (s *Stats) failedRoundTripsInc() { atomic.AddUint64(&s.failedRoundTrips.count, 1) }
func (s *Stats) canceledByUserRoundTripsInc() { atomic.AddUint64(&s.canceledByUserRoundTrips.count, 1) }
func (s *Stats) canceledSubRequestsInc() { atomic.AddUint64(&s.canceledSubRequests.count, 1) }

// RequestedRoundTrips returns count of requests that were requested by client.
func (s *Stats) RequestedRoundTrips() uint64 {
return atomic.LoadUint64(&s.requestedRoundTrips.count)
}

// ActualRoundTrips returns count of requests that were actually sent.
func (s *Stats) ActualRoundTrips() uint64 {
return atomic.LoadUint64(&s.actualRoundTrips.count)
}

// FailedRoundTrips returns count of requests that failed.
func (s *Stats) FailedRoundTrips() uint64 {
return atomic.LoadUint64(&s.failedRoundTrips.count)
}

// CanceledByUserRoundTrips returns count of requests that were canceled by user, using request context.
func (s *Stats) CanceledByUserRoundTrips() uint64 {
return atomic.LoadUint64(&s.canceledByUserRoundTrips.count)
}

// CanceledSubRequests returns count of hedged sub-requests that were canceled by transport.
func (s *Stats) CanceledSubRequests() uint64 {
return atomic.LoadUint64(&s.canceledSubRequests.count)
}

// StatsSnapshot is a snapshot of Stats.
type StatsSnapshot struct {
RequestedRoundTrips uint64 // count of requests that were requested by client
ActualRoundTrips uint64 // count of requests that were actually sent
FailedRoundTrips uint64 // count of requests that failed
CanceledByUserRoundTrips uint64 // count of requests that were canceled by user, using request context
CanceledSubRequests uint64 // count of hedged sub-requests that were canceled by transport
}

// Snapshot of the stats.
func (s *Stats) Snapshot() StatsSnapshot {
return StatsSnapshot{
RequestedRoundTrips: s.RequestedRoundTrips(),
ActualRoundTrips: s.ActualRoundTrips(),
FailedRoundTrips: s.FailedRoundTrips(),
CanceledByUserRoundTrips: s.CanceledByUserRoundTrips(),
CanceledSubRequests: s.CanceledSubRequests(),
}
}

0 comments on commit 530f736

Please sign in to comment.