From 79a205087e61f83d185c76b5a023895c76cf37fb Mon Sep 17 00:00:00 2001 From: ajanikow <12255597+ajanikow@users.noreply.github.com> Date: Fri, 15 Jul 2022 12:40:20 +0000 Subject: [PATCH 1/2] [Feature] Remove forgotten ArangoDB jobs during restart --- CHANGELOG.md | 1 + pkg/apis/deployment/v1/deployment_mode.go | 12 +++++ pkg/apis/deployment/v1/plan.go | 6 +-- .../deployment/v2alpha1/deployment_mode.go | 12 +++++ pkg/apis/deployment/v2alpha1/plan.go | 6 +-- pkg/deployment/client/client.go | 3 ++ pkg/deployment/client/jobs.go | 50 +++++++++++++++++++ pkg/deployment/reconcile/action_context.go | 39 +++++++++++++++ pkg/deployment/reconcile/helper_shutdown.go | 33 ++++++++++-- pkg/util/times.go | 4 +- 10 files changed, 156 insertions(+), 10 deletions(-) create mode 100644 pkg/deployment/client/jobs.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 3403690b7..9902194be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - (Bugfix) Ensure pod names not too long - (Refactor) Use cached member's clients - (Feature) Move PVC resize action to high-priority plan +- (Feature) Remove forgotten ArangoDB jobs during restart ## [1.2.14](https://github.com/arangodb/kube-arangodb/tree/1.2.14) (2022-07-14) - (Feature) Add ArangoSync TLS based rotation diff --git a/pkg/apis/deployment/v1/deployment_mode.go b/pkg/apis/deployment/v1/deployment_mode.go index 08c424e66..e7183fce9 100644 --- a/pkg/apis/deployment/v1/deployment_mode.go +++ b/pkg/apis/deployment/v1/deployment_mode.go @@ -96,6 +96,18 @@ func (m DeploymentMode) IsCluster() bool { return m == DeploymentModeCluster } +// ServingGroup returns mode serving group +func (m DeploymentMode) ServingGroup() ServerGroup { + switch m { + case DeploymentModeCluster: + return ServerGroupCoordinators + case DeploymentModeSingle, DeploymentModeActiveFailover: + return ServerGroupSingle + default: + return ServerGroupUnknown + } +} + // NewMode returns a reference to a string with given value. func NewMode(input DeploymentMode) *DeploymentMode { return &input diff --git a/pkg/apis/deployment/v1/plan.go b/pkg/apis/deployment/v1/plan.go index 446f0d3db..9c8d78855 100644 --- a/pkg/apis/deployment/v1/plan.go +++ b/pkg/apis/deployment/v1/plan.go @@ -364,7 +364,7 @@ func (p Plan) IsEmpty() bool { return len(p) == 0 } -// Add add action at the end of plan +// After add action at the end of plan func (p Plan) After(action ...Action) Plan { n := Plan{} @@ -375,7 +375,7 @@ func (p Plan) After(action ...Action) Plan { return n } -// Prefix add action at the beginning of plan +// Before add action at the beginning of plan func (p Plan) Before(action ...Action) Plan { n := Plan{} @@ -386,7 +386,7 @@ func (p Plan) Before(action ...Action) Plan { return n } -// Prefix add action at the beginning of plan +// Wrap wraps plan with actions func (p Plan) Wrap(before, after Action) Plan { n := Plan{} diff --git a/pkg/apis/deployment/v2alpha1/deployment_mode.go b/pkg/apis/deployment/v2alpha1/deployment_mode.go index d8e5c95d5..0295ad2ce 100644 --- a/pkg/apis/deployment/v2alpha1/deployment_mode.go +++ b/pkg/apis/deployment/v2alpha1/deployment_mode.go @@ -96,6 +96,18 @@ func (m DeploymentMode) IsCluster() bool { return m == DeploymentModeCluster } +// ServingGroup returns mode serving group +func (m DeploymentMode) ServingGroup() ServerGroup { + switch m { + case DeploymentModeCluster: + return ServerGroupCoordinators + case DeploymentModeSingle, DeploymentModeActiveFailover: + return ServerGroupSingle + default: + return ServerGroupUnknown + } +} + // NewMode returns a reference to a string with given value. func NewMode(input DeploymentMode) *DeploymentMode { return &input diff --git a/pkg/apis/deployment/v2alpha1/plan.go b/pkg/apis/deployment/v2alpha1/plan.go index 1b2ae03ab..8e8e65433 100644 --- a/pkg/apis/deployment/v2alpha1/plan.go +++ b/pkg/apis/deployment/v2alpha1/plan.go @@ -364,7 +364,7 @@ func (p Plan) IsEmpty() bool { return len(p) == 0 } -// Add add action at the end of plan +// After add action at the end of plan func (p Plan) After(action ...Action) Plan { n := Plan{} @@ -375,7 +375,7 @@ func (p Plan) After(action ...Action) Plan { return n } -// Prefix add action at the beginning of plan +// Before add action at the beginning of plan func (p Plan) Before(action ...Action) Plan { n := Plan{} @@ -386,7 +386,7 @@ func (p Plan) Before(action ...Action) Plan { return n } -// Prefix add action at the beginning of plan +// Wrap wraps plan with actions func (p Plan) Wrap(before, after Action) Plan { n := Plan{} diff --git a/pkg/deployment/client/client.go b/pkg/deployment/client/client.go index 14ff459fa..68d941430 100644 --- a/pkg/deployment/client/client.go +++ b/pkg/deployment/client/client.go @@ -23,6 +23,7 @@ package client import ( "context" "net/http" + "time" "github.com/arangodb/go-driver" ) @@ -44,6 +45,8 @@ type Client interface { GetJWT(ctx context.Context) (JWTDetails, error) RefreshJWT(ctx context.Context) (JWTDetails, error) + + DeleteExpiredJobs(ctx context.Context, timeout time.Duration) error } type client struct { diff --git a/pkg/deployment/client/jobs.go b/pkg/deployment/client/jobs.go new file mode 100644 index 000000000..f9e23a8a3 --- /dev/null +++ b/pkg/deployment/client/jobs.go @@ -0,0 +1,50 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package client + +import ( + "context" + "fmt" + "net/http" + "time" +) + +const DeleteExpiredJobsURL = "/_api/job/expired" + +func (c *client) DeleteExpiredJobs(ctx context.Context, timeout time.Duration) error { + req, err := c.c.NewRequest(http.MethodDelete, DeleteExpiredJobsURL) + if err != nil { + return err + } + + req.SetQuery("stamp", fmt.Sprintf("%d", time.Now().Add(-1*timeout).Unix())) + + resp, err := c.c.Do(ctx, req) + if err != nil { + return err + } + + if err := resp.CheckStatus(http.StatusOK); err != nil { + return err + } + + return nil +} diff --git a/pkg/deployment/reconcile/action_context.go b/pkg/deployment/reconcile/action_context.go index e4e3ccb56..2459bb2ac 100644 --- a/pkg/deployment/reconcile/action_context.go +++ b/pkg/deployment/reconcile/action_context.go @@ -35,6 +35,7 @@ import ( "github.com/arangodb/kube-arangodb/pkg/deployment/member" "github.com/arangodb/kube-arangodb/pkg/deployment/reconciler" "github.com/arangodb/kube-arangodb/pkg/logging" + "github.com/arangodb/kube-arangodb/pkg/util" "github.com/arangodb/kube-arangodb/pkg/util/errors" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector" @@ -104,6 +105,11 @@ type ActionLocalsContext interface { Get(action api.Action, key api.PlanLocalKey) (string, bool) Add(key api.PlanLocalKey, value string, override bool) bool + + SetTime(key api.PlanLocalKey, t time.Time) bool + GetTime(action api.Action, key api.PlanLocalKey) (time.Time, bool) + + BackoffExecution(action api.Action, key api.PlanLocalKey, duration time.Duration) bool } // newActionContext creates a new ActionContext implementation. @@ -144,6 +150,39 @@ func (ac *actionContext) Get(action api.Action, key api.PlanLocalKey) (string, b return ac.locals.GetWithParent(action.Locals, key) } +func (ac *actionContext) BackoffExecution(action api.Action, key api.PlanLocalKey, duration time.Duration) bool { + t, ok := ac.GetTime(action, key) + if !ok { + // Reset as zero time + t = time.Time{} + } + + if t.IsZero() || time.Since(t) > duration { + // Execution is needed + ac.SetTime(key, time.Now()) + return true + } + + return false +} + +func (ac *actionContext) SetTime(key api.PlanLocalKey, t time.Time) bool { + return ac.Add(key, t.Format(util.TimeLayout), true) +} + +func (ac *actionContext) GetTime(action api.Action, key api.PlanLocalKey) (time.Time, bool) { + s, ok := ac.locals.GetWithParent(action.Locals, key) + if !ok { + return time.Time{}, false + } + + if t, err := time.Parse(util.TimeLayout, s); err != nil { + return time.Time{}, false + } else { + return t, true + } +} + func (ac *actionContext) Add(key api.PlanLocalKey, value string, override bool) bool { return ac.locals.Add(key, value, override) } diff --git a/pkg/deployment/reconcile/helper_shutdown.go b/pkg/deployment/reconcile/helper_shutdown.go index afa553858..82b832e08 100644 --- a/pkg/deployment/reconcile/helper_shutdown.go +++ b/pkg/deployment/reconcile/helper_shutdown.go @@ -22,11 +22,13 @@ package reconcile import ( "context" + "time" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/arangodb/kube-arangodb/pkg/apis/deployment" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" + "github.com/arangodb/kube-arangodb/pkg/deployment/client" "github.com/arangodb/kube-arangodb/pkg/deployment/features" "github.com/arangodb/kube-arangodb/pkg/util" "github.com/arangodb/kube-arangodb/pkg/util/errors" @@ -34,6 +36,12 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" ) +const ( + actionShutdownJobExpiredTermination api.PlanLocalKey = "expiredJobTerminationCheck" + actionShutdownJobExpiredTerminationDelay = 10 * time.Second + ActionShutdownJobExpiredTerminationTimeout = time.Minute +) + // getShutdownHelper returns an action to shut down a pod according to the settings. // Returns true when member status exists. // There are 3 possibilities to shut down the pod: immediately, gracefully, standard kubernetes delete API. @@ -150,9 +158,28 @@ func (s shutdownHelperAPI) Start(ctx context.Context) (bool, error) { } // CheckProgress returns true when pod is terminated. -func (s shutdownHelperAPI) CheckProgress(_ context.Context) (bool, bool, error) { - terminated := s.memberStatus.Conditions.IsTrue(api.ConditionTypeTerminated) - return terminated, false, nil +func (s shutdownHelperAPI) CheckProgress(ctx context.Context) (bool, bool, error) { + if s.memberStatus.Conditions.IsTrue(api.ConditionTypeTerminated) { + return true, false, nil + } + + if s.action.Group == s.actionCtx.GetMode().ServingGroup() { + if s.actionCtx.BackoffExecution(s.action, actionShutdownJobExpiredTermination, actionShutdownJobExpiredTerminationDelay) { + // Lets try to run termination + c, err := s.actionCtx.GetMembersState().GetMemberClient(s.action.MemberID) + if err != nil { + s.log.Err(err).Warn("Failed to create member client") + } else { + internal := client.NewClient(c.Connection()) + + if err := internal.DeleteExpiredJobs(ctx, ActionShutdownJobExpiredTerminationTimeout); err != nil { + s.log.Err(err).Warn("Unable to kill async jobs on member") + } + } + } + } + + return false, false, nil } type shutdownHelperDelete struct { diff --git a/pkg/util/times.go b/pkg/util/times.go index 46bdbee19..4aa0e65a0 100644 --- a/pkg/util/times.go +++ b/pkg/util/times.go @@ -27,6 +27,8 @@ import ( meta "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const TimeLayout = time.RFC3339 + // TimeCompareEqual compares two times, allowing an error of 1s func TimeCompareEqual(a, b meta.Time) bool { return math.Abs(a.Time.Sub(b.Time).Seconds()) <= 1 @@ -45,7 +47,7 @@ func TimeCompareEqualPointer(a, b *meta.Time) bool { func TimeAgencyLayouts() []string { return []string{ - time.RFC3339, + TimeLayout, } } From f1229ca6cccf3732806b11d6a5e3389cad8dee54 Mon Sep 17 00:00:00 2001 From: ajanikow <12255597+ajanikow@users.noreply.github.com> Date: Fri, 15 Jul 2022 13:24:21 +0000 Subject: [PATCH 2/2] Change time to UTC --- pkg/deployment/client/jobs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/deployment/client/jobs.go b/pkg/deployment/client/jobs.go index f9e23a8a3..49697ecbf 100644 --- a/pkg/deployment/client/jobs.go +++ b/pkg/deployment/client/jobs.go @@ -35,7 +35,7 @@ func (c *client) DeleteExpiredJobs(ctx context.Context, timeout time.Duration) e return err } - req.SetQuery("stamp", fmt.Sprintf("%d", time.Now().Add(-1*timeout).Unix())) + req.SetQuery("stamp", fmt.Sprintf("%d", time.Now().UTC().Add(-1*timeout).Unix())) resp, err := c.c.Do(ctx, req) if err != nil {