Skip to content

Commit

Permalink
Acknowledge state between plugins complete and done
Browse files Browse the repository at this point in the history
If your plugin artifacts are large then it will take
a few seconds for the tarball to be created. However,
the aggregator marks the pod status as done and
therefore there can be a few seconds where the
status reports that results are ready for download
but you will wither get a "file does not exist" error
or a tarball locally that is only partially complete
and most likely corrupted.

Fixes: #572
Signed-off-by: John Schnake <jschnake@vmware.com>
  • Loading branch information
johnSchnake committed Feb 12, 2019
1 parent 081c43f commit eccb315
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 40 deletions.
2 changes: 2 additions & 0 deletions cmd/sonobuoy/app/status.go
Expand Up @@ -105,6 +105,8 @@ func humanReadableStatus(str string) string {
return "Sonobuoy has failed. You can see what happened with `sonobuoy logs`."
case aggregation.CompleteStatus:
return "Sonobuoy has completed. Use `sonobuoy retrieve` to get results."
case aggregation.PostProcessingStatus:
return "Sonobuoy plugins have completed. Preparing results for download."
default:
return fmt.Sprintf("Sonobuoy is in unknown state %q. Please report a bug at github.com/heptio/sonobuoy", str)
}
Expand Down
32 changes: 1 addition & 31 deletions pkg/client/status.go
Expand Up @@ -17,13 +17,6 @@ limitations under the License.
package client

import (
"encoding/json"
"fmt"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/heptio/sonobuoy/pkg/plugin/aggregation"
)

Expand All @@ -33,28 +26,5 @@ func (c *SonobuoyClient) GetStatus(namespace string) (*aggregation.Status, error
return nil, err
}

if _, err := client.CoreV1().Namespaces().Get(namespace, metav1.GetOptions{}); err != nil {
return nil, errors.Wrap(err, "sonobuoy namespace does not exist")
}

pod, err := client.CoreV1().Pods(namespace).Get(aggregation.StatusPodName, metav1.GetOptions{})
if err != nil {
return nil, errors.Wrap(err, "could not retrieve sonobuoy pod")
}

if pod.Status.Phase != corev1.PodRunning {
return nil, fmt.Errorf("pod has status %q", pod.Status.Phase)
}

statusJSON, ok := pod.Annotations[aggregation.StatusAnnotationName]
if !ok {
return nil, fmt.Errorf("missing status annotation %q", aggregation.StatusAnnotationName)
}

var status aggregation.Status
if err := json.Unmarshal([]byte(statusJSON), &status); err != nil {
return nil, errors.Wrap(err, "couldn't unmarshal the JSON status annotation")
}

return &status, nil
return aggregation.GetStatus(client, namespace)
}
43 changes: 42 additions & 1 deletion pkg/discovery/discovery.go
Expand Up @@ -26,12 +26,14 @@ import (

"github.com/heptio/sonobuoy/pkg/config"
"github.com/heptio/sonobuoy/pkg/errlog"
pluginaggregation "github.com/heptio/sonobuoy/pkg/plugin/aggregation"
"github.com/pkg/errors"
"github.com/rifflock/lfshook"
"github.com/sirupsen/logrus"
"github.com/viniciuschiele/tarx"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"

pluginaggregation "github.com/heptio/sonobuoy/pkg/plugin/aggregation"
)

// Run is the main entrypoint for discovery
Expand Down Expand Up @@ -124,7 +126,46 @@ func Run(kubeClient kubernetes.Interface, cfg *config.Config) (errCount int) {
defer os.RemoveAll(outpath)
}
trackErrorsFor("assembling results tarball")(err)

// 9. Mark final annotation stating the results are available and status is completed.
trackErrorsFor("updating pod status")(
updateStatus(kubeClient, cfg.Namespace, pluginaggregation.CompleteStatus),
)

logrus.Infof("Results available at %v", tb)

return errCount
}

