Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions go.sum

Large diffs are not rendered by default.

14 changes: 12 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2020 ArangoDB GmbH, Cologne, Germany
// Copyright 2020-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
// Author Tomasz Mielech
//

package main
Expand All @@ -32,6 +33,8 @@ import (
"strings"
"time"

"github.com/arangodb/kube-arangodb/pkg/util/arangod"

"github.com/arangodb/kube-arangodb/pkg/operator/scope"

"github.com/arangodb/kube-arangodb/pkg/deployment/features"
Expand Down Expand Up @@ -114,6 +117,10 @@ var (
singleMode bool
scope string
}
timeouts struct {
k8s time.Duration
arangoD time.Duration
}
chaosOptions struct {
allowed bool
}
Expand Down Expand Up @@ -143,7 +150,8 @@ func init() {
f.BoolVar(&chaosOptions.allowed, "chaos.allowed", false, "Set to allow chaos in deployments. Only activated when allowed and enabled in deployment")
f.BoolVar(&operatorOptions.singleMode, "mode.single", false, "Enable single mode in Operator. WARNING: There should be only one replica of Operator, otherwise Operator can take unexpected actions")
f.StringVar(&operatorOptions.scope, "scope", scope.DefaultScope.String(), "Define scope on which Operator works. Legacy - pre 1.1.0 scope with limited cluster access")

f.DurationVar(&timeouts.k8s, "timeout.k8s", time.Second*3, "The request timeout to the kubernetes")
f.DurationVar(&timeouts.arangoD, "timeout.arangod", time.Second*10, "The request timeout to the ArangoDB")
features.Init(&cmdMain)
}

Expand All @@ -168,6 +176,8 @@ func cmdMainRun(cmd *cobra.Command, args []string) {
ip := os.Getenv(constants.EnvOperatorPodIP)

deploymentApi.DefaultImage = operatorOptions.arangoImage
k8sutil.SetRequestTimeout(timeouts.k8s)
arangod.SetRequestTimeout(timeouts.arangoD)

// Prepare log service
var err error
Expand Down
14 changes: 8 additions & 6 deletions pkg/backup/handlers/arango/backup/arango_client_impl.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2020 ArangoDB GmbH, Cologne, Germany
// Copyright 2020-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Lars Maier
// Author Tomasz Mielech
//

package backup
Expand Down Expand Up @@ -128,9 +129,10 @@ func (ac *arangoClientBackupImpl) Get(backupID driver.BackupID) (driver.BackupMe
}
}

func (ac *arangoClientBackupImpl) getCredentialsFromSecret(secretName string) (interface{}, error) {

token, err := k8sutil.GetTokenSecret(ac.kubecli.CoreV1().Secrets(ac.backup.Namespace), secretName)
func (ac *arangoClientBackupImpl) getCredentialsFromSecret(ctx context.Context, secretName string) (interface{}, error) {
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
defer cancel()
token, err := k8sutil.GetTokenSecret(ctxChild, ac.kubecli.CoreV1().Secrets(ac.backup.Namespace), secretName)
if err != nil {
return nil, err
}
Expand All @@ -152,7 +154,7 @@ func (ac *arangoClientBackupImpl) Upload(backupID driver.BackupID) (driver.Backu
return "", errors.Newf("upload was called but no upload spec was given")
}

cred, err := ac.getCredentialsFromSecret(uploadSpec.CredentialsSecretName)
cred, err := ac.getCredentialsFromSecret(ctx, uploadSpec.CredentialsSecretName)
if err != nil {
return "", err
}
Expand All @@ -169,7 +171,7 @@ func (ac *arangoClientBackupImpl) Download(backupID driver.BackupID) (driver.Bac
return "", errors.Newf("Download was called but not download spec was given")
}

cred, err := ac.getCredentialsFromSecret(downloadSpec.CredentialsSecretName)
cred, err := ac.getCredentialsFromSecret(ctx, downloadSpec.CredentialsSecretName)
if err != nil {
return "", err
}
Expand Down
43 changes: 31 additions & 12 deletions pkg/deployment/access_package.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2020 ArangoDB GmbH, Cologne, Germany
// Copyright 2020-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
// Author Tomasz Mielech
//

package deployment
Expand Down Expand Up @@ -46,7 +47,7 @@ const (

// createAccessPackages creates a arangosync access packages specified
// in spec.sync.externalAccess.accessPackageSecretNames.
func (d *Deployment) createAccessPackages() error {
func (d *Deployment) createAccessPackages(ctx context.Context) error {
log := d.deps.Log
spec := d.apiObject.Spec
secrets := d.deps.KubeCli.CoreV1().Secrets(d.GetNamespace())
Expand All @@ -60,13 +61,15 @@ func (d *Deployment) createAccessPackages() error {
apNameMap := make(map[string]struct{})
for _, apSecretName := range spec.Sync.ExternalAccess.AccessPackageSecretNames {
apNameMap[apSecretName] = struct{}{}
if err := d.ensureAccessPackage(apSecretName); err != nil {
if err := d.ensureAccessPackage(ctx, apSecretName); err != nil {
return errors.WithStack(err)
}
}

// Remove all access packages that we did build, but are no longer needed
secretList, err := secrets.List(context.Background(), metav1.ListOptions{})
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
defer cancel()
secretList, err := secrets.List(ctxChild, metav1.ListOptions{})
if err != nil {
log.Debug().Err(err).Msg("Failed to list secrets")
return errors.WithStack(err)
Expand All @@ -77,9 +80,12 @@ func (d *Deployment) createAccessPackages() error {
// Secret is an access package
if _, wanted := apNameMap[secret.GetName()]; !wanted {
// We found an obsolete access package secret. Remove it.
if err := secrets.Delete(context.Background(), secret.GetName(), metav1.DeleteOptions{
Preconditions: &metav1.Preconditions{UID: &secret.UID},
}); err != nil && !k8sutil.IsNotFound(err) {
err = k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
return secrets.Delete(ctxChild, secret.GetName(), metav1.DeleteOptions{
Preconditions: &metav1.Preconditions{UID: &secret.UID},
})
})
if err != nil && !k8sutil.IsNotFound(err) {
// Not serious enough to stop everything now, just log and create an event
log.Warn().Err(err).Msg("Failed to remove obsolete access package secret")
d.CreateEvent(k8sutil.NewErrorEvent("Access Package cleanup failed", err, d.apiObject))
Expand All @@ -98,28 +104,37 @@ func (d *Deployment) createAccessPackages() error {

// ensureAccessPackage creates an arangosync access package with given name
// it is does not already exist.
func (d *Deployment) ensureAccessPackage(apSecretName string) error {
func (d *Deployment) ensureAccessPackage(ctx context.Context, apSecretName string) error {
log := d.deps.Log
ns := d.GetNamespace()
secrets := d.deps.KubeCli.CoreV1().Secrets(ns)
spec := d.apiObject.Spec

if _, err := secrets.Get(context.Background(), apSecretName, metav1.GetOptions{}); err == nil {
err := k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
_, err := secrets.Get(ctxChild, apSecretName, metav1.GetOptions{})
return err
})
if err == nil {
// Secret already exists
return nil
} else if !k8sutil.IsNotFound(err) {
log.Debug().Err(err).Str("name", apSecretName).Msg("Failed to get arangosync access package secret")
return errors.WithStack(err)
}

// Fetch client authentication CA
clientAuthSecretName := spec.Sync.Authentication.GetClientCASecretName()
clientAuthCert, clientAuthKey, _, err := k8sutil.GetCASecret(secrets, clientAuthSecretName, nil)
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
defer cancel()
clientAuthCert, clientAuthKey, _, err := k8sutil.GetCASecret(ctxChild, secrets, clientAuthSecretName, nil)
if err != nil {
log.Debug().Err(err).Msg("Failed to get client-auth CA secret")
return errors.WithStack(err)
}

// Fetch TLS CA public key
tlsCASecretName := spec.Sync.TLS.GetCASecretName()
tlsCACert, err := k8sutil.GetCACertficateSecret(secrets, tlsCASecretName)
tlsCACert, err := k8sutil.GetCACertficateSecret(ctx, secrets, tlsCASecretName)
if err != nil {
log.Debug().Err(err).Msg("Failed to get TLS CA secret")
return errors.WithStack(err)
Expand Down Expand Up @@ -205,7 +220,11 @@ func (d *Deployment) ensureAccessPackage(apSecretName string) error {
}
// Attach secret to owner
secret.SetOwnerReferences(append(secret.GetOwnerReferences(), d.apiObject.AsOwner()))
if _, err := secrets.Create(context.Background(), secret, metav1.CreateOptions{}); err != nil {
err = k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
_, err := secrets.Create(ctxChild, secret, metav1.CreateOptions{})
return err
})
if err != nil {
// Failed to create secret
log.Debug().Err(err).Str("secret-name", apSecretName).Msg("Failed to create access package Secret")
return errors.WithStack(err)
Expand Down
9 changes: 6 additions & 3 deletions pkg/deployment/chaos/context.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2020 ArangoDB GmbH, Cologne, Germany
// Copyright 2020-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,11 +18,14 @@
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
// Author Tomasz Mielech
//

package chaos

import (
"context"

v1 "k8s.io/api/core/v1"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
Expand All @@ -34,7 +37,7 @@ type Context interface {
GetSpec() api.DeploymentSpec
// DeletePod deletes a pod with given name in the namespace
// of the deployment. If the pod does not exist, the error is ignored.
DeletePod(podName string) error
DeletePod(ctx context.Context, podName string) error
// GetOwnedPods returns a list of all pods owned by the deployment.
GetOwnedPods() ([]v1.Pod, error)
GetOwnedPods(ctx context.Context) ([]v1.Pod, error)
}
15 changes: 10 additions & 5 deletions pkg/deployment/chaos/monkey.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2020 ArangoDB GmbH, Cologne, Germany
// Copyright 2020-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,11 +18,13 @@
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
// Author Tomasz Mielech
//

package chaos

import (
"context"
"math/rand"
"time"

Expand Down Expand Up @@ -50,14 +52,17 @@ func NewMonkey(log zerolog.Logger, context Context) *Monkey {

// Run the monkey until the given channel is closed.
func (m Monkey) Run(stopCh <-chan struct{}) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for {
spec := m.context.GetSpec()
if spec.Chaos.IsEnabled() {
// Gamble to set if we must introduce chaos
chance := float64(spec.Chaos.GetKillPodProbability()) / 100.0
if rand.Float64() < chance {
// Let's introduce pod chaos
if err := m.killRandomPod(); err != nil {
if err := m.killRandomPod(ctx); err != nil {
log.Info().Err(err).Msg("Failed to kill random pod")
}
}
Expand All @@ -74,8 +79,8 @@ func (m Monkey) Run(stopCh <-chan struct{}) {
}

// killRandomPod fetches all owned pods and tries to kill one.
func (m Monkey) killRandomPod() error {
pods, err := m.context.GetOwnedPods()
func (m Monkey) killRandomPod(ctx context.Context) error {
pods, err := m.context.GetOwnedPods(ctx)
if err != nil {
return errors.WithStack(err)
}
Expand All @@ -85,7 +90,7 @@ func (m Monkey) killRandomPod() error {
}
p := pods[rand.Intn(len(pods))]
m.log.Info().Str("pod-name", p.GetName()).Msg("Killing pod")
if err := m.context.DeletePod(p.GetName()); err != nil {
if err := m.context.DeletePod(ctx, p.GetName()); err != nil {
return errors.WithStack(err)
}
return nil
Expand Down
21 changes: 13 additions & 8 deletions pkg/deployment/cleanup.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2020 ArangoDB GmbH, Cologne, Germany
// Copyright 2020-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,33 +18,38 @@
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
// Author Tomasz Mielech
//

package deployment

import (
"context"

core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// removePodFinalizers removes all finalizers from all pods owned by us.
func (d *Deployment) removePodFinalizers(cachedStatus inspectorInterface.Inspector) error {
func (d *Deployment) removePodFinalizers(ctx context.Context, cachedStatus inspectorInterface.Inspector) error {
log := d.deps.Log
kubecli := d.GetKubeCli()

if err := cachedStatus.IteratePods(func(pod *core.Pod) error {
if err := k8sutil.RemovePodFinalizers(log, kubecli, pod, pod.GetFinalizers(), true); err != nil {
if err := k8sutil.RemovePodFinalizers(ctx, log, kubecli, pod, pod.GetFinalizers(), true); err != nil {
log.Warn().Err(err).Msg("Failed to remove pod finalizers")
return err
}

if err := kubecli.CoreV1().Pods(pod.GetNamespace()).Delete(context.Background(), pod.GetName(), meta.DeleteOptions{
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
defer cancel()

if err := kubecli.CoreV1().Pods(pod.GetNamespace()).Delete(ctxChild, pod.GetName(), meta.DeleteOptions{
GracePeriodSeconds: util.NewInt64(1),
}); err != nil {
if !k8sutil.IsNotFound(err) {
Expand All @@ -61,12 +66,12 @@ func (d *Deployment) removePodFinalizers(cachedStatus inspectorInterface.Inspect
}

// removePVCFinalizers removes all finalizers from all PVCs owned by us.
func (d *Deployment) removePVCFinalizers(cachedStatus inspectorInterface.Inspector) error {
func (d *Deployment) removePVCFinalizers(ctx context.Context, cachedStatus inspectorInterface.Inspector) error {
log := d.deps.Log
kubecli := d.GetKubeCli()

if err := cachedStatus.IteratePersistentVolumeClaims(func(pvc *core.PersistentVolumeClaim) error {
if err := k8sutil.RemovePVCFinalizers(log, kubecli, pvc, pvc.GetFinalizers(), true); err != nil {
if err := k8sutil.RemovePVCFinalizers(ctx, log, kubecli, pvc, pvc.GetFinalizers(), true); err != nil {
log.Warn().Err(err).Msg("Failed to remove PVC finalizers")
return err
}
Expand Down
Loading