Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- (Feature) Add `ACSDeploymentSynced` condition type and fix comparison of `SecretHashes` method
- (Feature) Add agency leader service
- (Feature) Add HostPath and PVC Volume types and allow templating
- (Feature) Replace mod

## [1.2.12](https://github.com/arangodb/kube-arangodb/tree/1.2.12) (2022-05-10)
- (Feature) Add CoreV1 Endpoints Inspector
Expand Down
8 changes: 4 additions & 4 deletions pkg/deployment/access_package.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (d *Deployment) createAccessPackages(ctx context.Context) error {
}

// Remove all access packages that we did build, but are no longer needed
secretList := d.currentState.Secret().V1().ListSimple()
secretList := d.acs.CurrentClusterCache().Secret().V1().ListSimple()
for _, secret := range secretList {
if d.isOwnerOf(secret) {
if _, found := secret.Data[constants.SecretAccessPackageYaml]; found {
Expand Down Expand Up @@ -100,7 +100,7 @@ func (d *Deployment) ensureAccessPackage(ctx context.Context, apSecretName strin
log := d.deps.Log
spec := d.apiObject.Spec

_, err := d.currentState.Secret().V1().Read().Get(ctx, apSecretName, metav1.GetOptions{})
_, err := d.acs.CurrentClusterCache().Secret().V1().Read().Get(ctx, apSecretName, metav1.GetOptions{})
if err == nil {
// Secret already exists
return nil
Expand All @@ -111,15 +111,15 @@ func (d *Deployment) ensureAccessPackage(ctx context.Context, apSecretName strin

// Fetch client authentication CA
clientAuthSecretName := spec.Sync.Authentication.GetClientCASecretName()
clientAuthCert, clientAuthKey, _, err := k8sutil.GetCASecret(ctx, d.currentState.Secret().V1().Read(), clientAuthSecretName, nil)
clientAuthCert, clientAuthKey, _, err := k8sutil.GetCASecret(ctx, d.acs.CurrentClusterCache().Secret().V1().Read(), clientAuthSecretName, nil)
if err != nil {
log.Debug().Err(err).Msg("Failed to get client-auth CA secret")
return errors.WithStack(err)
}

// Fetch TLS CA public key
tlsCASecretName := spec.Sync.TLS.GetCASecretName()
tlsCACert, err := k8sutil.GetCACertficateSecret(ctx, d.currentState.Secret().V1().Read(), tlsCASecretName)
tlsCACert, err := k8sutil.GetCACertficateSecret(ctx, d.acs.CurrentClusterCache().Secret().V1().Read(), tlsCASecretName)
if err != nil {
log.Debug().Err(err).Msg("Failed to get TLS CA secret")
return errors.WithStack(err)
Expand Down
16 changes: 8 additions & 8 deletions pkg/deployment/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,37 +624,37 @@ func (d *Deployment) WithStatusUpdate(ctx context.Context, action reconciler.Dep
}

func (d *Deployment) SecretsModInterface() secretv1.ModInterface {
d.currentState.GetThrottles().Secret().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().Secret().Invalidate()
return kclient.NewModInterface(d.deps.Client, d.namespace).Secrets()
}

func (d *Deployment) PodsModInterface() podv1.ModInterface {
d.currentState.GetThrottles().Pod().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().Pod().Invalidate()
return kclient.NewModInterface(d.deps.Client, d.namespace).Pods()
}

func (d *Deployment) ServiceAccountsModInterface() serviceaccountv1.ModInterface {
d.currentState.GetThrottles().ServiceAccount().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().ServiceAccount().Invalidate()
return kclient.NewModInterface(d.deps.Client, d.namespace).ServiceAccounts()
}

func (d *Deployment) ServicesModInterface() servicev1.ModInterface {
d.currentState.GetThrottles().Service().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().Service().Invalidate()
return kclient.NewModInterface(d.deps.Client, d.namespace).Services()
}

func (d *Deployment) PersistentVolumeClaimsModInterface() persistentvolumeclaimv1.ModInterface {
d.currentState.GetThrottles().PersistentVolumeClaim().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().PersistentVolumeClaim().Invalidate()
return kclient.NewModInterface(d.deps.Client, d.namespace).PersistentVolumeClaims()
}

func (d *Deployment) PodDisruptionBudgetsModInterface() poddisruptionbudgetv1beta1.ModInterface {
d.currentState.GetThrottles().PodDisruptionBudget().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().PodDisruptionBudget().Invalidate()
return kclient.NewModInterface(d.deps.Client, d.namespace).PodDisruptionBudgets()
}

func (d *Deployment) ServiceMonitorsModInterface() servicemonitorv1.ModInterface {
d.currentState.GetThrottles().ServiceMonitor().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().ServiceMonitor().Invalidate()
return kclient.NewModInterface(d.deps.Client, d.namespace).ServiceMonitors()
}

Expand All @@ -679,7 +679,7 @@ func (d *Deployment) GetOwnedPods(ctx context.Context) ([]core.Pod, error) {
}

func (d *Deployment) GetCachedStatus() inspectorInterface.Inspector {
return d.currentState
return d.acs.CurrentClusterCache()
}

func (d *Deployment) ApplyPatchOnPod(ctx context.Context, pod *core.Pod, p ...patch.Item) error {
Expand Down
26 changes: 12 additions & 14 deletions pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ type Deployment struct {
inspectCRDTrigger trigger.Trigger
updateDeploymentTrigger trigger.Trigger
clientCache deploymentClient.Cache
currentState inspectorInterface.Inspector
agencyCache agency.Cache
recentInspectionErrors int
clusterScalingIntegration *clusterScalingIntegration
Expand All @@ -143,7 +142,7 @@ func (d *Deployment) WithArangoMember(cache inspectorInterface.Inspector, timeou
}

func (d *Deployment) WithCurrentArangoMember(name string) reconciler.ArangoMemberModContext {
return d.WithArangoMember(d.currentState, globals.GetGlobals().Timeouts().Kubernetes().Get(), name)
return d.WithArangoMember(d.acs.CurrentClusterCache(), globals.GetGlobals().Timeouts().Kubernetes().Get(), name)
}

func (d *Deployment) GetMembersState() memberState.StateInspector {
Expand Down Expand Up @@ -227,16 +226,15 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
i := inspector.NewInspector(inspector.NewDefaultThrottle(), deps.Client, apiObject.GetNamespace(), apiObject.GetName())

d := &Deployment{
apiObject: apiObject,
name: apiObject.GetName(),
namespace: apiObject.GetNamespace(),
config: config,
deps: deps,
eventCh: make(chan *deploymentEvent, deploymentEventQueueSize),
stopCh: make(chan struct{}),
agencyCache: agency.NewCache(apiObject.Spec.Mode),
currentState: i,
acs: acs.NewACS(apiObject.GetUID(), i),
apiObject: apiObject,
name: apiObject.GetName(),
namespace: apiObject.GetNamespace(),
config: config,
deps: deps,
eventCh: make(chan *deploymentEvent, deploymentEventQueueSize),
stopCh: make(chan struct{}),
agencyCache: agency.NewCache(apiObject.Spec.Mode),
acs: acs.NewACS(apiObject.GetUID(), i),
}

d.memberState = memberState.NewStateInspector(d)
Expand Down Expand Up @@ -348,7 +346,7 @@ func (d *Deployment) run() {
for {
select {
case <-d.stopCh:
err := d.currentState.Refresh(context.Background())
err := d.acs.CurrentClusterCache().Refresh(context.Background())
if err != nil {
log.Error().Err(err).Msg("Unable to get resources")
}
Expand Down Expand Up @@ -596,7 +594,7 @@ func (d *Deployment) isOwnerOf(obj meta.Object) bool {
func (d *Deployment) lookForServiceMonitorCRD() {
var err error
if d.GetScope().IsNamespaced() {
_, err = d.currentState.ServiceMonitor().V1()
_, err = d.acs.CurrentClusterCache().ServiceMonitor().V1()
if k8sutil.IsForbiddenOrNotFound(err) {
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/deployment/deployment_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ func (d *Deployment) inspectDeployment(lastInterval util.Interval) util.Interval
deploymentName := d.GetName()
defer metrics.SetDuration(inspectDeploymentDurationGauges.WithLabelValues(deploymentName), start)

err := d.currentState.Refresh(ctxReconciliation)
err := d.acs.CurrentClusterCache().Refresh(ctxReconciliation)
if err != nil {
log.Error().Err(err).Msg("Unable to get resources")
return minInspectionInterval // Retry ASAP
}

// Check deployment still exists
updated, err := d.currentState.GetCurrentArangoDeployment()
updated, err := d.acs.CurrentClusterCache().GetCurrentArangoDeployment()
if k8sutil.IsNotFound(err) {
// Deployment is gone
log.Info().Msg("Deployment is gone")
Expand Down
6 changes: 3 additions & 3 deletions pkg/deployment/deployment_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func runTestCase(t *testing.T, testCase testCaseStruct) {

errs := 0
for {
require.NoError(t, d.currentState.Refresh(context.Background()))
require.NoError(t, d.acs.CurrentClusterCache().Refresh(context.Background()))
err := d.resources.EnsureSecrets(context.Background(), log.Logger, d.GetCachedStatus())
if err == nil {
break
Expand Down Expand Up @@ -172,7 +172,7 @@ func runTestCase(t *testing.T, testCase testCaseStruct) {
return err
}

require.NoError(t, d.currentState.Refresh(context.Background()))
require.NoError(t, d.acs.CurrentClusterCache().Refresh(context.Background()))

groupSpec := d.apiObject.Spec.GetServerGroupSpec(group)

Expand Down Expand Up @@ -217,7 +217,7 @@ func runTestCase(t *testing.T, testCase testCaseStruct) {
}

// Act
require.NoError(t, d.currentState.Refresh(context.Background()))
require.NoError(t, d.acs.CurrentClusterCache().Refresh(context.Background()))
err = d.resources.EnsurePods(context.Background(), d.GetCachedStatus())

// Assert
Expand Down
21 changes: 11 additions & 10 deletions pkg/deployment/deployment_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,20 +480,21 @@ func createTestDeployment(t *testing.T, config Config, arangoDeployment *api.Ara
Client: kclient.NewStaticClient(kubernetesClientSet, kubernetesExtClientSet, arangoClientSet, monitoringClientSet),
}

i := inspector.NewInspector(throttle.NewAlwaysThrottleComponents(), deps.Client, arangoDeployment.GetNamespace(), arangoDeployment.GetName())

d := &Deployment{
apiObject: arangoDeployment,
name: arangoDeployment.GetName(),
namespace: arangoDeployment.GetNamespace(),
config: config,
deps: deps,
eventCh: make(chan *deploymentEvent, deploymentEventQueueSize),
stopCh: make(chan struct{}),
currentState: inspector.NewInspector(throttle.NewAlwaysThrottleComponents(), deps.Client, arangoDeployment.GetNamespace(), arangoDeployment.GetName()),
apiObject: arangoDeployment,
name: arangoDeployment.GetName(),
namespace: arangoDeployment.GetNamespace(),
config: config,
deps: deps,
eventCh: make(chan *deploymentEvent, deploymentEventQueueSize),
stopCh: make(chan struct{}),
}
d.clientCache = client.NewClientCache(d, conn.NewFactory(d.getAuth, d.getConnConfig))
d.acs = acs.NewACS("", d.currentState)
d.acs = acs.NewACS("", i)

require.NoError(t, d.currentState.Refresh(context.Background()))
require.NoError(t, d.acs.CurrentClusterCache().Refresh(context.Background()))

arangoDeployment.Spec.SetDefaults(arangoDeployment.GetName())
d.resources = resources.NewResources(deps.Log, d)
Expand Down
8 changes: 4 additions & 4 deletions pkg/deployment/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,14 @@ func (ib *imagesBuilder) fetchArangoDBImageIDAndVersion(ctx context.Context, cac
// Check if pod exists
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()
pod, err := ib.Context.GetCachedStatus().Pod().V1().Read().Get(ctxChild, podName, metav1.GetOptions{})
pod, err := ib.Context.ACS().CurrentClusterCache().Pod().V1().Read().Get(ctxChild, podName, metav1.GetOptions{})
if err == nil {
// Pod found
if k8sutil.IsPodFailed(pod, utils.StringList{shared.ServerContainerName}) {
// Wait some time before deleting the pod
if time.Now().After(pod.GetCreationTimestamp().Add(30 * time.Second)) {
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
return ib.Context.PodsModInterface().Delete(ctxChild, podName, metav1.DeleteOptions{})
return ib.Context.ACS().CurrentClusterCache().PodsModInterface().V1().Delete(ctxChild, podName, metav1.DeleteOptions{})
})
if err != nil && !k8sutil.IsNotFound(err) {
log.Warn().Err(err).Msg("Failed to delete Image ID Pod")
Expand Down Expand Up @@ -189,7 +189,7 @@ func (ib *imagesBuilder) fetchArangoDBImageIDAndVersion(ctx context.Context, cac

// We have all the info we need now, kill the pod and store the image info.
err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
return ib.Context.PodsModInterface().Delete(ctxChild, podName, metav1.DeleteOptions{})
return ib.Context.ACS().CurrentClusterCache().PodsModInterface().V1().Delete(ctxChild, podName, metav1.DeleteOptions{})
})
if err != nil && !k8sutil.IsNotFound(err) {
log.Warn().Err(err).Msg("Failed to delete Image ID Pod")
Expand Down Expand Up @@ -236,7 +236,7 @@ func (ib *imagesBuilder) fetchArangoDBImageIDAndVersion(ctx context.Context, cac
}

err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
_, _, err := resources.CreateArangoPod(ctxChild, ib.Context.PodsModInterface(), ib.APIObject, ib.Spec, api.ServerGroupImageDiscovery, pod)
_, _, err := resources.CreateArangoPod(ctxChild, ib.Context.ACS().CurrentClusterCache().PodsModInterface().V1(), ib.APIObject, ib.Spec, api.ServerGroupImageDiscovery, pod)
return err
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/deployment/images_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func TestEnsureImages(t *testing.T) {
_, err := d.deps.Client.Arango().DatabaseV1().ArangoDeployments(testNamespace).Create(context.Background(), d.apiObject, metav1.CreateOptions{})
require.NoError(t, err)

require.NoError(t, d.currentState.Refresh(context.Background()))
require.NoError(t, d.acs.CurrentClusterCache().Refresh(context.Background()))

// Act
retrySoon, _, err := d.ensureImages(context.Background(), d.apiObject, d.GetCachedStatus())
Expand Down
24 changes: 12 additions & 12 deletions pkg/deployment/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,19 @@ func (d *Deployment) listenForPodEvents(stopCh <-chan struct{}) {
&v1.Pod{},
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
d.currentState.GetThrottles().Pod().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().Pod().Invalidate()
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
d.currentState.GetThrottles().Pod().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().Pod().Invalidate()
if p, ok := getPod(newObj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
d.currentState.GetThrottles().Pod().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().Pod().Invalidate()
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
Expand Down Expand Up @@ -96,19 +96,19 @@ func (d *Deployment) listenForPVCEvents(stopCh <-chan struct{}) {
&v1.PersistentVolumeClaim{},
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
d.currentState.GetThrottles().PersistentVolumeClaim().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().PersistentVolumeClaim().Invalidate()
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
d.currentState.GetThrottles().PersistentVolumeClaim().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().PersistentVolumeClaim().Invalidate()
if p, ok := getPVC(newObj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
d.currentState.GetThrottles().PersistentVolumeClaim().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().PersistentVolumeClaim().Invalidate()
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
Expand Down Expand Up @@ -142,19 +142,19 @@ func (d *Deployment) listenForSecretEvents(stopCh <-chan struct{}) {
cache.ResourceEventHandlerFuncs{
// Note: For secrets we look at all of them because they do not have to be owned by this deployment.
AddFunc: func(obj interface{}) {
d.currentState.GetThrottles().Secret().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().Secret().Invalidate()
if getSecret(obj) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
d.currentState.GetThrottles().Secret().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().Secret().Invalidate()
if getSecret(newObj) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
d.currentState.GetThrottles().Secret().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().Secret().Invalidate()
if getSecret(obj) {
d.triggerInspection()
}
Expand Down Expand Up @@ -187,19 +187,19 @@ func (d *Deployment) listenForServiceEvents(stopCh <-chan struct{}) {
&v1.Service{},
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
d.currentState.GetThrottles().Service().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().Service().Invalidate()
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
d.currentState.GetThrottles().Service().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().Service().Invalidate()
if s, ok := getService(newObj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
d.currentState.GetThrottles().Service().Invalidate()
d.acs.CurrentClusterCache().GetThrottles().Service().Invalidate()
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/deployment/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (i *inventory) Collect(m chan<- prometheus.Metric) {
for _, deployment := range deployments {
p.Push(i.deploymentsMetric.Gauge(1, deployment.GetNamespace(), deployment.GetName()))

if state := deployment.currentState; state != nil {
if state := deployment.acs.CurrentClusterCache(); state != nil {
t := state.GetThrottles()

for _, c := range throttle.AllComponents() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/deployment/reconcile/action_bootstrap_set_password.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (a actionBootstrapSetPassword) ensureUserPasswordSecret(ctx context.Context
token := hex.EncodeToString(tokenData)
owner := a.actionCtx.GetAPIObject().AsOwner()

err := k8sutil.CreateBasicAuthSecret(ctx, a.actionCtx.SecretsModInterface(), secret, user, token, &owner)
err := k8sutil.CreateBasicAuthSecret(ctx, a.actionCtx.ACS().CurrentClusterCache().SecretsModInterface().V1(), secret, user, token, &owner)
if err != nil {
return "", err
}
Expand Down
Loading