Skip to content

Commit

Permalink
upgrader monitoring and alerts (#28951) (#29206)
Browse files Browse the repository at this point in the history
* add rate limit stream helper

* upgrader metrics & alert

* add docs for discovering upgrade enroll prospects

* update prehod protos

* Update docs/pages/management/operations/enroll-agent-into-automatic-updates.mdx



---------

Co-authored-by: Paul Gottschling <paul.gottschling@goteleport.com>
  • Loading branch information
fspmarshall and ptgott committed Jul 17, 2023
1 parent 8a9bde5 commit 1c4bd92
Show file tree
Hide file tree
Showing 15 changed files with 1,551 additions and 609 deletions.
57 changes: 57 additions & 0 deletions api/internalutils/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,3 +312,60 @@ func PageFunc[T any](fn func() ([]T, error), doneFuncs ...func()) Stream[T] {
},
}
}

// Take takes the next n items from a stream. It returns a slice of the items
// and the result of the last call to stream.Next().
func Take[T any](stream Stream[T], n int) ([]T, bool) {
items := make([]T, 0, n)
for i := 0; i < n; i++ {
if !stream.Next() {
return items, false
}
items = append(items, stream.Item())
}
return items, true
}

type rateLimit[T any] struct {
inner Stream[T]
wait func() error
waitErr error
}

func (stream *rateLimit[T]) Next() bool {
stream.waitErr = stream.wait()
if stream.waitErr != nil {
return false
}

return stream.inner.Next()
}

func (stream *rateLimit[T]) Item() T {
return stream.inner.Item()
}

func (stream *rateLimit[T]) Done() error {
if err := stream.inner.Done(); err != nil {
return err
}

if trace.IsEOF(stream.waitErr) {
return nil
}

return stream.waitErr
}

// RateLimit applies a rate-limiting function to a stream s.t. calls to Next() block on
// the supplied function before calling the inner stream. If the function returns an
// error, the inner stream is not polled and Next() returns false. The wait function may
// return io.EOF to indicate a graceful/expected halting condition. Any other error value
// is treated as unexpected and will be bubbled up via Done() unless an error from the
// inner stream takes precedence.
func RateLimit[T any](stream Stream[T], wait func() error) Stream[T] {
return &rateLimit[T]{
inner: stream,
wait: wait,
}
}
256 changes: 256 additions & 0 deletions api/internalutils/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ limitations under the License.
package stream

