Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions gccontrol/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ const (
waitForTrailers = 10 * time.Millisecond
// The arrival rate and the amount of resources to compute each request can vary
// a lot over time, keeping a long history might not improve the decision.
sampleSize = 5
unavailabilityPastSize = 5
sampleHistorySize = 5
unavailabilityHistorySize = 5
)

// ShedResponse the response of processing a single request from GCInterceptor.
Expand All @@ -29,13 +29,13 @@ type ShedResponse struct {
startTime time.Time
}

// New returns a new Interceptor instance.
// NewInterceptor returns a new Interceptor instance.
// Important to notice that runtime's GC will be switched off before the instance is created/returned.
func New() *Interceptor {
func NewInterceptor() *Interceptor {
debug.SetGCPercent(-1)
return &Interceptor{
sampler: newSampler(sampleSize),
estimator: newUnavailabilityEstimator(unavailabilityPastSize),
sampler: newSampler(sampleHistorySize),
estimator: newUnavailabilityEstimator(unavailabilityHistorySize),
heap: newHeap(),
}
}
Expand All @@ -46,8 +46,8 @@ func New() *Interceptor {
// This class is thread-safe. It is meant to be used as singleton in highly
// concurrent environment.
type Interceptor struct {
incoming int64 // Total number of incoming requests (monotonically increasing).
finished int64 // Total number of processed requests (monotonically increasing).
incoming int64 // Total number of incoming requests to process since last GC.
finished int64 // Total number of processed requests since last GC.
doingGC int32 // bool: 0 false | 1 true. Making it an int32 because of the atomic package.

sampler *sampler
Expand All @@ -58,8 +58,6 @@ type Interceptor struct {
// Before must be invoked before the request is processed by the service instance.
// It is strongly recommened that this is the first method called in the request processing chain.
func (i *Interceptor) Before() ShedResponse {
atomic.AddInt64(&i.incoming, 1)

// The service is unavailable.
if atomic.LoadInt32(&i.doingGC) == 1 {
return i.shed()
Expand Down Expand Up @@ -89,13 +87,18 @@ func (i *Interceptor) Before() ShedResponse {
i.heap.collect()
i.estimator.gcFinished(time.Now().Sub(gcStart))

// Zeroing counters.
atomic.StoreInt64(&i.incoming, 0)
atomic.StoreInt64(&i.finished, 0)

// Finishing unavailability period.
atomic.StoreInt32(&i.doingGC, 0)
fmt.Printf("End %+v -- %+v -- %+v\n\n", i.sampler, i.heap, i.estimator)
}()
return i.shed()
}
}
atomic.AddInt64(&i.incoming, 1)
return dontShed
}

Expand All @@ -108,8 +111,8 @@ func (i *Interceptor) shed() ShedResponse {
// After must be called before the response is set to the client.
// It is strongly recommened that this is the last method called in the request processing chain.
func (i *Interceptor) After(r ShedResponse) {
atomic.AddInt64(&i.finished, 1)
if !r.ShouldShed {
atomic.AddInt64(&i.finished, 1)
i.estimator.requestFinished(time.Now().Sub(r.startTime))
}
}
51 changes: 51 additions & 0 deletions gccontrol/interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package gccontrol

import (
"testing"
"time"

"github.com/matryer/is"
)

func TestInterceptor(t *testing.T) {
is := is.New(t)
rt := &fakeRT{1}
i := Interceptor{
heap: &heap{st: 2, rt: rt, past: []uint64{0, 0, 0}},
sampler: newSampler(3),
estimator: newUnavailabilityEstimator(3),
}

sr := i.Before()
is.True(!sr.ShouldShed) // Shedding threshold is 2 and alloc is 1.
i.After(sr)
is.Equal(int64(1), i.incoming)
is.Equal(int64(1), i.finished)

for j := int64(0); j < i.sampler.get()-1; j++ {
sr := i.Before()
is.True(!sr.ShouldShed) // It is not time to check.
is.True(sr.startTime != time.Time{}) // When not shedding the request start time must be set.
i.After(sr)
}

rt.alloc = 3
sr = i.Before()
is.True(sr.ShouldShed) // Shedding threshold is 2 and alloc is 3.

sr1 := i.Before()
is.True(sr1.ShouldShed) // The server is already unavailable.

i.After(sr)
i.After(sr1)

for {
if i.doingGC == 0 {
break
}
time.Sleep(10 * time.Millisecond)
}

is.Equal(int64(0), i.incoming)
is.Equal(int64(0), i.finished)
}
2 changes: 1 addition & 1 deletion httphandler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
// GCI returns the GCI HTTP handler, which controls Go's GC to decrease service tail latency.
// Ideally, GCI handler should be the first middleware in the service process chain.
func GCI(next http.Handler) http.Handler {
gci := gccontrol.New()
gci := gccontrol.NewInterceptor()
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sr := gci.Before()
defer gci.After(sr)
Expand Down