Merge branch 'main' into release-v0.3.0
kvaps committed Jul 3, 2024
2 parents bd8142d + aa4b7e6 commit 387462c
Showing 14 changed files with 641 additions and 100 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/docker-publish.yaml
@@ -36,8 +36,6 @@ jobs:
- name: Install cosign
if: github.event_name != 'pull_request'
uses: sigstore/cosign-installer@v3.5.0
with:
cosign-release: 'v2.1.1'

# Set up BuildKit Docker container builder to be able to build
# multi-platform images and export cache
@@ -73,6 +71,7 @@ jobs:
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
platforms: linux/amd64, linux/arm64
cache-from: type=gha
cache-to: type=gha,mode=max

3 changes: 3 additions & 0 deletions api/v1alpha1/etcdcluster_types.go
@@ -55,6 +55,7 @@ type EtcdClusterSpec struct {
const (
EtcdConditionInitialized = "Initialized"
EtcdConditionReady = "Ready"
EtcdConditionError = "Error"
)

type EtcdCondType string
@@ -66,6 +67,7 @@ const (
EtcdCondTypeWaitingForFirstQuorum EtcdCondType = "WaitingForFirstQuorum"
EtcdCondTypeStatefulSetReady EtcdCondType = "StatefulSetReady"
EtcdCondTypeStatefulSetNotReady EtcdCondType = "StatefulSetNotReady"
EtcdCondTypeSplitbrain EtcdCondType = "Splitbrain"
)

const (
@@ -74,6 +76,7 @@ const (
EtcdReadyCondNegMessage EtcdCondMessage = "Cluster StatefulSet is not Ready"
EtcdReadyCondPosMessage EtcdCondMessage = "Cluster StatefulSet is Ready"
EtcdReadyCondNegWaitingForQuorum EtcdCondMessage = "Waiting for first quorum to be established"
EtcdErrorCondSplitbrainMessage EtcdCondMessage = "Etcd endpoints reporting more than one unique cluster ID"
)

// EtcdClusterStatus defines the observed state of EtcdCluster
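
The new Error condition and the Splitbrain reason/message are wired into the controller further down in this commit. As a side note for API consumers, here is a minimal sketch of checking the condition from client code — assuming, as is conventional, that EtcdClusterStatus.Conditions is a []metav1.Condition slice; the helper name reportSplitbrain is made up for illustration:

package example

import (
	"fmt"

	etcdaenixiov1alpha1 "github.com/aenix-io/etcd-operator/api/v1alpha1"
	"k8s.io/apimachinery/pkg/api/meta"
)

// reportSplitbrain is a hypothetical consumer-side check of the new Error condition.
func reportSplitbrain(cluster *etcdaenixiov1alpha1.EtcdCluster) {
	if meta.IsStatusConditionTrue(cluster.Status.Conditions, etcdaenixiov1alpha1.EtcdConditionError) {
		cond := meta.FindStatusCondition(cluster.Status.Conditions, etcdaenixiov1alpha1.EtcdConditionError)
		fmt.Printf("cluster %s in error state: %s (%s)\n", cluster.Name, cond.Message, cond.Reason)
	}
}
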
50 changes: 27 additions & 23 deletions cmd/app/commandline.go
@@ -28,16 +28,17 @@ import (
)

type Flags struct {
Kubeconfig string
MetricsAddress string
ProbeAddress string
LeaderElection bool
SecureMetrics bool
EnableHTTP2 bool
DisableWebhooks bool
LogLevel string
StacktraceLevel string
Dev bool
Kubeconfig string
MetricsAddress string
ProbeAddress string
LeaderElection bool
SecureMetrics bool
EnableHTTP2 bool
DisableWebhooks bool
LogLevel string
StacktraceLevel string
EnableStacktrace bool
Dev bool
}

func ParseCmdLine() Flags {
@@ -54,6 +55,7 @@ func ParseCmdLine() Flags {
pflag.Bool("enable-http2", false, "If set, HTTP/2 will be enabled for the metrics and webhook servers.")
pflag.Bool("disable-webhooks", false, "If set, the webhooks will be disabled.")
pflag.String("log-level", "info", "Logger verbosity level.Applicable values are debug, info, warn, error.")
pflag.Bool("enable-stacktrace", true, "If set log entries will contain stacktrace.")
pflag.String("stacktrace-level", "error", "Logger level to add stacktrace. "+
"Applicable values are debug, info, warn, error.")
pflag.Bool("dev", false, "development mode.")
@@ -75,16 +77,17 @@ func ParseCmdLine() Flags {
}

return Flags{
Kubeconfig: viper.GetString("kubeconfig"),
MetricsAddress: viper.GetString("metrics-bind-address"),
ProbeAddress: viper.GetString("health-probe-bind-address"),
LeaderElection: viper.GetBool("leader-elect"),
SecureMetrics: viper.GetBool("metrics-secure"),
EnableHTTP2: viper.GetBool("enable-http2"),
DisableWebhooks: viper.GetBool("disable-webhooks"),
LogLevel: viper.GetString("log-level"),
StacktraceLevel: viper.GetString("stacktrace-level"),
Dev: viper.GetBool("dev"),
Kubeconfig: viper.GetString("kubeconfig"),
MetricsAddress: viper.GetString("metrics-bind-address"),
ProbeAddress: viper.GetString("health-probe-bind-address"),
LeaderElection: viper.GetBool("leader-elect"),
SecureMetrics: viper.GetBool("metrics-secure"),
EnableHTTP2: viper.GetBool("enable-http2"),
DisableWebhooks: viper.GetBool("disable-webhooks"),
LogLevel: viper.GetString("log-level"),
StacktraceLevel: viper.GetString("stacktrace-level"),
EnableStacktrace: viper.GetBool("enable-stacktrace"),
Dev: viper.GetBool("dev"),
}
}

@@ -107,8 +110,9 @@ func exitUsage(err error) {

func LogParameters(flags Flags) log.Parameters {
return log.Parameters{
LogLevel: flags.LogLevel,
StacktraceLevel: flags.StacktraceLevel,
Development: flags.Dev,
LogLevel: flags.LogLevel,
StacktraceLevel: flags.StacktraceLevel,
EnableStacktrace: flags.EnableStacktrace,
Development: flags.Dev,
}
}
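
log.Parameters gains the EnableStacktrace field here, but the internal/log package itself is not among the files rendered on this page, and the go.mod change below suggests logging now goes through controller-runtime's zap wrapper rather than a direct zap dependency. Under those assumptions — the constructor name Setup and the level handling are guesses — the new flag could be wired up roughly like this:

package log

import (
	"go.uber.org/zap/zapcore"
	ctrl "sigs.k8s.io/controller-runtime"
	crzap "sigs.k8s.io/controller-runtime/pkg/log/zap"
)

// Parameters mirrors the struct filled in by LogParameters above.
type Parameters struct {
	LogLevel         string
	StacktraceLevel  string
	EnableStacktrace bool
	Development      bool
}

// Setup is a hypothetical constructor for the controller-runtime logger.
func Setup(p Parameters) error {
	lvl, err := zapcore.ParseLevel(p.LogLevel)
	if err != nil {
		return err
	}
	stLvl, err := zapcore.ParseLevel(p.StacktraceLevel)
	if err != nil {
		return err
	}
	if !p.EnableStacktrace {
		// Raise the stacktrace threshold above every real level so that no
		// log entry ever carries a stacktrace.
		stLvl = zapcore.FatalLevel + 1
	}
	ctrl.SetLogger(crzap.New(
		crzap.Level(lvl),
		crzap.StacktraceLevel(stLvl),
		crzap.UseDevMode(p.Development),
	))
	return nil
}
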
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
@@ -16,6 +16,14 @@ rules:
- patch
- update
- watch
- apiGroups:
- ""
resources:
- endpoints
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
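
The new rule grants the operator read access to Endpoints objects; the controller change below uses this (via factory.NewEtcdClientSet) to discover the client URLs of the individual etcd members. NewEtcdClientSet itself is not among the files rendered on this page, so the following is only an illustrative sketch — the helper name clientURLsFromEndpoints, the lookup by headless-service name, and the hard-coded client port 2379 are assumptions:

package factory

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// clientURLsFromEndpoints is a hypothetical helper showing why the operator
// needs get/list/watch on endpoints: it reads the Endpoints object of the
// cluster's headless service and turns every ready address into an etcd
// client URL.
func clientURLsFromEndpoints(ctx context.Context, c client.Client, namespace, service string) ([]string, error) {
	ep := &corev1.Endpoints{}
	if err := c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: service}, ep); err != nil {
		return nil, fmt.Errorf("couldn't get endpoints %s/%s: %w", namespace, service, err)
	}
	var urls []string
	for _, subset := range ep.Subsets {
		for _, addr := range subset.Addresses {
			// 2379 is the conventional etcd client port; the real code may
			// take it from subset.Ports instead.
			urls = append(urls, fmt.Sprintf("http://%s:2379", addr.IP))
		}
	}
	return urls, nil
}
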
3 changes: 1 addition & 2 deletions go.mod
@@ -12,8 +12,6 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.19.0
go.etcd.io/etcd/client/v3 v3.5.14
go.uber.org/zap v1.27.0
go.uber.org/zap/exp v0.2.0
k8s.io/api v0.30.2
k8s.io/apimachinery v0.30.2
k8s.io/client-go v0.30.2
@@ -72,6 +70,7 @@
go.etcd.io/etcd/api/v3 v3.5.14 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.14 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
2 changes: 0 additions & 2 deletions go.sum
@@ -167,8 +167,6 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go.uber.org/zap/exp v0.2.0 h1:FtGenNNeCATRB3CmB/yEUnjEFeJWpB/pMcy7e2bKPYs=
go.uber.org/zap/exp v0.2.0/go.mod h1:t0gqAIdh1MfKv9EwN/dLwfZnJxe9ITAZN78HEWPFWDQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
143 changes: 120 additions & 23 deletions internal/controller/etcdcluster_controller.go
@@ -25,6 +25,7 @@ import (
"slices"
"strconv"
"strings"
"sync"
"time"

"github.com/aenix-io/etcd-operator/internal/log"
@@ -47,6 +48,10 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
)

const (
etcdDefaultTimeout = 5 * time.Second
)

// EtcdClusterReconciler reconciles a EtcdCluster object
type EtcdClusterReconciler struct {
client.Client
@@ -56,6 +61,7 @@ type EtcdClusterReconciler struct {
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=etcd.aenix.io,resources=etcdclusters/finalizers,verbs=update
// +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;watch;delete;patch
// +kubebuilder:rbac:groups="",resources=services,verbs=get;create;delete;update;patch;list;watch
// +kubebuilder:rbac:groups="",resources=secrets,verbs=view;list;watch
@@ -80,13 +86,68 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return reconcile.Result{}, nil
}

state := observables{}

// create two services and the pdb
err = r.ensureUnconditionalObjects(ctx, instance)
if err != nil {
return ctrl.Result{}, err
}

// fetch STS if exists
err = r.Get(ctx, req.NamespacedName, &state.statefulSet)
if client.IgnoreNotFound(err) != nil {
return ctrl.Result{}, fmt.Errorf("couldn't get statefulset: %w", err)
}
state.stsExists = state.statefulSet.UID != ""

// fetch endpoints
clusterClient, singleClients, err := factory.NewEtcdClientSet(ctx, instance, r.Client)
if err != nil {
return ctrl.Result{}, err
}
state.endpointsFound = clusterClient != nil && singleClients != nil

if !state.endpointsFound {
if !state.stsExists {
// TODO: happy path for new cluster creation
log.Debug(ctx, "happy path for new cluster creation (not yet implemented)")
}
}

// get status of every endpoint and member list from every endpoint
state.etcdStatuses = make([]etcdStatus, len(singleClients))
{
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, etcdDefaultTimeout)
for i := range singleClients {
wg.Add(1)
go func(i int) {
defer wg.Done()
state.etcdStatuses[i].fill(ctx, singleClients[i])
}(i)
}
wg.Wait()
cancel()
}
state.setClusterID()
if state.inSplitbrain() {
log.Error(ctx, fmt.Errorf("etcd cluster in splitbrain"), "etcd cluster in splitbrain, dropping from reconciliation queue")
factory.SetCondition(instance, factory.NewCondition(etcdaenixiov1alpha1.EtcdConditionError).
WithStatus(true).
WithReason(string(etcdaenixiov1alpha1.EtcdCondTypeSplitbrain)).
WithMessage(string(etcdaenixiov1alpha1.EtcdErrorCondSplitbrainMessage)).
Complete(),
)
return r.updateStatus(ctx, instance)
}
// fill conditions
if len(instance.Status.Conditions) == 0 {
factory.FillConditions(instance)
}

// ensure managed resources
if err = r.ensureClusterObjects(ctx, instance); err != nil {
if err = r.ensureConditionalClusterObjects(ctx, instance); err != nil {
return r.updateStatusOnErr(ctx, instance, fmt.Errorf("cannot create Cluster auxiliary objects: %w", err))
}

@@ -138,8 +199,8 @@ func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return r.updateStatus(ctx, instance)
}

// ensureClusterObjects creates or updates all objects owned by cluster CR
func (r *EtcdClusterReconciler) ensureClusterObjects(
// ensureConditionalClusterObjects creates or updates all objects owned by cluster CR
func (r *EtcdClusterReconciler) ensureConditionalClusterObjects(
ctx context.Context, cluster *etcdaenixiov1alpha1.EtcdCluster) error {

if err := factory.CreateOrUpdateClusterStateConfigMap(ctx, cluster, r.Client); err != nil {
@@ -148,30 +209,12 @@ func (r *EtcdClusterReconciler) ensureClusterObjects(
}
log.Debug(ctx, "cluster state configmap reconciled")

if err := factory.CreateOrUpdateHeadlessService(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile headless service failed")
return err
}
log.Debug(ctx, "headless service reconciled")

if err := factory.CreateOrUpdateStatefulSet(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile statefulset failed")
return err
}
log.Debug(ctx, "statefulset reconciled")

if err := factory.CreateOrUpdateClientService(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile client service failed")
return err
}
log.Debug(ctx, "client service reconciled")

if err := factory.CreateOrUpdatePdb(ctx, cluster, r.Client); err != nil {
log.Error(ctx, err, "reconcile pdb failed")
return err
}
log.Debug(ctx, "pdb reconciled")

return nil
}

@@ -335,7 +378,7 @@ func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context, cluster *etcda
log.Error(ctx, err, "failed to get server trusted CA secret")
return nil, err
}
log.Debug(ctx, "secret read", "server trusted CA secret") // serverCASecret,
log.Debug(ctx, "secret read", "server trusted CA secret", serverCASecret)

caCertPool = x509.NewCertPool()

@@ -355,7 +398,7 @@ func (r *EtcdClusterReconciler) getTLSConfig(ctx context.Context, cluster *etcda
log.Error(ctx, err, "failed to get root client secret")
return nil, err
}
log.Debug(ctx, "secret read", "root client secret") // rootSecret,
log.Debug(ctx, "secret read", "root client secret", rootSecret)

cert, err = tls.X509KeyPair(rootSecret.Data["tls.crt"], rootSecret.Data["tls.key"])
if err != nil {
Expand Down Expand Up @@ -498,3 +541,57 @@ func (r *EtcdClusterReconciler) disableAuth(ctx context.Context, authClient clie

return nil
}

// ensureUnconditionalObjects creates the two services and the PDB
// which can be created at the start of the reconciliation loop
// without any risk of disrupting the etcd cluster
func (r *EtcdClusterReconciler) ensureUnconditionalObjects(ctx context.Context, instance *etcdaenixiov1alpha1.EtcdCluster) error {
const concurrentOperations = 3
c := make(chan error)
defer close(c)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
wg.Add(concurrentOperations)
wrapWithMsg := func(err error, msg string) error {
if err != nil {
return fmt.Errorf(msg+": %w", err)
}
return nil
}
go func(chan<- error) {
defer wg.Done()
select {
case <-ctx.Done():
case c <- wrapWithMsg(factory.CreateOrUpdateClientService(ctx, instance, r.Client),
"couldn't ensure client service"):
}
}(c)
go func(chan<- error) {
defer wg.Done()
select {
case <-ctx.Done():
case c <- wrapWithMsg(factory.CreateOrUpdateHeadlessService(ctx, instance, r.Client),
"couldn't ensure headless service"):
}
}(c)
go func(chan<- error) {
defer wg.Done()
select {
case <-ctx.Done():
case c <- wrapWithMsg(factory.CreateOrUpdatePdb(ctx, instance, r.Client),
"couldn't ensure pod disruption budget"):
}
}(c)

for i := 0; i < concurrentOperations; i++ {
if err := <-c; err != nil {
cancel()

// let all goroutines select the ctx.Done() case to avoid races on closed channels
wg.Wait()
return err
}
}
return nil
}
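
The Reconcile changes above rely on the observables type and its etcdStatus helpers (fill, setClusterID, inSplitbrain), which live in one of the changed files not rendered on this page. Here is a minimal sketch of what they could look like — only the names come from the diff, the field and method shapes are assumptions, and the member-list half of the status gathering is omitted for brevity:

package controller

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
	appsv1 "k8s.io/api/apps/v1"
)

// etcdStatus holds the status reported by a single endpoint.
type etcdStatus struct {
	endpointStatus    *clientv3.StatusResponse
	endpointStatusErr error
}

// fill queries one endpoint for its status.
func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) {
	s.endpointStatus, s.endpointStatusErr = c.Status(ctx, c.Endpoints()[0])
}

// observables collects everything the reconciler observed about the cluster.
type observables struct {
	statefulSet    appsv1.StatefulSet
	stsExists      bool
	endpointsFound bool
	etcdStatuses   []etcdStatus
	clusterID      uint64
}

// setClusterID records the cluster ID reported by the first reachable endpoint.
func (o *observables) setClusterID() {
	for i := range o.etcdStatuses {
		if o.etcdStatuses[i].endpointStatus != nil {
			o.clusterID = o.etcdStatuses[i].endpointStatus.Header.ClusterId
			return
		}
	}
}

// inSplitbrain reports whether any reachable endpoint disagrees about the cluster ID.
func (o *observables) inSplitbrain() bool {
	for i := range o.etcdStatuses {
		if o.etcdStatuses[i].endpointStatus != nil &&
			o.etcdStatuses[i].endpointStatus.Header.ClusterId != o.clusterID {
			return true
		}
	}
	return false
}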