Allow to specify a cpu pressure usage value #609

Merged
merged 2 commits on Oct 27, 2022
Changes from 1 commit
23 changes: 21 additions & 2 deletions api/v1beta1/cpu_pressure.go
@@ -5,13 +5,28 @@

package v1beta1

import (
"github.com/hashicorp/go-multierror"
"k8s.io/apimachinery/pkg/util/intstr"
)

// CPUPressureSpec represents a cpu pressure disruption
type CPUPressureSpec struct {
Count *intstr.IntOrString `json:"count,omitempty"` // number of cores to target, either an integer form or a percentage form appended with a %
}

// Validate validates args for the given disruption
func (s *CPUPressureSpec) Validate() error {
return nil
func (s *CPUPressureSpec) Validate() (retErr error) {
if s.Count == nil {
return nil
}

// Rule: count must be valid
if err := ValidateCount(s.Count); err != nil {
retErr = multierror.Append(retErr, err)
}

return retErr
}

// GenerateArgs generates injection or cleanup pod arguments for the given spec
Expand All @@ -20,5 +35,9 @@ func (s *CPUPressureSpec) GenerateArgs() []string {
"cpu-pressure",
}

if s.Count != nil {
args = append(args, []string{"--count", s.Count.String()}...)
}

return args
}
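
To make the change above concrete, here is a hedged usage sketch of the new `count` field flowing through `Validate` and `GenerateArgs`. It is not part of the PR: the `main` wrapper and the `50%` value are illustrative, and the `api/v1beta1` import path is inferred from the file layout shown in this diff.

```go
package main

import (
	"fmt"

	"github.com/DataDog/chaos-controller/api/v1beta1" // path inferred from the diff above
	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	// target half of the allocated cores; an integer such as intstr.FromInt(2) also works
	count := intstr.FromString("50%")
	spec := v1beta1.CPUPressureSpec{Count: &count}

	// Validate is a no-op when count is nil and delegates to ValidateCount otherwise
	if err := spec.Validate(); err != nil {
		panic(err)
	}

	// prints: [cpu-pressure --count 50%]
	fmt.Println(spec.GenerateArgs())
}
```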
7 changes: 6 additions & 1 deletion api/v1beta1/zz_generated.deepcopy.go


6 changes: 6 additions & 0 deletions chart/templates/crds/chaos.datadoghq.com_disruptions.yaml
@@ -85,6 +85,12 @@ spec:
cpuPressure:
description: CPUPressureSpec represents a cpu pressure disruption
nullable: true
properties:
count:
anyOf:
- type: integer
- type: string
x-kubernetes-int-or-string: true
type: object
diskPressure:
description: DiskPressureSpec represents a disk pressure disruption
14 changes: 12 additions & 2 deletions cli/injector/cpu_pressure.go
@@ -10,17 +10,23 @@ import (
"github.com/DataDog/chaos-controller/injector"
"github.com/DataDog/chaos-controller/stress"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/intstr"
)

var cpuPressureCmd = &cobra.Command{
Use: "cpu-pressure",
Short: "CPU pressure subcommands",
Run: injectAndWait,
PreRun: func(cmd *cobra.Command, args []string) {
countStr, _ := cmd.Flags().GetString("count")
count := intstr.FromString(countStr)

// prepare spec
spec := v1beta1.CPUPressureSpec{}
stresserManager := stress.NewCPUStresserManager(log)
spec := v1beta1.CPUPressureSpec{
Count: &count,
}

stresserManager := stress.NewCPUStresserManager(log)
// create injector
for _, config := range configs {
injector, _ := injector.NewCPUPressureInjector(spec, injector.CPUPressureInjectorConfig{
@@ -31,3 +37,7 @@ var cpuPressureCmd = &cobra.Command{
}
},
}

func init() {
cpuPressureCmd.Flags().String("count", "", "number of cores to target, either an integer form or a percentage form appended with a %")
}
4 changes: 3 additions & 1 deletion docs/cpu_pressure.md
@@ -21,7 +21,9 @@ The `/sys/fs/cgroup` directory of the host must be mounted in the injector pod a
When the injector pod starts:

- It parses the `cpuset.cpus` file (located in the target `cpuset` cgroup) to retrieve cores allocated to the target processes.
- It creates one goroutine per allocated core. Each goroutine is locked on the thread they are running on. By doing so, it forces the Go runtime scheduler to create one thread per locked goroutine.
- It calculates the number of cores to apply pressure to, based on the user-provided `count`.
- It then creates one goroutine per target core.
- Each goroutine is locked to the thread it is running on, which forces the Go runtime scheduler to create one thread per locked goroutine (a minimal sketch of this pattern follows this list).
- Each goroutine joins the target `cpu` and `cpuset` cgroups.
- Joining the `cpuset` cgroup is important so the injector gets both the same number of allocated cores as the target and the very same cores, ensuring that the goroutines' threads are scheduled on the same cores as the target processes
- Each goroutine renices itself to the highest priority (`-20`) so the Linux scheduler will always give it the priority to consume CPU time over other running processes
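
A minimal, hypothetical Go sketch of the lock-thread-renice-spin pattern described in the list above. It is Linux-only (`syscall.Setpriority`, `syscall.Gettid`) and renicing to `-20` requires `CAP_SYS_NICE`; the real injector additionally joins the target `cpu` and `cpuset` cgroups through its cgroup manager, which is omitted here.

```go
package main

import (
	"fmt"
	"runtime"
	"syscall"
)

// stressCores spawns one busy-looping goroutine per target core.
func stressCores(targetCores []int) {
	for range targetCores {
		go func() {
			// pin this goroutine to a dedicated OS thread so the Go runtime
			// creates one thread per locked goroutine
			runtime.LockOSThread()

			// renice the current thread to the highest priority (-20) so the
			// Linux scheduler favors it over other running processes
			if err := syscall.Setpriority(syscall.PRIO_PROCESS, syscall.Gettid(), -20); err != nil {
				fmt.Println("failed to renice stresser thread:", err)
			}

			// busy loop to keep the scheduled core fully loaded
			for {
			}
		}()
	}
}

func main() {
	// in the injector, the target cores come from cpuset.cpus filtered by count
	stressCores([]int{0, 1})
	select {} // block until the injector process is stopped
}
```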
3 changes: 2 additions & 1 deletion examples/cpu_pressure.yaml
@@ -13,4 +13,5 @@ spec:
selector:
app: demo-curl
count: 1
cpuPressure: {} # consume as much CPU as possible to generate load
cpuPressure:
count: 50%
2 changes: 1 addition & 1 deletion injector/cpu_pressure.go
@@ -62,7 +62,7 @@ func (i *cpuPressureInjector) GetDisruptionKind() types.DisruptionKindName {
}

func (i *cpuPressureInjector) Inject() error {
cores, err := i.config.StresserManager.TrackInjectorCores(i.config.Cgroup)
cores, err := i.config.StresserManager.TrackInjectorCores(i.config.Cgroup, i.spec.Count)

if err != nil {
return fmt.Errorf("failed to parse CPUSet %w", err)
61 changes: 41 additions & 20 deletions injector/cpu_pressure_test.go
@@ -15,6 +15,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/mock"
"k8s.io/apimachinery/pkg/util/intstr"
)

var _ = Describe("Failure", func() {
@@ -51,7 +52,6 @@ var _ = Describe("Failure", func() {
manager.On("ThreadID").Return(666)

stresserManager = &stress.StresserManagerMock{}
stresserManager.On("TrackInjectorCores", mock.Anything).Return(cpuset.NewCPUSet(0, 1), nil)
stresserManager.On("TrackCoreAlreadyStressed", mock.Anything, mock.Anything).Return(nil)
stresserManager.On("StresserPIDs").Return(map[int]int{0: 666})
stresserManager.On("IsCoreAlreadyStressed", 0).Return(true)
@@ -73,7 +73,9 @@ var _ = Describe("Failure", func() {

// spec
spec = v1beta1.CPUPressureSpec{}
})

JustBeforeEach(func() {
var err error
inj, err = NewCPUPressureInjector(spec, config)
Expect(err).To(BeNil())
@@ -98,27 +100,46 @@ var _ = Describe("Failure", func() {
})

Describe("injection", func() {

It("should join the cpu and cpuset cgroups for the unstressed core", func() {
cgroupManager.AssertCalled(GinkgoT(), "Join", "cpu", 666, false)
cgroupManager.AssertCalled(GinkgoT(), "Join", "cpuset", 666, false)
cgroupManager.AssertNumberOfCalls(GinkgoT(), "Join", 2)
})

It("should prioritize the current process", func() {
manager.AssertCalled(GinkgoT(), "Prioritize")
})

It("should run the stress on one core", func() {
stresser.AssertNumberOfCalls(GinkgoT(), "Stress", 1)
})

It("should record core and StresserPID in StresserManager", func() {
stresserManager.AssertCalled(GinkgoT(), "TrackCoreAlreadyStressed", 1, 666)
Context("user request to stress all the cores", func() {
BeforeEach(func() {
stresserManager.On("TrackInjectorCores", mock.Anything, mock.Anything).Return(cpuset.NewCPUSet(0, 1), nil)
})

It("should join the cpu and cpuset cgroups for the unstressed core", func() {
cgroupManager.AssertCalled(GinkgoT(), "Join", "cpu", 666, false)
cgroupManager.AssertCalled(GinkgoT(), "Join", "cpuset", 666, false)
cgroupManager.AssertNumberOfCalls(GinkgoT(), "Join", 2)
})

It("should prioritize the current process", func() {
manager.AssertCalled(GinkgoT(), "Prioritize")
})

It("should run the stress on one core", func() {
stresser.AssertNumberOfCalls(GinkgoT(), "Stress", 1)
})

It("should record core and StresserPID in StresserManager", func() {
stresserManager.AssertCalled(GinkgoT(), "TrackCoreAlreadyStressed", 1, 666)
})

It("should skip a target core that was already stress", func() {
stresserManager.AssertNotCalled(GinkgoT(), "TrackCoreAlreadyStressed", 0, mock.Anything)
})
})

It("should skip a target core that was already stress", func() {
stresserManager.AssertNotCalled(GinkgoT(), "TrackCoreAlreadyStressed", 0, mock.Anything)
Context("user request to stress half of the cores", func() {
BeforeEach(func() {
userRequestCount := intstr.FromString("50%")
spec = v1beta1.CPUPressureSpec{
Count: &userRequestCount,
}
stresserManager.On("TrackInjectorCores", mock.Anything, &userRequestCount).Return(cpuset.NewCPUSet(0, 1), nil)
})

It("should call stresserManager track cores and get new core to apply pressure", func() {
// left empty for AfterEach
})
})
})
})
13 changes: 9 additions & 4 deletions stress/cpuset_manager.go
@@ -10,12 +10,13 @@ import (
"github.com/DataDog/chaos-controller/cgroup"
"github.com/DataDog/chaos-controller/cpuset"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/intstr"
"runtime"
"sync"
)

type StresserManager interface {
TrackInjectorCores(cgroup cgroup.Manager) (cpuset.CPUSet, error)
TrackInjectorCores(cgroup cgroup.Manager, userRequestCount *intstr.IntOrString) (cpuset.CPUSet, error)
IsCoreAlreadyStressed(core int) bool
TrackCoreAlreadyStressed(core int, stresserPID int)
StresserPIDs() map[int]int
@@ -63,7 +64,7 @@ func (manager *cpuStressserManager) StresserPIDs() map[int]int {
return manager.stresserPIDPerCore
}

func (manager *cpuStressserManager) TrackInjectorCores(cgroup cgroup.Manager) (cpuset.CPUSet, error) {
func (manager *cpuStressserManager) TrackInjectorCores(cgroup cgroup.Manager, userRequestCount *intstr.IntOrString) (cpuset.CPUSet, error) {
// read cpuset allocated cores
manager.log.Infow("retrieving target cpuset allocated cores")

@@ -78,7 +79,11 @@ func (manager *cpuStressserManager) TrackInjectorCores(cgroup cgroup.Manager) (c
return cpuset.NewCPUSet(), fmt.Errorf("error parsing cpuset allocated cores: %w", err)
}

manager.setCoresToBeStressed(manager.coresToBeStressed.Union(cores))
coresToBeStressedCount, _ := intstr.GetScaledValueFromIntOrPercent(userRequestCount, cores.Size(), true)
coresToBeStressed := cpuset.NewCPUSet(cores.ToSlice()[:coresToBeStressedCount]...)

return cores, nil
manager.log.Infof("retrieved coresToBeStressed after applying userRequestCount %s: %s", userRequestCount.StrVal, coresToBeStressed)
manager.setCoresToBeStressed(manager.coresToBeStressed.Union(coresToBeStressed))

return coresToBeStressed, nil
}
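
For reference, `TrackInjectorCores` now delegates the core-count scaling to `intstr.GetScaledValueFromIntOrPercent`. The standalone sketch below (not PR code) shows the rounding behavior the new tests rely on: percentages are scaled against the number of allocated cores and rounded up, while integer values are used as-is; only strings with a `%` suffix are accepted in the string form.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	totalCores := 2 // e.g. parsed from a cpuset.cpus value of "0-1"

	percent := intstr.FromString("30%")
	n, err := intstr.GetScaledValueFromIntOrPercent(&percent, totalCores, true) // round up
	fmt.Println(n, err) // 1 <nil>, because ceil(0.30 * 2) = 1

	whole := intstr.FromInt(2)
	n, err = intstr.GetScaledValueFromIntOrPercent(&whole, totalCores, true)
	fmt.Println(n, err) // 2 <nil>, integers are taken as-is rather than scaled
}
```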
5 changes: 3 additions & 2 deletions stress/cpuset_manager_mock.go
@@ -8,6 +8,7 @@ package stress
import (
"github.com/DataDog/chaos-controller/cgroup"
"github.com/stretchr/testify/mock"
"k8s.io/apimachinery/pkg/util/intstr"
)
import (
"github.com/DataDog/chaos-controller/cpuset"
@@ -35,7 +36,7 @@ func (f *StresserManagerMock) StresserPIDs() map[int]int {
}

//nolint:golint
func (f *StresserManagerMock) TrackInjectorCores(cgroup cgroup.Manager) (cpuset.CPUSet, error) {
args := f.Called(cgroup)
func (f *StresserManagerMock) TrackInjectorCores(cgroup cgroup.Manager, userRequestCount *intstr.IntOrString) (cpuset.CPUSet, error) {
args := f.Called(cgroup, userRequestCount)
return args.Get(0).(cpuset.CPUSet), args.Error(1)
}
95 changes: 95 additions & 0 deletions stress/cpuset_manager_test.go
@@ -0,0 +1,95 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2022 Datadog, Inc.

package stress_test

import (
. "github.com/DataDog/chaos-controller/cgroup"
"github.com/DataDog/chaos-controller/cpuset"
. "github.com/DataDog/chaos-controller/stress"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/intstr"
)

var _ = Describe("StresserManager Test", func() {
var (
log *zap.SugaredLogger
stresserManager StresserManager
)

BeforeEach(func() {
z, _ := zap.NewDevelopment()
log = z.Sugar()
stresserManager = NewCPUStresserManager(log)
})

When("IsCoreAlreadyStress", func() {
Context("StresserManager is freshly initialized", func() {
It("does not have any core stressed", func() {
Expect(stresserManager.IsCoreAlreadyStressed(1)).To(BeFalse())
Expect(stresserManager.StresserPIDs()).To(BeEmpty())
})
})

Context("StresserManager previously tracked the same core", func() {
It("should return true for core already stressed", func() {
stresserManager.TrackCoreAlreadyStressed(0, 123)

Expect(stresserManager.IsCoreAlreadyStressed(0)).To(BeTrue())
})
})
})

When("TrackInjectorCores", func() {
var (
cgroup = cgroupManager()
)

Context("StresserManager is freshly initialized", func() {
It("should track injector cores and return new cores to apply stress", func() {
userRequestedCount := intstr.FromInt(2)
cores, err := stresserManager.TrackInjectorCores(cgroup, &userRequestedCount)

Expect(err).To(BeNil())
Expect(cores).To(Equal(cpuset.NewCPUSet(0, 1)))
})
})

Context("user request to target a specific number of cores", func() {
It("should discount core according to user request", func() {
userRequestedCount := intstr.FromInt(1)
cores, err := stresserManager.TrackInjectorCores(cgroup, &userRequestedCount)

Expect(err).To(BeNil())
Expect(cores).To(Equal(cpuset.NewCPUSet(0)))
})

It("should discount core, percentage round up, according to user request", func() {
userRequestedCount := intstr.FromString("30%")
cores, err := stresserManager.TrackInjectorCores(cgroup, &userRequestedCount)

Expect(err).To(BeNil())
Expect(cores).To(Equal(cpuset.NewCPUSet(0)))
})
})
})

When("calling TrackCoreAlreadyStressed", func() {
It("should record the stresser processID by coreID", func() {
stresserManager.TrackCoreAlreadyStressed(0, 123)

Expect(stresserManager.StresserPIDs()).To(HaveKeyWithValue(0, 123))
})
})
})

func cgroupManager() *ManagerMock {
cgroup := &ManagerMock{}
cgroup.On("Read", "cpuset", "cpuset.cpus").Return("0-1", nil)

return cgroup
}
17 changes: 17 additions & 0 deletions stress/stress_suite_test.go
@@ -0,0 +1,17 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2022 Datadog, Inc.

package stress_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"testing"
)

func TestStress(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Stress Suite")
}