Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrader monitoring and alerts #28951

Merged
merged 6 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 44 additions & 0 deletions api/internalutils/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,47 @@ func Take[T any](stream Stream[T], n int) ([]T, bool) {
}
return items, true
}

// rateLimit wraps an inner stream and blocks on a wait function before each
// poll of the inner stream, throttling how frequently it is advanced.
type rateLimit[T any] struct {
	inner   Stream[T]    // underlying stream being rate-limited
	wait    func() error // blocks until the next poll is permitted; may return io.EOF to halt gracefully
	waitErr error        // most recent error returned by wait; surfaced via Done
}

// Next blocks on the rate-limiting function and then advances the inner
// stream. A non-nil wait error halts iteration; it is surfaced later by Done.
func (stream *rateLimit[T]) Next() bool {
	if stream.waitErr = stream.wait(); stream.waitErr != nil {
		return false
	}
	return stream.inner.Next()
}

// Item returns the current item of the inner stream.
func (stream *rateLimit[T]) Item() T {
	return stream.inner.Item()
}

// Done closes out the inner stream and reports the terminal error, if any.
// Errors from the inner stream take precedence over wait errors, and an
// io.EOF from the wait function is treated as a graceful halt (nil).
func (stream *rateLimit[T]) Done() error {
	// inner-stream errors always win.
	if err := stream.inner.Done(); err != nil {
		return err
	}

	// a wait error is only reported if it wasn't a graceful EOF.
	if stream.waitErr != nil && !trace.IsEOF(stream.waitErr) {
		return stream.waitErr
	}

	return nil
}

// RateLimit applies a rate-limiting function to a stream s.t. calls to Next() block on
// the supplied function before calling the inner stream. If the function returns an
// error, the inner stream is not polled and Next() returns false. The wait function may
// return io.EOF to indicate a graceful/expected halting condition. Any other error value
// is treated as unexpected and will be bubbled up via Done() unless an error from the
// inner stream takes precedence.
func RateLimit[T any](stream Stream[T], wait func() error) Stream[T] {
	return &rateLimit[T]{
		inner: stream,
		wait:  wait,
	}
}
199 changes: 199 additions & 0 deletions api/internalutils/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ limitations under the License.
package stream

