Skip to content

Commit

Permalink
Implement cpu_pressure count.
Browse files Browse the repository at this point in the history
  • Loading branch information
taihuynh167 committed Oct 26, 2022
1 parent 3ed89a0 commit 4adf3f1
Show file tree
Hide file tree
Showing 12 changed files with 216 additions and 34 deletions.
23 changes: 21 additions & 2 deletions api/v1beta1/cpu_pressure.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,28 @@

package v1beta1

import (
"github.com/hashicorp/go-multierror"
"k8s.io/apimachinery/pkg/util/intstr"
)

// CPUPressureSpec represents a cpu pressure disruption
type CPUPressureSpec struct {
Count *intstr.IntOrString `json:"count,omitempty"` // number of cores to target, either an integer form or a percentage form appended with a %
}

// Validate validates args for the given disruption
func (s *CPUPressureSpec) Validate() error {
return nil
func (s *CPUPressureSpec) Validate() (retErr error) {
if s.Count == nil {
return nil
}

// Rule: count must be valid
if err := ValidateCount(s.Count); err != nil {
retErr = multierror.Append(retErr, err)
}

return retErr
}

// GenerateArgs generates injection or cleanup pod arguments for the given spec
Expand All @@ -20,5 +35,9 @@ func (s *CPUPressureSpec) GenerateArgs() []string {
"cpu-pressure",
}

if s.Count != nil {
args = append(args, []string{"--count", s.Count.String()}...)
}

return args
}
7 changes: 6 additions & 1 deletion api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions chart/templates/crds/chaos.datadoghq.com_disruptions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ spec:
cpuPressure:
description: CPUPressureSpec represents a cpu pressure disruption
nullable: true
properties:
count:
anyOf:
- type: integer
- type: string
x-kubernetes-int-or-string: true
type: object
diskPressure:
description: DiskPressureSpec represents a disk pressure disruption
Expand Down
14 changes: 12 additions & 2 deletions cli/injector/cpu_pressure.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,23 @@ import (
"github.com/DataDog/chaos-controller/injector"
"github.com/DataDog/chaos-controller/stress"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/intstr"
)

