Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- (Bugfix) Ensure pod names not too long
- (Refactor) Use cached member's clients
- (Feature) Move PVC resize action to high-priority plan
- (Feature) Remove forgotten ArangoDB jobs during restart

## [1.2.14](https://github.com/arangodb/kube-arangodb/tree/1.2.14) (2022-07-14)
- (Feature) Add ArangoSync TLS based rotation
Expand Down
12 changes: 12 additions & 0 deletions pkg/apis/deployment/v1/deployment_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ func (m DeploymentMode) IsCluster() bool {
return m == DeploymentModeCluster
}

// ServingGroup returns mode serving group
func (m DeploymentMode) ServingGroup() ServerGroup {
switch m {
case DeploymentModeCluster:
return ServerGroupCoordinators
case DeploymentModeSingle, DeploymentModeActiveFailover:
return ServerGroupSingle
default:
return ServerGroupUnknown
}
}

// NewMode returns a reference to a string with given value.
func NewMode(input DeploymentMode) *DeploymentMode {
return &input
Expand Down
6 changes: 3 additions & 3 deletions pkg/apis/deployment/v1/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (p Plan) IsEmpty() bool {
return len(p) == 0
}

// Add add action at the end of plan
// After add action at the end of plan
func (p Plan) After(action ...Action) Plan {
n := Plan{}

Expand All @@ -375,7 +375,7 @@ func (p Plan) After(action ...Action) Plan {
return n
}

// Prefix add action at the beginning of plan
// Before add action at the beginning of plan
func (p Plan) Before(action ...Action) Plan {
n := Plan{}

Expand All @@ -386,7 +386,7 @@ func (p Plan) Before(action ...Action) Plan {
return n
}

// Prefix add action at the beginning of plan
// Wrap wraps plan with actions
func (p Plan) Wrap(before, after Action) Plan {
n := Plan{}

Expand Down
12 changes: 12 additions & 0 deletions pkg/apis/deployment/v2alpha1/deployment_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ func (m DeploymentMode) IsCluster() bool {
return m == DeploymentModeCluster
}

// ServingGroup returns mode serving group
func (m DeploymentMode) ServingGroup() ServerGroup {
switch m {
case DeploymentModeCluster:
return ServerGroupCoordinators
case DeploymentModeSingle, DeploymentModeActiveFailover:
return ServerGroupSingle
default:
return ServerGroupUnknown
}
}

// NewMode returns a reference to a string with given value.
func NewMode(input DeploymentMode) *DeploymentMode {
return &input
Expand Down
6 changes: 3 additions & 3 deletions pkg/apis/deployment/v2alpha1/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (p Plan) IsEmpty() bool {
return len(p) == 0
}

// Add add action at the end of plan
// After add action at the end of plan
func (p Plan) After(action ...Action) Plan {
n := Plan{}

Expand All @@ -375,7 +375,7 @@ func (p Plan) After(action ...Action) Plan {
return n
}

// Prefix add action at the beginning of plan
// Before add action at the beginning of plan
func (p Plan) Before(action ...Action) Plan {
n := Plan{}

Expand All @@ -386,7 +386,7 @@ func (p Plan) Before(action ...Action) Plan {
return n
}

// Prefix add action at the beginning of plan
// Wrap wraps plan with actions
func (p Plan) Wrap(before, after Action) Plan {
n := Plan{}

Expand Down
3 changes: 3 additions & 0 deletions pkg/deployment/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package client
import (
"context"
"net/http"
"time"

"github.com/arangodb/go-driver"
)
Expand All @@ -44,6 +45,8 @@ type Client interface {

GetJWT(ctx context.Context) (JWTDetails, error)
RefreshJWT(ctx context.Context) (JWTDetails, error)

DeleteExpiredJobs(ctx context.Context, timeout time.Duration) error
}

type client struct {
Expand Down
50 changes: 50 additions & 0 deletions pkg/deployment/client/jobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//

package client

import (
"context"
"fmt"
"net/http"
"time"
)

const DeleteExpiredJobsURL = "/_api/job/expired"

func (c *client) DeleteExpiredJobs(ctx context.Context, timeout time.Duration) error {
req, err := c.c.NewRequest(http.MethodDelete, DeleteExpiredJobsURL)
if err != nil {
return err
}

req.SetQuery("stamp", fmt.Sprintf("%d", time.Now().UTC().Add(-1*timeout).Unix()))

resp, err := c.c.Do(ctx, req)
if err != nil {
return err
}

if err := resp.CheckStatus(http.StatusOK); err != nil {
return err
}

return nil
}
39 changes: 39 additions & 0 deletions pkg/deployment/reconcile/action_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/arangodb/kube-arangodb/pkg/deployment/member"
"github.com/arangodb/kube-arangodb/pkg/deployment/reconciler"
"github.com/arangodb/kube-arangodb/pkg/logging"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
Expand Down Expand Up @@ -104,6 +105,11 @@ type ActionLocalsContext interface {

Get(action api.Action, key api.PlanLocalKey) (string, bool)
Add(key api.PlanLocalKey, value string, override bool) bool

SetTime(key api.PlanLocalKey, t time.Time) bool
GetTime(action api.Action, key api.PlanLocalKey) (time.Time, bool)

BackoffExecution(action api.Action, key api.PlanLocalKey, duration time.Duration) bool
}

// newActionContext creates a new ActionContext implementation.
Expand Down Expand Up @@ -144,6 +150,39 @@ func (ac *actionContext) Get(action api.Action, key api.PlanLocalKey) (string, b
return ac.locals.GetWithParent(action.Locals, key)
}

func (ac *actionContext) BackoffExecution(action api.Action, key api.PlanLocalKey, duration time.Duration) bool {
t, ok := ac.GetTime(action, key)
if !ok {
// Reset as zero time
t = time.Time{}
}

if t.IsZero() || time.Since(t) > duration {
// Execution is needed
ac.SetTime(key, time.Now())
return true
}

return false
}

func (ac *actionContext) SetTime(key api.PlanLocalKey, t time.Time) bool {
return ac.Add(key, t.Format(util.TimeLayout), true)
}

func (ac *actionContext) GetTime(action api.Action, key api.PlanLocalKey) (time.Time, bool) {
s, ok := ac.locals.GetWithParent(action.Locals, key)
if !ok {
return time.Time{}, false
}

if t, err := time.Parse(util.TimeLayout, s); err != nil {
return time.Time{}, false
} else {
return t, true
}
}

func (ac *actionContext) Add(key api.PlanLocalKey, value string, override bool) bool {
return ac.locals.Add(key, value, override)
}
Expand Down
33 changes: 30 additions & 3 deletions pkg/deployment/reconcile/helper_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,26 @@ package reconcile

import (
"context"
"time"

meta "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/arangodb/kube-arangodb/pkg/apis/deployment"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/client"
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)

const (
actionShutdownJobExpiredTermination api.PlanLocalKey = "expiredJobTerminationCheck"
actionShutdownJobExpiredTerminationDelay = 10 * time.Second
ActionShutdownJobExpiredTerminationTimeout = time.Minute
)

// getShutdownHelper returns an action to shut down a pod according to the settings.
// Returns true when member status exists.
// There are 3 possibilities to shut down the pod: immediately, gracefully, standard kubernetes delete API.
Expand Down Expand Up @@ -150,9 +158,28 @@ func (s shutdownHelperAPI) Start(ctx context.Context) (bool, error) {
}

// CheckProgress returns true when pod is terminated.
func (s shutdownHelperAPI) CheckProgress(_ context.Context) (bool, bool, error) {
terminated := s.memberStatus.Conditions.IsTrue(api.ConditionTypeTerminated)
return terminated, false, nil
func (s shutdownHelperAPI) CheckProgress(ctx context.Context) (bool, bool, error) {
if s.memberStatus.Conditions.IsTrue(api.ConditionTypeTerminated) {
return true, false, nil
}

if s.action.Group == s.actionCtx.GetMode().ServingGroup() {
if s.actionCtx.BackoffExecution(s.action, actionShutdownJobExpiredTermination, actionShutdownJobExpiredTerminationDelay) {
// Lets try to run termination
c, err := s.actionCtx.GetMembersState().GetMemberClient(s.action.MemberID)
if err != nil {
s.log.Err(err).Warn("Failed to create member client")
} else {
internal := client.NewClient(c.Connection())

if err := internal.DeleteExpiredJobs(ctx, ActionShutdownJobExpiredTerminationTimeout); err != nil {
s.log.Err(err).Warn("Unable to kill async jobs on member")
}
}
}
}

return false, false, nil
}

type shutdownHelperDelete struct {
Expand Down
4 changes: 3 additions & 1 deletion pkg/util/times.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const TimeLayout = time.RFC3339

// TimeCompareEqual compares two times, allowing an error of 1s
func TimeCompareEqual(a, b meta.Time) bool {
return math.Abs(a.Time.Sub(b.Time).Seconds()) <= 1
Expand All @@ -45,7 +47,7 @@ func TimeCompareEqualPointer(a, b *meta.Time) bool {

func TimeAgencyLayouts() []string {
return []string{
time.RFC3339,
TimeLayout,
}
}

Expand Down