Skip to content

Commit

Permalink
Separate creation of owned objects into conditional and unconditional
Browse files Browse the repository at this point in the history
  • Loading branch information
lllamnyp committed Jun 25, 2024
1 parent a6e16e9 commit 3f2231b
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 65 deletions.
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- endpoints
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
Expand Down
136 changes: 77 additions & 59 deletions internal/controller/etcdcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,58 +88,40 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)

state := observables{}

// create two services and the pdb, try fetching the sts
{
c := make(chan error)
go func(chan<- error) {
err := factory.CreateOrUpdateClientService(ctx, instance, r.Client)
if err != nil {
err = fmt.Errorf("couldn't ensure client service: %w", err)
}
c <- err
}(c)
go func(chan<- error) {
err := factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client)
if err != nil {
err = fmt.Errorf("couldn't ensure headless service: %w", err)
}
c <- err
}(c)
go func(chan<- error) {
err := factory.CreateOrUpdatePdb(ctx, instance, r.Client)
if err != nil {
err = fmt.Errorf("couldn't ensure pod disruption budget: %w", err)
}
c <- err
}(c)
go func(chan<- error) {
err := r.Get(ctx, req.NamespacedName, &state.statefulSet)
if client.IgnoreNotFound(err) != nil {
err = fmt.Errorf("couldn't get statefulset: %w", err)
}
c <- client.IgnoreNotFound(err)
}(c)
for i := 0; i < 4; i++ {
if err := <-c; err != nil {
return ctrl.Result{}, err
}
}
// create two services and the pdb
err = r.ensureUnconditionalObjects(ctx, instance)
if err != nil {
return ctrl.Result{}, err
}

// fetch STS if exists
err = r.Get(ctx, req.NamespacedName, &state.statefulSet)
if client.IgnoreNotFound(err) != nil {
return ctrl.Result{}, fmt.Errorf("couldn't get statefulset: %w", err)
}
state.stsExists = state.statefulSet.UID != ""

// fetch endpoints
clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, instance, r.Client)
if err != nil {
return ctrl.Result{}, err
}
state.endpointsFound = clusterClient != nil && singleClients != nil
state.stsExists = state.statefulSet.UID != ""

if !state.endpointsFound {
if !state.stsExists {
// TODO: happy path for new cluster creation
}
}

// get status of every endpoint and member list from every endpoint
state.etcdStatuses = make([]etcdStatus, len(singleClients))
{
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, etcdDefaultTimeout)
for i := range singleClients {
wg.Add(1)
go func(i int) {
wg.Add(1)
defer wg.Done()
state.etcdStatuses[i].fill(ctx, singleClients[i])
}(i)
Expand All @@ -158,7 +140,7 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

// ensure managed resources
if err = r.ensureClusterObjects(ctx, instance); err != nil {
if err = r.ensureConditionalClusterObjects(ctx, instance); err != nil {
return r.updateStatusOnErr(ctx, instance, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err))
}

Expand Down Expand Up @@ -210,8 +192,8 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return r.updateStatus(ctx, instance)
}

// ensureClusterObjects creates or updates all objects owned by cluster CR
func (r *EtcdClusterReconciler) ensureClusterObjects(
// ensureConditionalClusterObjects creates or updates all objects owned by cluster CR
func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {

if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, r.Client); err != nil {
Expand All @@ -220,30 +202,12 @@ func (r *EtcdClusterReconciler) ensureClusterObjects(
}
log.Debug(ctx, "cluster state configmap reconciled")

if err := factory.CreateOrUpdateHeadlessService(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile headless service failed")
return err
}
log.Debug(ctx, "headless service reconciled")

if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile statefulset failed")
return err
}
log.Debug(ctx, "statefulset reconciled")

if err := factory.CreateOrUpdateClientService(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile client service failed")
return err
}
log.Debug(ctx, "client service reconciled")

if err := factory.CreateOrUpdatePdb(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile pdb failed")
return err
}
log.Debug(ctx, "pdb reconciled")

return nil
}

Expand Down Expand Up @@ -570,3 +534,57 @@ func (r *EtcdClusterReconciler) disableAuth(ctx context.Context, authClient clie

return nil
}

