Skip to content

Commit

Permalink
Fix NewSystemd ignoring units already exist
Browse files Browse the repository at this point in the history
As done in opencontainers/runc#3782,
UnitExists errors are no longer ignored when starting units.
This change makes the logic more robust like in runc:

1. Attempts to reset a failed unit if it already exists
2. Verifies via the unit status that it successfully starts
3. Waits longer for unit to start
4. Continues to ignore unit existing when pid is -1 to
accommodate kubelet use case
5. Otherwise, returns an error if it already exists

Signed-off-by: Matt Merkes <matt.merkes@gmail.com>
  • Loading branch information
mmerkes committed May 20, 2023
1 parent fb1932a commit 699e348
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
example/example
cmd/cgctl/cgctl

*.swp
coverage.txt
63 changes: 55 additions & 8 deletions cgroup2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,14 +865,7 @@ func NewSystemd(slice, group string, pid int, resources *Resources) (*Manager, e
newSystemdProperty("TasksMax", uint64(resources.Pids.Max)))
}

statusChan := make(chan string, 1)
if _, err := conn.StartTransientUnitContext(ctx, group, "replace", properties, statusChan); err == nil {
select {
case <-statusChan:
case <-time.After(time.Second):
logrus.Warnf("Timed out while waiting for StartTransientUnit(%s) completion signal from dbus. Continuing...", group)
}
} else if !isUnitExists(err) {
if err := startUnit(conn, group, properties, pid == -1); err != nil {
return &Manager{}, err
}

Expand All @@ -881,6 +874,60 @@ func NewSystemd(slice, group string, pid int, resources *Resources) (*Manager, e
}, nil
}

func startUnit(conn *systemdDbus.Conn, group string, properties []systemdDbus.Property, ignoreExists bool) error {
ctx := context.TODO()

statusChan := make(chan string, 1)
defer close(statusChan)

retry := true
started := false

for !started {
if _, err := conn.StartTransientUnitContext(ctx, group, "replace", properties, statusChan); err != nil {
if !isUnitExists(err) {
return err
}

if ignoreExists {
return nil
}

if retry {
retry = false
// When a unit of the same name already exists, it may be a leftover failed unit.
// If we reset it once, systemd can try to remove it.
attemptFailedUnitReset(conn, group)
continue
}

return err
} else {
started = true
}
}

select {
case s := <-statusChan:
if s != "done" {
attemptFailedUnitReset(conn, group)
return fmt.Errorf("error creating systemd unit `%s`: got `%s`", group, s)
}
case <-time.After(30 * time.Second):
logrus.Warnf("Timed out while waiting for StartTransientUnit(%s) completion signal from dbus. Continuing...", group)
}

return nil
}

func attemptFailedUnitReset(conn *systemdDbus.Conn, group string) {
err := conn.ResetFailedUnitContext(context.TODO(), group)

if err != nil {
logrus.Warnf("Unable to reset failed unit: %v", err)
}
}

func LoadSystemd(slice, group string) (*Manager, error) {
if slice == "" {
slice = defaultSlice
Expand Down
67 changes: 58 additions & 9 deletions cgroup2/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,8 @@ import (
"go.uber.org/goleak"
)

//nolint:staticcheck // Staticcheck false positives for nil pointer deference after t.Fatal
func TestEventChanCleanupOnCgroupRemoval(t *testing.T) {
checkCgroupMode(t)

cmd := exec.Command("cat")
stdin, err := cmd.StdinPipe()
if err != nil {
t.Fatalf("Failed to create cat process: %v", err)
}
func setupForNewSystemd(t *testing.T) (cmd *exec.Cmd, group string) {
cmd = exec.Command("cat")
if err := cmd.Start(); err != nil {
t.Fatalf("Failed to start cat process: %v", err)
}
Expand All @@ -46,6 +39,62 @@ func TestEventChanCleanupOnCgroupRemoval(t *testing.T) {
t.Fatal("Process is nil")
}

group = fmt.Sprintf("testing-watcher-%d.scope", proc.Pid)

return
}

func TestErrorsWhenUnitAlreadyExists(t *testing.T) {
checkCgroupMode(t)

cmd, group := setupForNewSystemd(t)
proc := cmd.Process

_, err := NewSystemd("", group, proc.Pid, &Resources{})
if err != nil {
t.Fatalf("Failed to init new cgroup manager: %v", err)
}

_, err = NewSystemd("", group, proc.Pid, &Resources{})
if err == nil {
t.Fatal("Expected recreating cgroup manager should fail")
} else if !isUnitExists(err) {
t.Fatalf("Failed to init cgroup manager with unexpected error: %v", err)
}
}

// kubelet relies on this behavior to make sure a slice exists
func TestIgnoreUnitExistsWhenPidNegativeOne(t *testing.T) {
checkCgroupMode(t)

cmd, group := setupForNewSystemd(t)
proc := cmd.Process

_, err := NewSystemd("", group, proc.Pid, &Resources{})
if err != nil {
t.Fatalf("Failed to init new cgroup manager: %v", err)
}

_, err = NewSystemd("", group, -1, &Resources{})
if err != nil {
t.Fatalf("Expected to be able to recreate cgroup manager: %v", err)
}
}

//nolint:staticcheck // Staticcheck false positives for nil pointer deference after t.Fatal
func TestEventChanCleanupOnCgroupRemoval(t *testing.T) {
checkCgroupMode(t)

cmd := exec.Command("cat")
stdin, err := cmd.StdinPipe()
require.NoError(t, err, "failed to create cat process")

err = cmd.Start()
require.NoError(t, err, "failed to start cat process")

proc := cmd.Process
require.NotNil(t, proc, "process was nil")

group := fmt.Sprintf("testing-watcher-%d.scope", proc.Pid)
c, err := NewSystemd("", group, proc.Pid, &Resources{})
if err != nil {
Expand Down

0 comments on commit 699e348

Please sign in to comment.