Skip to content
This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

Fix CPU bursting settings #146

Merged
merged 2 commits into from
Jun 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions executor/mock/jobrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type JobInput struct {
IgnoreLaunchGuard bool
// Batch sets batch mode on the task
Batch string
// CPUBursting sets the CPU bursting protobuf attribute
CPUBursting bool
// CPU sets the CPU count resource attribute
CPU *int64
// StopTimeoutSeconds is the duration we wait after SIGTERM for the container to exit
KillWaitSeconds uint32
}
Expand Down Expand Up @@ -211,7 +215,12 @@ func (jobRunner *JobRunner) StartJob(jobInput *JobInput) *JobRunResponse {
if id := jobInput.ImageDigest; id != "" {
ci.ImageDigest = protobuf.String(id)
}

ci.AllowCpuBursting = protobuf.Bool(jobInput.CPUBursting)
cpu := int64(1)
if jobInput.CPU != nil {
cpu = *jobInput.CPU
}
memMiB := int64(400)
diskMiB := uint64(100)

Expand Down
45 changes: 45 additions & 0 deletions executor/mock/standalone/standalone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ func TestStandalone(t *testing.T) {
testNewEnvironmentLocationNegative,
testOldEnvironmentLocationPositive,
testOldEnvironmentLocationNegative,
testNoCPUBursting,
testCPUBursting,
testTwoCPUs,
}
for _, fun := range testFunctions {
fullName := runtime.FuncForPC(reflect.ValueOf(fun).Pointer()).Name()
Expand Down Expand Up @@ -671,3 +674,45 @@ func testOldEnvironmentLocationNegative(t *testing.T, jobID string) {
t.Fail()
}
}

func testNoCPUBursting(t *testing.T, jobID string) {
ji := &mock.JobInput{
ImageName: ubuntu.name,
Version: ubuntu.tag,
// Make sure quota is set
Entrypoint: `/bin/bash -c 'cat /sys/fs/cgroup/cpuacct/cpu.cfs_quota_us|grep -v - -1'`,
JobID: jobID,
}
if !mock.RunJobExpectingSuccess(ji) {
t.Fail()
}
}

func testCPUBursting(t *testing.T, jobID string) {
ji := &mock.JobInput{
ImageName: ubuntu.name,
Version: ubuntu.tag,
// Make sure quota is set
Entrypoint: `/bin/bash -c 'cat /sys/fs/cgroup/cpuacct/cpu.cfs_quota_us|grep - -1'`,
JobID: jobID,
CPUBursting: true,
}
if !mock.RunJobExpectingSuccess(ji) {
t.Fail()
}
}

func testTwoCPUs(t *testing.T, jobID string) {
var cpuCount int64 = 2
ji := &mock.JobInput{
ImageName: ubuntu.name,
Version: ubuntu.tag,
// Make sure quota is set
Entrypoint: `/bin/bash -c 'cat /sys/fs/cgroup/cpuacct/cpu.shares|grep 200'`,
JobID: jobID,
CPU: &cpuCount,
}
if !mock.RunJobExpectingSuccess(ji) {
t.Fail()
}
}
29 changes: 9 additions & 20 deletions executor/runtime/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
runtimeTypes "github.com/Netflix/titus-executor/executor/runtime/types"
"github.com/Netflix/titus-executor/nvidia"
vpcTypes "github.com/Netflix/titus-executor/vpc/types"
"github.com/aws/aws-sdk-go/aws/arn"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/events"
Expand Down Expand Up @@ -192,11 +191,6 @@ var (
envFileTemplate = template.Must(template.New("").Parse(envFileTemplateStr))
)

// ErrMissingIAMRole indicates that the Titus job was submitted without an IAM role
// This is a transition because previously the protobuf had this marked as an optional field
// and it's a temporary measure during protocol evolution.
var ErrMissingIAMRole = errors.New("IAM Role Missing")

// NoEntrypointError indicates that the Titus job does not have an entrypoint, or command
var NoEntrypointError = &runtimeTypes.BadEntryPointError{Reason: errors.New("Image, and job have no entrypoint, or command")}

Expand Down Expand Up @@ -335,7 +329,7 @@ func setupLoggingInfra(dockerRuntime *DockerRuntime) error {
}

func maybeSetCFSBandwidth(cfsBandwidthMode string, cfsBandwidthPeriod uint64, c *runtimeTypes.Container, hostCfg *container.HostConfig) {
cpuBurst := c.TitusInfo.GetAllowNetworkBursting()
cpuBurst := c.TitusInfo.GetAllowCpuBursting()
logEntry := log.WithField("taskID", c.TaskID).WithField("bandwidthMode", cfsBandwidthMode).WithField("cpuBurst", cpuBurst)

if cpuBurst {
Expand All @@ -344,6 +338,11 @@ func maybeSetCFSBandwidth(cfsBandwidthMode string, cfsBandwidthPeriod uint64, c
return
}

if c.Resources.CPU == 0 {
logEntry.WithField("shares", 1).Info("Setting shares as this is an opportunistic workload")
hostCfg.CPUShares = 1
}

switch cfsBandwidthMode {
case bandwidthMode:
setCFSBandwidth(logEntry, cfsBandwidthPeriod, c, hostCfg)
Expand Down Expand Up @@ -398,17 +397,11 @@ func (r *DockerRuntime) dockerConfig(c *runtimeTypes.Container, binds []string,
return nil, nil, err
}

if c.TitusInfo.IamProfile == nil || c.TitusInfo.GetIamProfile() == "" {
return nil, nil, ErrMissingIAMRole
}

_, err = arn.Parse(c.TitusInfo.GetIamProfile())
c.Env["TITUS_IAM_ROLE"], err = c.GetIamProfile()
if err != nil {
return nil, nil, err
}

c.Env["TITUS_IAM_ROLE"] = c.TitusInfo.GetIamProfile()

hostname := strings.ToLower(c.TaskID)
containerCfg := &container.Config{
Image: c.QualifiedImageName(),
Expand Down Expand Up @@ -496,12 +489,8 @@ func (r *DockerRuntime) dockerConfig(c *runtimeTypes.Container, binds []string,
hostCfg.NetworkMode = container.NetworkMode("none")
}

if c.TitusInfo.GetBatch() {
if c.TitusInfo.GetPassthroughAttributes()["titusParameter.agent.batchPriority"] == "idle" {
c.Env["TITUS_BATCH"] = "idle"
} else {
c.Env["TITUS_BATCH"] = "true"
}
if batch := c.GetBatch(); batch != nil {
c.Env["TITUS_BATCH"] = *batch
}

// This must got after all setup
Expand Down
40 changes: 40 additions & 0 deletions executor/runtime/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import "fmt"

import (
"context"
"errors"
"os/exec"
"path/filepath"
"sort"
Expand All @@ -15,8 +16,14 @@ import (
// The purpose of this is to tell gometalinter to keep vendoring this package
_ "github.com/Netflix/titus-api-definitions/src/main/proto/netflix/titus"
"github.com/Netflix/titus-executor/executor/dockershellparser"
"github.com/aws/aws-sdk-go/aws/arn"
)

// ErrMissingIAMRole indicates that the Titus job was submitted without an IAM role
// This is a transition because previously the protobuf had this marked as an optional field
// and it's a temporary measure during protocol evolution.
var ErrMissingIAMRole = errors.New("IAM Role Missing")

// RegistryImageNotFoundError represents an error where an image
// did not exist in the registry
type RegistryImageNotFoundError struct {
Expand Down Expand Up @@ -178,6 +185,39 @@ func (c *Container) GetSortedEnvArray() []string {

}

// GetIamProfile retrieves, and validates the format of the IAM profile
func (c *Container) GetIamProfile() (string, error) {
if c.TitusInfo.IamProfile == nil || c.TitusInfo.GetIamProfile() == "" {
return "", ErrMissingIAMRole
}
if _, err := arn.Parse(c.TitusInfo.GetIamProfile()); err != nil {
return "", err
}

return c.TitusInfo.GetIamProfile(), nil
}

// GetBatch returns what the environment variable TITUS_BATCH should be set to.
// if it returns nil, TITUS_BATCH should be unset
func (c *Container) GetBatch() *string {
idleStr := "idle"
trueStr := "true"

if c.Resources.CPU == 0 {
return &idleStr
}

if !c.TitusInfo.GetBatch() {
return nil
}

if c.TitusInfo.GetPassthroughAttributes()["titusParameter.agent.batchPriority"] == "idle" {
return &idleStr
}

return &trueStr
}

// Resources specify constraints to be applied to a Container
type Resources struct {
Mem int64 // in MiB
Expand Down