Skip to content

Commit

Permalink
upgrader monitoring and alerts (#28951)
Browse files Browse the repository at this point in the history
* add rate limit stream helper

* upgrader metrics & alert

* add docs for discovering upgrade enroll prospects

* update prehod protos

* Update docs/pages/management/operations/enroll-agent-into-automatic-updates.mdx

Co-authored-by: Paul Gottschling <paul.gottschling@goteleport.com>

---------

Co-authored-by: Paul Gottschling <paul.gottschling@goteleport.com>
  • Loading branch information
2 people authored and greedy52 committed Jul 19, 2023
1 parent e7b4a00 commit a7d42f4
Show file tree
Hide file tree
Showing 15 changed files with 1,479 additions and 608 deletions.
44 changes: 44 additions & 0 deletions api/internalutils/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,47 @@ func Take[T any](stream Stream[T], n int) ([]T, bool) {
}
return items, true
}

type rateLimit[T any] struct {
inner Stream[T]
wait func() error
waitErr error
}

func (stream *rateLimit[T]) Next() bool {
stream.waitErr = stream.wait()
if stream.waitErr != nil {
return false
}

return stream.inner.Next()
}

func (stream *rateLimit[T]) Item() T {
return stream.inner.Item()
}

func (stream *rateLimit[T]) Done() error {
if err := stream.inner.Done(); err != nil {
return err
}

if trace.IsEOF(stream.waitErr) {
return nil
}

return stream.waitErr
}

// RateLimit applies a rate-limiting function to a stream s.t. calls to Next() block on
// the supplied function before calling the inner stream. If the function returns an
// error, the inner stream is not polled and Next() returns false. The wait function may
// return io.EOF to indicate a graceful/expected halting condition. Any other error value
// is treated as unexpected and will be bubbled up via Done() unless an error from the
// inner stream takes precedence.
func RateLimit[T any](stream Stream[T], wait func() error) Stream[T] {
return &rateLimit[T]{
inner: stream,
wait: wait,
}
}
199 changes: 199 additions & 0 deletions api/internalutils/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ limitations under the License.
package stream

