chore: support leave member by using any pods #7890

Closed
54 changes: 53 additions & 1 deletion controllers/apps/transformer_component_workload.go
@@ -618,9 +618,11 @@ func (r *componentWorkloadOps) leaveMember4ScaleIn() error {

// TODO: Move memberLeave to the ITS controller. Instead of performing a switchover, we can directly scale down the non-leader nodes. This is because the pod ordinal is not guaranteed to be continuous.
podsToMemberLeave := make([]*corev1.Pod, 0)
desiredPods := make([]*corev1.Pod, 0)
for _, pod := range pods {
// if the pod does not exist in the generated pod names, it is a member that needs to leave
if _, ok := r.desiredCompPodNameSet[pod.Name]; ok {
desiredPods = append(desiredPods, pod)
continue
}
podsToMemberLeave = append(podsToMemberLeave, pod)
@@ -644,12 +646,18 @@ func (r *componentWorkloadOps) leaveMember4ScaleIn() error {
return switchoverErr
}

if err2 := lorryCli.LeaveMember(r.reqCtx.Ctx); err2 != nil {
if err2 := lorryCli.LeaveMember(r.reqCtx.Ctx, nil); err2 != nil {
// For upgrade compatibility: if the Lorry version is 0.7 and
// the KB version has been upgraded to 0.8 or newer, the lorry client returns a
// NotImplemented error; in this case, just ignore it.
if err2 == lorry.NotImplemented {
r.reqCtx.Log.Info("lorry leave member api is not implemented")
} else if unableToConnect(err2) {
Contributor

When a pod is pending, new lorry clients may encounter issues. This situation can also be resolved here.

r.reqCtx.Log.Info(fmt.Sprintf("when leaving pod %s by lorry, can not connect lorry on pod %s, try to leave member by other pods", pod.Name, pod.Name))
err3 := r.leaveMemberByOtherPods(desiredPods, pod)
if err == nil {
err = err3
}
Comment on lines +649 to +660
Contributor

Re-organizing lorryCli.LeaveMember, unableToConnect, and r.leaveMemberByOtherPods into a single method on r seems more readable.

} else if err == nil {
err = err2
}
@@ -658,6 +666,50 @@ func (r *componentWorkloadOps) leaveMember4ScaleIn() error {
return err // TODO: use requeue-after
}

func unableToConnect(err error) bool {
if err == nil {
return false
}
if strings.Contains(err.Error(), "i/o timeout") {
return true
}
return false
}

// leaveMemberByOtherPods tries to make `podToLeave` leave the cluster via the Lorry on one of `desiredPods`.
// Any error that is not an `unableToConnect` error is returned immediately.
func (r *componentWorkloadOps) leaveMemberByOtherPods(desiredPods []*corev1.Pod, podToLeave *corev1.Pod) error {
parameters := make(map[string]any)
parameters["podName"] = podToLeave.Spec.Hostname

for _, pod := range desiredPods {
lorryCli, err1 := lorry.NewClient(*pod)
Contributor

It should follow the API definition, otherwise there will be risks in executing the action on other pods.

if err1 != nil {
return fmt.Errorf("error when leaveMemberByOtherPods NewClient pod %v: %v", pod.Name, err1)
}

if intctrlutil.IsNil(lorryCli) {
// no lorry in the pod
continue
}

if err2 := lorryCli.LeaveMember(r.reqCtx.Ctx, parameters); err2 != nil {
// For upgrade compatibility: if the Lorry version is 0.7 and
// the KB version has been upgraded to 0.8 or newer, the lorry client returns a
// NotImplemented error; in this case, just ignore it.
if err2 == lorry.NotImplemented {
r.reqCtx.Log.Info("lorry leave member api is not implemented")
} else if unableToConnect(err2) {
r.reqCtx.Log.Info(fmt.Sprintf("leaveMemberByOtherPods: can not connect lorry on pod %s", pod.Name))
} else {
return fmt.Errorf("error when leaveMemberByOtherPods LeaveMember, try to leave pod %v on pod %v: %v", podToLeave.Name, pod.Name, err2)
}
}
return nil
}
return fmt.Errorf("leaveMemberByOtherPods: try to leave pod %v by other pods fail", podToLeave.Name)
}

func (r *componentWorkloadOps) deletePVCs4ScaleIn(itsObj *workloads.InstanceSet) error {
graphCli := model.NewGraphClient(r.cli)
for _, podName := range r.runningItsPodNames {
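The review comment above suggests folding lorryCli.LeaveMember, unableToConnect, and r.leaveMemberByOtherPods into one method. A minimal sketch of that re-organization, assuming the lorry client API used in this diff; the method name leaveOneMember and its placement are hypothetical, not part of this PR:

func (r *componentWorkloadOps) leaveOneMember(pod *corev1.Pod, desiredPods []*corev1.Pod) error {
    lorryCli, err := lorry.NewClient(*pod)
    if err != nil {
        return err
    }
    if intctrlutil.IsNil(lorryCli) {
        // no lorry in the pod, nothing to do
        return nil
    }
    err = lorryCli.LeaveMember(r.reqCtx.Ctx, nil)
    switch {
    case err == nil:
        return nil
    case err == lorry.NotImplemented:
        // Lorry 0.7 has no leave-member API; ignore for upgrade compatibility.
        r.reqCtx.Log.Info("lorry leave member api is not implemented")
        return nil
    case unableToConnect(err):
        // Lorry on the leaving pod is unreachable (e.g. the pod is pending); fall back to its peers.
        return r.leaveMemberByOtherPods(desiredPods, pod)
    default:
        return err
    }
}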
11 changes: 9 additions & 2 deletions pkg/lorry/client/client.go
@@ -210,8 +210,15 @@ func (cli *lorryClient) JoinMember(ctx context.Context) error {
}

// LeaveMember sends a Leave member operation request to Lorry, located on the target pod that is about to leave.
func (cli *lorryClient) LeaveMember(ctx context.Context) error {
_, err := cli.Request(ctx, string(LeaveMemberOperation), http.MethodPost, nil)
// If parameters is not nil, the pod that receives the request (addressed by lorryClient) may differ
// from the pod that is to leave (identified by the podName key in the request parameters).
func (cli *lorryClient) LeaveMember(ctx context.Context, parameters map[string]any) error {
req := make(map[string]any)
if parameters != nil {
req["parameters"] = parameters
}

_, err := cli.Request(ctx, string(LeaveMemberOperation), http.MethodPost, req)
return err
}

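A hedged usage sketch of the new signature from a caller's point of view; healthyPod and leavingPodName are illustrative names, not identifiers from this PR:

cli, err := lorry.NewClient(*healthyPod) // any pod whose lorry is reachable
if err != nil {
    return err
}
if intctrlutil.IsNil(cli) {
    return nil // no lorry container in this pod
}
// With nil parameters the receiving pod leaves the cluster itself;
// with a podName parameter it removes the named member instead.
err = cli.LeaveMember(ctx, map[string]any{"podName": leavingPodName})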
2 changes: 1 addition & 1 deletion pkg/lorry/client/client_mock.go

Some generated files are not rendered by default.

10 changes: 5 additions & 5 deletions pkg/lorry/client/httpclient_test.go
@@ -451,7 +451,7 @@ var _ = Describe("Lorry HTTP Client", func() {
mockDBManager.EXPECT().LeaveMemberFromCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
mockDCSStore.EXPECT().GetCluster().Return(cluster, nil)
mockDCSStore.EXPECT().UpdateHaConfig().Return(nil).Times(2)
Expect(lorryClient.LeaveMember(context.TODO())).Should(Succeed())
Expect(lorryClient.LeaveMember(context.TODO(), nil)).Should(Succeed())
Expect(cluster.HaConfig.DeleteMembers).Should(HaveLen(1))
})

@@ -461,10 +461,10 @@
mockDCSStore.EXPECT().GetCluster().Return(cluster, nil).Times(2)
mockDCSStore.EXPECT().UpdateHaConfig().Return(nil).Times(3)
// first leave
Expect(lorryClient.LeaveMember(context.TODO())).Should(Succeed())
Expect(lorryClient.LeaveMember(context.TODO(), nil)).Should(Succeed())
Expect(cluster.HaConfig.DeleteMembers).Should(HaveLen(1))
// second leave
Expect(lorryClient.LeaveMember(context.TODO())).Should(Succeed())
Expect(lorryClient.LeaveMember(context.TODO(), nil)).Should(Succeed())
Expect(cluster.HaConfig.DeleteMembers).Should(HaveLen(1))
})

@@ -473,7 +473,7 @@
mockDBManager.EXPECT().LeaveMemberFromCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf(msg))
mockDCSStore.EXPECT().GetCluster().Return(cluster, nil)
mockDCSStore.EXPECT().UpdateHaConfig().Return(nil)
err := lorryClient.LeaveMember(context.TODO())
err := lorryClient.LeaveMember(context.TODO(), nil)
Expect(err).Should(HaveOccurred())
Expect(err.Error()).Should(ContainSubstring(msg))
})
@@ -509,7 +509,7 @@
_ = ops[strings.ToLower(string(util.LeaveMemberOperation))].Init(context.TODO())
customManager, _ := custom.NewManager(engines.Properties{})
register.SetCustomManager(customManager)
err := lorryClient.LeaveMember(context.TODO())
err := lorryClient.LeaveMember(context.TODO(), nil)
Expect(err).Should(HaveOccurred())
Expect(err.Error()).Should(ContainSubstring("executable file not found"))
})
2 changes: 1 addition & 1 deletion pkg/lorry/client/interface.go
@@ -38,7 +38,7 @@ type Client interface {
JoinMember(ctx context.Context) error

// LeaveMember sends a Leave member operation request to Lorry, located on the target pod that is about to leave.
LeaveMember(ctx context.Context) error
LeaveMember(ctx context.Context, parameters map[string]any) error

Switchover(ctx context.Context, primary, candidate string, force bool) error
Lock(ctx context.Context) error
16 changes: 14 additions & 2 deletions pkg/lorry/engines/postgres/apecloudpostgres/manager.go
@@ -22,6 +22,7 @@ package apecloudpostgres
import (
"context"
"fmt"
"strconv"
"strings"
"time"

@@ -317,9 +318,20 @@ func (mgr *Manager) LeaveMemberFromCluster(ctx context.Context, cluster *dcs.Clu
return nil
}

sql := fmt.Sprintf(`alter system consensus drop follower '%s:%d';`, addr, mgr.Config.GetDBPort())
var port int
var err error
if memberName != mgr.CurrentMemberName {
port, err = strconv.Atoi(cluster.GetMemberWithName(memberName).DBPort)
if err != nil {
mgr.Logger.Error(err, fmt.Sprintf("get member %v port failed", memberName))
}
} else {
port = mgr.Config.GetDBPort()
}

_, err := mgr.ExecLeader(ctx, sql, cluster)
sql := fmt.Sprintf(`alter system consensus drop follower '%s:%d';`, addr, port)

_, err = mgr.ExecLeader(ctx, sql, cluster)
if err != nil {
mgr.Logger.Error(err, fmt.Sprintf("exec sql:%s failed", sql))
return err
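For illustration only, a sketch of the statement produced after this change when dropping a member other than the current one; the address and port values are hypothetical:

// The port now comes from the leaving member's DCS record
// (cluster.GetMemberWithName(memberName).DBPort) instead of the local config.
addr := "pg-cluster-postgresql-1.pg-cluster-postgresql-headless"
port := 5432
sql := fmt.Sprintf(`alter system consensus drop follower '%s:%d';`, addr, port)
// sql == "alter system consensus drop follower 'pg-cluster-postgresql-1.pg-cluster-postgresql-headless:5432';"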
21 changes: 14 additions & 7 deletions pkg/lorry/operations/replica/leave.go
@@ -86,21 +86,28 @@ func (s *Leave) Do(ctx context.Context, req *operations.OpsRequest) (*operations
return nil, err
}

currentMember := cluster.GetMemberWithName(manager.GetCurrentMemberName())
if !cluster.HaConfig.IsDeleting(currentMember) {
cluster.HaConfig.AddMemberToDelete(currentMember)
var memberNameToLeave string
if req.Parameters != nil && req.GetString("podName") != "" {
memberNameToLeave = req.GetString("podName")
} else {
memberNameToLeave = manager.GetCurrentMemberName()
}

memberToLeave := cluster.GetMemberWithName(memberNameToLeave)
if !cluster.HaConfig.IsDeleting(memberToLeave) {
cluster.HaConfig.AddMemberToDelete(memberToLeave)
_ = s.dcsStore.UpdateHaConfig()
}

// remove current member from db cluster
err = manager.LeaveMemberFromCluster(ctx, cluster, manager.GetCurrentMemberName())
// remove the member from the db cluster; it may be a different pod, depending on whether podName is set in req.Parameters
err = manager.LeaveMemberFromCluster(ctx, cluster, memberNameToLeave)
Contributor

This change in the API can be reflected in the DBManager interface.

if err != nil {
s.logger.Error(err, "Leave member from cluster failed")
return nil, err
}

if cluster.HaConfig.IsDeleting(currentMember) {
cluster.HaConfig.FinishDeleted(currentMember)
if cluster.HaConfig.IsDeleting(memberToLeave) {
cluster.HaConfig.FinishDeleted(memberToLeave)
_ = s.dcsStore.UpdateHaConfig()
}

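Following the reviewer's note above, a hedged sketch of how the relaxed semantics could be documented on the DBManager interface; the signature is inferred from the manager.LeaveMemberFromCluster call in this diff, and the rest of the interface is elided:

type DBManager interface {
    // LeaveMemberFromCluster removes memberName from the database cluster.
    // memberName is not necessarily the member local to this Lorry instance;
    // it may name another pod when the leave request carries a podName parameter.
    LeaveMemberFromCluster(ctx context.Context, cluster *dcs.Cluster, memberName string) error
    // ... other methods unchanged
}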