// ensureUnconditionalObjects creates the two services and the PDB
// which can be created at the start of the reconciliation loop
// without any risk of disrupting the etcd cluster
func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) error {
	// Number of goroutines spawned below; wg.Add and the receive loop
	// must both stay in sync with this constant.
	const concurrentOperations = 3
	c := make(chan error)
	// Safe to close on return: the error path waits (wg.Wait) for every
	// sender to finish before returning, and on the success path all
	// three sends have already been received.
	defer close(c)
	// Child context lets the error path cancel any still-pending sends so
	// no goroutine is left blocked on the unbuffered channel.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	var wg sync.WaitGroup
	wg.Add(concurrentOperations)
	// wrapWithMsg adds context to a non-nil error and passes nil through
	// unchanged, so a successful operation sends nil on the channel.
	wrapWithMsg := func(err error, msg string) error {
		if err != nil {
			return fmt.Errorf(msg+": %w", err)
		}
		return nil
	}
	go func(chan<- error) {
		defer wg.Done()
		// Either deliver the result or bail out on cancellation; the
		// select guarantees no send happens after the receiver has left.
		select {
		case <-ctx.Done():
		case c <- wrapWithMsg(factory.CreateOrUpdateClientService(ctx, instance, r.Client),
			"couldn't ensure client service"):
		}
	}(c)
	go func(chan<- error) {
		defer wg.Done()
		select {
		case <-ctx.Done():
		case c <- wrapWithMsg(factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client),
			"couldn't ensure headless service"):
		}
	}(c)
	go func(chan<- error) {
		defer wg.Done()
		select {
		case <-ctx.Done():
		case c <- wrapWithMsg(factory.CreateOrUpdatePdb(ctx, instance, r.Client),
			"couldn't ensure pod disruption budget"):
		}
	}(c)

	// Collect exactly one result per goroutine; fail fast on the first error.
	for i := 0; i < concurrentOperations; i++ {
		if err := <-c; err != nil {
			cancel()

			// let all goroutines select the ctx.Done() case to avoid races on closed channels
			wg.Wait()
			return err
		}
	}
	return nil
}
14 changes: 9 additions & 5 deletions internal/controller/factory/etcd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, client client.Client) (*clientv3.Client, []*clientv3.Client, error) {
cfg, err := configFromCluster(ctx, cluster, client)
func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli client.Client) (*clientv3.Client, []*clientv3.Client, error) {
cfg, err := configFromCluster(ctx, cluster, cli)
if err != nil {
return nil, nil, err
}
Expand All @@ -35,12 +35,16 @@ func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, client
return clusterClient, singleClients, nil
}

func configFromCluster(ctx context.Context, cluster *v1alpha1.EtcdCluster, client client.Client) (clientv3.Config, error) {
func configFromCluster(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli client.Client) (clientv3.Config, error) {
ep := v1.Endpoints{}
err := client.Get(ctx, types.NamespacedName{Name: GetHeadlessServiceName(cluster), Namespace: cluster.Namespace}, &ep)
if err != nil {
err := cli.Get(ctx, types.NamespacedName{Name: GetHeadlessServiceName(cluster), Namespace: cluster.Namespace}, &ep)
if client.IgnoreNotFound(err) != nil {
return clientv3.Config{}, err
}
if err != nil {
return clientv3.Config{Endpoints: []string{}}, nil
}

names := map[string]struct{}{}
urls := make([]string, 0, 8)
for _, v := range ep.Subsets {
Expand Down
16 changes: 15 additions & 1 deletion internal/controller/observables.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

clientv3 "go.etcd.io/etcd/client/v3"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)

// etcdStatus holds the details of the status that an etcd endpoint
Expand All @@ -27,6 +28,7 @@ type observables struct {
etcdStatuses []etcdStatus
clusterID uint64
endpointsReached int
pvcs []corev1.PersistentVolumeClaim
}

// setClusterID populates the clusterID field based on etcdStatuses
Expand Down Expand Up @@ -56,11 +58,23 @@ func (o *observables) inSplitbrain() bool {
// with the endpoint's status and its perceived member list.
func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) {
	var wg sync.WaitGroup
	// Add must run before the goroutine is spawned. The previous version
	// called Add(1) both here and inside the goroutine with only a single
	// Done, so wg.Wait() below could block forever; it also raced Wait()
	// against the inner Add. Exactly one Add paired with one Done.
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Query the endpoint's own status concurrently with the member
		// list fetch below; results land in separate fields, no locking
		// needed.
		s.endpointStatus, s.endpointStatusError = c.Status(ctx, c.Endpoints()[0])
	}()
	s.memberList, s.memberListError = c.MemberList(ctx)
	wg.Wait()
}

// desiredReplicas returns the member count reported by the first etcd
// endpoint that served a member list, or 0 when no endpoint did.
// TODO: make a real function
func (o *observables) desiredReplicas() int {
	// Ranging over a nil slice is a no-op in Go, so the former
	// `if o.etcdStatuses != nil` guard was redundant and has been dropped.
	for i := range o.etcdStatuses {
		if o.etcdStatuses[i].memberList != nil {
			return len(o.etcdStatuses[i].memberList.Members)
		}
	}
	return 0
}

0 comments on commit 3f2231b

Please sign in to comment.