Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- (Maintenance) Add check make targets
- (Feature) Create support for local variables in actions.
- (Feature) Support for asynchronous ArangoD resquests.
- (Feature) Change Restore in Cluster mode to Async Request

## [1.2.11](https://github.com/arangodb/kube-arangodb/tree/1.2.11) (2022-04-30)
- (Bugfix) Orphan PVC are not removed
Expand Down
22 changes: 22 additions & 0 deletions pkg/deployment/client/client_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,16 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)

type ConnectionWrap func(c driver.Connection) driver.Connection

type Cache interface {
GetAuth() conn.Auth

Connection(ctx context.Context, host string) (driver.Connection, error)

Get(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error)
GetDatabase(ctx context.Context) (driver.Client, error)
GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWrap) (driver.Client, error)
GetAgency(ctx context.Context) (agency.Agency, error)
}

Expand Down Expand Up @@ -144,6 +147,25 @@ func (cc *cache) GetDatabase(ctx context.Context) (driver.Client, error) {
}
}

func (cc *cache) GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWrap) (driver.Client, error) {
c, err := cc.getDatabaseClient()
if err != nil {
return nil, err
}

conn := c.Connection()

for _, w := range wraps {
if w != nil {
conn = w(conn)
}
}

return driver.NewClient(driver.ClientConfig{
Connection: conn,
})
}

