Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ run-unit-tests: $(GOBUILDDIR) $(SOURCES)
go test $(TESTVERBOSEOPTIONS) \
$(REPOPATH)/pkg/apis/deployment/v1alpha \
$(REPOPATH)/pkg/deployment \
$(REPOPATH)/pkg/deployment/reconcile \
$(REPOPATH)/pkg/util/k8sutil \
$(REPOPATH)/pkg/util/k8sutil/test

Expand Down
3 changes: 3 additions & 0 deletions pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/deployment/reconcile"
"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/arangodb/kube-arangodb/pkg/util/retry"
Expand Down Expand Up @@ -92,6 +93,7 @@ type Deployment struct {
clientCache *clientCache
recentInspectionErrors int
clusterScalingIntegration *clusterScalingIntegration
reconciler *reconcile.Reconciler
}

// New creates a new Deployment from the given API object.
Expand All @@ -109,6 +111,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
eventsCli: deps.KubeCli.Core().Events(apiObject.GetNamespace()),
clientCache: newClientCache(deps.KubeCli, apiObject),
}
d.reconciler = reconcile.NewReconciler(deps.Log, d)
if d.status.AcceptedSpec == nil {
// We've validated the spec, so let's use it from now.
d.status.AcceptedSpec = apiObject.Spec.DeepCopy()
Expand Down
4 changes: 2 additions & 2 deletions pkg/deployment/deployment_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
}

// Create scale/update plan
if err := d.createPlan(); err != nil {
if err := d.reconciler.CreatePlan(); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Plan creation failed", err, d.apiObject))
}

