From c5ec541119cba8b8444958f9b5f2e2f146f193d1 Mon Sep 17 00:00:00 2001 From: Tomasz Mielech Date: Tue, 12 Jul 2022 11:20:02 +0200 Subject: [PATCH] get member client from member status get arangosync member client from member status synchronize v2alpha1 --- CHANGELOG.md | 1 + .../v1/deployment_status_members.go | 9 ++-- .../v2alpha1/deployment_status_members.go | 9 ++-- pkg/deployment/context_impl.go | 26 +--------- pkg/deployment/member/state.go | 51 ++++++++++++++++++- pkg/deployment/reconcile/action_context.go | 22 +------- .../reconcile/action_encryption_refresh.go | 9 ++-- .../reconcile/action_jwt_refresh.go | 6 +-- .../reconcile/action_remove_member.go | 2 +- .../action_runtime_container_args_udpate.go | 6 +-- .../reconcile/action_set_license.go | 3 +- .../reconcile/action_tls_keyfile_refresh.go | 6 +-- .../reconcile/action_tls_sni_update.go | 6 +-- .../reconcile/action_wait_for_member_up.go | 4 +- pkg/deployment/reconcile/context.go | 3 +- pkg/deployment/reconcile/helper_shutdown.go | 6 +-- .../reconcile/plan_builder_context.go | 5 +- .../reconcile/plan_builder_encryption.go | 7 +-- pkg/deployment/reconcile/plan_builder_jwt.go | 2 +- .../reconcile/plan_builder_license.go | 2 +- pkg/deployment/reconcile/plan_builder_test.go | 19 ------- pkg/deployment/reconcile/plan_builder_tls.go | 2 +- .../reconcile/plan_builder_tls_sni.go | 3 +- pkg/deployment/resources/context.go | 1 - pkg/deployment/resources/member_cleanup.go | 8 +-- pkg/deployment/resources/pod_leader.go | 2 +- 26 files changed, 98 insertions(+), 122 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d0bd8ac7f..649a277fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ - (Feature) Member restarts metric - (Bugfix) Infinite loop fix in ArangoD AsyncClient - (Bugfix) Add Panic Handler +- (Refactor) Use cached member's clients ## [1.2.13](https://github.com/arangodb/kube-arangodb/tree/1.2.13) (2022-06-07) - (Bugfix) Fix arangosync members state inspection diff --git a/pkg/apis/deployment/v1/deployment_status_members.go b/pkg/apis/deployment/v1/deployment_status_members.go index 59f228a2c..b7e8de30b 100644 --- a/pkg/apis/deployment/v1/deployment_status_members.go +++ b/pkg/apis/deployment/v1/deployment_status_members.go @@ -24,6 +24,9 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/errors" ) +// MemberStatusFunc is a callback which is used to traverse a specific group of servers and check their status. +type MemberStatusFunc func(group ServerGroup, list MemberStatusList) error + // DeploymentStatusMembers holds the member status of all server groups type DeploymentStatusMembers struct { Single MemberStatusList `json:"single,omitempty"` @@ -81,11 +84,11 @@ func (ds DeploymentStatusMembers) ElementByID(id string) (MemberStatus, ServerGr // ForeachServerGroup calls the given callback for all server groups. // If the callback returns an error, this error is returned and the callback is // not called for the remaining groups. -func (ds DeploymentStatusMembers) ForeachServerGroup(cb func(group ServerGroup, list MemberStatusList) error) error { +func (ds DeploymentStatusMembers) ForeachServerGroup(cb MemberStatusFunc) error { return ds.ForeachServerInGroups(cb, AllServerGroups...) } -func (ds DeploymentStatusMembers) ForeachServerInGroups(cb func(group ServerGroup, list MemberStatusList) error, groups ...ServerGroup) error { +func (ds DeploymentStatusMembers) ForeachServerInGroups(cb MemberStatusFunc, groups ...ServerGroup) error { for _, group := range groups { if err := ds.ForServerGroup(cb, group); err != nil { return err @@ -95,7 +98,7 @@ func (ds DeploymentStatusMembers) ForeachServerInGroups(cb func(group ServerGrou return nil } -func (ds DeploymentStatusMembers) ForServerGroup(cb func(group ServerGroup, list MemberStatusList) error, group ServerGroup) error { +func (ds DeploymentStatusMembers) ForServerGroup(cb MemberStatusFunc, group ServerGroup) error { switch group { case ServerGroupSingle: if err := cb(ServerGroupSingle, ds.Single); err != nil { diff --git a/pkg/apis/deployment/v2alpha1/deployment_status_members.go b/pkg/apis/deployment/v2alpha1/deployment_status_members.go index d4377ba9c..a33504790 100644 --- a/pkg/apis/deployment/v2alpha1/deployment_status_members.go +++ b/pkg/apis/deployment/v2alpha1/deployment_status_members.go @@ -24,6 +24,9 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/errors" ) +// MemberStatusFunc is a callback which is used to traverse a specific group of servers and check their status. +type MemberStatusFunc func(group ServerGroup, list MemberStatusList) error + // DeploymentStatusMembers holds the member status of all server groups type DeploymentStatusMembers struct { Single MemberStatusList `json:"single,omitempty"` @@ -81,11 +84,11 @@ func (ds DeploymentStatusMembers) ElementByID(id string) (MemberStatus, ServerGr // ForeachServerGroup calls the given callback for all server groups. // If the callback returns an error, this error is returned and the callback is // not called for the remaining groups. -func (ds DeploymentStatusMembers) ForeachServerGroup(cb func(group ServerGroup, list MemberStatusList) error) error { +func (ds DeploymentStatusMembers) ForeachServerGroup(cb MemberStatusFunc) error { return ds.ForeachServerInGroups(cb, AllServerGroups...) } -func (ds DeploymentStatusMembers) ForeachServerInGroups(cb func(group ServerGroup, list MemberStatusList) error, groups ...ServerGroup) error { +func (ds DeploymentStatusMembers) ForeachServerInGroups(cb MemberStatusFunc, groups ...ServerGroup) error { for _, group := range groups { if err := ds.ForServerGroup(cb, group); err != nil { return err @@ -95,7 +98,7 @@ func (ds DeploymentStatusMembers) ForeachServerInGroups(cb func(group ServerGrou return nil } -func (ds DeploymentStatusMembers) ForServerGroup(cb func(group ServerGroup, list MemberStatusList) error, group ServerGroup) error { +func (ds DeploymentStatusMembers) ForServerGroup(cb MemberStatusFunc, group ServerGroup) error { switch group { case ServerGroupSingle: if err := cb(ServerGroupSingle, ds.Single); err != nil { diff --git a/pkg/deployment/context_impl.go b/pkg/deployment/context_impl.go index 68efb6dc5..545bf6f81 100644 --- a/pkg/deployment/context_impl.go +++ b/pkg/deployment/context_impl.go @@ -203,30 +203,6 @@ func (d *Deployment) GetAuthentication() conn.Auth { return d.clientCache.GetAuth() } -// GetAgencyClients returns a client connection for every agency member. -func (d *Deployment) GetAgencyClients(ctx context.Context) ([]driver.Connection, error) { - return d.GetAgencyClientsWithPredicate(ctx, nil) -} - -// GetAgencyClientsWithPredicate returns a client connection for every agency member. -// If the given predicate is not nil, only agents are included where the given predicate returns true. -func (d *Deployment) GetAgencyClientsWithPredicate(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error) { - agencyMembers := d.status.last.Members.Agents - result := make([]driver.Connection, 0, len(agencyMembers)) - for _, m := range agencyMembers { - if predicate != nil && !predicate(m.ID) { - continue - } - client, err := d.GetServerClient(ctx, api.ServerGroupAgents, m.ID) - if err != nil { - return nil, errors.WithStack(err) - } - conn := client.Connection() - result = append(result, conn) - } - return result, nil -} - // GetAgency returns a connection to the agency. func (d *Deployment) GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error) { return d.clientCache.GetAgency(ctx, agencyIDs...) @@ -544,7 +520,7 @@ func (d *Deployment) EnableScalingCluster(ctx context.Context) error { return d.clusterScalingIntegration.EnableScalingCluster(ctx) } -// GetAgencyPlan returns agency plan +// GetAgencyData returns agency plan. func (d *Deployment) GetAgencyData(ctx context.Context, i interface{}, keyParts ...string) error { a, err := d.GetAgency(ctx) if err != nil { diff --git a/pkg/deployment/member/state.go b/pkg/deployment/member/state.go index d06862a16..bd182d48a 100644 --- a/pkg/deployment/member/state.go +++ b/pkg/deployment/member/state.go @@ -26,11 +26,13 @@ import ( "github.com/rs/zerolog" + "github.com/arangodb/arangosync-client/client" "github.com/arangodb/go-driver" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/deployment/reconciler" "github.com/arangodb/kube-arangodb/pkg/logging" + "github.com/arangodb/kube-arangodb/pkg/util/errors" "github.com/arangodb/kube-arangodb/pkg/util/globals" ) @@ -40,6 +42,13 @@ type StateInspectorGetter interface { type StateInspector interface { RefreshState(ctx context.Context, members api.DeploymentStatusMemberElements) + + // GetMemberClient returns member connection to an ArangoDB server. + GetMemberClient(id string) (driver.Client, error) + + // GetMemberSyncClient returns member connection to an ArangoSync server. + GetMemberSyncClient(id string) (client.API, error) + MemberState(id string) (State, bool) Health() Health @@ -164,6 +173,7 @@ func (s *stateInspector) fetchArangosyncMemberState(ctx context.Context, m api.D "arangosync-build": v.Build, }, } + state.syncClient = c } return state } @@ -180,10 +190,43 @@ func (s *stateInspector) fetchServerMemberState(ctx context.Context, m api.Deplo state.NotReachableErr = err } else { state.Version = v + state.client = c } return state } +// GetMemberClient returns member client to a server. +func (s *stateInspector) GetMemberClient(id string) (driver.Client, error) { + if state, ok := s.MemberState(id); ok { + if state.NotReachableErr != nil { + // ArangoDB client can be set, but it might be old value. + return nil, state.NotReachableErr + } + + if state.client != nil { + return state.client, nil + } + } + + return nil, errors.Newf("failed to get ArangoDB member client: %s", id) +} + +// GetMemberSyncClient returns member client to a server. +func (s *stateInspector) GetMemberSyncClient(id string) (client.API, error) { + if state, ok := s.MemberState(id); ok { + if state.NotReachableErr != nil { + // ArangoSync client can be set, but it might be old value. + return nil, state.NotReachableErr + } + + if state.syncClient != nil { + return state.syncClient, nil + } + } + + return nil, errors.Newf("failed to get ArangoSync member client: %s", id) +} + func (s *stateInspector) MemberState(id string) (State, bool) { s.lock.Lock() defer s.lock.Unlock() @@ -203,10 +246,16 @@ type Health struct { Error error } +// State describes a state of a member. type State struct { + // NotReachableErr set to non-nil if a member is not reachable. NotReachableErr error - + // Version of this specific member. Version driver.VersionInfo + // client to this specific ArangoDB member. + client driver.Client + // client to this specific ArangoSync member. + syncClient client.API } func (s State) IsReachable() bool { diff --git a/pkg/deployment/reconcile/action_context.go b/pkg/deployment/reconcile/action_context.go index 89d71a354..e4e3ccb56 100644 --- a/pkg/deployment/reconcile/action_context.go +++ b/pkg/deployment/reconcile/action_context.go @@ -26,7 +26,6 @@ import ( core "k8s.io/api/core/v1" - "github.com/arangodb/arangosync-client/client" "github.com/arangodb/go-driver" backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1" @@ -50,8 +49,7 @@ type ActionContext interface { reconciler.DeploymentPodRenderer reconciler.ArangoAgencyGet reconciler.DeploymentInfoGetter - reconciler.DeploymentClient - reconciler.DeploymentSyncClient + reconciler.DeploymentDatabaseClient member.StateInspectorGetter @@ -267,24 +265,6 @@ func (ac *actionContext) GetDatabaseClient(ctx context.Context) (driver.Client, return c, nil } -// GetServerClient returns a cached client for a specific server. -func (ac *actionContext) GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) { - c, err := ac.context.GetServerClient(ctx, group, id) - if err != nil { - return nil, errors.WithStack(err) - } - return c, nil -} - -// GetSyncServerClient returns a cached client for a specific arangosync server. -func (ac *actionContext) GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error) { - c, err := ac.context.GetSyncServerClient(ctx, group, id) - if err != nil { - return nil, errors.WithStack(err) - } - return c, nil -} - // GetMemberStatusByID returns the current member status // for the member with given id. // Returns member status, true when found, or false diff --git a/pkg/deployment/reconcile/action_encryption_refresh.go b/pkg/deployment/reconcile/action_encryption_refresh.go index 58b44daf3..8cc717851 100644 --- a/pkg/deployment/reconcile/action_encryption_refresh.go +++ b/pkg/deployment/reconcile/action_encryption_refresh.go @@ -55,15 +55,14 @@ func (a *encryptionKeyRefreshAction) Start(ctx context.Context) (bool, error) { func (a *encryptionKeyRefreshAction) CheckProgress(ctx context.Context) (bool, bool, error) { ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx) defer cancel() - keyfolder, err := a.actionCtx.ACS().CurrentClusterCache().Secret().V1().Read().Get(ctxChild, pod.GetEncryptionFolderSecretName(a.actionCtx.GetName()), meta.GetOptions{}) + keyFolder, err := a.actionCtx.ACS().CurrentClusterCache().Secret().V1().Read().Get(ctxChild, + pod.GetEncryptionFolderSecretName(a.actionCtx.GetName()), meta.GetOptions{}) if err != nil { a.log.Err(err).Error("Unable to fetch encryption folder") return true, false, nil } - ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) - defer cancel() - c, err := a.actionCtx.GetServerClient(ctxChild, a.action.Group, a.action.MemberID) + c, err := a.actionCtx.GetMembersState().GetMemberClient(a.action.MemberID) if err != nil { a.log.Err(err).Warn("Unable to get client") return true, false, nil @@ -78,7 +77,7 @@ func (a *encryptionKeyRefreshAction) CheckProgress(ctx context.Context) (bool, b return true, false, nil } - if !e.Result.KeysPresent(keyfolder.Data) { + if !e.Result.KeysPresent(keyFolder.Data) { return false, false, nil } diff --git a/pkg/deployment/reconcile/action_jwt_refresh.go b/pkg/deployment/reconcile/action_jwt_refresh.go index 9cecddda6..c690938e9 100644 --- a/pkg/deployment/reconcile/action_jwt_refresh.go +++ b/pkg/deployment/reconcile/action_jwt_refresh.go @@ -56,15 +56,13 @@ func (a *jwtRefreshAction) CheckProgress(ctx context.Context) (bool, bool, error return true, false, nil } - ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) - defer cancel() - c, err := a.actionCtx.GetServerClient(ctxChild, a.action.Group, a.action.MemberID) + c, err := a.actionCtx.GetMembersState().GetMemberClient(a.action.MemberID) if err != nil { a.log.Err(err).Warn("Unable to get client") return true, false, nil } - ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) + ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) defer cancel() if invalid, err := isMemberJWTTokenInvalid(ctxChild, client.NewClient(c.Connection()), folder.Data, true); err != nil { a.log.Err(err).Warn("Error while getting JWT Status") diff --git a/pkg/deployment/reconcile/action_remove_member.go b/pkg/deployment/reconcile/action_remove_member.go index 791a9c721..88de818e3 100644 --- a/pkg/deployment/reconcile/action_remove_member.go +++ b/pkg/deployment/reconcile/action_remove_member.go @@ -86,7 +86,7 @@ func (a *actionRemoveMember) Start(ctx context.Context) (bool, error) { if err := arangod.RemoveServerFromCluster(ctxChild, client.Connection(), driver.ServerID(m.ID)); err != nil { if !driver.IsNotFound(err) && !driver.IsPreconditionFailed(err) { a.log.Err(err).Str("member-id", m.ID).Error("Failed to remove server from cluster") - // ignore this error, maybe all coordinators are failed and no connction to cluster is possible + // ignore this error, maybe all coordinators are failed and no connection to cluster is possible } else if driver.IsPreconditionFailed(err) { health := a.actionCtx.GetMembersState().Health() if health.Error != nil { diff --git a/pkg/deployment/reconcile/action_runtime_container_args_udpate.go b/pkg/deployment/reconcile/action_runtime_container_args_udpate.go index a8f9e4dff..ac3615bed 100644 --- a/pkg/deployment/reconcile/action_runtime_container_args_udpate.go +++ b/pkg/deployment/reconcile/action_runtime_container_args_udpate.go @@ -248,9 +248,7 @@ func (a actionRuntimeContainerArgsUpdate) setLogLevel(ctx context.Context, logLe return nil } - ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) - defer cancel() - cli, err := a.actionCtx.GetServerClient(ctxChild, a.action.Group, a.action.MemberID) + cli, err := a.actionCtx.GetMembersState().GetMemberClient(a.action.MemberID) if err != nil { return err } @@ -265,7 +263,7 @@ func (a actionRuntimeContainerArgsUpdate) setLogLevel(ctx context.Context, logLe return err } - ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) + ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) defer cancel() resp, err := conn.Do(ctxChild, req) if err != nil { diff --git a/pkg/deployment/reconcile/action_set_license.go b/pkg/deployment/reconcile/action_set_license.go index 941e0c916..52473545a 100644 --- a/pkg/deployment/reconcile/action_set_license.go +++ b/pkg/deployment/reconcile/action_set_license.go @@ -66,14 +66,13 @@ func (a *licenseSetAction) Start(ctx context.Context) (bool, error) { return true, nil } - group := a.action.Group m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID) if !ok { a.log.Error("No such member") return true, nil } - c, err := a.actionCtx.GetServerClient(ctxChild, group, m.ID) + c, err := a.actionCtx.GetMembersState().GetMemberClient(m.ID) if !ok { a.log.Err(err).Error("Unable to get client") return true, nil diff --git a/pkg/deployment/reconcile/action_tls_keyfile_refresh.go b/pkg/deployment/reconcile/action_tls_keyfile_refresh.go index 388ca22a8..eb964b40a 100644 --- a/pkg/deployment/reconcile/action_tls_keyfile_refresh.go +++ b/pkg/deployment/reconcile/action_tls_keyfile_refresh.go @@ -48,9 +48,7 @@ type refreshTLSKeyfileCertificateAction struct { } func (a *refreshTLSKeyfileCertificateAction) CheckProgress(ctx context.Context) (bool, bool, error) { - ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) - defer cancel() - c, err := a.actionCtx.GetServerClient(ctxChild, a.action.Group, a.action.MemberID) + c, err := a.actionCtx.GetMembersState().GetMemberClient(a.action.MemberID) if err != nil { a.log.Err(err).Warn("Unable to get client") return true, false, nil @@ -72,7 +70,7 @@ func (a *refreshTLSKeyfileCertificateAction) CheckProgress(ctx context.Context) client := client.NewClient(c.Connection()) - ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) + ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) defer cancel() e, err := client.RefreshTLS(ctxChild) if err != nil { diff --git a/pkg/deployment/reconcile/action_tls_sni_update.go b/pkg/deployment/reconcile/action_tls_sni_update.go index 58c666fc4..a7a1a6092 100644 --- a/pkg/deployment/reconcile/action_tls_sni_update.go +++ b/pkg/deployment/reconcile/action_tls_sni_update.go @@ -66,15 +66,13 @@ func (t *tlsSNIUpdate) CheckProgress(ctx context.Context) (bool, bool, error) { return true, false, nil } - ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) - defer cancel() - c, err := t.actionCtx.GetServerClient(ctxChild, t.action.Group, t.action.MemberID) + c, err := t.actionCtx.GetMembersState().GetMemberClient(t.action.MemberID) if err != nil { t.log.Err(err).Warn("Unable to get client") return true, false, nil } - ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) + ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) defer cancel() if ok, err := compareTLSSNIConfig(ctxChild, c.Connection(), fetchedSecrets, true); err != nil { t.log.Err(err).Warn("Unable to compare TLS config") diff --git a/pkg/deployment/reconcile/action_wait_for_member_up.go b/pkg/deployment/reconcile/action_wait_for_member_up.go index baeaee982..62f5def19 100644 --- a/pkg/deployment/reconcile/action_wait_for_member_up.go +++ b/pkg/deployment/reconcile/action_wait_for_member_up.go @@ -110,7 +110,7 @@ func (a *actionWaitForMemberUp) checkProgressSingle(ctx context.Context) (bool, // checkProgressSingleInActiveFailover checks the progress of the action in the case // of a single server as part of an active failover deployment. func (a *actionWaitForMemberUp) checkProgressSingleInActiveFailover(ctx context.Context) (bool, bool, error) { - c, err := a.actionCtx.GetServerClient(ctx, a.action.Group, a.action.MemberID) + c, err := a.actionCtx.GetMembersState().GetMemberClient(a.action.MemberID) if err != nil { a.log.Err(err).Debug("Failed to create database client") return false, false, nil @@ -174,7 +174,7 @@ func (a *actionWaitForMemberUp) checkProgressCluster() (bool, bool, error) { // checkProgressArangoSync checks the progress of the action in the case // of a sync master / worker. func (a *actionWaitForMemberUp) checkProgressArangoSync(ctx context.Context) (bool, bool, error) { - c, err := a.actionCtx.GetSyncServerClient(ctx, a.action.Group, a.action.MemberID) + c, err := a.actionCtx.GetMembersState().GetMemberSyncClient(a.action.MemberID) if err != nil { a.log.Err(err).Debug("Failed to create arangosync client") return false, false, nil diff --git a/pkg/deployment/reconcile/context.go b/pkg/deployment/reconcile/context.go index 3caac6c10..b6bf258db 100644 --- a/pkg/deployment/reconcile/context.go +++ b/pkg/deployment/reconcile/context.go @@ -43,9 +43,8 @@ type Context interface { reconciler.ArangoAgencyGet reconciler.ArangoApplier reconciler.DeploymentInfoGetter - reconciler.DeploymentClient + reconciler.DeploymentDatabaseClient reconciler.KubernetesEventGenerator - reconciler.DeploymentSyncClient member.StateInspectorGetter diff --git a/pkg/deployment/reconcile/helper_shutdown.go b/pkg/deployment/reconcile/helper_shutdown.go index 64db20c69..afa553858 100644 --- a/pkg/deployment/reconcile/helper_shutdown.go +++ b/pkg/deployment/reconcile/helper_shutdown.go @@ -121,16 +121,14 @@ func (s shutdownHelperAPI) Start(ctx context.Context) (bool, error) { if group.IsArangod() { // Invoke shutdown endpoint - ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) - defer cancel() - c, err := s.actionCtx.GetServerClient(ctxChild, group, s.action.MemberID) + c, err := s.actionCtx.GetMembersState().GetMemberClient(s.action.MemberID) if err != nil { s.log.Err(err).Debug("Failed to create member client") return false, errors.WithStack(err) } removeFromCluster := false s.log.Bool("removeFromCluster", removeFromCluster).Debug("Shutting down member") - ctxChild, cancel = context.WithTimeout(ctx, shutdownTimeout) + ctxChild, cancel := context.WithTimeout(ctx, shutdownTimeout) defer cancel() if err := c.ShutdownV2(ctxChild, removeFromCluster, true); err != nil { // Shutdown failed. Let's check if we're already done diff --git a/pkg/deployment/reconcile/plan_builder_context.go b/pkg/deployment/reconcile/plan_builder_context.go index fdb856f9e..1c8904299 100644 --- a/pkg/deployment/reconcile/plan_builder_context.go +++ b/pkg/deployment/reconcile/plan_builder_context.go @@ -25,6 +25,7 @@ import ( backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1" "github.com/arangodb/kube-arangodb/pkg/deployment/acs/sutil" + "github.com/arangodb/kube-arangodb/pkg/deployment/member" "github.com/arangodb/kube-arangodb/pkg/deployment/reconciler" "github.com/arangodb/kube-arangodb/pkg/util/arangod/conn" ) @@ -37,9 +38,11 @@ type PlanBuilderContext interface { reconciler.DeploymentPodRenderer reconciler.DeploymentImageManager reconciler.ArangoAgencyGet - reconciler.DeploymentClient + reconciler.DeploymentDatabaseClient reconciler.KubernetesEventGenerator + member.StateInspectorGetter + sutil.ACSGetter // GetAuthentication return authentication for members GetAuthentication() conn.Auth diff --git a/pkg/deployment/reconcile/plan_builder_encryption.go b/pkg/deployment/reconcile/plan_builder_encryption.go index 9bed474a9..87b32ff82 100644 --- a/pkg/deployment/reconcile/plan_builder_encryption.go +++ b/pkg/deployment/reconcile/plan_builder_encryption.go @@ -260,10 +260,7 @@ func (r *Reconciler) isEncryptionKeyUpToDate(ctx context.Context, status api.Dep } log := r.log.Str("group", group.AsRole()).Str("member", m.ID) - - ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) - defer cancel() - c, err := planCtx.GetServerClient(ctxChild, group, m.ID) + c, err := planCtx.GetMembersState().GetMemberClient(m.ID) if err != nil { log.Err(err).Warn("Unable to get client") return false, true @@ -271,7 +268,7 @@ func (r *Reconciler) isEncryptionKeyUpToDate(ctx context.Context, status api.Dep client := client.NewClient(c.Connection()) - ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) + ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx) defer cancel() e, err := client.GetEncryption(ctxChild) if err != nil { diff --git a/pkg/deployment/reconcile/plan_builder_jwt.go b/pkg/deployment/reconcile/plan_builder_jwt.go index 05db27037..daa2dc934 100644 --- a/pkg/deployment/reconcile/plan_builder_jwt.go +++ b/pkg/deployment/reconcile/plan_builder_jwt.go @@ -233,7 +233,7 @@ func (r *Reconciler) isJWTTokenUpToDate(ctx context.Context, status api.Deployme log := r.planLogger.Str("group", group.AsRole()).Str("member", m.ID) - c, err := context.GetServerClient(ctx, group, m.ID) + c, err := context.GetMembersState().GetMemberClient(m.ID) if err != nil { log.Err(err).Warn("Unable to get client") return false, true diff --git a/pkg/deployment/reconcile/plan_builder_license.go b/pkg/deployment/reconcile/plan_builder_license.go index eba086214..2edfd8cc2 100644 --- a/pkg/deployment/reconcile/plan_builder_license.go +++ b/pkg/deployment/reconcile/plan_builder_license.go @@ -69,7 +69,7 @@ func (r *Reconciler) updateClusterLicense(ctx context.Context, apiObject k8sutil ctxChild, cancel := globals.GetGlobals().Timeouts().ArangoD().WithTimeout(ctx) defer cancel() - c, err := context.GetServerClient(ctxChild, member.Group, member.Member.ID) + c, err := context.GetMembersState().GetMemberClient(member.Member.ID) if err != nil { r.log.Err(err).Error("Unable to get client") return nil diff --git a/pkg/deployment/reconcile/plan_builder_test.go b/pkg/deployment/reconcile/plan_builder_test.go index 16a0dd26d..25534db5a 100644 --- a/pkg/deployment/reconcile/plan_builder_test.go +++ b/pkg/deployment/reconcile/plan_builder_test.go @@ -35,7 +35,6 @@ import ( meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - "github.com/arangodb/arangosync-client/client" "github.com/arangodb/go-driver" "github.com/arangodb/go-driver/agency" @@ -141,16 +140,6 @@ func (c *testContext) GetNamespace() string { panic("implement me") } -func (c *testContext) GetAgencyClients(ctx context.Context) ([]driver.Connection, error) { - //TODO implement me - panic("implement me") -} - -func (c *testContext) GetAgencyClientsWithPredicate(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error) { - //TODO implement me - panic("implement me") -} - func (c *testContext) ApplyPatchOnPod(ctx context.Context, pod *core.Pod, p ...patch.Item) error { panic("implement me") } @@ -309,18 +298,10 @@ func (c *testContext) GetDatabaseClient(ctx context.Context) (driver.Client, err return nil, errors.Newf("Client Not Found") } -func (c *testContext) GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) { - panic("implement me") -} - func (c *testContext) GetAgency(_ context.Context, _ ...string) (agency.Agency, error) { panic("implement me") } -func (c *testContext) GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error) { - panic("implement me") -} - func (c *testContext) CreateMember(_ context.Context, group api.ServerGroup, id string, mods ...CreateMemberMod) (string, error) { panic("implement me") } diff --git a/pkg/deployment/reconcile/plan_builder_tls.go b/pkg/deployment/reconcile/plan_builder_tls.go index 05d3c91c9..dc4e2fd8d 100644 --- a/pkg/deployment/reconcile/plan_builder_tls.go +++ b/pkg/deployment/reconcile/plan_builder_tls.go @@ -572,7 +572,7 @@ func (r *Reconciler) keyfileRenewalRequired(ctx context.Context, apiObject k8sut // Ensure secret is propagated only on 3.7.0+ enterprise and inplace mode if mode == api.TLSRotateModeInPlace && group.IsArangod() { - conn, err := context.GetServerClient(ctx, group, member.ID) + conn, err := context.GetMembersState().GetMemberClient(member.ID) if err != nil { r.planLogger.Err(err).Warn("Unable to get client") return false, false diff --git a/pkg/deployment/reconcile/plan_builder_tls_sni.go b/pkg/deployment/reconcile/plan_builder_tls_sni.go index 0cd141042..4e27bbfe8 100644 --- a/pkg/deployment/reconcile/plan_builder_tls_sni.go +++ b/pkg/deployment/reconcile/plan_builder_tls_sni.go @@ -79,7 +79,8 @@ func (r *Reconciler) createRotateTLSServerSNIPlan(ctx context.Context, apiObject var c driver.Client err := globals.GetGlobalTimeouts().ArangoD().RunWithTimeout(ctx, func(ctxChild context.Context) error { var err error - c, err = planCtx.GetServerClient(ctxChild, group, m.ID) + c, err = planCtx.GetMembersState().GetMemberClient(m.ID) + return err }) if err != nil { diff --git a/pkg/deployment/resources/context.go b/pkg/deployment/resources/context.go index 24394da25..1b6ff7192 100644 --- a/pkg/deployment/resources/context.go +++ b/pkg/deployment/resources/context.go @@ -45,7 +45,6 @@ type Context interface { reconciler.ArangoApplier reconciler.DeploymentInfoGetter reconciler.DeploymentClient - reconciler.DeploymentSyncClient reconciler.KubernetesEventGenerator member.StateInspectorGetter diff --git a/pkg/deployment/resources/member_cleanup.go b/pkg/deployment/resources/member_cleanup.go index de364d67a..f401bd359 100644 --- a/pkg/deployment/resources/member_cleanup.go +++ b/pkg/deployment/resources/member_cleanup.go @@ -82,11 +82,7 @@ func (r *Resources) syncMembersInCluster(ctx context.Context, health memberState status, lastVersion := r.context.GetStatus() updateStatusNeeded := false - status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error { - if group != api.ServerGroupCoordinators && group != api.ServerGroupDBServers { - // We're not interested in these other groups - return nil - } + status.Members.ForeachServerInGroups(func(group api.ServerGroup, list api.MemberStatusList) error { for _, m := range list { log := log.Str("member", m.ID).Str("role", group.AsRole()) if serverFound(m.ID) { @@ -110,7 +106,7 @@ func (r *Resources) syncMembersInCluster(ctx context.Context, health memberState } } return nil - }) + }, api.ServerGroupCoordinators, api.ServerGroupDBServers) if updateStatusNeeded { log.Debug("UpdateStatus needed") diff --git a/pkg/deployment/resources/pod_leader.go b/pkg/deployment/resources/pod_leader.go index 53c9a96dc..bf69268f0 100644 --- a/pkg/deployment/resources/pod_leader.go +++ b/pkg/deployment/resources/pod_leader.go @@ -181,7 +181,7 @@ func (r *Resources) getSingleServerLeaderID(ctx context.Context) (string, error) go func(id string) { defer wg.Done() err := globals.GetGlobalTimeouts().ArangoD().RunWithTimeout(ctxCancel, func(ctxChild context.Context) error { - c, err := r.context.GetServerClient(ctxChild, api.ServerGroupSingle, id) + c, err := r.context.GetMembersState().GetMemberClient(id) if err != nil { return err }