Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"strings"
"time"

"github.com/rs/zerolog/log"

deploymentApi "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"

"github.com/arangodb/kube-arangodb/pkg/util"
Expand Down Expand Up @@ -146,6 +148,9 @@ func cmdUsage(cmd *cobra.Command, args []string) {

// Run the operator
func cmdMainRun(cmd *cobra.Command, args []string) {
// Set global logger
log.Logger = logging.NewRootLogger()

// Get environment
namespace := os.Getenv(constants.EnvOperatorPodNamespace)
name := os.Getenv(constants.EnvOperatorPodName)
Expand Down
47 changes: 31 additions & 16 deletions pkg/deployment/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,54 @@
package deployment

import (
"github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// removePodFinalizers removes all finalizers from all pods owned by us.
func (d *Deployment) removePodFinalizers() error {
func (d *Deployment) removePodFinalizers(cachedStatus inspector.Inspector) error {
log := d.deps.Log
kubecli := d.GetKubeCli()
pods, err := d.GetOwnedPods()
if err != nil {
return maskAny(err)
}
for _, p := range pods {
ignoreNotFound := true
if err := k8sutil.RemovePodFinalizers(log, kubecli, &p, p.GetFinalizers(), ignoreNotFound); err != nil {

if err := cachedStatus.IteratePods(func(pod *core.Pod) error {
if err := k8sutil.RemovePodFinalizers(log, kubecli, pod, pod.GetFinalizers(), true); err != nil {
log.Warn().Err(err).Msg("Failed to remove pod finalizers")
return err
}

if err := kubecli.CoreV1().Pods(pod.GetNamespace()).Delete(pod.GetName(), &meta.DeleteOptions{
GracePeriodSeconds: util.NewInt64(1),
}); err != nil {
if !k8sutil.IsNotFound(err) {
log.Warn().Err(err).Msg("Failed to remove pod")
return err
}
}
return nil
}, inspector.FilterPodsByLabels(k8sutil.LabelsForDeployment(d.GetName(), ""))); err != nil {
return err
}

return nil
}

// removePVCFinalizers removes all finalizers from all PVCs owned by us.
func (d *Deployment) removePVCFinalizers() error {
func (d *Deployment) removePVCFinalizers(cachedStatus inspector.Inspector) error {
log := d.deps.Log
kubecli := d.GetKubeCli()
pvcs, err := d.GetOwnedPVCs()
if err != nil {
return maskAny(err)
}
for _, p := range pvcs {
ignoreNotFound := true
if err := k8sutil.RemovePVCFinalizers(log, kubecli, &p, p.GetFinalizers(), ignoreNotFound); err != nil {

if err := cachedStatus.IteratePersistentVolumeClaims(func(pvc *core.PersistentVolumeClaim) error {
if err := k8sutil.RemovePVCFinalizers(log, kubecli, pvc, pvc.GetFinalizers(), true); err != nil {
log.Warn().Err(err).Msg("Failed to remove PVC finalizers")
return err
}
return nil
}, inspector.FilterPersistentVolumeClaimsByLabels(k8sutil.LabelsForDeployment(d.GetName(), ""))); err != nil {
return err
}

return nil
}
40 changes: 5 additions & 35 deletions pkg/deployment/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"net"
"strconv"

"github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector"

"k8s.io/apimachinery/pkg/api/errors"

"github.com/arangodb/arangosync-client/client"
Expand Down Expand Up @@ -275,7 +277,7 @@ func (d *Deployment) DeletePod(podName string) error {

// CleanupPod deletes a given pod with force and explicit UID.
// If the pod does not exist, the error is ignored.
func (d *Deployment) CleanupPod(p v1.Pod) error {
func (d *Deployment) CleanupPod(p *v1.Pod) error {
log := d.deps.Log
podName := p.GetName()
ns := p.GetNamespace()
Expand Down Expand Up @@ -344,24 +346,6 @@ func (d *Deployment) GetPv(pvName string) (*v1.PersistentVolume, error) {
return nil, maskAny(err)
}

// GetOwnedPods returns a list of all pods owned by the deployment.
func (d *Deployment) GetOwnedPods() ([]v1.Pod, error) {
// Get all current pods
log := d.deps.Log
pods, err := d.deps.KubeCli.CoreV1().Pods(d.apiObject.GetNamespace()).List(k8sutil.DeploymentListOpt(d.apiObject.GetName()))
if err != nil {
log.Debug().Err(err).Msg("Failed to list pods")
return nil, maskAny(err)
}
myPods := make([]v1.Pod, 0, len(pods.Items))
for _, p := range pods.Items {
if d.isOwnerOf(&p) {
myPods = append(myPods, p)
}
}
return myPods, nil
}

// GetOwnedPVCs returns a list of all PVCs owned by the deployment.
func (d *Deployment) GetOwnedPVCs() ([]v1.PersistentVolumeClaim, error) {
// Get all current PVCs
Expand Down Expand Up @@ -414,20 +398,6 @@ func (d *Deployment) DeleteTLSKeyfile(group api.ServerGroup, member api.MemberSt
return nil
}

// GetTLSCA returns the TLS CA certificate in the secret with given name.
// Returns: publicKey, privateKey, ownerByDeployment, error
func (d *Deployment) GetTLSCA(secretName string) (string, string, bool, error) {
ns := d.apiObject.GetNamespace()
secrets := d.deps.KubeCli.CoreV1().Secrets(ns)
owner := d.apiObject.AsOwner()
cert, priv, isOwned, err := k8sutil.GetCASecret(secrets, secretName, &owner)
if err != nil {
return "", "", false, maskAny(err)
}
return cert, priv, isOwned, nil

}

// DeleteSecret removes the Secret with given name.
// If the secret does not exist, the error is ignored.
func (d *Deployment) DeleteSecret(secretName string) error {
Expand Down Expand Up @@ -470,8 +440,8 @@ func (d *Deployment) GetAgencyData(ctx context.Context, i interface{}, keyParts
return err
}

func (d *Deployment) RenderPodForMember(spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*v1.Pod, error) {
return d.resources.RenderPodForMember(spec, status, memberID, imageInfo)
func (d *Deployment) RenderPodForMember(cachedStatus inspector.Inspector, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*v1.Pod, error) {
return d.resources.RenderPodForMember(cachedStatus, spec, status, memberID, imageInfo)
}

func (d *Deployment) SelectImage(spec api.DeploymentSpec, status api.DeploymentStatus) (api.ImageInfo, bool) {
Expand Down
44 changes: 13 additions & 31 deletions pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"sync/atomic"
"time"

"github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector"

"github.com/arangodb/kube-arangodb/pkg/util/arangod"

"github.com/arangodb/arangosync-client/client"
Expand Down Expand Up @@ -84,7 +86,7 @@ type deploymentEvent struct {
const (
deploymentEventQueueSize = 256
minInspectionInterval = 250 * util.Interval(time.Millisecond) // Ensure we inspect the generated resources no less than with this interval
maxInspectionInterval = 30 * util.Interval(time.Second) // Ensure we inspect the generated resources no less than with this interval
maxInspectionInterval = 10 * util.Interval(time.Second) // Ensure we inspect the generated resources no less than with this interval
)

// Deployment is the in process state of an ArangoDeployment.
Expand Down Expand Up @@ -151,10 +153,10 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
go d.resources.RunDeploymentHealthLoop(d.stopCh)
go d.resources.RunDeploymentShardSyncLoop(d.stopCh)
}
if config.AllowChaos {
d.chaosMonkey = chaos.NewMonkey(deps.Log, d)
go d.chaosMonkey.Run(d.stopCh)
}
//if config.AllowChaos {
// d.chaosMonkey = chaos.NewMonkey(deps.Log, d)
// go d.chaosMonkey.Run(d.stopCh)
//}

return d, nil
}
Expand Down Expand Up @@ -199,16 +201,6 @@ func (d *Deployment) run() {
log := d.deps.Log

if d.GetPhase() == api.DeploymentPhaseNone {
// Create secrets
if err := d.resources.EnsureSecrets(); err != nil {
d.CreateEvent(k8sutil.NewErrorEvent("Failed to create secrets", err, d.GetAPIObject()))
}

// Create services
if err := d.resources.EnsureServices(); err != nil {
d.CreateEvent(k8sutil.NewErrorEvent("Failed to create services", err, d.GetAPIObject()))
}

// Create service monitor
if d.haveServiceMonitorCRD {
if err := d.resources.EnsureServiceMonitor(); err != nil {
Expand All @@ -221,16 +213,6 @@ func (d *Deployment) run() {
d.CreateEvent(k8sutil.NewErrorEvent("Failed to create initial members", err, d.GetAPIObject()))
}

// Create PVCs
if err := d.resources.EnsurePVCs(); err != nil {
d.CreateEvent(k8sutil.NewErrorEvent("Failed to create persistent volume claims", err, d.GetAPIObject()))
}

// Create pods
if err := d.resources.EnsurePods(); err != nil {
d.CreateEvent(k8sutil.NewErrorEvent("Failed to create pods", err, d.GetAPIObject()))
}

// Create Pod Disruption Budgets
if err := d.resources.EnsurePDBs(); err != nil {
d.CreateEvent(k8sutil.NewErrorEvent("Failed to create pdbs", err, d.GetAPIObject()))
Expand All @@ -244,22 +226,22 @@ func (d *Deployment) run() {
log.Info().Msg("start running...")
}

if err := d.resources.EnsureAnnotations(); err != nil {
log.Warn().Err(err).Msg("unable to update annotations")
}

d.lookForServiceMonitorCRD()

inspectionInterval := maxInspectionInterval
for {
select {
case <-d.stopCh:
cachedStatus, err := inspector.NewInspector(d.GetKubeCli(), d.GetNamespace())
if err != nil {
log.Error().Err(err).Msg("Unable to get resources")
}
// Remove finalizers from created resources
log.Info().Msg("Deployment removed, removing finalizers to prevent orphaned resources")
if err := d.removePodFinalizers(); err != nil {
if err := d.removePodFinalizers(cachedStatus); err != nil {
log.Warn().Err(err).Msg("Failed to remove Pod finalizers")
}
if err := d.removePVCFinalizers(); err != nil {
if err := d.removePVCFinalizers(cachedStatus); err != nil {
log.Warn().Err(err).Msg("Failed to remove PVC finalizers")
}
// We're being stopped.
Expand Down
12 changes: 7 additions & 5 deletions pkg/deployment/deployment_finalizers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ package deployment
import (
"context"

"github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector"

"github.com/rs/zerolog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand All @@ -47,7 +49,7 @@ func ensureFinalizers(depl *api.ArangoDeployment) {
}

// runDeploymentFinalizers goes through the list of ArangoDeployoment finalizers to see if they can be removed.
func (d *Deployment) runDeploymentFinalizers(ctx context.Context) error {
func (d *Deployment) runDeploymentFinalizers(ctx context.Context, cachedStatus inspector.Inspector) error {
log := d.deps.Log
var removalList []string

Expand All @@ -60,7 +62,7 @@ func (d *Deployment) runDeploymentFinalizers(ctx context.Context) error {
switch f {
case constants.FinalizerDeplRemoveChildFinalizers:
log.Debug().Msg("Inspecting 'remove child finalizers' finalizer")
if err := d.inspectRemoveChildFinalizers(ctx, log, updated); err == nil {
if err := d.inspectRemoveChildFinalizers(ctx, log, updated, cachedStatus); err == nil {
removalList = append(removalList, f)
} else {
log.Debug().Err(err).Str("finalizer", f).Msg("Cannot remove finalizer yet")
Expand All @@ -79,11 +81,11 @@ func (d *Deployment) runDeploymentFinalizers(ctx context.Context) error {

// inspectRemoveChildFinalizers checks the finalizer condition for remove-child-finalizers.
// It returns nil if the finalizer can be removed.
func (d *Deployment) inspectRemoveChildFinalizers(ctx context.Context, log zerolog.Logger, depl *api.ArangoDeployment) error {
if err := d.removePodFinalizers(); err != nil {
func (d *Deployment) inspectRemoveChildFinalizers(ctx context.Context, log zerolog.Logger, depl *api.ArangoDeployment, cachedStatus inspector.Inspector) error {
if err := d.removePodFinalizers(cachedStatus); err != nil {
return maskAny(err)
}
if err := d.removePVCFinalizers(); err != nil {
if err := d.removePVCFinalizers(cachedStatus); err != nil {
return maskAny(err)
}

Expand Down
Loading