From b06301d012e864dc3e0749d5c3a24e3ed4be81fa Mon Sep 17 00:00:00 2001 From: ajanikow <12255597+ajanikow@users.noreply.github.com> Date: Sun, 8 May 2022 20:55:23 +0000 Subject: [PATCH 1/3] [Feature] Change Restore in Cluster mode to Async Request --- CHANGELOG.md | 1 + pkg/deployment/client/client_cache.go | 22 +++ pkg/deployment/context_impl.go | 8 ++ .../reconcile/action_backup_restore.go | 131 ++++++++++++++++-- pkg/deployment/reconcile/action_context.go | 4 + pkg/deployment/reconcile/plan_builder_test.go | 5 + pkg/deployment/reconciler/context.go | 4 + 7 files changed, 163 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 857c735c2..10abdf092 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - (Maintenance) Add check make targets - (Feature) Create support for local variables in actions. - (Feature) Support for asynchronous ArangoD resquests. +- (Feature) Change Restore in Cluster mode to Async Request ## [1.2.11](https://github.com/arangodb/kube-arangodb/tree/1.2.11) (2022-04-30) - (Bugfix) Orphan PVC are not removed diff --git a/pkg/deployment/client/client_cache.go b/pkg/deployment/client/client_cache.go index 9fb161f72..763c9ef07 100644 --- a/pkg/deployment/client/client_cache.go +++ b/pkg/deployment/client/client_cache.go @@ -36,6 +36,8 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" ) +type ConnectionWrap func(c driver.Connection) driver.Connection + type Cache interface { GetAuth() conn.Auth @@ -43,6 +45,7 @@ type Cache interface { Get(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) GetDatabase(ctx context.Context) (driver.Client, error) + GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWrap) (driver.Client, error) GetAgency(ctx context.Context) (agency.Agency, error) } @@ -144,6 +147,25 @@ func (cc *cache) GetDatabase(ctx context.Context) (driver.Client, error) { } } +func (cc *cache) GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWrap) (driver.Client, error) { + c, err := cc.getDatabaseClient() + if err != nil { + return nil, err + } + + conn := c.Connection() + + for _, w := range wraps { + if w != nil { + conn = w(conn) + } + } + + return driver.NewClient(driver.ClientConfig{ + Connection: conn, + }) +} + // GetAgency returns a cached client for the agency func (cc *cache) GetAgency(ctx context.Context) (agency.Agency, error) { cc.mutex.Lock() diff --git a/pkg/deployment/context_impl.go b/pkg/deployment/context_impl.go index 9a965c2cb..c0424e023 100644 --- a/pkg/deployment/context_impl.go +++ b/pkg/deployment/context_impl.go @@ -187,6 +187,14 @@ func (d *Deployment) GetDatabaseClient(ctx context.Context) (driver.Client, erro return c, nil } +func (d *Deployment) GetDatabaseAsyncClient(ctx context.Context) (driver.Client, error) { + c, err := d.clientCache.GetDatabaseWithWrap(ctx, conn.NewAsyncConnection) + if err != nil { + return nil, errors.WithStack(err) + } + return c, nil +} + // GetServerClient returns a cached client for a specific server. func (d *Deployment) GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) { c, err := d.clientCache.Get(ctx, group, id) diff --git a/pkg/deployment/reconcile/action_backup_restore.go b/pkg/deployment/reconcile/action_backup_restore.go index 73b99e03c..861ccbed3 100644 --- a/pkg/deployment/reconcile/action_backup_restore.go +++ b/pkg/deployment/reconcile/action_backup_restore.go @@ -28,13 +28,21 @@ import ( "github.com/arangodb/go-driver" "github.com/rs/zerolog" + backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" + "github.com/arangodb/kube-arangodb/pkg/util/arangod/conn" + "github.com/arangodb/kube-arangodb/pkg/util/errors" ) func init() { registerAction(api.ActionTypeBackupRestore, newBackupRestoreAction, backupRestoreTimeout) } +const ( + actionBackupRestoreLocalJobID api.PlanLocalKey = "jobID" + actionBackupRestoreLocalBackupName api.PlanLocalKey = "backupName" +) + func newBackupRestoreAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action { a := &actionBackupRestore{} @@ -47,8 +55,6 @@ func newBackupRestoreAction(log zerolog.Logger, action api.Action, actionCtx Act type actionBackupRestore struct { // actionImpl implement timeout and member id functions actionImpl - - actionEmptyCheckProgress } func (a actionBackupRestore) Start(ctx context.Context) (bool, error) { @@ -64,13 +70,6 @@ func (a actionBackupRestore) Start(ctx context.Context) (bool, error) { return true, nil } - ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) - defer cancel() - dbc, err := a.actionCtx.GetDatabaseClient(ctxChild) - if err != nil { - return false, err - } - backupResource, err := a.actionCtx.GetBackup(ctx, *spec.RestoreFrom) if err != nil { a.log.Error().Err(err).Msg("Unable to find backup") @@ -96,15 +95,62 @@ func (a actionBackupRestore) Start(ctx context.Context) (bool, error) { return false, err } + switch mode := a.actionCtx.GetSpec().Mode.Get(); mode { + case api.DeploymentModeActiveFailover, api.DeploymentModeSingle: + return a.restoreSync(ctx, backupResource) + case api.DeploymentModeCluster: + return a.restoreAsync(ctx, backupResource) + default: + return false, errors.Newf("Unknown mode %s", mode) + } +} + +func (a actionBackupRestore) restoreAsync(ctx context.Context, backup *backupApi.ArangoBackup) (bool, error) { + ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) + defer cancel() + + dbc, err := a.actionCtx.GetDatabaseAsyncClient(ctxChild) + if err != nil { + return false, err + } + + ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) + defer cancel() + + restoreError := dbc.Backup().Restore(ctxChild, driver.BackupID(backup.Status.Backup.ID), nil) + if restoreError != nil { + if id, ok := conn.IsAsyncJobInProgress(restoreError); ok { + a.actionCtx.Add(actionBackupRestoreLocalJobID, id, true) + a.actionCtx.Add(actionBackupRestoreLocalBackupName, backup.GetName(), true) + + // Async request has been send + return false, nil + } else { + return false, restoreError + } + } + + return false, errors.Newf("Async response not received") +} + +func (a actionBackupRestore) restoreSync(ctx context.Context, backup *backupApi.ArangoBackup) (bool, error) { + ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) + defer cancel() + dbc, err := a.actionCtx.GetDatabaseClient(ctxChild) + if err != nil { + a.log.Debug().Err(err).Msg("Failed to create database client") + return false, nil + } + // The below action can take a while so the full parent timeout context is used. - restoreError := dbc.Backup().Restore(ctx, driver.BackupID(backupResource.Status.Backup.ID), nil) + restoreError := dbc.Backup().Restore(ctx, driver.BackupID(backup.Status.Backup.ID), nil) if restoreError != nil { a.log.Error().Err(restoreError).Msg("Restore failed") } if err := a.actionCtx.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool { result := &api.DeploymentRestoreResult{ - RequestedFrom: spec.GetRestoreFrom(), + RequestedFrom: backup.GetName(), } if restoreError != nil { @@ -118,9 +164,70 @@ func (a actionBackupRestore) Start(ctx context.Context) (bool, error) { return true }); err != nil { - a.log.Error().Err(err).Msg("Unable to ser restored state") + a.log.Error().Err(err).Msg("Unable to set restored state") return false, err } return true, nil } + +func (a actionBackupRestore) CheckProgress(ctx context.Context) (bool, bool, error) { + backup, ok := a.actionCtx.Get(a.action, actionBackupRestoreLocalBackupName) + if !ok { + return false, false, errors.Newf("Local Key is missing in action: %s", actionBackupRestoreLocalBackupName) + } + + job, ok := a.actionCtx.Get(a.action, actionBackupRestoreLocalJobID) + if !ok { + return false, false, errors.Newf("Local Key is missing in action: %s", actionBackupRestoreLocalJobID) + } + + ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) + defer cancel() + + dbc, err := a.actionCtx.GetDatabaseAsyncClient(ctxChild) + if err != nil { + a.log.Debug().Err(err).Msg("Failed to create database client") + return false, false, nil + } + + ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) + defer cancel() + + // Params does not matter in async fetch + restoreError := dbc.Backup().Restore(conn.WithAsyncID(ctxChild, job), "", nil) + if restoreError != nil { + if _, ok := conn.IsAsyncJobInProgress(restoreError); ok { + // Job still in progress + return false, false, nil + } + + if errors.IsTemporary(restoreError) { + // Retry + return false, false, nil + } + } + + // Restore is done + + if err := a.actionCtx.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool { + result := &api.DeploymentRestoreResult{ + RequestedFrom: backup, + State: api.DeploymentRestoreStateRestored, + } + + if restoreError != nil { + result.State = api.DeploymentRestoreStateRestoreFailed + result.Message = restoreError.Error() + } + + s.Restore = result + + return true + }); err != nil { + a.log.Error().Err(err).Msg("Unable to set restored state") + return false, false, err + } + + return true, false, nil +} diff --git a/pkg/deployment/reconcile/action_context.go b/pkg/deployment/reconcile/action_context.go index d1993738d..e4f00a1af 100644 --- a/pkg/deployment/reconcile/action_context.go +++ b/pkg/deployment/reconcile/action_context.go @@ -155,6 +155,10 @@ type actionContext struct { locals api.PlanLocals } +func (ac *actionContext) GetDatabaseAsyncClient(ctx context.Context) (driver.Client, error) { + return ac.context.GetDatabaseAsyncClient(ctx) +} + func (ac *actionContext) CurrentLocals() api.PlanLocals { return ac.locals } diff --git a/pkg/deployment/reconcile/plan_builder_test.go b/pkg/deployment/reconcile/plan_builder_test.go index 1edaf3f7b..874cda876 100644 --- a/pkg/deployment/reconcile/plan_builder_test.go +++ b/pkg/deployment/reconcile/plan_builder_test.go @@ -85,6 +85,11 @@ type testContext struct { Inspector inspectorInterface.Inspector } +func (c *testContext) GetDatabaseAsyncClient(ctx context.Context) (driver.Client, error) { + //TODO implement me + panic("implement me") +} + func (c *testContext) WithArangoMember(cache inspectorInterface.Inspector, timeout time.Duration, name string) reconciler.ArangoMemberModContext { return reconciler.NewArangoMemberModContext(cache, timeout, name) } diff --git a/pkg/deployment/reconciler/context.go b/pkg/deployment/reconciler/context.go index a7a230f04..397b16500 100644 --- a/pkg/deployment/reconciler/context.go +++ b/pkg/deployment/reconciler/context.go @@ -165,6 +165,10 @@ type DeploymentDatabaseClient interface { // GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server), // creating one if needed. GetDatabaseClient(ctx context.Context) (driver.Client, error) + + // GetDatabaseAsyncClient returns a cached client for the entire database (cluster coordinators or single server), + // creating one if needed. Only in AsyncMode + GetDatabaseAsyncClient(ctx context.Context) (driver.Client, error) } type DeploymentMemberClient interface { From 3bcff1ea88dac05d4014d494a43bf1ad49b0c7ee Mon Sep 17 00:00:00 2001 From: ajanikow <12255597+ajanikow@users.noreply.github.com> Date: Mon, 9 May 2022 02:38:08 +0000 Subject: [PATCH 2/3] Add error wrap --- pkg/deployment/reconcile/action_backup_restore.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/deployment/reconcile/action_backup_restore.go b/pkg/deployment/reconcile/action_backup_restore.go index 861ccbed3..fea991a75 100644 --- a/pkg/deployment/reconcile/action_backup_restore.go +++ b/pkg/deployment/reconcile/action_backup_restore.go @@ -111,22 +111,21 @@ func (a actionBackupRestore) restoreAsync(ctx context.Context, backup *backupApi dbc, err := a.actionCtx.GetDatabaseAsyncClient(ctxChild) if err != nil { - return false, err + return false, errors.Wrapf(err, "Unable to create client") } ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) defer cancel() - restoreError := dbc.Backup().Restore(ctxChild, driver.BackupID(backup.Status.Backup.ID), nil) - if restoreError != nil { - if id, ok := conn.IsAsyncJobInProgress(restoreError); ok { + if err := dbc.Backup().Restore(ctxChild, driver.BackupID(backup.Status.Backup.ID), nil); err != nil { + if id, ok := conn.IsAsyncJobInProgress(err); ok { a.actionCtx.Add(actionBackupRestoreLocalJobID, id, true) a.actionCtx.Add(actionBackupRestoreLocalBackupName, backup.GetName(), true) // Async request has been send return false, nil } else { - return false, restoreError + return false, errors.Wrapf(err, "Unknown restore error") } } From bc60735c3c84567d5f291340db8f7423c9c178f7 Mon Sep 17 00:00:00 2001 From: ajanikow <12255597+ajanikow@users.noreply.github.com> Date: Mon, 9 May 2022 08:23:22 +0000 Subject: [PATCH 3/3] Add case for causer --- pkg/util/arangod/conn/async_errors.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/util/arangod/conn/async_errors.go b/pkg/util/arangod/conn/async_errors.go index 81340e8a6..77f3834ac 100644 --- a/pkg/util/arangod/conn/async_errors.go +++ b/pkg/util/arangod/conn/async_errors.go @@ -20,7 +20,11 @@ package conn -import "fmt" +import ( + "fmt" + + "github.com/arangodb/kube-arangodb/pkg/util/errors" +) func IsAsyncErrorNotFound(err error) bool { if err == nil { @@ -31,7 +35,7 @@ func IsAsyncErrorNotFound(err error) bool { return true } - return false + return IsAsyncErrorNotFound(errors.Cause(err)) } func newAsyncErrorNotFound(id string) error { @@ -57,7 +61,7 @@ func IsAsyncJobInProgress(err error) (string, bool) { return v.jobID, true } - return "", false + return IsAsyncJobInProgress(errors.Cause(err)) } func newAsyncJobInProgress(id string) error {