import (
"errors"
"fmt"
"io"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestSlice tests the slice stream.
func TestSlice(t *testing.T) {
t.Parallel()

// normal usage
s, err := Collect(Slice([]int{1, 2, 3}))
require.NoError(t, err)
Expand All @@ -44,6 +48,8 @@ func TestSlice(t *testing.T) {

// TestFilterMap tests the FilterMap combinator.
func TestFilterMap(t *testing.T) {
t.Parallel()

// normal usage
s, err := Collect(FilterMap(Slice([]int{1, 2, 3, 4}), func(i int) (string, bool) {
if i%2 == 0 {
Expand Down Expand Up @@ -83,6 +89,8 @@ func TestFilterMap(t *testing.T) {

// TestMapWhile tests the MapWhile combinator.
func TestMapWhile(t *testing.T) {
t.Parallel()

// normal usage
s, err := Collect(MapWhile(Slice([]int{1, 2, 3, 4}), func(i int) (string, bool) {
if i == 3 {
Expand Down Expand Up @@ -122,6 +130,8 @@ func TestMapWhile(t *testing.T) {

// TestFunc tests the Func stream.
func TestFunc(t *testing.T) {
t.Parallel()

// normal usage
var n int
s, err := Collect(Func(func() (int, error) {
Expand Down Expand Up @@ -172,6 +182,8 @@ func TestFunc(t *testing.T) {
}

func TestPageFunc(t *testing.T) {
t.Parallel()

// basic pages
var n int
s, err := Collect(PageFunc(func() ([]int, error) {
Expand Down Expand Up @@ -273,6 +285,8 @@ func TestPageFunc(t *testing.T) {

// TestEmpty tests the Empty/Fail stream.
func TestEmpty(t *testing.T) {
t.Parallel()

// empty case
s, err := Collect(Empty[int]())
require.NoError(t, err)
Expand All @@ -290,6 +304,8 @@ func TestEmpty(t *testing.T) {
}

func TestCollectPages(t *testing.T) {
t.Parallel()

tts := []struct {
pages [][]string
expect []string
Expand Down Expand Up @@ -405,3 +421,186 @@ func TestTake(t *testing.T) {
})
}
}

// TestRateLimitFailure verifies the expected failure conditions of the RateLimit helper:
// inner-stream errors take precedence over limiter errors, and an io.EOF from the
// limiter is treated as a graceful halt rather than a failure.
func TestRateLimitFailure(t *testing.T) {
	t.Parallel()

	limiterError := errors.New("limiter-error")
	streamError := errors.New("stream-error")

	// note: the previous version of this table declared an unused `items int`
	// field; it has been removed since no case set it and no code read it.
	tts := []struct {
		desc    string // subtest name
		stream  error  // error injected into the inner stream via Fail
		limiter error  // error returned by the wait/limiter function
		expect  error  // error expected from Done (nil means graceful)
	}{
		{
			desc:    "simultaneous",
			stream:  streamError,
			limiter: limiterError,
			expect:  streamError,
		},
		{
			desc:   "stream-only",
			stream: streamError,
			expect: streamError,
		},
		{
			desc:    "limiter-only",
			limiter: limiterError,
			expect:  limiterError,
		},
		{
			desc:    "limiter-graceful",
			limiter: io.EOF,
			expect:  nil,
		},
	}

	for _, tt := range tts {
		t.Run(tt.desc, func(t *testing.T) {
			err := Drain(RateLimit(Fail[int](tt.stream), func() error { return tt.limiter }))
			if tt.expect == nil {
				require.NoError(t, err)
				return
			}

			require.ErrorIs(t, err, tt.expect)
		})
	}
}

// TestRateLimit sets up a concurrent channel-based limiter and verifies its effect on a pool of workers consuming
// items from streams.
func TestRateLimit(t *testing.T) {
	t.Parallel()

	const workers = 16           // number of concurrent stream consumers
	const maxItemsPerWorker = 16 // each worker's stream yields at most this many items
	const tokens = 100           // total limiter tokens issued over the test
	const burst = 10             // capacity of the limiter channel (initial fill)

	// lim acts as the shared token bucket; done signals workers to wind down.
	lim := make(chan struct{}, burst)
	done := make(chan struct{})

	// each worker reports its stream's terminal error here.
	results := make(chan error, workers)

	// items records one event per item successfully pulled from a stream.
	items := make(chan struct{}, tokens+1)

	for i := 0; i < workers; i++ {
		go func() {
			stream := RateLimit(repeat("some-item", maxItemsPerWorker), func() error {
				select {
				case <-lim:
					return nil
				case <-done:
					// make sure we still consume remaining tokens even if 'done' is closed (simplifies
					// test logic by letting us close 'done' immediately after sending last token without
					// worrying about racing).
					select {
					case <-lim:
						return nil
					default:
						return io.EOF
					}
				}
			})

			for stream.Next() {
				items <- struct{}{}
			}

			results <- stream.Done()
		}()
	}

	// yielded tracks total number of tokens yielded on limiter channel
	var yielded int

	// do an initial fill of limiter channel
	for i := 0; i < burst; i++ {
		select {
		case lim <- struct{}{}:
			yielded++
		default:
			require.FailNow(t, "initial burst should never block")
		}
	}

	var consumed int

	// consume item receipt events
	timeoutC := time.After(time.Second * 30)
	for i := 0; i < burst; i++ {
		select {
		case <-items:
			consumed++
		case <-timeoutC:
			require.FailNow(t, "timeout waiting for item")
		}
	}

	// ensure no more items available
	select {
	case <-items:
		require.FailNow(t, "received item without corresponding token yield")
	default:
	}

	// yield the rest of the tokens
	for yielded < tokens {
		select {
		case lim <- struct{}{}:
			yielded++
		case <-timeoutC:
			require.FailNow(t, "timeout waiting to yield token")
		}
	}

	// signal workers that they should exit once remaining tokens
	// are consumed.
	close(done)

	// wait for all workers to finish
	for i := 0; i < workers; i++ {
		select {
		case err := <-results:
			require.NoError(t, err)
		case <-timeoutC:
			require.FailNow(t, "timeout waiting for worker to exit")
		}
	}

	// consume the rest of the item events
ConsumeItems:
	for {
		select {
		case <-items:
			consumed++
		default:
			break ConsumeItems
		}
	}

	// note that total number of processed items may actually vary since we are rate-limiting
	// how frequently a stream is *polled*, not how frequently it yields an item. A stream being
	// polled may result in us discovering that it is empty, in which case a limiter token is still
	// consumed, but no item is yielded.
	require.LessOrEqual(t, consumed, tokens)
	require.GreaterOrEqual(t, consumed, tokens-workers)
}

// repeat builds a stream that yields the same item count times and then
// halts gracefully with io.EOF.
func repeat[T any](item T, count int) Stream[T] {
	remaining := count
	return Func(func() (T, error) {
		if remaining <= 0 {
			var zero T
			return zero, io.EOF
		}
		remaining--
		return item, nil
	})
}
13 changes: 13 additions & 0 deletions api/types/system_role.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ var localServiceMappings = map[SystemRole]struct{}{
RoleMDM: {},
}

// controlPlaneMapping is the subset of local services which are definitively control plane
// elements. Used as a set (membership test only) by SystemRole.IsControlPlane.
var controlPlaneMapping = map[SystemRole]struct{}{
	RoleAuth:  {},
	RoleProxy: {},
}

// LocalServiceMappings returns the subset of role mappings which happen
// to be true Teleport services (e.g. db, kube, proxy, etc), excluding
// those which represent remote service (i.e. remoteproxy).
Expand Down Expand Up @@ -278,3 +285,9 @@ func (r *SystemRole) IsLocalService() bool {
_, ok := localServiceMappings[*r]
return ok
}

// IsControlPlane checks if the given system role is a control plane element (i.e. auth/proxy).
func (r *SystemRole) IsControlPlane() bool {
	_, isControlPlane := controlPlaneMapping[*r]
	return isControlPlane
}
2 changes: 2 additions & 0 deletions docs/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@
"Tmkx",
"Toboth",
"Traefik",
"Upgrader",
"Upsert",
"Upserted",
"Upserts",
Expand Down Expand Up @@ -746,6 +747,7 @@
"updaterreleasechannel",
"updaterversionserver",
"updatespreadsheet",
"upgrader",
"uqcje",
"urandom",
"userdata",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,34 @@ updates.

## Enroll instructions

<Details
scope={["cloud"]}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this only visible for the cloud scope?

It's also worth noting that most users tend not to use the scope switcher when they navigate between pages, so there's a good chance that users will be viewing the default (oss) scope and not see this Details box.

Finally, if this is a flow we expect most users to use, I would remove it from the Details box and make it part of the main body text.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feature is currently only available on cloud. This will eventually be able to be moved into the main docs, but we need to do some performance improvements first so that we can enable the feature by default. Until then, we have to scope this to cloud to avoid confusion.

We also suggest these commands in the alert so that users will be informed of this strategy even if they miss the docs.

opened
title="Finding Agents in Teleport Enterprise Cloud">

If you have a Teleport Enterprise Cloud account, you can find agents that need to be enrolled using `tctl inventory ls` like so:

```code
$ tctl inventory ls --upgrader=none
Server ID Hostname Services Version Upgrader
------------------------------------ ----------------- -------- ------- --------
4fb2d97d-884a-4566-b477-c805d477df09 agent.example.com Node v1.2.3 none
...
```

If you have a lot of agents on different versions and want to prioritize enrolling
your oldest agents, you can limit your search using the `--older-than` filter:

```code
$ tctl inventory ls --upgrader=none --older-than=v1.2.3
Server ID Hostname Services Version Upgrader
------------------------------------ --------------- -------- ------- --------
1e6578b6-9530-448e-8013-d32641324abb old.example.com Node v1.1.1 none
...
```

</Details>

<Tabs dropdownCaption="Cluster Type" dropdownSelected="Self-hosted">
<TabItem label="systemd" options="Self-hosted">

Expand Down