Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cns/ipampoolmonitor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var (
)
ipamRequestedIPConfigCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "ipam_reuested_ips",
Name: "ipam_requested_ips",
Help: "Requested IP count.",
},
)
Expand Down
2 changes: 1 addition & 1 deletion cns/restserver/internalapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ func (service *HTTPRestService) ReconcileNCState(
ipconfigRequest := cns.IPConfigRequest{
DesiredIPAddress: secIpConfig.IPAddress,
OrchestratorContext: jsonContext,
PodInterfaceID: podInfo.InterfaceID(),
InfraContainerID: podInfo.InfraContainerID(),
PodInterfaceID: podInfo.InterfaceID(),
}

if _, err := requestIPConfigHelper(service, ipconfigRequest); err != nil {
Expand Down
107 changes: 61 additions & 46 deletions cns/restserver/ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,76 +12,96 @@ import (
"github.com/Azure/azure-container-networking/cns/filter"
"github.com/Azure/azure-container-networking/cns/logger"
"github.com/Azure/azure-container-networking/cns/types"
"github.com/pkg/errors"
)

// used to request an IPConfig from the CNS state
func (service *HTTPRestService) requestIPConfigHandler(w http.ResponseWriter, r *http.Request) {
var (
err error
ipconfigRequest cns.IPConfigRequest
podIpInfo cns.PodIpInfo
returnCode types.ResponseCode
returnMessage string
)

err = service.Listener.Decode(w, r, &ipconfigRequest)
var ipconfigRequest cns.IPConfigRequest
err := service.Listener.Decode(w, r, &ipconfigRequest)
operationName := "requestIPConfigHandler"
logger.Request(service.Name+operationName, ipconfigRequest, err)
if err != nil {
return
}

// retrieve ipconfig from nc
_, returnCode, returnMessage = service.validateIPConfigRequest(ipconfigRequest)
if returnCode == types.Success {
if podIpInfo, err = requestIPConfigHelper(service, ipconfigRequest); err != nil {
returnCode = types.FailedToAllocateIPConfig
returnMessage = fmt.Sprintf("AllocateIPConfig failed: %v, IP config request is %s", err, ipconfigRequest)
podInfo, returnCode, returnMessage := service.validateIPConfigRequest(ipconfigRequest)
if returnCode != types.Success {
reserveResp := &cns.IPConfigResponse{
Response: cns.Response{
ReturnCode: returnCode,
Message: returnMessage,
},
}
err = service.Listener.Encode(w, &reserveResp)
logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, reserveResp.Response.ReturnCode, err)
return
}

resp := cns.Response{
ReturnCode: returnCode,
Message: returnMessage,
// record a pod requesting an IP
service.podsPendingIPAllocation.Push(podInfo.Key())

podIPInfo, err := requestIPConfigHelper(service, ipconfigRequest)
if err != nil {
reserveResp := &cns.IPConfigResponse{
Response: cns.Response{
ReturnCode: types.FailedToAllocateIPConfig,
Message: fmt.Sprintf("AllocateIPConfig failed: %v, IP config request is %s", err, ipconfigRequest),
},
PodIpInfo: podIPInfo,
}
err = service.Listener.Encode(w, &reserveResp)
logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, reserveResp.Response.ReturnCode, err)
return
}

// record a pod allocated an IP
defer func() {
// observe allocation wait time
if since := service.podsPendingIPAllocation.Pop(podInfo.Key()); since > 0 {
ipAllocationLatency.Observe(float64(since))
}
}()
reserveResp := &cns.IPConfigResponse{
Response: resp,
Response: cns.Response{
ReturnCode: types.Success,
},
PodIpInfo: podIPInfo,
}
reserveResp.PodIpInfo = podIpInfo

err = service.Listener.Encode(w, &reserveResp)
logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, resp.ReturnCode, err)
logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, reserveResp.Response.ReturnCode, err)
}

func (service *HTTPRestService) releaseIPConfigHandler(w http.ResponseWriter, r *http.Request) {
req := cns.IPConfigRequest{}
resp := cns.Response{}
var err error

defer func() {
err = service.Listener.Encode(w, &resp)
logger.ResponseEx(service.Name, req, resp, resp.ReturnCode, err)
}()

err = service.Listener.Decode(w, r, &req)
var req cns.IPConfigRequest
err := service.Listener.Decode(w, r, &req)
logger.Request(service.Name+"releaseIPConfigHandler", req, err)
if err != nil {
resp.ReturnCode = types.UnexpectedError
resp.Message = err.Error()
resp := cns.Response{
ReturnCode: types.UnexpectedError,
Message: err.Error(),
}
logger.Errorf("releaseIPConfigHandler decode failed becase %v, release IP config info %s", resp.Message, req)
err = service.Listener.Encode(w, &resp)
logger.ResponseEx(service.Name, req, resp, resp.ReturnCode, err)
return
}

var podInfo cns.PodInfo
podInfo, resp.ReturnCode, resp.Message = service.validateIPConfigRequest(req)
podInfo, returnCode, message := service.validateIPConfigRequest(req)

if err = service.releaseIPConfig(podInfo); err != nil {
resp.ReturnCode = types.UnexpectedError
resp.Message = err.Error()
logger.Errorf("releaseIPConfigHandler releaseIPConfig failed because %v, release IP config info %s", resp.Message, req)
return
returnCode = types.UnexpectedError
message = err.Error()
logger.Errorf("releaseIPConfigHandler releaseIPConfig failed because %v, release IP config info %s", message, req)
}
resp := cns.Response{
ReturnCode: returnCode,
Message: message,
}
err = service.Listener.Encode(w, &resp)
logger.ResponseEx(service.Name, req, resp, resp.ReturnCode, err)
}

// MarkIPAsPendingRelease will set the IPs which are in PendingProgramming or Available to PendingRelease state
Expand Down Expand Up @@ -418,20 +438,15 @@ func (service *HTTPRestService) AllocateAnyAvailableIPConfig(podInfo cns.PodInfo

// If IPConfig is already allocated for pod, it returns that else it returns one of the available ipconfigs.
func requestIPConfigHelper(service *HTTPRestService, req cns.IPConfigRequest) (cns.PodIpInfo, error) {
var (
podIpInfo cns.PodIpInfo
isExist bool
)

// check if ipconfig already allocated for this pod and return if exists or error
// if error, ipstate is nil, if exists, ipstate is not nil and error is nil
podInfo, err := cns.NewPodInfoFromIPConfigRequest(req)
if err != nil {
return podIpInfo, err
return cns.PodIpInfo{}, errors.Wrapf(err, "failed to parse IPConfigRequest %v", req)
}

if podIpInfo, isExist, err = service.GetExistingIPConfig(podInfo); err != nil || isExist {
return podIpInfo, err
if podIPInfo, isExist, err := service.GetExistingIPConfig(podInfo); err != nil || isExist {
return podIPInfo, err
}

// return desired IPConfig
Expand Down
12 changes: 11 additions & 1 deletion cns/restserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var httpRequestLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_latency_seconds",
Help: "Request latency in seconds by endpoint, verb, and response code.",
//nolint:gomnd
//nolint:gomnd // default bucket consts
Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), // 1 ms to ~16 seconds
},
// TODO(rbtr):
Expand All @@ -22,9 +22,19 @@ var httpRequestLatency = prometheus.NewHistogramVec(
[]string{"url", "verb"},
)

var ipAllocationLatency = prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "ip_allocation_latency_seconds",
Help: "IP allocation latency in seconds",
//nolint:gomnd // default bucket consts
Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), // 1 ms to ~16 seconds
},
)

