From ad5ceaf9f2be0fff0217a6f81b382a88edb35a0e Mon Sep 17 00:00:00 2001 From: Chun Chen Date: Mon, 4 Jan 2021 18:39:38 +0800 Subject: [PATCH] Fix release IP API race condition --- pkg/ipam/api/api.go | 117 +++++++++--------- pkg/ipam/api/api_test.go | 69 ----------- pkg/ipam/schedulerplugin/bind.go | 44 +++++++ pkg/ipam/schedulerplugin/floatingip_plugin.go | 18 +-- pkg/ipam/schedulerplugin/types.go | 7 ++ pkg/ipam/server/server.go | 2 +- 6 files changed, 120 insertions(+), 137 deletions(-) delete mode 100644 pkg/ipam/api/api_test.go diff --git a/pkg/ipam/api/api.go b/pkg/ipam/api/api.go index 77f30e4f..98db62f8 100644 --- a/pkg/ipam/api/api.go +++ b/pkg/ipam/api/api.go @@ -25,10 +25,12 @@ import ( "time" "github.com/emicklei/go-restful" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/listers/core/v1" glog "k8s.io/klog" "tkestack.io/galaxy/pkg/api/galaxy/constant" "tkestack.io/galaxy/pkg/ipam/floatingip" + "tkestack.io/galaxy/pkg/ipam/schedulerplugin" "tkestack.io/galaxy/pkg/ipam/schedulerplugin/util" "tkestack.io/galaxy/pkg/utils/httputil" pageutil "tkestack.io/galaxy/pkg/utils/page" @@ -36,15 +38,18 @@ import ( // Controller is the API controller type Controller struct { - ipam floatingip.IPAM - podLister v1.PodLister + ipam floatingip.IPAM + releaseFunc func(r *schedulerplugin.ReleaseRequest) error + podLister v1.PodLister } // NewController construct a controller object -func NewController(ipam floatingip.IPAM, lister v1.PodLister) *Controller { +func NewController( + ipam floatingip.IPAM, lister v1.PodLister, releaseFunc func(r *schedulerplugin.ReleaseRequest) error) *Controller { return &Controller{ - ipam: ipam, - podLister: lister, + ipam: ipam, + podLister: lister, + releaseFunc: releaseFunc, } } @@ -118,37 +123,35 @@ func (c *Controller) ListIPs(req *restful.Request, resp *restful.Response) { sort.Sort(bySortParam{array: fips, lessFunc: sortFunc(sortParam)}) start, end, pagin := pageutil.Pagination(page, size, len(fips)) pagedFips := fips[start:end] - if err := fillReleasableAndStatus(c.podLister, pagedFips); err != nil { - httputil.InternalError(resp, err) - return + for i := range pagedFips { + releasable, status := c.checkReleasableAndStatus(&pagedFips[i]) + pagedFips[i].Status = status + pagedFips[i].Releasable = releasable } resp.WriteEntity(ListIPResp{Page: *pagin, Content: pagedFips}) // nolint: errcheck } -// fillReleasableAndStatus fills status and releasable field -func fillReleasableAndStatus(lister v1.PodLister, ips []FloatingIP) error { - for i := range ips { - if ips[i].labels != nil { - if _, ok := ips[i].labels[constant.ReserveFIPLabel]; ok { - ips[i].Releasable = false - continue - } - } - ips[i].Releasable = true - if ips[i].PodName == "" { - continue - } - pod, err := lister.Pods(ips[i].Namespace).Get(ips[i].PodName) - if err != nil || pod == nil { - ips[i].Status = "Deleted" - continue +func (c *Controller) checkReleasableAndStatus(fip *FloatingIP) (releasable bool, status string) { + if fip.labels != nil { + if _, ok := fip.labels[constant.ReserveFIPLabel]; ok { + return } - ips[i].Status = string(pod.Status.Phase) - // On public cloud, we can't release exist pod's ip, because we need to call unassign ip first - // TODO while on private environment, we can - ips[i].Releasable = false } - return nil + if fip.PodName == "" { + return + } + pod, err := c.podLister.Pods(fip.Namespace).Get(fip.PodName) + if err == nil { + status = string(pod.Status.Phase) + return + } + if errors.IsNotFound(err) { + releasable = true + status = "Deleted" + } else { + status = "Unknown" + } + return } // bySortParam defines sort funcs for FloatingIP array @@ -225,6 +228,8 @@ type ReleaseIPReq struct { type ReleaseIPResp struct { httputil.Resp Unreleased []string `json:"unreleased,omitempty"` + // Reason is the reason why this ip is not released + Reason []string `json:"reasons,omitempty"` } // SwaggerDoc generates swagger doc for release ip response @@ -242,7 +247,10 @@ func (c *Controller) ReleaseIPs(req *restful.Request, resp *restful.Response) { httputil.BadRequest(resp, err) return } - expectIPtoKey := make(map[string]string) + var ( + released, unreleasedIP, reasons []string + unbindRequests []*schedulerplugin.ReleaseRequest + ) for i := range releaseIPReq.IPs { temp := releaseIPReq.IPs[i] ip := net.ParseIP(temp.IP) @@ -259,36 +267,34 @@ func (c *Controller) ReleaseIPs(req *restful.Request, resp *restful.Response) { httputil.BadRequest(resp, fmt.Errorf("unknown app type %q", temp.AppType)) return } + releasable, status := c.checkReleasableAndStatus(&temp) + if !releasable { + unreleasedIP = append(unreleasedIP, temp.IP) + reasons = append(reasons, "releasable is false, pod status "+status) + continue + } keyObj := util.NewKeyObj(appTypePrefix, temp.Namespace, temp.AppName, temp.PodName, temp.PoolName) - expectIPtoKey[temp.IP] = keyObj.KeyInDB + unbindRequests = append(unbindRequests, &schedulerplugin.ReleaseRequest{IP: ip, KeyObj: keyObj}) } - if err := fillReleasableAndStatus(c.podLister, releaseIPReq.IPs); err != nil { - httputil.BadRequest(resp, err) - return - } - for _, ip := range releaseIPReq.IPs { - if !ip.Releasable { - httputil.BadRequest(resp, fmt.Errorf("%s is not releasable", ip.IP)) - return + for _, req := range unbindRequests { + if err := c.releaseFunc(req); err != nil { + unreleasedIP = append(unreleasedIP, req.IP.String()) + reasons = append(reasons, err.Error()) + } else { + released = append(released, req.IP.String()) } } - _, unreleased, err := batchReleaseIPs(expectIPtoKey, c.ipam) - var unreleasedIP []string - for ip := range unreleased { - unreleasedIP = append(unreleasedIP, ip) - } + glog.Infof("releaseIPs %v", released) var res *ReleaseIPResp - if err != nil { - res = &ReleaseIPResp{Resp: httputil.NewResp( - http.StatusInternalServerError, fmt.Sprintf("server error: %v", err))} - } else if len(unreleasedIP) > 0 { + if len(unreleasedIP) > 0 { res = &ReleaseIPResp{Resp: httputil.NewResp( - http.StatusAccepted, fmt.Sprintf("Unreleased ips have been released or allocated to other pods, "+ - "or are not within valid range"))} + http.StatusAccepted, fmt.Sprintf("Released %d ips, %d ips failed, please check the reasons "+ + "why they failed", len(released), len(unreleasedIP)))} } else { res = &ReleaseIPResp{Resp: httputil.NewResp(http.StatusOK, "")} } res.Unreleased = unreleasedIP + res.Reason = reasons resp.WriteHeaderAndEntity(res.Code, res) } @@ -328,12 +334,3 @@ func convert(fip *floatingip.FloatingIP) FloatingIP { UpdateTime: fip.UpdatedAt, labels: fip.Labels} } - -// batchReleaseIPs release ips from ipams -func batchReleaseIPs(ipToKey map[string]string, ipam floatingip.IPAM) (map[string]string, map[string]string, error) { - released, unreleased, err := ipam.ReleaseIPs(ipToKey) - if len(released) > 0 { - glog.Infof("releaseIPs %v", released) - } - return released, unreleased, err -} diff --git a/pkg/ipam/api/api_test.go b/pkg/ipam/api/api_test.go deleted file mode 100644 index 324341f9..00000000 --- a/pkg/ipam/api/api_test.go +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Tencent is pleased to support the open source community by making TKEStack available. - * - * Copyright (C) 2012-2019 Tencent. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * https://opensource.org/licenses/Apache-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package api - -import ( - "reflect" - "testing" - - "tkestack.io/galaxy/pkg/ipam/floatingip" -) - -type fakeIPAM struct { - floatingip.IPAM - allocatedIPs map[string]string - unallocatedIPs map[string]string - err error -} - -func (ipam fakeIPAM) ReleaseIPs(ipToKey map[string]string) (map[string]string, map[string]string, error) { - if ipam.err != nil { - return nil, ipToKey, ipam.err - } - released := map[string]string{} - for ip, k := range ipToKey { - allocatedK, ok := ipam.allocatedIPs[ip] - if !ok { - continue - } - if allocatedK == k { - ipam.unallocatedIPs[ip] = "" - delete(ipam.allocatedIPs, ip) - released[ip] = k - delete(ipToKey, ip) - } else { - ipToKey[ip] = allocatedK - } - } - return released, ipToKey, nil -} - -func TestBatchReleaseIPs(t *testing.T) { - ipToKey := map[string]string{"10.0.0.1": "k1", "10.0.0.2": "k2", "10.0.0.3": "k3", "10.0.0.4": "k4"} - ipam := fakeIPAM{allocatedIPs: map[string]string{"10.0.0.1": "k1", "10.0.0.2": "k2.1"}, - unallocatedIPs: map[string]string{}} - released, unreleased, err := batchReleaseIPs(ipToKey, ipam) - if err != nil { - t.Fatal() - } - if !reflect.DeepEqual(map[string]string{"10.0.0.1": "k1"}, released) { - t.Fatal(released) - } - if !reflect.DeepEqual(map[string]string{"10.0.0.2": "k2.1", "10.0.0.3": "k3", "10.0.0.4": "k4"}, unreleased) { - t.Fatal(unreleased) - } -} diff --git a/pkg/ipam/schedulerplugin/bind.go b/pkg/ipam/schedulerplugin/bind.go index cf739638..5407d061 100644 --- a/pkg/ipam/schedulerplugin/bind.go +++ b/pkg/ipam/schedulerplugin/bind.go @@ -200,3 +200,47 @@ func (p *FloatingIPPlugin) unbind(pod *corev1.Pod) error { } return p.unbindNoneDpPod(keyObj, policy, "during unbinding pod") } + +func (p *FloatingIPPlugin) Release(r *ReleaseRequest) error { + caller := "by " + getCaller() + k := r.KeyObj + defer p.lockPod(k.PodName, k.Namespace)() + // we are holding the pod's lock, query again in case the ip has been reallocated. + fip, err := p.ipam.ByIP(r.IP) + if err != nil { + return err + } + if fip.Key != k.KeyInDB { + // if key changed, abort + if fip.Key == "" { + glog.Infof("attempt to release %s key %s which is already released", r.IP.String(), k.KeyInDB) + return nil + } + return fmt.Errorf("ip allocated to another pod %s", fip.Key) + } + running, reason := p.podRunning(k.PodName, k.Namespace, fip.PodUid) + if running { + return fmt.Errorf("pod (uid %s) is running", fip.PodUid) + } + glog.Infof("%s is not running, %s, %s", k.KeyInDB, reason, caller) + if p.cloudProvider != nil && fip.NodeName != "" { + // For tapp and sts pod, nodeName will be updated to empty after unassigning + glog.Infof("UnAssignIP nodeName %s, ip %s, key %s %s", fip.NodeName, r.IP.String(), k.KeyInDB, caller) + if err := p.cloudProviderUnAssignIP(&rpc.UnAssignIPRequest{ + NodeName: fip.NodeName, + IPAddress: fip.IP.String(), + }); err != nil { + return fmt.Errorf("UnAssignIP nodeName %s, ip %s: %v", fip.NodeName, fip.IP.String(), err) + } + // for tapp and sts pod, we need to clean its node attr and uid + if err := p.reserveIP(k.KeyInDB, k.KeyInDB, "after UnAssignIP "+caller); err != nil { + return err + } + } + if err := p.ipam.Release(k.KeyInDB, r.IP); err != nil { + glog.Errorf("release ip %s: %v", caller, err) + return fmt.Errorf("release ip: %v", err) + } + glog.Infof("released floating ip %s from %s %s", r.IP.String(), k.KeyInDB, caller) + return nil +} diff --git a/pkg/ipam/schedulerplugin/floatingip_plugin.go b/pkg/ipam/schedulerplugin/floatingip_plugin.go index 9d30b9db..d7da4b85 100644 --- a/pkg/ipam/schedulerplugin/floatingip_plugin.go +++ b/pkg/ipam/schedulerplugin/floatingip_plugin.go @@ -235,14 +235,8 @@ func (p *FloatingIPPlugin) lockPod(name, namespace string) func() { p.podLockPool.LockKey(key) elapsed := (time.Now().UnixNano() - start.UnixNano()) / 1e6 if elapsed > 500 { - var caller string - pc, _, no, ok := runtime.Caller(1) - details := runtime.FuncForPC(pc) - if ok && details != nil { - caller = fmt.Sprintf("called from %s:%d\n", details.Name(), no) - } glog.Infof("acquire lock for %s took %d ms, started at %s, %s", key, elapsed, - start.Format("15:04:05.000"), caller) + start.Format("15:04:05.000"), getCaller()) } return func() { _ = p.podLockPool.UnlockKey(key) @@ -283,3 +277,13 @@ func (p *FloatingIPPlugin) supportReserveIPPolicy(obj *util.KeyObj, policy const } return nil } + +// getCaller returns the func packageName.funcName of the caller +func getCaller() string { + pc, _, no, ok := runtime.Caller(2) + details := runtime.FuncForPC(pc) + if ok && details != nil { + return fmt.Sprintf("called from %s:%d\n", details.Name(), no) + } + return "" +} diff --git a/pkg/ipam/schedulerplugin/types.go b/pkg/ipam/schedulerplugin/types.go index 6aa8155f..ad296bd0 100644 --- a/pkg/ipam/schedulerplugin/types.go +++ b/pkg/ipam/schedulerplugin/types.go @@ -18,8 +18,10 @@ package schedulerplugin import ( "errors" + "net" "tkestack.io/galaxy/pkg/ipam/floatingip" + "tkestack.io/galaxy/pkg/ipam/schedulerplugin/util" ) type NotSupportedReleasePolicyError error @@ -60,3 +62,8 @@ func (conf *Conf) validate() { conf.FloatingIPKey = "floatingips" } } + +type ReleaseRequest struct { + KeyObj *util.KeyObj + IP net.IP +} diff --git a/pkg/ipam/server/server.go b/pkg/ipam/server/server.go index 542ac1a9..bced9ed6 100644 --- a/pkg/ipam/server/server.go +++ b/pkg/ipam/server/server.go @@ -236,7 +236,7 @@ func (s *Server) startAPIServer() { Path("/v1"). Consumes(restful.MIME_JSON). Produces(restful.MIME_JSON) - c := api.NewController(s.plugin.GetIpam(), s.PodLister) + c := api.NewController(s.plugin.GetIpam(), s.PodLister, s.plugin.Release) ws.Route(ws.GET("/ip").To(c.ListIPs). Doc("List ips by keyword or params"). Param(ws.QueryParameter("keyword", "keyword").DataType("string")).