import (
"errors"
"fmt"
"io"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestSlice tests the slice stream.
func TestSlice(t *testing.T) {
t.Parallel()

// normal usage
s, err := Collect(Slice([]int{1, 2, 3}))
require.NoError(t, err)
Expand All @@ -44,6 +48,8 @@ func TestSlice(t *testing.T) {

// TestFilterMap tests the FilterMap combinator.
func TestFilterMap(t *testing.T) {
t.Parallel()

// normal usage
s, err := Collect(FilterMap(Slice([]int{1, 2, 3, 4}), func(i int) (string, bool) {
if i%2 == 0 {
Expand Down Expand Up @@ -83,6 +89,8 @@ func TestFilterMap(t *testing.T) {

// TestMapWhile tests the MapWhile combinator.
func TestMapWhile(t *testing.T) {
t.Parallel()

// normal usage
s, err := Collect(MapWhile(Slice([]int{1, 2, 3, 4}), func(i int) (string, bool) {
if i == 3 {
Expand Down Expand Up @@ -122,6 +130,8 @@ func TestMapWhile(t *testing.T) {

// TestFunc tests the Func stream.
func TestFunc(t *testing.T) {
t.Parallel()

// normal usage
var n int
s, err := Collect(Func(func() (int, error) {
Expand Down Expand Up @@ -172,6 +182,8 @@ func TestFunc(t *testing.T) {
}

func TestPageFunc(t *testing.T) {
t.Parallel()

// basic pages
var n int
s, err := Collect(PageFunc(func() ([]int, error) {
Expand Down Expand Up @@ -273,6 +285,8 @@ func TestPageFunc(t *testing.T) {

// TestEmpty tests the Empty/Fail stream.
func TestEmpty(t *testing.T) {
t.Parallel()

// empty case
s, err := Collect(Empty[int]())
require.NoError(t, err)
Expand All @@ -290,6 +304,8 @@ func TestEmpty(t *testing.T) {
}

func TestCollectPages(t *testing.T) {
t.Parallel()

tts := []struct {
pages [][]string
expect []string
Expand Down Expand Up @@ -348,3 +364,243 @@ func TestCollectPages(t *testing.T) {
})
}
}

func TestTake(t *testing.T) {
t.Parallel()

intSlice := func(n int) []int {
s := make([]int, 0, n)
for i := 0; i < n; i++ {
s = append(s, i)
}
return s
}

tests := []struct {
name string
input []int
n int
expectedOutput []int
expectMore bool
}{
{
name: "empty stream",
input: []int{},
n: 10,
expectedOutput: []int{},
expectMore: false,
},
{
name: "full stream",
input: intSlice(20),
n: 10,
expectedOutput: intSlice(10),
expectMore: true,
},
{
name: "drain stream of size n",
input: intSlice(10),
n: 10,
expectedOutput: intSlice(10),
expectMore: true,
},
{
name: "drain stream of size < n",
input: intSlice(5),
n: 10,
expectedOutput: intSlice(5),
expectMore: false,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
stream := Slice(tc.input)
output, more := Take(stream, tc.n)
require.Equal(t, tc.expectedOutput, output)
require.Equal(t, tc.expectMore, more)
})
}
}

// TestRateLimitFailure verifies the expected failure conditions of the RateLimit helper.
func TestRateLimitFailure(t *testing.T) {
t.Parallel()

var limiterError = errors.New("limiter-error")
var streamError = errors.New("stream-error")

tts := []struct {
desc string
items int
stream error
limiter error
expect error
}{
{
desc: "simultaneous",
stream: streamError,
limiter: limiterError,
expect: streamError,
},
{
desc: "stream-only",
stream: streamError,
expect: streamError,
},
{
desc: "limiter-only",
limiter: limiterError,
expect: limiterError,
},
{
desc: "limiter-graceful",
limiter: io.EOF,
expect: nil,
},
}

for _, tt := range tts {
t.Run(tt.desc, func(t *testing.T) {
err := Drain(RateLimit(Fail[int](tt.stream), func() error { return tt.limiter }))
if tt.expect == nil {
require.NoError(t, err)
return
}

require.ErrorIs(t, err, tt.expect)
})
}
}

// TestRateLimit sets up a concurrent channel-based limiter and verifies its effect on a pool of workers consuming
// items from streams.
func TestRateLimit(t *testing.T) {
t.Parallel()

const workers = 16
const maxItemsPerWorker = 16
const tokens = 100
const burst = 10

lim := make(chan struct{}, burst)
done := make(chan struct{})

results := make(chan error, workers)

items := make(chan struct{}, tokens+1)

for i := 0; i < workers; i++ {
go func() {
stream := RateLimit(repeat("some-item", maxItemsPerWorker), func() error {
select {
case <-lim:
return nil
case <-done:
// make sure we still consume remaining tokens even if 'done' is closed (simplifies
// test logic by letting us close 'done' immediately after sending last token without
// worrying about racing).
select {
case <-lim:
return nil
default:
return io.EOF
}
}
})

for stream.Next() {
items <- struct{}{}
}

results <- stream.Done()
}()
}

// yielded tracks total number of tokens yielded on limiter channel
var yielded int

// do an initial fill of limiter channel
for i := 0; i < burst; i++ {
select {
case lim <- struct{}{}:
yielded++
default:
require.FailNow(t, "initial burst should never block")
}
}

var consumed int

// consume item receipt events
timeoutC := time.After(time.Second * 30)
for i := 0; i < burst; i++ {
select {
case <-items:
consumed++
case <-timeoutC:
require.FailNow(t, "timeout waiting for item")
}
}

// ensure no more items available
select {
case <-items:
require.FailNow(t, "received item without corresponding token yield")
default:
}

// yield the rest of the tokens
for yielded < tokens {
select {
case lim <- struct{}{}:
yielded++
case <-timeoutC:
require.FailNow(t, "timeout waiting to yield token")
}
}

// signal workers that they should exit once remaining tokens
// are consumed.
close(done)

// wait for all workers to finish
for i := 0; i < workers; i++ {
select {
case err := <-results:
require.NoError(t, err)
case <-timeoutC:
require.FailNow(t, "timeout waiting for worker to exit")
}
}

// consume the rest of the item events
ConsumeItems:
for {
select {
case <-items:
consumed++
default:
break ConsumeItems
}
}

// note that total number of processed items may actually vary since we are rate-limiting
// how frequently a stream is *polled*, not how frequently it yields an item. A stream being
// polled may result in us discovering that it is empty, in which case a limiter token is still
// consumed, but no item is yielded.
require.LessOrEqual(t, consumed, tokens)
require.GreaterOrEqual(t, consumed, tokens-workers)
}

// repeat repeats the same item N times
func repeat[T any](item T, count int) Stream[T] {
var n int
return Func(func() (T, error) {
n++
if n > count {
var zero T
return zero, io.EOF
}
return item, nil
})
}

0 comments on commit 1c4bd92

Please sign in to comment.