Skip to content

Commit

Permalink
config, handlers: Increase steady state and burst throttles for task …
Browse files Browse the repository at this point in the history
…endpoints

This fixes #1231
  • Loading branch information
sharanyad committed Feb 16, 2018
1 parent e64b019 commit 28a70f1
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 25 deletions.
28 changes: 28 additions & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ const (

// pauseContainerTarball is the path to the pause container tarball
pauseContainerTarballPath = "/images/amazon-ecs-pause.tar"

DefaultRpsLimit = 40

DefaultRpsBurstLimit = 60
)

var (
Expand Down Expand Up @@ -347,6 +351,22 @@ func environmentConfig() (Config, error) {
containerMetadataEnabled := utils.ParseBool(os.Getenv("ECS_ENABLE_CONTAINER_METADATA"), false)
dataDirOnHost := os.Getenv("ECS_HOST_DATA_DIR")

var rpsLimit, rpsBurstLimit int
rpsLimitEnvVal := os.Getenv("ECS_TASK_METADATA_RPS_LIMIT")
rpsLimitSplits := strings.Split(rpsLimitEnvVal, ",")
if len(rpsLimitSplits) != 2 {
seelog.Warnf("Invalid format for \"ECS_TASK_METADATA_RPS_LIMIT\", expected: \"rateLimit,burst\"")
} else {
rpsLimitVal, err := strconv.Atoi(rpsLimitSplits[0])
rpsBurstLimitVal, err1 := strconv.Atoi(rpsLimitSplits[1])
if err != nil || err1 != nil {
seelog.Warn("Invalid format for \"ECS_TASK_METADATA_RPS_LIMIT\", expected integers of the form 'rateLimit,burst'")
} else {
rpsLimit = rpsLimitVal
rpsBurstLimit = rpsBurstLimitVal
}
}

if len(errs) > 0 {
err = utils.NewMultiError(errs...)
} else {
Expand Down Expand Up @@ -391,6 +411,8 @@ func environmentConfig() (Config, error) {
DataDirOnHost: dataDirOnHost,
OverrideAWSLogsExecutionRole: overrideAWSLogsExecutionRoleEnabled,
CgroupPath: cgroupPath,
RpsLimit: rpsLimit,
RpsBurstLimit: rpsBurstLimit,
}, err
}

Expand Down Expand Up @@ -557,6 +579,12 @@ func (cfg *Config) validateAndOverrideBounds() error {
cfg.NumImagesToDeletePerCycle = DefaultNumImagesToDeletePerCycle
}

if cfg.RpsLimit <= 0 || cfg.RpsBurstLimit <= 0 {
seelog.Warnf("Invalid values for rate limits, will be overridden with default values: %d,%d.", DefaultRpsLimit, DefaultRpsBurstLimit)
cfg.RpsLimit = DefaultRpsLimit
cfg.RpsBurstLimit = DefaultRpsBurstLimit
}

cfg.platformOverrides()

return nil
Expand Down
65 changes: 65 additions & 0 deletions agent/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func TestEnvironmentConfig(t *testing.T) {
defer setTestEnv("ECS_NUM_IMAGES_DELETE_PER_CYCLE", "2")()
defer setTestEnv("ECS_INSTANCE_ATTRIBUTES", "{\"my_attribute\": \"testing\"}")()
defer setTestEnv("ECS_ENABLE_TASK_ENI", "true")()
defer setTestEnv("ECS_TASK_METADATA_RPS_LIMIT", "1000,1100")()
additionalLocalRoutesJSON := `["1.2.3.4/22","5.6.7.8/32"]`
setTestEnv("ECS_AWSVPC_ADDITIONAL_LOCAL_ROUTES", additionalLocalRoutesJSON)
setTestEnv("ECS_ENABLE_CONTAINER_METADATA", "true")
Expand Down Expand Up @@ -115,6 +116,8 @@ func TestEnvironmentConfig(t *testing.T) {
assert.Equal(t, additionalLocalRoutesJSON, string(serializedAdditionalLocalRoutesJSON))
assert.Equal(t, "/etc/ecs/", conf.DataDirOnHost, "Wrong value for DataDirOnHost")
assert.True(t, conf.ContainerMetadataEnabled, "Wrong value for ContainerMetadataEnabled")
assert.Equal(t, 1000, conf.RpsLimit)
assert.Equal(t, 1100, conf.RpsBurstLimit)
}

func TestTrimWhitespaceWhenCreating(t *testing.T) {
Expand Down Expand Up @@ -356,6 +359,68 @@ func TestAWSLogsExecutionRole(t *testing.T) {
assert.True(t, conf.OverrideAWSLogsExecutionRole)
}

func TestTaskMetadataRPSLimits(t *testing.T) {
testCases := []struct {
name string
envVarVal string
expectedRps int
expectedRpsBurst int
}{
{
name: "negative limit values",
envVarVal: "-10,-10",
expectedRps: DefaultRpsLimit,
expectedRpsBurst: DefaultRpsBurstLimit,
},
{
name: "negative limit,valid burst",
envVarVal: "-10,10",
expectedRps: DefaultRpsLimit,
expectedRpsBurst: DefaultRpsBurstLimit,
},
{
name: "missing limit,valid burst",
envVarVal: " ,10",
expectedRps: DefaultRpsLimit,
expectedRpsBurst: DefaultRpsBurstLimit,
},
{
name: "valid limit,missing burst",
envVarVal: "10,",
expectedRps: DefaultRpsLimit,
expectedRpsBurst: DefaultRpsBurstLimit,
},
{
name: "empty variable",
envVarVal: "",
expectedRps: DefaultRpsLimit,
expectedRpsBurst: DefaultRpsBurstLimit,
},
{
name: "missing burst",
envVarVal: "10",
expectedRps: DefaultRpsLimit,
expectedRpsBurst: DefaultRpsBurstLimit,
},
{
name: "more than expected values",
envVarVal: "10,10,10",
expectedRps: DefaultRpsLimit,
expectedRpsBurst: DefaultRpsBurstLimit,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer setTestEnv("ECS_TASK_METADATA_RPS_LIMIT", tc.envVarVal)()
defer setTestRegion()()
cfg, err := NewConfig(ec2.NewBlackholeEC2MetadataClient())
assert.NoError(t, err)
assert.Equal(t, tc.expectedRps, cfg.RpsLimit)
assert.Equal(t, tc.expectedRpsBurst, cfg.RpsBurstLimit)
})
}
}

func setTestRegion() func() {
return setTestEnv("AWS_DEFAULT_REGION", "us-west-2")
}
Expand Down
2 changes: 2 additions & 0 deletions agent/config/config_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func DefaultConfig() Config {
ContainerMetadataEnabled: false,
TaskCPUMemLimit: DefaultEnabled,
CgroupPath: defaultCgroupPath,
RpsLimit: DefaultRpsLimit,
RpsBurstLimit: DefaultRpsBurstLimit,
}
}

Expand Down
2 changes: 2 additions & 0 deletions agent/config/config_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func TestConfigDefault(t *testing.T) {
assert.Equal(t, defaultCNIPluginsPath, cfg.CNIPluginsPath, "CNIPluginsPath default is set incorrectly")
assert.False(t, cfg.AWSVPCBlockInstanceMetdata, "AWSVPCBlockInstanceMetdata default is incorrectly set")
assert.Equal(t, "/var/lib/ecs", cfg.DataDirOnHost, "Default DataDirOnHost set incorrectly")
assert.Equal(t, DefaultRpsLimit, cfg.RpsLimit, "Default RpsLimit is set incorrectly")
assert.Equal(t, DefaultRpsBurstLimit, cfg.RpsBurstLimit, "Default RpsBurstLimit is set incorrectly")
}

// TestConfigFromFile tests the configuration can be read from file
Expand Down
2 changes: 2 additions & 0 deletions agent/config/config_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ func DefaultConfig() Config {
NumImagesToDeletePerCycle: DefaultNumImagesToDeletePerCycle,
ContainerMetadataEnabled: false,
TaskCPUMemLimit: ExplicitlyDisabled,
RpsLimit: DefaultRpsLimit,
RpsBurstLimit: DefaultRpsBurstLimit,
}
}

Expand Down
2 changes: 2 additions & 0 deletions agent/config/config_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func TestConfigDefault(t *testing.T) {
assert.Equal(t, DefaultImageCleanupTimeInterval, cfg.ImageCleanupInterval, "ImageCleanupInterval default is set incorrectly")
assert.Equal(t, DefaultNumImagesToDeletePerCycle, cfg.NumImagesToDeletePerCycle, "NumImagesToDeletePerCycle default is set incorrectly")
assert.Equal(t, `C:\ProgramData\Amazon\ECS\data`, cfg.DataDirOnHost, "Default DataDirOnHost set incorrectly")
assert.Equal(t, DefaultRpsLimit, cfg.RpsLimit, "Default RpsLimit is set incorrectly")
assert.Equal(t, DefaultRpsBurstLimit, cfg.RpsBurstLimit, "Default RpsBurstLimit is set incorrectly")
}

func TestConfigIAMTaskRolesReserves80(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions agent/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,10 @@ type Config struct {
// CgroupPath is the path expected by the agent, defaults to
// '/sys/fs/cgroup'
CgroupPath string

// RpsLimit specifies the steady state throttle for the task metadata endpoint
RpsLimit int

// RpsBurstLimit specifies the burst rate throttle for the task metadata endpoint
RpsBurstLimit int
}
26 changes: 7 additions & 19 deletions agent/handlers/taskmetadata/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,6 @@ const (
// Credentials API versions
apiVersion1 = 1
apiVersion2 = 2

// Rate limits for the metadata endpoint(s) and APIs by request ip addresses. Today, we
// support the following endpoints:
// 1. /v2/credentials: For serving IAM role credentials
// 2. /v2/metadata: For serving task and container metadata information (for "awsvpc" tasks)
// 3. /v2/stats: For serving task and container stats information (for "awsvpc" tasks)

// rateLimitPerSecond specifies the steady state throttle for the task metadata endpoint.
// Because all containers in a task share the same IP address in an "awsvpc" task, and a
// task can be constituted of up to 10 containers, the steady state rate is set at 10
// per second
rateLimitPerSecond = 10

// rateLimitBurstPerSecond specifies the burst rate throttle for the task metadata endpoint.
rateLimitBurstPerSecond = 15
)

// ServeHTTP serves IAM Role Credentials for Tasks being managed by the agent.
Expand All @@ -74,7 +59,8 @@ func ServeHTTP(credentialsManager credentials.Manager,

auditLogger := audit.NewAuditLog(containerInstanceArn, cfg, logger)

server := setupServer(credentialsManager, auditLogger, state, cfg.Cluster, statsEngine)
server := setupServer(credentialsManager, auditLogger, state, cfg.Cluster, statsEngine,
cfg.RpsLimit, cfg.RpsBurstLimit)

for {
utils.RetryWithBackoff(utils.NewSimpleBackoff(time.Second, time.Minute, 0.2, 2), func() error {
Expand All @@ -93,7 +79,9 @@ func setupServer(credentialsManager credentials.Manager,
auditLogger audit.AuditLogger,
state dockerstate.TaskEngineState,
cluster string,
statsEngine stats.Engine) *http.Server {
statsEngine stats.Engine,
rpsLimit int,
rpsBurstLimit int) *http.Server {
serverMux := http.NewServeMux()
// Credentials handlers
serverMux.HandleFunc(credentials.V1CredentialsPath,
Expand All @@ -109,8 +97,8 @@ func setupServer(credentialsManager credentials.Manager,
serverMux.HandleFunc(statsPath+"/", statsV2Handler(state, statsEngine))
serverMux.HandleFunc(statsPath, statsV2Handler(state, statsEngine))

limiter := tollbooth.NewLimiter(rateLimitPerSecond, nil)
limiter.SetBurst(rateLimitBurstPerSecond)
limiter := tollbooth.NewLimiter(int64(rpsLimit), nil)
limiter.SetBurst(rpsBurstLimit)
// Log all requests and then pass through to serverMux
loggingServeMux := http.NewServeMux()
loggingServeMux.Handle("/", tollbooth.LimitHandler(
Expand Down
7 changes: 5 additions & 2 deletions agent/handlers/taskmetadata/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/url"
"testing"

"github.com/aws/amazon-ecs-agent/agent/config"
"github.com/aws/amazon-ecs-agent/agent/credentials"
mock_credentials "github.com/aws/amazon-ecs-agent/agent/credentials/mocks"
mock_audit "github.com/aws/amazon-ecs-agent/agent/logger/audit/mocks"
Expand Down Expand Up @@ -174,7 +175,8 @@ func testErrorResponsesFromServer(t *testing.T, path string, expectedErrorMessag

credentialsManager := mock_credentials.NewMockManager(ctrl)
auditLog := mock_audit.NewMockAuditLogger(ctrl)
server := setupServer(credentialsManager, auditLog, nil, "", nil)
server := setupServer(credentialsManager, auditLog, nil, "", nil, config.DefaultRpsLimit,
config.DefaultRpsBurstLimit)

recorder := httptest.NewRecorder()
req, _ := http.NewRequest("GET", path, nil)
Expand Down Expand Up @@ -207,7 +209,8 @@ func getResponseForCredentialsRequest(t *testing.T, expectedStatus int,
defer ctrl.Finish()
credentialsManager := mock_credentials.NewMockManager(ctrl)
auditLog := mock_audit.NewMockAuditLogger(ctrl)
server := setupServer(credentialsManager, auditLog, nil, "", nil)
server := setupServer(credentialsManager, auditLog, nil, "", nil, config.DefaultRpsLimit,
config.DefaultRpsBurstLimit)
recorder := httptest.NewRecorder()

creds, ok := getCredentials()
Expand Down
13 changes: 9 additions & 4 deletions agent/handlers/taskmetadata/taskinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/aws/amazon-ecs-agent/agent/api"
"github.com/aws/amazon-ecs-agent/agent/config"
"github.com/aws/amazon-ecs-agent/agent/containermetadata"
"github.com/aws/amazon-ecs-agent/agent/credentials"
"github.com/aws/amazon-ecs-agent/agent/engine/dockerstate/mocks"
Expand Down Expand Up @@ -165,7 +166,8 @@ func TestTaskMetadata(t *testing.T) {
state.EXPECT().TaskByArn(taskARN).Return(task, true),
state.EXPECT().ContainerMapByArn(taskARN).Return(containerNameToDockerContainer, true),
)
server := setupServer(credentials.NewManager(), auditLog, state, clusterName, statsEngine)
server := setupServer(credentials.NewManager(), auditLog, state, clusterName, statsEngine,
config.DefaultRpsLimit, config.DefaultRpsBurstLimit)
recorder := httptest.NewRecorder()
req, _ := http.NewRequest("GET", metadataPath, nil)
req.RemoteAddr = remoteIP + ":" + remotePort
Expand All @@ -192,7 +194,8 @@ func TestContainerMetadata(t *testing.T) {
state.EXPECT().ContainerByID(containerID).Return(dockerContainer, true),
state.EXPECT().TaskByID(containerID).Return(task, true),
)
server := setupServer(credentials.NewManager(), auditLog, state, clusterName, statsEngine)
server := setupServer(credentials.NewManager(), auditLog, state, clusterName, statsEngine,
config.DefaultRpsLimit, config.DefaultRpsBurstLimit)
recorder := httptest.NewRecorder()
req, _ := http.NewRequest("GET", metadataPath+"/"+containerID, nil)
req.RemoteAddr = remoteIP + ":" + remotePort
Expand All @@ -219,7 +222,8 @@ func TestContainerStats(t *testing.T) {
state.EXPECT().GetTaskByIPAddress(remoteIP).Return(taskARN, true),
statsEngine.EXPECT().ContainerDockerStats(taskARN, containerID).Return(dockerStats, nil),
)
server := setupServer(credentials.NewManager(), auditLog, state, clusterName, statsEngine)
server := setupServer(credentials.NewManager(), auditLog, state, clusterName, statsEngine,
config.DefaultRpsLimit, config.DefaultRpsBurstLimit)
recorder := httptest.NewRecorder()
req, _ := http.NewRequest("GET", statsPath+"/"+containerID, nil)
req.RemoteAddr = remoteIP + ":" + remotePort
Expand Down Expand Up @@ -252,7 +256,8 @@ func TestTaskStats(t *testing.T) {
state.EXPECT().ContainerMapByArn(taskARN).Return(containerMap, true),
statsEngine.EXPECT().ContainerDockerStats(taskARN, containerID).Return(dockerStats, nil),
)
server := setupServer(credentials.NewManager(), auditLog, state, clusterName, statsEngine)
server := setupServer(credentials.NewManager(), auditLog, state, clusterName, statsEngine,
config.DefaultRpsLimit, config.DefaultRpsBurstLimit)
recorder := httptest.NewRecorder()
req, _ := http.NewRequest("GET", statsPath, nil)
req.RemoteAddr = remoteIP + ":" + remotePort
Expand Down

0 comments on commit 28a70f1

Please sign in to comment.