func init() {
metrics.Registry.MustRegister(
httpRequestLatency,
ipAllocationLatency,
)
}

Expand Down
3 changes: 3 additions & 0 deletions cns/restserver/restserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/Azure/azure-container-networking/cns/nmagentclient"
"github.com/Azure/azure-container-networking/cns/routes"
"github.com/Azure/azure-container-networking/cns/types"
"github.com/Azure/azure-container-networking/cns/types/bounded"
acn "github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/store"
)
Expand Down Expand Up @@ -48,6 +49,7 @@ type HTTPRestService struct {
routingTable *routes.RoutingTable
store store.KeyValueStore
state *httpRestServiceState
podsPendingIPAllocation *bounded.TimedSet
sync.RWMutex
dncPartitionKey string
}
Expand Down Expand Up @@ -137,6 +139,7 @@ func NewHTTPRestService(config *common.ServiceConfig, imdsClientInterface imdscl
PodIPConfigState: podIPConfigState,
routingTable: routingTable,
state: serviceState,
podsPendingIPAllocation: bounded.NewTimedSet(250), // nolint:gomnd // maxpods
}, nil
}

Expand Down
74 changes: 74 additions & 0 deletions cns/types/bounded/mappedheap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package bounded

import (
"container/heap"
)

// Item describes a type accepted by the mapped heap implementation.
type Item interface {
// Key is used for map index operations.
Key() string
// Less is used for heap sorting operations.
Less(Item) bool
// SetIndex is called by heap implementations to set the Item heap index.
SetIndex(int)
// Index returns the index of this Item.
Index() int
}

