Skip to content

Commit

Permalink
Bootstrap healthcheck: Check GitRepository and return Ready condition…
Browse files Browse the repository at this point in the history
… message on timeout

Signed-off-by: Somtochi Onyekwere <somtochionyekwere@gmail.com>
  • Loading branch information
somtochiama committed Dec 6, 2023
1 parent 62ac960 commit 17ce65f
Show file tree
Hide file tree
Showing 5 changed files with 595 additions and 55 deletions.
28 changes: 2 additions & 26 deletions cmd/flux/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,11 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

helmv2beta1 "github.com/fluxcd/helm-controller/api/v2beta1"
autov1 "github.com/fluxcd/image-automation-controller/api/v1beta1"
imagev1 "github.com/fluxcd/image-reflector-controller/api/v1beta2"
kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1"
notificationv1 "github.com/fluxcd/notification-controller/api/v1"
notificationv1b2 "github.com/fluxcd/notification-controller/api/v1beta2"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/ssa"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
sourcev1b2 "github.com/fluxcd/source-controller/api/v1beta2"

"github.com/fluxcd/flux2/v2/internal/utils"
)
Expand Down Expand Up @@ -172,7 +163,7 @@ func Test_getObjectRef(t *testing.T) {
objs, err := ssa.ReadObjects(strings.NewReader(objects))
g.Expect(err).To(Not(HaveOccurred()))

builder := fake.NewClientBuilder().WithScheme(getScheme())
builder := fake.NewClientBuilder().WithScheme(utils.NewScheme())
for _, obj := range objs {
builder = builder.WithObjects(obj)
}
Expand Down Expand Up @@ -256,7 +247,7 @@ func Test_getRows(t *testing.T) {
objs, err := ssa.ReadObjects(strings.NewReader(objects))
g.Expect(err).To(Not(HaveOccurred()))

builder := fake.NewClientBuilder().WithScheme(getScheme())
builder := fake.NewClientBuilder().WithScheme(utils.NewScheme())
for _, obj := range objs {
builder = builder.WithObjects(obj)
}
Expand Down Expand Up @@ -410,21 +401,6 @@ func getTestListOpt(kind, name string) client.ListOption {
return client.MatchingFieldsSelector{Selector: sel}
}

func getScheme() *runtime.Scheme {
newscheme := runtime.NewScheme()
corev1.AddToScheme(newscheme)
kustomizev1.AddToScheme(newscheme)
helmv2beta1.AddToScheme(newscheme)
notificationv1.AddToScheme(newscheme)
notificationv1b2.AddToScheme(newscheme)
imagev1.AddToScheme(newscheme)
autov1.AddToScheme(newscheme)
sourcev1.AddToScheme(newscheme)
sourcev1b2.AddToScheme(newscheme)

return newscheme
}

func createEvent(obj client.Object, eventType, msg, reason string) corev1.Event {
return corev1.Event{
ObjectMeta: metav1.ObjectMeta{
Expand Down
94 changes: 73 additions & 21 deletions pkg/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,24 @@ import (
"context"
"errors"
"fmt"
"k8s.io/apimachinery/pkg/util/wait"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
apierrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"

kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1"
"github.com/fluxcd/pkg/apis/meta"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
sourcev1b2 "github.com/fluxcd/source-controller/api/v1beta2"

"github.com/fluxcd/flux2/v2/pkg/manifestgen/install"
"github.com/fluxcd/flux2/v2/pkg/manifestgen/sourcesecret"
Expand All @@ -44,6 +48,10 @@ var (
ErrReconciledWithWarning = errors.New("reconciled with warning")
)

// Reconciler reconciles and reports the health of different
// components and kubernetes resources involved in the installation of Flux.
//
// It is recommended use the `ReconcilerWithSyncCheck` interface that also reports the health of the GitRepository.
type Reconciler interface {
// ReconcileComponents reconciles the components by generating the
// manifests with the provided values, committing them to Git and
Expand All @@ -62,8 +70,7 @@ type Reconciler interface {
// and pushing to remote if there are any changes.
ReconcileSyncConfig(ctx context.Context, options sync.Options) error

// ReportKustomizationHealth reports about the health of the
// Kustomization synchronizing the components.
// ReportKustomizationHealth reports about the health of the Kustomization synchronizing the components.
ReportKustomizationHealth(ctx context.Context, options sync.Options, pollInterval, timeout time.Duration) error

// ReportComponentsHealth reports about the health for the components
Expand All @@ -76,6 +83,14 @@ type RepositoryReconciler interface {
ReconcileRepository(ctx context.Context) error
}

// ReconcilerWithSyncCheck extends the Reconciler interface to also report the health of the GitReposiotry
// that syncs Flux on the cluster
type ReconcilerWithSyncCheck interface {
Reconciler
// ReportGitRepoHealth reports about the health of the GitRepository synchronizing the components.
ReportGitRepoHealth(ctx context.Context, options sync.Options, pollInterval, timeout time.Duration) error
}

type PostGenerateSecretFunc func(ctx context.Context, secret corev1.Secret, options sourcesecret.Options) error

func Run(ctx context.Context, reconciler Reconciler, manifestsBase string,
Expand All @@ -99,18 +114,25 @@ func Run(ctx context.Context, reconciler Reconciler, manifestsBase string,
return err
}

var healthErrCount int
var errs []error
if r, ok := reconciler.(ReconcilerWithSyncCheck); ok {
if err := r.ReportGitRepoHealth(ctx, syncOpts, pollInterval, timeout); err != nil {
errs = append(errs, err)
}
}

if err := reconciler.ReportKustomizationHealth(ctx, syncOpts, pollInterval, timeout); err != nil {
healthErrCount++
errs = append(errs, err)
}

if err := reconciler.ReportComponentsHealth(ctx, installOpts, timeout); err != nil {
healthErrCount++
errs = append(errs, err)
}
if healthErrCount > 0 {
if len(errs) > 0 {
// Composing a "smart" error message here from the returned
// errors does not result in any useful information for the
// user, as both methods log the failures they run into.
err = fmt.Errorf("bootstrap failed with %d health check failure(s)", healthErrCount)
err = fmt.Errorf("bootstrap failed with %d health check failure(s): %w", len(errs), apierrors.NewAggregate(errs))
}

return err
Expand Down Expand Up @@ -173,32 +195,47 @@ func kustomizationPathDiffers(ctx context.Context, kube client.Client, objKey cl
return k.Spec.Path, nil
}

func kustomizationReconciled(kube client.Client, objKey client.ObjectKey, kustomization *kustomizev1.Kustomization, expectRevision string) wait.ConditionWithContextFunc {
type objectWithConditions interface {
client.Object
GetConditions() []metav1.Condition
}

func objectReconciled(kube client.Client, objKey client.ObjectKey, clientObject objectWithConditions, expectRevision string) wait.ConditionWithContextFunc {
return func(ctx context.Context) (bool, error) {
if err := kube.Get(ctx, objKey, kustomization); err != nil {
// for some reason, TypeMeta gets unset after kube.Get so we want to store the GVK and set it after
// ref https://github.com/kubernetes-sigs/controller-runtime/issues/1517#issuecomment-844703142
gvk := clientObject.GetObjectKind().GroupVersionKind()
if err := kube.Get(ctx, objKey, clientObject); err != nil {
return false, err
}
clientObject.GetObjectKind().SetGroupVersionKind(gvk)

// Detect suspended Kustomization, as this would result in an endless wait
if kustomization.Spec.Suspend {
return false, fmt.Errorf("Kustomization is suspended")
kind := gvk.Kind
obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(clientObject)
if err != nil {
return false, err
}

// Confirm the state we are observing is for the current generation
if kustomization.Generation != kustomization.Status.ObservedGeneration {
return false, nil
// Detect suspended object, as this would result in an endless wait
if suspended, ok, _ := unstructured.NestedBool(obj, "spec", "suspend"); ok && suspended {
return false, fmt.Errorf("%s '%s' is suspended", kind, objKey.String())
}

// Confirm the given revision has been attempted by the controller
if sourcev1.TransformLegacyRevision(kustomization.Status.LastAttemptedRevision) != expectRevision {
// Confirm the state we are observing is for the current generation
if generation, ok, _ := unstructured.NestedInt64(obj, "status", "observedGeneration"); ok && generation != clientObject.GetGeneration() {
return false, nil
}

// Confirm the resource is healthy
if c := apimeta.FindStatusCondition(kustomization.Status.Conditions, meta.ReadyCondition); c != nil {
if c := apimeta.FindStatusCondition(clientObject.GetConditions(), meta.ReadyCondition); c != nil {
switch c.Status {
case metav1.ConditionTrue:
return true, nil
// Confirm the given revision has been attempted by the controller
hasRev, err := hasRevision(kind, obj, expectRevision)
if err != nil {
return false, err
}
return hasRev, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
Expand All @@ -207,6 +244,21 @@ func kustomizationReconciled(kube client.Client, objKey client.ObjectKey, kustom
}
}

// hasRevision checks that the reconciled revision (for Kustomization this is `.status.lastAttemptedRevision`
// and for Source APIs, it is stored in `.status.artifact.revision`) is the same as the expectedRev
func hasRevision(kind string, obj map[string]interface{}, expectedRev string) (bool, error) {
var rev string
switch kind {
case sourcev1.GitRepositoryKind, sourcev1b2.OCIRepositoryKind, sourcev1b2.BucketKind, sourcev1b2.HelmChartKind:
rev, _, _ = unstructured.NestedString(obj, "status", "artifact", "revision")
case kustomizev1.KustomizationKind:
rev, _, _ = unstructured.NestedString(obj, "status", "lastAttemptedRevision")
default:
return false, fmt.Errorf("cannot get status revision for kind: '%s'", kind)
}
return sourcev1b2.TransformLegacyRevision(rev) == expectedRev, nil
}

func retry(retries int, wait time.Duration, fn func() error) (err error) {
for i := 0; ; i++ {
err = fn()
Expand Down
57 changes: 50 additions & 7 deletions pkg/bootstrap/bootstrap_plain_git.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/ProtonMail/go-crypto/openpgp"
gogit "github.com/go-git/go-git/v5"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/cli-runtime/pkg/genericclioptions"
Expand All @@ -38,10 +40,12 @@ import (

"github.com/fluxcd/cli-utils/pkg/object"
kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/git"
"github.com/fluxcd/pkg/git/repository"
"github.com/fluxcd/pkg/kustomize/filesys"
runclient "github.com/fluxcd/pkg/runtime/client"
sourcev1 "github.com/fluxcd/source-controller/api/v1"

"github.com/fluxcd/flux2/v2/internal/utils"
"github.com/fluxcd/flux2/v2/pkg/log"
Expand Down Expand Up @@ -397,20 +401,59 @@ func (b *PlainGitBootstrapper) ReportKustomizationHealth(ctx context.Context, op

objKey := client.ObjectKey{Name: options.Name, Namespace: options.Namespace}

b.logger.Waitingf("waiting for Kustomization %q to be reconciled", objKey.String())

expectRevision := fmt.Sprintf("%s@%s", options.Branch, git.Hash(head).Digest())
var k kustomizev1.Kustomization
b.logger.Waitingf("waiting for Kustomization %q to be reconciled", objKey.String())
k := &kustomizev1.Kustomization{
TypeMeta: metav1.TypeMeta{
Kind: kustomizev1.KustomizationKind,
},
}
if err := wait.PollUntilContextTimeout(ctx, pollInterval, timeout, true,
kustomizationReconciled(b.kube, objKey, &k, expectRevision)); err != nil {
b.logger.Failuref(err.Error())
return err
objectReconciled(b.kube, objKey, k, expectRevision)); err != nil {
// if the poll timed out, we want to log the ready condition message as
// that likely contains the reason
if errors.Is(err, context.DeadlineExceeded) {
readyCondition := apimeta.FindStatusCondition(k.Status.Conditions, meta.ReadyCondition)
if readyCondition != nil && readyCondition.Status != metav1.ConditionTrue {
err = fmt.Errorf("kustomization '%s' not ready: '%s'", objKey, readyCondition.Message)
}
}
return fmt.Errorf("error while waiting for Kustomization to be ready: '%s'", err)
}

b.logger.Successf("Kustomization reconciled successfully")

return nil
}

func (b *PlainGitBootstrapper) ReportGitRepoHealth(ctx context.Context, options sync.Options, pollInterval, timeout time.Duration) error {
head, err := b.gitClient.Head()
if err != nil {
return err
}

objKey := client.ObjectKey{Name: options.Name, Namespace: options.Namespace}

b.logger.Waitingf("waiting for GitRepository %q to be reconciled", objKey.String())
expectRevision := fmt.Sprintf("%s@%s", options.Branch, git.Hash(head).Digest())
g := &sourcev1.GitRepository{
TypeMeta: metav1.TypeMeta{
Kind: sourcev1.GitRepositoryKind,
APIVersion: sourcev1.GroupVersion.String(),
},
}
if err := wait.PollUntilContextTimeout(ctx, pollInterval, timeout, true,
objectReconciled(b.kube, objKey, g, expectRevision)); err != nil {
if errors.Is(err, context.DeadlineExceeded) {
readyCondition := apimeta.FindStatusCondition(g.Status.Conditions, meta.ReadyCondition)
if readyCondition != nil && readyCondition.Status != metav1.ConditionTrue {
return fmt.Errorf("gitrepository '%s' not ready: '%s'", objKey, readyCondition.Message)
}
}
return fmt.Errorf("error while waiting for gitrepository to be ready: '%s'", err)
}
b.logger.Successf("GitRepsoitory reconciled successfully")
return nil
}
func (b *PlainGitBootstrapper) ReportComponentsHealth(ctx context.Context, install install.Options, timeout time.Duration) error {
cfg, err := utils.KubeConfig(b.restClientGetter, b.restClientOptions)
if err != nil {
Expand Down

0 comments on commit 17ce65f

Please sign in to comment.