import (
"errors"
"fmt"
"io"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestSlice tests the slice stream.
func TestSlice(t *testing.T) {
t.Parallel()

// normal usage
s, err := Collect(Slice([]int{1, 2, 3}))
require.NoError(t, err)
Expand All @@ -44,6 +48,8 @@ func TestSlice(t *testing.T) {

// TestFilterMap tests the FilterMap combinator.
func TestFilterMap(t *testing.T) {
t.Parallel()

// normal usage
s, err := Collect(FilterMap(Slice([]int{1, 2, 3, 4}), func(i int) (string, bool) {
if i%2 == 0 {
Expand Down Expand Up @@ -83,6 +89,8 @@ func TestFilterMap(t *testing.T) {

// TestMapWhile tests the MapWhile combinator.
func TestMapWhile(t *testing.T) {
t.Parallel()

// normal usage
s, err := Collect(MapWhile(Slice([]int{1, 2, 3, 4}), func(i int) (string, bool) {
if i == 3 {
Expand Down Expand Up @@ -122,6 +130,8 @@ func TestMapWhile(t *testing.T) {

// TestFunc tests the Func stream.
func TestFunc(t *testing.T) {
t.Parallel()

// normal usage
var n int
s, err := Collect(Func(func() (int, error) {
Expand Down Expand Up @@ -172,6 +182,8 @@ func TestFunc(t *testing.T) {
}

func TestPageFunc(t *testing.T) {
t.Parallel()

// basic pages
var n int
s, err := Collect(PageFunc(func() ([]int, error) {
Expand Down Expand Up @@ -273,6 +285,8 @@ func TestPageFunc(t *testing.T) {

// TestEmpty tests the Empty/Fail stream.
func TestEmpty(t *testing.T) {
t.Parallel()

// empty case
s, err := Collect(Empty[int]())
require.NoError(t, err)
Expand All @@ -290,6 +304,8 @@ func TestEmpty(t *testing.T) {
}

func TestCollectPages(t *testing.T) {
t.Parallel()

tts := []struct {
pages [][]string
expect []string
Expand Down Expand Up @@ -405,3 +421,186 @@ func TestTake(t *testing.T) {
})
}
}

// TestRateLimitFailure verifies the expected failure conditions of the RateLimit helper.
func TestRateLimitFailure(t *testing.T) {
t.Parallel()

var limiterError = errors.New("limiter-error")
var streamError = errors.New("stream-error")

tts := []struct {
desc string
items int
stream error
limiter error
expect error
}{
{
desc: "simultaneous",
stream: streamError,
limiter: limiterError,
expect: streamError,
},
{
desc: "stream-only",
stream: streamError,
expect: streamError,
},
{
desc: "limiter-only",
limiter: limiterError,
expect: limiterError,
},
{
desc: "limiter-graceful",
limiter: io.EOF,
expect: nil,
},
}

for _, tt := range tts {
t.Run(tt.desc, func(t *testing.T) {
err := Drain(RateLimit(Fail[int](tt.stream), func() error { return tt.limiter }))
if tt.expect == nil {
require.NoError(t, err)
return
}

require.ErrorIs(t, err, tt.expect)
})
}
}

// TestRateLimit sets up a concurrent channel-based limiter and verifies its effect on a pool of workers consuming
// items from streams.
func TestRateLimit(t *testing.T) {
t.Parallel()

const workers = 16
const maxItemsPerWorker = 16
const tokens = 100
const burst = 10

lim := make(chan struct{}, burst)
done := make(chan struct{})

results := make(chan error, workers)

items := make(chan struct{}, tokens+1)

for i := 0; i < workers; i++ {
go func() {
stream := RateLimit(repeat("some-item", maxItemsPerWorker), func() error {
select {
case <-lim:
return nil
case <-done:
// make sure we still consume remaining tokens even if 'done' is closed (simplifies
// test logic by letting us close 'done' immediately after sending last token without
// worrying about racing).
select {
case <-lim:
return nil
default:
return io.EOF
}
}
})

for stream.Next() {
items <- struct{}{}
}

results <- stream.Done()
}()
}

// yielded tracks total number of tokens yielded on limiter channel
var yielded int

// do an initial fill of limiter channel
for i := 0; i < burst; i++ {
select {
case lim <- struct{}{}:
yielded++
default:
require.FailNow(t, "initial burst should never block")
}
}

var consumed int

// consume item receipt events
timeoutC := time.After(time.Second * 30)
for i := 0; i < burst; i++ {
select {
case <-items:
consumed++
case <-timeoutC:
require.FailNow(t, "timeout waiting for item")
}
}

// ensure no more items available
select {
case <-items:
require.FailNow(t, "received item without corresponding token yield")
default:
}

// yield the rest of the tokens
for yielded < tokens {
select {
case lim <- struct{}{}:
yielded++
case <-timeoutC:
require.FailNow(t, "timeout waiting to yield token")
}
}

// signal workers that they should exit once remaining tokens
// are consumed.
close(done)

// wait for all workers to finish
for i := 0; i < workers; i++ {
select {
case err := <-results:
require.NoError(t, err)
case <-timeoutC:
require.FailNow(t, "timeout waiting for worker to exit")
}
}

// consume the rest of the item events
ConsumeItems:
for {
select {
case <-items:
consumed++
default:
break ConsumeItems
}
}

// note that total number of processed items may actually vary since we are rate-limiting
// how frequently a stream is *polled*, not how frequently it yields an item. A stream being
// polled may result in us discovering that it is empty, in which case a limiter token is still
// consumed, but no item is yielded.
require.LessOrEqual(t, consumed, tokens)
require.GreaterOrEqual(t, consumed, tokens-workers)
}

// repeat repeats the same item N times
func repeat[T any](item T, count int) Stream[T] {
var n int
return Func(func() (T, error) {
n++
if n > count {
var zero T
return zero, io.EOF
}
return item, nil
})
}
13 changes: 13 additions & 0 deletions api/types/system_role.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ var localServiceMappings = map[SystemRole]struct{}{
RoleMDM: {},
}

// controlPlaneMapping is the subset of local services which are definitively control plane
// elements.
var controlPlaneMapping = map[SystemRole]struct{}{
RoleAuth: {},
RoleProxy: {},
}

// LocalServiceMappings returns the subset of role mappings which happen
// to be true Teleport services (e.g. db, kube, proxy, etc), excluding
// those which represent remote service (i.e. remoteproxy).
Expand Down Expand Up @@ -278,3 +285,9 @@ func (r *SystemRole) IsLocalService() bool {
_, ok := localServiceMappings[*r]
return ok
}

// IsControlPlane checks if the given system role is a control plane element (i.e. auth/proxy).
func (r *SystemRole) IsControlPlane() bool {
_, ok := controlPlaneMapping[*r]
return ok
}
2 changes: 2 additions & 0 deletions docs/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@
"Tmkx",
"Toboth",
"Traefik",
"Upgrader",
"Upsert",
"Upserted",
"Upserts",
Expand Down Expand Up @@ -746,6 +747,7 @@
"updaterreleasechannel",
"updaterversionserver",
"updatespreadsheet",
"upgrader",
"uqcje",
"urandom",
"userdata",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,34 @@ updates.

## Enroll instructions

<Details
scope={["cloud"]}
opened
title="Finding Agents in Teleport Enterprise Cloud">

If you have a Teleport Enterprise Cloud account, you can find agents that need to be enrolled using `tctl inventory ls` like so:

```code
$ tctl inventory ls --upgrader=none
Server ID Hostname Services Version Upgrader
------------------------------------ ----------------- -------- ------- --------
4fb2d97d-884a-4566-b477-c805d477df09 agent.example.com Node v1.2.3 none
...
```

If you have a lot of agents on different versions and want to prioritize enrolling
your oldest agents, you can limit your search using the `--older-than` filter:

```code
$ tctl inventory ls --upgrader=none --older-than=v1.2.3
Server ID Hostname Services Version Upgrader
------------------------------------ --------------- -------- ------- --------
1e6578b6-9530-448e-8013-d32641324abb old.example.com Node v1.1.1 none
...
```

</Details>

<Tabs dropdownCaption="Cluster Type" dropdownSelected="Self-hosted">
<TabItem label="systemd" options="Self-hosted">

Expand Down

0 comments on commit a7d42f4

Please sign in to comment.