Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A)
- (Bugfix) Ensure pod names not too long
- (Refactor) Use cached member's clients

## [1.2.14](https://github.com/arangodb/kube-arangodb/tree/1.2.14) (2022-07-14)
- (Feature) Add ArangoSync TLS based rotation
Expand Down
9 changes: 6 additions & 3 deletions pkg/apis/deployment/v1/deployment_status_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/errors"
)

// MemberStatusFunc is a callback which is used to traverse a specific group of servers and check their status.
type MemberStatusFunc func(group ServerGroup, list MemberStatusList) error

// DeploymentStatusMembers holds the member status of all server groups
type DeploymentStatusMembers struct {
Single MemberStatusList `json:"single,omitempty"`
Expand Down Expand Up @@ -81,11 +84,11 @@ func (ds DeploymentStatusMembers) ElementByID(id string) (MemberStatus, ServerGr
// ForeachServerGroup calls the given callback for all server groups.
// If the callback returns an error, this error is returned and the callback is
// not called for the remaining groups.
func (ds DeploymentStatusMembers) ForeachServerGroup(cb func(group ServerGroup, list MemberStatusList) error) error {
func (ds DeploymentStatusMembers) ForeachServerGroup(cb MemberStatusFunc) error {
return ds.ForeachServerInGroups(cb, AllServerGroups...)
}

func (ds DeploymentStatusMembers) ForeachServerInGroups(cb func(group ServerGroup, list MemberStatusList) error, groups ...ServerGroup) error {
func (ds DeploymentStatusMembers) ForeachServerInGroups(cb MemberStatusFunc, groups ...ServerGroup) error {
for _, group := range groups {
if err := ds.ForServerGroup(cb, group); err != nil {
return err
Expand All @@ -95,7 +98,7 @@ func (ds DeploymentStatusMembers) ForeachServerInGroups(cb func(group ServerGrou
return nil
}

func (ds DeploymentStatusMembers) ForServerGroup(cb func(group ServerGroup, list MemberStatusList) error, group ServerGroup) error {
func (ds DeploymentStatusMembers) ForServerGroup(cb MemberStatusFunc, group ServerGroup) error {
switch group {
case ServerGroupSingle:
if err := cb(ServerGroupSingle, ds.Single); err != nil {
Expand Down
9 changes: 6 additions & 3 deletions pkg/apis/deployment/v2alpha1/deployment_status_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/errors"
)

// MemberStatusFunc is a callback which is used to traverse a specific group of servers and check their status.
type MemberStatusFunc func(group ServerGroup, list MemberStatusList) error

// DeploymentStatusMembers holds the member status of all server groups
type DeploymentStatusMembers struct {
Single MemberStatusList `json:"single,omitempty"`
Expand Down Expand Up @@ -81,11 +84,11 @@ func (ds DeploymentStatusMembers) ElementByID(id string) (MemberStatus, ServerGr
// ForeachServerGroup calls the given callback for all server groups.
// If the callback returns an error, this error is returned and the callback is
// not called for the remaining groups.
func (ds DeploymentStatusMembers) ForeachServerGroup(cb func(group ServerGroup, list MemberStatusList) error) error {
func (ds DeploymentStatusMembers) ForeachServerGroup(cb MemberStatusFunc) error {
return ds.ForeachServerInGroups(cb, AllServerGroups...)
}

func (ds DeploymentStatusMembers) ForeachServerInGroups(cb func(group ServerGroup, list MemberStatusList) error, groups ...ServerGroup) error {
func (ds DeploymentStatusMembers) ForeachServerInGroups(cb MemberStatusFunc, groups ...ServerGroup) error {
for _, group := range groups {
if err := ds.ForServerGroup(cb, group); err != nil {
return err
Expand All @@ -95,7 +98,7 @@ func (ds DeploymentStatusMembers) ForeachServerInGroups(cb func(group ServerGrou
return nil
}

func (ds DeploymentStatusMembers) ForServerGroup(cb func(group ServerGroup, list MemberStatusList) error, group ServerGroup) error {
func (ds DeploymentStatusMembers) ForServerGroup(cb MemberStatusFunc, group ServerGroup) error {
switch group {
case ServerGroupSingle:
if err := cb(ServerGroupSingle, ds.Single); err != nil {
Expand Down
26 changes: 1 addition & 25 deletions pkg/deployment/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,30 +203,6 @@ func (d *Deployment) GetAuthentication() conn.Auth {
return d.clientCache.GetAuth()
}

// GetAgencyClients returns a client connection for every agency member.
func (d *Deployment) GetAgencyClients(ctx context.Context) ([]driver.Connection, error) {
return d.GetAgencyClientsWithPredicate(ctx, nil)
}

// GetAgencyClientsWithPredicate returns a client connection for every agency member.
// If the given predicate is not nil, only agents are included where the given predicate returns true.
func (d *Deployment) GetAgencyClientsWithPredicate(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error) {
agencyMembers := d.status.last.Members.Agents
result := make([]driver.Connection, 0, len(agencyMembers))
for _, m := range agencyMembers {
if predicate != nil && !predicate(m.ID) {
continue
}
client, err := d.GetServerClient(ctx, api.ServerGroupAgents, m.ID)
if err != nil {
return nil, errors.WithStack(err)
}
conn := client.Connection()
result = append(result, conn)
}
return result, nil
}

// GetAgency returns a connection to the agency.
func (d *Deployment) GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error) {
return d.clientCache.GetAgency(ctx, agencyIDs...)
Expand Down Expand Up @@ -544,7 +520,7 @@ func (d *Deployment) EnableScalingCluster(ctx context.Context) error {
return d.clusterScalingIntegration.EnableScalingCluster(ctx)
}

// GetAgencyPlan returns agency plan
// GetAgencyData returns agency plan.
func (d *Deployment) GetAgencyData(ctx context.Context, i interface{}, keyParts ...string) error {
a, err := d.GetAgency(ctx)
if err != nil {
Expand Down
51 changes: 50 additions & 1 deletion pkg/deployment/member/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (

"github.com/rs/zerolog"

"github.com/arangodb/arangosync-client/client"
"github.com/arangodb/go-driver"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/reconciler"
"github.com/arangodb/kube-arangodb/pkg/logging"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
)

Expand All @@ -40,6 +42,13 @@ type StateInspectorGetter interface {

type StateInspector interface {
RefreshState(ctx context.Context, members api.DeploymentStatusMemberElements)

// GetMemberClient returns member connection to an ArangoDB server.
GetMemberClient(id string) (driver.Client, error)

// GetMemberSyncClient returns member connection to an ArangoSync server.
GetMemberSyncClient(id string) (client.API, error)

MemberState(id string) (State, bool)

Health() Health
Expand Down Expand Up @@ -164,6 +173,7 @@ func (s *stateInspector) fetchArangosyncMemberState(ctx context.Context, m api.D
"arangosync-build": v.Build,
},
}
state.syncClient = c
}
return state
}
Expand All @@ -180,10 +190,43 @@ func (s *stateInspector) fetchServerMemberState(ctx context.Context, m api.Deplo
state.NotReachableErr = err
} else {
state.Version = v
state.client = c
}
return state
}

// GetMemberClient returns member client to a server.
func (s *stateInspector) GetMemberClient(id string) (driver.Client, error) {
if state, ok := s.MemberState(id); ok {
if state.NotReachableErr != nil {
// ArangoDB client can be set, but it might be old value.
return nil, state.NotReachableErr
}

if state.client != nil {
return state.client, nil
}
}

return nil, errors.Newf("failed to get ArangoDB member client: %s", id)
}

// GetMemberSyncClient returns member client to a server.
func (s *stateInspector) GetMemberSyncClient(id string) (client.API, error) {
if state, ok := s.MemberState(id); ok {
if state.NotReachableErr != nil {
// ArangoSync client can be set, but it might be old value.
return nil, state.NotReachableErr
}

if state.syncClient != nil {
return state.syncClient, nil
}
}

return nil, errors.Newf("failed to get ArangoSync member client: %s", id)
}

func (s *stateInspector) MemberState(id string) (State, bool) {
s.lock.Lock()
defer s.lock.Unlock()
Expand All @@ -203,10 +246,16 @@ type Health struct {
Error error
}

// State describes a state of a member.
type State struct {
// NotReachableErr set to non-nil if a member is not reachable.
NotReachableErr error

// Version of this specific member.
Version driver.VersionInfo
// client to this specific ArangoDB member.
client driver.Client
// client to this specific ArangoSync member.
syncClient client.API
}

func (s State) IsReachable() bool {
Expand Down
22 changes: 1 addition & 21 deletions pkg/deployment/reconcile/action_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

core "k8s.io/api/core/v1"

"github.com/arangodb/arangosync-client/client"
"github.com/arangodb/go-driver"

backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1"
Expand All @@ -50,8 +49,7 @@ type ActionContext interface {
reconciler.DeploymentPodRenderer
reconciler.ArangoAgencyGet
reconciler.DeploymentInfoGetter
reconciler.DeploymentClient
reconciler.DeploymentSyncClient
reconciler.DeploymentDatabaseClient

member.StateInspectorGetter

Expand Down Expand Up @@ -267,24 +265,6 @@ func (ac *actionContext) GetDatabaseClient(ctx context.Context) (driver.Client,
return c, nil
}

// GetServerClient returns a cached client for a specific server.
func (ac *actionContext) GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) {
c, err := ac.context.GetServerClient(ctx, group, id)
if err != nil {
return nil, errors.WithStack(err)
}
return c, nil
}

// GetSyncServerClient returns a cached client for a specific arangosync server.
func (ac *actionContext) GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error) {
c, err := ac.context.GetSyncServerClient(ctx, group, id)
if err != nil {
return nil, errors.WithStack(err)
}
return c, nil
}

// GetMemberStatusByID returns the current member status
// for the member with given id.
// Returns member status, true when found, or false
Expand Down
9 changes: 4 additions & 5 deletions pkg/deployment/reconcile/action_encryption_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,14 @@ func (a *encryptionKeyRefreshAction) Start(ctx context.Context) (bool, error) {
func (a *encryptionKeyRefreshAction) CheckProgress(ctx context.Context) (bool, bool, error) {
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()
keyfolder, err := a.actionCtx.ACS().CurrentClusterCache().Secret().V1().Read().Get(ctxChild, pod.GetEncryptionFolderSecretName(a.actionCtx.GetName()), meta.GetOptions{})
keyFolder, err := a.actionCtx.ACS().CurrentClusterCache().Secret().V1().Read().Get(ctxChild,
pod.GetEncryptionFolderSecretName(a.actionCtx.GetName()), meta.GetOptions{})
if err != nil {
a.log.Err(err).Error("Unable to fetch encryption folder")
return true, false, nil
}

ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
c, err := a.actionCtx.GetServerClient(ctxChild, a.action.Group, a.action.MemberID)
c, err := a.actionCtx.GetMembersState().GetMemberClient(a.action.MemberID)
if err != nil {
a.log.Err(err).Warn("Unable to get client")
return true, false, nil
Expand All @@ -78,7 +77,7 @@ func (a *encryptionKeyRefreshAction) CheckProgress(ctx context.Context) (bool, b
return true, false, nil
}

if !e.Result.KeysPresent(keyfolder.Data) {
if !e.Result.KeysPresent(keyFolder.Data) {
return false, false, nil
}

Expand Down
6 changes: 2 additions & 4 deletions pkg/deployment/reconcile/action_jwt_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,13 @@ func (a *jwtRefreshAction) CheckProgress(ctx context.Context) (bool, bool, error
return true, false, nil
}

ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
c, err := a.actionCtx.GetServerClient(ctxChild, a.action.Group, a.action.MemberID)
c, err := a.actionCtx.GetMembersState().GetMemberClient(a.action.MemberID)
if err != nil {
a.log.Err(err).Warn("Unable to get client")
return true, false, nil
}

ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
if invalid, err := isMemberJWTTokenInvalid(ctxChild, client.NewClient(c.Connection()), folder.Data, true); err != nil {
a.log.Err(err).Warn("Error while getting JWT Status")
Expand Down
2 changes: 1 addition & 1 deletion pkg/deployment/reconcile/action_remove_member.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (a *actionRemoveMember) Start(ctx context.Context) (bool, error) {
if err := arangod.RemoveServerFromCluster(ctxChild, client.Connection(), driver.ServerID(m.ID)); err != nil {
if !driver.IsNotFound(err) && !driver.IsPreconditionFailed(err) {
a.log.Err(err).Str("member-id", m.ID).Error("Failed to remove server from cluster")
// ignore this error, maybe all coordinators are failed and no connction to cluster is possible
// ignore this error, maybe all coordinators are failed and no connection to cluster is possible
} else if driver.IsPreconditionFailed(err) {
health := a.actionCtx.GetMembersState().Health()
if health.Error != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,7 @@ func (a actionRuntimeContainerArgsUpdate) setLogLevel(ctx context.Context, logLe
return nil
}

ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
cli, err := a.actionCtx.GetServerClient(ctxChild, a.action.Group, a.action.MemberID)
cli, err := a.actionCtx.GetMembersState().GetMemberClient(a.action.MemberID)
if err != nil {
return err
}
Expand All @@ -265,7 +263,7 @@ func (a actionRuntimeContainerArgsUpdate) setLogLevel(ctx context.Context, logLe
return err
}

ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
resp, err := conn.Do(ctxChild, req)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/deployment/reconcile/action_set_license.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,13 @@ func (a *licenseSetAction) Start(ctx context.Context) (bool, error) {
return true, nil
}

group := a.action.Group
m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID)
if !ok {
a.log.Error("No such member")
return true, nil
}

c, err := a.actionCtx.GetServerClient(ctxChild, group, m.ID)
c, err := a.actionCtx.GetMembersState().GetMemberClient(m.ID)
if !ok {
a.log.Err(err).Error("Unable to get client")
return true, nil
Expand Down
6 changes: 2 additions & 4 deletions pkg/deployment/reconcile/action_tls_keyfile_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ type refreshTLSKeyfileCertificateAction struct {
}

func (a *refreshTLSKeyfileCertificateAction) CheckProgress(ctx context.Context) (bool, bool, error) {
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
c, err := a.actionCtx.GetServerClient(ctxChild, a.action.Group, a.action.MemberID)
c, err := a.actionCtx.GetMembersState().GetMemberClient(a.action.MemberID)
if err != nil {
a.log.Err(err).Warn("Unable to get client")
return true, false, nil
Expand All @@ -72,7 +70,7 @@ func (a *refreshTLSKeyfileCertificateAction) CheckProgress(ctx context.Context)

client := client.NewClient(c.Connection())

ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
e, err := client.RefreshTLS(ctxChild)
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions pkg/deployment/reconcile/action_tls_sni_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,13 @@ func (t *tlsSNIUpdate) CheckProgress(ctx context.Context) (bool, bool, error) {
return true, false, nil
}

ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
c, err := t.actionCtx.GetServerClient(ctxChild, t.action.Group, t.action.MemberID)
c, err := t.actionCtx.GetMembersState().GetMemberClient(t.action.MemberID)
if err != nil {
t.log.Err(err).Warn("Unable to get client")
return true, false, nil
}

ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
if ok, err := compareTLSSNIConfig(ctxChild, c.Connection(), fetchedSecrets, true); err != nil {
t.log.Err(err).Warn("Unable to compare TLS config")
Expand Down
Loading