Skip to content

Commit

Permalink
Merge pull request tkestack#120 from chenchun/fix-api
Browse files Browse the repository at this point in the history
Fix release IP API race condition
  • Loading branch information
chenchun committed Jan 4, 2021
2 parents 170c23f + ad5ceaf commit 257349a
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 137 deletions.
117 changes: 57 additions & 60 deletions pkg/ipam/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,31 @@ import (
"time"

"github.com/emicklei/go-restful"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/listers/core/v1"
glog "k8s.io/klog"
"tkestack.io/galaxy/pkg/api/galaxy/constant"
"tkestack.io/galaxy/pkg/ipam/floatingip"
"tkestack.io/galaxy/pkg/ipam/schedulerplugin"
"tkestack.io/galaxy/pkg/ipam/schedulerplugin/util"
"tkestack.io/galaxy/pkg/utils/httputil"
pageutil "tkestack.io/galaxy/pkg/utils/page"
)

// Controller is the API controller
type Controller struct {
ipam floatingip.IPAM
podLister v1.PodLister
ipam floatingip.IPAM
releaseFunc func(r *schedulerplugin.ReleaseRequest) error
podLister v1.PodLister
}

// NewController construct a controller object
func NewController(ipam floatingip.IPAM, lister v1.PodLister) *Controller {
func NewController(
ipam floatingip.IPAM, lister v1.PodLister, releaseFunc func(r *schedulerplugin.ReleaseRequest) error) *Controller {
return &Controller{
ipam: ipam,
podLister: lister,
ipam: ipam,
podLister: lister,
releaseFunc: releaseFunc,
}
}

Expand Down Expand Up @@ -118,37 +123,35 @@ func (c *Controller) ListIPs(req *restful.Request, resp *restful.Response) {
sort.Sort(bySortParam{array: fips, lessFunc: sortFunc(sortParam)})
start, end, pagin := pageutil.Pagination(page, size, len(fips))
pagedFips := fips[start:end]
if err := fillReleasableAndStatus(c.podLister, pagedFips); err != nil {
httputil.InternalError(resp, err)
return
for i := range pagedFips {
releasable, status := c.checkReleasableAndStatus(&pagedFips[i])
pagedFips[i].Status = status
pagedFips[i].Releasable = releasable
}
resp.WriteEntity(ListIPResp{Page: *pagin, Content: pagedFips}) // nolint: errcheck
}

// fillReleasableAndStatus fills status and releasable field
func fillReleasableAndStatus(lister v1.PodLister, ips []FloatingIP) error {
for i := range ips {
if ips[i].labels != nil {
if _, ok := ips[i].labels[constant.ReserveFIPLabel]; ok {
ips[i].Releasable = false
continue
}
}
ips[i].Releasable = true
if ips[i].PodName == "" {
continue
}
pod, err := lister.Pods(ips[i].Namespace).Get(ips[i].PodName)
if err != nil || pod == nil {
ips[i].Status = "Deleted"
continue
func (c *Controller) checkReleasableAndStatus(fip *FloatingIP) (releasable bool, status string) {
if fip.labels != nil {
if _, ok := fip.labels[constant.ReserveFIPLabel]; ok {
return
}
ips[i].Status = string(pod.Status.Phase)
// On public cloud, we can't release exist pod's ip, because we need to call unassign ip first
// TODO while on private environment, we can
ips[i].Releasable = false
}
return nil
if fip.PodName == "" {
return
}
pod, err := c.podLister.Pods(fip.Namespace).Get(fip.PodName)
if err == nil {
status = string(pod.Status.Phase)
return
}
if errors.IsNotFound(err) {
releasable = true
status = "Deleted"
} else {
status = "Unknown"
}
return
}

// bySortParam defines sort funcs for FloatingIP array
Expand Down Expand Up @@ -225,6 +228,8 @@ type ReleaseIPReq struct {
type ReleaseIPResp struct {
httputil.Resp
// Unreleased lists the ips that were requested but not released.
Unreleased []string `json:"unreleased,omitempty"`
// Reason holds, for the entry of Unreleased at the same index, why that ip
// was not released. It is serialized under the JSON key "reasons".
Reason []string `json:"reasons,omitempty"`
}

// SwaggerDoc generates swagger doc for release ip response
Expand All @@ -242,7 +247,10 @@ func (c *Controller) ReleaseIPs(req *restful.Request, resp *restful.Response) {
httputil.BadRequest(resp, err)
return
}
expectIPtoKey := make(map[string]string)
var (
released, unreleasedIP, reasons []string
unbindRequests []*schedulerplugin.ReleaseRequest
)
for i := range releaseIPReq.IPs {
temp := releaseIPReq.IPs[i]
ip := net.ParseIP(temp.IP)
Expand All @@ -259,36 +267,34 @@ func (c *Controller) ReleaseIPs(req *restful.Request, resp *restful.Response) {
httputil.BadRequest(resp, fmt.Errorf("unknown app type %q", temp.AppType))
return
}
releasable, status := c.checkReleasableAndStatus(&temp)
if !releasable {
unreleasedIP = append(unreleasedIP, temp.IP)
reasons = append(reasons, "releasable is false, pod status "+status)
continue
}
keyObj := util.NewKeyObj(appTypePrefix, temp.Namespace, temp.AppName, temp.PodName, temp.PoolName)
expectIPtoKey[temp.IP] = keyObj.KeyInDB
unbindRequests = append(unbindRequests, &schedulerplugin.ReleaseRequest{IP: ip, KeyObj: keyObj})
}
if err := fillReleasableAndStatus(c.podLister, releaseIPReq.IPs); err != nil {
httputil.BadRequest(resp, err)
return
}
for _, ip := range releaseIPReq.IPs {
if !ip.Releasable {
httputil.BadRequest(resp, fmt.Errorf("%s is not releasable", ip.IP))
return
for _, req := range unbindRequests {
if err := c.releaseFunc(req); err != nil {
unreleasedIP = append(unreleasedIP, req.IP.String())
reasons = append(reasons, err.Error())
} else {
released = append(released, req.IP.String())
}
}
_, unreleased, err := batchReleaseIPs(expectIPtoKey, c.ipam)
var unreleasedIP []string
for ip := range unreleased {
unreleasedIP = append(unreleasedIP, ip)
}
glog.Infof("releaseIPs %v", released)
var res *ReleaseIPResp
if err != nil {
res = &ReleaseIPResp{Resp: httputil.NewResp(
http.StatusInternalServerError, fmt.Sprintf("server error: %v", err))}
} else if len(unreleasedIP) > 0 {
if len(unreleasedIP) > 0 {
res = &ReleaseIPResp{Resp: httputil.NewResp(
http.StatusAccepted, fmt.Sprintf("Unreleased ips have been released or allocated to other pods, "+
"or are not within valid range"))}
http.StatusAccepted, fmt.Sprintf("Released %d ips, %d ips failed, please check the reasons "+
"why they failed", len(released), len(unreleasedIP)))}
} else {
res = &ReleaseIPResp{Resp: httputil.NewResp(http.StatusOK, "")}
}
res.Unreleased = unreleasedIP
res.Reason = reasons
resp.WriteHeaderAndEntity(res.Code, res)
}

Expand Down Expand Up @@ -328,12 +334,3 @@ func convert(fip *floatingip.FloatingIP) FloatingIP {
UpdateTime: fip.UpdatedAt,
labels: fip.Labels}
}

// batchReleaseIPs hands ipToKey (ip -> expected key) to ipam.ReleaseIPs and
// returns its three results unchanged. The released set is logged only when it
// is non-empty.
func batchReleaseIPs(ipToKey map[string]string, ipam floatingip.IPAM) (map[string]string, map[string]string, error) {
	freed, kept, err := ipam.ReleaseIPs(ipToKey)
	if len(freed) == 0 {
		// Nothing was released, so there is nothing worth logging.
		return freed, kept, err
	}
	glog.Infof("releaseIPs %v", freed)
	return freed, kept, err
}
69 changes: 0 additions & 69 deletions pkg/ipam/api/api_test.go

This file was deleted.

44 changes: 44 additions & 0 deletions pkg/ipam/schedulerplugin/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,47 @@ func (p *FloatingIPPlugin) unbind(pod *corev1.Pod) error {
}
return p.unbindNoneDpPod(keyObj, policy, "during unbinding pod")
}

// Release frees the floating ip r.IP on behalf of the key in r.KeyObj.
//
// The pod lock for r.KeyObj is held for the whole call and the ip record is
// read again under that lock, so the decision is never made on a stale view:
//   - the ip has no key any more: nothing to do, nil is returned;
//   - the ip is bound to a different key: an error is returned;
//   - podRunning reports the pod as running: an error is returned.
//
// Otherwise, if a cloud provider is configured and the record carries a node
// name, the ip is first unassigned from that node and reserveIP is called with
// the same key (per the inline note, to clear the node attribute and uid for
// tapp and sts pods). Finally the ip is released from ipam.
func (p *FloatingIPPlugin) Release(r *ReleaseRequest) error {
	caller := "by " + getCaller()
	keyObj := r.KeyObj
	unlock := p.lockPod(keyObj.PodName, keyObj.Namespace)
	defer unlock()

	// The pod lock is held from here on; look the ip up again in case it has
	// been reallocated in the meantime.
	record, err := p.ipam.ByIP(r.IP)
	if err != nil {
		return err
	}
	switch record.Key {
	case keyObj.KeyInDB:
		// Still bound to the expected key, carry on with the release.
	case "":
		glog.Infof("attempt to release %s key %s which is already released", r.IP.String(), keyObj.KeyInDB)
		return nil
	default:
		// The key changed underneath us, abort.
		return fmt.Errorf("ip allocated to another pod %s", record.Key)
	}

	running, reason := p.podRunning(keyObj.PodName, keyObj.Namespace, record.PodUid)
	if running {
		return fmt.Errorf("pod (uid %s) is running", record.PodUid)
	}
	glog.Infof("%s is not running, %s, %s", keyObj.KeyInDB, reason, caller)

	if p.cloudProvider != nil && record.NodeName != "" {
		// For tapp and sts pod, nodeName will be updated to empty after unassigning
		glog.Infof("UnAssignIP nodeName %s, ip %s, key %s %s", record.NodeName, r.IP.String(), keyObj.KeyInDB, caller)
		unassign := &rpc.UnAssignIPRequest{
			NodeName:  record.NodeName,
			IPAddress: record.IP.String(),
		}
		if err := p.cloudProviderUnAssignIP(unassign); err != nil {
			return fmt.Errorf("UnAssignIP nodeName %s, ip %s: %v", record.NodeName, record.IP.String(), err)
		}
		// for tapp and sts pod, we need to clean its node attr and uid
		if err := p.reserveIP(keyObj.KeyInDB, keyObj.KeyInDB, "after UnAssignIP "+caller); err != nil {
			return err
		}
	}

	if err := p.ipam.Release(keyObj.KeyInDB, r.IP); err != nil {
		glog.Errorf("release ip %s: %v", caller, err)
		return fmt.Errorf("release ip: %v", err)
	}
	glog.Infof("released floating ip %s from %s %s", r.IP.String(), keyObj.KeyInDB, caller)
	return nil
}
18 changes: 11 additions & 7 deletions pkg/ipam/schedulerplugin/floatingip_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,8 @@ func (p *FloatingIPPlugin) lockPod(name, namespace string) func() {
p.podLockPool.LockKey(key)
elapsed := (time.Now().UnixNano() - start.UnixNano()) / 1e6
if elapsed > 500 {
var caller string
pc, _, no, ok := runtime.Caller(1)
details := runtime.FuncForPC(pc)
if ok && details != nil {
caller = fmt.Sprintf("called from %s:%d\n", details.Name(), no)
}
glog.Infof("acquire lock for %s took %d ms, started at %s, %s", key, elapsed,
start.Format("15:04:05.000"), caller)
start.Format("15:04:05.000"), getCaller())
}
return func() {
_ = p.podLockPool.UnlockKey(key)
Expand Down Expand Up @@ -283,3 +277,13 @@ func (p *FloatingIPPlugin) supportReserveIPPolicy(obj *util.KeyObj, policy const
}
return nil
}

// getCaller describes the caller of the function that invoked getCaller, as
// "called from <function name>:<line>". It returns an empty string when that
// stack frame cannot be resolved.
//
// The result has no trailing newline: callers embed it in the middle of log
// lines and error/reason strings, where a newline would split the message.
func getCaller() string {
	// skip=2: frame 0 is getCaller, frame 1 is the function asking, and
	// frame 2 is whoever called that function.
	pc, _, line, ok := runtime.Caller(2)
	if !ok {
		return ""
	}
	fn := runtime.FuncForPC(pc)
	if fn == nil {
		return ""
	}
	return fmt.Sprintf("called from %s:%d", fn.Name(), line)
}
7 changes: 7 additions & 0 deletions pkg/ipam/schedulerplugin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package schedulerplugin

import (
"errors"
"net"

"tkestack.io/galaxy/pkg/ipam/floatingip"
"tkestack.io/galaxy/pkg/ipam/schedulerplugin/util"
)

type NotSupportedReleasePolicyError error
Expand Down Expand Up @@ -60,3 +62,8 @@ func (conf *Conf) validate() {
conf.FloatingIPKey = "floatingips"
}
}

// ReleaseRequest describes one floating ip to release via
// FloatingIPPlugin.Release.
type ReleaseRequest struct {
// KeyObj identifies the expected owner of the ip. Release locks on its
// PodName and Namespace and aborts when the stored key of the ip differs
// from its KeyInDB.
KeyObj *util.KeyObj
// IP is the floating ip to release.
IP net.IP
}
2 changes: 1 addition & 1 deletion pkg/ipam/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (s *Server) startAPIServer() {
Path("/v1").
Consumes(restful.MIME_JSON).
Produces(restful.MIME_JSON)
c := api.NewController(s.plugin.GetIpam(), s.PodLister)
c := api.NewController(s.plugin.GetIpam(), s.PodLister, s.plugin.Release)
ws.Route(ws.GET("/ip").To(c.ListIPs).
Doc("List ips by keyword or params").
Param(ws.QueryParameter("keyword", "keyword").DataType("string")).
Expand Down

0 comments on commit 257349a

Please sign in to comment.