Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Limit jobs by type via Env Vars #34307

Merged
merged 15 commits into from
Feb 5, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]


*Heartbeat*
- Users can now configure max scheduler job limits per monitor type via env var. {pull}34307[34307]


*Metricbeat*
Expand Down
4 changes: 2 additions & 2 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Heartbeat struct {
done chan struct{}
stopOnce sync.Once
// config is used for iterating over elements of the config.
config config.Config
config *config.Config
scheduler *scheduler.Scheduler
monitorReloader *cfgfile.Reloader
monitorFactory *monitors.RunnerFactory
Expand All @@ -59,7 +59,7 @@ type Heartbeat struct {

// New creates a new heartbeat.
func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) {
parsedConfig := config.DefaultConfig
parsedConfig := config.DefaultConfig()
if err := rawConfig.Unpack(&parsedConfig); err != nil {
return nil, fmt.Errorf("error reading config file: %w", err)
}
Expand Down
36 changes: 30 additions & 6 deletions heartbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@
package config

import (
"fmt"
"os"
"strconv"
"strings"

"github.com/elastic/beats/v7/libbeat/autodiscover"
"github.com/elastic/beats/v7/libbeat/processors/util"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

type LocationWithID struct {
Expand Down Expand Up @@ -53,10 +59,28 @@ type Scheduler struct {
}

// DefaultConfig is the canonical instantiation of Config.
var DefaultConfig = Config{
Jobs: map[string]*JobLimit{
"browser": {
Limit: 2,
},
},
func DefaultConfig() *Config {
limits := map[string]*JobLimit{
"browser": {Limit: 2},
}

// Read the env key HEARTBEAT_LIMIT_{TYPE} for each type of monitor to set scaling limits
andrewvc marked this conversation as resolved.
Show resolved Hide resolved
// hard coded list of types to avoid cycles in current plugin system.
// TODO: refactor plugin system to DRY this up
for _, t := range []string{"http", "tcp", "icmp", "browser"} {
envKey := fmt.Sprintf("SYNTHETICS_LIMIT_%s", strings.ToUpper(t))
if limitStr := os.Getenv(envKey); limitStr != "" {
tLimitVal, err := strconv.ParseInt(limitStr, 10, 64)
if err != nil {
logp.L().Warnf("Could not parse job limit env var %s with value '%s' as integer", envKey, limitStr)
continue
}

limits[t] = &JobLimit{Limit: tLimitVal}
}
}

return &Config{
Jobs: limits,
}
}
82 changes: 79 additions & 3 deletions heartbeat/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,83 @@
// specific language governing permissions and limitations
// under the License.

//go:build !integration
// +build !integration

package config

import (
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDefaults(t *testing.T) {
cases := []struct {
Name string
EnvKey string
EnvVal string
LimitType string
LimitVal int64
}{
{
"Browser monitor override",
"SYNTHETICS_LIMIT_BROWSER",
"123",
"browser",
123,
},
{
"Browser default is 2 when other monitor is overriden",
"SYNTHETICS_LIMIT_HTTP",
"123",
"browser",
2,
},
{
"Browser default is 2 when nothing is overriden",
"FOO",
"bar",
"browser",
2,
},
{
"Browser default is 2 when bad value passed",
"SYNTHETICS_LIMIT_BROWSER",
"bar",
"browser",
2,
},
{
"HTTP monitor override",
"SYNTHETICS_LIMIT_HTTP",
"456",
"http",
456,
},
{
"TCP monitor override",
"SYNTHETICS_LIMIT_TCP",
"789",
"tcp",
789,
},
{
"ICMP monitor override",
"SYNTHETICS_LIMIT_ICMP",
"911",
"icmp",
911,
},
}

for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
os.Setenv(c.EnvKey, c.EnvVal)
defer os.Unsetenv(c.EnvKey)

dc := DefaultConfig()
require.NotNil(t, dc.Jobs[c.LimitType])
assert.Equal(t, dc.Jobs[c.LimitType].Limit, c.LimitVal)
})
}
}
2 changes: 2 additions & 0 deletions heartbeat/docs/heartbeat-scheduler.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,5 @@ heartbeat.jobs:

In the example, at any given time {beatname_uc} guarantees that only 10
concurrent `http` tasks and only 5 concurrent `browser` tasks will be active.

These limits can also be set via the environment variables `SYNTHETICS_LIMIT_{TYPE}`, where `{TYPE}` is one of `BROWSER`, `HTTP`, `TCP`, and `ICMP`.
24 changes: 12 additions & 12 deletions heartbeat/monitors/active/dialchain/dialers.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ import (
//
// The dialer will update the active events with:
//
// {
// "tcp": {
// "port": ...,
// "rtt": { "connect": { "us": ... }}
// }
// }
// {
// "tcp": {
// "port": ...,
// "rtt": { "connect": { "us": ... }}
// }
// }
func TCPDialer(to time.Duration) NetDialer {
return CreateNetDialer(to)
}
Expand All @@ -56,12 +56,12 @@ func TCPDialer(to time.Duration) NetDialer {
//
// The dialer will update the active events with:
//
// {
// "udp": {
// "port": ...,
// "rtt": { "connect": { "us": ... }}
// }
// }
// {
// "udp": {
// "port": ...,
// "rtt": { "connect": { "us": ... }}
// }
// }
func UDPDialer(to time.Duration) NetDialer {
return CreateNetDialer(to)
}
Expand Down
10 changes: 5 additions & 5 deletions heartbeat/monitors/active/dialchain/socks5.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ import (
//
// The layer will update the active event with:
//
// {
// "socks5": {
// "rtt": { "connect": { "us": ... }}
// }
// }
// {
// "socks5": {
// "rtt": { "connect": { "us": ... }}
// }
// }
func SOCKS5Layer(config *transport.ProxyConfig) Layer {
return func(event *beat.Event, next transport.Dialer) (transport.Dialer, error) {
var timer timer
Expand Down
4 changes: 3 additions & 1 deletion heartbeat/monitors/active/http/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ func parseBody(b interface{}) (positiveMatch, negativeMatch []match.Matcher, err
return positiveMatch, negativeMatch, errBodyIllegalBody
}

/* checkBody accepts 2 check types:
/*
checkBody accepts 2 check types:

1. positive
2. negative
So, there are 4 kinds of scenarios:
Expand Down
10 changes: 5 additions & 5 deletions heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func respondingHTTPBodyChecks(body string) validator.Validator {
func respondingHTTPHeaderChecks() validator.Validator {
return lookslike.MustCompile(map[string]interface{}{
"http.response.headers": map[string]interface{}{
"Date": isdef.IsString,
"Date": isdef.Optional(isdef.IsString),
"Content-Length": isdef.Optional(isdef.IsString),
"Content-Type": isdef.Optional(isdef.IsString),
"Location": isdef.Optional(isdef.IsString),
Expand Down Expand Up @@ -261,12 +261,12 @@ func TestUpStatuses(t *testing.T) {

testslike.Test(
t,
lookslike.Strict(lookslike.Compose(
lookslike.Compose(
hbtest.BaseChecks("127.0.0.1", "up", "http"),
hbtest.RespondingTCPChecks(),
hbtest.SummaryChecks(1, 0),
respondingHTTPChecks(server.URL, "text/plain; charset=utf-8", status),
)),
),
event.Fields,
)
})
Expand Down Expand Up @@ -687,7 +687,7 @@ func TestRedirect(t *testing.T) {

testslike.Test(
t,
lookslike.Strict(lookslike.Compose(
lookslike.Compose(
hbtest.BaseChecks("", "up", "http"),
hbtest.SummaryChecks(1, 0),
minimalRespondingHTTPChecks(testURL, "text/plain; charset=utf-8", 200),
Expand All @@ -701,7 +701,7 @@ func TestRedirect(t *testing.T) {
server.URL + redirectingPaths["/redirect_two"],
},
}),
)),
),
event.Fields,
)
}
Expand Down