diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml
index 1d3153c..69ccd7d 100644
--- a/config/rbac/role.yaml
+++ b/config/rbac/role.yaml
@@ -16,6 +16,14 @@ rules:
   - patch
   - update
   - watch
+- apiGroups:
+  - ""
+  resources:
+  - endpoints
+  verbs:
+  - get
+  - list
+  - watch
 - apiGroups:
   - ""
   resources:
diff --git a/internal/controller/etcdcluster_controller.go b/internal/controller/etcdcluster_controller.go
index 2bf70ac..b086cec 100644
--- a/internal/controller/etcdcluster_controller.go
+++ b/internal/controller/etcdcluster_controller.go
@@ -88,49 +88,31 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 	state := observables{}
 
-	// create two services and the pdb, try fetching the sts
-	{
-		c := make(chan error)
-		go func(chan<- error) {
-			err := factory.CreateOrUpdateClientService(ctx, instance, r.Client)
-			if err != nil {
-				err = fmt.Errorf("couldn't ensure client service: %w", err)
-			}
-			c <- err
-		}(c)
-		go func(chan<- error) {
-			err := factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client)
-			if err != nil {
-				err = fmt.Errorf("couldn't ensure headless service: %w", err)
-			}
-			c <- err
-		}(c)
-		go func(chan<- error) {
-			err := factory.CreateOrUpdatePdb(ctx, instance, r.Client)
-			if err != nil {
-				err = fmt.Errorf("couldn't ensure pod disruption budget: %w", err)
-			}
-			c <- err
-		}(c)
-		go func(chan<- error) {
-			err := r.Get(ctx, req.NamespacedName, &state.statefulSet)
-			if client.IgnoreNotFound(err) != nil {
-				err = fmt.Errorf("couldn't get statefulset: %w", err)
-			}
-			c <- client.IgnoreNotFound(err)
-		}(c)
-		for i := 0; i < 4; i++ {
-			if err := <-c; err != nil {
-				return ctrl.Result{}, err
-			}
-		}
+	// create two services and the pdb
+	err = r.ensureUnconditionalObjects(ctx, instance)
+	if err != nil {
+		return ctrl.Result{}, err
+	}
+
+	// fetch STS if exists
+	err = r.Get(ctx, req.NamespacedName, &state.statefulSet)
+	if client.IgnoreNotFound(err) != nil {
+		return ctrl.Result{}, fmt.Errorf("couldn't get statefulset: %w", err)
 	}
+	state.stsExists = state.statefulSet.UID != ""
+
+	// fetch endpoints
 	clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, instance, r.Client)
 	if err != nil {
 		return ctrl.Result{}, err
 	}
 	state.endpointsFound = clusterClient != nil && singleClients != nil
-	state.stsExists = state.statefulSet.UID != ""
+
+	if !state.endpointsFound {
+		if !state.stsExists {
+			// TODO: happy path for new cluster creation
+		}
+	}
 
 	// get status of every endpoint and member list from every endpoint
 	state.etcdStatuses = make([]etcdStatus, len(singleClients))
@@ -138,8 +120,8 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 	var wg sync.WaitGroup
 	ctx, cancel := context.WithTimeout(ctx, etcdDefaultTimeout)
 	for i := range singleClients {
+		wg.Add(1)
 		go func(i int) {
-			wg.Add(1)
 			defer wg.Done()
 			state.etcdStatuses[i].fill(ctx, singleClients[i])
 		}(i)
@@ -158,7 +140,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 	}
 
 	// ensure managed resources
-	if err = r.ensureClusterObjects(ctx, instance); err != nil {
+	if err = r.ensureConditionalClusterObjects(ctx, instance); err != nil {
 		return r.updateStatusOnErr(ctx, instance, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err))
 	}
 
@@ -210,8 +192,8 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 	return r.updateStatus(ctx, instance)
 }
 
-// ensureClusterObjects creates or updates all objects owned by cluster CR
-func (r *EtcdClusterReconciler) ensureClusterObjects(
+// ensureConditionalClusterObjects creates or updates all objects owned by cluster CR
+func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(
 	ctx context.Context,
 	cluster *etcdaenixiov1alpha1.EtcdCluster) error {
 	if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, r.Client); err != nil {
@@ -220,30 +202,12 @@ func (r *EtcdClusterReconciler) ensureClusterObjects(
 	}
 	log.Debug(ctx, "cluster state configmap reconciled")
 
-	if err := factory.CreateOrUpdateHeadlessService(ctx, cluster, r.Client); err != nil {
-		log.Error(ctx, err, "reconcile headless service failed")
-		return err
-	}
-	log.Debug(ctx, "headless service reconciled")
-
 	if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client); err != nil {
 		log.Error(ctx, err, "reconcile statefulset failed")
 		return err
 	}
 	log.Debug(ctx, "statefulset reconciled")
 
-	if err := factory.CreateOrUpdateClientService(ctx, cluster, r.Client); err != nil {
-		log.Error(ctx, err, "reconcile client service failed")
-		return err
-	}
-	log.Debug(ctx, "client service reconciled")
-
-	if err := factory.CreateOrUpdatePdb(ctx, cluster, r.Client); err != nil {
-		log.Error(ctx, err, "reconcile pdb failed")
-		return err
-	}
-	log.Debug(ctx, "pdb reconciled")
-
 	return nil
 }
@@ -570,3 +534,57 @@ func (r *EtcdClusterReconciler) disableAuth(ctx context.Context, authClient clie
 
 	return nil
 }
+
+// ensureUnconditionalObjects creates the two services and the PDB
+// which can be created at the start of the reconciliation loop
+// without any risk of disrupting the etcd cluster
+func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) error {
+	const concurrentOperations = 3
+	c := make(chan error)
+	defer close(c)
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	var wg sync.WaitGroup
+	wg.Add(concurrentOperations)
+	wrapWithMsg := func(err error, msg string) error {
+		if err != nil {
+			return fmt.Errorf(msg+": %w", err)
+		}
+		return nil
+	}
+	go func(chan<- error) {
+		defer wg.Done()
+		select {
+		case <-ctx.Done():
+		case c <- wrapWithMsg(factory.CreateOrUpdateClientService(ctx, instance, r.Client),
+			"couldn't ensure client service"):
+		}
+	}(c)
+	go func(chan<- error) {
+		defer wg.Done()
+		select {
+		case <-ctx.Done():
+		case c <- wrapWithMsg(factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client),
+			"couldn't ensure headless service"):
+		}
+	}(c)
+	go func(chan<- error) {
+		defer wg.Done()
+		select {
+		case <-ctx.Done():
+		case c <- wrapWithMsg(factory.CreateOrUpdatePdb(ctx, instance, r.Client),
+			"couldn't ensure pod disruption budget"):
+		}
+	}(c)
+
+	for i := 0; i < concurrentOperations; i++ {
+		if err := <-c; err != nil {
+			cancel()
+
+			// let all goroutines select the ctx.Done() case to avoid races on closed channels
+			wg.Wait()
+			return err
+		}
+	}
+	return nil
+}
diff --git a/internal/controller/factory/etcd_client.go b/internal/controller/factory/etcd_client.go
index f10f295..4725171 100644
--- a/internal/controller/factory/etcd_client.go
+++ b/internal/controller/factory/etcd_client.go
@@ -11,8 +11,8 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
-func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, client client.Client) (*clientv3.Client, []*clientv3.Client, error) {
-	cfg, err := configFromCluster(ctx, cluster, client)
+func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli client.Client) (*clientv3.Client, []*clientv3.Client, error) {
+	cfg, err := configFromCluster(ctx, cluster, cli)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -35,12 +35,16 @@ func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, client
 	return clusterClient, singleClients, nil
 }
 
-func configFromCluster(ctx context.Context, cluster *v1alpha1.EtcdCluster, client client.Client) (clientv3.Config, error) {
+func configFromCluster(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli client.Client) (clientv3.Config, error) {
 	ep := v1.Endpoints{}
-	err := client.Get(ctx, types.NamespacedName{Name: GetHeadlessServiceName(cluster), Namespace: cluster.Namespace}, &ep)
-	if err != nil {
+	err := cli.Get(ctx, types.NamespacedName{Name: GetHeadlessServiceName(cluster), Namespace: cluster.Namespace}, &ep)
+	if client.IgnoreNotFound(err) != nil {
 		return clientv3.Config{}, err
 	}
+	if err != nil {
+		return clientv3.Config{Endpoints: []string{}}, nil
+	}
+
 	names := map[string]struct{}{}
 	urls := make([]string, 0, 8)
 	for _, v := range ep.Subsets {
diff --git a/internal/controller/observables.go b/internal/controller/observables.go
index 8c9a2b3..e79b35e 100644
--- a/internal/controller/observables.go
+++ b/internal/controller/observables.go
@@ -6,6 +6,7 @@ import (
 
 	clientv3 "go.etcd.io/etcd/client/v3"
 	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
 )
 
 // etcdStatus holds the details of the status that an etcd endpoint
@@ -27,6 +28,7 @@ type observables struct {
 	etcdStatuses     []etcdStatus
 	clusterID        uint64
 	endpointsReached int
+	pvcs             []corev1.PersistentVolumeClaim
 }
 
 // setClusterID populates the clusterID field based on etcdStatuses
@@ -56,11 +58,23 @@ func (o *observables) inSplitbrain() bool {
 // with the endpoint's status and its perceived member list.
 func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) {
 	var wg sync.WaitGroup
+	wg.Add(1)
 	go func() {
-		wg.Add(1)
 		defer wg.Done()
 		s.endpointStatus, s.endpointStatusError = c.Status(ctx, c.Endpoints()[0])
 	}()
 	s.memberList, s.memberListError = c.MemberList(ctx)
 	wg.Wait()
}
+
+// TODO: make a real function
+func (o *observables) desiredReplicas() int {
+	if o.etcdStatuses != nil {
+		for i := range o.etcdStatuses {
+			if o.etcdStatuses[i].memberList != nil {
+				return len(o.etcdStatuses[i].memberList.Members)
+			}
+		}
+	}
+	return 0
+}
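
Note on the new ensureUnconditionalObjects: the channel, WaitGroup, and cancel plumbing it hand-rolls is the standard fail-fast fan-out pattern. For comparison, the same guarantees (cancel the shared context on the first failure, return the first error only after every goroutine has finished) can be had from golang.org/x/sync/errgroup. A minimal sketch follows; runConcurrently is a hypothetical helper, not part of this patch, and error wrapping as done by wrapWithMsg would happen inside each closure:

package controller

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// runConcurrently runs each op in its own goroutine, cancels the shared
// context as soon as any op fails, and returns the first non-nil error
// after all goroutines have returned.
func runConcurrently(ctx context.Context, ops ...func(context.Context) error) error {
	g, ctx := errgroup.WithContext(ctx)
	for _, op := range ops {
		op := op // capture the loop variable (needed before Go 1.22)
		g.Go(func() error { return op(ctx) })
	}
	return g.Wait()
}

The reconciler would then call it with the three factory closures, e.g. runConcurrently(ctx, func(ctx context.Context) error { return factory.CreateOrUpdateClientService(ctx, instance, r.Client) }, ...), trading the explicit channel bookkeeping for errgroup's built-in cancellation and wait semantics.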