var cpuPressureCmd = &cobra.Command{
Use: "cpu-pressure",
Short: "CPU pressure subcommands",
Run: injectAndWait,
PreRun: func(cmd *cobra.Command, args []string) {
countStr, _ := cmd.Flags().GetString("count")
count := intstr.FromString(countStr)

// prepare spec
spec := v1beta1.CPUPressureSpec{}
stresserManager := stress.NewCPUStresserManager(log)
spec := v1beta1.CPUPressureSpec{
Count: &count,
}

stresserManager := stress.NewCPUStresserManager(log)
// create injector
for _, config := range configs {
injector, _ := injector.NewCPUPressureInjector(spec, injector.CPUPressureInjectorConfig{
Expand All @@ -31,3 +37,7 @@ var cpuPressureCmd = &cobra.Command{
}
},
}

func init() {
cpuPressureCmd.Flags().String("count", "", "number of cores to target, either an integer form or a percentage form appended with a %")
}
4 changes: 3 additions & 1 deletion docs/cpu_pressure.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ The `/sys/fs/cgroup` directory of the host must be mounted in the injector pod a
When the injector pod starts:

- It parses the `cpuset.cpus` file (located in the target `cpuset` cgroup) to retrieve cores allocated to the target processes.
- It creates one goroutine per allocated core. Each goroutine is locked on the thread they are running on. By doing so, it forces the Go runtime scheduler to create one thread per locked goroutine.
- It calculates the number of cores to apply pressure, by taking user input `Count`.
- It then creates one goroutine per target core.
- Each goroutine is locked on the thread they are running on. By doing so, it forces the Go runtime scheduler to create one thread per locked goroutine.
- Each goroutine joins the target `cpu` and `cpuset` cgroups.
- Joining the `cpuset` cgroup is important to both have the same number of allocated cores as the target as well as the same allocated cores so we ensure that the goroutines threads will be scheduled on the same cores as the target processes
- Each goroutine renices itself to the highest priority (`-20`) so the Linux scheduler will always give it the priority to consume CPU time over other running processes
Expand Down
3 changes: 2 additions & 1 deletion examples/cpu_pressure.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ spec:
selector:
app: demo-curl
count: 1
cpuPressure: {} # consume as much CPU as possible to generate load
cpuPressure:
count: 50%
2 changes: 1 addition & 1 deletion injector/cpu_pressure.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (i *cpuPressureInjector) GetDisruptionKind() types.DisruptionKindName {
}

func (i *cpuPressureInjector) Inject() error {
cores, err := i.config.StresserManager.TrackInjectorCores(i.config.Cgroup)
cores, err := i.config.StresserManager.TrackInjectorCores(i.config.Cgroup, i.spec.Count)

if err != nil {
return fmt.Errorf("failed to parse CPUSet %w", err)
Expand Down
61 changes: 41 additions & 20 deletions injector/cpu_pressure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/mock"
"k8s.io/apimachinery/pkg/util/intstr"
)

var _ = Describe("Failure", func() {
Expand Down Expand Up @@ -51,7 +52,6 @@ var _ = Describe("Failure", func() {
manager.On("ThreadID").Return(666)

stresserManager = &stress.StresserManagerMock{}
stresserManager.On("TrackInjectorCores", mock.Anything).Return(cpuset.NewCPUSet(0, 1), nil)
stresserManager.On("TrackCoreAlreadyStressed", mock.Anything, mock.Anything).Return(nil)
stresserManager.On("StresserPIDs").Return(map[int]int{0: 666})
stresserManager.On("IsCoreAlreadyStressed", 0).Return(true)
Expand All @@ -73,7 +73,9 @@ var _ = Describe("Failure", func() {

// spec
spec = v1beta1.CPUPressureSpec{}
})

JustBeforeEach(func() {
var err error
inj, err = NewCPUPressureInjector(spec, config)
Expect(err).To(BeNil())
Expand All @@ -98,27 +100,46 @@ var _ = Describe("Failure", func() {
})

Describe("injection", func() {

It("should join the cpu and cpuset cgroups for the unstressed core", func() {
cgroupManager.AssertCalled(GinkgoT(), "Join", "cpu", 666, false)
cgroupManager.AssertCalled(GinkgoT(), "Join", "cpuset", 666, false)
cgroupManager.AssertNumberOfCalls(GinkgoT(), "Join", 2)
})

It("should prioritize the current process", func() {
manager.AssertCalled(GinkgoT(), "Prioritize")
})

It("should run the stress on one core", func() {
stresser.AssertNumberOfCalls(GinkgoT(), "Stress", 1)
})

It("should record core and StresserPID in StresserManager", func() {
stresserManager.AssertCalled(GinkgoT(), "TrackCoreAlreadyStressed", 1, 666)
Context("user request to stress all the cores", func() {
BeforeEach(func() {
stresserManager.On("TrackInjectorCores", mock.Anything, mock.Anything).Return(cpuset.NewCPUSet(0, 1), nil)
})

It("should join the cpu and cpuset cgroups for the unstressed core", func() {
cgroupManager.AssertCalled(GinkgoT(), "Join", "cpu", 666, false)
cgroupManager.AssertCalled(GinkgoT(), "Join", "cpuset", 666, false)
cgroupManager.AssertNumberOfCalls(GinkgoT(), "Join", 2)
})

It("should prioritize the current process", func() {
manager.AssertCalled(GinkgoT(), "Prioritize")
})

It("should run the stress on one core", func() {
stresser.AssertNumberOfCalls(GinkgoT(), "Stress", 1)
})

It("should record core and StresserPID in StresserManager", func() {
stresserManager.AssertCalled(GinkgoT(), "TrackCoreAlreadyStressed", 1, 666)
})

It("should skip a target core that was already stress", func() {
stresserManager.AssertNotCalled(GinkgoT(), "TrackCoreAlreadyStressed", 0, mock.Anything)
})
})

It("should skip a target core that was already stress", func() {
stresserManager.AssertNotCalled(GinkgoT(), "TrackCoreAlreadyStressed", 0, mock.Anything)
Context("user request to stress half of the cores", func() {
BeforeEach(func() {
userRequestCount := intstr.FromString("50%")
spec = v1beta1.CPUPressureSpec{
Count: &userRequestCount,
}
stresserManager.On("TrackInjectorCores", mock.Anything, &userRequestCount).Return(cpuset.NewCPUSet(0, 1), nil)
})

It("should call stresserManager track cores and get new core to apply pressure", func() {
// left empty for AfterEach
})
})
})
})
13 changes: 9 additions & 4 deletions stress/cpuset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (
"github.com/DataDog/chaos-controller/cgroup"
"github.com/DataDog/chaos-controller/cpuset"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/intstr"
"runtime"
"sync"
)

type StresserManager interface {
TrackInjectorCores(cgroup cgroup.Manager) (cpuset.CPUSet, error)
TrackInjectorCores(cgroup cgroup.Manager, userRequestCount *intstr.IntOrString) (cpuset.CPUSet, error)
IsCoreAlreadyStressed(core int) bool
TrackCoreAlreadyStressed(core int, stresserPID int)
StresserPIDs() map[int]int
Expand Down Expand Up @@ -63,7 +64,7 @@ func (manager *cpuStressserManager) StresserPIDs() map[int]int {
return manager.stresserPIDPerCore
}

func (manager *cpuStressserManager) TrackInjectorCores(cgroup cgroup.Manager) (cpuset.CPUSet, error) {
func (manager *cpuStressserManager) TrackInjectorCores(cgroup cgroup.Manager, userRequestCount *intstr.IntOrString) (cpuset.CPUSet, error) {
// read cpuset allocated cores
manager.log.Infow("retrieving target cpuset allocated cores")

Expand All @@ -78,7 +79,11 @@ func (manager *cpuStressserManager) TrackInjectorCores(cgroup cgroup.Manager) (c
return cpuset.NewCPUSet(), fmt.Errorf("error parsing cpuset allocated cores: %w", err)
}

manager.setCoresToBeStressed(manager.coresToBeStressed.Union(cores))
coresToBeStressedCount, _ := intstr.GetScaledValueFromIntOrPercent(userRequestCount, cores.Size(), true)
coresToBeStressed := cpuset.NewCPUSet(cores.ToSlice()[:coresToBeStressedCount]...)

return cores, nil
manager.log.Infof("retrieved coresToBeStressed after applying userRequestCount %s: %s", userRequestCount.StrVal, coresToBeStressed)
manager.setCoresToBeStressed(manager.coresToBeStressed.Union(coresToBeStressed))

return coresToBeStressed, nil
}
5 changes: 3 additions & 2 deletions stress/cpuset_manager_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package stress
import (
"github.com/DataDog/chaos-controller/cgroup"
"github.com/stretchr/testify/mock"
"k8s.io/apimachinery/pkg/util/intstr"
)
import (
"github.com/DataDog/chaos-controller/cpuset"
Expand Down Expand Up @@ -35,7 +36,7 @@ func (f *StresserManagerMock) StresserPIDs() map[int]int {
}

//nolint:golint
func (f *StresserManagerMock) TrackInjectorCores(cgroup cgroup.Manager) (cpuset.CPUSet, error) {
args := f.Called(cgroup)
func (f *StresserManagerMock) TrackInjectorCores(cgroup cgroup.Manager, userRequestCount *intstr.IntOrString) (cpuset.CPUSet, error) {
args := f.Called(cgroup, userRequestCount)
return args.Get(0).(cpuset.CPUSet), args.Error(1)
}
95 changes: 95 additions & 0 deletions stress/cpuset_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2022 Datadog, Inc.

package stress_test

import (
. "github.com/DataDog/chaos-controller/cgroup"
"github.com/DataDog/chaos-controller/cpuset"
. "github.com/DataDog/chaos-controller/stress"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/intstr"
)

var _ = Describe("StresserManager Test", func() {
var (
log *zap.SugaredLogger
stresserManager StresserManager
)

BeforeEach(func() {
z, _ := zap.NewDevelopment()
log = z.Sugar()
stresserManager = NewCPUStresserManager(log)
})

When("IsCoreAlreadyStress", func() {
Context("StresserManager is freshly initialized", func() {
It("does not have any core stressed", func() {
Expect(stresserManager.IsCoreAlreadyStressed(1)).To(BeFalse())
Expect(stresserManager.StresserPIDs()).To(BeEmpty())
})
})

Context("StresserManager previously tracked the same core", func() {
It("should return true for core already stressed", func() {
stresserManager.TrackCoreAlreadyStressed(0, 123)

Expect(stresserManager.IsCoreAlreadyStressed(0)).To(BeTrue())
})
})
})

When("TrackInjectorCores", func() {
var (
cgroup = cgroupManager()
)

Context("StresserManager is freshly initialized", func() {
It("should track injector cores and return new cores to apply stress", func() {
userRequestedCount := intstr.FromInt(2)
cores, err := stresserManager.TrackInjectorCores(cgroup, &userRequestedCount)

Expect(err).To(BeNil())
Expect(cores).To(Equal(cpuset.NewCPUSet(0, 1)))
})
})

Context("user request to target a specific number of cores", func() {
It("should discount core according to user request", func() {
userRequestedCount := intstr.FromInt(1)
cores, err := stresserManager.TrackInjectorCores(cgroup, &userRequestedCount)

Expect(err).To(BeNil())
Expect(cores).To(Equal(cpuset.NewCPUSet(0)))
})

It("should discount core, percentage round up, according to user request", func() {
userRequestedCount := intstr.FromString("30%")
cores, err := stresserManager.TrackInjectorCores(cgroup, &userRequestedCount)

Expect(err).To(BeNil())
Expect(cores).To(Equal(cpuset.NewCPUSet(0)))
})
})
})

When("calling TrackCoreAlreadyStressed", func() {
It("should record the stresser processID by coreID", func() {
stresserManager.TrackCoreAlreadyStressed(0, 123)

Expect(stresserManager.StresserPIDs()).To(HaveKeyWithValue(0, 123))
})
})
})

func cgroupManager() *ManagerMock {
cgroup := &ManagerMock{}
cgroup.On("Read", "cpuset", "cpuset.cpus").Return("0-1", nil)

return cgroup
}
17 changes: 17 additions & 0 deletions stress/stress_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2022 Datadog, Inc.

package stress_test

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"testing"
)

func TestStress(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Stress Suite")
}

0 comments on commit 4adf3f1

Please sign in to comment.