diff --git a/api/v1beta1/cpu_pressure.go b/api/v1beta1/cpu_pressure.go index 85ae94fe2..98f53611d 100644 --- a/api/v1beta1/cpu_pressure.go +++ b/api/v1beta1/cpu_pressure.go @@ -5,13 +5,28 @@ package v1beta1 +import ( + "github.com/hashicorp/go-multierror" + "k8s.io/apimachinery/pkg/util/intstr" +) + // CPUPressureSpec represents a cpu pressure disruption type CPUPressureSpec struct { + Count *intstr.IntOrString `json:"count,omitempty"` // number of cores to target, either an integer form or a percentage form appended with a % } // Validate validates args for the given disruption -func (s *CPUPressureSpec) Validate() error { - return nil +func (s *CPUPressureSpec) Validate() (retErr error) { + if s.Count == nil { + return nil + } + + // Rule: count must be valid + if err := ValidateCount(s.Count); err != nil { + retErr = multierror.Append(retErr, err) + } + + return retErr } // GenerateArgs generates injection or cleanup pod arguments for the given spec @@ -20,5 +35,9 @@ func (s *CPUPressureSpec) GenerateArgs() []string { "cpu-pressure", } + if s.Count != nil { + args = append(args, []string{"--count", s.Count.String()}...) + } + return args } diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index 30df9de8d..e710c893f 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -30,6 +30,11 @@ import ( // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CPUPressureSpec) DeepCopyInto(out *CPUPressureSpec) { *out = *in + if in.Count != nil { + in, out := &in.Count, &out.Count + *out = new(intstr.IntOrString) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CPUPressureSpec. @@ -311,7 +316,7 @@ func (in *DisruptionSpec) DeepCopyInto(out *DisruptionSpec) { if in.CPUPressure != nil { in, out := &in.CPUPressure, &out.CPUPressure *out = new(CPUPressureSpec) - **out = **in + (*in).DeepCopyInto(*out) } if in.DiskPressure != nil { in, out := &in.DiskPressure, &out.DiskPressure diff --git a/chart/templates/crds/chaos.datadoghq.com_disruptions.yaml b/chart/templates/crds/chaos.datadoghq.com_disruptions.yaml index 1b11e91ec..65aac9649 100644 --- a/chart/templates/crds/chaos.datadoghq.com_disruptions.yaml +++ b/chart/templates/crds/chaos.datadoghq.com_disruptions.yaml @@ -85,6 +85,12 @@ spec: cpuPressure: description: CPUPressureSpec represents a cpu pressure disruption nullable: true + properties: + count: + anyOf: + - type: integer + - type: string + x-kubernetes-int-or-string: true type: object diskPressure: description: DiskPressureSpec represents a disk pressure disruption diff --git a/cli/injector/cpu_pressure.go b/cli/injector/cpu_pressure.go index 4bc0494b8..e849b0800 100644 --- a/cli/injector/cpu_pressure.go +++ b/cli/injector/cpu_pressure.go @@ -10,6 +10,7 @@ import ( "github.com/DataDog/chaos-controller/injector" "github.com/DataDog/chaos-controller/stress" "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/util/intstr" ) var cpuPressureCmd = &cobra.Command{ @@ -17,10 +18,15 @@ var cpuPressureCmd = &cobra.Command{ Short: "CPU pressure subcommands", Run: injectAndWait, PreRun: func(cmd *cobra.Command, args []string) { + countStr, _ := cmd.Flags().GetString("count") + count := intstr.FromString(countStr) + // prepare spec - spec := v1beta1.CPUPressureSpec{} - stresserManager := stress.NewCPUStresserManager(log) + spec := v1beta1.CPUPressureSpec{ + Count: &count, + } + stresserManager := stress.NewCPUStresserManager(log) // create injector for _, config := range configs { injector, _ := injector.NewCPUPressureInjector(spec, injector.CPUPressureInjectorConfig{ @@ -31,3 +37,7 @@ var cpuPressureCmd = &cobra.Command{ } }, } + +func init() { + cpuPressureCmd.Flags().String("count", "", "number of cores to target, either an integer form or a percentage form appended with a %") +} diff --git a/docs/cpu_pressure.md b/docs/cpu_pressure.md index e087649ee..7d738ad63 100644 --- a/docs/cpu_pressure.md +++ b/docs/cpu_pressure.md @@ -21,7 +21,9 @@ The `/sys/fs/cgroup` directory of the host must be mounted in the injector pod a When the injector pod starts: - It parses the `cpuset.cpus` file (located in the target `cpuset` cgroup) to retrieve cores allocated to the target processes. -- It creates one goroutine per allocated core. Each goroutine is locked on the thread they are running on. By doing so, it forces the Go runtime scheduler to create one thread per locked goroutine. +- It calculates the number of cores to apply pressure, by taking user input `Count`. +- It then creates one goroutine per target core. +- Each goroutine is locked on the thread they are running on. By doing so, it forces the Go runtime scheduler to create one thread per locked goroutine. - Each goroutine joins the target `cpu` and `cpuset` cgroups. - Joining the `cpuset` cgroup is important to both have the same number of allocated cores as the target as well as the same allocated cores so we ensure that the goroutines threads will be scheduled on the same cores as the target processes - Each goroutine renices itself to the highest priority (`-20`) so the Linux scheduler will always give it the priority to consume CPU time over other running processes diff --git a/examples/cpu_pressure.yaml b/examples/cpu_pressure.yaml index 49af2402b..c482f02f7 100644 --- a/examples/cpu_pressure.yaml +++ b/examples/cpu_pressure.yaml @@ -13,4 +13,5 @@ spec: selector: app: demo-curl count: 1 - cpuPressure: {} # consume as much CPU as possible to generate load + cpuPressure: + count: 50% diff --git a/injector/cpu_pressure.go b/injector/cpu_pressure.go index bd04e4396..af86178f3 100644 --- a/injector/cpu_pressure.go +++ b/injector/cpu_pressure.go @@ -62,7 +62,7 @@ func (i *cpuPressureInjector) GetDisruptionKind() types.DisruptionKindName { } func (i *cpuPressureInjector) Inject() error { - cores, err := i.config.StresserManager.TrackInjectorCores(i.config.Cgroup) + cores, err := i.config.StresserManager.TrackInjectorCores(i.config.Cgroup, i.spec.Count) if err != nil { return fmt.Errorf("failed to parse CPUSet %w", err) diff --git a/injector/cpu_pressure_test.go b/injector/cpu_pressure_test.go index daea1e318..4c365ccbd 100644 --- a/injector/cpu_pressure_test.go +++ b/injector/cpu_pressure_test.go @@ -15,6 +15,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/stretchr/testify/mock" + "k8s.io/apimachinery/pkg/util/intstr" ) var _ = Describe("Failure", func() { @@ -51,7 +52,6 @@ var _ = Describe("Failure", func() { manager.On("ThreadID").Return(666) stresserManager = &stress.StresserManagerMock{} - stresserManager.On("TrackInjectorCores", mock.Anything).Return(cpuset.NewCPUSet(0, 1), nil) stresserManager.On("TrackCoreAlreadyStressed", mock.Anything, mock.Anything).Return(nil) stresserManager.On("StresserPIDs").Return(map[int]int{0: 666}) stresserManager.On("IsCoreAlreadyStressed", 0).Return(true) @@ -73,7 +73,9 @@ var _ = Describe("Failure", func() { // spec spec = v1beta1.CPUPressureSpec{} + }) + JustBeforeEach(func() { var err error inj, err = NewCPUPressureInjector(spec, config) Expect(err).To(BeNil()) @@ -98,27 +100,46 @@ var _ = Describe("Failure", func() { }) Describe("injection", func() { - - It("should join the cpu and cpuset cgroups for the unstressed core", func() { - cgroupManager.AssertCalled(GinkgoT(), "Join", "cpu", 666, false) - cgroupManager.AssertCalled(GinkgoT(), "Join", "cpuset", 666, false) - cgroupManager.AssertNumberOfCalls(GinkgoT(), "Join", 2) - }) - - It("should prioritize the current process", func() { - manager.AssertCalled(GinkgoT(), "Prioritize") - }) - - It("should run the stress on one core", func() { - stresser.AssertNumberOfCalls(GinkgoT(), "Stress", 1) - }) - - It("should record core and StresserPID in StresserManager", func() { - stresserManager.AssertCalled(GinkgoT(), "TrackCoreAlreadyStressed", 1, 666) + Context("user request to stress all the cores", func() { + BeforeEach(func() { + stresserManager.On("TrackInjectorCores", mock.Anything, mock.Anything).Return(cpuset.NewCPUSet(0, 1), nil) + }) + + It("should join the cpu and cpuset cgroups for the unstressed core", func() { + cgroupManager.AssertCalled(GinkgoT(), "Join", "cpu", 666, false) + cgroupManager.AssertCalled(GinkgoT(), "Join", "cpuset", 666, false) + cgroupManager.AssertNumberOfCalls(GinkgoT(), "Join", 2) + }) + + It("should prioritize the current process", func() { + manager.AssertCalled(GinkgoT(), "Prioritize") + }) + + It("should run the stress on one core", func() { + stresser.AssertNumberOfCalls(GinkgoT(), "Stress", 1) + }) + + It("should record core and StresserPID in StresserManager", func() { + stresserManager.AssertCalled(GinkgoT(), "TrackCoreAlreadyStressed", 1, 666) + }) + + It("should skip a target core that was already stress", func() { + stresserManager.AssertNotCalled(GinkgoT(), "TrackCoreAlreadyStressed", 0, mock.Anything) + }) }) - It("should skip a target core that was already stress", func() { - stresserManager.AssertNotCalled(GinkgoT(), "TrackCoreAlreadyStressed", 0, mock.Anything) + Context("user request to stress half of the cores", func() { + BeforeEach(func() { + userRequestCount := intstr.FromString("50%") + spec = v1beta1.CPUPressureSpec{ + Count: &userRequestCount, + } + stresserManager.On("TrackInjectorCores", mock.Anything, &userRequestCount).Return(cpuset.NewCPUSet(0, 1), nil) + }) + + It("should call stresserManager track cores and get new core to apply pressure", func() { + // left empty as AfterEach 'AssertExpectations' check all this tests expectations + }) }) }) }) diff --git a/stress/cpuset_manager.go b/stress/cpuset_manager.go index 7a6b97d3e..2d49113c4 100644 --- a/stress/cpuset_manager.go +++ b/stress/cpuset_manager.go @@ -10,12 +10,13 @@ import ( "github.com/DataDog/chaos-controller/cgroup" "github.com/DataDog/chaos-controller/cpuset" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/intstr" "runtime" "sync" ) type StresserManager interface { - TrackInjectorCores(cgroup cgroup.Manager) (cpuset.CPUSet, error) + TrackInjectorCores(cgroup cgroup.Manager, userRequestCount *intstr.IntOrString) (cpuset.CPUSet, error) IsCoreAlreadyStressed(core int) bool TrackCoreAlreadyStressed(core int, stresserPID int) StresserPIDs() map[int]int @@ -63,7 +64,7 @@ func (manager *cpuStressserManager) StresserPIDs() map[int]int { return manager.stresserPIDPerCore } -func (manager *cpuStressserManager) TrackInjectorCores(cgroup cgroup.Manager) (cpuset.CPUSet, error) { +func (manager *cpuStressserManager) TrackInjectorCores(cgroup cgroup.Manager, userRequestCount *intstr.IntOrString) (cpuset.CPUSet, error) { // read cpuset allocated cores manager.log.Infow("retrieving target cpuset allocated cores") @@ -78,7 +79,11 @@ func (manager *cpuStressserManager) TrackInjectorCores(cgroup cgroup.Manager) (c return cpuset.NewCPUSet(), fmt.Errorf("error parsing cpuset allocated cores: %w", err) } - manager.setCoresToBeStressed(manager.coresToBeStressed.Union(cores)) + coresToBeStressedCount, _ := intstr.GetScaledValueFromIntOrPercent(userRequestCount, cores.Size(), true) + coresToBeStressed := cpuset.NewCPUSet(cores.ToSlice()[:coresToBeStressedCount]...) - return cores, nil + manager.log.Infof("retrieved coresToBeStressed after applying userRequestCount %s: %s", userRequestCount.StrVal, coresToBeStressed) + manager.setCoresToBeStressed(manager.coresToBeStressed.Union(coresToBeStressed)) + + return coresToBeStressed, nil } diff --git a/stress/cpuset_manager_mock.go b/stress/cpuset_manager_mock.go index 7068f8bd9..54e7288d7 100644 --- a/stress/cpuset_manager_mock.go +++ b/stress/cpuset_manager_mock.go @@ -8,6 +8,7 @@ package stress import ( "github.com/DataDog/chaos-controller/cgroup" "github.com/stretchr/testify/mock" + "k8s.io/apimachinery/pkg/util/intstr" ) import ( "github.com/DataDog/chaos-controller/cpuset" @@ -35,7 +36,7 @@ func (f *StresserManagerMock) StresserPIDs() map[int]int { } //nolint:golint -func (f *StresserManagerMock) TrackInjectorCores(cgroup cgroup.Manager) (cpuset.CPUSet, error) { - args := f.Called(cgroup) +func (f *StresserManagerMock) TrackInjectorCores(cgroup cgroup.Manager, userRequestCount *intstr.IntOrString) (cpuset.CPUSet, error) { + args := f.Called(cgroup, userRequestCount) return args.Get(0).(cpuset.CPUSet), args.Error(1) } diff --git a/stress/cpuset_manager_test.go b/stress/cpuset_manager_test.go new file mode 100644 index 000000000..ee316abec --- /dev/null +++ b/stress/cpuset_manager_test.go @@ -0,0 +1,95 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2022 Datadog, Inc. + +package stress_test + +import ( + . "github.com/DataDog/chaos-controller/cgroup" + "github.com/DataDog/chaos-controller/cpuset" + . "github.com/DataDog/chaos-controller/stress" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/intstr" +) + +var _ = Describe("StresserManager Test", func() { + var ( + log *zap.SugaredLogger + stresserManager StresserManager + ) + + BeforeEach(func() { + z, _ := zap.NewDevelopment() + log = z.Sugar() + stresserManager = NewCPUStresserManager(log) + }) + + When("IsCoreAlreadyStress", func() { + Context("StresserManager is freshly initialized", func() { + It("does not have any core stressed", func() { + Expect(stresserManager.IsCoreAlreadyStressed(1)).To(BeFalse()) + Expect(stresserManager.StresserPIDs()).To(BeEmpty()) + }) + }) + + Context("StresserManager previously tracked the same core", func() { + It("should return true for core already stressed", func() { + stresserManager.TrackCoreAlreadyStressed(0, 123) + + Expect(stresserManager.IsCoreAlreadyStressed(0)).To(BeTrue()) + }) + }) + }) + + When("TrackInjectorCores", func() { + var ( + cgroup = cgroupManager() + ) + + Context("StresserManager is freshly initialized", func() { + It("should track injector cores and return new cores to apply stress", func() { + userRequestedCount := intstr.FromInt(2) + cores, err := stresserManager.TrackInjectorCores(cgroup, &userRequestedCount) + + Expect(err).To(BeNil()) + Expect(cores).To(Equal(cpuset.NewCPUSet(0, 1))) + }) + }) + + Context("user request to target a specific number of cores", func() { + It("should discount core according to user request", func() { + userRequestedCount := intstr.FromInt(1) + cores, err := stresserManager.TrackInjectorCores(cgroup, &userRequestedCount) + + Expect(err).To(BeNil()) + Expect(cores).To(Equal(cpuset.NewCPUSet(0))) + }) + + It("should discount core, percentage round up, according to user request", func() { + userRequestedCount := intstr.FromString("30%") + cores, err := stresserManager.TrackInjectorCores(cgroup, &userRequestedCount) + + Expect(err).To(BeNil()) + Expect(cores).To(Equal(cpuset.NewCPUSet(0))) + }) + }) + }) + + When("calling TrackCoreAlreadyStressed", func() { + It("should record the stresser processID by coreID", func() { + stresserManager.TrackCoreAlreadyStressed(0, 123) + + Expect(stresserManager.StresserPIDs()).To(HaveKeyWithValue(0, 123)) + }) + }) +}) + +func cgroupManager() *ManagerMock { + cgroup := &ManagerMock{} + cgroup.On("Read", "cpuset", "cpuset.cpus").Return("0-1", nil) + + return cgroup +} diff --git a/stress/stress_suite_test.go b/stress/stress_suite_test.go new file mode 100644 index 000000000..ec57108cf --- /dev/null +++ b/stress/stress_suite_test.go @@ -0,0 +1,17 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2022 Datadog, Inc. + +package stress_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "testing" +) + +func TestStress(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Stress Suite") +}