Add checks for whether a container has terminated (#734)
In some cases, a plugin's main container may terminate without
reporting results. When that happens, the sonobuoy worker hangs
indefinitely (until the aggregator times out) because it thinks the
plugin is still running.

This change modifies the aggregator/plugin monitoring logic so that
it checks not only for restarts and failures to schedule, but also
for containers that terminate without submitting results.
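The termination check itself lives in the per-driver monitoring code, which
is not included in the hunks shown below. A minimal sketch of the condition
being detected, assuming the plugin pod is inspected through the core/v1 API
(the package and helper name are illustrative, not Sonobuoy's actual code):

```go
package sketch

import (
	corev1 "k8s.io/api/core/v1"
)

// anyContainerTerminated is an illustrative helper, not the real Sonobuoy
// function: it reports whether any container in the plugin pod has already
// terminated, which is the new condition the monitor treats as a potential
// "exited without submitting results" failure.
func anyContainerTerminated(pod *corev1.Pod) bool {
	for _, cs := range pod.Status.ContainerStatuses {
		if cs.State.Terminated != nil {
			return true
		}
	}
	return false
}
```

The pre-existing checks for restarts and unschedulable pods remain; this
condition is simply checked alongside them.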

To ensure that we do not race with the plugin's normal results
submission and termination, we did the following:
 - Do not consider this a failure condition until some non-trivial
time has passed (currently 1m); a sketch of this grace period
follows below.
 - Stop monitoring once we see that we already have results for the
plugin in question.
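A minimal sketch of that grace period, assuming the container's FinishedAt
timestamp is used as the reference point (the constant and helper below are
illustrative, not the commit's actual implementation):

```go
package sketch

import (
	"time"

	corev1 "k8s.io/api/core/v1"
)

// terminationGracePeriod is the "non-trivial time" mentioned above; the
// commit currently uses one minute.
const terminationGracePeriod = time.Minute

// terminatedLongEnough is a hypothetical helper: it only treats a terminated
// container as a failure once the grace period has elapsed, giving the
// normal results-submission path time to win the race.
func terminatedLongEnough(cs corev1.ContainerStatus, now time.Time) bool {
	term := cs.State.Terminated
	if term == nil {
		return false
	}
	return now.Sub(term.FinishedAt.Time) > terminationGracePeriod
}
```

The second point is handled by the new RunAndMonitorPlugin control loop in
run.go below, which polls pluginHasResults and cancels the monitoring
context once results arrive.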

To facilitate those abilities, we made the following changes:
 - The plugin Monitor() functions now require a context, which the
method uses to terminate the monitoring loop. Previously, it only
exited when the plugin was cleaned up.
 - The aggregator flow was modified slightly so that each plugin
spawns its own monitor/ingestResults goroutines with a dedicated
channel and context.
 - Introduced pkg/time and pkg/time/timetest packages to help
control the pace of polling in tests; a sketch of the idea follows
this list.
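The new time packages are in the portion of the diff not shown on this page.
The general shape is a package-level After function that production code
calls (sonotime.After in run.go and daemonset.go below) and that tests can
swap for a near-instant version. A minimal sketch under those assumptions,
collapsed into a single package here even though the commit splits the test
helpers into pkg/time/timetest:

```go
// Package sonotime sketches the idea behind pkg/time: production code calls
// After instead of time.After so tests can substitute a faster timer.
package sonotime

import "time"

// after is the swappable implementation; it defaults to the real timer.
var after = time.After

// After mirrors time.After but routes through the swappable function.
func After(d time.Duration) <-chan time.Time {
	return after(d)
}

// UseShortAfter and ResetAfter mirror the helpers the tests call (see
// sonotime.UseShortAfter() in run_test.go); these bodies are assumptions.
func UseShortAfter() {
	after = func(_ time.Duration) <-chan time.Time {
		return time.After(time.Millisecond)
	}
}

// ResetAfter restores the real time.After.
func ResetAfter() {
	after = time.After
}
```

In the commit, run.go and daemonset.go poll with sonotime.After(10 *
time.Second), and run_test.go calls UseShortAfter so the tests do not
actually wait between polling iterations.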

In addition, a bug was found in the recently added tests: we were not
actually sleeping between polling operations as expected. This has
been resolved.

Fixes #667

Signed-off-by: John Schnake <jschnake@vmware.com>
johnSchnake committed Jun 11, 2019
1 parent a7bba9b commit 3324bc4
Showing 12 changed files with 540 additions and 23 deletions.
15 changes: 12 additions & 3 deletions pkg/plugin/aggregation/aggregator.go
@@ -21,6 +21,7 @@ limitations under the License.
package aggregation

import (
"context"
"fmt"
"io"
"net/http"
@@ -249,11 +250,19 @@ func (a *Aggregator) HandleHTTPResult(result *plugin.Result, w http.ResponseWrit
//
// If we support plugins that are just simple commands that the sonobuoy master
// runs, those plugins can submit results through the same channel.
func (a *Aggregator) IngestResults(resultsCh <-chan *plugin.Result) {
func (a *Aggregator) IngestResults(ctx context.Context, resultsCh <-chan *plugin.Result) {
for {
result, more := <-resultsCh
var result *plugin.Result
var more bool

select {
case <-ctx.Done():
return
case result, more = <-resultsCh:
}

if !more {
break
return
}

err := a.processResult(result)
3 changes: 2 additions & 1 deletion pkg/plugin/aggregation/aggregator_test.go
@@ -18,6 +18,7 @@ package aggregation

import (
"bytes"
"context"
"io"
"io/ioutil"
"net/http"
@@ -327,7 +328,7 @@ func TestAggregation_errors(t *testing.T) {

withAggregator(t, expected, func(agg *Aggregator, srv *authtest.Server) {
resultsCh := make(chan *plugin.Result)
go agg.IngestResults(resultsCh)
go agg.IngestResults(context.TODO(), resultsCh)

// Send an error
resultsCh <- pluginutils.MakeErrorResult("e2e", map[string]interface{}{"error": "foo"}, "")
67 changes: 56 additions & 11 deletions pkg/plugin/aggregation/run.go
@@ -27,8 +27,10 @@ import (
"github.com/heptio/sonobuoy/pkg/backplane/ca"
"github.com/heptio/sonobuoy/pkg/plugin"
"github.com/heptio/sonobuoy/pkg/plugin/driver/utils"
sonotime "github.com/heptio/sonobuoy/pkg/time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"

@@ -85,7 +87,6 @@ func Run(client kubernetes.Interface, plugins []plugin.Interface, cfg plugin.Agg
// 1. Await results from each plugin
aggr := NewAggregator(outdir+"/plugins", expectedResults)
doneAggr := make(chan bool, 1)
monitorCh := make(chan *plugin.Result, len(expectedResults))
stopWaitCh := make(chan bool, 1)

go func() {
@@ -163,17 +164,8 @@ func Run(client kubernetes.Interface, plugins []plugin.Interface, cfg plugin.Agg

for _, p := range plugins {
logrus.WithField("plugin", p.GetName()).Info("Running plugin")
if err = p.Run(client, cfg.AdvertiseAddress, certs[p.GetName()]); err != nil {
err = errors.Wrapf(err, "error running plugin %v", p.GetName())
logrus.Error(err)
monitorCh <- utils.MakeErrorResult(p.GetResultType(), map[string]interface{}{"error": err.Error()}, "")
continue
}
// Have the plugin monitor for errors
go p.Monitor(client, nodes.Items, monitorCh)
go aggr.RunAndMonitorPlugin(ctx, p, client, nodes.Items, cfg.AdvertiseAddress, certs[p.GetName()])
}
// 5. Have the aggregator plumb results from each plugins' monitor function
go aggr.IngestResults(monitorCh)

// Give the plugins a chance to cleanup before a hard timeout occurs
shutdownPlugins := time.After(time.Duration(cfg.TimeoutSeconds-plugin.GracefulShutdownPeriod) * time.Second)
@@ -209,3 +201,56 @@ func Cleanup(client kubernetes.Interface, plugins []plugin.Interface) {
p.Cleanup(client)
}
}

// RunAndMonitorPlugin will start a plugin then monitor it for errors starting/running.
// Errors detected will be handled by saving an error result in the aggregator.Results.
func (a *Aggregator) RunAndMonitorPlugin(ctx context.Context, p plugin.Interface, client kubernetes.Interface, nodes []corev1.Node, address string, cert *tls.Certificate) {
monitorCh := make(chan *plugin.Result, 1)
pCtx, cancel := context.WithCancel(ctx)

if err := p.Run(client, address, cert); err != nil {
err := errors.Wrapf(err, "error running plugin %v", p.GetName())
logrus.Error(err)
monitorCh <- utils.MakeErrorResult(p.GetResultType(), map[string]interface{}{"error": err.Error()}, "")
}

go p.Monitor(pCtx, client, nodes, monitorCh)
go a.IngestResults(pCtx, monitorCh)

// Control loop; check regularly if we have results or not for this plugin. If results are in,
// then stop the go routines monitoring the plugin. If the parent context is cancelled, stop monitoring.
for {
select {
case <-ctx.Done():
cancel()
return
case <-sonotime.After(10 * time.Second):
}

hasResults := a.pluginHasResults(p)
if hasResults {
cancel()
return
}
}
}

// pluginHasResults returns true if all the expected results for the given plugin
// have already been reported.
func (a *Aggregator) pluginHasResults(p plugin.Interface) bool {
a.resultsMutex.Lock()
defer a.resultsMutex.Unlock()

targetType := p.GetResultType()
for expResultID, expResult := range a.ExpectedResults {
if expResult.ResultType != targetType {
continue
}

if _, ok := a.Results[expResultID]; !ok {
return false
}
}

return true
}
192 changes: 192 additions & 0 deletions pkg/plugin/aggregation/run_test.go
@@ -0,0 +1,192 @@
package aggregation

import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"math/big"
"testing"
"time"

"github.com/heptio/sonobuoy/pkg/plugin"
"github.com/heptio/sonobuoy/pkg/plugin/driver"
"github.com/heptio/sonobuoy/pkg/plugin/driver/job"
sonotime "github.com/heptio/sonobuoy/pkg/time/timetest"
"github.com/pkg/errors"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
)

func TestRunAndMonitorPlugin(t *testing.T) {
// Dead simple plugin works for this test. No need to test daemonset/job specific logic so
// a job plugin is much simpler to test against.
testPlugin := &job.Plugin{
Base: driver.Base{
Definition: plugin.Definition{
Name: "myPlugin",
ResultType: "myPlugin",
},
Namespace: "testNS",
},
}
testPluginExpectedResults := []plugin.ExpectedResult{
{ResultType: "myPlugin"},
}
healthyPod := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"sonobuoy-run": ""}},
}
failingPod := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"sonobuoy-run": ""}},
Status: corev1.PodStatus{
Conditions: []corev1.PodCondition{
{Reason: "Unschedulable"},
},
},
}
testCert, err := getTestCert()
if err != nil {
t.Fatalf("Could not generate test cert: %v", err)
}

sonotime.UseShortAfter()
defer sonotime.ResetAfter()

testCases := []struct {
desc string

expectNumResults int
expectStillRunning bool
forceResults bool
cancelContext bool

plugin plugin.Interface
expectedResults []plugin.ExpectedResult
podList *corev1.PodList
podCreationError error
}{
{
desc: "Continue monitoring if no results/errors",
plugin: testPlugin,
expectedResults: testPluginExpectedResults,
podList: &corev1.PodList{
Items: []corev1.Pod{healthyPod},
},
expectStillRunning: true,
}, {
desc: "Error launching plugin causes exit and plugin result",
plugin: testPlugin,
expectedResults: testPluginExpectedResults,
podList: &corev1.PodList{
Items: []corev1.Pod{healthyPod},
},
podCreationError: errors.New("createPod error"),
expectNumResults: 1,
}, {
desc: "Failing plugin causes exit and plugin result",
plugin: testPlugin,
expectedResults: testPluginExpectedResults,
podList: &corev1.PodList{
Items: []corev1.Pod{failingPod},
},
expectNumResults: 1,
}, {
desc: "Plugin obtaining results in exits",
plugin: testPlugin,
expectedResults: testPluginExpectedResults,
podList: &corev1.PodList{
Items: []corev1.Pod{healthyPod},
},
forceResults: true,
expectNumResults: 1,
}, {
desc: "Context cancellation results in exit",
plugin: testPlugin,
expectedResults: testPluginExpectedResults,
podList: &corev1.PodList{
Items: []corev1.Pod{healthyPod},
},
cancelContext: true,
expectNumResults: 0,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
a := NewAggregator(".", tc.expectedResults)
ctx, cancel := context.WithCancel(context.Background())

fclient := fake.NewSimpleClientset()
fclient.PrependReactor("list", "pods", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, tc.podList, nil
})
fclient.PrependReactor("create", "pods", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, nil, tc.podCreationError
})

doneCh, timeoutCh := make(chan (struct{}), 1), make(chan (struct{}), 1)
if tc.cancelContext {
cancel()
} else {
// Max timeout for test to unblock.
go func() {
time.Sleep(2 * time.Second)
timeoutCh <- struct{}{}
cancel()
}()
}

go func() {
a.RunAndMonitorPlugin(ctx, tc.plugin, fclient, nil, "testname", testCert)
doneCh <- struct{}{}
}()

if tc.forceResults {
a.resultsMutex.Lock()
a.Results["myPlugin"] = &plugin.Result{}
a.resultsMutex.Unlock()
}

// Wait for completion/timeout and see which happens first.
wasStillRunning := false
select {
case <-doneCh:
t.Log("runAndMonitor is done")
case <-timeoutCh:
t.Log("runAndMonitor timed out")
wasStillRunning = true
}

if len(a.Results) != tc.expectNumResults {
t.Errorf("Expected %v results but found %v: %+v", tc.expectNumResults, len(a.Results), a.Results)
}
if wasStillRunning != tc.expectStillRunning {
t.Errorf("Expected wasStillMonitoring %v but found %v", tc.expectStillRunning, wasStillRunning)
}
})
}
}

func getTestCert() (*tls.Certificate, error) {
privKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
return nil, errors.Wrap(err, "couldn't generate private key")
}
tmpl := &x509.Certificate{
SerialNumber: big.NewInt(0),
}
certDER, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &privKey.PublicKey, privKey)
if err != nil {
return nil, errors.Wrap(err, "couldn't create certificate")
}

return &tls.Certificate{
Certificate: [][]byte{certDER},
PrivateKey: privKey,
}, nil
}
11 changes: 9 additions & 2 deletions pkg/plugin/driver/daemonset/daemonset.go
@@ -18,6 +18,7 @@ package daemonset

import (
"bytes"
"context"
"crypto/tls"
"fmt"
"time"
@@ -33,6 +34,7 @@ import (
"github.com/heptio/sonobuoy/pkg/plugin"
"github.com/heptio/sonobuoy/pkg/plugin/driver"
"github.com/heptio/sonobuoy/pkg/plugin/driver/utils"
sonotime "github.com/heptio/sonobuoy/pkg/time"
"github.com/pkg/errors"
)

@@ -170,7 +172,7 @@ func (p *Plugin) findDaemonSet(kubeclient kubernetes.Interface) (*appsv1.DaemonS

// Monitor adheres to plugin.Interface by ensuring the DaemonSet is correctly
// configured and that each pod is running normally.
func (p *Plugin) Monitor(kubeclient kubernetes.Interface, availableNodes []v1.Node, resultsCh chan<- *plugin.Result) {
func (p *Plugin) Monitor(ctx context.Context, kubeclient kubernetes.Interface, availableNodes []v1.Node, resultsCh chan<- *plugin.Result) {
podsReported := make(map[string]bool)
podsFound := make(map[string]bool, len(availableNodes))
for _, node := range availableNodes {
@@ -181,7 +183,12 @@ func (p *Plugin) Monitor(kubeclient kubernetes.Interface, availableNodes []v1.No
for {
// Sleep between each poll, which should give the DaemonSet
// enough time to create pods
time.Sleep(10 * time.Second)
select {
case <-ctx.Done():
return
case <-sonotime.After(10 * time.Second):
}

done, errResults := p.monitorOnce(kubeclient, availableNodes, podsFound, podsReported)
for _, v := range errResults {
resultsCh <- v