var _ heap.Interface = (*MappedHeap)(nil)

// MappedHeap is a combination of map and heap structures which allows for
// efficient sorting, uniqueness guarantees, and constant time lookups.
// Implements heap.Interface.
type MappedHeap struct {
m map[string]Item
items []Item
}

func NewMappedHeap() *MappedHeap {
return &MappedHeap{
m: make(map[string]Item),
}
}

func (mh MappedHeap) Contains(key string) (int, bool) {
item, ok := mh.m[key]
if ok {
return item.Index(), true
}
return -1, false
}

func (mh MappedHeap) Len() int {
return len(mh.items)
}

func (mh MappedHeap) Less(i, j int) bool {
return mh.items[i].Less(mh.items[j])
}

func (mh *MappedHeap) Swap(i, j int) {
mh.items[i], mh.items[j] = mh.items[j], mh.items[i]
mh.items[i].SetIndex(i)
mh.items[j].SetIndex(j)
}

func (mh *MappedHeap) Push(x interface{}) {
n := len(mh.items)
item := x.(Item)
item.SetIndex(n)
mh.items = append(mh.items, item)
mh.m[item.Key()] = item
}

func (mh *MappedHeap) Pop() interface{} {
old := mh.items
n := len(old)
item := old[n-1]
old[n-1] = nil
item.SetIndex(-1)
mh.items = old[0 : n-1]
delete(mh.m, item.Key())
return item
}
75 changes: 75 additions & 0 deletions cns/types/bounded/timedset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package bounded

import (
"container/heap"
"sync"
"time"
)

var _ Item = (*TimedItem)(nil)

// TimedItem implements Item for a string: time.Time tuple.
type TimedItem struct {
Name string
Time time.Time
index int
}

func (t *TimedItem) Key() string {
return t.Name
}

func (t *TimedItem) Less(o Item) bool {
other := o.(*TimedItem)
return t.Time.Before(other.Time)
}

func (t *TimedItem) Index() int {
return t.index
}

func (t *TimedItem) SetIndex(i int) {
t.index = i
}

type TimedSet struct {
sync.Mutex
capacity int
items *MappedHeap
}

func NewTimedSet(c int) *TimedSet {
return &TimedSet{
capacity: c,
items: NewMappedHeap(),
}
}

// Push registers the passed key and saves the timestamp it is first registered.
// If the key is already registered, does not overwrite the saved timestamp.
func (ts *TimedSet) Push(key string) {
ts.Lock()
defer ts.Unlock()
if _, ok := ts.items.Contains(key); ok {
return
}
if ts.items.Len() >= ts.capacity {
_ = heap.Pop(ts.items)
}
item := &TimedItem{Name: key}
item.Time = time.Now()
heap.Push(ts.items, item)
}

// Pop returns the elapsed duration since the passed key was first registered,
// or -1 if it is not found.
func (ts *TimedSet) Pop(key string) time.Duration {
ts.Lock()
defer ts.Unlock()
idx, ok := ts.items.Contains(key)
if !ok {
return -1
}
item := heap.Remove(ts.items, idx)
return time.Since(item.(*TimedItem).Time)
}
Loading