70 changes: 49 additions & 21 deletions api/runnerpool/ch_placer.go
@@ -7,11 +7,12 @@ import (
 	"context"
 	"time"
 
+	"github.com/fnproject/fn/api/common"
 	"github.com/fnproject/fn/api/models"
 
 	"github.com/dchest/siphash"
-	"github.com/fnproject/fn/api/common"
 	"github.com/sirupsen/logrus"
+	"go.opencensus.io/stats"
 )

type chPlacer struct {
@@ -30,50 +31,77 @@ func NewCHPlacer() Placer {
 // the LB_WAIT to drive placement decisions: runners only accept work if they have the capacity for it.
 func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error {
 
+	tracker := newAttemptTracker(ctx)
 	log := common.Logger(ctx)
 
 	// The key is just the path in this case
 	key := call.Model().Path
 	sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
 
+OutTries:
 	for {
 		runners, err := rp.Runners(call)
 		if err != nil {
 			log.WithError(err).Error("Failed to find runners for call")
-		} else {
-			i := int(jumpConsistentHash(sum64, int32(len(runners))))
-			for j := 0; j < len(runners); j++ {
-
-				select {
-				case <-ctx.Done():
-					return models.ErrCallTimeoutServerBusy
-				default:
-				}
-
-				r := runners[i]
-
-				tryCtx, tryCancel := context.WithCancel(ctx)
-				placed, err := r.TryExec(tryCtx, call)
-				tryCancel()
-
-				if err != nil && err != models.ErrCallTimeoutServerBusy {
-					log.WithError(err).Error("Failed during call placement")
-				}
-				if placed {
-					return err
-				}
-
-				i = (i + 1) % len(runners)
+			stats.Record(ctx, errorPoolCountMeasure.M(0))
+			tracker.finalizeAttempts(false)
+			return err
+		}
+
+		i := int(jumpConsistentHash(sum64, int32(len(runners))))
+		for j := 0; j < len(runners); j++ {
+			if ctx.Err() != nil {
+				break OutTries
+			}
+
+			r := runners[i]
+
+			tracker.recordAttempt()
+			tryCtx, tryCancel := context.WithCancel(ctx)
+			placed, err := r.TryExec(tryCtx, call)
+			tryCancel()
+
+			// Only log unusual (except for too-busy) errors
+			if err != nil && err != models.ErrCallTimeoutServerBusy {
+				log.WithError(err).Errorf("Failed during call placement, placed=%v", placed)
+			}
+
+			if placed {
+				if err != nil {
+					stats.Record(ctx, placedErrorCountMeasure.M(0))
+				} else {
+					stats.Record(ctx, placedOKCountMeasure.M(0))
+				}
+				tracker.finalizeAttempts(true)
+				return err
+			}
+
+			i = (i + 1) % len(runners)
+
+			// Too Busy is super common case, we track it separately
+			if err == models.ErrCallTimeoutServerBusy {
+				stats.Record(ctx, retryTooBusyCountMeasure.M(0))
+			} else {
+				stats.Record(ctx, retryErrorCountMeasure.M(0))
 			}
 		}
 
+		if len(runners) == 0 {
+			stats.Record(ctx, emptyPoolCountMeasure.M(0))
+		}
+
 		// backoff
 		select {
 		case <-ctx.Done():
-			return models.ErrCallTimeoutServerBusy
+			break OutTries
 		case <-time.After(p.rrInterval):
 		}
 	}
+
+	// Cancel Exit Path / Client cancelled/timedout
+	stats.Record(ctx, cancelCountMeasure.M(0))
+	tracker.finalizeAttempts(false)
+	return models.ErrCallTimeoutServerBusy
 }

Inline review comment (on the new cancel exit path): Should this code be conditional on the length of the runners list being greater than zero? Also worth noting: this changes the behavior of this function in the case that no runners are found. You'll now return ErrCallTimeoutServerBusy when previously it would have returned nil.

// A Fast, Minimal Memory, Consistent Hash Algorithm:
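The jumpConsistentHash helper used above is collapsed in this view; its doc comment cites "A Fast, Minimal Memory, Consistent Hash Algorithm" (Lamping and Veach). For reference, a minimal Go sketch of that published algorithm follows; the actual body in ch_placer.go may differ cosmetically:

// jumpConsistentHash is a sketch of the published jump consistent hash
// (Lamping & Veach, 2014): it maps a 64-bit key to one of numBuckets buckets
// such that only about 1/numBuckets of keys move when a bucket is added.
func jumpConsistentHash(key uint64, numBuckets int32) int32 {
	var b int64 = -1
	var j int64
	for j < int64(numBuckets) {
		b = j
		key = key*2862933555777941757 + 1
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}
	return int32(b)
}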
67 changes: 48 additions & 19 deletions api/runnerpool/naive_placer.go
@@ -5,10 +5,11 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/fnproject/fn/api/common"
 	"github.com/fnproject/fn/api/models"
 
-	"github.com/fnproject/fn/api/common"
 	"github.com/sirupsen/logrus"
+	"go.opencensus.io/stats"
 )

type naivePlacer struct {
@@ -27,41 +28,69 @@ func NewNaivePlacer() Placer {
 
 func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error {
 
+	tracker := newAttemptTracker(ctx)
 	log := common.Logger(ctx)
 
+OutTries:
 	for {
 		runners, err := rp.Runners(call)
 		if err != nil {
 			log.WithError(err).Error("Failed to find runners for call")
-		} else {
-			for j := 0; j < len(runners); j++ {
-
-				select {
-				case <-ctx.Done():
-					return models.ErrCallTimeoutServerBusy
-				default:
-				}
-
-				i := atomic.AddUint64(&sp.rrIndex, uint64(1))
-				r := runners[int(i)%len(runners)]
-
-				tryCtx, tryCancel := context.WithCancel(ctx)
-				placed, err := r.TryExec(tryCtx, call)
-				tryCancel()
-
-				if err != nil && err != models.ErrCallTimeoutServerBusy {
-					log.WithError(err).Error("Failed during call placement")
-				}
-				if placed {
-					return err
-				}
+			stats.Record(ctx, errorPoolCountMeasure.M(0))
+			tracker.finalizeAttempts(false)
+			return err
+		}
+
+		for j := 0; j < len(runners); j++ {
+			if ctx.Err() != nil {
+				break OutTries
+			}
+
+			i := atomic.AddUint64(&sp.rrIndex, uint64(1))
+			r := runners[int(i)%len(runners)]
+
+			tracker.recordAttempt()
+			tryCtx, tryCancel := context.WithCancel(ctx)
+			placed, err := r.TryExec(tryCtx, call)
+			tryCancel()
+
+			// Only log unusual (except for too-busy) errors
+			if err != nil && err != models.ErrCallTimeoutServerBusy {
+				log.WithError(err).Errorf("Failed during call placement, placed=%v", placed)
+			}
+
+			if placed {
+				if err != nil {
+					stats.Record(ctx, placedErrorCountMeasure.M(0))
+				} else {
+					stats.Record(ctx, placedOKCountMeasure.M(0))
+				}
+				tracker.finalizeAttempts(true)
+				return err
+			}
+
+			// Too Busy is super common case, we track it separately
+			if err == models.ErrCallTimeoutServerBusy {
+				stats.Record(ctx, retryTooBusyCountMeasure.M(0))
+			} else {
+				stats.Record(ctx, retryErrorCountMeasure.M(0))
 			}
 		}
 
+		if len(runners) == 0 {
+			stats.Record(ctx, emptyPoolCountMeasure.M(0))
+		}
+
 		// backoff
 		select {
 		case <-ctx.Done():
-			return models.ErrCallTimeoutServerBusy
+			break OutTries
 		case <-time.After(sp.rrInterval):
 		}
 	}
+
+	// Cancel Exit Path / Client cancelled/timedout
+	stats.Record(ctx, cancelCountMeasure.M(0))
+	tracker.finalizeAttempts(false)
+	return models.ErrCallTimeoutServerBusy
 }
103 changes: 103 additions & 0 deletions api/runnerpool/placer_stats.go
@@ -0,0 +1,103 @@
package runnerpool

import (
	"context"
	"math"
	"time"

	"github.com/sirupsen/logrus"
	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

var (
	attemptCountMeasure      = stats.Int64("lb_placer_attempt_count", "LB Placer Number of Runners Attempted Count", "")
	errorPoolCountMeasure    = stats.Int64("lb_placer_rp_error_count", "LB Placer RunnerPool RunnerList Error Count", "")
	emptyPoolCountMeasure    = stats.Int64("lb_placer_rp_empty_count", "LB Placer RunnerPool RunnerList Empty Count", "")
	cancelCountMeasure       = stats.Int64("lb_placer_client_cancelled_count", "LB Placer Client Cancel Count", "")
	placedErrorCountMeasure  = stats.Int64("lb_placer_placed_error_count", "LB Placer Placed Call Count With Errors", "")
	placedOKCountMeasure     = stats.Int64("lb_placer_placed_ok_count", "LB Placer Placed Call Count Without Errors", "")
	retryTooBusyCountMeasure = stats.Int64("lb_placer_retry_busy_count", "LB Placer Retry Count - Too Busy", "")
	retryErrorCountMeasure   = stats.Int64("lb_placer_retry_error_count", "LB Placer Retry Count - Errors", "")
	placerLatencyMeasure     = stats.Int64("lb_placer_latency", "LB Placer Latency", "msecs")
)

// Helper struct for tracking LB Placer latency and attempt counts
type attemptTracker struct {
	ctx             context.Context
	startTime       time.Time
	lastAttemptTime time.Time
	attemptCount    int64
}

func newAttemptTracker(ctx context.Context) *attemptTracker {
	return &attemptTracker{
		ctx:       ctx,
		startTime: time.Now(),
	}
}

func (data *attemptTracker) finalizeAttempts(isSuccess bool) {
	stats.Record(data.ctx, attemptCountMeasure.M(data.attemptCount))

	// IMPORTANT: here we use (lastAttemptTime - startTime). We want to exclude TryExec
	// latency *if* TryExec() goes through with success. The placer latency metric only shows
	// how much time we are spending in the Placer loop/retries. The metric still includes the
	// rtt/latency of *all* unsuccessful NACK (retriable) responses from runners. For example,
	// if the Placer loop retries 4 runners (which take 5 msecs each) and then the 5th runner
	// succeeds (but takes 35 seconds to finish execution), we report 20 msecs as our LB
	// latency.
	endTime := data.lastAttemptTime
	if !isSuccess {
		endTime = time.Now()
	}

	stats.Record(data.ctx, placerLatencyMeasure.M(int64(endTime.Sub(data.startTime)/time.Millisecond)))
}

func (data *attemptTracker) recordAttempt() {
	data.lastAttemptTime = time.Now()
	if data.attemptCount != math.MaxInt64 {
		data.attemptCount++
	}
}

func makeKeys(names []string) []tag.Key {
	var tagKeys []tag.Key
	for _, name := range names {
		key, err := tag.NewKey(name)
		if err != nil {
			logrus.WithError(err).Fatalf("cannot create tag key for %v", name)
		}
		tagKeys = append(tagKeys, key)
	}
	return tagKeys
}

func createView(measure stats.Measure, agg *view.Aggregation, tagKeys []string) *view.View {
	return &view.View{
		Name:        measure.Name(),
		Description: measure.Description(),
		TagKeys:     makeKeys(tagKeys),
		Measure:     measure,
		Aggregation: agg,
	}
}

func RegisterPlacerViews(tagKeys []string) {
	err := view.Register(
		createView(attemptCountMeasure, view.Distribution(0, 1, 2, 4, 8, 32, 64, 256), tagKeys),
		createView(errorPoolCountMeasure, view.Count(), tagKeys),
		createView(emptyPoolCountMeasure, view.Count(), tagKeys),
		createView(cancelCountMeasure, view.Count(), tagKeys),
		createView(placedErrorCountMeasure, view.Count(), tagKeys),
		createView(placedOKCountMeasure, view.Count(), tagKeys),
		createView(retryTooBusyCountMeasure, view.Count(), tagKeys),
		createView(retryErrorCountMeasure, view.Count(), tagKeys),
		createView(placerLatencyMeasure, view.Distribution(1, 10, 25, 50, 200, 1000, 10000, 60000), tagKeys),
	)
	if err != nil {
		logrus.WithError(err).Fatal("cannot create view")
	}
}

Review thread on RegisterPlacerViews:

Contributor: I did something similar in api/server/stats.go. The idea was to register views only if the server is started with the WithPrometheus option. All the views are public, so one can register selected views if WithPrometheus is not set.

Review question: Showing the limits of my own knowledge with this question. I looked at the docs for view.Distribution(), but it wasn't clear whether this gives us multiple values per bucket or a single value. Can the upstream metrics collector re-sort this to generate better percentile values if we need?

Member Author: Distribution seems to get multiple values in the buckets.
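On the view.Distribution question above: in OpenCensus Go, a Distribution aggregation keeps a count per bucket (plus an overall count, mean, and sum of squared deviations), and every recorded measurement lands in exactly one bucket, so buckets accumulate multiple values. A minimal sketch, assuming the go.opencensus.io/stats and stats/view packages used in this PR; the measure name and bucket bounds are illustrative only:

package main

import (
	"context"
	"log"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
)

var exampleLatency = stats.Int64("example_latency", "Example latency", "msecs")

func main() {
	// Bounds 1, 10, 50 define the buckets (-inf,1) [1,10) [10,50) [50,+inf).
	v := &view.View{
		Name:        exampleLatency.Name(),
		Description: exampleLatency.Description(),
		Measure:     exampleLatency,
		Aggregation: view.Distribution(1, 10, 50),
	}
	if err := view.Register(v); err != nil {
		log.Fatal(err)
	}

	// Record several measurements against the same view.
	ctx := context.Background()
	for _, ms := range []int64{2, 3, 7, 42, 400} {
		stats.Record(ctx, exampleLatency.M(ms))
	}

	rows, err := view.RetrieveData(v.Name)
	if err != nil {
		log.Fatal(err)
	}
	for _, row := range rows {
		d := row.Data.(*view.DistributionData)
		// 2, 3 and 7 all land in the [1,10) bucket, i.e. one bucket holds several values.
		log.Printf("count=%d mean=%.1f perBucket=%v", d.Count, d.Mean, d.CountPerBucket)
	}
}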
1 change: 1 addition & 0 deletions api/runnerpool/runner_pool.go
@@ -16,6 +16,7 @@ type Placer interface {
 
 // RunnerPool is the abstraction for getting an ordered list of runners to try for a call
 type RunnerPool interface {
+	// returns an error for unrecoverable errors that should not be retried
 	Runners(call RunnerCall) ([]Runner, error)
 	Shutdown(ctx context.Context) error
 }
6 changes: 6 additions & 0 deletions api/server/server.go
@@ -468,6 +468,12 @@ func WithAgentFromEnv() ServerOption {
 		placer = pool.NewNaivePlacer()
 	}
 
+	// If prometheus is enabled, add LB placer metrics to the views
+	if s.promExporter != nil {
+		keys := []string{"fn_appname", "fn_path"}
+		pool.RegisterPlacerViews(keys)
+	}
+
 	s.agent, err = agent.NewLBAgent(agent.NewCachedDataAccess(cl), runnerPool, placer)
 	if err != nil {
 		return errors.New("LBAgent creation failed")

Review thread on the promExporter check:

Contributor: don't think we should be doing this as a general rule of thumb -- which exporters we have attached shouldn't affect which metrics we collect (easy to fix...) -- if we need a way to turn these off we kinda have it half plumbed with the register views methods throughout now, seems like we could do similar to as was done in #1057 to have the default set of views and let the user either import the package that turns on all our views or they can add them individually themselves. anyway, not too big a deal

Member Author: @rdallman thanks for pointing this out, saw your PR and just merged it. :-)
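Following the review suggestion above, an embedder can also register these views independently of which exporter the server attaches. A minimal sketch of such standalone wiring (hypothetical embedding code, not part of this PR, assuming the go.opencensus.io Prometheus exporter of that era):

package main

import (
	"log"
	"net/http"

	pool "github.com/fnproject/fn/api/runnerpool"
	"go.opencensus.io/exporter/prometheus"
	"go.opencensus.io/stats/view"
)

func main() {
	// Register the LB placer views with the tag keys to break metrics down by.
	pool.RegisterPlacerViews([]string{"fn_appname", "fn_path"})

	// Attach a Prometheus exporter and expose the scrape endpoint.
	exporter, err := prometheus.NewExporter(prometheus.Options{Namespace: "fn"})
	if err != nil {
		log.Fatal(err)
	}
	view.RegisterExporter(exporter)

	http.Handle("/metrics", exporter)
	log.Fatal(http.ListenAndServe(":8080", nil))
}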
9 changes: 8 additions & 1 deletion system_test.sh
@@ -10,7 +10,9 @@ function remove_system_containers {
 
 remove_system_containers
 
-case "$1" in
+DB_NAME=$1
+
+case "$DB_NAME" in
     "sqlite3" )
     rm -fr /tmp/fn_system_tests.db
     touch /tmp/fn_system_tests.db
@@ -45,6 +47,11 @@ export FN_MAX_REQUEST_SIZE=6291456
 export FN_MAX_RESPONSE_SIZE=6291456
 export FN_ENABLE_NB_RESOURCE_TRACKER=1
 
+#
+# dump prometheus metrics to this file
+#
+export SYSTEM_TEST_PROMETHEUS_FILE=./prometheus.${DB_NAME}.txt
+
 cd test/fn-system-tests && FN_DB_URL=${FN_DB_URL} FN_API_URL=${FN_API_URL} go test -v -parallel ${2:-1} ./...; cd ../../
 
 remove_system_containers
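The SYSTEM_TEST_PROMETHEUS_FILE variable above names a file the system tests can dump the Prometheus scrape output into. The test-side code is not shown in this diff; a hypothetical Go helper along those lines might look like this:

package systemtests

import (
	"io"
	"net/http"
	"os"
)

// dumpPrometheusMetrics fetches the server's Prometheus exposition text and
// writes it verbatim to path (e.g. the value of SYSTEM_TEST_PROMETHEUS_FILE).
func dumpPrometheusMetrics(apiURL, path string) error {
	resp, err := http.Get(apiURL + "/metrics")
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	out, err := os.Create(path)
	if err != nil {
		return err
	}
	defer out.Close()

	_, err = io.Copy(out, resp.Body)
	return err
}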
1 change: 1 addition & 0 deletions test/fn-system-tests/.gitignore
@@ -0,0 +1 @@
prometheus.*.txt
10 changes: 0 additions & 10 deletions test/fn-system-tests/exec_test.go
@@ -17,16 +17,6 @@ import (
 	sdkmodels "github.com/fnproject/fn_go/models"
 )
 
-func LB() (string, error) {
-	lbURL := "http://127.0.0.1:8081"
-
-	u, err := url.Parse(lbURL)
-	if err != nil {
-		return "", err
-	}
-	return u.Host, nil
-}
-
 func getEchoContent(respBytes []byte) (string, error) {
 
 	var respJs map[string]interface{}