// GetAgency returns a cached client for the agency
func (cc *cache) GetAgency(ctx context.Context) (agency.Agency, error) {
cc.mutex.Lock()
Expand Down
8 changes: 8 additions & 0 deletions pkg/deployment/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,14 @@ func (d *Deployment) GetDatabaseClient(ctx context.Context) (driver.Client, erro
return c, nil
}

func (d *Deployment) GetDatabaseAsyncClient(ctx context.Context) (driver.Client, error) {
c, err := d.clientCache.GetDatabaseWithWrap(ctx, conn.NewAsyncConnection)
if err != nil {
return nil, errors.WithStack(err)
}
return c, nil
}

// GetServerClient returns a cached client for a specific server.
func (d *Deployment) GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) {
c, err := d.clientCache.Get(ctx, group, id)
Expand Down
130 changes: 118 additions & 12 deletions pkg/deployment/reconcile/action_backup_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,21 @@ import (
"github.com/arangodb/go-driver"
"github.com/rs/zerolog"

backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
)

func init() {
registerAction(api.ActionTypeBackupRestore, newBackupRestoreAction, backupRestoreTimeout)
}

const (
actionBackupRestoreLocalJobID api.PlanLocalKey = "jobID"
actionBackupRestoreLocalBackupName api.PlanLocalKey = "backupName"
)

func newBackupRestoreAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
a := &actionBackupRestore{}

Expand All @@ -47,8 +55,6 @@ func newBackupRestoreAction(log zerolog.Logger, action api.Action, actionCtx Act
type actionBackupRestore struct {
// actionImpl implement timeout and member id functions
actionImpl

actionEmptyCheckProgress
}

func (a actionBackupRestore) Start(ctx context.Context) (bool, error) {
Expand All @@ -64,13 +70,6 @@ func (a actionBackupRestore) Start(ctx context.Context) (bool, error) {
return true, nil
}

ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
dbc, err := a.actionCtx.GetDatabaseClient(ctxChild)
if err != nil {
return false, err
}

backupResource, err := a.actionCtx.GetBackup(ctx, *spec.RestoreFrom)
if err != nil {
a.log.Error().Err(err).Msg("Unable to find backup")
Expand All @@ -96,15 +95,61 @@ func (a actionBackupRestore) Start(ctx context.Context) (bool, error) {
return false, err
}

switch mode := a.actionCtx.GetSpec().Mode.Get(); mode {
case api.DeploymentModeActiveFailover, api.DeploymentModeSingle:
return a.restoreSync(ctx, backupResource)
case api.DeploymentModeCluster:
return a.restoreAsync(ctx, backupResource)
default:
return false, errors.Newf("Unknown mode %s", mode)
}
}

func (a actionBackupRestore) restoreAsync(ctx context.Context, backup *backupApi.ArangoBackup) (bool, error) {
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()

dbc, err := a.actionCtx.GetDatabaseAsyncClient(ctxChild)
if err != nil {
return false, errors.Wrapf(err, "Unable to create client")
}

ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()

if err := dbc.Backup().Restore(ctxChild, driver.BackupID(backup.Status.Backup.ID), nil); err != nil {
if id, ok := conn.IsAsyncJobInProgress(err); ok {
a.actionCtx.Add(actionBackupRestoreLocalJobID, id, true)
a.actionCtx.Add(actionBackupRestoreLocalBackupName, backup.GetName(), true)

// Async request has been send
return false, nil
} else {
return false, errors.Wrapf(err, "Unknown restore error")
}
}

return false, errors.Newf("Async response not received")
}

func (a actionBackupRestore) restoreSync(ctx context.Context, backup *backupApi.ArangoBackup) (bool, error) {
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
dbc, err := a.actionCtx.GetDatabaseClient(ctxChild)
if err != nil {
a.log.Debug().Err(err).Msg("Failed to create database client")
return false, nil
}

// The below action can take a while so the full parent timeout context is used.
restoreError := dbc.Backup().Restore(ctx, driver.BackupID(backupResource.Status.Backup.ID), nil)
restoreError := dbc.Backup().Restore(ctx, driver.BackupID(backup.Status.Backup.ID), nil)
if restoreError != nil {
a.log.Error().Err(restoreError).Msg("Restore failed")
}

if err := a.actionCtx.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool {
result := &api.DeploymentRestoreResult{
RequestedFrom: spec.GetRestoreFrom(),
RequestedFrom: backup.GetName(),
}

if restoreError != nil {
Expand All @@ -118,9 +163,70 @@ func (a actionBackupRestore) Start(ctx context.Context) (bool, error) {

return true
}); err != nil {
a.log.Error().Err(err).Msg("Unable to ser restored state")
a.log.Error().Err(err).Msg("Unable to set restored state")
return false, err
}

return true, nil
}

func (a actionBackupRestore) CheckProgress(ctx context.Context) (bool, bool, error) {
backup, ok := a.actionCtx.Get(a.action, actionBackupRestoreLocalBackupName)
if !ok {
return false, false, errors.Newf("Local Key is missing in action: %s", actionBackupRestoreLocalBackupName)
}

job, ok := a.actionCtx.Get(a.action, actionBackupRestoreLocalJobID)
if !ok {
return false, false, errors.Newf("Local Key is missing in action: %s", actionBackupRestoreLocalJobID)
}

ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()

dbc, err := a.actionCtx.GetDatabaseAsyncClient(ctxChild)
if err != nil {
a.log.Debug().Err(err).Msg("Failed to create database client")
return false, false, nil
}

ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()

// Params does not matter in async fetch
restoreError := dbc.Backup().Restore(conn.WithAsyncID(ctxChild, job), "", nil)
if restoreError != nil {
if _, ok := conn.IsAsyncJobInProgress(restoreError); ok {
// Job still in progress
return false, false, nil
}

if errors.IsTemporary(restoreError) {
// Retry
return false, false, nil
}
}

// Restore is done

if err := a.actionCtx.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool {
result := &api.DeploymentRestoreResult{
RequestedFrom: backup,
State: api.DeploymentRestoreStateRestored,
}

if restoreError != nil {
result.State = api.DeploymentRestoreStateRestoreFailed
result.Message = restoreError.Error()
}

s.Restore = result

return true
}); err != nil {
a.log.Error().Err(err).Msg("Unable to set restored state")
return false, false, err
}

return true, false, nil
}
4 changes: 4 additions & 0 deletions pkg/deployment/reconcile/action_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ type actionContext struct {
locals api.PlanLocals
}

func (ac *actionContext) GetDatabaseAsyncClient(ctx context.Context) (driver.Client, error) {
return ac.context.GetDatabaseAsyncClient(ctx)
}

func (ac *actionContext) CurrentLocals() api.PlanLocals {
return ac.locals
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/deployment/reconcile/plan_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ type testContext struct {
Inspector inspectorInterface.Inspector
}

func (c *testContext) GetDatabaseAsyncClient(ctx context.Context) (driver.Client, error) {
//TODO implement me
panic("implement me")
}

func (c *testContext) WithArangoMember(cache inspectorInterface.Inspector, timeout time.Duration, name string) reconciler.ArangoMemberModContext {
return reconciler.NewArangoMemberModContext(cache, timeout, name)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/deployment/reconciler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ type DeploymentDatabaseClient interface {
// GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server),
// creating one if needed.
GetDatabaseClient(ctx context.Context) (driver.Client, error)

// GetDatabaseAsyncClient returns a cached client for the entire database (cluster coordinators or single server),
// creating one if needed. Only in AsyncMode
GetDatabaseAsyncClient(ctx context.Context) (driver.Client, error)
}

type DeploymentMemberClient interface {
Expand Down
10 changes: 7 additions & 3 deletions pkg/util/arangod/conn/async_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@

package conn

import "fmt"
import (
"fmt"

"github.com/arangodb/kube-arangodb/pkg/util/errors"
)

func IsAsyncErrorNotFound(err error) bool {
if err == nil {
Expand All @@ -31,7 +35,7 @@ func IsAsyncErrorNotFound(err error) bool {
return true
}

return false
return IsAsyncErrorNotFound(errors.Cause(err))
}

func newAsyncErrorNotFound(id string) error {
Expand All @@ -57,7 +61,7 @@ func IsAsyncJobInProgress(err error) (string, bool) {
return v.jobID, true
}

return "", false
return IsAsyncJobInProgress(errors.Cause(err))
}

func newAsyncJobInProgress(id string) error {
Expand Down