Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 39 additions & 14 deletions pkg/apis/deployment/v1alpha/deployment_status_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,22 +137,47 @@ func (ds DeploymentStatusMembers) MemberStatusByPVCName(pvcName string) (MemberS
return MemberStatus{}, 0, false
}

// UpdateMemberStatus updates the given status in the given group.
func (ds *DeploymentStatusMembers) UpdateMemberStatus(status MemberStatus, group ServerGroup) error {
// Add adds the given status in the given group.
func (ds *DeploymentStatusMembers) Add(status MemberStatus, group ServerGroup) error {
var err error
switch group {
case ServerGroupSingle:
err = ds.Single.Update(status)
err = ds.Single.add(status)
case ServerGroupAgents:
err = ds.Agents.Update(status)
err = ds.Agents.add(status)
case ServerGroupDBServers:
err = ds.DBServers.Update(status)
err = ds.DBServers.add(status)
case ServerGroupCoordinators:
err = ds.Coordinators.Update(status)
err = ds.Coordinators.add(status)
case ServerGroupSyncMasters:
err = ds.SyncMasters.Update(status)
err = ds.SyncMasters.add(status)
case ServerGroupSyncWorkers:
err = ds.SyncWorkers.Update(status)
err = ds.SyncWorkers.add(status)
default:
return maskAny(errors.Wrapf(NotFoundError, "ServerGroup %d is not known", group))
}
if err != nil {
return maskAny(err)
}
return nil
}

// Update updates the given status in the given group.
func (ds *DeploymentStatusMembers) Update(status MemberStatus, group ServerGroup) error {
var err error
switch group {
case ServerGroupSingle:
err = ds.Single.update(status)
case ServerGroupAgents:
err = ds.Agents.update(status)
case ServerGroupDBServers:
err = ds.DBServers.update(status)
case ServerGroupCoordinators:
err = ds.Coordinators.update(status)
case ServerGroupSyncMasters:
err = ds.SyncMasters.update(status)
case ServerGroupSyncWorkers:
err = ds.SyncWorkers.update(status)
default:
return maskAny(errors.Wrapf(NotFoundError, "ServerGroup %d is not known", group))
}
Expand All @@ -168,17 +193,17 @@ func (ds *DeploymentStatusMembers) RemoveByID(id string, group ServerGroup) erro
var err error
switch group {
case ServerGroupSingle:
err = ds.Single.RemoveByID(id)
err = ds.Single.removeByID(id)
case ServerGroupAgents:
err = ds.Agents.RemoveByID(id)
err = ds.Agents.removeByID(id)
case ServerGroupDBServers:
err = ds.DBServers.RemoveByID(id)
err = ds.DBServers.removeByID(id)
case ServerGroupCoordinators:
err = ds.Coordinators.RemoveByID(id)
err = ds.Coordinators.removeByID(id)
case ServerGroupSyncMasters:
err = ds.SyncMasters.RemoveByID(id)
err = ds.SyncMasters.removeByID(id)
case ServerGroupSyncWorkers:
err = ds.SyncWorkers.RemoveByID(id)
err = ds.SyncWorkers.removeByID(id)
default:
return maskAny(errors.Wrapf(NotFoundError, "ServerGroup %d is not known", group))
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/apis/deployment/v1alpha/member_status_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (l MemberStatusList) ElementByPVCName(pvcName string) (MemberStatus, bool)

// Add a member to the list.
// Returns an AlreadyExistsError if the ID of the given member already exists.
func (l *MemberStatusList) Add(m MemberStatus) error {
func (l *MemberStatusList) add(m MemberStatus) error {
src := *l
for _, x := range src {
if x.ID == m.ID {
Expand All @@ -89,7 +89,7 @@ func (l *MemberStatusList) Add(m MemberStatus) error {

// Update a member in the list.
// Returns a NotFoundError if the ID of the given member cannot be found.
func (l MemberStatusList) Update(m MemberStatus) error {
func (l MemberStatusList) update(m MemberStatus) error {
for i, x := range l {
if x.ID == m.ID {
l[i] = m
Expand All @@ -101,7 +101,7 @@ func (l MemberStatusList) Update(m MemberStatus) error {

// RemoveByID a member with given ID from the list.
// Returns a NotFoundError if the ID of the given member cannot be found.
func (l *MemberStatusList) RemoveByID(id string) error {
func (l *MemberStatusList) removeByID(id string) error {
src := *l
for i, x := range src {
if x.ID == id {
Expand Down
14 changes: 7 additions & 7 deletions pkg/apis/deployment/v1alpha/member_status_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,32 +36,32 @@ func TestMemberStatusList(t *testing.T) {
m3 := MemberStatus{ID: "m3"}
assert.Equal(t, 0, len(*list))

assert.NoError(t, list.Add(m1))
assert.NoError(t, list.add(m1))
assert.Equal(t, 1, len(*list))

assert.NoError(t, list.Add(m2))
assert.NoError(t, list.Add(m3))
assert.NoError(t, list.add(m2))
assert.NoError(t, list.add(m3))
assert.Equal(t, 3, len(*list))

assert.Error(t, list.Add(m2))
assert.Error(t, list.add(m2))
assert.Equal(t, 3, len(*list))

assert.NoError(t, list.RemoveByID(m3.ID))
assert.NoError(t, list.removeByID(m3.ID))
assert.Equal(t, 2, len(*list))
assert.False(t, list.ContainsID(m3.ID))
assert.Equal(t, m1.ID, (*list)[0].ID)
assert.Equal(t, m2.ID, (*list)[1].ID)

m2.PodName = "foo"
assert.NoError(t, list.Update(m2))
assert.NoError(t, list.update(m2))
assert.Equal(t, 2, len(*list))
assert.True(t, list.ContainsID(m2.ID))
x, found := list.ElementByPodName("foo")
assert.True(t, found)
assert.Equal(t, "foo", x.PodName)
assert.Equal(t, m2.ID, x.ID)

assert.NoError(t, list.Add(m3))
assert.NoError(t, list.add(m3))
assert.Equal(t, 3, len(*list))
assert.Equal(t, m1.ID, (*list)[0].ID)
assert.Equal(t, m2.ID, (*list)[1].ID)
Expand Down
24 changes: 12 additions & 12 deletions pkg/deployment/members.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,68 +90,68 @@ func createMember(log zerolog.Logger, status *api.DeploymentStatus, group api.Se
switch group {
case api.ServerGroupSingle:
log.Debug().Str("id", id).Msg("Adding single server")
if err := status.Members.Single.Add(api.MemberStatus{
if err := status.Members.Add(api.MemberStatus{
ID: id,
CreatedAt: metav1.Now(),
Phase: api.MemberPhaseNone,
PersistentVolumeClaimName: k8sutil.CreatePersistentVolumeClaimName(deploymentName, role, id),
PodName: "",
}); err != nil {
}, group); err != nil {
return "", maskAny(err)
}
case api.ServerGroupAgents:
log.Debug().Str("id", id).Msg("Adding agent")
if err := status.Members.Agents.Add(api.MemberStatus{
if err := status.Members.Add(api.MemberStatus{
ID: id,
CreatedAt: metav1.Now(),
Phase: api.MemberPhaseNone,
PersistentVolumeClaimName: k8sutil.CreatePersistentVolumeClaimName(deploymentName, role, id),
PodName: "",
}); err != nil {
}, group); err != nil {
return "", maskAny(err)
}
case api.ServerGroupDBServers:
log.Debug().Str("id", id).Msg("Adding dbserver")
if err := status.Members.DBServers.Add(api.MemberStatus{
if err := status.Members.Add(api.MemberStatus{
ID: id,
CreatedAt: metav1.Now(),
Phase: api.MemberPhaseNone,
PersistentVolumeClaimName: k8sutil.CreatePersistentVolumeClaimName(deploymentName, role, id),
PodName: "",
}); err != nil {
}, group); err != nil {
return "", maskAny(err)
}
case api.ServerGroupCoordinators:
log.Debug().Str("id", id).Msg("Adding coordinator")
if err := status.Members.Coordinators.Add(api.MemberStatus{
if err := status.Members.Add(api.MemberStatus{
ID: id,
CreatedAt: metav1.Now(),
Phase: api.MemberPhaseNone,
PersistentVolumeClaimName: "",
PodName: "",
}); err != nil {
}, group); err != nil {
return "", maskAny(err)
}
case api.ServerGroupSyncMasters:
log.Debug().Str("id", id).Msg("Adding syncmaster")
if err := status.Members.SyncMasters.Add(api.MemberStatus{
if err := status.Members.Add(api.MemberStatus{
ID: id,
CreatedAt: metav1.Now(),
Phase: api.MemberPhaseNone,
PersistentVolumeClaimName: "",
PodName: "",
}); err != nil {
}, group); err != nil {
return "", maskAny(err)
}
case api.ServerGroupSyncWorkers:
log.Debug().Str("id", id).Msg("Adding syncworker")
if err := status.Members.SyncWorkers.Add(api.MemberStatus{
if err := status.Members.Add(api.MemberStatus{
ID: id,
CreatedAt: metav1.Now(),
Phase: api.MemberPhaseNone,
PersistentVolumeClaimName: "",
PodName: "",
}); err != nil {
}, group); err != nil {
return "", maskAny(err)
}
default:
Expand Down
2 changes: 1 addition & 1 deletion pkg/deployment/reconcile/action_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (ac *actionContext) UpdateMember(member api.MemberStatus) error {
if !found {
return maskAny(fmt.Errorf("Member %s not found", member.ID))
}
if err := status.Members.UpdateMemberStatus(member, group); err != nil {
if err := status.Members.Update(member, group); err != nil {
return maskAny(err)
}
if err := ac.context.UpdateStatus(status, lastVersion); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/deployment/resilience/member_failure.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (r *Resilience) CheckMemberFailure() error {
} else if failureAcceptable {
log.Info().Msg("Member is not ready for long time, marking is failed")
m.Phase = api.MemberPhaseFailed
status.Members.UpdateMemberStatus(m, group)
status.Members.Update(m, group)
updateStatusNeeded = true
} else {
log.Warn().Msgf("Member is not ready for long time, but it is not safe to mark it a failed because: %s", reason)
Expand All @@ -89,7 +89,7 @@ func (r *Resilience) CheckMemberFailure() error {
} else if failureAcceptable {
log.Info().Msg("Member has terminated too often in recent history, marking is failed")
m.Phase = api.MemberPhaseFailed
status.Members.UpdateMemberStatus(m, group)
status.Members.Update(m, group)
updateStatusNeeded = true
} else {
log.Warn().Msgf("Member has terminated too often in recent history, but it is not safe to mark it a failed because: %s", reason)
Expand Down
2 changes: 1 addition & 1 deletion pkg/deployment/resources/member_cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (r *Resources) cleanupRemovedClusterMembers() error {
if serverFound(m.ID) {
// Member is (still) found, skip it
if m.Conditions.Update(api.ConditionTypeMemberOfCluster, true, "", "") {
list.Update(m)
status.Members.Update(m, group)
updateStatusNeeded = true
}
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/deployment/resources/pod_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ func (r *Resources) createPodForMember(spec api.DeploymentSpec, memberID string)
m.Conditions.Remove(api.ConditionTypeReady)
m.Conditions.Remove(api.ConditionTypeTerminated)
m.Conditions.Remove(api.ConditionTypeAutoUpgrade)
if err := status.Members.UpdateMemberStatus(m, group); err != nil {
if err := status.Members.Update(m, group); err != nil {
return maskAny(err)
}
if err := r.context.UpdateStatus(status, lastVersion); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/deployment/resources/pod_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (r *Resources) InspectPods(ctx context.Context) error {
}
}
if updateMemberStatusNeeded {
if err := status.Members.UpdateMemberStatus(memberStatus, group); err != nil {
if err := status.Members.Update(memberStatus, group); err != nil {
return maskAny(err)
}
}
Expand Down Expand Up @@ -180,7 +180,7 @@ func (r *Resources) InspectPods(ctx context.Context) error {
m.RecentTerminations = append(m.RecentTerminations, now)
}
// Save it
if err := status.Members.UpdateMemberStatus(m, group); err != nil {
if err := status.Members.Update(m, group); err != nil {
return maskAny(err)
}
}
Expand All @@ -204,7 +204,7 @@ func (r *Resources) InspectPods(ctx context.Context) error {
}
if updateMemberNeeded {
// Save it
if err := status.Members.UpdateMemberStatus(m, group); err != nil {
if err := status.Members.Update(m, group); err != nil {
return maskAny(err)
}
}
Expand Down