// Execute current step of scale/update plan
retrySoon, err := d.executePlan(ctx)
retrySoon, err := d.reconciler.ExecutePlan(ctx)
if err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Plan execution failed", err, d.apiObject))
Expand Down
8 changes: 4 additions & 4 deletions pkg/deployment/pod_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ func (d *Deployment) inspectPods() error {
log := d.deps.Log
var events []*v1.Event

pods, err := d.deps.KubeCli.CoreV1().Pods(d.apiObject.GetNamespace()).List(k8sutil.DeploymentListOpt(d.apiObject.GetName()))
pods, err := d.GetOwnedPods()
if err != nil {
log.Debug().Err(err).Msg("Failed to list pods")
log.Debug().Err(err).Msg("Failed to get owned pods")
return maskAny(err)
}

// Update member status from all pods found
for _, p := range pods.Items {
for _, p := range pods {
// Check ownership
if !d.isOwnerOf(&p) {
log.Debug().Str("pod", p.GetName()).Msg("pod not owned by this deployment")
Expand Down Expand Up @@ -101,7 +101,7 @@ func (d *Deployment) inspectPods() error {
}

podExists := func(podName string) bool {
for _, p := range pods.Items {
for _, p := range pods {
if p.GetName() == podName && d.isOwnerOf(&p) {
return true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// Author Ewout Prangsma
//

package deployment
package reconcile

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// Author Ewout Prangsma
//

package deployment
package reconcile

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// Author Ewout Prangsma
//

package deployment
package reconcile

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// Author Ewout Prangsma
//

package deployment
package reconcile

import (
"context"
Expand All @@ -29,11 +29,9 @@ import (
driver "github.com/arangodb/go-driver"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)

// ActionContext provides methods to the Action implementations
Expand Down Expand Up @@ -68,28 +66,28 @@ type ActionContext interface {
}

// NewActionContext creates a new ActionContext implementation.
func NewActionContext(log zerolog.Logger, deployment *Deployment) ActionContext {
func NewActionContext(log zerolog.Logger, context ReconcileContext) ActionContext {
return &actionContext{
log: log,
deployment: deployment,
log: log,
context: context,
}
}

// actionContext implements ActionContext
type actionContext struct {
log zerolog.Logger
deployment *Deployment
log zerolog.Logger
context ReconcileContext
}

// Gets the specified mode of deployment
func (ac *actionContext) GetMode() api.DeploymentMode {
return ac.deployment.apiObject.Spec.GetMode()
return ac.context.GetSpec().GetMode()
}

// GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server),
// creating one if needed.
func (ac *actionContext) GetDatabaseClient(ctx context.Context) (driver.Client, error) {
c, err := ac.deployment.clientCache.GetDatabase(ctx)
c, err := ac.context.GetDatabaseClient(ctx)
if err != nil {
return nil, maskAny(err)
}
Expand All @@ -98,7 +96,7 @@ func (ac *actionContext) GetDatabaseClient(ctx context.Context) (driver.Client,

// GetServerClient returns a cached client for a specific server.
func (ac *actionContext) GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) {
c, err := ac.deployment.clientCache.Get(ctx, group, id)
c, err := ac.context.GetServerClient(ctx, group, id)
if err != nil {
return nil, maskAny(err)
}
Expand All @@ -107,55 +105,39 @@ func (ac *actionContext) GetServerClient(ctx context.Context, group api.ServerGr

// GetAgencyClients returns a client connection for every agency member.
func (ac *actionContext) GetAgencyClients(ctx context.Context) ([]arangod.Agency, error) {
agencyMembers := ac.deployment.status.Members.Agents
result := make([]arangod.Agency, 0, len(agencyMembers))
for _, m := range agencyMembers {
client, err := ac.GetServerClient(ctx, api.ServerGroupAgents, m.ID)
if err != nil {
return nil, maskAny(err)
}
aClient, err := arangod.NewAgencyClient(client)
if err != nil {
return nil, maskAny(err)
}
result = append(result, aClient)
c, err := ac.context.GetAgencyClients(ctx)
if err != nil {
return nil, maskAny(err)
}
return result, nil
return c, nil
}

// GetMemberStatusByID returns the current member status
// for the member with given id.
// Returns member status, true when found, or false
// when no such member is found.
func (ac *actionContext) GetMemberStatusByID(id string) (api.MemberStatus, bool) {
m, _, ok := ac.deployment.status.Members.ElementByID(id)
m, _, ok := ac.context.GetStatus().Members.ElementByID(id)
return m, ok
}

// CreateMember adds a new member to the given group.
func (ac *actionContext) CreateMember(group api.ServerGroup) error {
d := ac.deployment
if err := d.createMember(group, d.apiObject); err != nil {
ac.log.Debug().Err(err).Str("group", group.AsRole()).Msg("Failed to create member")
return maskAny(err)
}
// Save added member
if err := d.updateCRStatus(); err != nil {
log.Debug().Err(err).Msg("Updating CR status failed")
if err := ac.context.CreateMember(group); err != nil {
return maskAny(err)
}
return nil
}

// UpdateMember updates the deployment status wrt the given member.
func (ac *actionContext) UpdateMember(member api.MemberStatus) error {
d := ac.deployment
_, group, found := ac.deployment.status.Members.ElementByID(member.ID)
status := ac.context.GetStatus()
_, group, found := status.Members.ElementByID(member.ID)
if !found {
return maskAny(fmt.Errorf("Member %s not found", member.ID))
}
d.status.Members.UpdateMemberStatus(member, group)
if err := d.updateCRStatus(); err != nil {
status.Members.UpdateMemberStatus(member, group)
if err := ac.context.UpdateStatus(status); err != nil {
log.Debug().Err(err).Msg("Updating CR status failed")
return maskAny(err)
}
Expand All @@ -164,17 +146,17 @@ func (ac *actionContext) UpdateMember(member api.MemberStatus) error {

// RemoveMemberByID removes a member with given id.
func (ac *actionContext) RemoveMemberByID(id string) error {
d := ac.deployment
_, group, found := d.status.Members.ElementByID(id)
status := ac.context.GetStatus()
_, group, found := status.Members.ElementByID(id)
if !found {
return nil
}
if err := d.status.Members.RemoveByID(id, group); err != nil {
if err := status.Members.RemoveByID(id, group); err != nil {
log.Debug().Err(err).Str("group", group.AsRole()).Msg("Failed to remove member")
return maskAny(err)
}
// Save removed member
if err := d.updateCRStatus(); err != nil {
if err := ac.context.UpdateStatus(status); err != nil {
return maskAny(err)
}
return nil
Expand All @@ -183,10 +165,7 @@ func (ac *actionContext) RemoveMemberByID(id string) error {
// DeletePod deletes a pod with given name in the namespace
// of the deployment. If the pod does not exist, the error is ignored.
func (ac *actionContext) DeletePod(podName string) error {
d := ac.deployment
ns := d.apiObject.GetNamespace()
if err := d.deps.KubeCli.Core().Pods(ns).Delete(podName, &metav1.DeleteOptions{}); err != nil && !k8sutil.IsNotFound(err) {
log.Debug().Err(err).Str("pod", podName).Msg("Failed to remove pod")
if err := ac.context.DeletePod(podName); err != nil {
return maskAny(err)
}
return nil
Expand All @@ -195,10 +174,7 @@ func (ac *actionContext) DeletePod(podName string) error {
// DeletePvc deletes a persistent volume claim with given name in the namespace
// of the deployment. If the pvc does not exist, the error is ignored.
func (ac *actionContext) DeletePvc(pvcName string) error {
d := ac.deployment
ns := d.apiObject.GetNamespace()
if err := d.deps.KubeCli.Core().PersistentVolumeClaims(ns).Delete(pvcName, &metav1.DeleteOptions{}); err != nil && !k8sutil.IsNotFound(err) {
log.Debug().Err(err).Str("pvc", pvcName).Msg("Failed to remove pvc")
if err := ac.context.DeletePvc(pvcName); err != nil {
return maskAny(err)
}
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// Author Ewout Prangsma
//

package deployment
package reconcile

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// Author Ewout Prangsma
//

package deployment
package reconcile

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// Author Ewout Prangsma
//

package deployment
package reconcile

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// Author Ewout Prangsma
//

package deployment
package reconcile

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// Author Ewout Prangsma
//

package deployment
package reconcile

import (
"context"
Expand Down
29 changes: 29 additions & 0 deletions pkg/deployment/reconcile/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package reconcile

import "github.com/pkg/errors"

var (
maskAny = errors.WithStack
)
Loading