Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Add Recovery during PlanBuild operation
- Fix Exporter in Deployments without authentication
- Allow to disable ClusterScalingIntegration and add proper Scheduled label to pods
- Add additional timeout parameters and kubernetes batch size

## [1.2.5](https://github.com/arangodb/kube-arangodb/tree/1.2.5) (2021-10-25)
- Split & Unify Lifecycle management functionality
Expand Down
5 changes: 1 addition & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,7 @@ run-unit-tests: $(SOURCES)
$(REPOPATH)/pkg/apis/storage/... \
$(REPOPATH)/pkg/deployment/... \
$(REPOPATH)/pkg/storage \
$(REPOPATH)/pkg/util/k8sutil \
$(REPOPATH)/pkg/util/k8sutil/test \
$(REPOPATH)/pkg/util/probe \
$(REPOPATH)/pkg/util/validation \
$(REPOPATH)/pkg/util/... \
$(REPOPATH)/pkg/backup/...

# Release building
Expand Down
8 changes: 5 additions & 3 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"strconv"
"syscall"

"github.com/arangodb/kube-arangodb/pkg/util/globals"

"github.com/pkg/errors"
"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -288,7 +290,7 @@ func createClient(endpoints []string, certCA *x509.CertPool, auth connection.Aut

// getJWTTokenFromSecrets returns token from the secret.
func getJWTTokenFromSecrets(ctx context.Context, secrets secret.ReadInterface, name string) (connection.Authentication, error) {
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()

token, err := k8sutil.GetTokenSecret(ctxChild, secrets, name)
Expand All @@ -306,7 +308,7 @@ func getJWTTokenFromSecrets(ctx context.Context, secrets secret.ReadInterface, n

// getCACertificate returns CA certificate from the secret.
func getCACertificate(ctx context.Context, secrets secret.ReadInterface, name string) (*x509.CertPool, error) {
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()

s, err := secrets.Get(ctxChild, name, metav1.GetOptions{})
Expand All @@ -331,7 +333,7 @@ func getDeployment(ctx context.Context, namespace, deplName string) (v12.ArangoD
return v12.ArangoDeployment{}, errors.WithMessage(err, "failed to create Arango extension client")
}

ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()

deployments, err := extCli.DatabaseV1().ArangoDeployments(namespace).List(ctxChild, metav1.ListOptions{})
Expand Down
29 changes: 19 additions & 10 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ import (
"strings"
"time"

"github.com/arangodb/kube-arangodb/pkg/util/globals"

operatorHTTP "github.com/arangodb/kube-arangodb/pkg/util/http"
"github.com/gin-gonic/gin"

"github.com/arangodb/kube-arangodb/pkg/version"

"github.com/arangodb/kube-arangodb/pkg/util/arangod"

"github.com/arangodb/kube-arangodb/pkg/operator/scope"

"github.com/arangodb/kube-arangodb/pkg/deployment/features"
Expand Down Expand Up @@ -123,9 +123,13 @@ var (
singleMode bool
scope string
}
timeouts struct {
k8s time.Duration
arangoD time.Duration
operatorKubernetesOptions struct {
maxBatchSize int64
}
operatorTimeouts struct {
k8s time.Duration
arangoD time.Duration
reconciliation time.Duration
}
chaosOptions struct {
allowed bool
Expand Down Expand Up @@ -158,9 +162,11 @@ func init() {
f.BoolVar(&chaosOptions.allowed, "chaos.allowed", false, "Set to allow chaos in deployments. Only activated when allowed and enabled in deployment")
f.BoolVar(&operatorOptions.singleMode, "mode.single", false, "Enable single mode in Operator. WARNING: There should be only one replica of Operator, otherwise Operator can take unexpected actions")
f.StringVar(&operatorOptions.scope, "scope", scope.DefaultScope.String(), "Define scope on which Operator works. Legacy - pre 1.1.0 scope with limited cluster access")
f.DurationVar(&timeouts.k8s, "timeout.k8s", time.Second*3, "The request timeout to the kubernetes")
f.DurationVar(&timeouts.arangoD, "timeout.arangod", time.Second*10, "The request timeout to the ArangoDB")
f.BoolVar(&operatorOptions.scalingIntegrationEnabled, "scaling-integration", false, "Enable Scaling Integration")
f.DurationVar(&operatorTimeouts.k8s, "timeout.k8s", globals.DefaultKubernetesTimeout, "The request timeout to the kubernetes")
f.DurationVar(&operatorTimeouts.arangoD, "timeout.arangod", globals.DefaultArangoDTimeout, "The request timeout to the ArangoDB")
f.DurationVar(&operatorTimeouts.reconciliation, "timeout.reconciliation", globals.DefaultReconciliationTimeout, "The reconciliation timeout to the ArangoDB CR")
f.BoolVar(&operatorOptions.scalingIntegrationEnabled, "internal.scaling-integration", false, "Enable Scaling Integration")
f.Int64Var(&operatorKubernetesOptions.maxBatchSize, "kubernetes.max-batch-size", globals.DefaultKubernetesRequestBatchSize, "Size of batch during objects read")
features.Init(&cmdMain)
}

Expand All @@ -185,8 +191,11 @@ func cmdMainRun(cmd *cobra.Command, args []string) {
ip := os.Getenv(constants.EnvOperatorPodIP)

deploymentApi.DefaultImage = operatorOptions.arangoImage
k8sutil.SetRequestTimeout(timeouts.k8s)
arangod.SetRequestTimeout(timeouts.arangoD)

globals.GetGlobalTimeouts().Kubernetes().Set(operatorTimeouts.k8s)
globals.GetGlobalTimeouts().ArangoD().Set(operatorTimeouts.arangoD)
globals.GetGlobalTimeouts().Reconciliation().Set(operatorTimeouts.reconciliation)
globals.GetGlobals().Kubernetes().RequestBatchSize().Set(operatorKubernetesOptions.maxBatchSize)

// Prepare log service
var err error
Expand Down
4 changes: 3 additions & 1 deletion pkg/backup/handlers/arango/backup/arango_client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"fmt"
"time"

"github.com/arangodb/kube-arangodb/pkg/util/globals"

"github.com/arangodb/kube-arangodb/pkg/util/errors"

"github.com/arangodb/go-driver"
Expand Down Expand Up @@ -130,7 +132,7 @@ func (ac *arangoClientBackupImpl) Get(backupID driver.BackupID) (driver.BackupMe
}

func (ac *arangoClientBackupImpl) getCredentialsFromSecret(ctx context.Context, secretName string) (interface{}, error) {
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()
token, err := k8sutil.GetTokenSecret(ctxChild, ac.kubecli.CoreV1().Secrets(ac.backup.Namespace), secretName)
if err != nil {
Expand Down
12 changes: 7 additions & 5 deletions pkg/deployment/access_package.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"strings"
"time"

"github.com/arangodb/kube-arangodb/pkg/util/globals"

"github.com/arangodb/kube-arangodb/pkg/util/errors"

certificates "github.com/arangodb-helper/go-certificates"
Expand Down Expand Up @@ -67,7 +69,7 @@ func (d *Deployment) createAccessPackages(ctx context.Context) error {
}

// Remove all access packages that we did build, but are no longer needed
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()
secretList, err := secrets.List(ctxChild, metav1.ListOptions{})
if err != nil {
Expand All @@ -80,7 +82,7 @@ func (d *Deployment) createAccessPackages(ctx context.Context) error {
// Secret is an access package
if _, wanted := apNameMap[secret.GetName()]; !wanted {
// We found an obsolete access package secret. Remove it.
err = k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
return secrets.Delete(ctxChild, secret.GetName(), metav1.DeleteOptions{
Preconditions: &metav1.Preconditions{UID: &secret.UID},
})
Expand Down Expand Up @@ -110,7 +112,7 @@ func (d *Deployment) ensureAccessPackage(ctx context.Context, apSecretName strin
secrets := d.deps.KubeCli.CoreV1().Secrets(ns)
spec := d.apiObject.Spec

err := k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
_, err := secrets.Get(ctxChild, apSecretName, metav1.GetOptions{})
return err
})
Expand All @@ -124,7 +126,7 @@ func (d *Deployment) ensureAccessPackage(ctx context.Context, apSecretName strin

// Fetch client authentication CA
clientAuthSecretName := spec.Sync.Authentication.GetClientCASecretName()
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()
clientAuthCert, clientAuthKey, _, err := k8sutil.GetCASecret(ctxChild, secrets, clientAuthSecretName, nil)
if err != nil {
Expand Down Expand Up @@ -220,7 +222,7 @@ func (d *Deployment) ensureAccessPackage(ctx context.Context, apSecretName strin
}
// Attach secret to owner
secret.SetOwnerReferences(append(secret.GetOwnerReferences(), d.apiObject.AsOwner()))
err = k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
_, err := secrets.Create(ctxChild, secret, metav1.CreateOptions{})
return err
})
Expand Down
4 changes: 3 additions & 1 deletion pkg/deployment/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ package deployment
import (
"context"

"github.com/arangodb/kube-arangodb/pkg/util/globals"

core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand All @@ -45,7 +47,7 @@ func (d *Deployment) removePodFinalizers(ctx context.Context, cachedStatus inspe
return err
}

ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()

if err := d.PodsModInterface().Delete(ctxChild, pod.GetName(), meta.DeleteOptions{
Expand Down
8 changes: 5 additions & 3 deletions pkg/deployment/cluster_scaling_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"sync"
"time"

"github.com/arangodb/kube-arangodb/pkg/util/globals"

"github.com/arangodb/kube-arangodb/pkg/util/errors"

"github.com/rs/zerolog"
Expand Down Expand Up @@ -152,14 +154,14 @@ func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct
func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectSuccess bool) error {
log := ci.log

ctxChild, cancel := context.WithTimeout(ctx, arangod.GetRequestTimeout())
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
c, err := ci.depl.clientCache.GetDatabase(ctxChild)
if err != nil {
return errors.WithStack(err)
}

ctxChild, cancel = context.WithTimeout(ctx, arangod.GetRequestTimeout())
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
req, err := arangod.GetNumberOfServers(ctxChild, c.Connection())
if err != nil {
Expand Down Expand Up @@ -204,7 +206,7 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS
}
// Let's update the spec
apiObject := ci.depl.apiObject
ctxChild, cancel = context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
ctxChild, cancel = globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()
current, err := ci.depl.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(apiObject.Namespace).Get(ctxChild, apiObject.Name, metav1.GetOptions{})
if err != nil {
Expand Down
20 changes: 11 additions & 9 deletions pkg/deployment/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"strconv"
"time"

"github.com/arangodb/kube-arangodb/pkg/util/globals"

"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
"k8s.io/apimachinery/pkg/types"

Expand Down Expand Up @@ -86,7 +88,7 @@ var _ resources.Context = &Deployment{}

// GetBackup receives information about a backup resource
func (d *Deployment) GetBackup(ctx context.Context, backup string) (*backupApi.ArangoBackup, error) {
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()

return d.deps.DatabaseCRCli.BackupV1().ArangoBackups(d.Namespace()).Get(ctxChild, backup, meta.GetOptions{})
Expand Down Expand Up @@ -392,7 +394,7 @@ func (d *Deployment) GetPod(ctx context.Context, podName string) (*core.Pod, err
// of the deployment. If the pod does not exist, the error is ignored.
func (d *Deployment) DeletePod(ctx context.Context, podName string) error {
log := d.deps.Log
err := k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
return d.PodsModInterface().Delete(ctxChild, podName, meta.DeleteOptions{})
})
if err != nil && !k8sutil.IsNotFound(err) {
Expand All @@ -409,7 +411,7 @@ func (d *Deployment) CleanupPod(ctx context.Context, p *core.Pod) error {
podName := p.GetName()
options := meta.NewDeleteOptions(0)
options.Preconditions = meta.NewUIDPreconditions(string(p.GetUID()))
err := k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
return d.PodsModInterface().Delete(ctxChild, podName, *options)
})
if err != nil && !k8sutil.IsNotFound(err) {
Expand All @@ -424,7 +426,7 @@ func (d *Deployment) CleanupPod(ctx context.Context, p *core.Pod) error {
func (d *Deployment) RemovePodFinalizers(ctx context.Context, podName string) error {
log := d.deps.Log

ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()
p, err := d.GetCachedStatus().PodReadInterface().Get(ctxChild, podName, meta.GetOptions{})
if err != nil {
Expand All @@ -445,7 +447,7 @@ func (d *Deployment) RemovePodFinalizers(ctx context.Context, podName string) er
// of the deployment. If the pvc does not exist, the error is ignored.
func (d *Deployment) DeletePvc(ctx context.Context, pvcName string) error {
log := d.deps.Log
err := k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
return d.PersistentVolumeClaimsModInterface().Delete(ctxChild, pvcName, meta.DeleteOptions{})
})
if err != nil && !k8sutil.IsNotFound(err) {
Expand All @@ -458,7 +460,7 @@ func (d *Deployment) DeletePvc(ctx context.Context, pvcName string) error {
// UpdatePvc updated a persistent volume claim in the namespace
// of the deployment. If the pvc does not exist, the error is ignored.
func (d *Deployment) UpdatePvc(ctx context.Context, pvc *core.PersistentVolumeClaim) error {
err := k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
_, err := d.PersistentVolumeClaimsModInterface().Update(ctxChild, pvc, meta.UpdateOptions{})
return err
})
Expand Down Expand Up @@ -488,7 +490,7 @@ func (d *Deployment) GetOwnedPVCs() ([]core.PersistentVolumeClaim, error) {

// GetPvc gets a PVC by the given name, in the samespace of the deployment.
func (d *Deployment) GetPvc(ctx context.Context, pvcName string) (*core.PersistentVolumeClaim, error) {
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()

pvc, err := d.GetCachedStatus().PersistentVolumeClaimReadInterface().Get(ctxChild, pvcName, meta.GetOptions{})
Expand All @@ -514,7 +516,7 @@ func (d *Deployment) GetTLSKeyfile(group api.ServerGroup, member api.MemberStatu
// If the secret does not exist, the error is ignored.
func (d *Deployment) DeleteTLSKeyfile(ctx context.Context, group api.ServerGroup, member api.MemberStatus) error {
secretName := k8sutil.CreateTLSKeyfileSecretName(d.GetName(), group.AsRole(), member.ID)
err := k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
return d.SecretsModInterface().Delete(ctxChild, secretName, meta.DeleteOptions{})
})
if err != nil && !k8sutil.IsNotFound(err) {
Expand Down Expand Up @@ -724,7 +726,7 @@ func (d *Deployment) ApplyPatchOnPod(ctx context.Context, pod *core.Pod, p ...pa

c := d.deps.KubeCli.CoreV1().Pods(pod.GetNamespace())

ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()
_, err = c.Patch(ctxChild, pod.GetName(), types.JSONPatchType, data, meta.PatchOptions{})
if err != nil {
Expand Down
Loading