// updateStatus changes the summary status of the sonobuoy pod in order to
// effect the finalized status the user sees. This does not change the status
// of individual plugins.
func updateStatus(client kubernetes.Interface, namespace string, status string) error {
podStatus, err := pluginaggregation.GetStatus(client, namespace)
if err != nil {
return errors.Wrap(err, "failed to get the existing status")
}

// Update status
logrus.Infof("podstatus at first schnake %v", podStatus)

podStatus.Status = status

// Marshal back into json, inject into the patch, then serialize again.

logrus.Infof("podstatus at myupdate schnake %v", podStatus)
statusBytes, err := json.Marshal(podStatus)
if err != nil {
return errors.Wrap(err, "failed to marshal the status")
}

logrus.Infof("podstatus at statusbytes schnake %v", string(statusBytes))
patch := pluginaggregation.GetPatch(string(statusBytes))
patchBytes, err := json.Marshal(patch)
if err != nil {
return errors.Wrap(err, "failed to marshal the patch")
}

_, err = client.CoreV1().Pods(namespace).Patch(pluginaggregation.StatusPodName, types.MergePatchType, patchBytes)
return err
}
47 changes: 45 additions & 2 deletions pkg/plugin/aggregation/status.go
Expand Up @@ -16,13 +16,26 @@ limitations under the License.

package aggregation

import "fmt"
import (
"encoding/json"
"fmt"

"github.com/pkg/errors"
"k8s.io/client-go/kubernetes"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// RunningStatus means the sonobuoy run is still in progress.
RunningStatus string = "running"
// CompleteStatus means the sonobuoy run is complete.
CompleteStatus string = "complete"
// PostProcessingStatus means the plugins are complete. The state is not
// put in the more finalized, complete, status until any postprocessing is
// done.
PostProcessingStatus string = "post-processing"
// FailedStatus means one or more plugins has failed and the run will not complete successfully.
FailedStatus string = "failed"
)
Expand All @@ -42,7 +55,7 @@ type Status struct {
}

func (s *Status) updateStatus() error {
status := CompleteStatus
status := PostProcessingStatus
for _, plugin := range s.Plugins {
switch plugin.Status {
case CompleteStatus:
Expand All @@ -60,3 +73,33 @@ func (s *Status) updateStatus() error {
s.Status = status
return nil
}

// GetStatus returns the current status status on the sonobuoy pod. If the pod
// does not exist, is not running, or is missing the status annotation, an error
// is returned.
func GetStatus(client kubernetes.Interface, namespace string) (*Status, error) {
if _, err := client.CoreV1().Namespaces().Get(namespace, metav1.GetOptions{}); err != nil {
return nil, errors.Wrap(err, "sonobuoy namespace does not exist")
}

pod, err := client.CoreV1().Pods(namespace).Get(StatusPodName, metav1.GetOptions{})
if err != nil {
return nil, errors.Wrap(err, "could not retrieve sonobuoy pod")
}

if pod.Status.Phase != corev1.PodRunning {
return nil, fmt.Errorf("pod has status %q", pod.Status.Phase)
}

statusJSON, ok := pod.Annotations[StatusAnnotationName]
if !ok {
return nil, fmt.Errorf("missing status annotation %q", StatusAnnotationName)
}

var status Status
if err := json.Unmarshal([]byte(statusJSON), &status); err != nil {
return nil, errors.Wrap(err, "couldn't unmarshal the JSON status annotation")
}

return &status, nil
}
8 changes: 4 additions & 4 deletions pkg/plugin/aggregation/status_test.go
Expand Up @@ -25,14 +25,14 @@ func TestUpdateStatus(t *testing.T) {
expectedStatus string
}{
{
name: "empty is complete",
name: "empty is post-processing",
pluginStatuses: []string{},
expectedStatus: "complete",
expectedStatus: "post-processing",
},
{
name: "all completed is complete",
name: "all completed is post-processing",
pluginStatuses: []string{"complete", "complete", "complete"},
expectedStatus: "complete",
expectedStatus: "post-processing",
},
{
name: "one running is running",
Expand Down
6 changes: 4 additions & 2 deletions pkg/plugin/aggregation/update.go
Expand Up @@ -109,7 +109,7 @@ func (u *updater) Annotate(results map[string]*plugin.Result) error {
return errors.Wrap(err, "couldn't serialize status")
}

patch := getPatch(str)
patch := GetPatch(str)
bytes, err := json.Marshal(patch)
if err != nil {
return errors.Wrap(err, "couldn't encode patch")
Expand Down Expand Up @@ -146,7 +146,9 @@ func (u *updater) ReceiveAll(results map[string]*plugin.Result) {
}
}

func getPatch(annotation string) map[string]interface{} {
// GetPatch takes a json encoded string and creates a map which can be used as
// a patch to indicate the Sonobuoy status.
func GetPatch(annotation string) map[string]interface{} {
return map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": map[string]string{
Expand Down

0 comments on commit eccb315

Please sign in to comment.